deepstrike_core/signals/
router.rs1use std::collections::HashSet;
2
3use compact_str::CompactString;
4
5use super::attention::UrgencyBasedPolicy;
6use super::queue::SignalQueue;
7use crate::types::policy::{AttentionPolicy, SignalDisposition};
8use crate::types::signal::RuntimeSignal;
9
10pub struct SignalRouter {
12 seen: HashSet<CompactString>,
13 queue: SignalQueue,
14 attention: Box<dyn AttentionPolicy>,
15}
16
17impl SignalRouter {
18 pub fn new(max_queue_size: usize) -> Self {
19 Self::with_policy(max_queue_size, Box::new(UrgencyBasedPolicy))
20 }
21
22 pub fn with_policy(max_queue_size: usize, policy: Box<dyn AttentionPolicy>) -> Self {
23 Self {
24 seen: HashSet::new(),
25 queue: SignalQueue::new(max_queue_size),
26 attention: policy,
27 }
28 }
29
30 pub fn ingest(&mut self, signal: RuntimeSignal, is_running: bool) -> SignalDisposition {
35 if let Some(ref key) = signal.dedupe_key {
36 if !self.seen.insert(key.clone()) {
37 return SignalDisposition::Ignore;
38 }
39 }
40
41 let disposition = self.attention.evaluate(&signal, is_running);
42
43 if disposition == SignalDisposition::Queue {
44 if !self.queue.push(signal) {
45 return SignalDisposition::Dropped;
46 }
47 }
48
49 disposition
50 }
51
52 pub fn next(&mut self) -> Option<RuntimeSignal> {
54 self.queue.pop()
55 }
56
57 pub fn next_for(&mut self, recipient: Option<&str>) -> Option<RuntimeSignal> {
60 self.queue.pop_for(recipient)
61 }
62
63 pub fn depth(&self) -> usize {
65 self.queue.len()
66 }
67
68 pub fn clear_dedup(&mut self) {
70 self.seen.clear();
71 }
72}
73
74#[cfg(test)]
75mod tests {
76 use super::*;
77 use crate::types::signal::{SignalSource, SignalType, Urgency};
78
79 #[test]
80 fn deduplicates_signals() {
81 let mut router = SignalRouter::new(100);
82 let sig = RuntimeSignal::new(
83 SignalSource::Cron,
84 SignalType::Event,
85 Urgency::Normal,
86 "tick",
87 )
88 .with_dedupe("cron-tick-1");
89
90 let d1 = router.ingest(sig.clone(), false);
91 assert_ne!(d1, SignalDisposition::Ignore);
92
93 let d2 = router.ingest(sig, false);
94 assert_eq!(d2, SignalDisposition::Ignore);
95 }
96
97 #[test]
98 fn normal_signal_queued() {
99 let mut router = SignalRouter::new(100);
100 let sig = RuntimeSignal::new(
101 SignalSource::Cron,
102 SignalType::Event,
103 Urgency::Normal,
104 "job",
105 );
106
107 let d = router.ingest(sig, false);
108 assert_eq!(d, SignalDisposition::Queue);
109 assert_eq!(router.depth(), 1);
110 assert!(router.next().is_some());
111 }
112
113 #[test]
114 fn interrupt_signals_not_queued() {
115 let mut router = SignalRouter::new(100);
116 let sig = RuntimeSignal::new(
117 SignalSource::Gateway,
118 SignalType::Alert,
119 Urgency::Critical,
120 "fire",
121 );
122
123 let d = router.ingest(sig, true);
124 assert_eq!(d, SignalDisposition::InterruptNow);
125 assert_eq!(router.depth(), 0);
126 }
127
128 #[test]
129 fn full_queue_drops_signal() {
130 let mut router = SignalRouter::new(1);
131 let s1 = RuntimeSignal::new(
132 SignalSource::Cron,
133 SignalType::Event,
134 Urgency::Normal,
135 "first",
136 );
137 let s2 = RuntimeSignal::new(
138 SignalSource::Cron,
139 SignalType::Event,
140 Urgency::Normal,
141 "second",
142 );
143
144 assert_eq!(router.ingest(s1, false), SignalDisposition::Queue);
145 assert_eq!(router.ingest(s2, false), SignalDisposition::Dropped);
146 }
147
148 #[test]
149 fn next_for_drains_recipient_plus_broadcast_only() {
150 let mut router = SignalRouter::new(100);
151 let to_a = RuntimeSignal::new(SignalSource::Gateway, SignalType::Event, Urgency::Normal, "a")
152 .with_recipient("sess-a")
153 .with_timestamp(1);
154 let to_b = RuntimeSignal::new(SignalSource::Gateway, SignalType::Event, Urgency::Normal, "b")
155 .with_recipient("sess-b")
156 .with_timestamp(2);
157 let broadcast =
158 RuntimeSignal::new(SignalSource::Cron, SignalType::Job, Urgency::Normal, "all")
159 .with_timestamp(3);
160 router.ingest(to_a, false);
161 router.ingest(to_b, false);
162 router.ingest(broadcast, false);
163
164 let first = router.next_for(Some("sess-a")).unwrap();
166 let second = router.next_for(Some("sess-a")).unwrap();
167 let mut got: Vec<_> = [first.summary.as_str(), second.summary.as_str()]
168 .iter()
169 .map(|s| s.to_string())
170 .collect();
171 got.sort();
172 assert_eq!(got, vec!["a".to_string(), "all".to_string()]);
173 assert!(router.next_for(Some("sess-a")).is_none());
174
175 assert_eq!(router.next_for(Some("sess-b")).unwrap().summary.as_str(), "b");
177 }
178
179 #[test]
180 fn next_for_none_drains_anything() {
181 let mut router = SignalRouter::new(100);
182 router.ingest(
183 RuntimeSignal::new(SignalSource::Gateway, SignalType::Event, Urgency::Normal, "x")
184 .with_recipient("sess-a"),
185 false,
186 );
187 assert_eq!(router.next_for(None).unwrap().summary.as_str(), "x");
189 }
190
191 #[test]
192 fn clear_dedup_allows_reingest() {
193 let mut router = SignalRouter::new(100);
194 let sig = RuntimeSignal::new(
195 SignalSource::Cron,
196 SignalType::Event,
197 Urgency::Normal,
198 "tick",
199 )
200 .with_dedupe("key-1");
201
202 router.ingest(sig.clone(), false);
203 assert_eq!(router.ingest(sig.clone(), false), SignalDisposition::Ignore);
204
205 router.clear_dedup();
206 assert_ne!(router.ingest(sig, false), SignalDisposition::Ignore);
207 }
208}