Skip to main content

deepstrike_core/signals/
router.rs

1use std::collections::HashSet;
2
3use compact_str::CompactString;
4
5use super::attention::UrgencyBasedPolicy;
6use super::queue::SignalQueue;
7use crate::types::policy::{AttentionPolicy, SignalDisposition};
8use crate::types::signal::RuntimeSignal;
9
10/// High-performance signal router with deduplication and priority queuing.
11pub struct SignalRouter {
12    seen: HashSet<CompactString>,
13    queue: SignalQueue,
14    attention: Box<dyn AttentionPolicy>,
15}
16
17impl SignalRouter {
18    pub fn new(max_queue_size: usize) -> Self {
19        Self::with_policy(max_queue_size, Box::new(UrgencyBasedPolicy))
20    }
21
22    pub fn with_policy(max_queue_size: usize, policy: Box<dyn AttentionPolicy>) -> Self {
23        Self {
24            seen: HashSet::new(),
25            queue: SignalQueue::new(max_queue_size),
26            attention: policy,
27        }
28    }
29
30    /// Ingest a signal. Returns the disposition after dedup + attention evaluation.
31    /// `Queue` dispositions are buffered; if the queue is full, returns `Dropped`
32    /// so the SDK can apply backpressure or surface the loss to telemetry.
33    /// All other dispositions are returned directly to the caller.
34    pub fn ingest(&mut self, signal: RuntimeSignal, is_running: bool) -> SignalDisposition {
35        if let Some(ref key) = signal.dedupe_key {
36            if !self.seen.insert(key.clone()) {
37                return SignalDisposition::Ignore;
38            }
39        }
40
41        let disposition = self.attention.evaluate(&signal, is_running);
42
43        if disposition == SignalDisposition::Queue {
44            if !self.queue.push(signal) {
45                return SignalDisposition::Dropped;
46            }
47        }
48
49        disposition
50    }
51
52    /// Pull next queued signal.
53    pub fn next(&mut self) -> Option<RuntimeSignal> {
54        self.queue.pop()
55    }
56
57    /// Number of queued signals.
58    pub fn depth(&self) -> usize {
59        self.queue.len()
60    }
61
62    /// Clear the dedup set (call at session boundaries to prevent unbounded growth).
63    pub fn clear_dedup(&mut self) {
64        self.seen.clear();
65    }
66}
67
68#[cfg(test)]
69mod tests {
70    use super::*;
71    use crate::types::signal::{SignalSource, SignalType, Urgency};
72
73    #[test]
74    fn deduplicates_signals() {
75        let mut router = SignalRouter::new(100);
76        let sig = RuntimeSignal::new(
77            SignalSource::Cron,
78            SignalType::Event,
79            Urgency::Normal,
80            "tick",
81        )
82        .with_dedupe("cron-tick-1");
83
84        let d1 = router.ingest(sig.clone(), false);
85        assert_ne!(d1, SignalDisposition::Ignore);
86
87        let d2 = router.ingest(sig, false);
88        assert_eq!(d2, SignalDisposition::Ignore);
89    }
90
91    #[test]
92    fn normal_signal_queued() {
93        let mut router = SignalRouter::new(100);
94        let sig = RuntimeSignal::new(
95            SignalSource::Cron,
96            SignalType::Event,
97            Urgency::Normal,
98            "job",
99        );
100
101        let d = router.ingest(sig, false);
102        assert_eq!(d, SignalDisposition::Queue);
103        assert_eq!(router.depth(), 1);
104        assert!(router.next().is_some());
105    }
106
107    #[test]
108    fn interrupt_signals_not_queued() {
109        let mut router = SignalRouter::new(100);
110        let sig = RuntimeSignal::new(
111            SignalSource::Gateway,
112            SignalType::Alert,
113            Urgency::Critical,
114            "fire",
115        );
116
117        let d = router.ingest(sig, true);
118        assert_eq!(d, SignalDisposition::InterruptNow);
119        assert_eq!(router.depth(), 0);
120    }
121
122    #[test]
123    fn full_queue_drops_signal() {
124        let mut router = SignalRouter::new(1);
125        let s1 = RuntimeSignal::new(
126            SignalSource::Cron,
127            SignalType::Event,
128            Urgency::Normal,
129            "first",
130        );
131        let s2 = RuntimeSignal::new(
132            SignalSource::Cron,
133            SignalType::Event,
134            Urgency::Normal,
135            "second",
136        );
137
138        assert_eq!(router.ingest(s1, false), SignalDisposition::Queue);
139        assert_eq!(router.ingest(s2, false), SignalDisposition::Dropped);
140    }
141
142    #[test]
143    fn clear_dedup_allows_reingest() {
144        let mut router = SignalRouter::new(100);
145        let sig = RuntimeSignal::new(
146            SignalSource::Cron,
147            SignalType::Event,
148            Urgency::Normal,
149            "tick",
150        )
151        .with_dedupe("key-1");
152
153        router.ingest(sig.clone(), false);
154        assert_eq!(router.ingest(sig.clone(), false), SignalDisposition::Ignore);
155
156        router.clear_dedup();
157        assert_ne!(router.ingest(sig, false), SignalDisposition::Ignore);
158    }
159}