deepstrike_core/signals/
queue.rs1use std::cmp::Ordering;
2use std::collections::BinaryHeap;
3
4use crate::types::signal::{RuntimeSignal, Urgency};
5
6struct PrioritizedSignal {
8 urgency: Urgency,
9 timestamp_ms: u64,
10 signal: RuntimeSignal,
11}
12
13impl PartialEq for PrioritizedSignal {
14 fn eq(&self, other: &Self) -> bool {
15 self.signal.id == other.signal.id
16 }
17}
18impl Eq for PrioritizedSignal {}
19
20impl PartialOrd for PrioritizedSignal {
21 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
22 Some(self.cmp(other))
23 }
24}
25
26impl Ord for PrioritizedSignal {
27 fn cmp(&self, other: &Self) -> Ordering {
28 self.urgency
29 .cmp(&other.urgency)
30 .then_with(|| other.timestamp_ms.cmp(&self.timestamp_ms))
31 .then_with(|| self.signal.id.cmp(&other.signal.id))
32 }
33}
34
35pub(super) struct SignalQueue {
37 heap: BinaryHeap<PrioritizedSignal>,
38 max_size: usize,
39}
40
41impl SignalQueue {
42 pub(super) fn new(max_size: usize) -> Self {
43 Self {
44 heap: BinaryHeap::new(),
45 max_size,
46 }
47 }
48
49 pub(super) fn push(&mut self, signal: RuntimeSignal) -> bool {
51 if self.heap.len() >= self.max_size {
52 return false;
53 }
54 let urgency = signal.urgency;
55 let timestamp_ms = signal.timestamp_ms;
56 self.heap.push(PrioritizedSignal {
57 urgency,
58 timestamp_ms,
59 signal,
60 });
61 true
62 }
63
64 pub(super) fn pop(&mut self) -> Option<RuntimeSignal> {
65 self.heap.pop().map(|ps| ps.signal)
66 }
67
68 pub(super) fn pop_for(&mut self, recipient: Option<&str>) -> Option<RuntimeSignal> {
73 let Some(recipient) = recipient else {
74 return self.pop();
75 };
76 let mut skipped: Vec<PrioritizedSignal> = Vec::new();
77 let mut found = None;
78 while let Some(ps) = self.heap.pop() {
79 let visible = ps
80 .signal
81 .recipient
82 .as_deref()
83 .is_none_or(|r| r == recipient);
84 if visible {
85 found = Some(ps.signal);
86 break;
87 }
88 skipped.push(ps);
89 }
90 for ps in skipped {
91 self.heap.push(ps);
92 }
93 found
94 }
95
96 pub(super) fn len(&self) -> usize {
97 self.heap.len()
98 }
99
100 #[allow(dead_code)]
101 pub(super) fn is_empty(&self) -> bool {
102 self.heap.is_empty()
103 }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::signal::{SignalSource, SignalType};

    /// Signals pushed in mixed order must come out from most to least urgent.
    #[test]
    fn higher_urgency_dequeued_first() {
        let low = RuntimeSignal::new(SignalSource::Cron, SignalType::Event, Urgency::Low, "low")
            .with_timestamp(1);
        let critical = RuntimeSignal::new(
            SignalSource::Gateway,
            SignalType::Alert,
            Urgency::Critical,
            "crit",
        )
        .with_timestamp(2);
        let normal = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "norm",
        )
        .with_timestamp(3);

        let mut queue = SignalQueue::new(10);
        for signal in [low, critical, normal] {
            queue.push(signal);
        }

        for expected in [Urgency::Critical, Urgency::Normal, Urgency::Low] {
            assert_eq!(queue.pop().unwrap().urgency, expected);
        }
    }

    /// A queue of capacity one accepts the first signal and rejects the second.
    #[test]
    fn respects_max_size() {
        let mut queue = SignalQueue::new(1);

        let first = RuntimeSignal::new(SignalSource::Cron, SignalType::Event, Urgency::Low, "a")
            .with_timestamp(1);
        let first_accepted = queue.push(first);
        assert!(first_accepted);

        let second = RuntimeSignal::new(SignalSource::Cron, SignalType::Event, Urgency::Low, "b")
            .with_timestamp(2);
        let second_accepted = queue.push(second);
        assert!(!second_accepted);
    }

    /// With equal urgency, the signal with the smaller timestamp is popped first.
    #[test]
    fn same_urgency_older_first() {
        let newer = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "newer",
        )
        .with_timestamp(100);
        let older = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "older",
        )
        .with_timestamp(1);

        let mut queue = SignalQueue::new(10);
        queue.push(newer);
        queue.push(older);

        let first_out = queue.pop().unwrap();
        assert_eq!(first_out.summary.as_str(), "older");
    }
}