Skip to main content

deepstrike_core/signals/
router.rs

1use std::collections::HashSet;
2
3use compact_str::CompactString;
4
5use super::attention::UrgencyBasedPolicy;
6use super::queue::SignalQueue;
7use crate::types::policy::{AttentionPolicy, SignalDisposition};
8use crate::types::signal::RuntimeSignal;
9
10/// High-performance signal router with deduplication and priority queuing.
11pub struct SignalRouter {
12    seen: HashSet<CompactString>,
13    queue: SignalQueue,
14    attention: Box<dyn AttentionPolicy>,
15}
16
17impl SignalRouter {
18    pub fn new(max_queue_size: usize) -> Self {
19        Self::with_policy(max_queue_size, Box::new(UrgencyBasedPolicy))
20    }
21
22    pub fn with_policy(max_queue_size: usize, policy: Box<dyn AttentionPolicy>) -> Self {
23        Self {
24            seen: HashSet::new(),
25            queue: SignalQueue::new(max_queue_size),
26            attention: policy,
27        }
28    }
29
30    /// Ingest a signal. Returns the disposition after dedup + attention evaluation.
31    /// `Queue` dispositions are buffered; if the queue is full, returns `Dropped`
32    /// so the SDK can apply backpressure or surface the loss to telemetry.
33    /// All other dispositions are returned directly to the caller.
34    pub fn ingest(&mut self, signal: RuntimeSignal, is_running: bool) -> SignalDisposition {
35        if let Some(ref key) = signal.dedupe_key {
36            if !self.seen.insert(key.clone()) {
37                return SignalDisposition::Ignore;
38            }
39        }
40
41        let disposition = self.attention.evaluate(&signal, is_running);
42
43        if disposition == SignalDisposition::Queue {
44            if !self.queue.push(signal) {
45                return SignalDisposition::Dropped;
46            }
47        }
48
49        disposition
50    }
51
52    /// Pull next queued signal.
53    pub fn next(&mut self) -> Option<RuntimeSignal> {
54        self.queue.pop()
55    }
56
57    /// Pull the next queued signal visible to `recipient` (broadcasts plus signals
58    /// addressed to it); other recipients' signals stay queued. `None` ⇒ no filter.
59    pub fn next_for(&mut self, recipient: Option<&str>) -> Option<RuntimeSignal> {
60        self.queue.pop_for(recipient)
61    }
62
63    /// Number of queued signals.
64    pub fn depth(&self) -> usize {
65        self.queue.len()
66    }
67
68    /// Clear the dedup set (call at session boundaries to prevent unbounded growth).
69    pub fn clear_dedup(&mut self) {
70        self.seen.clear();
71    }
72}
73
74#[cfg(test)]
75mod tests {
76    use super::*;
77    use crate::types::signal::{SignalSource, SignalType, Urgency};
78
79    #[test]
80    fn deduplicates_signals() {
81        let mut router = SignalRouter::new(100);
82        let sig = RuntimeSignal::new(
83            SignalSource::Cron,
84            SignalType::Event,
85            Urgency::Normal,
86            "tick",
87        )
88        .with_dedupe("cron-tick-1");
89
90        let d1 = router.ingest(sig.clone(), false);
91        assert_ne!(d1, SignalDisposition::Ignore);
92
93        let d2 = router.ingest(sig, false);
94        assert_eq!(d2, SignalDisposition::Ignore);
95    }
96
97    #[test]
98    fn normal_signal_queued() {
99        let mut router = SignalRouter::new(100);
100        let sig = RuntimeSignal::new(
101            SignalSource::Cron,
102            SignalType::Event,
103            Urgency::Normal,
104            "job",
105        );
106
107        let d = router.ingest(sig, false);
108        assert_eq!(d, SignalDisposition::Queue);
109        assert_eq!(router.depth(), 1);
110        assert!(router.next().is_some());
111    }
112
113    #[test]
114    fn interrupt_signals_not_queued() {
115        let mut router = SignalRouter::new(100);
116        let sig = RuntimeSignal::new(
117            SignalSource::Gateway,
118            SignalType::Alert,
119            Urgency::Critical,
120            "fire",
121        );
122
123        let d = router.ingest(sig, true);
124        assert_eq!(d, SignalDisposition::InterruptNow);
125        assert_eq!(router.depth(), 0);
126    }
127
128    #[test]
129    fn full_queue_drops_signal() {
130        let mut router = SignalRouter::new(1);
131        let s1 = RuntimeSignal::new(
132            SignalSource::Cron,
133            SignalType::Event,
134            Urgency::Normal,
135            "first",
136        );
137        let s2 = RuntimeSignal::new(
138            SignalSource::Cron,
139            SignalType::Event,
140            Urgency::Normal,
141            "second",
142        );
143
144        assert_eq!(router.ingest(s1, false), SignalDisposition::Queue);
145        assert_eq!(router.ingest(s2, false), SignalDisposition::Dropped);
146    }
147
148    #[test]
149    fn next_for_drains_recipient_plus_broadcast_only() {
150        let mut router = SignalRouter::new(100);
151        let to_a = RuntimeSignal::new(SignalSource::Gateway, SignalType::Event, Urgency::Normal, "a")
152            .with_recipient("sess-a")
153            .with_timestamp(1);
154        let to_b = RuntimeSignal::new(SignalSource::Gateway, SignalType::Event, Urgency::Normal, "b")
155            .with_recipient("sess-b")
156            .with_timestamp(2);
157        let broadcast =
158            RuntimeSignal::new(SignalSource::Cron, SignalType::Job, Urgency::Normal, "all")
159                .with_timestamp(3);
160        router.ingest(to_a, false);
161        router.ingest(to_b, false);
162        router.ingest(broadcast, false);
163
164        // Puller "sess-a" sees only its own + the broadcast, never sess-b's.
165        let first = router.next_for(Some("sess-a")).unwrap();
166        let second = router.next_for(Some("sess-a")).unwrap();
167        let mut got: Vec<_> = [first.summary.as_str(), second.summary.as_str()]
168            .iter()
169            .map(|s| s.to_string())
170            .collect();
171        got.sort();
172        assert_eq!(got, vec!["a".to_string(), "all".to_string()]);
173        assert!(router.next_for(Some("sess-a")).is_none());
174
175        // sess-b's signal is still queued for its own puller.
176        assert_eq!(router.next_for(Some("sess-b")).unwrap().summary.as_str(), "b");
177    }
178
179    #[test]
180    fn next_for_none_drains_anything() {
181        let mut router = SignalRouter::new(100);
182        router.ingest(
183            RuntimeSignal::new(SignalSource::Gateway, SignalType::Event, Urgency::Normal, "x")
184                .with_recipient("sess-a"),
185            false,
186        );
187        // No recipient filter ⇒ back-compat: drains the addressed signal too.
188        assert_eq!(router.next_for(None).unwrap().summary.as_str(), "x");
189    }
190
191    #[test]
192    fn clear_dedup_allows_reingest() {
193        let mut router = SignalRouter::new(100);
194        let sig = RuntimeSignal::new(
195            SignalSource::Cron,
196            SignalType::Event,
197            Urgency::Normal,
198            "tick",
199        )
200        .with_dedupe("key-1");
201
202        router.ingest(sig.clone(), false);
203        assert_eq!(router.ingest(sig.clone(), false), SignalDisposition::Ignore);
204
205        router.clear_dedup();
206        assert_ne!(router.ingest(sig, false), SignalDisposition::Ignore);
207    }
208}