Skip to main content

shape_runtime/alerts/
router.rs

1//! Alert Router
2//!
3//! Routes alerts to appropriate sinks based on tags.
4
5use std::collections::{HashMap, HashSet, VecDeque};
6use std::sync::{Arc, RwLock};
7
8use super::sinks::AlertSink;
9use super::types::Alert;
10
11/// Dead-letter queue entry
12#[derive(Debug)]
13pub struct DeadLetterEntry {
14    /// The alert that failed
15    pub alert: Alert,
16    /// Name of the sink that failed
17    pub sink_name: String,
18    /// Error message
19    pub error: String,
20    /// Number of retry attempts
21    pub attempts: u32,
22}
23
24/// Alert router that directs alerts to appropriate sinks
25///
26/// # Routing Rules
27///
28/// 1. If an alert has tags, it's sent to sinks that handle those tags
29/// 2. If no tag matches, it's sent to default sinks (those with empty tag lists)
30/// 3. Sinks with no tags configured receive all alerts
31///
32/// # Dead Letter Queue
33///
34/// Failed deliveries are stored in a dead-letter queue for later retry.
35pub struct AlertRouter {
36    /// Named sinks
37    sinks: RwLock<HashMap<String, Arc<dyn AlertSink>>>,
38    /// Tag to sink names mapping
39    tag_routes: RwLock<HashMap<String, Vec<String>>>,
40    /// Default sinks (receive all alerts)
41    default_sinks: RwLock<Vec<String>>,
42    /// Dead-letter queue for failed deliveries
43    dlq: RwLock<VecDeque<DeadLetterEntry>>,
44    /// Maximum DLQ size
45    max_dlq_size: usize,
46}
47
48impl AlertRouter {
49    /// Create a new alert router
50    pub fn new() -> Self {
51        Self {
52            sinks: RwLock::new(HashMap::new()),
53            tag_routes: RwLock::new(HashMap::new()),
54            default_sinks: RwLock::new(Vec::new()),
55            dlq: RwLock::new(VecDeque::new()),
56            max_dlq_size: 1000,
57        }
58    }
59
60    /// Set maximum dead-letter queue size
61    pub fn with_max_dlq_size(mut self, size: usize) -> Self {
62        self.max_dlq_size = size;
63        self
64    }
65
66    /// Register a sink
67    ///
68    /// # Arguments
69    ///
70    /// * `name` - Unique name for the sink
71    /// * `sink` - The sink implementation
72    pub fn register_sink(&self, name: &str, sink: Arc<dyn AlertSink>) {
73        let tags = sink.handles_tags().to_vec();
74        let name = name.to_string();
75
76        // Register tag routes
77        if tags.is_empty() {
78            // Default sink - handles all alerts
79            let mut defaults = self.default_sinks.write().unwrap();
80            if !defaults.contains(&name) {
81                defaults.push(name.clone());
82            }
83        } else {
84            // Tag-specific sink
85            let mut tag_routes = self.tag_routes.write().unwrap();
86            for tag in tags {
87                tag_routes.entry(tag).or_default().push(name.clone());
88            }
89        }
90
91        // Store the sink
92        let mut sinks = self.sinks.write().unwrap();
93        sinks.insert(name, sink);
94    }
95
96    /// Unregister a sink
97    ///
98    /// # Arguments
99    ///
100    /// * `name` - Name of sink to remove
101    ///
102    /// # Returns
103    ///
104    /// true if sink was removed
105    pub fn unregister_sink(&self, name: &str) -> bool {
106        let mut sinks = self.sinks.write().unwrap();
107        let removed = sinks.remove(name).is_some();
108
109        if removed {
110            // Remove from default sinks
111            let mut defaults = self.default_sinks.write().unwrap();
112            defaults.retain(|n| n != name);
113
114            // Remove from tag routes
115            let mut tag_routes = self.tag_routes.write().unwrap();
116            for sinks in tag_routes.values_mut() {
117                sinks.retain(|n| n != name);
118            }
119        }
120
121        removed
122    }
123
124    /// Emit an alert to appropriate sinks
125    ///
126    /// # Arguments
127    ///
128    /// * `alert` - The alert to send
129    pub fn emit(&self, alert: Alert) {
130        // Determine target sinks
131        let target_sinks = self.get_target_sinks(&alert);
132
133        // Send to each sink
134        let sinks = self.sinks.read().unwrap();
135        for sink_name in target_sinks {
136            if let Some(sink) = sinks.get(&sink_name) {
137                if let Err(e) = sink.send(&alert) {
138                    // Add to DLQ
139                    self.add_to_dlq(DeadLetterEntry {
140                        alert: alert.clone(),
141                        sink_name,
142                        error: e.to_string(),
143                        attempts: 1,
144                    });
145                }
146            }
147        }
148    }
149
150    /// Get names of sinks that should receive an alert
151    fn get_target_sinks(&self, alert: &Alert) -> HashSet<String> {
152        let mut targets = HashSet::new();
153        let tag_routes = self.tag_routes.read().unwrap();
154
155        // Check tag routes
156        for tag in &alert.tags {
157            if let Some(sinks) = tag_routes.get(tag) {
158                targets.extend(sinks.iter().cloned());
159            }
160        }
161
162        // If no tag matches, use default sinks
163        if targets.is_empty() {
164            let defaults = self.default_sinks.read().unwrap();
165            targets.extend(defaults.iter().cloned());
166        }
167
168        targets
169    }
170
171    /// Add an entry to the dead-letter queue
172    fn add_to_dlq(&self, entry: DeadLetterEntry) {
173        let mut dlq = self.dlq.write().unwrap();
174
175        // Enforce max size
176        while dlq.len() >= self.max_dlq_size {
177            dlq.pop_front();
178        }
179
180        dlq.push_back(entry);
181    }
182
183    /// Get current dead-letter queue size
184    pub fn dlq_size(&self) -> usize {
185        self.dlq.read().unwrap().len()
186    }
187
188    /// Drain the dead-letter queue
189    pub fn drain_dlq(&self) -> Vec<DeadLetterEntry> {
190        let mut dlq = self.dlq.write().unwrap();
191        dlq.drain(..).collect()
192    }
193
194    /// Flush all sinks
195    pub fn flush(&self) {
196        let sinks = self.sinks.read().unwrap();
197        for sink in sinks.values() {
198            let _ = sink.flush();
199        }
200    }
201
202    /// List registered sink names
203    pub fn list_sinks(&self) -> Vec<String> {
204        let sinks = self.sinks.read().unwrap();
205        sinks.keys().cloned().collect()
206    }
207
208    /// Get sink by name
209    pub fn get_sink(&self, name: &str) -> Option<Arc<dyn AlertSink>> {
210        let sinks = self.sinks.read().unwrap();
211        sinks.get(name).cloned()
212    }
213}
214
215impl Default for AlertRouter {
216    fn default() -> Self {
217        Self::new()
218    }
219}
220
221// SAFETY: All fields use proper synchronization (RwLock)
222unsafe impl Send for AlertRouter {}
223unsafe impl Sync for AlertRouter {}
224
225#[cfg(test)]
226mod tests {
227    use super::*;
228    use shape_ast::error::Result;
229    use std::sync::atomic::{AtomicUsize, Ordering};
230
231    struct CountingSink {
232        name: String,
233        count: AtomicUsize,
234        tags: Vec<String>,
235    }
236
237    impl CountingSink {
238        fn new(name: &str, tags: Vec<String>) -> Self {
239            Self {
240                name: name.to_string(),
241                count: AtomicUsize::new(0),
242                tags,
243            }
244        }
245
246        fn count(&self) -> usize {
247            self.count.load(Ordering::SeqCst)
248        }
249    }
250
251    impl AlertSink for CountingSink {
252        fn name(&self) -> &str {
253            &self.name
254        }
255
256        fn send(&self, _alert: &Alert) -> Result<()> {
257            self.count.fetch_add(1, Ordering::SeqCst);
258            Ok(())
259        }
260
261        fn handles_tags(&self) -> &[String] {
262            &self.tags
263        }
264    }
265
266    #[test]
267    fn test_router_default_sink() {
268        let router = AlertRouter::new();
269        let sink = Arc::new(CountingSink::new("default", vec![]));
270
271        router.register_sink("default", sink.clone());
272
273        let alert = Alert::new("Test", "Message");
274        router.emit(alert);
275
276        assert_eq!(sink.count(), 1);
277    }
278
279    #[test]
280    fn test_router_tag_routing() {
281        let router = AlertRouter::new();
282        let sink1 = Arc::new(CountingSink::new("sink1", vec!["tag1".to_string()]));
283        let sink2 = Arc::new(CountingSink::new("sink2", vec!["tag2".to_string()]));
284
285        router.register_sink("sink1", sink1.clone());
286        router.register_sink("sink2", sink2.clone());
287
288        // Alert with tag1 should go to sink1
289        let alert1 = Alert::new("Test1", "Message").with_tag("tag1");
290        router.emit(alert1);
291
292        assert_eq!(sink1.count(), 1);
293        assert_eq!(sink2.count(), 0);
294
295        // Alert with tag2 should go to sink2
296        let alert2 = Alert::new("Test2", "Message").with_tag("tag2");
297        router.emit(alert2);
298
299        assert_eq!(sink1.count(), 1);
300        assert_eq!(sink2.count(), 1);
301    }
302
303    #[test]
304    fn test_router_unregister() {
305        let router = AlertRouter::new();
306        let sink = Arc::new(CountingSink::new("test", vec![]));
307
308        router.register_sink("test", sink);
309        assert!(router.unregister_sink("test"));
310        assert!(!router.unregister_sink("test")); // Already removed
311    }
312
313    #[test]
314    fn test_router_multiple_tags_same_alert() {
315        let router = AlertRouter::new();
316        let sink1 = Arc::new(CountingSink::new("sink1", vec!["tag1".to_string()]));
317        let sink2 = Arc::new(CountingSink::new("sink2", vec!["tag2".to_string()]));
318
319        router.register_sink("sink1", sink1.clone());
320        router.register_sink("sink2", sink2.clone());
321
322        // Alert with both tags should go to both sinks
323        let alert = Alert::new("Test", "Message")
324            .with_tag("tag1")
325            .with_tag("tag2");
326        router.emit(alert);
327
328        assert_eq!(sink1.count(), 1);
329        assert_eq!(sink2.count(), 1);
330    }
331
332    #[test]
333    fn test_router_fallback_to_default() {
334        let router = AlertRouter::new();
335        let default_sink = Arc::new(CountingSink::new("default", vec![]));
336        let tagged_sink = Arc::new(CountingSink::new("tagged", vec!["special".to_string()]));
337
338        router.register_sink("default", default_sink.clone());
339        router.register_sink("tagged", tagged_sink.clone());
340
341        // Alert without matching tags should go to default
342        let alert = Alert::new("Test", "Message").with_tag("unmatched");
343        router.emit(alert);
344
345        assert_eq!(default_sink.count(), 1);
346        assert_eq!(tagged_sink.count(), 0);
347    }
348
349    #[test]
350    fn test_router_list_sinks() {
351        let router = AlertRouter::new();
352        let sink1 = Arc::new(CountingSink::new("sink1", vec![]));
353        let sink2 = Arc::new(CountingSink::new("sink2", vec![]));
354
355        router.register_sink("sink1", sink1);
356        router.register_sink("sink2", sink2);
357
358        let sinks = router.list_sinks();
359        assert_eq!(sinks.len(), 2);
360        assert!(sinks.contains(&"sink1".to_string()));
361        assert!(sinks.contains(&"sink2".to_string()));
362    }
363
364    #[test]
365    fn test_router_get_sink() {
366        let router = AlertRouter::new();
367        let sink = Arc::new(CountingSink::new("test", vec![]));
368
369        router.register_sink("test", sink.clone());
370
371        let retrieved = router.get_sink("test");
372        assert!(retrieved.is_some());
373        assert_eq!(retrieved.unwrap().name(), "test");
374
375        let missing = router.get_sink("nonexistent");
376        assert!(missing.is_none());
377    }
378
379    #[test]
380    fn test_dlq_max_size() {
381        // Create router with small DLQ
382        let router = AlertRouter::new().with_max_dlq_size(2);
383
384        // Initially empty
385        assert_eq!(router.dlq_size(), 0);
386    }
387
388    #[test]
389    fn test_router_flush() {
390        let router = AlertRouter::new();
391        let sink = Arc::new(CountingSink::new("test", vec![]));
392
393        router.register_sink("test", sink);
394
395        // Flush should not panic even with no pending alerts
396        router.flush();
397    }
398
399    struct FailingSink {
400        name: String,
401        tags: Vec<String>,
402    }
403
404    impl AlertSink for FailingSink {
405        fn name(&self) -> &str {
406            &self.name
407        }
408
409        fn send(&self, _alert: &Alert) -> Result<()> {
410            Err(shape_ast::ShapeError::RuntimeError {
411                message: "Simulated failure".to_string(),
412                location: None,
413            })
414        }
415
416        fn handles_tags(&self) -> &[String] {
417            &self.tags
418        }
419    }
420
421    #[test]
422    fn test_dlq_captures_failures() {
423        let router = AlertRouter::new();
424        let failing_sink = Arc::new(FailingSink {
425            name: "failing".to_string(),
426            tags: vec![],
427        });
428
429        router.register_sink("failing", failing_sink);
430
431        // Emit alert - should fail and go to DLQ
432        let alert = Alert::new("Test", "Message");
433        router.emit(alert);
434
435        assert_eq!(router.dlq_size(), 1);
436
437        // Drain and verify
438        let dlq_entries = router.drain_dlq();
439        assert_eq!(dlq_entries.len(), 1);
440        assert_eq!(dlq_entries[0].sink_name, "failing");
441        assert!(dlq_entries[0].error.contains("Simulated failure"));
442    }
443}