deepstrike_core/signals/
router.rs1use std::collections::HashSet;
2
3use compact_str::CompactString;
4
5use super::attention::UrgencyBasedPolicy;
6use super::queue::SignalQueue;
7use crate::types::policy::{AttentionPolicy, SignalDisposition};
8use crate::types::signal::RuntimeSignal;
9
10pub struct SignalRouter {
12 seen: HashSet<CompactString>,
13 queue: SignalQueue,
14 attention: Box<dyn AttentionPolicy>,
15}
16
17impl SignalRouter {
18 pub fn new(max_queue_size: usize) -> Self {
19 Self::with_policy(max_queue_size, Box::new(UrgencyBasedPolicy))
20 }
21
22 pub fn with_policy(max_queue_size: usize, policy: Box<dyn AttentionPolicy>) -> Self {
23 Self {
24 seen: HashSet::new(),
25 queue: SignalQueue::new(max_queue_size),
26 attention: policy,
27 }
28 }
29
30 pub fn ingest(&mut self, signal: RuntimeSignal, is_running: bool) -> SignalDisposition {
35 if let Some(ref key) = signal.dedupe_key {
36 if !self.seen.insert(key.clone()) {
37 return SignalDisposition::Ignore;
38 }
39 }
40
41 let disposition = self.attention.evaluate(&signal, is_running);
42
43 if disposition == SignalDisposition::Queue {
44 if !self.queue.push(signal) {
45 return SignalDisposition::Dropped;
46 }
47 }
48
49 disposition
50 }
51
52 pub fn next(&mut self) -> Option<RuntimeSignal> {
54 self.queue.pop()
55 }
56
57 pub fn depth(&self) -> usize {
59 self.queue.len()
60 }
61
62 pub fn clear_dedup(&mut self) {
64 self.seen.clear();
65 }
66}
67
68#[cfg(test)]
69mod tests {
70 use super::*;
71 use crate::types::signal::{SignalSource, SignalType, Urgency};
72
73 #[test]
74 fn deduplicates_signals() {
75 let mut router = SignalRouter::new(100);
76 let sig = RuntimeSignal::new(
77 SignalSource::Cron,
78 SignalType::Event,
79 Urgency::Normal,
80 "tick",
81 )
82 .with_dedupe("cron-tick-1");
83
84 let d1 = router.ingest(sig.clone(), false);
85 assert_ne!(d1, SignalDisposition::Ignore);
86
87 let d2 = router.ingest(sig, false);
88 assert_eq!(d2, SignalDisposition::Ignore);
89 }
90
91 #[test]
92 fn normal_signal_queued() {
93 let mut router = SignalRouter::new(100);
94 let sig = RuntimeSignal::new(
95 SignalSource::Cron,
96 SignalType::Event,
97 Urgency::Normal,
98 "job",
99 );
100
101 let d = router.ingest(sig, false);
102 assert_eq!(d, SignalDisposition::Queue);
103 assert_eq!(router.depth(), 1);
104 assert!(router.next().is_some());
105 }
106
107 #[test]
108 fn interrupt_signals_not_queued() {
109 let mut router = SignalRouter::new(100);
110 let sig = RuntimeSignal::new(
111 SignalSource::Gateway,
112 SignalType::Alert,
113 Urgency::Critical,
114 "fire",
115 );
116
117 let d = router.ingest(sig, true);
118 assert_eq!(d, SignalDisposition::InterruptNow);
119 assert_eq!(router.depth(), 0);
120 }
121
122 #[test]
123 fn full_queue_drops_signal() {
124 let mut router = SignalRouter::new(1);
125 let s1 = RuntimeSignal::new(
126 SignalSource::Cron,
127 SignalType::Event,
128 Urgency::Normal,
129 "first",
130 );
131 let s2 = RuntimeSignal::new(
132 SignalSource::Cron,
133 SignalType::Event,
134 Urgency::Normal,
135 "second",
136 );
137
138 assert_eq!(router.ingest(s1, false), SignalDisposition::Queue);
139 assert_eq!(router.ingest(s2, false), SignalDisposition::Dropped);
140 }
141
142 #[test]
143 fn clear_dedup_allows_reingest() {
144 let mut router = SignalRouter::new(100);
145 let sig = RuntimeSignal::new(
146 SignalSource::Cron,
147 SignalType::Event,
148 Urgency::Normal,
149 "tick",
150 )
151 .with_dedupe("key-1");
152
153 router.ingest(sig.clone(), false);
154 assert_eq!(router.ingest(sig.clone(), false), SignalDisposition::Ignore);
155
156 router.clear_dedup();
157 assert_ne!(router.ingest(sig, false), SignalDisposition::Ignore);
158 }
159}