Skip to main content

ringkernel_core/
alerting.rs

1//! Alert routing system for enterprise monitoring.
2//!
3//! This module provides a pluggable alert routing system with support for
4//! multiple destinations including webhooks (Slack, Teams, PagerDuty),
5//! email, and custom sinks.
6//!
7//! # Feature Flags
8//!
9//! - `alerting` - Enables HTTP webhook delivery (requires `reqwest` crate)
10//!
11//! # Example
12//!
13//! ```rust,ignore
14//! use ringkernel_core::alerting::{AlertRouter, Alert, AlertSeverity, WebhookSink};
15//!
16//! let router = AlertRouter::new()
17//!     .with_sink(WebhookSink::new("https://hooks.slack.com/services/...")
18//!         .with_severity_filter(AlertSeverity::Warning))
19//!     .with_sink(WebhookSink::pagerduty("your-integration-key")
20//!         .with_severity_filter(AlertSeverity::Critical));
21//!
//! router.send(Alert::new(AlertSeverity::Critical, "GPU memory exhausted")
//!     .with_source("kernel_1")
//!     .with_metadata("gpu_id", "0")
//!     .with_metadata("memory_used_gb", "24.5")).await;
26//! ```
27
28use async_trait::async_trait;
29use parking_lot::RwLock;
30use std::collections::HashMap;
31use std::fmt;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
35
36// ============================================================================
37// ALERT SEVERITY
38// ============================================================================
39
/// Alert severity levels, ordered from least urgent (`Info`) to most
/// urgent (`Critical`); the derived `Ord` lets callers compare severities
/// directly (e.g. `severity >= min_severity`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum AlertSeverity {
    /// Informational alert (FYI).
    Info = 0,
    /// Warning that may require attention.
    Warning = 1,
    /// Error that requires attention.
    Error = 2,
    /// Critical issue requiring immediate attention.
    Critical = 3,
}

impl AlertSeverity {
    /// Upper-case severity name, as embedded in payloads and log lines.
    pub fn as_str(&self) -> &'static str {
        match *self {
            AlertSeverity::Info => "INFO",
            AlertSeverity::Warning => "WARNING",
            AlertSeverity::Error => "ERROR",
            AlertSeverity::Critical => "CRITICAL",
        }
    }

    /// Display color as a `#RRGGBB` hex string (used by webhook payloads).
    pub fn color(&self) -> &'static str {
        match *self {
            AlertSeverity::Info => "#2196F3",     // blue
            AlertSeverity::Warning => "#FF9800",  // orange
            AlertSeverity::Error => "#F44336",    // red
            AlertSeverity::Critical => "#9C27B0", // purple
        }
    }
}

