use crate::Message;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
/// Heap entry that pairs a [`Message`] with the keys the queue orders by.
///
/// Equality and ordering of entries look only at `priority` and `sequence`;
/// the wrapped message itself is never compared.
#[derive(Debug, Clone)]
struct PriorityMessage {
    /// The queued message, handed back to the caller when the entry is popped.
    message: Message,
    /// Priority captured when the message was pushed; larger values are served first.
    priority: u8,
    /// Insertion counter used to break ties between entries of equal priority.
    sequence: u64,
}
/// Two entries are equal when both ordering keys match; the payload is ignored.
impl PartialEq for PriorityMessage {
    fn eq(&self, rhs: &Self) -> bool {
        (self.priority, self.sequence) == (rhs.priority, rhs.sequence)
    }
}

// Equality is defined over two plain integers, so it is a full equivalence relation.
impl Eq for PriorityMessage {}
/// Partial ordering simply forwards to the total order defined by `Ord`,
/// so the two can never disagree.
impl PartialOrd for PriorityMessage {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, rhs))
    }
}
/// Total order used by the heap: higher priority compares greater, and among
/// equal priorities the entry with the *smaller* sequence number compares
/// greater, so a max-heap yields equal-priority entries in insertion order.
impl Ord for PriorityMessage {
    fn cmp(&self, rhs: &Self) -> Ordering {
        self.priority
            .cmp(&rhs.priority)
            // Operands are deliberately swapped: an earlier insertion must rank higher.
            .then_with(|| rhs.sequence.cmp(&self.sequence))
    }
}
/// Priority queue of [`Message`]s backed by a binary max-heap.
///
/// Messages with a higher priority value are popped first; messages that share
/// a priority are popped in the order they were pushed. An optional size limit
/// makes `push` refuse new messages once the queue is full.
#[derive(Debug, Clone)]
pub struct MessagePriorityQueue {
    /// Heap of queued entries, ordered by priority and then by insertion order.
    heap: BinaryHeap<PriorityMessage>,
    /// Sequence number that will be stamped on the next pushed message.
    sequence_counter: u64,
    /// Maximum number of queued messages, or `None` for an unbounded queue.
    max_size: Option<usize>,
}
impl MessagePriorityQueue {
    /// Creates an empty queue with no limit on the number of messages.
    pub fn new() -> Self {
        Self {
            max_size: None,
            sequence_counter: 0,
            heap: BinaryHeap::new(),
        }
    }

    /// Creates an empty queue that holds at most `max_size` messages.
    ///
    /// Heap storage for `max_size` entries is reserved up front.
    pub fn with_capacity(max_size: usize) -> Self {
        Self {
            max_size: Some(max_size),
            sequence_counter: 0,
            heap: BinaryHeap::with_capacity(max_size),
        }
    }

    /// Enqueues `message`, returning `true` on success.
    ///
    /// Returns `false` (and drops the message) when the queue is already at
    /// its size limit. A message without an explicit priority is queued with
    /// priority 5.
    pub fn push(&mut self, message: Message) -> bool {
        if self.is_full() {
            return false;
        }

        // Claim the current sequence number, then advance the counter for the next push.
        let sequence = self.sequence_counter;
        self.sequence_counter = sequence.wrapping_add(1);

        let priority = message.properties.priority.unwrap_or(5);
        self.heap.push(PriorityMessage {
            message,
            priority,
            sequence,
        });
        true
    }

    /// Removes and returns the highest-priority message, or `None` when empty.
    pub fn pop(&mut self) -> Option<Message> {
        let entry = self.heap.pop()?;
        Some(entry.message)
    }

    /// Returns a reference to the message that `pop` would return next,
    /// without removing it.
    pub fn peek(&self) -> Option<&Message> {
        let entry = self.heap.peek()?;
        Some(&entry.message)
    }

