use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::{Arc, RwLock};
use super::sinks::AlertSink;
use super::types::Alert;
/// Record of one alert delivery that a sink rejected.
///
/// `AlertRouter::emit` creates one entry per failing sink and stores it in the
/// router's dead-letter queue.
#[derive(Debug)]
pub struct DeadLetterEntry {
/// Copy of the alert that could not be delivered.
pub alert: Alert,
/// Name under which the failing sink was registered with the router.
pub sink_name: String,
/// String rendering of the error returned by the sink's `send`.
pub error: String,
/// Number of delivery attempts made; `emit` always records `1`.
pub attempts: u32,
}
/// Routes alerts to registered sinks by tag and keeps failed deliveries in a
/// bounded dead-letter queue.
///
/// Every piece of state sits behind its own `RwLock`, so all methods take `&self`.
pub struct AlertRouter {
// Registered sinks, keyed by the name given to `register_sink`.
sinks: RwLock<HashMap<String, Arc<dyn AlertSink>>>,
// Tag -> names of the sinks that reported that tag via `handles_tags()`.
tag_routes: RwLock<HashMap<String, Vec<String>>>,
// Names of sinks that reported no tags; used when no tag route matches an alert.
default_sinks: RwLock<Vec<String>>,
// Failed deliveries, oldest at the front.
dlq: RwLock<VecDeque<DeadLetterEntry>>,
// Upper bound on `dlq.len()`; the oldest entries are evicted to honour it.
max_dlq_size: usize,
}
/// Routing, registration and dead-letter handling for [`AlertRouter`].
///
/// # Locking
///
/// Whenever a method needs more than one lock at a time it acquires them in the
/// fixed order `sinks` -> `tag_routes` -> `default_sinks`. Keeping that order
/// everywhere is what rules out lock-order deadlocks between concurrent
/// `emit`, `register_sink` and `unregister_sink` calls.
///
/// # Panics
///
/// Every method that touches a lock panics if that lock has been poisoned by a
/// panic on another thread.
impl AlertRouter {
    /// Dead-letter queue capacity used unless overridden with
    /// `with_max_dlq_size`.
    const DEFAULT_MAX_DLQ_SIZE: usize = 1000;

    /// Creates a router with no sinks, no routes and an empty dead-letter queue
    /// capped at 1000 entries.
    pub fn new() -> Self {
        Self {
            sinks: RwLock::new(HashMap::new()),
            tag_routes: RwLock::new(HashMap::new()),
            default_sinks: RwLock::new(Vec::new()),
            dlq: RwLock::new(VecDeque::new()),
            max_dlq_size: Self::DEFAULT_MAX_DLQ_SIZE,
        }
    }

    /// Builder-style setter for the dead-letter queue capacity.
    ///
    /// A capacity of `0` disables the queue: failed deliveries are dropped.
    pub fn with_max_dlq_size(mut self, size: usize) -> Self {
        self.max_dlq_size = size;
        self
    }

    /// Registers `sink` under `name` and derives its routes from
    /// `sink.handles_tags()`.
    ///
    /// A sink that reports no tags becomes a default sink; otherwise it is
    /// routed from each tag it reports. Registering a name that is already in
    /// use replaces both the sink and every route previously derived for that
    /// name, so stale or duplicate routes never accumulate.
    pub fn register_sink(&self, name: &str, sink: Arc<dyn AlertSink>) {
        let tags = sink.handles_tags().to_vec();
        let name = name.to_string();

        // Lock order: sinks -> tag_routes -> default_sinks. Holding all three makes
        // the replacement atomic with respect to `emit` and `unregister_sink`.
        let mut sinks = self.sinks.write().unwrap();
        let mut tag_routes = self.tag_routes.write().unwrap();
        let mut defaults = self.default_sinks.write().unwrap();

        // Forget whatever an earlier registration under this name set up.
        Self::purge_routes(&mut tag_routes, &mut defaults, &name);

        if tags.is_empty() {
            defaults.push(name.clone());
        } else {
            for tag in tags {
                let route = tag_routes.entry(tag).or_default();
                // A sink may list the same tag twice; keep each route entry unique.
                if !route.contains(&name) {
                    route.push(name.clone());
                }
            }
        }

        sinks.insert(name, sink);
    }

