ringkernel_core/
alerting.rs

1//! Alert routing system for enterprise monitoring.
2//!
3//! This module provides a pluggable alert routing system with support for
4//! multiple destinations including webhooks (Slack, Teams, PagerDuty),
5//! email, and custom sinks.
6//!
7//! # Feature Flags
8//!
9//! - `alerting` - Enables HTTP webhook delivery (requires `reqwest` crate)
10//!
11//! # Example
12//!
13//! ```rust,ignore
14//! use ringkernel_core::alerting::{AlertRouter, Alert, AlertSeverity, WebhookSink};
15//!
//! let router = AlertRouter::new()
//!     .with_sink(WebhookSink::slack("https://hooks.slack.com/services/...")
//!         .with_severity_filter(AlertSeverity::Warning))
//!     .with_sink(WebhookSink::pagerduty("your-integration-key")
//!         .with_severity_filter(AlertSeverity::Critical));
//!
//! // `AlertRouter::send` is async, so it must be awaited from an async context.
//! router.send(Alert::new(AlertSeverity::Critical, "GPU memory exhausted")
//!     .with_source("kernel_1")
//!     .with_metadata("gpu_id", "0")
//!     .with_metadata("memory_used_gb", "24.5")).await;
26//! ```
27
28use async_trait::async_trait;
29use parking_lot::RwLock;
30use std::collections::HashMap;
31use std::fmt;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
35
36// ============================================================================
37// ALERT SEVERITY
38// ============================================================================
39
/// Severity classification attached to every alert.
///
/// Variants are declared from least to most severe, so the derived ordering
/// (`Info < Warning < Error < Critical`) can be used directly for
/// minimum-severity filtering.
#[repr(u8)]
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    /// Purely informational; no action expected.
    Info = 0,
    /// Something that may need attention.
    Warning = 1,
    /// A failure that needs attention.
    Error = 2,
    /// A severe problem that needs attention immediately.
    Critical = 3,
}
53
54impl AlertSeverity {
55    /// Get the severity name.
56    pub fn as_str(&self) -> &'static str {
57        match self {
58            Self::Info => "INFO",
59            Self::Warning => "WARNING",
60            Self::Error => "ERROR",
61            Self::Critical => "CRITICAL",
62        }
63    }
64
65    /// Get a color for display (hex format).
66    pub fn color(&self) -> &'static str {
67        match self {
68            Self::Info => "#2196F3",     // Blue
69            Self::Warning => "#FF9800",  // Orange
70            Self::Error => "#F44336",    // Red
71            Self::Critical => "#9C27B0", // Purple
72        }
73    }
74}
75
76impl fmt::Display for AlertSeverity {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        write!(f, "{}", self.as_str())
79    }
80}
81
82// ============================================================================
83// ALERT
84// ============================================================================
85
86/// An alert to be routed to sinks.
87#[derive(Debug, Clone)]
88pub struct Alert {
89    /// Unique alert ID.
90    pub id: u64,
91    /// Alert severity.
92    pub severity: AlertSeverity,
93    /// Alert title/summary.
94    pub title: String,
95    /// Detailed description.
96    pub description: Option<String>,
97    /// Source of the alert (kernel ID, component name, etc.).
98    pub source: Option<String>,
99    /// When the alert was created.
100    pub timestamp: SystemTime,
101    /// Deduplication key (alerts with same key within window are deduplicated).
102    pub dedup_key: Option<String>,
103    /// Additional metadata.
104    pub metadata: HashMap<String, String>,
105    /// Alert tags for filtering.
106    pub tags: Vec<String>,
107}
108
109impl Alert {
110    /// Create a new alert.
111    pub fn new(severity: AlertSeverity, title: impl Into<String>) -> Self {
112        static ALERT_ID: AtomicU64 = AtomicU64::new(1);
113        Self {
114            id: ALERT_ID.fetch_add(1, Ordering::Relaxed),
115            severity,
116            title: title.into(),
117            description: None,
118            source: None,
119            timestamp: SystemTime::now(),
120            dedup_key: None,
121            metadata: HashMap::new(),
122            tags: Vec::new(),
123        }
124    }
125
126    /// Add a description.
127    pub fn with_description(mut self, description: impl Into<String>) -> Self {
128        self.description = Some(description.into());
129        self
130    }
131
132    /// Add a source.
133    pub fn with_source(mut self, source: impl Into<String>) -> Self {
134        self.source = Some(source.into());
135        self
136    }
137
138    /// Add a deduplication key.
139    pub fn with_dedup_key(mut self, key: impl Into<String>) -> Self {
140        self.dedup_key = Some(key.into());
141        self
142    }
143
144    /// Add metadata.
145    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
146        self.metadata.insert(key.into(), value.into());
147        self
148    }
149
150    /// Add a tag.
151    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
152        self.tags.push(tag.into());
153        self
154    }
155
156    /// Add multiple tags.
157    pub fn with_tags<I, S>(mut self, tags: I) -> Self
158    where
159        I: IntoIterator<Item = S>,
160        S: Into<String>,
161    {
162        self.tags.extend(tags.into_iter().map(Into::into));
163        self
164    }
165
166    /// Get timestamp as Unix milliseconds.
167    pub fn timestamp_millis(&self) -> u64 {
168        self.timestamp
169            .duration_since(UNIX_EPOCH)
170            .unwrap_or_default()
171            .as_millis() as u64
172    }
173
174    /// Format as JSON.
175    pub fn to_json(&self) -> String {
176        let source_str = self
177            .source
178            .as_ref()
179            .map(|s| format!(r#","source":"{}""#, escape_json(s)))
180            .unwrap_or_default();
181
182        let desc_str = self
183            .description
184            .as_ref()
185            .map(|s| format!(r#","description":"{}""#, escape_json(s)))
186            .unwrap_or_default();
187
188        let metadata_str = if self.metadata.is_empty() {
189            String::new()
190        } else {
191            let pairs: Vec<String> = self
192                .metadata
193                .iter()
194                .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
195                .collect();
196            format!(r#","metadata":{{{}}}"#, pairs.join(","))
197        };
198
199        let tags_str = if self.tags.is_empty() {
200            String::new()
201        } else {
202            let tags: Vec<String> = self
203                .tags
204                .iter()
205                .map(|t| format!(r#""{}""#, escape_json(t)))
206                .collect();
207            format!(r#","tags":[{}]"#, tags.join(","))
208        };
209
210        format!(
211            r#"{{"id":{},"severity":"{}","title":"{}","timestamp":{}{}{}{}{}}}"#,
212            self.id,
213            self.severity.as_str(),
214            escape_json(&self.title),
215            self.timestamp_millis(),
216            source_str,
217            desc_str,
218            metadata_str,
219            tags_str,
220        )
221    }
222}
223
/// Escape a string so it can be embedded between double quotes in JSON.
///
/// Backslash, double quote and every control character below U+0020 are
/// escaped: the common ones use their short forms (`\n`, `\r`, `\t`, `\b`,
/// `\f`) and the rest use `\u00XX`. All other characters are copied
/// unchanged.
fn escape_json(s: &str) -> String {
    // Single pass; the output is at least as long as the input.
    let mut escaped = String::with_capacity(s.len());

    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // Remaining control characters must not appear raw inside a JSON
            // string, so fall back to the generic \u00XX form.
            c if c < '\u{20}' => {
                escaped.push_str("\\u");
                escaped.push_str(&format!("{:04x}", c as u32));
            }
            c => escaped.push(c),
        }
    }

    escaped
}
232
233// ============================================================================
234// ALERT SINK TRAIT
235// ============================================================================
236
/// Failure modes reported by alert sinks.
///
/// Every variant carries a human-readable message.
#[derive(Clone, Debug)]
pub enum AlertSinkError {
    /// The sink could not be reached (connection or network failure).
    ConnectionError(String),
    /// The sink refused the alert because of rate limiting.
    RateLimited(String),
    /// The sink is misconfigured.
    ConfigError(String),
    /// Any failure that does not fit the other variants.
    Other(String),
}
249
250impl fmt::Display for AlertSinkError {
251    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
252        match self {
253            Self::ConnectionError(msg) => write!(f, "Connection error: {}", msg),
254            Self::RateLimited(msg) => write!(f, "Rate limited: {}", msg),
255            Self::ConfigError(msg) => write!(f, "Config error: {}", msg),
256            Self::Other(msg) => write!(f, "Alert sink error: {}", msg),
257        }
258    }
259}
260
261impl std::error::Error for AlertSinkError {}
262
263/// Result type for alert sink operations.
264pub type AlertSinkResult<T> = Result<T, AlertSinkError>;
265
266/// Trait for pluggable alert destinations.
267#[async_trait]
268pub trait AlertSink: Send + Sync {
269    /// Send an alert to the sink.
270    async fn send(&self, alert: &Alert) -> AlertSinkResult<()>;
271
272    /// Get the sink name.
273    fn sink_name(&self) -> &str;
274
275    /// Check if this sink accepts alerts of the given severity.
276    fn accepts_severity(&self, severity: AlertSeverity) -> bool;
277
278    /// Health check the sink.
279    async fn health_check(&self) -> AlertSinkResult<()>;
280}
281
282// ============================================================================
283// LOG SINK (writes to tracing/log)
284// ============================================================================
285
286/// Alert sink that writes to the tracing/log system.
287pub struct LogSink {
288    /// Minimum severity to log.
289    min_severity: AlertSeverity,
290}
291
292impl LogSink {
293    /// Create a new log sink.
294    pub fn new() -> Self {
295        Self {
296            min_severity: AlertSeverity::Info,
297        }
298    }
299
300    /// Set minimum severity filter.
301    pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
302        self.min_severity = min;
303        self
304    }
305}
306
307impl Default for LogSink {
308    fn default() -> Self {
309        Self::new()
310    }
311}
312
313#[async_trait]
314impl AlertSink for LogSink {
315    async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
316        if alert.severity < self.min_severity {
317            return Ok(());
318        }
319
320        let source = alert.source.as_deref().unwrap_or("unknown");
321        let msg = format!(
322            "[ALERT] {} | {} | {} | {}",
323            alert.severity,
324            source,
325            alert.title,
326            alert.description.as_deref().unwrap_or("")
327        );
328
329        match alert.severity {
330            AlertSeverity::Info => tracing::info!("{}", msg),
331            AlertSeverity::Warning => tracing::warn!("{}", msg),
332            AlertSeverity::Error => tracing::error!("{}", msg),
333            AlertSeverity::Critical => tracing::error!(target: "critical", "{}", msg),
334        }
335
336        Ok(())
337    }
338
339    fn sink_name(&self) -> &str {
340        "LogSink"
341    }
342
343    fn accepts_severity(&self, severity: AlertSeverity) -> bool {
344        severity >= self.min_severity
345    }
346
347    async fn health_check(&self) -> AlertSinkResult<()> {
348        Ok(())
349    }
350}
351
352// ============================================================================
353// IN-MEMORY SINK (for testing)
354// ============================================================================
355
356/// Alert sink that stores alerts in memory (for testing).
357pub struct InMemorySink {
358    alerts: RwLock<Vec<Alert>>,
359    max_alerts: usize,
360    min_severity: AlertSeverity,
361}
362
363impl InMemorySink {
364    /// Create a new in-memory sink.
365    pub fn new(max_alerts: usize) -> Self {
366        Self {
367            alerts: RwLock::new(Vec::new()),
368            max_alerts,
369            min_severity: AlertSeverity::Info,
370        }
371    }
372
373    /// Set minimum severity filter.
374    pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
375        self.min_severity = min;
376        self
377    }
378
379    /// Get all stored alerts.
380    pub fn alerts(&self) -> Vec<Alert> {
381        self.alerts.read().clone()
382    }
383
384    /// Get the count of stored alerts.
385    pub fn count(&self) -> usize {
386        self.alerts.read().len()
387    }
388
389    /// Clear all stored alerts.
390    pub fn clear(&self) {
391        self.alerts.write().clear();
392    }
393}
394
395#[async_trait]
396impl AlertSink for InMemorySink {
397    async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
398        if alert.severity < self.min_severity {
399            return Ok(());
400        }
401
402        let mut alerts = self.alerts.write();
403        if alerts.len() >= self.max_alerts {
404            alerts.remove(0);
405        }
406        alerts.push(alert.clone());
407        Ok(())
408    }
409
410    fn sink_name(&self) -> &str {
411        "InMemorySink"
412    }
413
414    fn accepts_severity(&self, severity: AlertSeverity) -> bool {
415        severity >= self.min_severity
416    }
417
418    async fn health_check(&self) -> AlertSinkResult<()> {
419        Ok(())
420    }
421}
422
423// ============================================================================
424// WEBHOOK SINK (requires alerting feature)
425// ============================================================================
426
/// Sink that delivers alerts to an HTTP webhook (Slack, Teams, PagerDuty or
/// a generic JSON endpoint).
#[cfg(feature = "alerting")]
pub struct WebhookSink {
    /// Endpoint the payload is POSTed to.
    url: String,
    /// Alerts below this severity are not sent.
    min_severity: AlertSeverity,
    /// Extra HTTP headers attached to every request.
    headers: HashMap<String, String>,
    /// Payload layout to generate.
    format: WebhookFormat,
    /// HTTP client used to send requests.
    client: reqwest::Client,
    /// Per-request timeout.
    timeout: Duration,
}
443
/// Payload layout produced by a webhook sink.
#[cfg(feature = "alerting")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WebhookFormat {
    /// Plain JSON rendering of the alert.
    Json,
    /// Slack incoming-webhook layout.
    Slack,
    /// Microsoft Teams incoming-webhook layout.
    MsTeams,
    /// PagerDuty Events API v2 layout.
    PagerDuty,
}
457
#[cfg(feature = "alerting")]
impl WebhookSink {
    /// Create a webhook sink that POSTs generic JSON to `url`.
    ///
    /// Defaults: accepts every severity, no extra headers, 10 second timeout.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            min_severity: AlertSeverity::Info,
            headers: HashMap::new(),
            format: WebhookFormat::Json,
            client: reqwest::Client::new(),
            timeout: Duration::from_secs(10),
        }
    }

    /// Create a sink that uses the Slack payload layout.
    pub fn slack(url: impl Into<String>) -> Self {
        Self::new(url).with_format(WebhookFormat::Slack)
    }

    /// Create a sink that uses the Microsoft Teams payload layout.
    pub fn teams(url: impl Into<String>) -> Self {
        Self::new(url).with_format(WebhookFormat::MsTeams)
    }

    /// Create a sink targeting the PagerDuty Events API v2 endpoint.
    ///
    /// The routing key is stored as the `X-Routing-Key` header and is also
    /// embedded in the payload's `routing_key` field.
    pub fn pagerduty(routing_key: impl Into<String>) -> Self {
        Self::new("https://events.pagerduty.com/v2/enqueue")
            .with_format(WebhookFormat::PagerDuty)
            .with_header("X-Routing-Key", routing_key.into())
    }

    /// Set the minimum severity; alerts below it are not sent.
    pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
        self.min_severity = min;
        self
    }

    /// Set the payload layout.
    pub fn with_format(mut self, format: WebhookFormat) -> Self {
        self.format = format;
        self
    }

    /// Add an HTTP header sent with every request; an existing header with
    /// the same key is replaced.
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key.into(), value.into());
        self
    }

    /// Set the per-request timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Render `alert` as the request body for the configured format.
    fn format_payload(&self, alert: &Alert) -> String {
        match self.format {
            WebhookFormat::Json => alert.to_json(),
            WebhookFormat::Slack => self.format_slack(alert),
            WebhookFormat::MsTeams => self.format_teams(alert),
            WebhookFormat::PagerDuty => self.format_pagerduty(alert),
        }
    }

    /// Slack layout: one attachment colored by severity, with "Source" and
    /// "Time" (Unix milliseconds) fields.
    fn format_slack(&self, alert: &Alert) -> String {
        let desc = alert.description.as_deref().unwrap_or("");
        let source = alert.source.as_deref().unwrap_or("unknown");

        format!(
            r#"{{"attachments":[{{"color":"{}","title":"[{}] {}","text":"{}","fields":[{{"title":"Source","value":"{}","short":true}},{{"title":"Time","value":"{}","short":true}}]}}]}}"#,
            alert.severity.color(),
            alert.severity.as_str(),
            escape_json(&alert.title),
            escape_json(desc),
            escape_json(source),
            alert.timestamp_millis(),
        )
    }

    /// Teams layout: a `MessageCard` themed by severity, with "Source" and
    /// "Severity" facts.
    fn format_teams(&self, alert: &Alert) -> String {
        let desc = alert.description.as_deref().unwrap_or("");
        let source = alert.source.as_deref().unwrap_or("unknown");

        format!(
            r#"{{"@type":"MessageCard","@context":"http://schema.org/extensions","themeColor":"{}","title":"[{}] {}","text":"{}","sections":[{{"facts":[{{"name":"Source","value":"{}"}},{{"name":"Severity","value":"{}"}}]}}]}}"#,
            // The card's theme color is written without the leading '#'.
            alert.severity.color().trim_start_matches('#'),
            alert.severity.as_str(),
            escape_json(&alert.title),
            escape_json(desc),
            escape_json(source),
            alert.severity.as_str(),
        )
    }

    /// PagerDuty layout: a `trigger` event.
    ///
    /// The dedup key falls back to the alert title and the source falls back
    /// to `"ringkernel"` when the alert does not set them.
    fn format_pagerduty(&self, alert: &Alert) -> String {
        let severity_pd = match alert.severity {
            AlertSeverity::Info => "info",
            AlertSeverity::Warning => "warning",
            AlertSeverity::Error => "error",
            AlertSeverity::Critical => "critical",
        };

        let dedup = alert.dedup_key.as_deref().unwrap_or(&alert.title);
        let source = alert.source.as_deref().unwrap_or("ringkernel");
        // The header can be overwritten with arbitrary text via `with_header`,
        // so escape it like every other interpolated string.
        let routing_key = self
            .headers
            .get("X-Routing-Key")
            .map(String::as_str)
            .unwrap_or("");

        format!(
            r#"{{"routing_key":"{}","event_action":"trigger","dedup_key":"{}","payload":{{"summary":"{}","source":"{}","severity":"{}","timestamp":"{}"}}}}"#,
            escape_json(routing_key),
            escape_json(dedup),
            escape_json(&alert.title),
            escape_json(source),
            severity_pd,
            chrono_timestamp(alert.timestamp),
        )
    }
}
575
/// Format `time` as an ISO 8601 / RFC 3339 UTC timestamp with millisecond
/// precision, e.g. `2023-11-14T22:13:20.123Z`.
///
/// Times before the Unix epoch are clamped to the epoch.
#[cfg(feature = "alerting")]
fn chrono_timestamp(time: SystemTime) -> String {
    let since_epoch = time.duration_since(UNIX_EPOCH).unwrap_or_default();
    let total_secs = since_epoch.as_secs();
    let millis = since_epoch.subsec_millis();

    // Split into whole days and the time of day.
    let days = total_secs / 86_400;
    let secs_of_day = total_secs % 86_400;
    let hour = secs_of_day / 3_600;
    let minute = (secs_of_day % 3_600) / 60;
    let second = secs_of_day % 60;

    // Convert a day count since 1970-01-01 into a proleptic Gregorian date.
    // The calendar is shifted to start on March 1st of year 0 so that the
    // leap day falls at the end of the shifted year; 146_097 is the number of
    // days in one 400-year cycle.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097; // [0, 146096]
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365; // [0, 399]
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100); // [0, 365]
    let shifted_month = (5 * day_of_year + 2) / 153; // [0, 11], 0 = March
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1; // [1, 31]
    let month = if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    }; // [1, 12]
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
        year, month, day, hour, minute, second, millis
    )
}
585
#[cfg(feature = "alerting")]
#[async_trait]
impl AlertSink for WebhookSink {
    /// POST the formatted alert to the webhook URL.
    ///
    /// Alerts below the minimum severity are skipped and reported as `Ok(())`.
    ///
    /// # Errors
    ///
    /// - `ConnectionError` when the request itself fails to complete.
    /// - `RateLimited` when the endpoint answers with HTTP 429.
    /// - `Other` for any other non-success status.
    async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
        if alert.severity < self.min_severity {
            return Ok(());
        }