    /// Number of messages currently queued.
    #[inline]
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Returns `true` when no messages are queued.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }

    /// Returns `true` when a size limit is set and has been reached.
    ///
    /// An unbounded queue is never full.
    #[inline]
    pub fn is_full(&self) -> bool {
        matches!(self.max_size, Some(limit) if self.heap.len() >= limit)
    }

    /// Removes every message and restarts sequence numbering from zero.
    pub fn clear(&mut self) {
        self.sequence_counter = 0;
        self.heap.clear();
    }

    /// Removes all messages and returns them in pop order
    /// (highest priority first, insertion order within a priority).
    pub fn drain(&mut self) -> Vec<Message> {
        let mut drained = Vec::with_capacity(self.heap.len());
        drained.extend(std::iter::from_fn(|| self.pop()));
        drained
    }

    /// Returns references to every queued message whose priority equals
    /// `priority`. The order follows the heap's internal layout, not pop order.
    pub fn filter_by_priority(&self, priority: u8) -> Vec<&Message> {
        let mut matching = Vec::new();
        for entry in self.heap.iter() {
            if entry.priority == priority {
                matching.push(&entry.message);
            }
        }
        matching
    }

    /// Counts queued messages per priority level.
    ///
    /// Index `p` of the result holds the number of messages with priority `p`;
    /// messages whose priority is 10 or greater are not counted.
    pub fn count_by_priority(&self) -> [usize; 10] {
        let mut counts = [0usize; 10];
        for entry in self.heap.iter() {
            // `get_mut` yields `None` for priorities outside the table, which are skipped.
            if let Some(slot) = counts.get_mut(usize::from(entry.priority)) {
                *slot += 1;
            }
        }
        counts
    }
}
impl Default for MessagePriorityQueue {
fn default() -> Self {
Self::new()
}
}
impl FromIterator<Message> for MessagePriorityQueue {
    /// Collects messages into a new unbounded queue, pushing them in iteration order.
    fn from_iter<T: IntoIterator<Item = Message>>(iter: T) -> Self {
        iter.into_iter().fold(Self::new(), |mut queue, message| {
            queue.push(message);
            queue
        })
    }
}
impl Extend<Message> for MessagePriorityQueue {
    /// Pushes messages from `iter` until it is exhausted or a push is refused.
    ///
    /// The message whose push is refused is dropped, and the remainder of the
    /// iterator is left unconsumed.
    fn extend<T: IntoIterator<Item = Message>>(&mut self, iter: T) {
        for message in iter {
            let accepted = self.push(message);
            if !accepted {
                return;
            }
        }
    }
}
impl IntoIterator for MessagePriorityQueue {
    type Item = Message;
    type IntoIter = PriorityQueueIter;

    /// Consumes the queue and returns an iterator that yields its messages by
    /// repeatedly popping, i.e. in priority order.
    fn into_iter(self) -> Self::IntoIter {
        let queue = self;
        PriorityQueueIter { queue }
    }
}
/// Owning iterator over a [`MessagePriorityQueue`].
///
/// Each call to `next` pops one message from the wrapped queue, so items are
/// produced in priority order.
// `Debug` is derived so this public type can be logged and used in assertions;
// the wrapped queue already implements it.
#[derive(Debug)]
pub struct PriorityQueueIter {
    /// The queue being consumed.
    queue: MessagePriorityQueue,
}
impl Iterator for PriorityQueueIter {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
self.queue.pop()
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.queue.len();
(len, Some(len))
}
}
impl ExactSizeIterator for PriorityQueueIter {
fn len(&self) -> usize {
self.queue.len()
}
}
/// Priority queue that keeps one FIFO bucket per priority level (0 through 9).
///
/// `pop` serves the highest non-empty priority level first and, within a
/// level, returns messages in the order they were pushed. Messages whose
/// priority is outside `0..=9` are rejected by `push`.
#[derive(Debug, Clone)]
pub struct MultiLevelQueue {
    /// One bucket per priority; the array index is the priority value.
    /// A deque is used so the oldest message of a level can be removed in O(1).
    queues: [std::collections::VecDeque<Message>; 10],
    /// Number of messages currently stored across all buckets.
    total_size: usize,
    /// Upper bound on `total_size`, or `None` for an unbounded queue.
    max_size: Option<usize>,
}

impl MultiLevelQueue {
    /// Creates an empty queue with no limit on the number of messages.
    pub fn new() -> Self {
        Self {
            queues: Default::default(),
            total_size: 0,
            max_size: None,
        }
    }

    /// Creates an empty queue that holds at most `max_size` messages in total.
    ///
    /// Only the limit is recorded; no bucket storage is reserved up front.
    pub fn with_capacity(max_size: usize) -> Self {
        Self {
            queues: Default::default(),
            total_size: 0,
            max_size: Some(max_size),
        }
    }

