dynomite/msg/queue.rs
1//! Owning queue of [`Msg`] values.
2//!
3//! The reference engine threads messages through `TAILQ` lists named
4//! `msg_tqh`. The Rust port replaces every such list with a
5//! [`MsgQueue`] backed by a [`VecDeque`]. The owning relationship is
6//! the same: pushing a [`Msg`] into a queue transfers ownership;
7//! popping one transfers it back to the caller.
8//!
9//! [`Msg`]: super::message::Msg
10
11use std::collections::VecDeque;
12
13use crate::core::types::MsgId;
14
15use super::message::Msg;
16
17/// Ordered queue of owned messages.
18#[derive(Debug, Default)]
19pub struct MsgQueue {
20 inner: VecDeque<Msg>,
21}
22
23impl<'a> IntoIterator for &'a MsgQueue {
24 type Item = &'a Msg;
25 type IntoIter = std::collections::vec_deque::Iter<'a, Msg>;
26
27 fn into_iter(self) -> Self::IntoIter {
28 self.inner.iter()
29 }
30}
31
32impl MsgQueue {
33 /// Build an empty queue.
34 ///
35 /// # Examples
36 ///
37 /// ```
38 /// use dynomite::msg::MsgQueue;
39 /// let q = MsgQueue::new();
40 /// assert!(q.is_empty());
41 /// ```
42 #[must_use]
43 pub fn new() -> Self {
44 Self {
45 inner: VecDeque::new(),
46 }
47 }
48
49 /// Build an empty queue with capacity for at least `n` messages.
50 ///
51 /// # Examples
52 ///
53 /// ```
54 /// use dynomite::msg::MsgQueue;
55 /// let q = MsgQueue::with_capacity(8);
56 /// assert!(q.is_empty());
57 /// ```
58 #[must_use]
59 pub fn with_capacity(n: usize) -> Self {
60 Self {
61 inner: VecDeque::with_capacity(n),
62 }
63 }
64
65 /// Append `msg` to the tail of the queue.
66 ///
67 /// # Examples
68 ///
69 /// ```
70 /// use dynomite::msg::{Msg, MsgQueue, MsgType};
71 ///
72 /// let mut q = MsgQueue::new();
73 /// q.push_back(Msg::new(7, MsgType::ReqRedisGet, true));
74 /// assert_eq!(q.len(), 1);
75 /// ```
76 pub fn push_back(&mut self, msg: Msg) {
77 self.inner.push_back(msg);
78 }
79
80 /// Push `msg` to the head of the queue.
81 ///
82 /// # Examples
83 ///
84 /// ```
85 /// use dynomite::msg::{Msg, MsgQueue, MsgType};
86 ///
87 /// let mut q = MsgQueue::new();
88 /// q.push_front(Msg::new(1, MsgType::ReqMcGet, true));
89 /// assert_eq!(q.len(), 1);
90 /// ```
91 pub fn push_front(&mut self, msg: Msg) {
92 self.inner.push_front(msg);
93 }
94
95 /// Remove and return the head of the queue.
96 ///
97 /// # Examples
98 ///
99 /// ```
100 /// use dynomite::msg::{Msg, MsgQueue, MsgType};
101 ///
102 /// let mut q = MsgQueue::new();
103 /// q.push_back(Msg::new(1, MsgType::ReqMcGet, true));
104 /// assert!(q.pop_front().is_some());
105 /// assert!(q.pop_front().is_none());
106 /// ```
107 pub fn pop_front(&mut self) -> Option<Msg> {
108 self.inner.pop_front()
109 }
110
111 /// Remove and return the tail of the queue.
112 ///
113 /// # Examples
114 ///
115 /// ```
116 /// use dynomite::msg::{Msg, MsgQueue, MsgType};
117 ///
118 /// let mut q = MsgQueue::new();
119 /// q.push_back(Msg::new(1, MsgType::ReqMcGet, true));
120 /// q.push_back(Msg::new(2, MsgType::ReqMcGet, true));
121 /// assert_eq!(q.pop_back().unwrap().id(), 2);
122 /// ```
123 pub fn pop_back(&mut self) -> Option<Msg> {
124 self.inner.pop_back()
125 }
126
127 /// Number of messages currently in the queue.
128 ///
129 /// # Examples
130 ///
131 /// ```
132 /// use dynomite::msg::MsgQueue;
133 /// assert_eq!(MsgQueue::new().len(), 0);
134 /// ```
135 #[must_use]
136 pub fn len(&self) -> usize {
137 self.inner.len()
138 }
139
140 /// True when the queue is empty.
141 ///
142 /// # Examples
143 ///
144 /// ```
145 /// use dynomite::msg::MsgQueue;
146 /// assert!(MsgQueue::new().is_empty());
147 /// ```
148 #[must_use]
149 pub fn is_empty(&self) -> bool {
150 self.inner.is_empty()
151 }
152
153 /// Borrow the front message without removing it.
154 ///
155 /// # Examples
156 ///
157 /// ```
158 /// use dynomite::msg::{Msg, MsgQueue, MsgType};
159 ///
160 /// let mut q = MsgQueue::new();
161 /// q.push_back(Msg::new(42, MsgType::ReqRedisGet, true));
162 /// assert_eq!(q.front().unwrap().id(), 42);
163 /// ```
164 #[must_use]
165 pub fn front(&self) -> Option<&Msg> {
166 self.inner.front()
167 }
168
169 /// Mutably borrow the front message.
170 ///
171 /// # Examples
172 ///
173 /// ```
174 /// use dynomite::msg::{Msg, MsgQueue, MsgType};
175 ///
176 /// let mut q = MsgQueue::new();
177 /// q.push_back(Msg::new(1, MsgType::ReqRedisGet, true));
178 /// q.front_mut().unwrap().set_swallow(true);
179 /// ```
180 pub fn front_mut(&mut self) -> Option<&mut Msg> {
181 self.inner.front_mut()
182 }
183
184 /// Iterate over the queue in front-to-back order.
185 ///
186 /// # Examples
187 ///
188 /// ```
189 /// use dynomite::msg::{Msg, MsgQueue, MsgType};
190 ///
191 /// let mut q = MsgQueue::new();
192 /// q.push_back(Msg::new(1, MsgType::ReqRedisGet, true));
193 /// q.push_back(Msg::new(2, MsgType::ReqRedisSet, true));
194 /// let ids: Vec<u64> = q.iter().map(|m| m.id()).collect();
195 /// assert_eq!(ids, vec![1, 2]);
196 /// ```
197 pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, Msg> {
198 self.inner.iter()
199 }
200
201 /// Find a message by id and return a shared reference.
202 ///
203 /// This is the queue-side counterpart to the C
204 /// `dict_msg_id`-shaped lookup that traverses the outstanding-msg
205 /// list. Use [`crate::msg::index::MsgIndex`] when an `O(1)`
206 /// lookup is required across many queues.
207 ///
208 /// # Examples
209 ///
210 /// ```
211 /// use dynomite::msg::{Msg, MsgQueue, MsgType};
212 ///
213 /// let mut q = MsgQueue::new();
214 /// q.push_back(Msg::new(99, MsgType::ReqRedisGet, true));
215 /// assert!(q.msg_get_id_lookup(99).is_some());
216 /// assert!(q.msg_get_id_lookup(1).is_none());
217 /// ```
218 #[must_use]
219 pub fn msg_get_id_lookup(&self, id: MsgId) -> Option<&Msg> {
220 self.inner.iter().find(|m| m.id() == id)
221 }
222}
223
224#[cfg(test)]
225mod tests {
226 use super::*;
227 use crate::msg::{Msg, MsgType};
228
229 #[test]
230 fn fifo_order() {
231 let mut q = MsgQueue::new();
232 for i in 1..=3 {
233 q.push_back(Msg::new(i, MsgType::ReqRedisGet, true));
234 }
235 assert_eq!(q.pop_front().unwrap().id(), 1);
236 assert_eq!(q.pop_front().unwrap().id(), 2);
237 assert_eq!(q.pop_front().unwrap().id(), 3);
238 assert!(q.is_empty());
239 }
240
241 #[test]
242 fn lookup_finds_by_id() {
243 let mut q = MsgQueue::new();
244 q.push_back(Msg::new(11, MsgType::ReqRedisGet, true));
245 q.push_back(Msg::new(22, MsgType::ReqRedisSet, true));
246 assert_eq!(q.msg_get_id_lookup(22).unwrap().id(), 22);
247 assert!(q.msg_get_id_lookup(33).is_none());
248 }
249}