Skip to main content

synaps_cli/events/
queue.rs

1use super::types::{Event, Severity};
2use std::collections::VecDeque;
3use std::sync::Mutex;
4
5pub struct EventQueue {
6    inner: Mutex<VecDeque<Event>>,
7    capacity: usize,
8    notify: tokio::sync::Notify,
9}
10
11impl EventQueue {
12    pub fn new(capacity: usize) -> Self {
13        Self {
14            inner: Mutex::new(VecDeque::with_capacity(capacity)),
15            capacity,
16            notify: tokio::sync::Notify::new(),
17        }
18    }
19
20    /// Push an event. Critical goes to front, High goes after existing Criticals,
21    /// everything else goes to the back. Returns Err if queue is full.
22    pub fn push(&self, event: Event) -> Result<(), String> {
23        let mut q = self.inner.lock().unwrap();
24        if q.len() >= self.capacity {
25            return Err(format!("event queue full (capacity {})", self.capacity));
26        }
27        let sev = event.content.severity.clone();
28        match sev {
29            Some(Severity::Critical) => q.push_front(event),
30            Some(Severity::High) => {
31                // insert after trailing Critical events at the front
32                let mut idx = 0;
33                while idx < q.len()
34                    && matches!(
35                        q[idx].content.severity,
36                        Some(Severity::Critical)
37                    )
38                {
39                    idx += 1;
40                }
41                q.insert(idx, event);
42            }
43            _ => q.push_back(event),
44        }
45        drop(q);
46        self.notify.notify_one();
47        Ok(())
48    }
49
50    /// Force push to front regardless of severity. Evicts the oldest (back) if full.
51    pub fn push_priority(&self, event: Event) {
52        let mut q = self.inner.lock().unwrap();
53        if q.len() >= self.capacity {
54            if let Some(evicted) = q.back() {
55                tracing::warn!("event queue full — evicting event id={}", evicted.id);
56            }
57            q.pop_back();
58        }
59        q.push_front(event);
60        drop(q);
61        self.notify.notify_one();
62    }
63
64    pub fn pop(&self) -> Option<Event> {
65        self.inner.lock().unwrap().pop_front()
66    }
67
68    pub fn peek(&self) -> Option<Event> {
69        self.inner.lock().unwrap().front().cloned()
70    }
71
72    pub fn len(&self) -> usize {
73        self.inner.lock().unwrap().len()
74    }
75
76    /// Wait until an event is pushed. Use in tokio::select! for instant wake.
77    pub fn notified(&self) -> impl std::future::Future<Output = ()> + '_ {
78        self.notify.notified()
79    }
80
81    pub fn is_empty(&self) -> bool {
82        self.inner.lock().unwrap().is_empty()
83    }
84
85    pub fn drain(&self) -> Vec<Event> {
86        let mut q = self.inner.lock().unwrap();
87        q.drain(..).collect()
88    }
89}
90
91#[cfg(test)]
92mod tests {
93    use super::*;
94
95    fn ev(text: &str, sev: Option<Severity>) -> Event {
96        Event::simple("test", text, sev)
97    }
98
99    #[test]
100    fn push_pop_fifo_for_medium() {
101        let q = EventQueue::new(10);
102        q.push(ev("a", Some(Severity::Medium))).unwrap();
103        q.push(ev("b", Some(Severity::Low))).unwrap();
104        q.push(ev("c", None)).unwrap();
105        assert_eq!(q.pop().unwrap().content.text, "a");
106        assert_eq!(q.pop().unwrap().content.text, "b");
107        assert_eq!(q.pop().unwrap().content.text, "c");
108        assert!(q.is_empty());
109    }
110
111    #[test]
112    fn critical_jumps_to_front() {
113        let q = EventQueue::new(10);
114        q.push(ev("a", Some(Severity::Medium))).unwrap();
115        q.push(ev("b", Some(Severity::Medium))).unwrap();
116        q.push(ev("CRIT", Some(Severity::Critical))).unwrap();
117        assert_eq!(q.pop().unwrap().content.text, "CRIT");
118        assert_eq!(q.pop().unwrap().content.text, "a");
119    }
120
121    #[test]
122    fn high_sits_after_critical() {
123        let q = EventQueue::new(10);
124        q.push(ev("med", Some(Severity::Medium))).unwrap();
125        q.push(ev("c1", Some(Severity::Critical))).unwrap();
126        q.push(ev("c2", Some(Severity::Critical))).unwrap();
127        q.push(ev("high", Some(Severity::High))).unwrap();
128        // Order should be: c2, c1, high, med
129        assert_eq!(q.pop().unwrap().content.text, "c2");
130        assert_eq!(q.pop().unwrap().content.text, "c1");
131        assert_eq!(q.pop().unwrap().content.text, "high");
132        assert_eq!(q.pop().unwrap().content.text, "med");
133    }
134
135    #[test]
136    fn capacity_limit() {
137        let q = EventQueue::new(2);
138        q.push(ev("a", None)).unwrap();
139        q.push(ev("b", None)).unwrap();
140        assert!(q.push(ev("c", None)).is_err());
141    }
142
143    #[test]
144    fn drain_takes_all() {
145        let q = EventQueue::new(10);
146        q.push(ev("a", None)).unwrap();
147        q.push(ev("b", None)).unwrap();
148        let all = q.drain();
149        assert_eq!(all.len(), 2);
150        assert!(q.is_empty());
151    }
152
153    #[test]
154    fn peek_does_not_remove() {
155        let q = EventQueue::new(10);
156        q.push(ev("a", None)).unwrap();
157        assert_eq!(q.peek().unwrap().content.text, "a");
158        assert_eq!(q.len(), 1);
159    }
160}