    /// Enqueues `message` at the back of the bucket for its priority.
    ///
    /// Returns `false` (and drops the message) when the queue is at its size
    /// limit or when the priority is not in `0..=9`. A message without an
    /// explicit priority is queued with priority 5.
    pub fn push(&mut self, message: Message) -> bool {
        if matches!(self.max_size, Some(max) if self.total_size >= max) {
            return false;
        }

        let priority = usize::from(message.properties.priority.unwrap_or(5));
        match self.queues.get_mut(priority) {
            Some(bucket) => {
                bucket.push_back(message);
                self.total_size += 1;
                true
            }
            // Priority has no bucket: reject instead of panicking on an index.
            None => false,
        }
    }

    /// Removes and returns the oldest message of the highest non-empty
    /// priority level, or `None` when the queue is empty.
    pub fn pop(&mut self) -> Option<Message> {
        // Scan buckets from priority 9 down to 0 and take from the front so
        // that messages of equal priority leave in insertion (FIFO) order.
        let message = self
            .queues
            .iter_mut()
            .rev()
            .find_map(|bucket| bucket.pop_front())?;
        self.total_size -= 1;
        Some(message)
    }

    /// Returns a reference to the message that `pop` would return next,
    /// without removing it.
    pub fn peek(&self) -> Option<&Message> {
        self.queues.iter().rev().find_map(|bucket| bucket.front())
    }

    /// Total number of messages across all priority levels.
    #[inline]
    pub fn len(&self) -> usize {
        self.total_size
    }

    /// Returns `true` when no messages are queued at any level.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.total_size == 0
    }

    /// Number of messages queued at exactly `priority`.
    ///
    /// Returns 0 for priorities that have no bucket (10 and above).
    #[inline]
    pub fn len_at_priority(&self, priority: u8) -> usize {
        self.queues
            .get(usize::from(priority))
            .map_or(0, |bucket| bucket.len())
    }

    /// Removes every message from every priority level.
    pub fn clear(&mut self) {
        for bucket in &mut self.queues {
            bucket.clear();
        }
        self.total_size = 0;
    }

    /// Removes all messages and returns them in pop order: highest priority
    /// first, insertion order within a priority.
    pub fn drain(&mut self) -> Vec<Message> {
        let mut messages = Vec::with_capacity(self.total_size);
        for bucket in self.queues.iter_mut().rev() {
            messages.extend(bucket.drain(..));
        }
        self.total_size = 0;
        messages
    }
}
impl Default for MultiLevelQueue {
fn default() -> Self {
Self::new()
}
}
impl FromIterator<Message> for MultiLevelQueue {
    /// Collects messages into a new unbounded queue.
    ///
    /// Messages that `push` refuses are dropped and collection continues with
    /// the next one.
    fn from_iter<T: IntoIterator<Item = Message>>(iter: T) -> Self {
        iter.into_iter().fold(Self::new(), |mut queue, message| {
            queue.push(message);
            queue
        })
    }
}
impl Extend<Message> for MultiLevelQueue {
    /// Pushes messages from `iter` until it is exhausted or the queue is full.
    ///
    /// A message that `push` refuses for a reason other than capacity (its
    /// priority has no bucket) is skipped and the remaining messages are still
    /// processed, matching what collecting into a queue does. Once the size
    /// limit is reached, the message just taken from the iterator is dropped
    /// and the rest of the iterator is left unconsumed.
    fn extend<T: IntoIterator<Item = Message>>(&mut self, iter: T) {
        for message in iter {
            let full = matches!(self.max_size, Some(max) if self.total_size >= max);
            if full {
                break;
            }
            // The queue has room, so a refusal here only concerns this one
            // message; it must not discard the valid messages that follow.
            self.push(message);
        }
    }
}
impl IntoIterator for MultiLevelQueue {
    type Item = Message;
    type IntoIter = MultiLevelQueueIter;

