celers_protocol/
priority_queue.rs

1//! Priority queue for message processing
2//!
3//! This module provides priority-based message queues for efficient
4//! task scheduling and execution ordering.
5
6use crate::Message;
7use std::cmp::Ordering;
8use std::collections::BinaryHeap;
9
10/// Wrapper for messages with priority ordering
11#[derive(Debug, Clone)]
12struct PriorityMessage {
13    message: Message,
14    priority: u8,
15    sequence: u64, // For FIFO ordering within same priority
16}
17
18impl PartialEq for PriorityMessage {
19    fn eq(&self, other: &Self) -> bool {
20        self.priority == other.priority && self.sequence == other.sequence
21    }
22}
23
24impl Eq for PriorityMessage {}
25
26impl PartialOrd for PriorityMessage {
27    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
28        Some(self.cmp(other))
29    }
30}
31
32impl Ord for PriorityMessage {
33    fn cmp(&self, other: &Self) -> Ordering {
34        // Higher priority first, then earlier sequence
35        match self.priority.cmp(&other.priority) {
36            Ordering::Equal => other.sequence.cmp(&self.sequence), // FIFO for same priority
37            ordering => ordering,
38        }
39    }
40}
41
42/// Priority queue for messages
43///
44/// Messages are ordered by priority (0-9, higher = more important),
45/// with FIFO ordering within the same priority level.
46#[derive(Debug, Clone)]
47pub struct MessagePriorityQueue {
48    heap: BinaryHeap<PriorityMessage>,
49    sequence_counter: u64,
50    max_size: Option<usize>,
51}
52
53impl MessagePriorityQueue {
54    /// Create a new priority queue
55    pub fn new() -> Self {
56        Self {
57            heap: BinaryHeap::new(),
58            sequence_counter: 0,
59            max_size: None,
60        }
61    }
62
63    /// Create a priority queue with a maximum size
64    pub fn with_capacity(max_size: usize) -> Self {
65        Self {
66            heap: BinaryHeap::with_capacity(max_size),
67            sequence_counter: 0,
68            max_size: Some(max_size),
69        }
70    }
71
72    /// Push a message onto the queue
73    ///
74    /// Returns `true` if the message was added, `false` if the queue is full
75    pub fn push(&mut self, message: Message) -> bool {
76        if let Some(max) = self.max_size {
77            if self.heap.len() >= max {
78                return false;
79            }
80        }
81
82        let priority = message.properties.priority.unwrap_or(5);
83        let priority_msg = PriorityMessage {
84            message,
85            priority,
86            sequence: self.sequence_counter,
87        };
88
89        self.sequence_counter = self.sequence_counter.wrapping_add(1);
90        self.heap.push(priority_msg);
91        true
92    }
93
94    /// Pop the highest priority message from the queue
95    pub fn pop(&mut self) -> Option<Message> {
96        self.heap.pop().map(|pm| pm.message)
97    }
98
99    /// Peek at the highest priority message without removing it
100    pub fn peek(&self) -> Option<&Message> {
101        self.heap.peek().map(|pm| &pm.message)
102    }
103
104    /// Get the number of messages in the queue
105    #[inline]
106    pub fn len(&self) -> usize {
107        self.heap.len()
108    }
109
110    /// Check if the queue is empty
111    #[inline]
112    pub fn is_empty(&self) -> bool {
113        self.heap.is_empty()
114    }
115
116    /// Check if the queue is full (if max_size is set)
117    #[inline]
118    pub fn is_full(&self) -> bool {
119        if let Some(max) = self.max_size {
120            self.heap.len() >= max
121        } else {
122            false
123        }
124    }
125
126    /// Clear all messages from the queue
127    pub fn clear(&mut self) {
128        self.heap.clear();
129        self.sequence_counter = 0;
130    }
131
132    /// Drain all messages from the queue in priority order
133    pub fn drain(&mut self) -> Vec<Message> {
134        let mut messages = Vec::with_capacity(self.heap.len());
135        while let Some(msg) = self.pop() {
136            messages.push(msg);
137        }
138        messages
139    }
140
141    /// Get messages with a specific priority
142    pub fn filter_by_priority(&self, priority: u8) -> Vec<&Message> {
143        self.heap
144            .iter()
145            .filter(|pm| pm.priority == priority)
146            .map(|pm| &pm.message)
147            .collect()
148    }
149
150    /// Count messages by priority level
151    pub fn count_by_priority(&self) -> [usize; 10] {
152        let mut counts = [0; 10];
153        for pm in &self.heap {
154            if (pm.priority as usize) < 10 {
155                counts[pm.priority as usize] += 1;
156            }
157        }
158        counts
159    }
160}
161
162impl Default for MessagePriorityQueue {
163    fn default() -> Self {
164        Self::new()
165    }
166}
167
168impl FromIterator<Message> for MessagePriorityQueue {
169    fn from_iter<T: IntoIterator<Item = Message>>(iter: T) -> Self {
170        let mut queue = Self::new();
171        for message in iter {
172            queue.push(message);
173        }
174        queue
175    }
176}
177
178impl Extend<Message> for MessagePriorityQueue {
179    fn extend<T: IntoIterator<Item = Message>>(&mut self, iter: T) {
180        for message in iter {
181            if !self.push(message) {
182                break; // Stop if queue is full
183            }
184        }
185    }
186}
187
188impl IntoIterator for MessagePriorityQueue {
189    type Item = Message;
190    type IntoIter = PriorityQueueIter;
191
192    fn into_iter(self) -> Self::IntoIter {
193        PriorityQueueIter { queue: self }
194    }
195}
196
197/// Iterator that drains messages from a priority queue in priority order
198pub struct PriorityQueueIter {
199    queue: MessagePriorityQueue,
200}
201
202impl Iterator for PriorityQueueIter {
203    type Item = Message;
204
205    fn next(&mut self) -> Option<Self::Item> {
206        self.queue.pop()
207    }
208
209    fn size_hint(&self) -> (usize, Option<usize>) {
210        let len = self.queue.len();
211        (len, Some(len))
212    }
213}
214
215impl ExactSizeIterator for PriorityQueueIter {
216    fn len(&self) -> usize {
217        self.queue.len()
218    }
219}
220
221/// Multi-level priority queues with separate queues for each priority
222#[derive(Debug, Clone)]
223pub struct MultiLevelQueue {
224    queues: [Vec<Message>; 10], // One queue per priority level (0-9)
225    total_size: usize,
226    max_size: Option<usize>,
227}
228
229impl MultiLevelQueue {
230    /// Create a new multi-level queue
231    pub fn new() -> Self {
232        Self {
233            queues: Default::default(),
234            total_size: 0,
235            max_size: None,
236        }
237    }
238
239    /// Create a multi-level queue with maximum size
240    pub fn with_capacity(max_size: usize) -> Self {
241        Self {
242            queues: Default::default(),
243            total_size: 0,
244            max_size: Some(max_size),
245        }
246    }
247
248    /// Push a message to the appropriate priority queue
249    pub fn push(&mut self, message: Message) -> bool {
250        if let Some(max) = self.max_size {
251            if self.total_size >= max {
252                return false;
253            }
254        }
255
256        let priority = message.properties.priority.unwrap_or(5) as usize;
257        if priority < 10 {
258            self.queues[priority].push(message);
259            self.total_size += 1;
260            true
261        } else {
262            false
263        }
264    }
265
266    /// Pop the highest priority message
267    pub fn pop(&mut self) -> Option<Message> {
268        // Iterate from highest to lowest priority
269        for queue in self.queues.iter_mut().rev() {
270            if let Some(msg) = queue.pop() {
271                self.total_size -= 1;
272                return Some(msg);
273            }
274        }
275        None
276    }
277
278    /// Peek at the highest priority message
279    pub fn peek(&self) -> Option<&Message> {
280        for queue in self.queues.iter().rev() {
281            if let Some(msg) = queue.last() {
282                return Some(msg);
283            }
284        }
285        None
286    }
287
288    /// Get the total number of messages across all queues
289    #[inline]
290    pub fn len(&self) -> usize {
291        self.total_size
292    }
293
294    /// Check if all queues are empty
295    #[inline]
296    pub fn is_empty(&self) -> bool {
297        self.total_size == 0
298    }
299
300    /// Get the number of messages at a specific priority level
301    #[inline]
302    pub fn len_at_priority(&self, priority: u8) -> usize {
303        if (priority as usize) < 10 {
304            self.queues[priority as usize].len()
305        } else {
306            0
307        }
308    }
309
310    /// Clear all queues
311    pub fn clear(&mut self) {
312        for queue in &mut self.queues {
313            queue.clear();
314        }
315        self.total_size = 0;
316    }
317
318    /// Drain all messages in priority order
319    pub fn drain(&mut self) -> Vec<Message> {
320        let mut messages = Vec::with_capacity(self.total_size);
321        while let Some(msg) = self.pop() {
322            messages.push(msg);
323        }
324        messages
325    }
326}
327
328impl Default for MultiLevelQueue {
329    fn default() -> Self {
330        Self::new()
331    }
332}
333
334impl FromIterator<Message> for MultiLevelQueue {
335    fn from_iter<T: IntoIterator<Item = Message>>(iter: T) -> Self {
336        let mut queue = Self::new();
337        for message in iter {
338            queue.push(message);
339        }
340        queue
341    }
342}
343
344impl Extend<Message> for MultiLevelQueue {
345    fn extend<T: IntoIterator<Item = Message>>(&mut self, iter: T) {
346        for message in iter {
347            if !self.push(message) {
348                break; // Stop if queue is full
349            }
350        }
351    }
352}
353
354impl IntoIterator for MultiLevelQueue {
355    type Item = Message;
356    type IntoIter = MultiLevelQueueIter;
357
358    fn into_iter(self) -> Self::IntoIter {
359        MultiLevelQueueIter { queue: self }
360    }
361}
362
363/// Iterator that drains messages from a multi-level queue in priority order
364pub struct MultiLevelQueueIter {
365    queue: MultiLevelQueue,
366}
367
368impl Iterator for MultiLevelQueueIter {
369    type Item = Message;
370
371    fn next(&mut self) -> Option<Self::Item> {
372        self.queue.pop()
373    }
374
375    fn size_hint(&self) -> (usize, Option<usize>) {
376        let len = self.queue.len();
377        (len, Some(len))
378    }
379}
380
381impl ExactSizeIterator for MultiLevelQueueIter {
382    fn len(&self) -> usize {
383        self.queue.len()
384    }
385}
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::MessageBuilder;

    /// Builds a message for `task` carrying the given priority.
    fn create_message_with_priority(task: &str, priority: u8) -> Message {
        MessageBuilder::new(task)
            .priority(priority)
            .build()
            .unwrap()
    }

    /// Builds one message per `(task, priority)` pair, preserving order.
    fn create_messages(specs: &[(&str, u8)]) -> Vec<Message> {
        specs
            .iter()
            .map(|&(task, priority)| create_message_with_priority(task, priority))
            .collect()
    }

    #[test]
    fn test_priority_queue_push_pop() {
        let mut queue = MessagePriorityQueue::new();
        for message in create_messages(&[("task1", 5), ("task2", 9), ("task3", 1)]) {
            queue.push(message);
        }
        assert_eq!(queue.len(), 3);

        // Highest priority comes out first: 9, then 5, then 1.
        for expected in [9, 5, 1] {
            let popped = queue.pop().unwrap();
            assert_eq!(popped.properties.priority, Some(expected));
        }

        assert!(queue.is_empty());
    }

    #[test]
    fn test_priority_queue_fifo_same_priority() {
        let mut queue = MessagePriorityQueue::new();
        for message in create_messages(&[("task1", 5), ("task2", 5), ("task3", 5)]) {
            queue.push(message);
        }

        // Equal priorities must come back in insertion order.
        for expected in ["task1", "task2", "task3"] {
            let popped = queue.pop().unwrap();
            assert_eq!(popped.headers.task, expected);
        }
    }

    #[test]
    fn test_priority_queue_peek() {
        let mut queue = MessagePriorityQueue::new();
        for message in create_messages(&[("task1", 5), ("task2", 9)]) {
            queue.push(message);
        }

        let top_priority = queue.peek().unwrap().properties.priority;
        assert_eq!(top_priority, Some(9));
        // Peeking leaves the queue untouched.
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn test_priority_queue_with_capacity() {
        let mut queue = MessagePriorityQueue::with_capacity(2);

        let accepted: Vec<bool> = ["task1", "task2", "task3"]
            .iter()
            .map(|task| queue.push(create_message_with_priority(task, 5)))
            .collect();

        // The third push exceeds the limit of two.
        assert_eq!(accepted, [true, true, false]);
        assert!(queue.is_full());
    }

    #[test]
    fn test_priority_queue_clear() {
        let mut queue = MessagePriorityQueue::new();
        for message in create_messages(&[("task1", 5), ("task2", 5)]) {
            queue.push(message);
        }
        assert_eq!(queue.len(), 2);

        queue.clear();

        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_priority_queue_drain() {
        let mut queue = MessagePriorityQueue::new();
        for message in create_messages(&[("task1", 1), ("task2", 9), ("task3", 5)]) {
            queue.push(message);
        }

        let messages = queue.drain();

        assert_eq!(messages.len(), 3);
        let drained: Vec<_> = messages.iter().map(|m| m.properties.priority).collect();
        assert_eq!(drained, vec![Some(9), Some(5), Some(1)]);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_priority_queue_filter_by_priority() {
        let mut queue = MessagePriorityQueue::new();
        for message in create_messages(&[("task1", 5), ("task2", 9), ("task3", 5)]) {
            queue.push(message);
        }

        let filtered = queue.filter_by_priority(5);
        assert_eq!(filtered.len(), 2);
    }

    #[test]
    fn test_priority_queue_count_by_priority() {
        let mut queue = MessagePriorityQueue::new();
        let specs = [("task1", 5), ("task2", 9), ("task3", 5), ("task4", 1)];
        for message in create_messages(&specs) {
            queue.push(message);
        }

        let counts = queue.count_by_priority();
        assert_eq!(counts[1], 1);
        assert_eq!(counts[5], 2);
        assert_eq!(counts[9], 1);
    }

    #[test]
    fn test_multi_level_queue_push_pop() {
        let mut queue = MultiLevelQueue::new();
        for message in create_messages(&[("task1", 5), ("task2", 9), ("task3", 1)]) {
            queue.push(message);
        }
        assert_eq!(queue.len(), 3);

        // Levels are served from the highest down.
        for expected in [9, 5, 1] {
            let popped = queue.pop().unwrap();
            assert_eq!(popped.properties.priority, Some(expected));
        }
    }

    #[test]
    fn test_multi_level_queue_len_at_priority() {
        let mut queue = MultiLevelQueue::new();
        for message in create_messages(&[("task1", 5), ("task2", 5), ("task3", 9)]) {
            queue.push(message);
        }

        assert_eq!(queue.len_at_priority(5), 2);
        assert_eq!(queue.len_at_priority(9), 1);
        assert_eq!(queue.len_at_priority(0), 0);
    }

    #[test]
    fn test_multi_level_queue_peek() {
        let mut queue = MultiLevelQueue::new();
        for message in create_messages(&[("task1", 5), ("task2", 9)]) {
            queue.push(message);
        }

        let top_priority = queue.peek().unwrap().properties.priority;
        assert_eq!(top_priority, Some(9));
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn test_multi_level_queue_clear() {
        let mut queue = MultiLevelQueue::new();
        for message in create_messages(&[("task1", 5), ("task2", 9)]) {
            queue.push(message);
        }

        queue.clear();

        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_from_iterator() {
        let queue: MessagePriorityQueue =
            create_messages(&[("task1", 5), ("task2", 9), ("task3", 1)])
                .into_iter()
                .collect();

        assert_eq!(queue.len(), 3);
    }

    #[test]
    fn test_priority_queue_extend() {
        let mut queue = MessagePriorityQueue::new();
        queue.push(create_message_with_priority("task1", 5));

        queue.extend(create_messages(&[("task2", 9), ("task3", 1)]));
        assert_eq!(queue.len(), 3);

        // The extended messages are merged into priority order.
        for expected in [9, 5, 1] {
            assert_eq!(queue.pop().unwrap().properties.priority, Some(expected));
        }
    }

    #[test]
    fn test_priority_queue_extend_with_capacity() {
        let mut queue = MessagePriorityQueue::with_capacity(3);
        queue.push(create_message_with_priority("task1", 5));

        // "task4" is one message too many and must not be added.
        queue.extend(create_messages(&[("task2", 9), ("task3", 1), ("task4", 7)]));

        assert_eq!(queue.len(), 3);
        assert!(queue.is_full());
    }

    #[test]
    fn test_priority_queue_into_iterator() {
        let queue: MessagePriorityQueue =
            create_messages(&[("task1", 5), ("task2", 9), ("task3", 1)])
                .into_iter()
                .collect();

        let mut priorities = Vec::new();
        for msg in queue {
            priorities.push(msg.properties.priority.unwrap());
        }

        assert_eq!(priorities.len(), 3);
        assert_eq!(priorities, vec![9, 5, 1]); // Yielded in priority order
    }

    #[test]
    fn test_priority_queue_iter_exact_size() {
        let queue: MessagePriorityQueue =
            create_messages(&[("task1", 5), ("task2", 9), ("task3", 1)])
                .into_iter()
                .collect();

        let iter = queue.into_iter();
        assert_eq!(iter.len(), 3);

        let collected: Vec<_> = iter.collect();
        assert_eq!(collected.len(), 3);
    }

    #[test]
    fn test_priority_queue_iterator_chain() {
        let specs = [("task1", 5), ("task2", 9), ("task3", 1), ("task4", 7)];
        let queue: MessagePriorityQueue = create_messages(&specs).into_iter().collect();

        let task_names: Vec<String> = queue
            .into_iter()
            .map(|msg| msg.headers.task.clone())
            .collect();

        assert_eq!(task_names, vec!["task2", "task4", "task1", "task3"]);
    }

    #[test]
    fn test_multi_level_queue_from_iterator() {
        let queue: MultiLevelQueue =
            create_messages(&[("task1", 5), ("task2", 9), ("task3", 1)])
                .into_iter()
                .collect();

        assert_eq!(queue.len(), 3);
        for level in [5, 9, 1] {
            assert_eq!(queue.len_at_priority(level), 1);
        }
    }

    #[test]
    fn test_multi_level_queue_extend() {
        let mut queue = MultiLevelQueue::new();
        queue.push(create_message_with_priority("task1", 5));

        queue.extend(create_messages(&[("task2", 9), ("task3", 5)]));

        assert_eq!(queue.len(), 3);
        assert_eq!(queue.len_at_priority(5), 2);
        assert_eq!(queue.len_at_priority(9), 1);
    }

    #[test]
    fn test_multi_level_queue_into_iterator() {
        let queue: MultiLevelQueue =
            create_messages(&[("task1", 5), ("task2", 9), ("task3", 1)])
                .into_iter()
                .collect();

        let mut priorities = Vec::new();
        for msg in queue {
            priorities.push(msg.properties.priority.unwrap());
        }

        assert_eq!(priorities.len(), 3);
        assert_eq!(priorities, vec![9, 5, 1]); // Yielded in priority order
    }

    #[test]
    fn test_multi_level_queue_iter_exact_size() {
        let queue: MultiLevelQueue = create_messages(&[("task1", 5), ("task2", 9)])
            .into_iter()
            .collect();

        let iter = queue.into_iter();
        assert_eq!(iter.len(), 2);

        let collected: Vec<_> = iter.collect();
        assert_eq!(collected.len(), 2);
    }

    #[test]
    fn test_multi_level_queue_extend_with_capacity() {
        let mut queue = MultiLevelQueue::with_capacity(3);
        queue.push(create_message_with_priority("task1", 5));

        // "task4" is one message too many and must not be added.
        queue.extend(create_messages(&[("task2", 9), ("task3", 1), ("task4", 7)]));

        assert_eq!(queue.len(), 3);
    }
}