    /// Removes the sink registered as `name` together with all of its routes.
    ///
    /// Returns `true` if a sink was removed and `false` if the name was unknown.
    pub fn unregister_sink(&self, name: &str) -> bool {
        let mut sinks = self.sinks.write().unwrap();
        if sinks.remove(name).is_none() {
            return false;
        }

        // Same lock order as everywhere else: tag_routes before default_sinks.
        let mut tag_routes = self.tag_routes.write().unwrap();
        let mut defaults = self.default_sinks.write().unwrap();
        Self::purge_routes(&mut tag_routes, &mut defaults, name);
        true
    }

    /// Delivers `alert` to every sink selected by its tags.
    ///
    /// Sinks are selected by `get_target_sinks`: all sinks routed from any of the
    /// alert's tags, or the default sinks when no tag matches. Each sink whose
    /// `send` fails contributes one [`DeadLetterEntry`] (with `attempts == 1`) to
    /// the dead-letter queue; the remaining sinks are still tried.
    pub fn emit(&self, alert: Alert) {
        let target_sinks = self.get_target_sinks(&alert);

        // Resolve names to sink handles under the lock, then release it before any
        // sink I/O so slow sinks do not block (un)registration and a sink may call
        // back into the router without deadlocking.
        let deliveries: Vec<(String, Arc<dyn AlertSink>)> = {
            let registry = self.sinks.read().unwrap();
            target_sinks
                .into_iter()
                .filter_map(|name| {
                    let sink = registry.get(&name).cloned()?;
                    Some((name, sink))
                })
                .collect()
        };

        for (sink_name, sink) in deliveries {
            if let Err(e) = sink.send(&alert) {
                self.add_to_dlq(DeadLetterEntry {
                    alert: alert.clone(),
                    sink_name,
                    error: e.to_string(),
                    attempts: 1,
                });
            }
        }
    }

    /// Computes the de-duplicated set of sink names `alert` should be sent to.
    ///
    /// Collects the sinks routed from each of the alert's tags; if that yields
    /// nothing, falls back to the default sinks.
    fn get_target_sinks(&self, alert: &Alert) -> HashSet<String> {
        // Lock order: tag_routes before default_sinks.
        let tag_routes = self.tag_routes.read().unwrap();
        let mut targets: HashSet<String> = alert
            .tags
            .iter()
            .filter_map(|tag| tag_routes.get(tag))
            .flatten()
            .cloned()
            .collect();

        if targets.is_empty() {
            let defaults = self.default_sinks.read().unwrap();
            targets.extend(defaults.iter().cloned());
        }
        targets
    }

    /// Removes `name` from the default list and from every tag route, dropping
    /// tag entries that end up with no sinks.
    fn purge_routes(
        tag_routes: &mut HashMap<String, Vec<String>>,
        defaults: &mut Vec<String>,
        name: &str,
    ) {
        defaults.retain(|n| n != name);
        tag_routes.retain(|_, route| {
            route.retain(|n| n != name);
            !route.is_empty()
        });
    }

    /// Appends `entry` to the dead-letter queue, evicting the oldest entries so
    /// the queue never exceeds `max_dlq_size`.
    fn add_to_dlq(&self, entry: DeadLetterEntry) {
        // With a capacity of zero there is nothing to evict and nowhere to store the
        // entry; without this guard the eviction loop below would never terminate.
        if self.max_dlq_size == 0 {
            return;
        }

        let mut dlq = self.dlq.write().unwrap();
        while dlq.len() >= self.max_dlq_size {
            dlq.pop_front();
        }
        dlq.push_back(entry);
    }

    /// Returns the number of entries currently in the dead-letter queue.
    pub fn dlq_size(&self) -> usize {
        self.dlq.read().unwrap().len()
    }

    /// Removes and returns every dead-letter entry, oldest first.
    pub fn drain_dlq(&self) -> Vec<DeadLetterEntry> {
        let mut dlq = self.dlq.write().unwrap();
        dlq.drain(..).collect()
    }