impl fmt::Display for AlertSeverity {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
81
82// ============================================================================
83// ALERT
84// ============================================================================
85
/// An alert to be routed to sinks.
///
/// Construct with [`Alert::new`] and enrich with the builder-style
/// `with_*` methods; deliver through an `AlertRouter` or directly to an
/// `AlertSink`.
#[derive(Debug, Clone)]
pub struct Alert {
    /// Unique alert ID (process-wide monotonically increasing counter,
    /// starting at 1).
    pub id: u64,
    /// Alert severity.
    pub severity: AlertSeverity,
    /// Alert title/summary.
    pub title: String,
    /// Detailed description (omitted from JSON output when `None`).
    pub description: Option<String>,
    /// Source of the alert (kernel ID, component name, etc.).
    pub source: Option<String>,
    /// When the alert was created (set to `SystemTime::now()` by `new`).
    pub timestamp: SystemTime,
    /// Deduplication key (alerts with same key within the router's dedup
    /// window are suppressed).
    pub dedup_key: Option<String>,
    /// Additional free-form metadata, serialized into the JSON payload.
    pub metadata: HashMap<String, String>,
    /// Alert tags for filtering.
    pub tags: Vec<String>,
}
108
109impl Alert {
110    /// Create a new alert.
111    pub fn new(severity: AlertSeverity, title: impl Into<String>) -> Self {
112        static ALERT_ID: AtomicU64 = AtomicU64::new(1);
113        Self {
114            id: ALERT_ID.fetch_add(1, Ordering::Relaxed),
115            severity,
116            title: title.into(),
117            description: None,
118            source: None,
119            timestamp: SystemTime::now(),
120            dedup_key: None,
121            metadata: HashMap::new(),
122            tags: Vec::new(),
123        }
124    }
125
126    /// Add a description.
127    pub fn with_description(mut self, description: impl Into<String>) -> Self {
128        self.description = Some(description.into());
129        self
130    }
131
132    /// Add a source.
133    pub fn with_source(mut self, source: impl Into<String>) -> Self {
134        self.source = Some(source.into());
135        self
136    }
137
138    /// Add a deduplication key.
139    pub fn with_dedup_key(mut self, key: impl Into<String>) -> Self {
140        self.dedup_key = Some(key.into());
141        self
142    }
143
144    /// Add metadata.
145    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
146        self.metadata.insert(key.into(), value.into());
147        self
148    }
149
150    /// Add a tag.
151    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
152        self.tags.push(tag.into());
153        self
154    }
155
156    /// Add multiple tags.
157    pub fn with_tags<I, S>(mut self, tags: I) -> Self
158    where
159        I: IntoIterator<Item = S>,
160        S: Into<String>,
161    {
162        self.tags.extend(tags.into_iter().map(Into::into));
163        self
164    }
165
166    /// Get timestamp as Unix milliseconds.
167    pub fn timestamp_millis(&self) -> u64 {
168        self.timestamp
169            .duration_since(UNIX_EPOCH)
170            .unwrap_or_default()
171            .as_millis() as u64
172    }
173
174    /// Format as JSON.
175    pub fn to_json(&self) -> String {
176        let source_str = self
177            .source
178            .as_ref()
179            .map(|s| format!(r#","source":"{}""#, escape_json(s)))
180            .unwrap_or_default();
181
182        let desc_str = self
183            .description
184            .as_ref()
185            .map(|s| format!(r#","description":"{}""#, escape_json(s)))
186            .unwrap_or_default();
187
188        let metadata_str = if self.metadata.is_empty() {
189            String::new()
190        } else {
191            let pairs: Vec<String> = self
192                .metadata
193                .iter()
194                .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
195                .collect();
196            format!(r#","metadata":{{{}}}"#, pairs.join(","))
197        };
198
199        let tags_str = if self.tags.is_empty() {
200            String::new()
201        } else {
202            let tags: Vec<String> = self
203                .tags
204                .iter()
205                .map(|t| format!(r#""{}""#, escape_json(t)))
206                .collect();
207            format!(r#","tags":[{}]"#, tags.join(","))
208        };
209
210        format!(
211            r#"{{"id":{},"severity":"{}","title":"{}","timestamp":{}{}{}{}{}}}"#,
212            self.id,
213            self.severity.as_str(),
214            escape_json(&self.title),
215            self.timestamp_millis(),
216            source_str,
217            desc_str,
218            metadata_str,
219            tags_str,
220        )
221    }
222}
223
/// Escape a string for safe embedding inside a JSON string literal.
///
/// Handles backslash, double quote, and — fixed here — ALL control
/// characters below U+0020. The previous version only escaped `\n`, `\r`
/// and `\t`, so any other C0 control character (e.g. a backspace or NUL
/// in an alert title) produced invalid JSON per RFC 8259. The result is
/// built in a single pass instead of five chained `replace` allocations.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            '"' => out.push_str("\\\""),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            '\u{08}' => out.push_str("\\b"),
            '\u{0C}' => out.push_str("\\f"),
            c if (c as u32) < 0x20 => {
                // Remaining C0 controls must use the \uXXXX form.
                out.push_str(&format!("\\u{:04x}", c as u32));
            }
            c => out.push(c),
        }
    }
    out
}
232
233// ============================================================================
234// ALERT SINK TRAIT
235// ============================================================================
236
/// Error type for alert sink operations.
#[derive(Debug, Clone)]
pub enum AlertSinkError {
    /// Connection/network error.
    ConnectionError(String),
    /// Rate limited.
    RateLimited(String),
    /// Configuration error.
    ConfigError(String),
    /// Other error.
    Other(String),
}

impl fmt::Display for AlertSinkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the user-facing prefix per variant, then format once.
        let (prefix, detail) = match self {
            AlertSinkError::ConnectionError(msg) => ("Connection error", msg),
            AlertSinkError::RateLimited(msg) => ("Rate limited", msg),
            AlertSinkError::ConfigError(msg) => ("Config error", msg),
            AlertSinkError::Other(msg) => ("Alert sink error", msg),
        };
        write!(f, "{}: {}", prefix, detail)
    }
}

impl std::error::Error for AlertSinkError {}

/// Result type for alert sink operations.
pub type AlertSinkResult<T> = Result<T, AlertSinkError>;
265
/// Trait for pluggable alert destinations.
///
/// Implementors are registered on an `AlertRouter`, which calls
/// [`AlertSink::send`] for every alert whose severity passes
/// [`AlertSink::accepts_severity`].
#[async_trait]
pub trait AlertSink: Send + Sync {
    /// Deliver a single alert to this destination.
    ///
    /// Implementations in this module also apply their own severity filter
    /// here and return `Ok(())` for alerts they drop.
    async fn send(&self, alert: &Alert) -> AlertSinkResult<()>;

    /// Human-readable sink name (used in router failure logs).
    fn sink_name(&self) -> &str;

    /// Whether this sink wants alerts of the given severity.
    fn accepts_severity(&self, severity: AlertSeverity) -> bool;

    /// Check that the sink is able to deliver alerts.
    async fn health_check(&self) -> AlertSinkResult<()>;
}
281
282// ============================================================================
283// LOG SINK (writes to tracing/log)
284// ============================================================================
285
286/// Alert sink that writes to the tracing/log system.
287pub struct LogSink {
288    /// Minimum severity to log.
289    min_severity: AlertSeverity,
290}
291
292impl LogSink {
293    /// Create a new log sink.
294    pub fn new() -> Self {
295        Self {
296            min_severity: AlertSeverity::Info,
297        }
298    }
299
300    /// Set minimum severity filter.
301    pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
302        self.min_severity = min;
303        self
304    }
305}
306
307impl Default for LogSink {
308    fn default() -> Self {
309        Self::new()
310    }
311}
312
#[async_trait]
impl AlertSink for LogSink {
    /// Forward the alert to `tracing` at a level matching its severity.
    ///
    /// Alerts below `min_severity` are dropped silently (still returns
    /// `Ok`). Critical alerts are logged at error level under the dedicated
    /// `critical` target so subscribers can route them separately.
    async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
        if alert.severity < self.min_severity {
            return Ok(());
        }

        // Missing source/description render as "unknown" / empty string.
        let source = alert.source.as_deref().unwrap_or("unknown");
        let msg = format!(
            "[ALERT] {} | {} | {} | {}",
            alert.severity,
            source,
            alert.title,
            alert.description.as_deref().unwrap_or("")
        );

        match alert.severity {
            AlertSeverity::Info => tracing::info!("{}", msg),
            AlertSeverity::Warning => tracing::warn!("{}", msg),
            AlertSeverity::Error => tracing::error!("{}", msg),
            AlertSeverity::Critical => tracing::error!(target: "critical", "{}", msg),
        }

        Ok(())
    }

    /// Sink name used in router failure logs.
    fn sink_name(&self) -> &str {
        "LogSink"
    }

    /// Accept alerts at or above the configured minimum severity.
    fn accepts_severity(&self, severity: AlertSeverity) -> bool {
        severity >= self.min_severity
    }

    /// Logging has no external dependency, so this always succeeds.
    async fn health_check(&self) -> AlertSinkResult<()> {
        Ok(())
    }
}
351
352// ============================================================================
353// IN-MEMORY SINK (for testing)
354// ============================================================================
355
356/// Alert sink that stores alerts in memory (for testing).
357pub struct InMemorySink {
358    alerts: RwLock<Vec<Alert>>,
359    max_alerts: usize,
360    min_severity: AlertSeverity,
361}
362
363impl InMemorySink {
364    /// Create a new in-memory sink.
365    pub fn new(max_alerts: usize) -> Self {
366        Self {
367            alerts: RwLock::new(Vec::new()),
368            max_alerts,
369            min_severity: AlertSeverity::Info,
370        }
371    }
372
373    /// Set minimum severity filter.
374    pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
375        self.min_severity = min;
376        self
377    }
378
379    /// Get all stored alerts.
380    pub fn alerts(&self) -> Vec<Alert> {
381        self.alerts.read().clone()
382    }
383
384    /// Get the count of stored alerts.
385    pub fn count(&self) -> usize {
386        self.alerts.read().len()
387    }
388
389    /// Clear all stored alerts.
390    pub fn clear(&self) {
391        self.alerts.write().clear();
392    }
393}
394
395#[async_trait]
396impl AlertSink for InMemorySink {
397    async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
398        if alert.severity < self.min_severity {
399            return Ok(());
400        }
401
402        let mut alerts = self.alerts.write();
403        if alerts.len() >= self.max_alerts {
404            alerts.remove(0);
405        }
406        alerts.push(alert.clone());
407        Ok(())
408    }
409
410    fn sink_name(&self) -> &str {
411        "InMemorySink"
412    }
413
414    fn accepts_severity(&self, severity: AlertSeverity) -> bool {
415        severity >= self.min_severity
416    }
417
418    async fn health_check(&self) -> AlertSinkResult<()> {
419        Ok(())
420    }
421}
422
423// ============================================================================
424// WEBHOOK SINK (requires alerting feature)
425// ============================================================================
426
/// Webhook alert sink for Slack, Teams, PagerDuty, etc.
///
/// Payloads are rendered according to the configured [`WebhookFormat`]
/// and POSTed to `url` with `Content-Type: application/json`. Only
/// available with the `alerting` feature (pulls in `reqwest`).
#[cfg(feature = "alerting")]
pub struct WebhookSink {
    /// Webhook URL the payload is POSTed to.
    url: String,
    /// Minimum severity to send; lower-severity alerts are dropped.
    min_severity: AlertSeverity,
    /// Custom headers added to every request. The PagerDuty constructor
    /// also stashes the routing key here under "X-Routing-Key".
    headers: HashMap<String, String>,
    /// Webhook payload format.
    format: WebhookFormat,
    /// HTTP client used for all requests from this sink.
    client: reqwest::Client,
    /// Per-request timeout (default: 10 seconds).
    timeout: Duration,
}
443
/// Webhook payload format, selecting how an [`Alert`] is serialized
/// before being POSTed.
#[cfg(feature = "alerting")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebhookFormat {
    /// Generic JSON payload (the output of `Alert::to_json`).
    Json,
    /// Slack incoming webhook format (attachments with color/fields).
    Slack,
    /// Microsoft Teams incoming webhook format (MessageCard).
    MsTeams,
    /// PagerDuty Events API v2 format (trigger event).
    PagerDuty,
}
457
#[cfg(feature = "alerting")]
impl WebhookSink {
    /// Create a webhook sink posting generic JSON to `url`.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            min_severity: AlertSeverity::Info,
            headers: HashMap::new(),
            format: WebhookFormat::Json,
            client: reqwest::Client::new(),
            timeout: Duration::from_secs(10),
        }
    }

    /// Create a Slack incoming-webhook sink.
    pub fn slack(url: impl Into<String>) -> Self {
        Self::new(url).with_format(WebhookFormat::Slack)
    }

    /// Create a Microsoft Teams incoming-webhook sink.
    pub fn teams(url: impl Into<String>) -> Self {
        Self::new(url).with_format(WebhookFormat::MsTeams)
    }

    /// Create a PagerDuty Events API v2 sink.
    ///
    /// The routing key is stored in the header map and injected into the
    /// event payload by `format_pagerduty`.
    pub fn pagerduty(routing_key: impl Into<String>) -> Self {
        Self::new("https://events.pagerduty.com/v2/enqueue")
            .with_format(WebhookFormat::PagerDuty)
            .with_header("X-Routing-Key", routing_key.into())
    }

    /// Only deliver alerts at or above `min`.
    pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
        self.min_severity = min;
        self
    }

    /// Set the payload format.
    pub fn with_format(mut self, format: WebhookFormat) -> Self {
        self.format = format;
        self
    }

    /// Add a custom HTTP header sent with every request.
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key.into(), value.into());
        self
    }

    /// Set the per-request timeout (default: 10 seconds).
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Render the alert in this sink's configured payload format.
    fn format_payload(&self, alert: &Alert) -> String {
        match self.format {
            WebhookFormat::Json => alert.to_json(),
            WebhookFormat::Slack => self.format_slack(alert),
            WebhookFormat::MsTeams => self.format_teams(alert),
            WebhookFormat::PagerDuty => self.format_pagerduty(alert),
        }
    }

    /// Format as a Slack attachment with a severity color bar.
    fn format_slack(&self, alert: &Alert) -> String {
        let desc = alert.description.as_deref().unwrap_or("");
        let source = alert.source.as_deref().unwrap_or("unknown");

        format!(
            r#"{{"attachments":[{{"color":"{}","title":"[{}] {}","text":"{}","fields":[{{"title":"Source","value":"{}","short":true}},{{"title":"Time","value":"{}","short":true}}]}}]}}"#,
            alert.severity.color(),
            alert.severity.as_str(),
            escape_json(&alert.title),
            escape_json(desc),
            escape_json(source),
            alert.timestamp_millis(),
        )
    }

    /// Format as a Microsoft Teams MessageCard.
    fn format_teams(&self, alert: &Alert) -> String {
        let desc = alert.description.as_deref().unwrap_or("");
        let source = alert.source.as_deref().unwrap_or("unknown");

        format!(
            r#"{{"@type":"MessageCard","@context":"http://schema.org/extensions","themeColor":"{}","title":"[{}] {}","text":"{}","sections":[{{"facts":[{{"name":"Source","value":"{}"}},{{"name":"Severity","value":"{}"}}]}}]}}"#,
            // Teams expects the color without the leading '#'.
            alert.severity.color().trim_start_matches('#'),
            alert.severity.as_str(),
            escape_json(&alert.title),
            escape_json(desc),
            escape_json(source),
            alert.severity.as_str(),
        )
    }

    /// Format as a PagerDuty Events API v2 `trigger` event.
    fn format_pagerduty(&self, alert: &Alert) -> String {
        let severity_pd = match alert.severity {
            AlertSeverity::Info => "info",
            AlertSeverity::Warning => "warning",
            AlertSeverity::Error => "error",
            AlertSeverity::Critical => "critical",
        };

        // Fall back to the title when no explicit dedup key was provided.
        let dedup = alert.dedup_key.as_deref().unwrap_or(&alert.title);
        let source = alert.source.as_deref().unwrap_or("ringkernel");

        // Fix: the routing key is now JSON-escaped before interpolation (it
        // was previously embedded raw, which would corrupt the payload if it
        // ever contained a quote or backslash), and the per-call
        // `String::new()` temporary allocation is gone.
        let routing_key = self
            .headers
            .get("X-Routing-Key")
            .map(String::as_str)
            .unwrap_or("");

        format!(
            r#"{{"routing_key":"{}","event_action":"trigger","dedup_key":"{}","payload":{{"summary":"{}","source":"{}","severity":"{}","timestamp":"{}"}}}}"#,
            escape_json(routing_key),
            escape_json(dedup),
            escape_json(&alert.title),
            escape_json(source),
            severity_pd,
            chrono_timestamp(alert.timestamp),
        )
    }
}
575
/// Format a `SystemTime` as an ISO 8601 / RFC 3339 UTC timestamp
/// (e.g. `2024-01-15T08:30:00Z`), as the PagerDuty Events API expects.
///
/// Fix: the previous version emitted `"<unix-seconds>Z"` (e.g.
/// `1700000000Z`), which is not ISO 8601 at all. The conversion below uses
/// Howard Hinnant's civil-from-days algorithm so no chrono dependency is
/// needed. Times before the Unix epoch collapse to the epoch itself,
/// mirroring the old `unwrap_or_default` behavior.
#[cfg(feature = "alerting")]
fn chrono_timestamp(time: SystemTime) -> String {
    let total_secs = time
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();

    let days = (total_secs / 86_400) as i64;
    let secs_of_day = total_secs % 86_400;
    let (hh, mm, ss) = (
        secs_of_day / 3_600,
        (secs_of_day % 3_600) / 60,
        secs_of_day % 60,
    );

    // Civil-from-days (Howard Hinnant's date algorithms), shifted so the
    // 400-year era starts on 0000-03-01.
    let z = days + 719_468;
    let era = z.div_euclid(146_097);
    let doe = z.rem_euclid(146_097); // day-of-era [0, 146096]
    let yoe = (doe - doe / 1_460 + doe / 36_524 - doe / 146_096) / 365;
    let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); // day-of-year [0, 365]
    let mp = (5 * doy + 2) / 153; // March-based month [0, 11]
    let day = doy - (153 * mp + 2) / 5 + 1;
    let month = if mp < 10 { mp + 3 } else { mp - 9 };
    let year = yoe + era * 400 + if month <= 2 { 1 } else { 0 };

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hh, mm, ss
    )
}
585
#[cfg(feature = "alerting")]
#[async_trait]
impl AlertSink for WebhookSink {
    /// POST the formatted alert payload to the configured webhook URL.
    ///
    /// Alerts below `min_severity` are dropped silently (returns `Ok`).
    /// Network/timeout failures map to `ConnectionError`; any non-2xx HTTP
    /// status maps to `Other` with the status code in the message.
    async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
        if alert.severity < self.min_severity {
            return Ok(());
        }