    /// Consumes the queue and returns an iterator that yields its messages by
    /// repeatedly popping.
    fn into_iter(self) -> Self::IntoIter {
        let queue = self;
        MultiLevelQueueIter { queue }
    }
}
/// Owning iterator over a [`MultiLevelQueue`].
///
/// Each call to `next` pops one message from the wrapped queue.
// `Debug` is derived so this public type can be logged and used in assertions;
// the wrapped queue already implements it.
#[derive(Debug)]
pub struct MultiLevelQueueIter {
    /// The queue being consumed.
    queue: MultiLevelQueue,
}
impl Iterator for MultiLevelQueueIter {
type Item = Message;
fn next(&mut self) -> Option<Self::Item> {
self.queue.pop()
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.queue.len();
(len, Some(len))
}
}
impl ExactSizeIterator for MultiLevelQueueIter {
fn len(&self) -> usize {
self.queue.len()
}
}
// Unit tests for `MessagePriorityQueue` and `MultiLevelQueue`.
// Every test builds its messages through `MessageBuilder` and then checks
// queue behaviour through the public API only.
#[cfg(test)]
mod tests {
use super::*;
use crate::builder::MessageBuilder;
// Builds a message for `task` with the given priority.
// The build result is unwrapped, so a builder failure panics the test.
fn create_message_with_priority(task: &str, priority: u8) -> Message {
MessageBuilder::new(task)
.priority(priority)
.build()
.unwrap()
}
// Messages pushed as 5, 9, 1 must pop as 9, 5, 1 and leave the queue empty.
#[test]
fn test_priority_queue_push_pop() {
let mut queue = MessagePriorityQueue::new();
let msg1 = create_message_with_priority("task1", 5);
let msg2 = create_message_with_priority("task2", 9);
let msg3 = create_message_with_priority("task3", 1);
queue.push(msg1);
queue.push(msg2);
queue.push(msg3);
assert_eq!(queue.len(), 3);
let popped = queue.pop().unwrap();
assert_eq!(popped.properties.priority, Some(9));
let popped = queue.pop().unwrap();
assert_eq!(popped.properties.priority, Some(5));
let popped = queue.pop().unwrap();
assert_eq!(popped.properties.priority, Some(1));
assert!(queue.is_empty());
}
// Three messages with the same priority must pop in the order they were pushed.
#[test]
fn test_priority_queue_fifo_same_priority() {
let mut queue = MessagePriorityQueue::new();
let msg1 = create_message_with_priority("task1", 5);
let msg2 = create_message_with_priority("task2", 5);
let msg3 = create_message_with_priority("task3", 5);
queue.push(msg1.clone());
queue.push(msg2.clone());
queue.push(msg3.clone());
let popped1 = queue.pop().unwrap();
assert_eq!(popped1.headers.task, "task1");
let popped2 = queue.pop().unwrap();
assert_eq!(popped2.headers.task, "task2");
let popped3 = queue.pop().unwrap();
assert_eq!(popped3.headers.task, "task3");
}
// `peek` must expose the highest-priority message and must not remove it
// (the length is still 2 afterwards).
#[test]
fn test_priority_queue_peek() {
let mut queue = MessagePriorityQueue::new();
let msg1 = create_message_with_priority("task1", 5);
let msg2 = create_message_with_priority("task2", 9);
queue.push(msg1);
queue.push(msg2);
let peeked = queue.peek().unwrap();
assert_eq!(peeked.properties.priority, Some(9));
assert_eq!(queue.len(), 2); }
// A queue limited to 2 messages accepts two pushes, refuses the third,
// and then reports itself as full.
#[test]
fn test_priority_queue_with_capacity() {
let mut queue = MessagePriorityQueue::with_capacity(2);
assert!(queue.push(create_message_with_priority("task1", 5)));
assert!(queue.push(create_message_with_priority("task2", 5)));
assert!(!queue.push(create_message_with_priority("task3", 5)));
assert!(queue.is_full());
}
// `clear` must leave the queue empty with length 0.
#[test]
fn test_priority_queue_clear() {
let mut queue = MessagePriorityQueue::new();
queue.push(create_message_with_priority("task1", 5));
queue.push(create_message_with_priority("task2", 5));
assert_eq!(queue.len(), 2);
queue.clear();
assert_eq!(queue.len(), 0);
assert!(queue.is_empty());
}
// `drain` must return every message in priority order (9, 5, 1) and
// leave the queue empty.
#[test]
fn test_priority_queue_drain() {
let mut queue = MessagePriorityQueue::new();
queue.push(create_message_with_priority("task1", 1));
queue.push(create_message_with_priority("task2", 9));
queue.push(create_message_with_priority("task3", 5));
let messages = queue.drain();
assert_eq!(messages.len(), 3);
assert_eq!(messages[0].properties.priority, Some(9));
assert_eq!(messages[1].properties.priority, Some(5));
assert_eq!(messages[2].properties.priority, Some(1));
assert!(queue.is_empty());
}
// Two of the three queued messages have priority 5, so the filter returns two.
#[test]
fn test_priority_queue_filter_by_priority() {
let mut queue = MessagePriorityQueue::new();
queue.push(create_message_with_priority("task1", 5));
queue.push(create_message_with_priority("task2", 9));
queue.push(create_message_with_priority("task3", 5));
let filtered = queue.filter_by_priority(5);
assert_eq!(filtered.len(), 2);
}
// The per-priority counts must match what was pushed: one at 1, two at 5, one at 9.
#[test]
fn test_priority_queue_count_by_priority() {
let mut queue = MessagePriorityQueue::new();
queue.push(create_message_with_priority("task1", 5));
queue.push(create_message_with_priority("task2", 9));
queue.push(create_message_with_priority("task3", 5));
queue.push(create_message_with_priority("task4", 1));
let counts = queue.count_by_priority();
assert_eq!(counts[1], 1);
assert_eq!(counts[5], 2);
assert_eq!(counts[9], 1);
}
// The multi-level queue must also pop strictly by descending priority.
#[test]
fn test_multi_level_queue_push_pop() {
let mut queue = MultiLevelQueue::new();
queue.push(create_message_with_priority("task1", 5));
queue.push(create_message_with_priority("task2", 9));
queue.push(create_message_with_priority("task3", 1));
assert_eq!(queue.len(), 3);
let popped = queue.pop().unwrap();
assert_eq!(popped.properties.priority, Some(9));
let popped = queue.pop().unwrap();
assert_eq!(popped.properties.priority, Some(5));
let popped = queue.pop().unwrap();
assert_eq!(popped.properties.priority, Some(1));
}
// `len_at_priority` reports the size of a single level; an unused level reports 0.
#[test]
fn test_multi_level_queue_len_at_priority() {
let mut queue = MultiLevelQueue::new();
queue.push(create_message_with_priority("task1", 5));
queue.push(create_message_with_priority("task2", 5));
queue.push(create_message_with_priority("task3", 9));
assert_eq!(queue.len_at_priority(5), 2);
assert_eq!(queue.len_at_priority(9), 1);
assert_eq!(queue.len_at_priority(0), 0);
}
// `peek` on the multi-level queue shows the priority-9 message without removing it.
#[test]
fn test_multi_level_queue_peek() {
let mut queue = MultiLevelQueue::new();
queue.push(create_message_with_priority("task1", 5));
queue.push(create_message_with_priority("task2", 9));
let peeked = queue.peek().unwrap();
assert_eq!(peeked.properties.priority, Some(9));
assert_eq!(queue.len(), 2);
}
// `clear` on the multi-level queue empties it and resets the length to 0.
#[test]
fn test_multi_level_queue_clear() {
let mut queue = MultiLevelQueue::new();
queue.push(create_message_with_priority("task1", 5));
queue.push(create_message_with_priority("task2", 9));
queue.clear();
assert!(queue.is_empty());
assert_eq!(queue.len(), 0);
}
// Collecting an iterator of three messages yields a queue of length 3.
#[test]
fn test_from_iterator() {
let messages = vec![
create_message_with_priority("task1", 5),
create_message_with_priority("task2", 9),
create_message_with_priority("task3", 1),
];
let queue: MessagePriorityQueue = messages.into_iter().collect();
assert_eq!(queue.len(), 3);
}
// `extend` adds to the existing contents, and the combined queue still pops 9, 5, 1.
#[test]
fn test_priority_queue_extend() {
let mut queue = MessagePriorityQueue::new();
queue.push(create_message_with_priority("task1", 5));
let new_messages = vec![
create_message_with_priority("task2", 9),
create_message_with_priority("task3", 1),
];
queue.extend(new_messages);
assert_eq!(queue.len(), 3);
assert_eq!(queue.pop().unwrap().properties.priority, Some(9));
assert_eq!(queue.pop().unwrap().properties.priority, Some(5));
assert_eq!(queue.pop().unwrap().properties.priority, Some(1));
}
// With a limit of 3 and one message already queued, extending with three
// more keeps only two of them: the final length is 3 and the queue is full.
#[test]
fn test_priority_queue_extend_with_capacity() {
let mut queue = MessagePriorityQueue::with_capacity(3);
queue.push(create_message_with_priority("task1", 5));
let new_messages = vec![
create_message_with_priority("task2", 9),
create_message_with_priority("task3", 1),
create_message_with_priority("task4", 7), ];
queue.extend(new_messages);
assert_eq!(queue.len(), 3);
assert!(queue.is_full());
}
// Iterating the queue by value yields all three messages in priority order.
#[test]
fn test_priority_queue_into_iterator() {
let messages = vec![
create_message_with_priority("task1", 5),
create_message_with_priority("task2", 9),
create_message_with_priority("task3", 1),
];
let queue: MessagePriorityQueue = messages.into_iter().collect();
let mut count = 0;
let mut priorities = Vec::new();
for msg in queue {
priorities.push(msg.properties.priority.unwrap());
count += 1;
}
assert_eq!(count, 3);
assert_eq!(priorities, vec![9, 5, 1]); }
// The owning iterator reports its exact length before being consumed.
#[test]
fn test_priority_queue_iter_exact_size() {
let messages = vec![
create_message_with_priority("task1", 5),
create_message_with_priority("task2", 9),
create_message_with_priority("task3", 1),
];
let queue: MessagePriorityQueue = messages.into_iter().collect();
let iter = queue.into_iter();
assert_eq!(iter.len(), 3);
let collected: Vec<_> = iter.collect();
assert_eq!(collected.len(), 3);
}
// The owning iterator composes with adaptors; task names come out ordered
// by priority 9, 7, 5, 1.
#[test]
fn test_priority_queue_iterator_chain() {
let messages = vec![
create_message_with_priority("task1", 5),
create_message_with_priority("task2", 9),
create_message_with_priority("task3", 1),
create_message_with_priority("task4", 7),
];
let queue: MessagePriorityQueue = messages.into_iter().collect();
let task_names: Vec<String> = queue
.into_iter()
.map(|msg| msg.headers.task.clone())
.collect();
assert_eq!(task_names, vec!["task2", "task4", "task1", "task3"]);
}
// Collecting into a multi-level queue places each message at its own priority level.
#[test]
fn test_multi_level_queue_from_iterator() {
let messages = vec![
create_message_with_priority("task1", 5),
create_message_with_priority("task2", 9),
create_message_with_priority("task3", 1),
];
let queue: MultiLevelQueue = messages.into_iter().collect();
assert_eq!(queue.len(), 3);
assert_eq!(queue.len_at_priority(5), 1);
assert_eq!(queue.len_at_priority(9), 1);
assert_eq!(queue.len_at_priority(1), 1);
}
// `extend` on the multi-level queue adds to existing levels (two at 5, one at 9).
#[test]
fn test_multi_level_queue_extend() {
let mut queue = MultiLevelQueue::new();
queue.push(create_message_with_priority("task1", 5));
let new_messages = vec![
create_message_with_priority("task2", 9),
create_message_with_priority("task3", 5),
];
queue.extend(new_messages);
assert_eq!(queue.len(), 3);
assert_eq!(queue.len_at_priority(5), 2);
assert_eq!(queue.len_at_priority(9), 1);
}
// Iterating the multi-level queue by value yields messages in priority order.
#[test]
fn test_multi_level_queue_into_iterator() {
let messages = vec![
create_message_with_priority("task1", 5),
create_message_with_priority("task2", 9),
create_message_with_priority("task3", 1),
];
let queue: MultiLevelQueue = messages.into_iter().collect();
let mut count = 0;
let mut priorities = Vec::new();
for msg in queue {
priorities.push(msg.properties.priority.unwrap());
count += 1;
}
assert_eq!(count, 3);
assert_eq!(priorities, vec![9, 5, 1]); }
// The multi-level owning iterator also reports its exact length.
#[test]
fn test_multi_level_queue_iter_exact_size() {
let messages = vec![
create_message_with_priority("task1", 5),
create_message_with_priority("task2", 9),
];
let queue: MultiLevelQueue = messages.into_iter().collect();
let iter = queue.into_iter();
assert_eq!(iter.len(), 2);
let collected: Vec<_> = iter.collect();
assert_eq!(collected.len(), 2);
}
// With a limit of 3 and one message already queued, extending with three
// more leaves the multi-level queue at exactly 3 messages.
#[test]
fn test_multi_level_queue_extend_with_capacity() {
let mut queue = MultiLevelQueue::with_capacity(3);
queue.push(create_message_with_priority("task1", 5));
let new_messages = vec![
create_message_with_priority("task2", 9),
create_message_with_priority("task3", 1),
create_message_with_priority("task4", 7), ];
queue.extend(new_messages);
assert_eq!(queue.len(), 3);
}
}