Skip to main content

atomr_core/dispatch/
message_queues.rs

1//! Message queue implementations. akka.net: `Dispatch/MessageQueues`.
2//!
3//! These are in-memory data structures used by the mailbox. They are
4//! `!Send` outside their owning `ActorCell` — all external sending goes
5//! through the typed channel held in [`crate::actor::ActorRef`].
6
7use std::cmp::Ordering;
8use std::collections::{BinaryHeap, VecDeque};
9
10use crate::util::BoundedQueue;
11
12/// Envelope trait used by priority queues.
13pub trait Prioritized {
14    fn priority(&self) -> i32;
15}
16
17/// Unbounded FIFO queue (akka.net: `UnboundedMessageQueue`).
18#[derive(Debug, Default)]
19pub struct UnboundedQueue<T> {
20    inner: VecDeque<T>,
21}
22
23impl<T> UnboundedQueue<T> {
24    pub fn new() -> Self {
25        Self { inner: VecDeque::new() }
26    }
27
28    pub fn push(&mut self, msg: T) {
29        self.inner.push_back(msg);
30    }
31
32    pub fn pop(&mut self) -> Option<T> {
33        self.inner.pop_front()
34    }
35
36    pub fn len(&self) -> usize {
37        self.inner.len()
38    }
39
40    pub fn is_empty(&self) -> bool {
41        self.inner.is_empty()
42    }
43}
44
45/// Bounded FIFO queue (akka.net: `BoundedMessageQueue`).
46#[derive(Debug)]
47pub struct BoundedMsgQueue<T> {
48    inner: BoundedQueue<T>,
49}
50
51impl<T> BoundedMsgQueue<T> {
52    pub fn new(capacity: usize) -> Self {
53        Self { inner: BoundedQueue::new(capacity) }
54    }
55
56    pub fn push(&mut self, msg: T) -> Result<(), T> {
57        self.inner.push(msg)
58    }
59
60    pub fn pop(&mut self) -> Option<T> {
61        self.inner.pop()
62    }
63
64    pub fn is_full(&self) -> bool {
65        self.inner.is_full()
66    }
67}
68
69/// Deque-like queue permitting front insertion (for stash/unstash).
70/// akka.net: `UnboundedDequeMessageQueue`.
71#[derive(Debug)]
72pub struct DequeQueue<T> {
73    inner: VecDeque<T>,
74}
75
76impl<T> Default for DequeQueue<T> {
77    fn default() -> Self {
78        Self { inner: VecDeque::new() }
79    }
80}
81
82impl<T> DequeQueue<T> {
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    pub fn push_back(&mut self, msg: T) {
88        self.inner.push_back(msg);
89    }
90
91    pub fn push_front(&mut self, msg: T) {
92        self.inner.push_front(msg);
93    }
94
95    pub fn pop(&mut self) -> Option<T> {
96        self.inner.pop_front()
97    }
98}
99
100/// Priority queue. akka.net: `UnboundedPriorityMessageQueue`.
101///
102/// `T` must implement [`Prioritized`].
103pub struct PriorityQueue<T: Prioritized> {
104    heap: BinaryHeap<PriItem<T>>,
105}
106
107impl<T: Prioritized> std::fmt::Debug for PriorityQueue<T> {
108    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109        f.debug_struct("PriorityQueue").field("len", &self.heap.len()).finish()
110    }
111}
112
113impl<T: Prioritized> Default for PriorityQueue<T> {
114    fn default() -> Self {
115        Self { heap: BinaryHeap::new() }
116    }
117}
118
119impl<T: Prioritized> PriorityQueue<T> {
120    pub fn new() -> Self {
121        Self::default()
122    }
123
124    pub fn push(&mut self, msg: T) {
125        let p = msg.priority();
126        self.heap.push(PriItem { prio: p, inner: msg });
127    }
128
129    pub fn pop(&mut self) -> Option<T> {
130        self.heap.pop().map(|i| i.inner)
131    }
132}
133
134struct PriItem<T: Prioritized> {
135    prio: i32,
136    inner: T,
137}
138
139impl<T: Prioritized> PartialEq for PriItem<T> {
140    fn eq(&self, other: &Self) -> bool {
141        self.prio == other.prio
142    }
143}
144impl<T: Prioritized> Eq for PriItem<T> {}
145impl<T: Prioritized> PartialOrd for PriItem<T> {
146    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
147        Some(self.cmp(other))
148    }
149}
150impl<T: Prioritized> Ord for PriItem<T> {
151    fn cmp(&self, other: &Self) -> Ordering {
152        self.prio.cmp(&other.prio)
153    }
154}
155
156/// Stable priority queue (FIFO among equal priorities).
157/// akka.net: `UnboundedStablePriorityMessageQueue`.
158pub struct StablePriorityQueue<T: Prioritized> {
159    heap: BinaryHeap<StableItem<T>>,
160    seq: u64,
161}
162
163impl<T: Prioritized> std::fmt::Debug for StablePriorityQueue<T> {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        f.debug_struct("StablePriorityQueue").field("len", &self.heap.len()).finish()
166    }
167}
168
169impl<T: Prioritized> Default for StablePriorityQueue<T> {
170    fn default() -> Self {
171        Self { heap: BinaryHeap::new(), seq: 0 }
172    }
173}
174
175impl<T: Prioritized> StablePriorityQueue<T> {
176    pub fn new() -> Self {
177        Self::default()
178    }
179
180    pub fn push(&mut self, msg: T) {
181        let p = msg.priority();
182        let s = self.seq;
183        self.seq = self.seq.wrapping_add(1);
184        self.heap.push(StableItem { prio: p, seq: s, inner: msg });
185    }
186
187    pub fn pop(&mut self) -> Option<T> {
188        self.heap.pop().map(|i| i.inner)
189    }
190}
191
192struct StableItem<T: Prioritized> {
193    prio: i32,
194    seq: u64,
195    inner: T,
196}
197
198impl<T: Prioritized> PartialEq for StableItem<T> {
199    fn eq(&self, other: &Self) -> bool {
200        self.prio == other.prio && self.seq == other.seq
201    }
202}
203impl<T: Prioritized> Eq for StableItem<T> {}
204impl<T: Prioritized> PartialOrd for StableItem<T> {
205    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
206        Some(self.cmp(other))
207    }
208}
209impl<T: Prioritized> Ord for StableItem<T> {
210    fn cmp(&self, other: &Self) -> Ordering {
211        self.prio.cmp(&other.prio).then_with(|| other.seq.cmp(&self.seq))
212    }
213}
214
215#[cfg(test)]
216mod tests {
217    use super::*;
218
219    #[derive(Debug, PartialEq)]
220    struct M(i32);
221    impl Prioritized for M {
222        fn priority(&self) -> i32 {
223            self.0
224        }
225    }
226
227    #[test]
228    fn unbounded_fifo() {
229        let mut q = UnboundedQueue::new();
230        q.push(1);
231        q.push(2);
232        assert_eq!(q.pop(), Some(1));
233        assert_eq!(q.pop(), Some(2));
234    }
235
236    #[test]
237    fn bounded_rejects_when_full() {
238        let mut q = BoundedMsgQueue::new(1);
239        q.push(1).unwrap();
240        assert!(q.push(2).is_err());
241    }
242
243    #[test]
244    fn priority_highest_first() {
245        let mut q = PriorityQueue::new();
246        q.push(M(1));
247        q.push(M(5));
248        q.push(M(3));
249        assert_eq!(q.pop().unwrap().0, 5);
250        assert_eq!(q.pop().unwrap().0, 3);
251    }
252
253    #[test]
254    fn stable_priority_preserves_fifo_for_ties() {
255        let mut q = StablePriorityQueue::new();
256        q.push(M(1));
257        q.push(M(2));
258        q.push(M(1));
259        assert_eq!(q.pop().unwrap().0, 2);
260        // both remaining priorities are 1 — FIFO
261        assert_eq!(q.pop().unwrap().0, 1);
262        assert_eq!(q.pop().unwrap().0, 1);
263    }
264}