1use std::cmp::Ordering;
8use std::collections::{BinaryHeap, VecDeque};
9
10use crate::dispatch::mailbox::OverflowStrategy;
11use crate::util::BoundedQueue;
12
13pub trait Prioritized {
15 fn priority(&self) -> i32;
16}
17
18#[derive(Debug, Default)]
20pub struct UnboundedQueue<T> {
21 inner: VecDeque<T>,
22}
23
24impl<T> UnboundedQueue<T> {
25 pub fn new() -> Self {
26 Self { inner: VecDeque::new() }
27 }
28
29 pub fn push(&mut self, msg: T) {
30 self.inner.push_back(msg);
31 }
32
33 pub fn pop(&mut self) -> Option<T> {
34 self.inner.pop_front()
35 }
36
37 pub fn len(&self) -> usize {
38 self.inner.len()
39 }
40
41 pub fn is_empty(&self) -> bool {
42 self.inner.is_empty()
43 }
44}
45
46#[derive(Debug, PartialEq, Eq)]
51pub enum PushOutcome<T> {
52 Accepted,
53 Dropped { dropped: T },
54 Rejected(T),
55}
56
57#[derive(Debug)]
59pub struct BoundedMsgQueue<T> {
60 inner: BoundedQueue<T>,
61 overflow: OverflowStrategy,
62}
63
64impl<T> BoundedMsgQueue<T> {
65 pub fn new(capacity: usize) -> Self {
66 Self::with_overflow(capacity, OverflowStrategy::Fail)
67 }
68
69 pub fn with_overflow(capacity: usize, overflow: OverflowStrategy) -> Self {
70 Self { inner: BoundedQueue::new(capacity), overflow }
71 }
72
73 pub fn push(&mut self, msg: T) -> Result<(), T> {
77 match self.push_with_strategy(msg) {
78 PushOutcome::Accepted => Ok(()),
79 PushOutcome::Dropped { dropped } => Err(dropped),
80 PushOutcome::Rejected(msg) => Err(msg),
81 }
82 }
83
84 pub fn push_with_strategy(&mut self, msg: T) -> PushOutcome<T> {
90 if !self.inner.is_full() {
91 return match self.inner.push(msg) {
92 Ok(()) => PushOutcome::Accepted,
93 Err(m) => PushOutcome::Rejected(m),
94 };
95 }
96 match self.overflow {
97 OverflowStrategy::Fail | OverflowStrategy::DropNew => PushOutcome::Rejected(msg),
98 OverflowStrategy::DropHead => match self.inner.pop() {
99 Some(dropped) => match self.inner.push(msg) {
100 Ok(()) => PushOutcome::Dropped { dropped },
101 Err(m) => PushOutcome::Rejected(m),
102 },
103 None => PushOutcome::Rejected(msg),
104 },
105 OverflowStrategy::DropTail => match self.inner.pop_back() {
106 Some(dropped) => match self.inner.push(msg) {
107 Ok(()) => PushOutcome::Dropped { dropped },
108 Err(m) => PushOutcome::Rejected(m),
109 },
110 None => PushOutcome::Rejected(msg),
111 },
112 }
113 }
114
115 pub fn pop(&mut self) -> Option<T> {
116 self.inner.pop()
117 }
118
119 pub fn is_full(&self) -> bool {
120 self.inner.is_full()
121 }
122
123 pub fn overflow(&self) -> OverflowStrategy {
124 self.overflow
125 }
126}
127
128#[derive(Debug)]
133pub enum ControlAware<T> {
134 Control(T),
135 User(T),
136}
137
138#[derive(Debug, Default)]
139pub struct ControlAwareQueue<T> {
140 control: VecDeque<T>,
141 user: VecDeque<T>,
142}
143
144impl<T> ControlAwareQueue<T> {
145 pub fn new() -> Self {
146 Self { control: VecDeque::new(), user: VecDeque::new() }
147 }
148
149 pub fn push(&mut self, msg: ControlAware<T>) {
150 match msg {
151 ControlAware::Control(m) => self.control.push_back(m),
152 ControlAware::User(m) => self.user.push_back(m),
153 }
154 }
155
156 pub fn pop(&mut self) -> Option<T> {
157 self.control.pop_front().or_else(|| self.user.pop_front())
158 }
159
160 pub fn len(&self) -> usize {
161 self.control.len() + self.user.len()
162 }
163
164 pub fn is_empty(&self) -> bool {
165 self.control.is_empty() && self.user.is_empty()
166 }
167}
168
169#[derive(Debug)]
171pub struct DequeQueue<T> {
172 inner: VecDeque<T>,
173}
174
175impl<T> Default for DequeQueue<T> {
176 fn default() -> Self {
177 Self { inner: VecDeque::new() }
178 }
179}
180
181impl<T> DequeQueue<T> {
182 pub fn new() -> Self {
183 Self::default()
184 }
185
186 pub fn push_back(&mut self, msg: T) {
187 self.inner.push_back(msg);
188 }
189
190 pub fn push_front(&mut self, msg: T) {
191 self.inner.push_front(msg);
192 }
193
194 pub fn pop(&mut self) -> Option<T> {
195 self.inner.pop_front()
196 }
197}
198
199pub struct PriorityQueue<T: Prioritized> {
203 heap: BinaryHeap<PriItem<T>>,
204}
205
206impl<T: Prioritized> std::fmt::Debug for PriorityQueue<T> {
207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 f.debug_struct("PriorityQueue").field("len", &self.heap.len()).finish()
209 }
210}
211
212impl<T: Prioritized> Default for PriorityQueue<T> {
213 fn default() -> Self {
214 Self { heap: BinaryHeap::new() }
215 }
216}
217
218impl<T: Prioritized> PriorityQueue<T> {
219 pub fn new() -> Self {
220 Self::default()
221 }
222
223 pub fn push(&mut self, msg: T) {
224 let p = msg.priority();
225 self.heap.push(PriItem { prio: p, inner: msg });
226 }
227
228 pub fn pop(&mut self) -> Option<T> {
229 self.heap.pop().map(|i| i.inner)
230 }
231}
232
233struct PriItem<T: Prioritized> {
234 prio: i32,
235 inner: T,
236}
237
238impl<T: Prioritized> PartialEq for PriItem<T> {
239 fn eq(&self, other: &Self) -> bool {
240 self.prio == other.prio
241 }
242}
243impl<T: Prioritized> Eq for PriItem<T> {}
244impl<T: Prioritized> PartialOrd for PriItem<T> {
245 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
246 Some(self.cmp(other))
247 }
248}
249impl<T: Prioritized> Ord for PriItem<T> {
250 fn cmp(&self, other: &Self) -> Ordering {
251 self.prio.cmp(&other.prio)
252 }
253}
254
255pub struct StablePriorityQueue<T: Prioritized> {
257 heap: BinaryHeap<StableItem<T>>,
258 seq: u64,
259}
260
261impl<T: Prioritized> std::fmt::Debug for StablePriorityQueue<T> {
262 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
263 f.debug_struct("StablePriorityQueue").field("len", &self.heap.len()).finish()
264 }
265}
266
267impl<T: Prioritized> Default for StablePriorityQueue<T> {
268 fn default() -> Self {
269 Self { heap: BinaryHeap::new(), seq: 0 }
270 }
271}
272
273impl<T: Prioritized> StablePriorityQueue<T> {
274 pub fn new() -> Self {
275 Self::default()
276 }
277
278 pub fn push(&mut self, msg: T) {
279 let p = msg.priority();
280 let s = self.seq;
281 self.seq = self.seq.wrapping_add(1);
282 self.heap.push(StableItem { prio: p, seq: s, inner: msg });
283 }
284
285 pub fn pop(&mut self) -> Option<T> {
286 self.heap.pop().map(|i| i.inner)
287 }
288}
289
290struct StableItem<T: Prioritized> {
291 prio: i32,
292 seq: u64,
293 inner: T,
294}
295
296impl<T: Prioritized> PartialEq for StableItem<T> {
297 fn eq(&self, other: &Self) -> bool {
298 self.prio == other.prio && self.seq == other.seq
299 }
300}
301impl<T: Prioritized> Eq for StableItem<T> {}
302impl<T: Prioritized> PartialOrd for StableItem<T> {
303 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
304 Some(self.cmp(other))
305 }
306}
307impl<T: Prioritized> Ord for StableItem<T> {
308 fn cmp(&self, other: &Self) -> Ordering {
309 self.prio.cmp(&other.prio).then_with(|| other.seq.cmp(&self.seq))
310 }
311}
312
313#[cfg(test)]
314mod tests {
315 use super::*;
316
317 #[derive(Debug, PartialEq)]
318 struct M(i32);
319 impl Prioritized for M {
320 fn priority(&self) -> i32 {
321 self.0
322 }
323 }
324
325 #[test]
326 fn unbounded_fifo() {
327 let mut q = UnboundedQueue::new();
328 q.push(1);
329 q.push(2);
330 assert_eq!(q.pop(), Some(1));
331 assert_eq!(q.pop(), Some(2));
332 }
333
334 #[test]
335 fn bounded_rejects_when_full() {
336 let mut q = BoundedMsgQueue::new(1);
337 q.push(1).unwrap();
338 assert!(q.push(2).is_err());
339 }
340
341 #[test]
342 fn bounded_drop_head_removes_oldest() {
343 let mut q = BoundedMsgQueue::with_overflow(2, OverflowStrategy::DropHead);
344 assert_eq!(q.push_with_strategy(1), PushOutcome::Accepted);
345 assert_eq!(q.push_with_strategy(2), PushOutcome::Accepted);
346 assert_eq!(q.push_with_strategy(3), PushOutcome::Dropped { dropped: 1 });
347 assert_eq!(q.pop(), Some(2));
348 assert_eq!(q.pop(), Some(3));
349 }
350
351 #[test]
352 fn bounded_drop_tail_removes_newest() {
353 let mut q = BoundedMsgQueue::with_overflow(2, OverflowStrategy::DropTail);
354 q.push_with_strategy(1);
355 q.push_with_strategy(2);
356 assert_eq!(q.push_with_strategy(3), PushOutcome::Dropped { dropped: 2 });
357 assert_eq!(q.pop(), Some(1));
358 assert_eq!(q.pop(), Some(3));
359 }
360
361 #[test]
362 fn bounded_drop_new_rejects_incoming() {
363 let mut q = BoundedMsgQueue::with_overflow(1, OverflowStrategy::DropNew);
364 q.push_with_strategy(1);
365 assert_eq!(q.push_with_strategy(2), PushOutcome::Rejected(2));
366 assert_eq!(q.pop(), Some(1));
367 }
368
369 #[test]
370 fn bounded_fail_rejects_incoming() {
371 let mut q = BoundedMsgQueue::with_overflow(1, OverflowStrategy::Fail);
372 q.push_with_strategy(1);
373 assert_eq!(q.push_with_strategy(2), PushOutcome::Rejected(2));
374 }
375
376 #[test]
377 fn control_aware_drains_control_first() {
378 let mut q = ControlAwareQueue::new();
379 q.push(ControlAware::User(1));
380 q.push(ControlAware::User(2));
381 q.push(ControlAware::Control(99));
382 assert_eq!(q.pop(), Some(99));
383 assert_eq!(q.pop(), Some(1));
384 assert_eq!(q.pop(), Some(2));
385 assert!(q.is_empty());
386 }
387
388 #[test]
389 fn control_aware_preserves_within_class_fifo() {
390 let mut q = ControlAwareQueue::new();
391 q.push(ControlAware::Control(1));
392 q.push(ControlAware::Control(2));
393 q.push(ControlAware::User(10));
394 q.push(ControlAware::User(11));
395 assert_eq!(q.pop(), Some(1));
396 assert_eq!(q.pop(), Some(2));
397 assert_eq!(q.pop(), Some(10));
398 assert_eq!(q.pop(), Some(11));
399 }
400
401 #[test]
402 fn priority_highest_first() {
403 let mut q = PriorityQueue::new();
404 q.push(M(1));
405 q.push(M(5));
406 q.push(M(3));
407 assert_eq!(q.pop().unwrap().0, 5);
408 assert_eq!(q.pop().unwrap().0, 3);
409 }
410
411 #[test]
412 fn stable_priority_preserves_fifo_for_ties() {
413 let mut q = StablePriorityQueue::new();
414 q.push(M(1));
415 q.push(M(2));
416 q.push(M(1));
417 assert_eq!(q.pop().unwrap().0, 2);
418 assert_eq!(q.pop().unwrap().0, 1);
420 assert_eq!(q.pop().unwrap().0, 1);
421 }
422}