synaps_cli/events/
queue.rs1use super::types::{Event, Severity};
2use std::collections::VecDeque;
3use std::sync::Mutex;
4
5pub struct EventQueue {
6 inner: Mutex<VecDeque<Event>>,
7 capacity: usize,
8 notify: tokio::sync::Notify,
9}
10
11impl EventQueue {
12 pub fn new(capacity: usize) -> Self {
13 Self {
14 inner: Mutex::new(VecDeque::with_capacity(capacity)),
15 capacity,
16 notify: tokio::sync::Notify::new(),
17 }
18 }
19
20 pub fn push(&self, event: Event) -> Result<(), String> {
23 let mut q = self.inner.lock().unwrap();
24 if q.len() >= self.capacity {
25 return Err(format!("event queue full (capacity {})", self.capacity));
26 }
27 let sev = event.content.severity.clone();
28 match sev {
29 Some(Severity::Critical) => q.push_front(event),
30 Some(Severity::High) => {
31 let mut idx = 0;
33 while idx < q.len()
34 && matches!(
35 q[idx].content.severity,
36 Some(Severity::Critical)
37 )
38 {
39 idx += 1;
40 }
41 q.insert(idx, event);
42 }
43 _ => q.push_back(event),
44 }
45 drop(q);
46 self.notify.notify_one();
47 Ok(())
48 }
49
50 pub fn push_priority(&self, event: Event) {
52 let mut q = self.inner.lock().unwrap();
53 if q.len() >= self.capacity {
54 if let Some(evicted) = q.back() {
55 tracing::warn!("event queue full — evicting event id={}", evicted.id);
56 }
57 q.pop_back();
58 }
59 q.push_front(event);
60 drop(q);
61 self.notify.notify_one();
62 }
63
64 pub fn pop(&self) -> Option<Event> {
65 self.inner.lock().unwrap().pop_front()
66 }
67
68 pub fn peek(&self) -> Option<Event> {
69 self.inner.lock().unwrap().front().cloned()
70 }
71
72 pub fn len(&self) -> usize {
73 self.inner.lock().unwrap().len()
74 }
75
76 pub fn notified(&self) -> impl std::future::Future<Output = ()> + '_ {
78 self.notify.notified()
79 }
80
81 pub fn is_empty(&self) -> bool {
82 self.inner.lock().unwrap().is_empty()
83 }
84
85 pub fn drain(&self) -> Vec<Event> {
86 let mut q = self.inner.lock().unwrap();
87 q.drain(..).collect()
88 }
89}
90
91#[cfg(test)]
92mod tests {
93 use super::*;
94
95 fn ev(text: &str, sev: Option<Severity>) -> Event {
96 Event::simple("test", text, sev)
97 }
98
99 #[test]
100 fn push_pop_fifo_for_medium() {
101 let q = EventQueue::new(10);
102 q.push(ev("a", Some(Severity::Medium))).unwrap();
103 q.push(ev("b", Some(Severity::Low))).unwrap();
104 q.push(ev("c", None)).unwrap();
105 assert_eq!(q.pop().unwrap().content.text, "a");
106 assert_eq!(q.pop().unwrap().content.text, "b");
107 assert_eq!(q.pop().unwrap().content.text, "c");
108 assert!(q.is_empty());
109 }
110
111 #[test]
112 fn critical_jumps_to_front() {
113 let q = EventQueue::new(10);
114 q.push(ev("a", Some(Severity::Medium))).unwrap();
115 q.push(ev("b", Some(Severity::Medium))).unwrap();
116 q.push(ev("CRIT", Some(Severity::Critical))).unwrap();
117 assert_eq!(q.pop().unwrap().content.text, "CRIT");
118 assert_eq!(q.pop().unwrap().content.text, "a");
119 }
120
121 #[test]
122 fn high_sits_after_critical() {
123 let q = EventQueue::new(10);
124 q.push(ev("med", Some(Severity::Medium))).unwrap();
125 q.push(ev("c1", Some(Severity::Critical))).unwrap();
126 q.push(ev("c2", Some(Severity::Critical))).unwrap();
127 q.push(ev("high", Some(Severity::High))).unwrap();
128 assert_eq!(q.pop().unwrap().content.text, "c2");
130 assert_eq!(q.pop().unwrap().content.text, "c1");
131 assert_eq!(q.pop().unwrap().content.text, "high");
132 assert_eq!(q.pop().unwrap().content.text, "med");
133 }
134
135 #[test]
136 fn capacity_limit() {
137 let q = EventQueue::new(2);
138 q.push(ev("a", None)).unwrap();
139 q.push(ev("b", None)).unwrap();
140 assert!(q.push(ev("c", None)).is_err());
141 }
142
143 #[test]
144 fn drain_takes_all() {
145 let q = EventQueue::new(10);
146 q.push(ev("a", None)).unwrap();
147 q.push(ev("b", None)).unwrap();
148 let all = q.drain();
149 assert_eq!(all.len(), 2);
150 assert!(q.is_empty());
151 }
152
153 #[test]
154 fn peek_does_not_remove() {
155 let q = EventQueue::new(10);
156 q.push(ev("a", None)).unwrap();
157 assert_eq!(q.peek().unwrap().content.text, "a");
158 assert_eq!(q.len(), 1);
159 }
160}