        let payload = self.format_payload(alert);
        let mut request = self
            .client
            .post(&self.url)
            .header("Content-Type", "application/json")
            .body(payload)
            .timeout(self.timeout);

        for (key, value) in &self.headers {
            request = request.header(key, value);
        }

        let response = request.send().await.map_err(|e| {
            AlertSinkError::ConnectionError(format!("Webhook request failed: {}", e))
        })?;

        let status = response.status();
        if status.is_success() {
            Ok(())
        } else if status == reqwest::StatusCode::TOO_MANY_REQUESTS {
            // Report throttling through the dedicated variant so callers can
            // tell it apart from other failures.
            Err(AlertSinkError::RateLimited(format!(
                "Webhook returned status: {}",
                status
            )))
        } else {
            Err(AlertSinkError::Other(format!(
                "Webhook returned status: {}",
                status
            )))
        }
    }

    /// Always `"WebhookSink"`.
    fn sink_name(&self) -> &str {
        "WebhookSink"
    }

    /// True when `severity` is at or above the configured minimum.
    fn accepts_severity(&self, severity: AlertSeverity) -> bool {
        severity >= self.min_severity
    }

    /// Currently a no-op that always reports healthy; no request is made to
    /// the webhook URL.
    async fn health_check(&self) -> AlertSinkResult<()> {
        Ok(())
    }
}
633
634// ============================================================================
635// ALERT ROUTER
636// ============================================================================
637
/// Settings that control how the router deduplicates alerts.
#[derive(Clone, Debug)]
pub struct DeduplicationConfig {
    /// How long a dedup key suppresses further alerts with the same key.
    pub window: Duration,
    /// Upper bound on the number of keys kept in the dedup cache.
    pub max_entries: usize,
}
646
647impl Default for DeduplicationConfig {
648    fn default() -> Self {
649        Self {
650            window: Duration::from_secs(300), // 5 minutes
651            max_entries: 10000,
652        }
653    }
654}
655
656/// Alert router that sends alerts to multiple sinks.
657pub struct AlertRouter {
658    /// Registered sinks.
659    sinks: Vec<Arc<dyn AlertSink>>,
660    /// Deduplication cache.
661    dedup_cache: RwLock<HashMap<String, Instant>>,
662    /// Deduplication config.
663    dedup_config: DeduplicationConfig,
664    /// Alert counter.
665    alert_count: AtomicU64,
666    /// Deduplicated count.
667    dedup_count: AtomicU64,
668}
669
670impl AlertRouter {
671    /// Create a new alert router.
672    pub fn new() -> Self {
673        Self {
674            sinks: Vec::new(),
675            dedup_cache: RwLock::new(HashMap::new()),
676            dedup_config: DeduplicationConfig::default(),
677            alert_count: AtomicU64::new(0),
678            dedup_count: AtomicU64::new(0),
679        }
680    }
681
682    /// Set deduplication config.
683    pub fn with_deduplication(mut self, config: DeduplicationConfig) -> Self {
684        self.dedup_config = config;
685        self
686    }
687
688    /// Add a sink.
689    pub fn with_sink<S: AlertSink + 'static>(mut self, sink: S) -> Self {
690        self.sinks.push(Arc::new(sink));
691        self
692    }
693
694    /// Add a sink (Arc version).
695    pub fn with_sink_arc(mut self, sink: Arc<dyn AlertSink>) -> Self {
696        self.sinks.push(sink);
697        self
698    }
699
700    /// Send an alert to all appropriate sinks.
701    pub async fn send(&self, alert: Alert) {
702        self.alert_count.fetch_add(1, Ordering::Relaxed);
703
704        // Check deduplication
705        if let Some(dedup_key) = &alert.dedup_key {
706            let now = Instant::now();
707            let mut cache = self.dedup_cache.write();
708
709            // Clean old entries
710            cache.retain(|_, instant| now.duration_since(*instant) < self.dedup_config.window);
711
712            // Check if duplicate
713            if let Some(last_seen) = cache.get(dedup_key) {
714                if now.duration_since(*last_seen) < self.dedup_config.window {
715                    self.dedup_count.fetch_add(1, Ordering::Relaxed);
716                    return; // Deduplicated
717                }
718            }
719
720            // Record this alert
721            if cache.len() < self.dedup_config.max_entries {
722                cache.insert(dedup_key.clone(), now);
723            }
724        }
725
726        // Send to all sinks that accept this severity
727        for sink in &self.sinks {
728            if sink.accepts_severity(alert.severity) {
729                if let Err(e) = sink.send(&alert).await {
730                    tracing::error!("Alert sink {} failed: {}", sink.sink_name(), e);
731                }
732            }
733        }
734    }
735
736    /// Send an alert synchronously (spawns async task).
737    pub fn send_sync(&self, alert: Alert) {
738        // This is a simplified version that logs immediately
739        // In production, you'd spawn a task or use a channel
740        self.alert_count.fetch_add(1, Ordering::Relaxed);
741
742        let source = alert.source.as_deref().unwrap_or("unknown");
743        let msg = format!("[ALERT] {} | {} | {}", alert.severity, source, alert.title);
744
745        match alert.severity {
746            AlertSeverity::Info => tracing::info!("{}", msg),
747            AlertSeverity::Warning => tracing::warn!("{}", msg),
748            AlertSeverity::Error | AlertSeverity::Critical => tracing::error!("{}", msg),
749        }
750    }
751
752    /// Get statistics.
753    pub fn stats(&self) -> AlertRouterStats {
754        AlertRouterStats {
755            total_alerts: self.alert_count.load(Ordering::Relaxed),
756            deduplicated: self.dedup_count.load(Ordering::Relaxed),
757            sinks: self.sinks.len(),
758        }
759    }
760}
761
762impl Default for AlertRouter {
763    fn default() -> Self {
764        Self::new()
765    }
766}
767
/// Point-in-time counters reported by the alert router.
#[derive(Clone, Debug)]
pub struct AlertRouterStats {
    /// Number of alerts handed to the router.
    pub total_alerts: u64,
    /// Number of alerts suppressed as duplicates.
    pub deduplicated: u64,
    /// Number of registered sinks.
    pub sinks: usize,
}
778
779// ============================================================================
780// TESTS
781// ============================================================================
782
#[cfg(test)]
mod tests {
    use super::*;

