Skip to main content

atomr_core/dispatch/
message_queues.rs

1//! Message queue implementations.
2//!
3//! These are in-memory data structures used by the mailbox. They are
4//! `!Send` outside their owning `ActorCell` — all external sending goes
5//! through the typed channel held in [`crate::actor::ActorRef`].
6
7use std::cmp::Ordering;
8use std::collections::{BinaryHeap, VecDeque};
9
10use crate::dispatch::mailbox::OverflowStrategy;
11use crate::util::BoundedQueue;
12
13/// Envelope trait used by priority queues.
14pub trait Prioritized {
15    fn priority(&self) -> i32;
16}
17
18/// Unbounded FIFO queue.
19#[derive(Debug, Default)]
20pub struct UnboundedQueue<T> {
21    inner: VecDeque<T>,
22}
23
24impl<T> UnboundedQueue<T> {
25    pub fn new() -> Self {
26        Self { inner: VecDeque::new() }
27    }
28
29    pub fn push(&mut self, msg: T) {
30        self.inner.push_back(msg);
31    }
32
33    pub fn pop(&mut self) -> Option<T> {
34        self.inner.pop_front()
35    }
36
37    pub fn len(&self) -> usize {
38        self.inner.len()
39    }
40
41    pub fn is_empty(&self) -> bool {
42        self.inner.is_empty()
43    }
44}
45
46/// Outcome of a bounded `push` once an [`OverflowStrategy`] has been
47/// applied. `Accepted` means the message was enqueued (possibly after
48/// dropping another); `Rejected(msg)` means the configured strategy
49/// refused the push and returns the original message.
50#[derive(Debug, PartialEq, Eq)]
51pub enum PushOutcome<T> {
52    Accepted,
53    Dropped { dropped: T },
54    Rejected(T),
55}
56
57/// Bounded FIFO queue.
58#[derive(Debug)]
59pub struct BoundedMsgQueue<T> {
60    inner: BoundedQueue<T>,
61    overflow: OverflowStrategy,
62}
63
64impl<T> BoundedMsgQueue<T> {
65    pub fn new(capacity: usize) -> Self {
66        Self::with_overflow(capacity, OverflowStrategy::Fail)
67    }
68
69    pub fn with_overflow(capacity: usize, overflow: OverflowStrategy) -> Self {
70        Self { inner: BoundedQueue::new(capacity), overflow }
71    }
72
73    /// Legacy `push` that mirrors the original signature: returns the
74    /// original message if the queue is full. Equivalent to using
75    /// [`OverflowStrategy::Fail`].
76    pub fn push(&mut self, msg: T) -> Result<(), T> {
77        match self.push_with_strategy(msg) {
78            PushOutcome::Accepted => Ok(()),
79            PushOutcome::Dropped { dropped } => Err(dropped),
80            PushOutcome::Rejected(msg) => Err(msg),
81        }
82    }
83
84    /// Push with the configured overflow strategy applied. Returns
85    /// [`PushOutcome::Accepted`] if the message was enqueued (possibly
86    /// after dropping another), [`PushOutcome::Dropped`] giving back the
87    /// dropped message when DropHead/DropTail kicked in, or
88    /// [`PushOutcome::Rejected`] when DropNew/Fail refused the push.
89    pub fn push_with_strategy(&mut self, msg: T) -> PushOutcome<T> {
90        if !self.inner.is_full() {
91            return match self.inner.push(msg) {
92                Ok(()) => PushOutcome::Accepted,
93                Err(m) => PushOutcome::Rejected(m),
94            };
95        }
96        match self.overflow {
97            OverflowStrategy::Fail | OverflowStrategy::DropNew => PushOutcome::Rejected(msg),
98            OverflowStrategy::DropHead => match self.inner.pop() {
99                Some(dropped) => match self.inner.push(msg) {
100                    Ok(()) => PushOutcome::Dropped { dropped },
101                    Err(m) => PushOutcome::Rejected(m),
102                },
103                None => PushOutcome::Rejected(msg),
104            },
105            OverflowStrategy::DropTail => match self.inner.pop_back() {
106                Some(dropped) => match self.inner.push(msg) {
107                    Ok(()) => PushOutcome::Dropped { dropped },
108                    Err(m) => PushOutcome::Rejected(m),
109                },
110                None => PushOutcome::Rejected(msg),
111            },
112        }
113    }
114
115    pub fn pop(&mut self) -> Option<T> {
116        self.inner.pop()
117    }
118
119    pub fn is_full(&self) -> bool {
120        self.inner.is_full()
121    }
122
123    pub fn overflow(&self) -> OverflowStrategy {
124        self.overflow
125    }
126}
127
128/// Control-aware queue. Control messages are drained before user
129/// messages regardless of insertion order.
130/// `UnboundedControlAwareMessageQueue`. Use the typed wrapper
131/// [`ControlAware::Control`] / [`ControlAware::User`] to tag a message.
132#[derive(Debug)]
133pub enum ControlAware<T> {
134    Control(T),
135    User(T),
136}
137
138#[derive(Debug, Default)]
139pub struct ControlAwareQueue<T> {
140    control: VecDeque<T>,
141    user: VecDeque<T>,
142}
143
144impl<T> ControlAwareQueue<T> {
145    pub fn new() -> Self {
146        Self { control: VecDeque::new(), user: VecDeque::new() }
147    }
148
149    pub fn push(&mut self, msg: ControlAware<T>) {
150        match msg {
151            ControlAware::Control(m) => self.control.push_back(m),
152            ControlAware::User(m) => self.user.push_back(m),
153        }
154    }
155
156    pub fn pop(&mut self) -> Option<T> {
157        self.control.pop_front().or_else(|| self.user.pop_front())
158    }
159
160    pub fn len(&self) -> usize {
161        self.control.len() + self.user.len()
162    }
163
164    pub fn is_empty(&self) -> bool {
165        self.control.is_empty() && self.user.is_empty()
166    }
167}
168
169/// Deque-like queue permitting front insertion (for stash/unstash).
170#[derive(Debug)]
171pub struct DequeQueue<T> {
172    inner: VecDeque<T>,
173}
174
175impl<T> Default for DequeQueue<T> {
176    fn default() -> Self {
177        Self { inner: VecDeque::new() }
178    }
179}
180
181impl<T> DequeQueue<T> {
182    pub fn new() -> Self {
183        Self::default()
184    }
185
186    pub fn push_back(&mut self, msg: T) {
187        self.inner.push_back(msg);
188    }
189
190    pub fn push_front(&mut self, msg: T) {
191        self.inner.push_front(msg);
192    }
193
194    pub fn pop(&mut self) -> Option<T> {
195        self.inner.pop_front()
196    }
197}
198
199/// Priority queue.
200///
201/// `T` must implement [`Prioritized`].
202pub struct PriorityQueue<T: Prioritized> {
203    heap: BinaryHeap<PriItem<T>>,
204}
205
206impl<T: Prioritized> std::fmt::Debug for PriorityQueue<T> {
207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208        f.debug_struct("PriorityQueue").field("len", &self.heap.len()).finish()
209    }
210}
211
212impl<T: Prioritized> Default for PriorityQueue<T> {
213    fn default() -> Self {
214        Self { heap: BinaryHeap::new() }
215    }
216}
217
218impl<T: Prioritized> PriorityQueue<T> {
219    pub fn new() -> Self {
220        Self::default()
221    }
222
223    pub fn push(&mut self, msg: T) {
224        let p = msg.priority();
225        self.heap.push(PriItem { prio: p, inner: msg });
226    }
227
228    pub fn pop(&mut self) -> Option<T> {
229        self.heap.pop().map(|i| i.inner)
230    }
231}
232
233struct PriItem<T: Prioritized> {
234    prio: i32,
235    inner: T,
236}
237
238impl<T: Prioritized> PartialEq for PriItem<T> {
239    fn eq(&self, other: &Self) -> bool {
240        self.prio == other.prio
241    }
242}
243impl<T: Prioritized> Eq for PriItem<T> {}
244impl<T: Prioritized> PartialOrd for PriItem<T> {
245    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
246        Some(self.cmp(other))
247    }
248}
249impl<T: Prioritized> Ord for PriItem<T> {
250    fn cmp(&self, other: &Self) -> Ordering {
251        self.prio.cmp(&other.prio)
252    }
253}
254
255/// Stable priority queue (FIFO among equal priorities).
256pub struct StablePriorityQueue<T: Prioritized> {
257    heap: BinaryHeap<StableItem<T>>,
258    seq: u64,
259}
260
261impl<T: Prioritized> std::fmt::Debug for StablePriorityQueue<T> {
262    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263        f.debug_struct("StablePriorityQueue").field("len", &self.heap.len()).finish()
264    }
265}
266
267impl<T: Prioritized> Default for StablePriorityQueue<T> {
268    fn default() -> Self {
269        Self { heap: BinaryHeap::new(), seq: 0 }
270    }
271}
272
273impl<T: Prioritized> StablePriorityQueue<T> {
274    pub fn new() -> Self {
275        Self::default()
276    }
277
278    pub fn push(&mut self, msg: T) {
279        let p = msg.priority();
280        let s = self.seq;
281        self.seq = self.seq.wrapping_add(1);
282        self.heap.push(StableItem { prio: p, seq: s, inner: msg });
283    }
284
285    pub fn pop(&mut self) -> Option<T> {
286        self.heap.pop().map(|i| i.inner)
287    }
288}
289
290struct StableItem<T: Prioritized> {
291    prio: i32,
292    seq: u64,
293    inner: T,
294}
295
296impl<T: Prioritized> PartialEq for StableItem<T> {
297    fn eq(&self, other: &Self) -> bool {
298        self.prio == other.prio && self.seq == other.seq
299    }
300}
301impl<T: Prioritized> Eq for StableItem<T> {}
302impl<T: Prioritized> PartialOrd for StableItem<T> {
303    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
304        Some(self.cmp(other))
305    }
306}
307impl<T: Prioritized> Ord for StableItem<T> {
308    fn cmp(&self, other: &Self) -> Ordering {
309        self.prio.cmp(&other.prio).then_with(|| other.seq.cmp(&self.seq))
310    }
311}
312
313#[cfg(test)]
314mod tests {
315    use super::*;
316
317    #[derive(Debug, PartialEq)]
318    struct M(i32);
319    impl Prioritized for M {
320        fn priority(&self) -> i32 {
321            self.0
322        }
323    }
324
325    #[test]
326    fn unbounded_fifo() {
327        let mut q = UnboundedQueue::new();
328        q.push(1);
329        q.push(2);
330        assert_eq!(q.pop(), Some(1));
331        assert_eq!(q.pop(), Some(2));
332    }
333
334    #[test]
335    fn bounded_rejects_when_full() {
336        let mut q = BoundedMsgQueue::new(1);
337        q.push(1).unwrap();
338        assert!(q.push(2).is_err());
339    }
340
341    #[test]
342    fn bounded_drop_head_removes_oldest() {
343        let mut q = BoundedMsgQueue::with_overflow(2, OverflowStrategy::DropHead);
344        assert_eq!(q.push_with_strategy(1), PushOutcome::Accepted);
345        assert_eq!(q.push_with_strategy(2), PushOutcome::Accepted);
346        assert_eq!(q.push_with_strategy(3), PushOutcome::Dropped { dropped: 1 });
347        assert_eq!(q.pop(), Some(2));
348        assert_eq!(q.pop(), Some(3));
349    }
350
351    #[test]
352    fn bounded_drop_tail_removes_newest() {
353        let mut q = BoundedMsgQueue::with_overflow(2, OverflowStrategy::DropTail);
354        q.push_with_strategy(1);
355        q.push_with_strategy(2);
356        assert_eq!(q.push_with_strategy(3), PushOutcome::Dropped { dropped: 2 });
357        assert_eq!(q.pop(), Some(1));
358        assert_eq!(q.pop(), Some(3));
359    }
360
361    #[test]
362    fn bounded_drop_new_rejects_incoming() {
363        let mut q = BoundedMsgQueue::with_overflow(1, OverflowStrategy::DropNew);
364        q.push_with_strategy(1);
365        assert_eq!(q.push_with_strategy(2), PushOutcome::Rejected(2));
366        assert_eq!(q.pop(), Some(1));
367    }
368
369    #[test]
370    fn bounded_fail_rejects_incoming() {
371        let mut q = BoundedMsgQueue::with_overflow(1, OverflowStrategy::Fail);
372        q.push_with_strategy(1);
373        assert_eq!(q.push_with_strategy(2), PushOutcome::Rejected(2));
374    }
375
376    #[test]
377    fn control_aware_drains_control_first() {
378        let mut q = ControlAwareQueue::new();
379        q.push(ControlAware::User(1));
380        q.push(ControlAware::User(2));
381        q.push(ControlAware::Control(99));
382        assert_eq!(q.pop(), Some(99));
383        assert_eq!(q.pop(), Some(1));
384        assert_eq!(q.pop(), Some(2));
385        assert!(q.is_empty());
386    }
387
388    #[test]
389    fn control_aware_preserves_within_class_fifo() {
390        let mut q = ControlAwareQueue::new();
391        q.push(ControlAware::Control(1));
392        q.push(ControlAware::Control(2));
393        q.push(ControlAware::User(10));
394        q.push(ControlAware::User(11));
395        assert_eq!(q.pop(), Some(1));
396        assert_eq!(q.pop(), Some(2));
397        assert_eq!(q.pop(), Some(10));
398        assert_eq!(q.pop(), Some(11));
399    }
400
401    #[test]
402    fn priority_highest_first() {
403        let mut q = PriorityQueue::new();
404        q.push(M(1));
405        q.push(M(5));
406        q.push(M(3));
407        assert_eq!(q.pop().unwrap().0, 5);
408        assert_eq!(q.pop().unwrap().0, 3);
409    }
410
411    #[test]
412    fn stable_priority_preserves_fifo_for_ties() {
413        let mut q = StablePriorityQueue::new();
414        q.push(M(1));
415        q.push(M(2));
416        q.push(M(1));
417        assert_eq!(q.pop().unwrap().0, 2);
418        // both remaining priorities are 1 — FIFO
419        assert_eq!(q.pop().unwrap().0, 1);
420        assert_eq!(q.pop().unwrap().0, 1);
421    }
422}