1use crate::Message;
7use std::cmp::Ordering;
8use std::collections::BinaryHeap;
9
10#[derive(Debug, Clone)]
12struct PriorityMessage {
13 message: Message,
14 priority: u8,
15 sequence: u64, }
17
18impl PartialEq for PriorityMessage {
19 fn eq(&self, other: &Self) -> bool {
20 self.priority == other.priority && self.sequence == other.sequence
21 }
22}
23
24impl Eq for PriorityMessage {}
25
26impl PartialOrd for PriorityMessage {
27 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
28 Some(self.cmp(other))
29 }
30}
31
32impl Ord for PriorityMessage {
33 fn cmp(&self, other: &Self) -> Ordering {
34 match self.priority.cmp(&other.priority) {
36 Ordering::Equal => other.sequence.cmp(&self.sequence), ordering => ordering,
38 }
39 }
40}
41
42#[derive(Debug, Clone)]
47pub struct MessagePriorityQueue {
48 heap: BinaryHeap<PriorityMessage>,
49 sequence_counter: u64,
50 max_size: Option<usize>,
51}
52
53impl MessagePriorityQueue {
54 pub fn new() -> Self {
56 Self {
57 heap: BinaryHeap::new(),
58 sequence_counter: 0,
59 max_size: None,
60 }
61 }
62
63 pub fn with_capacity(max_size: usize) -> Self {
65 Self {
66 heap: BinaryHeap::with_capacity(max_size),
67 sequence_counter: 0,
68 max_size: Some(max_size),
69 }
70 }
71
72 pub fn push(&mut self, message: Message) -> bool {
76 if let Some(max) = self.max_size {
77 if self.heap.len() >= max {
78 return false;
79 }
80 }
81
82 let priority = message.properties.priority.unwrap_or(5);
83 let priority_msg = PriorityMessage {
84 message,
85 priority,
86 sequence: self.sequence_counter,
87 };
88
89 self.sequence_counter = self.sequence_counter.wrapping_add(1);
90 self.heap.push(priority_msg);
91 true
92 }
93
94 pub fn pop(&mut self) -> Option<Message> {
96 self.heap.pop().map(|pm| pm.message)
97 }
98
99 pub fn peek(&self) -> Option<&Message> {
101 self.heap.peek().map(|pm| &pm.message)
102 }
103
104 #[inline]
106 pub fn len(&self) -> usize {
107 self.heap.len()
108 }
109
110 #[inline]
112 pub fn is_empty(&self) -> bool {
113 self.heap.is_empty()
114 }
115
116 #[inline]
118 pub fn is_full(&self) -> bool {
119 if let Some(max) = self.max_size {
120 self.heap.len() >= max
121 } else {
122 false
123 }
124 }
125
126 pub fn clear(&mut self) {
128 self.heap.clear();
129 self.sequence_counter = 0;
130 }
131
132 pub fn drain(&mut self) -> Vec<Message> {
134 let mut messages = Vec::with_capacity(self.heap.len());
135 while let Some(msg) = self.pop() {
136 messages.push(msg);
137 }
138 messages
139 }
140
141 pub fn filter_by_priority(&self, priority: u8) -> Vec<&Message> {
143 self.heap
144 .iter()
145 .filter(|pm| pm.priority == priority)
146 .map(|pm| &pm.message)
147 .collect()
148 }
149
150 pub fn count_by_priority(&self) -> [usize; 10] {
152 let mut counts = [0; 10];
153 for pm in &self.heap {
154 if (pm.priority as usize) < 10 {
155 counts[pm.priority as usize] += 1;
156 }
157 }
158 counts
159 }
160}
161
162impl Default for MessagePriorityQueue {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
168impl FromIterator<Message> for MessagePriorityQueue {
169 fn from_iter<T: IntoIterator<Item = Message>>(iter: T) -> Self {
170 let mut queue = Self::new();
171 for message in iter {
172 queue.push(message);
173 }
174 queue
175 }
176}
177
178impl Extend<Message> for MessagePriorityQueue {
179 fn extend<T: IntoIterator<Item = Message>>(&mut self, iter: T) {
180 for message in iter {
181 if !self.push(message) {
182 break; }
184 }
185 }
186}
187
188impl IntoIterator for MessagePriorityQueue {
189 type Item = Message;
190 type IntoIter = PriorityQueueIter;
191
192 fn into_iter(self) -> Self::IntoIter {
193 PriorityQueueIter { queue: self }
194 }
195}
196
197pub struct PriorityQueueIter {
199 queue: MessagePriorityQueue,
200}
201
202impl Iterator for PriorityQueueIter {
203 type Item = Message;
204
205 fn next(&mut self) -> Option<Self::Item> {
206 self.queue.pop()
207 }
208
209 fn size_hint(&self) -> (usize, Option<usize>) {
210 let len = self.queue.len();
211 (len, Some(len))
212 }
213}
214
215impl ExactSizeIterator for PriorityQueueIter {
216 fn len(&self) -> usize {
217 self.queue.len()
218 }
219}
220
221#[derive(Debug, Clone)]
223pub struct MultiLevelQueue {
224 queues: [Vec<Message>; 10], total_size: usize,
226 max_size: Option<usize>,
227}
228
229impl MultiLevelQueue {
230 pub fn new() -> Self {
232 Self {
233 queues: Default::default(),
234 total_size: 0,
235 max_size: None,
236 }
237 }
238
239 pub fn with_capacity(max_size: usize) -> Self {
241 Self {
242 queues: Default::default(),
243 total_size: 0,
244 max_size: Some(max_size),
245 }
246 }
247
248 pub fn push(&mut self, message: Message) -> bool {
250 if let Some(max) = self.max_size {
251 if self.total_size >= max {
252 return false;
253 }
254 }
255
256 let priority = message.properties.priority.unwrap_or(5) as usize;
257 if priority < 10 {
258 self.queues[priority].push(message);
259 self.total_size += 1;
260 true
261 } else {
262 false
263 }
264 }
265
266 pub fn pop(&mut self) -> Option<Message> {
268 for queue in self.queues.iter_mut().rev() {
270 if let Some(msg) = queue.pop() {
271 self.total_size -= 1;
272 return Some(msg);
273 }
274 }
275 None
276 }
277
278 pub fn peek(&self) -> Option<&Message> {
280 for queue in self.queues.iter().rev() {
281 if let Some(msg) = queue.last() {
282 return Some(msg);
283 }
284 }
285 None
286 }
287
288 #[inline]
290 pub fn len(&self) -> usize {
291 self.total_size
292 }
293
294 #[inline]
296 pub fn is_empty(&self) -> bool {
297 self.total_size == 0
298 }
299
300 #[inline]
302 pub fn len_at_priority(&self, priority: u8) -> usize {
303 if (priority as usize) < 10 {
304 self.queues[priority as usize].len()
305 } else {
306 0
307 }
308 }
309
310 pub fn clear(&mut self) {
312 for queue in &mut self.queues {
313 queue.clear();
314 }
315 self.total_size = 0;
316 }
317
318 pub fn drain(&mut self) -> Vec<Message> {
320 let mut messages = Vec::with_capacity(self.total_size);
321 while let Some(msg) = self.pop() {
322 messages.push(msg);
323 }
324 messages
325 }
326}
327
328impl Default for MultiLevelQueue {
329 fn default() -> Self {
330 Self::new()
331 }
332}
333
334impl FromIterator<Message> for MultiLevelQueue {
335 fn from_iter<T: IntoIterator<Item = Message>>(iter: T) -> Self {
336 let mut queue = Self::new();
337 for message in iter {
338 queue.push(message);
339 }
340 queue
341 }
342}
343
344impl Extend<Message> for MultiLevelQueue {
345 fn extend<T: IntoIterator<Item = Message>>(&mut self, iter: T) {
346 for message in iter {
347 if !self.push(message) {
348 break; }
350 }
351 }
352}
353
354impl IntoIterator for MultiLevelQueue {
355 type Item = Message;
356 type IntoIter = MultiLevelQueueIter;
357
358 fn into_iter(self) -> Self::IntoIter {
359 MultiLevelQueueIter { queue: self }
360 }
361}
362
363pub struct MultiLevelQueueIter {
365 queue: MultiLevelQueue,
366}
367
368impl Iterator for MultiLevelQueueIter {
369 type Item = Message;
370
371 fn next(&mut self) -> Option<Self::Item> {
372 self.queue.pop()
373 }
374
375 fn size_hint(&self) -> (usize, Option<usize>) {
376 let len = self.queue.len();
377 (len, Some(len))
378 }
379}
380
381impl ExactSizeIterator for MultiLevelQueueIter {
382 fn len(&self) -> usize {
383 self.queue.len()
384 }
385}
386
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::MessageBuilder;

    /// Three messages with distinct priorities, in a deliberately unsorted order.
    const MIXED: [(&str, u8); 3] = [("task1", 5), ("task2", 9), ("task3", 1)];

    /// Builds a message for `task` with the given priority, panicking if the
    /// builder reports a failure.
    fn create_message_with_priority(task: &str, priority: u8) -> Message {
        let built = MessageBuilder::new(task).priority(priority).build();
        built.unwrap()
    }

    /// Builds one message per `(task, priority)` pair, preserving order.
    fn batch(specs: &[(&str, u8)]) -> Vec<Message> {
        specs
            .iter()
            .map(|&(task, priority)| create_message_with_priority(task, priority))
            .collect()
    }

    /// Creates an unbounded heap-based queue and pushes `specs` one by one.
    fn filled_priority_queue(specs: &[(&str, u8)]) -> MessagePriorityQueue {
        let mut queue = MessagePriorityQueue::new();
        for message in batch(specs) {
            queue.push(message);
        }
        queue
    }

    /// Creates an unbounded multi-level queue and pushes `specs` one by one.
    fn filled_multi_level_queue(specs: &[(&str, u8)]) -> MultiLevelQueue {
        let mut queue = MultiLevelQueue::new();
        for message in batch(specs) {
            queue.push(message);
        }
        queue
    }

    #[test]
    fn test_priority_queue_push_pop() {
        let mut queue = filled_priority_queue(&MIXED);
        assert_eq!(queue.len(), 3);

        // Highest priority must come out first.
        for expected in [9, 5, 1] {
            let popped = queue.pop().unwrap();
            assert_eq!(popped.properties.priority, Some(expected));
        }

        assert!(queue.is_empty());
    }

    #[test]
    fn test_priority_queue_fifo_same_priority() {
        let mut queue = filled_priority_queue(&[("task1", 5), ("task2", 5), ("task3", 5)]);

        // Equal priorities must keep insertion order.
        for expected in ["task1", "task2", "task3"] {
            let popped = queue.pop().unwrap();
            assert_eq!(popped.headers.task, expected);
        }
    }

    #[test]
    fn test_priority_queue_peek() {
        let queue = filled_priority_queue(&[("task1", 5), ("task2", 9)]);

        let head = queue.peek().unwrap();
        assert_eq!(head.properties.priority, Some(9));
        // Peeking must not remove anything.
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn test_priority_queue_with_capacity() {
        let mut queue = MessagePriorityQueue::with_capacity(2);

        let outcomes: Vec<bool> = ["task1", "task2", "task3"]
            .iter()
            .map(|&task| queue.push(create_message_with_priority(task, 5)))
            .collect();

        // The third push exceeds the limit and is rejected.
        assert_eq!(outcomes, [true, true, false]);
        assert!(queue.is_full());
    }

    #[test]
    fn test_priority_queue_clear() {
        let mut queue = filled_priority_queue(&[("task1", 5), ("task2", 5)]);
        assert_eq!(queue.len(), 2);

        queue.clear();
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_priority_queue_drain() {
        let mut queue = filled_priority_queue(&[("task1", 1), ("task2", 9), ("task3", 5)]);

        let drained = queue.drain();
        assert_eq!(drained.len(), 3);

        let priorities: Vec<_> = drained.iter().map(|m| m.properties.priority).collect();
        assert_eq!(priorities, [Some(9), Some(5), Some(1)]);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_priority_queue_filter_by_priority() {
        let queue = filled_priority_queue(&[("task1", 5), ("task2", 9), ("task3", 5)]);

        let fives = queue.filter_by_priority(5);
        assert_eq!(fives.len(), 2);
    }

    #[test]
    fn test_priority_queue_count_by_priority() {
        let queue =
            filled_priority_queue(&[("task1", 5), ("task2", 9), ("task3", 5), ("task4", 1)]);

        let counts = queue.count_by_priority();
        assert_eq!((counts[1], counts[5], counts[9]), (1, 2, 1));
    }

    #[test]
    fn test_multi_level_queue_push_pop() {
        let mut queue = filled_multi_level_queue(&MIXED);
        assert_eq!(queue.len(), 3);

        // Highest priority must come out first.
        for expected in [9, 5, 1] {
            let popped = queue.pop().unwrap();
            assert_eq!(popped.properties.priority, Some(expected));
        }
    }

    #[test]
    fn test_multi_level_queue_len_at_priority() {
        let queue = filled_multi_level_queue(&[("task1", 5), ("task2", 5), ("task3", 9)]);

        for (level, expected) in [(5, 2), (9, 1), (0, 0)] {
            assert_eq!(queue.len_at_priority(level), expected);
        }
    }

    #[test]
    fn test_multi_level_queue_peek() {
        let queue = filled_multi_level_queue(&[("task1", 5), ("task2", 9)]);

        let head = queue.peek().unwrap();
        assert_eq!(head.properties.priority, Some(9));
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn test_multi_level_queue_clear() {
        let mut queue = filled_multi_level_queue(&[("task1", 5), ("task2", 9)]);

        queue.clear();
        assert!(queue.is_empty());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_from_iterator() {
        let queue: MessagePriorityQueue = batch(&MIXED).into_iter().collect();
        assert_eq!(queue.len(), 3);
    }

    #[test]
    fn test_priority_queue_extend() {
        let mut queue = filled_priority_queue(&[("task1", 5)]);

        queue.extend(batch(&[("task2", 9), ("task3", 1)]));
        assert_eq!(queue.len(), 3);

        // Extended messages are ordered together with the existing one.
        for expected in [9, 5, 1] {
            assert_eq!(queue.pop().unwrap().properties.priority, Some(expected));
        }
    }

    #[test]
    fn test_priority_queue_extend_with_capacity() {
        let mut queue = MessagePriorityQueue::with_capacity(3);
        queue.push(create_message_with_priority("task1", 5));

        // One more message than the remaining room; the last must not fit.
        queue.extend(batch(&[("task2", 9), ("task3", 1), ("task4", 7)]));
        assert_eq!(queue.len(), 3);
        assert!(queue.is_full());
    }

    #[test]
    fn test_priority_queue_into_iterator() {
        let queue: MessagePriorityQueue = batch(&MIXED).into_iter().collect();

        let priorities: Vec<u8> = queue
            .into_iter()
            .map(|message| message.properties.priority.unwrap())
            .collect();

        assert_eq!(priorities.len(), 3);
        // Iteration follows priority order.
        assert_eq!(priorities, vec![9, 5, 1]);
    }

    #[test]
    fn test_priority_queue_iter_exact_size() {
        let queue: MessagePriorityQueue = batch(&MIXED).into_iter().collect();

        let remaining = queue.into_iter();
        assert_eq!(remaining.len(), 3);

        let collected: Vec<_> = remaining.collect();
        assert_eq!(collected.len(), 3);
    }

    #[test]
    fn test_priority_queue_iterator_chain() {
        let specs = [("task1", 5), ("task2", 9), ("task3", 1), ("task4", 7)];
        let queue: MessagePriorityQueue = batch(&specs).into_iter().collect();

        let mut task_names: Vec<String> = Vec::new();
        for message in queue {
            task_names.push(message.headers.task.clone());
        }

        assert_eq!(task_names, vec!["task2", "task4", "task1", "task3"]);
    }

    #[test]
    fn test_multi_level_queue_from_iterator() {
        let queue: MultiLevelQueue = batch(&MIXED).into_iter().collect();

        assert_eq!(queue.len(), 3);
        for level in [5, 9, 1] {
            assert_eq!(queue.len_at_priority(level), 1);
        }
    }

    #[test]
    fn test_multi_level_queue_extend() {
        let mut queue = filled_multi_level_queue(&[("task1", 5)]);

        queue.extend(batch(&[("task2", 9), ("task3", 5)]));
        assert_eq!(queue.len(), 3);
        assert_eq!(queue.len_at_priority(5), 2);
        assert_eq!(queue.len_at_priority(9), 1);
    }

    #[test]
    fn test_multi_level_queue_into_iterator() {
        let queue: MultiLevelQueue = batch(&MIXED).into_iter().collect();

        let priorities: Vec<u8> = queue
            .into_iter()
            .map(|message| message.properties.priority.unwrap())
            .collect();

        assert_eq!(priorities.len(), 3);
        // Iteration follows priority order.
        assert_eq!(priorities, vec![9, 5, 1]);
    }

    #[test]
    fn test_multi_level_queue_iter_exact_size() {
        let queue: MultiLevelQueue = batch(&[("task1", 5), ("task2", 9)]).into_iter().collect();

        let remaining = queue.into_iter();
        assert_eq!(remaining.len(), 2);

        let collected: Vec<_> = remaining.collect();
        assert_eq!(collected.len(), 2);
    }

    #[test]
    fn test_multi_level_queue_extend_with_capacity() {
        let mut queue = MultiLevelQueue::with_capacity(3);
        queue.push(create_message_with_priority("task1", 5));

        // One more message than the remaining room; the last must not fit.
        queue.extend(batch(&[("task2", 9), ("task3", 1), ("task4", 7)]));
        assert_eq!(queue.len(), 3);
    }
}