        let payload = self.format_payload(alert);
        let mut request = self
            .client
            .post(&self.url)
            .header("Content-Type", "application/json")
            .body(payload)
            .timeout(self.timeout);

        // Custom headers (e.g. auth tokens) are applied after the defaults.
        for (key, value) in &self.headers {
            request = request.header(key, value);
        }

        let response = request.send().await.map_err(|e| {
            AlertSinkError::ConnectionError(format!("Webhook request failed: {}", e))
        })?;

        if response.status().is_success() {
            Ok(())
        } else {
            Err(AlertSinkError::Other(format!(
                "Webhook returned status: {}",
                response.status()
            )))
        }
    }

    /// Sink name used in router failure logs.
    fn sink_name(&self) -> &str {
        "WebhookSink"
    }

    /// Accept alerts at or above the configured minimum severity.
    fn accepts_severity(&self, severity: AlertSeverity) -> bool {
        severity >= self.min_severity
    }

    /// Health check the sink.
    ///
    /// NOTE(review): this is currently a no-op that always returns `Ok` —
    /// it does NOT actually probe the webhook URL. Confirm whether a real
    /// reachability check is intended here.
    async fn health_check(&self) -> AlertSinkResult<()> {
        Ok(())
    }
}
633
634// ============================================================================
635// ALERT ROUTER
636// ============================================================================
637
/// Configuration for alert deduplication.
#[derive(Debug, Clone)]
pub struct DeduplicationConfig {
    /// Time window within which repeats of the same dedup key are dropped.
    pub window: Duration,
    /// Maximum entries kept in the dedup cache.
    pub max_entries: usize,
}

