1use std::collections::{HashMap, HashSet, VecDeque};
6use std::sync::{Arc, RwLock};
7
8use super::sinks::AlertSink;
9use super::types::Alert;
10
11#[derive(Debug)]
13pub struct DeadLetterEntry {
14 pub alert: Alert,
16 pub sink_name: String,
18 pub error: String,
20 pub attempts: u32,
22}
23
24pub struct AlertRouter {
36 sinks: RwLock<HashMap<String, Arc<dyn AlertSink>>>,
38 tag_routes: RwLock<HashMap<String, Vec<String>>>,
40 default_sinks: RwLock<Vec<String>>,
42 dlq: RwLock<VecDeque<DeadLetterEntry>>,
44 max_dlq_size: usize,
46}
47
48impl AlertRouter {
49 pub fn new() -> Self {
51 Self {
52 sinks: RwLock::new(HashMap::new()),
53 tag_routes: RwLock::new(HashMap::new()),
54 default_sinks: RwLock::new(Vec::new()),
55 dlq: RwLock::new(VecDeque::new()),
56 max_dlq_size: 1000,
57 }
58 }
59
60 pub fn with_max_dlq_size(mut self, size: usize) -> Self {
62 self.max_dlq_size = size;
63 self
64 }
65
66 pub fn register_sink(&self, name: &str, sink: Arc<dyn AlertSink>) {
73 let tags = sink.handles_tags().to_vec();
74 let name = name.to_string();
75
76 if tags.is_empty() {
78 let mut defaults = self.default_sinks.write().unwrap();
80 if !defaults.contains(&name) {
81 defaults.push(name.clone());
82 }
83 } else {
84 let mut tag_routes = self.tag_routes.write().unwrap();
86 for tag in tags {
87 tag_routes.entry(tag).or_default().push(name.clone());
88 }
89 }
90
91 let mut sinks = self.sinks.write().unwrap();
93 sinks.insert(name, sink);
94 }
95
96 pub fn unregister_sink(&self, name: &str) -> bool {
106 let mut sinks = self.sinks.write().unwrap();
107 let removed = sinks.remove(name).is_some();
108
109 if removed {
110 let mut defaults = self.default_sinks.write().unwrap();
112 defaults.retain(|n| n != name);
113
114 let mut tag_routes = self.tag_routes.write().unwrap();
116 for sinks in tag_routes.values_mut() {
117 sinks.retain(|n| n != name);
118 }
119 }
120
121 removed
122 }
123
124 pub fn emit(&self, alert: Alert) {
130 let target_sinks = self.get_target_sinks(&alert);
132
133 let sinks = self.sinks.read().unwrap();
135 for sink_name in target_sinks {
136 if let Some(sink) = sinks.get(&sink_name) {
137 if let Err(e) = sink.send(&alert) {
138 self.add_to_dlq(DeadLetterEntry {
140 alert: alert.clone(),
141 sink_name,
142 error: e.to_string(),
143 attempts: 1,
144 });
145 }
146 }
147 }
148 }
149
150 fn get_target_sinks(&self, alert: &Alert) -> HashSet<String> {
152 let mut targets = HashSet::new();
153 let tag_routes = self.tag_routes.read().unwrap();
154
155 for tag in &alert.tags {
157 if let Some(sinks) = tag_routes.get(tag) {
158 targets.extend(sinks.iter().cloned());
159 }
160 }
161
162 if targets.is_empty() {
164 let defaults = self.default_sinks.read().unwrap();
165 targets.extend(defaults.iter().cloned());
166 }
167
168 targets
169 }
170
171 fn add_to_dlq(&self, entry: DeadLetterEntry) {
173 let mut dlq = self.dlq.write().unwrap();
174
175 while dlq.len() >= self.max_dlq_size {
177 dlq.pop_front();
178 }
179
180 dlq.push_back(entry);
181 }
182
183 pub fn dlq_size(&self) -> usize {
185 self.dlq.read().unwrap().len()
186 }
187
188 pub fn drain_dlq(&self) -> Vec<DeadLetterEntry> {
190 let mut dlq = self.dlq.write().unwrap();
191 dlq.drain(..).collect()
192 }
193
194 pub fn flush(&self) {
196 let sinks = self.sinks.read().unwrap();
197 for sink in sinks.values() {
198 let _ = sink.flush();
199 }
200 }
201
202 pub fn list_sinks(&self) -> Vec<String> {
204 let sinks = self.sinks.read().unwrap();
205 sinks.keys().cloned().collect()
206 }
207
208 pub fn get_sink(&self, name: &str) -> Option<Arc<dyn AlertSink>> {
210 let sinks = self.sinks.read().unwrap();
211 sinks.get(name).cloned()
212 }
213}
214
215impl Default for AlertRouter {
216 fn default() -> Self {
217 Self::new()
218 }
219}
220
221unsafe impl Send for AlertRouter {}
223unsafe impl Sync for AlertRouter {}
224
#[cfg(test)]
mod tests {
    use super::*;
    use shape_ast::error::Result;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink that always succeeds and counts how many alerts it received.
    struct CountingSink {
        name: String,
        count: AtomicUsize,
        tags: Vec<String>,
    }