    /// Calls `flush` on every registered sink, ignoring the individual results
    /// (best effort).
    pub fn flush(&self) {
        // Snapshot the handles so the registry lock is not held during sink I/O.
        let snapshot: Vec<Arc<dyn AlertSink>> = {
            let registry = self.sinks.read().unwrap();
            registry.values().cloned().collect()
        };

        for sink in snapshot {
            let _ = sink.flush();
        }
    }

    /// Returns the names of all registered sinks, in unspecified order.
    pub fn list_sinks(&self) -> Vec<String> {
        let sinks = self.sinks.read().unwrap();
        sinks.keys().cloned().collect()
    }

    /// Returns a shared handle to the sink registered as `name`, if any.
    pub fn get_sink(&self, name: &str) -> Option<Arc<dyn AlertSink>> {
        let sinks = self.sinks.read().unwrap();
        sinks.get(name).cloned()
    }
}
impl Default for AlertRouter {
fn default() -> Self {
Self::new()
}
}
// SAFETY / NOTE(review): every field of `AlertRouter` is an `RwLock` around a std
// collection (or a plain `usize`), so these impls are sound only if `dyn AlertSink`
// and `Alert` are themselves `Send + Sync` -- neither definition is visible in this
// file. If they are, the compiler already derives both auto traits and these manual
// impls are redundant; if they are not, these impls are unsound.
// TODO confirm against the `AlertSink` / `Alert` definitions and remove if possible.
unsafe impl Send for AlertRouter {}
unsafe impl Sync for AlertRouter {}
#[cfg(test)]
mod tests {
    use super::*;
    use shape_ast::error::Result;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Sink that always succeeds and counts how many alerts it was sent.
    struct CountingSink {
        name: String,
        count: AtomicUsize,
        tags: Vec<String>,
    }

    impl CountingSink {
        fn new(name: &str, tags: Vec<String>) -> Self {
            Self {
                name: name.to_string(),
                count: AtomicUsize::new(0),
                tags,
            }
        }

        fn count(&self) -> usize {
            self.count.load(Ordering::SeqCst)
        }
    }

    impl AlertSink for CountingSink {
        fn name(&self) -> &str {
            &self.name
        }

        fn send(&self, _alert: &Alert) -> Result<()> {
            self.count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn handles_tags(&self) -> &[String] {
            &self.tags
        }
    }

    /// Sink whose `send` always fails, used to populate the dead-letter queue.
    struct FailingSink {
        name: String,
        tags: Vec<String>,
    }

    impl AlertSink for FailingSink {
        fn name(&self) -> &str {
            &self.name
        }

        fn send(&self, _alert: &Alert) -> Result<()> {
            Err(shape_ast::ShapeError::RuntimeError {
                message: "Simulated failure".to_string(),
                location: None,
            })
        }

        fn handles_tags(&self) -> &[String] {
            &self.tags
        }
    }

    #[test]
    fn test_router_default_sink() {
        let router = AlertRouter::new();
        let sink = Arc::new(CountingSink::new("default", vec![]));
        router.register_sink("default", sink.clone());

        let alert = Alert::new("Test", "Message");
        router.emit(alert);

        assert_eq!(sink.count(), 1);
    }

    #[test]
    fn test_router_tag_routing() {
        let router = AlertRouter::new();
        let sink1 = Arc::new(CountingSink::new("sink1", vec!["tag1".to_string()]));
        let sink2 = Arc::new(CountingSink::new("sink2", vec!["tag2".to_string()]));
        router.register_sink("sink1", sink1.clone());
        router.register_sink("sink2", sink2.clone());

        let alert1 = Alert::new("Test1", "Message").with_tag("tag1");
        router.emit(alert1);
        assert_eq!(sink1.count(), 1);
        assert_eq!(sink2.count(), 0);

        let alert2 = Alert::new("Test2", "Message").with_tag("tag2");
        router.emit(alert2);
        assert_eq!(sink1.count(), 1);
        assert_eq!(sink2.count(), 1);
    }

