Skip to main content

dynomite/msg/
queue.rs

1//! Owning queue of [`Msg`] values.
2//!
3//! The reference engine threads messages through `TAILQ` lists named
4//! `msg_tqh`. The Rust port replaces every such list with a
5//! [`MsgQueue`] backed by a [`VecDeque`]. The owning relationship is
6//! the same: pushing a [`Msg`] into a queue transfers ownership;
7//! popping one transfers it back to the caller.
8//!
9//! [`Msg`]: super::message::Msg
10
11use std::collections::VecDeque;
12
13use crate::core::types::MsgId;
14
15use super::message::Msg;
16
17/// Ordered queue of owned messages.
18#[derive(Debug, Default)]
19pub struct MsgQueue {
20    inner: VecDeque<Msg>,
21}
22
23impl<'a> IntoIterator for &'a MsgQueue {
24    type Item = &'a Msg;
25    type IntoIter = std::collections::vec_deque::Iter<'a, Msg>;
26
27    fn into_iter(self) -> Self::IntoIter {
28        self.inner.iter()
29    }
30}
31
32impl MsgQueue {
33    /// Build an empty queue.
34    ///
35    /// # Examples
36    ///
37    /// ```
38    /// use dynomite::msg::MsgQueue;
39    /// let q = MsgQueue::new();
40    /// assert!(q.is_empty());
41    /// ```
42    #[must_use]
43    pub fn new() -> Self {
44        Self {
45            inner: VecDeque::new(),
46        }
47    }
48
49    /// Build an empty queue with capacity for at least `n` messages.
50    ///
51    /// # Examples
52    ///
53    /// ```
54    /// use dynomite::msg::MsgQueue;
55    /// let q = MsgQueue::with_capacity(8);
56    /// assert!(q.is_empty());
57    /// ```
58    #[must_use]
59    pub fn with_capacity(n: usize) -> Self {
60        Self {
61            inner: VecDeque::with_capacity(n),
62        }
63    }
64
65    /// Append `msg` to the tail of the queue.
66    ///
67    /// # Examples
68    ///
69    /// ```
70    /// use dynomite::msg::{Msg, MsgQueue, MsgType};
71    ///
72    /// let mut q = MsgQueue::new();
73    /// q.push_back(Msg::new(7, MsgType::ReqRedisGet, true));
74    /// assert_eq!(q.len(), 1);
75    /// ```
76    pub fn push_back(&mut self, msg: Msg) {
77        self.inner.push_back(msg);
78    }
79
80    /// Push `msg` to the head of the queue.
81    ///
82    /// # Examples
83    ///
84    /// ```
85    /// use dynomite::msg::{Msg, MsgQueue, MsgType};
86    ///
87    /// let mut q = MsgQueue::new();
88    /// q.push_front(Msg::new(1, MsgType::ReqMcGet, true));
89    /// assert_eq!(q.len(), 1);
90    /// ```
91    pub fn push_front(&mut self, msg: Msg) {
92        self.inner.push_front(msg);
93    }
94
95    /// Remove and return the head of the queue.
96    ///
97    /// # Examples
98    ///
99    /// ```
100    /// use dynomite::msg::{Msg, MsgQueue, MsgType};
101    ///
102    /// let mut q = MsgQueue::new();
103    /// q.push_back(Msg::new(1, MsgType::ReqMcGet, true));
104    /// assert!(q.pop_front().is_some());
105    /// assert!(q.pop_front().is_none());
106    /// ```
107    pub fn pop_front(&mut self) -> Option<Msg> {
108        self.inner.pop_front()
109    }
110
111    /// Remove and return the tail of the queue.
112    ///
113    /// # Examples
114    ///
115    /// ```
116    /// use dynomite::msg::{Msg, MsgQueue, MsgType};
117    ///
118    /// let mut q = MsgQueue::new();
119    /// q.push_back(Msg::new(1, MsgType::ReqMcGet, true));
120    /// q.push_back(Msg::new(2, MsgType::ReqMcGet, true));
121    /// assert_eq!(q.pop_back().unwrap().id(), 2);
122    /// ```
123    pub fn pop_back(&mut self) -> Option<Msg> {
124        self.inner.pop_back()
125    }
126
127    /// Number of messages currently in the queue.
128    ///
129    /// # Examples
130    ///
131    /// ```
132    /// use dynomite::msg::MsgQueue;
133    /// assert_eq!(MsgQueue::new().len(), 0);
134    /// ```
135    #[must_use]
136    pub fn len(&self) -> usize {
137        self.inner.len()
138    }
139
140    /// True when the queue is empty.
141    ///
142    /// # Examples
143    ///
144    /// ```
145    /// use dynomite::msg::MsgQueue;
146    /// assert!(MsgQueue::new().is_empty());
147    /// ```
148    #[must_use]
149    pub fn is_empty(&self) -> bool {
150        self.inner.is_empty()
151    }
152
153    /// Borrow the front message without removing it.
154    ///
155    /// # Examples
156    ///
157    /// ```
158    /// use dynomite::msg::{Msg, MsgQueue, MsgType};
159    ///
160    /// let mut q = MsgQueue::new();
161    /// q.push_back(Msg::new(42, MsgType::ReqRedisGet, true));
162    /// assert_eq!(q.front().unwrap().id(), 42);
163    /// ```
164    #[must_use]
165    pub fn front(&self) -> Option<&Msg> {
166        self.inner.front()
167    }
168
169    /// Mutably borrow the front message.
170    ///
171    /// # Examples
172    ///
173    /// ```
174    /// use dynomite::msg::{Msg, MsgQueue, MsgType};
175    ///
176    /// let mut q = MsgQueue::new();
177    /// q.push_back(Msg::new(1, MsgType::ReqRedisGet, true));
178    /// q.front_mut().unwrap().set_swallow(true);
179    /// ```
180    pub fn front_mut(&mut self) -> Option<&mut Msg> {
181        self.inner.front_mut()
182    }
183
184    /// Iterate over the queue in front-to-back order.
185    ///
186    /// # Examples
187    ///
188    /// ```
189    /// use dynomite::msg::{Msg, MsgQueue, MsgType};
190    ///
191    /// let mut q = MsgQueue::new();
192    /// q.push_back(Msg::new(1, MsgType::ReqRedisGet, true));
193    /// q.push_back(Msg::new(2, MsgType::ReqRedisSet, true));
194    /// let ids: Vec<u64> = q.iter().map(|m| m.id()).collect();
195    /// assert_eq!(ids, vec![1, 2]);
196    /// ```
197    pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, Msg> {
198        self.inner.iter()
199    }
200
201    /// Find a message by id and return a shared reference.
202    ///
203    /// This is the queue-side counterpart to the C
204    /// `dict_msg_id`-shaped lookup that traverses the outstanding-msg
205    /// list. Use [`crate::msg::index::MsgIndex`] when an `O(1)`
206    /// lookup is required across many queues.
207    ///
208    /// # Examples
209    ///
210    /// ```
211    /// use dynomite::msg::{Msg, MsgQueue, MsgType};
212    ///
213    /// let mut q = MsgQueue::new();
214    /// q.push_back(Msg::new(99, MsgType::ReqRedisGet, true));
215    /// assert!(q.msg_get_id_lookup(99).is_some());
216    /// assert!(q.msg_get_id_lookup(1).is_none());
217    /// ```
218    #[must_use]
219    pub fn msg_get_id_lookup(&self, id: MsgId) -> Option<&Msg> {
220        self.inner.iter().find(|m| m.id() == id)
221    }
222}
223
224#[cfg(test)]
225mod tests {
226    use super::*;
227    use crate::msg::{Msg, MsgType};
228
229    #[test]
230    fn fifo_order() {
231        let mut q = MsgQueue::new();
232        for i in 1..=3 {
233            q.push_back(Msg::new(i, MsgType::ReqRedisGet, true));
234        }
235        assert_eq!(q.pop_front().unwrap().id(), 1);
236        assert_eq!(q.pop_front().unwrap().id(), 2);
237        assert_eq!(q.pop_front().unwrap().id(), 3);
238        assert!(q.is_empty());
239    }
240
241    #[test]
242    fn lookup_finds_by_id() {
243        let mut q = MsgQueue::new();
244        q.push_back(Msg::new(11, MsgType::ReqRedisGet, true));
245        q.push_back(Msg::new(22, MsgType::ReqRedisSet, true));
246        assert_eq!(q.msg_get_id_lookup(22).unwrap().id(), 22);
247        assert!(q.msg_get_id_lookup(33).is_none());
248    }
249}