    impl CountingSink {
        fn new(name: &str, tags: Vec<String>) -> Self {
            Self {
                name: name.to_string(),
                count: AtomicUsize::new(0),
                tags,
            }
        }

        fn count(&self) -> usize {
            self.count.load(Ordering::SeqCst)
        }
    }

    impl AlertSink for CountingSink {
        fn name(&self) -> &str {
            &self.name
        }

        fn send(&self, _alert: &Alert) -> Result<()> {
            self.count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn handles_tags(&self) -> &[String] {
            &self.tags
        }
    }

    /// Sink whose `send` always fails, used to populate the dead-letter queue.
    struct FailingSink {
        name: String,
        tags: Vec<String>,
    }

    impl AlertSink for FailingSink {
        fn name(&self) -> &str {
            &self.name
        }

        fn send(&self, _alert: &Alert) -> Result<()> {
            Err(shape_ast::ShapeError::RuntimeError {
                message: "Simulated failure".to_string(),
                location: None,
            })
        }

        fn handles_tags(&self) -> &[String] {
            &self.tags
        }
    }

    #[test]
    fn test_router_default_sink() {
        let router = AlertRouter::new();
        let sink = Arc::new(CountingSink::new("default", vec![]));

        router.register_sink("default", sink.clone());

        let alert = Alert::new("Test", "Message");
        router.emit(alert);

        assert_eq!(sink.count(), 1);
    }

    #[test]
    fn test_router_tag_routing() {
        let router = AlertRouter::new();
        let sink1 = Arc::new(CountingSink::new("sink1", vec!["tag1".to_string()]));
        let sink2 = Arc::new(CountingSink::new("sink2", vec!["tag2".to_string()]));

        router.register_sink("sink1", sink1.clone());
        router.register_sink("sink2", sink2.clone());

        // An alert tagged `tag1` reaches only the sink that handles `tag1`.
        let alert1 = Alert::new("Test1", "Message").with_tag("tag1");
        router.emit(alert1);

        assert_eq!(sink1.count(), 1);
        assert_eq!(sink2.count(), 0);

        // An alert tagged `tag2` reaches only the sink that handles `tag2`.
        let alert2 = Alert::new("Test2", "Message").with_tag("tag2");
        router.emit(alert2);

        assert_eq!(sink1.count(), 1);
        assert_eq!(sink2.count(), 1);
    }

    #[test]
    fn test_router_unregister() {
        let router = AlertRouter::new();
        let sink = Arc::new(CountingSink::new("test", vec![]));

        router.register_sink("test", sink);
        assert!(router.unregister_sink("test"));
        // A second removal of the same name reports that nothing was removed.
        assert!(!router.unregister_sink("test"));
        assert!(router.get_sink("test").is_none());
        assert!(router.list_sinks().is_empty());
    }