    #[test]
    fn test_router_unregister() {
        let router = AlertRouter::new();
        let sink = Arc::new(CountingSink::new("test", vec![]));
        router.register_sink("test", sink);

        assert!(router.unregister_sink("test"));
        // A second removal finds nothing to remove.
        assert!(!router.unregister_sink("test"));
    }

    #[test]
    fn test_router_multiple_tags_same_alert() {
        let router = AlertRouter::new();
        let sink1 = Arc::new(CountingSink::new("sink1", vec!["tag1".to_string()]));
        let sink2 = Arc::new(CountingSink::new("sink2", vec!["tag2".to_string()]));
        router.register_sink("sink1", sink1.clone());
        router.register_sink("sink2", sink2.clone());

        let alert = Alert::new("Test", "Message")
            .with_tag("tag1")
            .with_tag("tag2");
        router.emit(alert);

        assert_eq!(sink1.count(), 1);
        assert_eq!(sink2.count(), 1);
    }

    #[test]
    fn test_router_fallback_to_default() {
        let router = AlertRouter::new();
        let default_sink = Arc::new(CountingSink::new("default", vec![]));
        let tagged_sink = Arc::new(CountingSink::new("tagged", vec!["special".to_string()]));
        router.register_sink("default", default_sink.clone());
        router.register_sink("tagged", tagged_sink.clone());

        // No sink handles "unmatched", so the alert falls through to the default sink.
        let alert = Alert::new("Test", "Message").with_tag("unmatched");
        router.emit(alert);

        assert_eq!(default_sink.count(), 1);
        assert_eq!(tagged_sink.count(), 0);
    }

    #[test]
    fn test_router_list_sinks() {
        let router = AlertRouter::new();
        let sink1 = Arc::new(CountingSink::new("sink1", vec![]));
        let sink2 = Arc::new(CountingSink::new("sink2", vec![]));
        router.register_sink("sink1", sink1);
        router.register_sink("sink2", sink2);

        let sinks = router.list_sinks();
        assert_eq!(sinks.len(), 2);
        assert!(sinks.contains(&"sink1".to_string()));
        assert!(sinks.contains(&"sink2".to_string()));
    }

    #[test]
    fn test_router_get_sink() {
        let router = AlertRouter::new();
        let sink = Arc::new(CountingSink::new("test", vec![]));
        router.register_sink("test", sink.clone());

        let retrieved = router.get_sink("test");
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().name(), "test");

        let missing = router.get_sink("nonexistent");
        assert!(missing.is_none());
    }

    #[test]
    fn test_dlq_max_size() {
        let router = AlertRouter::new().with_max_dlq_size(2);
        assert_eq!(router.dlq_size(), 0);

        router.register_sink(
            "failing",
            Arc::new(FailingSink {
                name: "failing".to_string(),
                tags: vec![],
            }),
        );

        // Three failed deliveries into a queue of capacity two: the oldest is evicted.
        for _ in 0..3 {
            router.emit(Alert::new("Test", "Message"));
        }
        assert_eq!(router.dlq_size(), 2);

        let drained = router.drain_dlq();
        assert_eq!(drained.len(), 2);
        assert_eq!(router.dlq_size(), 0);
    }

    #[test]
    fn test_router_flush() {
        let router = AlertRouter::new();
        let sink = Arc::new(CountingSink::new("test", vec![]));
        router.register_sink("test", sink);

        // Only checks that flushing a registered sink does not panic.
        router.flush();
    }

    #[test]
    fn test_dlq_captures_failures() {
        let router = AlertRouter::new();
        let failing_sink = Arc::new(FailingSink {
            name: "failing".to_string(),
            tags: vec![],
        });
        router.register_sink("failing", failing_sink);

        let alert = Alert::new("Test", "Message");
        router.emit(alert);
        assert_eq!(router.dlq_size(), 1);

        let dlq_entries = router.drain_dlq();
        assert_eq!(dlq_entries.len(), 1);
        assert_eq!(dlq_entries[0].sink_name, "failing");
        assert!(dlq_entries[0].error.contains("Simulated failure"));
    }
}