impl Default for DeduplicationConfig {
    /// Defaults: 5-minute window, 10,000 cached keys.
    fn default() -> Self {
        DeduplicationConfig {
            window: Duration::from_secs(300),
            max_entries: 10000,
        }
    }
}
655
/// Alert router that fans alerts out to multiple sinks.
///
/// Supports key-based deduplication within a configurable time window and
/// keeps simple counters exposed via `AlertRouter::stats`.
pub struct AlertRouter {
    /// Registered sinks, consulted in registration order.
    sinks: Vec<Arc<dyn AlertSink>>,
    /// dedup_key -> last time an alert with that key was forwarded.
    dedup_cache: RwLock<HashMap<String, Instant>>,
    /// Deduplication window and cache-capacity limits.
    dedup_config: DeduplicationConfig,
    /// Total alerts received (including suppressed duplicates).
    alert_count: AtomicU64,
    /// Alerts suppressed as duplicates.
    dedup_count: AtomicU64,
}
669
670impl AlertRouter {
671    /// Create a new alert router.
672    pub fn new() -> Self {
673        Self {
674            sinks: Vec::new(),
675            dedup_cache: RwLock::new(HashMap::new()),
676            dedup_config: DeduplicationConfig::default(),
677            alert_count: AtomicU64::new(0),
678            dedup_count: AtomicU64::new(0),
679        }
680    }
681
682    /// Set deduplication config.
683    pub fn with_deduplication(mut self, config: DeduplicationConfig) -> Self {
684        self.dedup_config = config;
685        self
686    }
687
688    /// Add a sink.
689    pub fn with_sink<S: AlertSink + 'static>(mut self, sink: S) -> Self {
690        self.sinks.push(Arc::new(sink));
691        self
692    }
693
694    /// Add a sink (Arc version).
695    pub fn with_sink_arc(mut self, sink: Arc<dyn AlertSink>) -> Self {
696        self.sinks.push(sink);
697        self
698    }
699
700    /// Send an alert to all appropriate sinks.
701    pub async fn send(&self, alert: Alert) {
702        self.alert_count.fetch_add(1, Ordering::Relaxed);
703
704        // Check deduplication
705        if let Some(dedup_key) = &alert.dedup_key {
706            let now = Instant::now();
707            let mut cache = self.dedup_cache.write();
708
709            // Clean old entries
710            cache.retain(|_, instant| now.duration_since(*instant) < self.dedup_config.window);
711
712            // Check if duplicate
713            if let Some(last_seen) = cache.get(dedup_key) {
714                if now.duration_since(*last_seen) < self.dedup_config.window {
715                    self.dedup_count.fetch_add(1, Ordering::Relaxed);
716                    return; // Deduplicated
717                }
718            }
719
720            // Record this alert
721            if cache.len() < self.dedup_config.max_entries {
722                cache.insert(dedup_key.clone(), now);
723            }
724        }
725
726        // Send to all sinks that accept this severity
727        for sink in &self.sinks {
728            if sink.accepts_severity(alert.severity) {
729                if let Err(e) = sink.send(&alert).await {
730                    tracing::error!("Alert sink {} failed: {}", sink.sink_name(), e);
731                }
732            }
733        }
734    }
735
736    /// Send an alert synchronously (spawns async task).
737    pub fn send_sync(&self, alert: Alert) {
738        // This is a simplified version that logs immediately
739        // In production, you'd spawn a task or use a channel
740        self.alert_count.fetch_add(1, Ordering::Relaxed);
741
742        let source = alert.source.as_deref().unwrap_or("unknown");
743        let msg = format!("[ALERT] {} | {} | {}", alert.severity, source, alert.title);
744
745        match alert.severity {
746            AlertSeverity::Info => tracing::info!("{}", msg),
747            AlertSeverity::Warning => tracing::warn!("{}", msg),
748            AlertSeverity::Error | AlertSeverity::Critical => tracing::error!("{}", msg),
749        }
750    }
751
752    /// Get statistics.
753    pub fn stats(&self) -> AlertRouterStats {
754        AlertRouterStats {
755            total_alerts: self.alert_count.load(Ordering::Relaxed),
756            deduplicated: self.dedup_count.load(Ordering::Relaxed),
757            sinks: self.sinks.len(),
758        }
759    }
760}
761
762impl Default for AlertRouter {
763    fn default() -> Self {
764        Self::new()
765    }
766}
767
/// Alert router statistics, as returned by `AlertRouter::stats`.
#[derive(Debug, Clone)]
pub struct AlertRouterStats {
    /// Total alerts received, including deduplicated ones.
    pub total_alerts: u64,
    /// Alerts suppressed by deduplication.
    pub deduplicated: u64,
    /// Number of registered sinks.
    pub sinks: usize,
}
778
779// ============================================================================
780// TESTS
781// ============================================================================
782
#[cfg(test)]
mod tests {
    use super::*;

