atomr_core/dispatch/
message_queues.rs1use std::cmp::Ordering;
8use std::collections::{BinaryHeap, VecDeque};
9
10use crate::util::BoundedQueue;
11
/// A message that carries an integer scheduling priority.
///
/// The priority queues in this module read this value when a message is
/// pushed and dequeue messages with larger values first.
pub trait Prioritized {
    /// Returns the priority of this message.
    fn priority(&self) -> i32;
}
16
/// FIFO message queue with no capacity limit, backed by a [`VecDeque`].
#[derive(Debug)]
pub struct UnboundedQueue<T> {
    inner: VecDeque<T>,
}

// Implemented by hand instead of `#[derive(Default)]`: the derive would add a
// `T: Default` bound, although an empty queue never has to construct a `T`.
impl<T> Default for UnboundedQueue<T> {
    fn default() -> Self {
        Self { inner: VecDeque::new() }
    }
}

impl<T> UnboundedQueue<T> {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `msg` to the back of the queue.
    pub fn push(&mut self, msg: T) {
        self.inner.push_back(msg);
    }

    /// Removes and returns the message at the front of the queue (the oldest
    /// one), or `None` if the queue is empty.
    pub fn pop(&mut self) -> Option<T> {
        self.inner.pop_front()
    }

    /// Returns the number of messages currently queued.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if no messages are queued.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}
44
45#[derive(Debug)]
47pub struct BoundedMsgQueue<T> {
48 inner: BoundedQueue<T>,
49}
50
51impl<T> BoundedMsgQueue<T> {
52 pub fn new(capacity: usize) -> Self {
53 Self { inner: BoundedQueue::new(capacity) }
54 }
55
56 pub fn push(&mut self, msg: T) -> Result<(), T> {
57 self.inner.push(msg)
58 }
59
60 pub fn pop(&mut self) -> Option<T> {
61 self.inner.pop()
62 }
63
64 pub fn is_full(&self) -> bool {
65 self.inner.is_full()
66 }
67}
68
/// Double-ended message queue backed by a [`VecDeque`].
///
/// Messages are always taken from the front. [`push_back`](DequeQueue::push_back)
/// enqueues in arrival order, while [`push_front`](DequeQueue::push_front)
/// places a message ahead of everything already queued.
#[derive(Debug)]
pub struct DequeQueue<T> {
    inner: VecDeque<T>,
}

// Hand-written so that `Default` is available for every `T`, not only for
// `T: Default` as a derive would require.
impl<T> Default for DequeQueue<T> {
    fn default() -> Self {
        Self { inner: VecDeque::new() }
    }
}

impl<T> DequeQueue<T> {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `msg` behind all queued messages.
    pub fn push_back(&mut self, msg: T) {
        self.inner.push_back(msg);
    }

    /// Inserts `msg` ahead of all queued messages, so it is the next one
    /// returned by [`pop`](DequeQueue::pop).
    pub fn push_front(&mut self, msg: T) {
        self.inner.push_front(msg);
    }

    /// Removes and returns the message at the front, or `None` if the queue
    /// is empty.
    pub fn pop(&mut self) -> Option<T> {
        self.inner.pop_front()
    }