    /// Severities compare in ascending order of urgency.
    #[test]
    fn test_alert_severity_ordering() {
        let ascending = [
            AlertSeverity::Info,
            AlertSeverity::Warning,
            AlertSeverity::Error,
            AlertSeverity::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    /// Builder methods populate the matching fields.
    #[test]
    fn test_alert_creation() {
        let built = Alert::new(AlertSeverity::Warning, "Test alert")
            .with_description("This is a test")
            .with_source("test_kernel")
            .with_metadata("key1", "value1")
            .with_tag("test");

        assert_eq!(built.severity, AlertSeverity::Warning);
        assert_eq!(built.title, "Test alert");
        assert_eq!(built.description.as_deref(), Some("This is a test"));
        assert_eq!(built.source.as_deref(), Some("test_kernel"));
        assert_eq!(
            built.metadata.get("key1").map(String::as_str),
            Some("value1")
        );
        assert!(built.tags.iter().any(|tag| tag == "test"));
    }

    /// The JSON rendering mentions severity, title and source.
    #[test]
    fn test_alert_json() {
        let rendered = Alert::new(AlertSeverity::Error, "Test")
            .with_source("kernel_1")
            .to_json();

        for needle in ["ERROR", "Test", "kernel_1"] {
            assert!(rendered.contains(needle));
        }
    }

    /// An accepted alert is stored and can be read back.
    #[tokio::test]
    async fn test_in_memory_sink() {
        let sink = InMemorySink::new(10);

        sink.send(&Alert::new(AlertSeverity::Warning, "Test alert"))
            .await
            .unwrap();

        assert_eq!(sink.count(), 1);
        let stored = sink.alerts();
        assert_eq!(stored[0].title, "Test alert");
    }

    /// Alerts below the minimum severity are dropped; others are stored.
    #[tokio::test]
    async fn test_in_memory_sink_severity_filter() {
        let sink = InMemorySink::new(10).with_severity_filter(AlertSeverity::Error);

        // Below the threshold: filtered out.
        sink.send(&Alert::new(AlertSeverity::Info, "Info"))
            .await
            .unwrap();
        assert_eq!(sink.count(), 0);

        // At the threshold: stored.
        sink.send(&Alert::new(AlertSeverity::Error, "Error"))
            .await
            .unwrap();
        assert_eq!(sink.count(), 1);
    }

    /// The router forwards every alert to its sink and counts them.
    #[tokio::test]
    async fn test_alert_router() {
        let sink = Arc::new(InMemorySink::new(100));
        let router = AlertRouter::new().with_sink_arc(sink.clone());

        let first = Alert::new(AlertSeverity::Warning, "Alert 1");
        let second = Alert::new(AlertSeverity::Error, "Alert 2");
        router.send(first).await;
        router.send(second).await;

        assert_eq!(sink.count(), 2);
        assert_eq!(router.stats().total_alerts, 2);
    }

    /// Repeated alerts with the same dedup key are delivered only once.
    #[tokio::test]
    async fn test_alert_deduplication() {
        let sink = Arc::new(InMemorySink::new(100));
        let config = DeduplicationConfig {
            window: Duration::from_secs(60),
            max_entries: 100,
        };
        let router = AlertRouter::new()
            .with_deduplication(config)
            .with_sink_arc(sink.clone());

        // Five sends that all share one dedup key.
        let mut remaining = 5;
        while remaining > 0 {
            let repeated =
                Alert::new(AlertSeverity::Warning, "Repeated alert").with_dedup_key("same-key");
            router.send(repeated).await;
            remaining -= 1;
        }

        // Only the first gets through; the other four are deduplicated.
        assert_eq!(sink.count(), 1);
        assert_eq!(router.stats().deduplicated, 4);
    }

    /// The log sink filters by severity and sending does not panic.
    #[tokio::test]
    async fn test_log_sink() {
        let sink = LogSink::new().with_severity_filter(AlertSeverity::Warning);

        let info = Alert::new(AlertSeverity::Info, "Info");
        assert!(!sink.accepts_severity(info.severity));

        let warning = Alert::new(AlertSeverity::Warning, "Warning");
        assert!(sink.accepts_severity(warning.severity));

        // Smoke test: sending must succeed.
        sink.send(&warning).await.unwrap();
    }
}