use peat_schema::event::v1::{EventPriority, PeatEvent};
use std::collections::VecDeque;
/// Number of priority levels, and therefore the number of internal FIFO
/// queues held by `PriorityEventQueue` and the length of the array returned
/// by `PriorityEventQueue::counts`.
pub const PRIORITY_LEVELS: usize = 4;
/// A multi-level event queue: one FIFO of `PeatEvent`s per priority level.
///
/// Events are filed by the priority found in their routing policy and keep
/// insertion order within a level.
#[derive(Debug, Default)]
pub struct PriorityEventQueue {
/// One FIFO per level, indexed by the value `priority_to_level` returns
/// (0 = critical, 1 = high, 2 = normal, 3 = low).
queues: [VecDeque<PeatEvent>; PRIORITY_LEVELS],
}
impl PriorityEventQueue {
pub fn new() -> Self {
Self {
queues: Default::default(),
}
}
pub fn push(&mut self, event: PeatEvent) {
let priority = self.get_priority(&event);
let level = priority_to_level(priority);
self.queues[level].push_back(event);
}
pub fn pop(&mut self) -> Option<PeatEvent> {
for queue in &mut self.queues {
if let Some(event) = queue.pop_front() {
return Some(event);
}
}
None
}
pub fn pop_critical(&mut self) -> Vec<PeatEvent> {
self.queues[0].drain(..).collect()
}
pub fn pop_weighted(&mut self, max_events: usize) -> Vec<PeatEvent> {
if max_events == 0 {
return Vec::new();
}
let mut result = Vec::with_capacity(max_events);
let high_alloc = (max_events * 50) / 100;
let normal_alloc = (max_events * 35) / 100;
let mut remaining = max_events;
let high_count = self.pop_from_level(1, high_alloc.min(remaining), &mut result);
remaining -= high_count;
let normal_target = normal_alloc + (high_alloc - high_count);
let normal_count = self.pop_from_level(2, normal_target.min(remaining), &mut result);
remaining -= normal_count;
self.pop_from_level(3, remaining, &mut result);
result
}
pub fn has_critical(&self) -> bool {
!self.queues[0].is_empty()
}
pub fn len(&self) -> usize {
self.queues.iter().map(|q| q.len()).sum()
}
pub fn is_empty(&self) -> bool {
self.queues.iter().all(|q| q.is_empty())
}
pub fn len_at_priority(&self, priority: EventPriority) -> usize {
let level = priority_to_level(priority);
self.queues[level].len()
}
pub fn counts(&self) -> [usize; PRIORITY_LEVELS] {
[
self.queues[0].len(),
self.queues[1].len(),
self.queues[2].len(),
self.queues[3].len(),
]
}
pub fn clear(&mut self) {
for queue in &mut self.queues {
queue.clear();
}
}
fn get_priority(&self, event: &PeatEvent) -> EventPriority {
event
.routing
.as_ref()
.map(|r| EventPriority::try_from(r.priority).unwrap_or(EventPriority::PriorityNormal))
.unwrap_or(EventPriority::PriorityNormal)
}
fn pop_from_level(&mut self, level: usize, count: usize, result: &mut Vec<PeatEvent>) -> usize {
let mut popped = 0;
while popped < count {
if let Some(event) = self.queues[level].pop_front() {
result.push(event);
popped += 1;
} else {
break;
}
}
popped
}
}
fn priority_to_level(priority: EventPriority) -> usize {
match priority {
EventPriority::PriorityCritical => 0,
EventPriority::PriorityHigh => 1,
EventPriority::PriorityNormal => 2,
EventPriority::PriorityLow => 3,
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use peat_schema::event::v1::{AggregationPolicy, EventClass, PropagationMode};

    /// Builds a minimal test event whose routing policy carries `priority`.
    fn make_event(id: &str, priority: EventPriority) -> PeatEvent {
        let routing = AggregationPolicy {
            propagation: PropagationMode::PropagationFull as i32,
            priority: priority as i32,
            ttl_seconds: 300,
            aggregation_window_ms: 0,
        };
        PeatEvent {
            event_id: id.to_string(),
            timestamp: None,
            source_node_id: "node-1".to_string(),
            source_formation_id: "squad-1".to_string(),
            source_instance_id: None,
            event_class: EventClass::Product as i32,
            event_type: "test".to_string(),
            routing: Some(routing),
            payload_type_url: String::new(),
            payload_value: vec![],
        }
    }

    /// Pops the next event and returns its id; panics if the queue is empty.
    fn next_id(queue: &mut PriorityEventQueue) -> String {
        queue.pop().unwrap().event_id
    }

    /// Events come out strictly by priority regardless of insertion order.
    #[test]
    fn test_priority_ordering() {
        let mut queue = PriorityEventQueue::new();
        // Inserted lowest-first so the expected order cannot match by accident.
        for (id, priority) in [
            ("low", EventPriority::PriorityLow),
            ("normal", EventPriority::PriorityNormal),
            ("high", EventPriority::PriorityHigh),
            ("critical", EventPriority::PriorityCritical),
        ] {
            queue.push(make_event(id, priority));
        }

        for expected in ["critical", "high", "normal", "low"] {
            assert_eq!(next_id(&mut queue), expected);
        }
        assert!(queue.pop().is_none());
    }

    /// Events of equal priority keep their insertion order.
    #[test]
    fn test_fifo_within_priority() {
        let mut queue = PriorityEventQueue::new();
        let ids = ["h1", "h2", "h3"];
        for id in ids {
            queue.push(make_event(id, EventPriority::PriorityHigh));
        }

        for expected in ids {
            assert_eq!(next_id(&mut queue), expected);
        }
    }

    /// `pop_critical` drains only the critical level, in FIFO order.
    #[test]
    fn test_pop_critical() {
        let mut queue = PriorityEventQueue::new();
        queue.push(make_event("c1", EventPriority::PriorityCritical));
        queue.push(make_event("h1", EventPriority::PriorityHigh));
        queue.push(make_event("c2", EventPriority::PriorityCritical));

        let drained = queue.pop_critical();

        assert_eq!(drained.len(), 2);
        assert_eq!(drained[0].event_id, "c1");
        assert_eq!(drained[1].event_id, "c2");
        assert_eq!(queue.len(), 1);
        assert_eq!(next_id(&mut queue), "h1");
    }

    /// With every level saturated, the batch roughly follows the weighting.
    #[test]
    fn test_pop_weighted() {
        let mut queue = PriorityEventQueue::new();
        for i in 0..10 {
            let high = make_event(&format!("h{}", i), EventPriority::PriorityHigh);
            let normal = make_event(&format!("n{}", i), EventPriority::PriorityNormal);
            let low = make_event(&format!("l{}", i), EventPriority::PriorityLow);
            queue.push(high);
            queue.push(normal);
            queue.push(low);
        }

        let events = queue.pop_weighted(10);
        assert_eq!(events.len(), 10);

        // Ids start with the first letter of their priority.
        let count_with = |prefix: char| {
            events
                .iter()
                .filter(|event| event.event_id.starts_with(prefix))
                .count()
        };
        let high_count = count_with('h');
        let normal_count = count_with('n');
        let low_count = count_with('l');

        assert!((4..=6).contains(&high_count), "high={}", high_count);
        assert!((2..=5).contains(&normal_count), "normal={}", normal_count);
        assert!(low_count <= 3, "low={}", low_count);
    }

    /// Budget unused by empty higher levels rolls over to the low level.
    #[test]
    fn test_weighted_rollover() {
        let mut queue = PriorityEventQueue::new();
        for i in 0..10 {
            queue.push(make_event(&format!("l{}", i), EventPriority::PriorityLow));
        }

        let events = queue.pop_weighted(5);

        assert_eq!(events.len(), 5);
        assert!(events.iter().all(|event| event.event_id.starts_with('l')));
    }

    /// `has_critical` tracks the critical level only.
    #[test]
    fn test_has_critical() {
        let mut queue = PriorityEventQueue::new();
        assert!(!queue.has_critical());

        queue.push(make_event("h1", EventPriority::PriorityHigh));
        assert!(!queue.has_critical());

        queue.push(make_event("c1", EventPriority::PriorityCritical));
        assert!(queue.has_critical());

        queue.pop_critical();
        assert!(!queue.has_critical());
    }

    /// `counts`, `len` and `len_at_priority` agree with what was pushed.
    #[test]
    fn test_counts() {
        let mut queue = PriorityEventQueue::new();
        for (id, priority) in [
            ("c1", EventPriority::PriorityCritical),
            ("h1", EventPriority::PriorityHigh),
            ("h2", EventPriority::PriorityHigh),
            ("n1", EventPriority::PriorityNormal),
        ] {
            queue.push(make_event(id, priority));
        }

        assert_eq!(queue.counts(), [1, 2, 1, 0]);
        assert_eq!(queue.len(), 4);

        for (priority, expected) in [
            (EventPriority::PriorityCritical, 1),
            (EventPriority::PriorityHigh, 2),
            (EventPriority::PriorityNormal, 1),
            (EventPriority::PriorityLow, 0),
        ] {
            assert_eq!(queue.len_at_priority(priority), expected);
        }
    }

    /// An event without a routing policy is queued at normal priority.
    #[test]
    fn test_default_priority_for_missing_routing() {
        let mut queue = PriorityEventQueue::new();

        // Start from the standard fixture, then strip the routing policy.
        let mut event = make_event("no-routing", EventPriority::PriorityNormal);
        event.routing = None;
        queue.push(event);

        assert_eq!(queue.len_at_priority(EventPriority::PriorityNormal), 1);
    }
}