    // Severity must order from least to most urgent for filters to work.
    #[test]
    fn test_alert_severity_ordering() {
        assert!(AlertSeverity::Info < AlertSeverity::Warning);
        assert!(AlertSeverity::Warning < AlertSeverity::Error);
        assert!(AlertSeverity::Error < AlertSeverity::Critical);
    }

    // Builder-style with_* methods populate the corresponding fields.
    #[test]
    fn test_alert_creation() {
        let alert = Alert::new(AlertSeverity::Warning, "Test alert")
            .with_description("This is a test")
            .with_source("test_kernel")
            .with_metadata("key1", "value1")
            .with_tag("test");

        assert_eq!(alert.severity, AlertSeverity::Warning);
        assert_eq!(alert.title, "Test alert");
        assert_eq!(alert.description, Some("This is a test".to_string()));
        assert_eq!(alert.source, Some("test_kernel".to_string()));
        assert_eq!(alert.metadata.get("key1"), Some(&"value1".to_string()));
        assert!(alert.tags.contains(&"test".to_string()));
    }

    // JSON output carries the severity name, title, and source.
    #[test]
    fn test_alert_json() {
        let alert = Alert::new(AlertSeverity::Error, "Test").with_source("kernel_1");

        let json = alert.to_json();
        assert!(json.contains("ERROR"));
        assert!(json.contains("Test"));
        assert!(json.contains("kernel_1"));
    }