    /// Returns the number of messages currently queued.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if no messages are queued.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}
99
100pub struct PriorityQueue<T: Prioritized> {
104 heap: BinaryHeap<PriItem<T>>,
105}
106
107impl<T: Prioritized> std::fmt::Debug for PriorityQueue<T> {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 f.debug_struct("PriorityQueue").field("len", &self.heap.len()).finish()
110 }
111}
112
113impl<T: Prioritized> Default for PriorityQueue<T> {
114 fn default() -> Self {
115 Self { heap: BinaryHeap::new() }
116 }
117}
118
119impl<T: Prioritized> PriorityQueue<T> {
120 pub fn new() -> Self {
121 Self::default()
122 }
123
124 pub fn push(&mut self, msg: T) {
125 let p = msg.priority();
126 self.heap.push(PriItem { prio: p, inner: msg });
127 }
128
129 pub fn pop(&mut self) -> Option<T> {
130 self.heap.pop().map(|i| i.inner)
131 }
132}
133
134struct PriItem<T: Prioritized> {
135 prio: i32,
136 inner: T,
137}
138
139impl<T: Prioritized> PartialEq for PriItem<T> {
140 fn eq(&self, other: &Self) -> bool {
141 self.prio == other.prio
142 }
143}
144impl<T: Prioritized> Eq for PriItem<T> {}
145impl<T: Prioritized> PartialOrd for PriItem<T> {
146 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
147 Some(self.cmp(other))
148 }
149}
150impl<T: Prioritized> Ord for PriItem<T> {
151 fn cmp(&self, other: &Self) -> Ordering {
152 self.prio.cmp(&other.prio)
153 }
154}
155
156pub struct StablePriorityQueue<T: Prioritized> {
159 heap: BinaryHeap<StableItem<T>>,
160 seq: u64,
161}
162
163impl<T: Prioritized> std::fmt::Debug for StablePriorityQueue<T> {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 f.debug_struct("StablePriorityQueue").field("len", &self.heap.len()).finish()
166 }
167}
168
169impl<T: Prioritized> Default for StablePriorityQueue<T> {
170 fn default() -> Self {
171 Self { heap: BinaryHeap::new(), seq: 0 }
172 }
173}
174
175impl<T: Prioritized> StablePriorityQueue<T> {
176 pub fn new() -> Self {
177 Self::default()
178 }
179
180 pub fn push(&mut self, msg: T) {
181 let p = msg.priority();
182 let s = self.seq;
183 self.seq = self.seq.wrapping_add(1);
184 self.heap.push(StableItem { prio: p, seq: s, inner: msg });
185 }
186
187 pub fn pop(&mut self) -> Option<T> {
188 self.heap.pop().map(|i| i.inner)
189 }
190}
191
192struct StableItem<T: Prioritized> {
193 prio: i32,
194 seq: u64,
195 inner: T,
196}
197
198impl<T: Prioritized> PartialEq for StableItem<T> {
199 fn eq(&self, other: &Self) -> bool {
200 self.prio == other.prio && self.seq == other.seq
201 }
202}
203impl<T: Prioritized> Eq for StableItem<T> {}
204impl<T: Prioritized> PartialOrd for StableItem<T> {
205 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
206 Some(self.cmp(other))
207 }
208}
209impl<T: Prioritized> Ord for StableItem<T> {
210 fn cmp(&self, other: &Self) -> Ordering {
211 self.prio.cmp(&other.prio).then_with(|| other.seq.cmp(&self.seq))
212 }
213}
214
#[cfg(test)]
mod tests {
    use super::*;

    /// Message whose priority is its payload.
    #[derive(Debug, PartialEq)]
    struct M(i32);

    impl Prioritized for M {
        fn priority(&self) -> i32 {
            self.0
        }
    }

    /// Message with an identity separate from its priority, so a test can tell
    /// apart messages that tie on priority.
    #[derive(Debug)]
    struct Tagged {
        prio: i32,
        id: u32,
    }

    impl Prioritized for Tagged {
        fn priority(&self) -> i32 {
            self.prio
        }
    }

    #[test]
    fn unbounded_fifo() {
        let mut q = UnboundedQueue::new();
        assert!(q.is_empty());
        q.push(1);
        q.push(2);
        assert_eq!(q.len(), 2);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), None);
        assert!(q.is_empty());
    }

    #[test]
    fn bounded_rejects_when_full() {
        let mut q = BoundedMsgQueue::new(1);
        q.push(1).unwrap();
        assert!(q.push(2).is_err());
    }

    #[test]
    fn deque_push_front_is_served_first() {
        let mut q = DequeQueue::new();
        q.push_back(1);
        q.push_back(2);
        q.push_front(0);
        assert_eq!(q.pop(), Some(0));
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), None);
    }

    #[test]
    fn priority_highest_first() {
        let mut q = PriorityQueue::new();
        q.push(M(1));
        q.push(M(5));
        q.push(M(3));
        assert_eq!(q.pop(), Some(M(5)));
        assert_eq!(q.pop(), Some(M(3)));
        assert_eq!(q.pop(), Some(M(1)));
        assert_eq!(q.pop(), None);
    }

    #[test]
    fn stable_priority_preserves_fifo_for_ties() {
        let mut q = StablePriorityQueue::new();
        q.push(Tagged { prio: 1, id: 10 });
        q.push(Tagged { prio: 2, id: 20 });
        q.push(Tagged { prio: 1, id: 11 });
        q.push(Tagged { prio: 1, id: 12 });

        // The ids make the tie order observable: the three priority-1 messages
        // must come out in exactly the order they went in.
        let order: Vec<u32> = std::iter::from_fn(|| q.pop()).map(|m| m.id).collect();
        assert_eq!(order, vec![20, 10, 11, 12]);
        assert!(q.pop().is_none());
    }
}