1use peat_schema::event::v1::{EventPriority, PeatEvent};
8use std::collections::VecDeque;
9
/// Number of distinct priority levels, and therefore the number of internal
/// FIFO queues held by `PriorityEventQueue`.
pub const PRIORITY_LEVELS: usize = 4;
12
/// Multi-level FIFO queue that buckets events by priority.
///
/// Each priority level has its own queue, so ordering is strict between
/// levels (a lower index is always served first by `pop`) and first-in,
/// first-out within a level.
#[derive(Debug, Default)]
pub struct PriorityEventQueue {
    /// One FIFO per level, indexed by the value `priority_to_level` returns:
    /// 0 = critical, 1 = high, 2 = normal, 3 = low.
    queues: [VecDeque<PeatEvent>; PRIORITY_LEVELS],
}
25
26impl PriorityEventQueue {
27 pub fn new() -> Self {
29 Self {
30 queues: Default::default(),
31 }
32 }
33
34 pub fn push(&mut self, event: PeatEvent) {
36 let priority = self.get_priority(&event);
37 let level = priority_to_level(priority);
38 self.queues[level].push_back(event);
39 }
40
41 pub fn pop(&mut self) -> Option<PeatEvent> {
46 for queue in &mut self.queues {
47 if let Some(event) = queue.pop_front() {
48 return Some(event);
49 }
50 }
51 None
52 }
53
54 pub fn pop_critical(&mut self) -> Vec<PeatEvent> {
58 self.queues[0].drain(..).collect()
59 }
60
61 pub fn pop_weighted(&mut self, max_events: usize) -> Vec<PeatEvent> {
70 if max_events == 0 {
71 return Vec::new();
72 }
73
74 let mut result = Vec::with_capacity(max_events);
75
76 let high_alloc = (max_events * 50) / 100;
78 let normal_alloc = (max_events * 35) / 100;
79 let mut remaining = max_events;
83 let high_count = self.pop_from_level(1, high_alloc.min(remaining), &mut result);
84 remaining -= high_count;
85
86 let normal_target = normal_alloc + (high_alloc - high_count);
88 let normal_count = self.pop_from_level(2, normal_target.min(remaining), &mut result);
89 remaining -= normal_count;
90
91 self.pop_from_level(3, remaining, &mut result);
93
94 result
95 }
96
97 pub fn has_critical(&self) -> bool {
99 !self.queues[0].is_empty()
100 }
101
102 pub fn len(&self) -> usize {
104 self.queues.iter().map(|q| q.len()).sum()
105 }
106
107 pub fn is_empty(&self) -> bool {
109 self.queues.iter().all(|q| q.is_empty())
110 }
111
112 pub fn len_at_priority(&self, priority: EventPriority) -> usize {
114 let level = priority_to_level(priority);
115 self.queues[level].len()
116 }
117
118 pub fn counts(&self) -> [usize; PRIORITY_LEVELS] {
120 [
121 self.queues[0].len(),
122 self.queues[1].len(),
123 self.queues[2].len(),
124 self.queues[3].len(),
125 ]
126 }
127
128 pub fn clear(&mut self) {
130 for queue in &mut self.queues {
131 queue.clear();
132 }
133 }
134
135 fn get_priority(&self, event: &PeatEvent) -> EventPriority {
138 event
139 .routing
140 .as_ref()
141 .map(|r| EventPriority::try_from(r.priority).unwrap_or(EventPriority::PriorityNormal))
142 .unwrap_or(EventPriority::PriorityNormal)
143 }
144
145 fn pop_from_level(&mut self, level: usize, count: usize, result: &mut Vec<PeatEvent>) -> usize {
146 let mut popped = 0;
147 while popped < count {
148 if let Some(event) = self.queues[level].pop_front() {
149 result.push(event);
150 popped += 1;
151 } else {
152 break;
153 }
154 }
155 popped
156 }
157}
158
159fn priority_to_level(priority: EventPriority) -> usize {
161 match priority {
162 EventPriority::PriorityCritical => 0,
163 EventPriority::PriorityHigh => 1,
164 EventPriority::PriorityNormal => 2,
165 EventPriority::PriorityLow => 3,
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use peat_schema::event::v1::AggregationPolicy;

    /// Builds a minimal event whose routing policy carries `priority`.
    fn make_event(id: &str, priority: EventPriority) -> PeatEvent {
        let routing = AggregationPolicy {
            propagation: peat_schema::event::v1::PropagationMode::PropagationFull as i32,
            priority: priority as i32,
            ttl_seconds: 300,
            aggregation_window_ms: 0,
        };

        PeatEvent {
            event_id: id.to_owned(),
            timestamp: None,
            source_node_id: "node-1".to_owned(),
            source_formation_id: "squad-1".to_owned(),
            source_instance_id: None,
            event_class: peat_schema::event::v1::EventClass::Product as i32,
            event_type: "test".to_owned(),
            routing: Some(routing),
            payload_type_url: String::new(),
            payload_value: Vec::new(),
        }
    }

    /// Counts the events whose id begins with `prefix`.
    fn count_with_prefix(events: &[PeatEvent], prefix: char) -> usize {
        events
            .iter()
            .filter(|event| event.event_id.starts_with(prefix))
            .count()
    }

    #[test]
    fn test_priority_ordering() {
        let mut queue = PriorityEventQueue::new();

        // Push in the reverse of the expected pop order.
        let pushes = [
            ("low", EventPriority::PriorityLow),
            ("normal", EventPriority::PriorityNormal),
            ("high", EventPriority::PriorityHigh),
            ("critical", EventPriority::PriorityCritical),
        ];
        for (id, priority) in pushes.iter() {
            queue.push(make_event(id, *priority));
        }

        for expected in ["critical", "high", "normal", "low"].iter() {
            assert_eq!(queue.pop().unwrap().event_id, *expected);
        }
        assert!(queue.pop().is_none());
    }

    #[test]
    fn test_fifo_within_priority() {
        let mut queue = PriorityEventQueue::new();
        let ids = ["h1", "h2", "h3"];

        for id in ids.iter() {
            queue.push(make_event(id, EventPriority::PriorityHigh));
        }

        // Same level, so events must come back in insertion order.
        for id in ids.iter() {
            assert_eq!(queue.pop().unwrap().event_id, *id);
        }
    }

    #[test]
    fn test_pop_critical() {
        let mut queue = PriorityEventQueue::new();

        queue.push(make_event("c1", EventPriority::PriorityCritical));
        queue.push(make_event("h1", EventPriority::PriorityHigh));
        queue.push(make_event("c2", EventPriority::PriorityCritical));

        let critical = queue.pop_critical();
        assert_eq!(critical.len(), 2);
        assert_eq!(critical[0].event_id, "c1");
        assert_eq!(critical[1].event_id, "c2");

        // Only the HIGH event is left behind.
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.pop().unwrap().event_id, "h1");
    }

    #[test]
    fn test_pop_weighted() {
        let mut queue = PriorityEventQueue::new();

        // Ten events at each non-critical level, interleaved.
        for i in 0..10 {
            queue.push(make_event(&format!("h{}", i), EventPriority::PriorityHigh));
            queue.push(make_event(
                &format!("n{}", i),
                EventPriority::PriorityNormal,
            ));
            queue.push(make_event(&format!("l{}", i), EventPriority::PriorityLow));
        }

        let events = queue.pop_weighted(10);
        assert_eq!(events.len(), 10);

        let high_count = count_with_prefix(&events, 'h');
        let normal_count = count_with_prefix(&events, 'n');
        let low_count = count_with_prefix(&events, 'l');

        // Roughly 50% high, 35% normal, 15% low.
        assert!((4..=6).contains(&high_count), "high={}", high_count);
        assert!((2..=5).contains(&normal_count), "normal={}", normal_count);
        assert!(low_count <= 3, "low={}", low_count);
    }

    #[test]
    fn test_weighted_rollover() {
        let mut queue = PriorityEventQueue::new();

        // Only LOW events exist, so the whole budget must roll down to them.
        for i in 0..10 {
            queue.push(make_event(&format!("l{}", i), EventPriority::PriorityLow));
        }

        let events = queue.pop_weighted(5);
        assert_eq!(events.len(), 5);
        assert_eq!(count_with_prefix(&events, 'l'), events.len());
    }

    #[test]
    fn test_has_critical() {
        let mut queue = PriorityEventQueue::new();
        assert!(!queue.has_critical());

        queue.push(make_event("h1", EventPriority::PriorityHigh));
        assert!(!queue.has_critical());

        queue.push(make_event("c1", EventPriority::PriorityCritical));
        assert!(queue.has_critical());

        let _ = queue.pop_critical();
        assert!(!queue.has_critical());
    }

    #[test]
    fn test_counts() {
        let mut queue = PriorityEventQueue::new();

        queue.push(make_event("c1", EventPriority::PriorityCritical));
        queue.push(make_event("h1", EventPriority::PriorityHigh));
        queue.push(make_event("h2", EventPriority::PriorityHigh));
        queue.push(make_event("n1", EventPriority::PriorityNormal));

        assert_eq!(queue.counts(), [1, 2, 1, 0]);
        assert_eq!(queue.len(), 4);

        let expected_per_priority = [
            (EventPriority::PriorityCritical, 1),
            (EventPriority::PriorityHigh, 2),
            (EventPriority::PriorityNormal, 1),
            (EventPriority::PriorityLow, 0),
        ];
        for (priority, expected) in expected_per_priority.iter() {
            assert_eq!(queue.len_at_priority(*priority), *expected);
        }
    }

    #[test]
    fn test_default_priority_for_missing_routing() {
        let mut queue = PriorityEventQueue::new();

        // Same shape as `make_event`, but with the routing policy removed.
        let event = PeatEvent {
            routing: None,
            ..make_event("no-routing", EventPriority::PriorityNormal)
        };

        queue.push(event);
        assert_eq!(queue.len_at_priority(EventPriority::PriorityNormal), 1);
    }
}