    // A stored alert is retrievable from the in-memory sink.
    #[tokio::test]
    async fn test_in_memory_sink() {
        let sink = InMemorySink::new(10);

        let alert = Alert::new(AlertSeverity::Warning, "Test alert");
        sink.send(&alert).await.unwrap();

        assert_eq!(sink.count(), 1);
        let alerts = sink.alerts();
        assert_eq!(alerts[0].title, "Test alert");
    }

    // The sink's own severity filter drops low-severity alerts silently.
    #[tokio::test]
    async fn test_in_memory_sink_severity_filter() {
        let sink = InMemorySink::new(10).with_severity_filter(AlertSeverity::Error);

        // Info alert should be filtered
        let info = Alert::new(AlertSeverity::Info, "Info");
        sink.send(&info).await.unwrap();
        assert_eq!(sink.count(), 0);

        // Error alert should pass
        let error = Alert::new(AlertSeverity::Error, "Error");
        sink.send(&error).await.unwrap();
        assert_eq!(sink.count(), 1);
    }

    // The router forwards to registered sinks and counts deliveries.
    #[tokio::test]
    async fn test_alert_router() {
        let sink = Arc::new(InMemorySink::new(100));
        let router = AlertRouter::new().with_sink_arc(sink.clone());

        router
            .send(Alert::new(AlertSeverity::Warning, "Alert 1"))
            .await;
        router
            .send(Alert::new(AlertSeverity::Error, "Alert 2"))
            .await;

        assert_eq!(sink.count(), 2);
        assert_eq!(router.stats().total_alerts, 2);
    }

