Skip to main content

peat_protocol/event/
priority_queue.rs

1//! Priority-based event queue for transmission scheduling (ADR-027)
2//!
3//! Implements a 4-level priority queue where:
4//! - CRITICAL events always transmit immediately, preempting others
5//! - HIGH/NORMAL/LOW share remaining bandwidth with weighted fair queuing
6
7use peat_schema::event::v1::{EventPriority, PeatEvent};
8use std::collections::VecDeque;
9
10/// Number of priority levels
11pub const PRIORITY_LEVELS: usize = 4;
12
13/// Priority-based event queue for transmission scheduling
14///
15/// Events are organized into 4 priority levels:
16/// - Level 0: CRITICAL - immediate, preempts all other traffic
17/// - Level 1: HIGH - after CRITICAL
18/// - Level 2: NORMAL - default priority
19/// - Level 3: LOW - transmitted when bandwidth available
20#[derive(Debug, Default)]
21pub struct PriorityEventQueue {
22    /// Queues for each priority level (CRITICAL=0, HIGH=1, NORMAL=2, LOW=3)
23    queues: [VecDeque<PeatEvent>; PRIORITY_LEVELS],
24}
25
26impl PriorityEventQueue {
27    /// Create a new empty priority queue
28    pub fn new() -> Self {
29        Self {
30            queues: Default::default(),
31        }
32    }
33
34    /// Push an event onto the appropriate priority queue
35    pub fn push(&mut self, event: PeatEvent) {
36        let priority = self.get_priority(&event);
37        let level = priority_to_level(priority);
38        self.queues[level].push_back(event);
39    }
40
41    /// Pop the highest-priority event
42    ///
43    /// CRITICAL events are always returned first. If no CRITICAL events,
44    /// returns from HIGH, then NORMAL, then LOW.
45    pub fn pop(&mut self) -> Option<PeatEvent> {
46        for queue in &mut self.queues {
47            if let Some(event) = queue.pop_front() {
48                return Some(event);
49            }
50        }
51        None
52    }
53
54    /// Pop all CRITICAL events (for immediate transmission)
55    ///
56    /// Returns events in FIFO order within CRITICAL priority.
57    pub fn pop_critical(&mut self) -> Vec<PeatEvent> {
58        self.queues[0].drain(..).collect()
59    }
60
61    /// Pop events from non-critical queues for weighted fair transmission
62    ///
63    /// Returns up to `max_events` events, weighted by priority:
64    /// - HIGH: 50% of allocation
65    /// - NORMAL: 35% of allocation
66    /// - LOW: 15% of allocation
67    ///
68    /// Unused allocation rolls to lower priorities.
69    pub fn pop_weighted(&mut self, max_events: usize) -> Vec<PeatEvent> {
70        if max_events == 0 {
71            return Vec::new();
72        }
73
74        let mut result = Vec::with_capacity(max_events);
75
76        // Calculate allocations (weighted fair queuing)
77        let high_alloc = (max_events * 50) / 100;
78        let normal_alloc = (max_events * 35) / 100;
79        // LOW gets remainder
80
81        // Pop from HIGH (level 1)
82        let mut remaining = max_events;
83        let high_count = self.pop_from_level(1, high_alloc.min(remaining), &mut result);
84        remaining -= high_count;
85
86        // Unused HIGH allocation rolls to NORMAL
87        let normal_target = normal_alloc + (high_alloc - high_count);
88        let normal_count = self.pop_from_level(2, normal_target.min(remaining), &mut result);
89        remaining -= normal_count;
90
91        // LOW gets everything remaining
92        self.pop_from_level(3, remaining, &mut result);
93
94        result
95    }
96
97    /// Check if there are any CRITICAL events pending
98    pub fn has_critical(&self) -> bool {
99        !self.queues[0].is_empty()
100    }
101
102    /// Get total count of events across all priorities
103    pub fn len(&self) -> usize {
104        self.queues.iter().map(|q| q.len()).sum()
105    }
106
107    /// Check if queue is empty
108    pub fn is_empty(&self) -> bool {
109        self.queues.iter().all(|q| q.is_empty())
110    }
111
112    /// Get count of events at a specific priority level
113    pub fn len_at_priority(&self, priority: EventPriority) -> usize {
114        let level = priority_to_level(priority);
115        self.queues[level].len()
116    }
117
118    /// Get total count by priority level (for metrics)
119    pub fn counts(&self) -> [usize; PRIORITY_LEVELS] {
120        [
121            self.queues[0].len(),
122            self.queues[1].len(),
123            self.queues[2].len(),
124            self.queues[3].len(),
125        ]
126    }
127
128    /// Clear all events from the queue
129    pub fn clear(&mut self) {
130        for queue in &mut self.queues {
131            queue.clear();
132        }
133    }
134
135    // Internal helpers
136
137    fn get_priority(&self, event: &PeatEvent) -> EventPriority {
138        event
139            .routing
140            .as_ref()
141            .map(|r| EventPriority::try_from(r.priority).unwrap_or(EventPriority::PriorityNormal))
142            .unwrap_or(EventPriority::PriorityNormal)
143    }
144
145    fn pop_from_level(&mut self, level: usize, count: usize, result: &mut Vec<PeatEvent>) -> usize {
146        let mut popped = 0;
147        while popped < count {
148            if let Some(event) = self.queues[level].pop_front() {
149                result.push(event);
150                popped += 1;
151            } else {
152                break;
153            }
154        }
155        popped
156    }
157}
158
159/// Convert EventPriority enum to queue level index
160fn priority_to_level(priority: EventPriority) -> usize {
161    match priority {
162        EventPriority::PriorityCritical => 0,
163        EventPriority::PriorityHigh => 1,
164        EventPriority::PriorityNormal => 2,
165        EventPriority::PriorityLow => 3,
166    }
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172    use peat_schema::event::v1::AggregationPolicy;
173
174    fn make_event(id: &str, priority: EventPriority) -> PeatEvent {
175        PeatEvent {
176            event_id: id.to_string(),
177            timestamp: None,
178            source_node_id: "node-1".to_string(),
179            source_formation_id: "squad-1".to_string(),
180            source_instance_id: None,
181            event_class: peat_schema::event::v1::EventClass::Product as i32,
182            event_type: "test".to_string(),
183            routing: Some(AggregationPolicy {
184                propagation: peat_schema::event::v1::PropagationMode::PropagationFull as i32,
185                priority: priority as i32,
186                ttl_seconds: 300,
187                aggregation_window_ms: 0,
188            }),
189            payload_type_url: String::new(),
190            payload_value: vec![],
191        }
192    }
193
194    #[test]
195    fn test_priority_ordering() {
196        let mut queue = PriorityEventQueue::new();
197
198        // Add events in reverse priority order
199        queue.push(make_event("low", EventPriority::PriorityLow));
200        queue.push(make_event("normal", EventPriority::PriorityNormal));
201        queue.push(make_event("high", EventPriority::PriorityHigh));
202        queue.push(make_event("critical", EventPriority::PriorityCritical));
203
204        // Should pop in priority order (CRITICAL first)
205        assert_eq!(queue.pop().unwrap().event_id, "critical");
206        assert_eq!(queue.pop().unwrap().event_id, "high");
207        assert_eq!(queue.pop().unwrap().event_id, "normal");
208        assert_eq!(queue.pop().unwrap().event_id, "low");
209        assert!(queue.pop().is_none());
210    }
211
212    #[test]
213    fn test_fifo_within_priority() {
214        let mut queue = PriorityEventQueue::new();
215
216        // Add multiple HIGH priority events
217        queue.push(make_event("h1", EventPriority::PriorityHigh));
218        queue.push(make_event("h2", EventPriority::PriorityHigh));
219        queue.push(make_event("h3", EventPriority::PriorityHigh));
220
221        // Should maintain FIFO order within priority
222        assert_eq!(queue.pop().unwrap().event_id, "h1");
223        assert_eq!(queue.pop().unwrap().event_id, "h2");
224        assert_eq!(queue.pop().unwrap().event_id, "h3");
225    }
226
227    #[test]
228    fn test_pop_critical() {
229        let mut queue = PriorityEventQueue::new();
230
231        queue.push(make_event("c1", EventPriority::PriorityCritical));
232        queue.push(make_event("h1", EventPriority::PriorityHigh));
233        queue.push(make_event("c2", EventPriority::PriorityCritical));
234
235        let critical = queue.pop_critical();
236        assert_eq!(critical.len(), 2);
237        assert_eq!(critical[0].event_id, "c1");
238        assert_eq!(critical[1].event_id, "c2");
239
240        // HIGH should still be there
241        assert_eq!(queue.len(), 1);
242        assert_eq!(queue.pop().unwrap().event_id, "h1");
243    }
244
245    #[test]
246    fn test_pop_weighted() {
247        let mut queue = PriorityEventQueue::new();
248
249        // Add 10 events at each non-critical priority
250        for i in 0..10 {
251            queue.push(make_event(&format!("h{}", i), EventPriority::PriorityHigh));
252            queue.push(make_event(
253                &format!("n{}", i),
254                EventPriority::PriorityNormal,
255            ));
256            queue.push(make_event(&format!("l{}", i), EventPriority::PriorityLow));
257        }
258
259        // Pop 10 events with weighted allocation
260        // Expected: ~5 HIGH, ~3.5 NORMAL, ~1.5 LOW
261        let events = queue.pop_weighted(10);
262        assert_eq!(events.len(), 10);
263
264        // Count by priority
265        let high_count = events
266            .iter()
267            .filter(|e| e.event_id.starts_with('h'))
268            .count();
269        let normal_count = events
270            .iter()
271            .filter(|e| e.event_id.starts_with('n'))
272            .count();
273        let low_count = events
274            .iter()
275            .filter(|e| e.event_id.starts_with('l'))
276            .count();
277
278        // Verify weighted distribution (approximate)
279        assert!((4..=6).contains(&high_count), "high={}", high_count);
280        assert!((2..=5).contains(&normal_count), "normal={}", normal_count);
281        assert!(low_count <= 3, "low={}", low_count);
282    }
283
284    #[test]
285    fn test_weighted_rollover() {
286        let mut queue = PriorityEventQueue::new();
287
288        // Only LOW priority events
289        for i in 0..10 {
290            queue.push(make_event(&format!("l{}", i), EventPriority::PriorityLow));
291        }
292
293        // Pop 5 events - should all come from LOW since HIGH/NORMAL are empty
294        let events = queue.pop_weighted(5);
295        assert_eq!(events.len(), 5);
296        assert!(events.iter().all(|e| e.event_id.starts_with('l')));
297    }
298
299    #[test]
300    fn test_has_critical() {
301        let mut queue = PriorityEventQueue::new();
302        assert!(!queue.has_critical());
303
304        queue.push(make_event("h1", EventPriority::PriorityHigh));
305        assert!(!queue.has_critical());
306
307        queue.push(make_event("c1", EventPriority::PriorityCritical));
308        assert!(queue.has_critical());
309
310        queue.pop_critical();
311        assert!(!queue.has_critical());
312    }
313
314    #[test]
315    fn test_counts() {
316        let mut queue = PriorityEventQueue::new();
317
318        queue.push(make_event("c1", EventPriority::PriorityCritical));
319        queue.push(make_event("h1", EventPriority::PriorityHigh));
320        queue.push(make_event("h2", EventPriority::PriorityHigh));
321        queue.push(make_event("n1", EventPriority::PriorityNormal));
322
323        let counts = queue.counts();
324        assert_eq!(counts, [1, 2, 1, 0]);
325        assert_eq!(queue.len(), 4);
326
327        assert_eq!(queue.len_at_priority(EventPriority::PriorityCritical), 1);
328        assert_eq!(queue.len_at_priority(EventPriority::PriorityHigh), 2);
329        assert_eq!(queue.len_at_priority(EventPriority::PriorityNormal), 1);
330        assert_eq!(queue.len_at_priority(EventPriority::PriorityLow), 0);
331    }
332
333    #[test]
334    fn test_default_priority_for_missing_routing() {
335        let mut queue = PriorityEventQueue::new();
336
337        // Event without routing should default to NORMAL
338        let event = PeatEvent {
339            event_id: "no-routing".to_string(),
340            timestamp: None,
341            source_node_id: "node-1".to_string(),
342            source_formation_id: "squad-1".to_string(),
343            source_instance_id: None,
344            event_class: peat_schema::event::v1::EventClass::Product as i32,
345            event_type: "test".to_string(),
346            routing: None,
347            payload_type_url: String::new(),
348            payload_value: vec![],
349        };
350
351        queue.push(event);
352        assert_eq!(queue.len_at_priority(EventPriority::PriorityNormal), 1);
353    }
354}