    #[test]
    fn test_router_multiple_tags_same_alert() {
        let router = AlertRouter::new();
        let sink1 = Arc::new(CountingSink::new("sink1", vec!["tag1".to_string()]));
        let sink2 = Arc::new(CountingSink::new("sink2", vec!["tag2".to_string()]));

        router.register_sink("sink1", sink1.clone());
        router.register_sink("sink2", sink2.clone());

        // One alert carrying both tags is delivered once to each sink.
        let alert = Alert::new("Test", "Message")
            .with_tag("tag1")
            .with_tag("tag2");
        router.emit(alert);

        assert_eq!(sink1.count(), 1);
        assert_eq!(sink2.count(), 1);
    }

    #[test]
    fn test_router_fallback_to_default() {
        let router = AlertRouter::new();
        let default_sink = Arc::new(CountingSink::new("default", vec![]));
        let tagged_sink = Arc::new(CountingSink::new("tagged", vec!["special".to_string()]));

        router.register_sink("default", default_sink.clone());
        router.register_sink("tagged", tagged_sink.clone());

        // No sink handles `unmatched`, so the alert goes to the default sink.
        let alert = Alert::new("Test", "Message").with_tag("unmatched");
        router.emit(alert);

        assert_eq!(default_sink.count(), 1);
        assert_eq!(tagged_sink.count(), 0);
    }

    #[test]
    fn test_router_list_sinks() {
        let router = AlertRouter::new();
        let sink1 = Arc::new(CountingSink::new("sink1", vec![]));
        let sink2 = Arc::new(CountingSink::new("sink2", vec![]));

        router.register_sink("sink1", sink1);
        router.register_sink("sink2", sink2);

        let sinks = router.list_sinks();
        assert_eq!(sinks.len(), 2);
        assert!(sinks.contains(&"sink1".to_string()));
        assert!(sinks.contains(&"sink2".to_string()));
    }

    #[test]
    fn test_router_get_sink() {
        let router = AlertRouter::new();
        let sink = Arc::new(CountingSink::new("test", vec![]));

        router.register_sink("test", sink.clone());

        let retrieved = router.get_sink("test");
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().name(), "test");

        let missing = router.get_sink("nonexistent");
        assert!(missing.is_none());
    }

    #[test]
    fn test_dlq_max_size() {
        let router = AlertRouter::new().with_max_dlq_size(2);
        assert_eq!(router.dlq_size(), 0);

        let failing_sink = Arc::new(FailingSink {
            name: "failing".to_string(),
            tags: vec![],
        });
        router.register_sink("failing", failing_sink);

        // Three failed deliveries against a capacity of two: the oldest entry
        // is evicted and the queue never grows past its limit.
        for title in ["First", "Second", "Third"] {
            router.emit(Alert::new(title, "Message"));
            assert!(router.dlq_size() <= 2);
        }

        assert_eq!(router.dlq_size(), 2);
        assert_eq!(router.drain_dlq().len(), 2);
        assert_eq!(router.dlq_size(), 0);
    }

    #[test]
    fn test_router_flush() {
        let router = AlertRouter::new();
        let sink = Arc::new(CountingSink::new("test", vec![]));

        router.register_sink("test", sink);

        // Smoke test: flushing registered sinks must not panic.
        router.flush();
    }

    #[test]
    fn test_dlq_captures_failures() {
        let router = AlertRouter::new();
        let failing_sink = Arc::new(FailingSink {
            name: "failing".to_string(),
            tags: vec![],
        });

        router.register_sink("failing", failing_sink);

        let alert = Alert::new("Test", "Message");
        router.emit(alert);

        assert_eq!(router.dlq_size(), 1);

        let dlq_entries = router.drain_dlq();
        assert_eq!(dlq_entries.len(), 1);
        assert_eq!(dlq_entries[0].sink_name, "failing");
        assert!(dlq_entries[0].error.contains("Simulated failure"));
    }
}