    // Repeats of the same dedup key inside the window are suppressed and
    // counted in the router stats.
    #[tokio::test]
    async fn test_alert_deduplication() {
        let sink = Arc::new(InMemorySink::new(100));
        let router = AlertRouter::new()
            .with_deduplication(DeduplicationConfig {
                window: Duration::from_secs(60),
                max_entries: 100,
            })
            .with_sink_arc(sink.clone());

        // Send same alert multiple times with dedup key
        for _ in 0..5 {
            router
                .send(
                    Alert::new(AlertSeverity::Warning, "Repeated alert").with_dedup_key("same-key"),
                )
                .await;
        }

        // Only first should get through
        assert_eq!(sink.count(), 1);
        assert_eq!(router.stats().deduplicated, 4);
    }

    // LogSink's severity gate and send path should not panic.
    #[tokio::test]
    async fn test_log_sink() {
        let sink = LogSink::new().with_severity_filter(AlertSeverity::Warning);

        let info = Alert::new(AlertSeverity::Info, "Info");
        assert!(!sink.accepts_severity(info.severity));

        let warning = Alert::new(AlertSeverity::Warning, "Warning");
        assert!(sink.accepts_severity(warning.severity));

        // Just verify it doesn't panic
        sink.send(&warning).await.unwrap();
    }
}
901
902// ============================================================================
903// FR-014: Threshold-Based Alert Triggers
904// ============================================================================
905
/// A threshold-based alert trigger that fires when a metric crosses a threshold.
///
/// Feed successive metric samples to `evaluate`; the trigger fires only
/// after the condition has held for `hold_duration`, and `cooldown`
/// suppresses repeat firings. Note the mutable internal state below —
/// each `AlertTrigger` tracks one metric stream.
pub struct AlertTrigger {
    /// Trigger name.
    pub name: String,
    /// Metric name to watch.
    pub metric: String,
    /// Threshold value.
    pub threshold: f64,
    /// Comparison operator applied as `value <op> threshold`.
    pub operator: ThresholdOperator,
    /// Severity of the alert emitted when triggered.
    pub severity: AlertSeverity,
    /// Minimum duration the condition must hold before triggering
    /// (defaults to zero, i.e. fire immediately).
    pub hold_duration: Duration,
    /// Cooldown after firing (suppress repeated alerts; defaults to 60s).
    pub cooldown: Duration,
    /// When the trigger last fired; None until the first firing.
    last_triggered: Option<Instant>,
    /// When the condition first became true; reset whenever the condition
    /// lapses or the trigger fires.
    condition_start: Option<Instant>,
}
927
/// Comparison operator for threshold triggers, applied as
/// `value <op> threshold`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThresholdOperator {
    /// Fire when value > threshold.
    GreaterThan,
    /// Fire when value >= threshold.
    GreaterThanOrEqual,
    /// Fire when value < threshold.
    LessThan,
    /// Fire when value <= threshold.
    LessThanOrEqual,
    /// Fire when value == threshold (compared with an f64::EPSILON
    /// tolerance by `evaluate`).
    Equal,
}
942
943impl AlertTrigger {
944    /// Create a new alert trigger.
945    pub fn new(
946        name: impl Into<String>,
947        metric: impl Into<String>,
948        operator: ThresholdOperator,
949        threshold: f64,
950        severity: AlertSeverity,
951    ) -> Self {
952        Self {
953            name: name.into(),
954            metric: metric.into(),
955            threshold,
956            operator,
957            severity,
958            hold_duration: Duration::from_secs(0),
959            cooldown: Duration::from_secs(60),
960            last_triggered: None,
961            condition_start: None,
962        }
963    }
964
965    /// Set the hold duration (condition must persist this long before triggering).
966    pub fn with_hold_duration(mut self, duration: Duration) -> Self {
967        self.hold_duration = duration;
968        self
969    }
970
971    /// Set the cooldown period after triggering.
972    pub fn with_cooldown(mut self, duration: Duration) -> Self {
973        self.cooldown = duration;
974        self
975    }
976
977    /// Evaluate the trigger against a metric value.
978    ///
979    /// Returns Some(Alert) if the trigger should fire, None otherwise.
980    pub fn evaluate(&mut self, value: f64) -> Option<Alert> {
981        let condition_met = match self.operator {
982            ThresholdOperator::GreaterThan => value > self.threshold,
983            ThresholdOperator::GreaterThanOrEqual => value >= self.threshold,
984            ThresholdOperator::LessThan => value < self.threshold,
985            ThresholdOperator::LessThanOrEqual => value <= self.threshold,
986            ThresholdOperator::Equal => (value - self.threshold).abs() < f64::EPSILON,
987        };
988
989        if !condition_met {
990            self.condition_start = None;
991            return None;
992        }
993
994        // Track when condition first became true
995        let now = Instant::now();
996        if self.condition_start.is_none() {
997            self.condition_start = Some(now);
998        }
999
1000        // Check hold duration
1001        if let Some(start) = self.condition_start {
1002            if now.duration_since(start) < self.hold_duration {
1003                return None; // Condition hasn't held long enough
1004            }
1005        }
1006
1007        // Check cooldown
1008        if let Some(last) = self.last_triggered {
1009            if now.duration_since(last) < self.cooldown {
1010                return None; // Still in cooldown
1011            }
1012        }
1013
1014        // Fire!
1015        self.last_triggered = Some(now);
1016        self.condition_start = None;
1017
1018        Some(Alert::new(
1019            self.severity,
1020            format!(
1021                "[{}] {} {} {:.2} (threshold: {:.2})",
1022                self.name,
1023                self.metric,
1024                operator_symbol(self.operator),
1025                value,
1026                self.threshold
1027            ),
1028        ))
1029    }
1030}
1031
1032fn operator_symbol(op: ThresholdOperator) -> &'static str {
1033    match op {
1034        ThresholdOperator::GreaterThan => ">",
1035        ThresholdOperator::GreaterThanOrEqual => ">=",
1036        ThresholdOperator::LessThan => "<",
1037        ThresholdOperator::LessThanOrEqual => "<=",
1038        ThresholdOperator::Equal => "==",
1039    }
1040}
1041
/// Alert routing rule: route alerts to specific sinks based on criteria.
///
/// A rule matches when the alert's severity is at or above `min_severity`
/// AND (if `message_contains` is non-empty) the alert's title contains that
/// substring.
#[derive(Debug, Clone)]
pub struct AlertRoutingRule {
    /// Rule name.
    pub name: String,
    /// Match alerts with this severity or higher.
    pub min_severity: AlertSeverity,
    /// Match alerts whose title contains this text (empty = match all).
    pub message_contains: String,
    /// Indices (into the router's sink list) to route matching alerts to.
    pub sink_indices: Vec<usize>,
}
1054
1055impl AlertRoutingRule {
1056    /// Create a new routing rule.
1057    pub fn new(name: impl Into<String>, min_severity: AlertSeverity) -> Self {
1058        Self {
1059            name: name.into(),
1060            min_severity,
1061            message_contains: String::new(),
1062            sink_indices: Vec::new(),
1063        }
1064    }
1065
1066    /// Match alerts containing specific text.
1067    pub fn with_message_filter(mut self, contains: impl Into<String>) -> Self {
1068        self.message_contains = contains.into();
1069        self
1070    }
1071
1072    /// Route to specific sink indices.
1073    pub fn route_to(mut self, indices: Vec<usize>) -> Self {
1074        self.sink_indices = indices;
1075        self
1076    }
1077
1078    /// Check if an alert matches this rule.
1079    pub fn matches(&self, alert: &Alert) -> bool {
1080        if (alert.severity as u32) < (self.min_severity as u32) {
1081            return false;
1082        }
1083        if !self.message_contains.is_empty() && !alert.title.contains(&self.message_contains) {
1084            return false;
1085        }
1086        true
1087    }
1088}
1089
#[cfg(test)]
mod trigger_tests {
    use super::*;

    #[test]
    fn test_threshold_trigger_fires() {
        // Cooldown disabled so consecutive evaluations are independent.
        let mut latency_trigger = AlertTrigger::new(
            "high_latency",
            "p99_latency_ms",
            ThresholdOperator::GreaterThan,
            100.0,
            AlertSeverity::Warning,
        )
        .with_cooldown(Duration::from_millis(0));

        // A sample under the threshold must not produce an alert.
        assert!(latency_trigger.evaluate(50.0).is_none());

        // A sample over the threshold fires with the configured severity.
        let fired = latency_trigger.evaluate(150.0);
        assert!(fired.is_some());
        assert_eq!(fired.unwrap().severity, AlertSeverity::Warning);
    }

    #[test]
    fn test_threshold_trigger_cooldown() {
        let mut trigger = AlertTrigger::new(
            "test",
            "metric",
            ThresholdOperator::GreaterThan,
            10.0,
            AlertSeverity::Critical,
        )
        .with_cooldown(Duration::from_secs(60));

        // The first breach fires immediately.
        assert!(trigger.evaluate(20.0).is_some());

        // A second breach inside the 60s cooldown is suppressed.
        assert!(trigger.evaluate(20.0).is_none());
    }

    #[test]
    fn test_routing_rule_matches() {
        let oncall_rule = AlertRoutingRule::new("oncall", AlertSeverity::Critical)
            .with_message_filter("OOM")
            .route_to(vec![0]);

        // Critical + contains "OOM": matches.
        let critical_oom = Alert::new(AlertSeverity::Critical, "GPU OOM detected");
        assert!(oncall_rule.matches(&critical_oom));

        // Right text but below the severity floor: rejected.
        let low_severity = Alert::new(AlertSeverity::Warning, "GPU OOM warning");
        assert!(!oncall_rule.matches(&low_severity));

        // Right severity but missing the "OOM" substring: rejected.
        let unrelated = Alert::new(AlertSeverity::Critical, "High latency");
        assert!(!oncall_rule.matches(&unrelated));
    }

    #[test]
    fn test_all_operators() {
        // Helper: does a fresh trigger with `op`/`thresh` fire on `val`?
        fn fires(op: ThresholdOperator, val: f64, thresh: f64) -> bool {
            AlertTrigger::new("t", "m", op, thresh, AlertSeverity::Info)
                .with_cooldown(Duration::from_millis(0))
                .evaluate(val)
                .is_some()
        }

        assert!(fires(ThresholdOperator::GreaterThan, 10.0, 5.0));
        assert!(!fires(ThresholdOperator::GreaterThan, 5.0, 10.0));
        assert!(fires(ThresholdOperator::LessThan, 5.0, 10.0));
        assert!(fires(ThresholdOperator::GreaterThanOrEqual, 10.0, 10.0));
        assert!(fires(ThresholdOperator::LessThanOrEqual, 10.0, 10.0));
    }
}