1use async_trait::async_trait;
29use parking_lot::RwLock;
30use std::collections::HashMap;
31use std::fmt;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
35
/// Severity level attached to an alert.
///
/// Variants are declared from least to most severe, so the derived ordering
/// yields `Info < Warning < Error < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum AlertSeverity {
    /// Lowest level (discriminant `0`).
    Info = 0,
    /// Discriminant `1`.
    Warning = 1,
    /// Discriminant `2`.
    Error = 2,
    /// Highest level (discriminant `3`).
    Critical = 3,
}

impl AlertSeverity {
    /// Returns the upper-case label of this severity (for example `"WARNING"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            AlertSeverity::Info => "INFO",
            AlertSeverity::Warning => "WARNING",
            AlertSeverity::Error => "ERROR",
            AlertSeverity::Critical => "CRITICAL",
        }
    }

    /// Returns the `#RRGGBB` hex color string associated with this severity.
    pub fn color(&self) -> &'static str {
        match *self {
            AlertSeverity::Info => "#2196F3",
            AlertSeverity::Warning => "#FF9800",
            AlertSeverity::Error => "#F44336",
            AlertSeverity::Critical => "#9C27B0",
        }
    }
}

impl fmt::Display for AlertSeverity {
    /// Writes the same label that [`AlertSeverity::as_str`] returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
81
82#[derive(Debug, Clone)]
88pub struct Alert {
89 pub id: u64,
91 pub severity: AlertSeverity,
93 pub title: String,
95 pub description: Option<String>,
97 pub source: Option<String>,
99 pub timestamp: SystemTime,
101 pub dedup_key: Option<String>,
103 pub metadata: HashMap<String, String>,
105 pub tags: Vec<String>,
107}
108
109impl Alert {
110 pub fn new(severity: AlertSeverity, title: impl Into<String>) -> Self {
112 static ALERT_ID: AtomicU64 = AtomicU64::new(1);
113 Self {
114 id: ALERT_ID.fetch_add(1, Ordering::Relaxed),
115 severity,
116 title: title.into(),
117 description: None,
118 source: None,
119 timestamp: SystemTime::now(),
120 dedup_key: None,
121 metadata: HashMap::new(),
122 tags: Vec::new(),
123 }
124 }
125
126 pub fn with_description(mut self, description: impl Into<String>) -> Self {
128 self.description = Some(description.into());
129 self
130 }
131
132 pub fn with_source(mut self, source: impl Into<String>) -> Self {
134 self.source = Some(source.into());
135 self
136 }
137
138 pub fn with_dedup_key(mut self, key: impl Into<String>) -> Self {
140 self.dedup_key = Some(key.into());
141 self
142 }
143
144 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
146 self.metadata.insert(key.into(), value.into());
147 self
148 }
149
150 pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
152 self.tags.push(tag.into());
153 self
154 }
155
156 pub fn with_tags<I, S>(mut self, tags: I) -> Self
158 where
159 I: IntoIterator<Item = S>,
160 S: Into<String>,
161 {
162 self.tags.extend(tags.into_iter().map(Into::into));
163 self
164 }
165
166 pub fn timestamp_millis(&self) -> u64 {
168 self.timestamp
169 .duration_since(UNIX_EPOCH)
170 .unwrap_or_default()
171 .as_millis() as u64
172 }
173
174 pub fn to_json(&self) -> String {
176 let source_str = self
177 .source
178 .as_ref()
179 .map(|s| format!(r#","source":"{}""#, escape_json(s)))
180 .unwrap_or_default();
181
182 let desc_str = self
183 .description
184 .as_ref()
185 .map(|s| format!(r#","description":"{}""#, escape_json(s)))
186 .unwrap_or_default();
187
188 let metadata_str = if self.metadata.is_empty() {
189 String::new()
190 } else {
191 let pairs: Vec<String> = self
192 .metadata
193 .iter()
194 .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
195 .collect();
196 format!(r#","metadata":{{{}}}"#, pairs.join(","))
197 };
198
199 let tags_str = if self.tags.is_empty() {
200 String::new()
201 } else {
202 let tags: Vec<String> = self
203 .tags
204 .iter()
205 .map(|t| format!(r#""{}""#, escape_json(t)))
206 .collect();
207 format!(r#","tags":[{}]"#, tags.join(","))
208 };
209
210 format!(
211 r#"{{"id":{},"severity":"{}","title":"{}","timestamp":{}{}{}{}{}}}"#,
212 self.id,
213 self.severity.as_str(),
214 escape_json(&self.title),
215 self.timestamp_millis(),
216 source_str,
217 desc_str,
218 metadata_str,
219 tags_str,
220 )
221 }
222}
223
/// Escapes `s` so it can be placed between the double quotes of a JSON string.
///
/// Backslash and double quote are backslash-escaped. Every control character
/// in `U+0000..=U+001F` is escaped as well: the short forms `\n`, `\r`, `\t`,
/// `\b`, `\f` where they exist and `\u00XX` otherwise. JSON forbids raw
/// control characters inside strings, so leaving them unescaped would produce
/// a payload that strict parsers reject. All other characters are copied
/// unchanged.
fn escape_json(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            // Remaining C0 controls have no short form; use the generic escape.
            c if c < '\u{20}' => escaped.push_str(&format!("\\u{:04x}", c as u32)),
            c => escaped.push(c),
        }
    }
    escaped
}
232
/// Error returned by alert sink operations.
///
/// Each variant carries a human-readable detail message.
#[derive(Debug, Clone)]
pub enum AlertSinkError {
    /// Displayed with the prefix `Connection error`.
    ConnectionError(String),
    /// Displayed with the prefix `Rate limited`.
    RateLimited(String),
    /// Displayed with the prefix `Config error`.
    ConfigError(String),
    /// Any other failure; displayed with the prefix `Alert sink error`.
    Other(String),
}

impl fmt::Display for AlertSinkError {
    /// Writes `<prefix>: <detail>` where the prefix depends on the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (prefix, detail) = match self {
            AlertSinkError::ConnectionError(detail) => ("Connection error", detail),
            AlertSinkError::RateLimited(detail) => ("Rate limited", detail),
            AlertSinkError::ConfigError(detail) => ("Config error", detail),
            AlertSinkError::Other(detail) => ("Alert sink error", detail),
        };
        write!(f, "{}: {}", prefix, detail)
    }
}

impl std::error::Error for AlertSinkError {}

/// Result alias whose error type is [`AlertSinkError`].
pub type AlertSinkResult<T> = Result<T, AlertSinkError>;
265
266#[async_trait]
268pub trait AlertSink: Send + Sync {
269 async fn send(&self, alert: &Alert) -> AlertSinkResult<()>;
271
272 fn sink_name(&self) -> &str;
274
275 fn accepts_severity(&self, severity: AlertSeverity) -> bool;
277
278 async fn health_check(&self) -> AlertSinkResult<()>;
280}
281
282pub struct LogSink {
288 min_severity: AlertSeverity,
290}
291
292impl LogSink {
293 pub fn new() -> Self {
295 Self {
296 min_severity: AlertSeverity::Info,
297 }
298 }
299
300 pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
302 self.min_severity = min;
303 self
304 }
305}
306
307impl Default for LogSink {
308 fn default() -> Self {
309 Self::new()
310 }
311}
312
313#[async_trait]
314impl AlertSink for LogSink {
315 async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
316 if alert.severity < self.min_severity {
317 return Ok(());
318 }
319
320 let source = alert.source.as_deref().unwrap_or("unknown");
321 let msg = format!(
322 "[ALERT] {} | {} | {} | {}",
323 alert.severity,
324 source,
325 alert.title,
326 alert.description.as_deref().unwrap_or("")
327 );
328
329 match alert.severity {
330 AlertSeverity::Info => tracing::info!("{}", msg),
331 AlertSeverity::Warning => tracing::warn!("{}", msg),
332 AlertSeverity::Error => tracing::error!("{}", msg),
333 AlertSeverity::Critical => tracing::error!(target: "critical", "{}", msg),
334 }
335
336 Ok(())
337 }
338
339 fn sink_name(&self) -> &str {
340 "LogSink"
341 }
342
343 fn accepts_severity(&self, severity: AlertSeverity) -> bool {
344 severity >= self.min_severity
345 }
346
347 async fn health_check(&self) -> AlertSinkResult<()> {
348 Ok(())
349 }
350}
351
352pub struct InMemorySink {
358 alerts: RwLock<Vec<Alert>>,
359 max_alerts: usize,
360 min_severity: AlertSeverity,
361}
362
363impl InMemorySink {
364 pub fn new(max_alerts: usize) -> Self {
366 Self {
367 alerts: RwLock::new(Vec::new()),
368 max_alerts,
369 min_severity: AlertSeverity::Info,
370 }
371 }
372
373 pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
375 self.min_severity = min;
376 self
377 }
378
379 pub fn alerts(&self) -> Vec<Alert> {
381 self.alerts.read().clone()
382 }
383
384 pub fn count(&self) -> usize {
386 self.alerts.read().len()
387 }
388
389 pub fn clear(&self) {
391 self.alerts.write().clear();
392 }
393}
394
395#[async_trait]
396impl AlertSink for InMemorySink {
397 async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
398 if alert.severity < self.min_severity {
399 return Ok(());
400 }
401
402 let mut alerts = self.alerts.write();
403 if alerts.len() >= self.max_alerts {
404 alerts.remove(0);
405 }
406 alerts.push(alert.clone());
407 Ok(())
408 }
409
410 fn sink_name(&self) -> &str {
411 "InMemorySink"
412 }
413
414 fn accepts_severity(&self, severity: AlertSeverity) -> bool {
415 severity >= self.min_severity
416 }
417
418 async fn health_check(&self) -> AlertSinkResult<()> {
419 Ok(())
420 }
421}
422
/// Sink that POSTs alerts to an HTTP endpoint.
///
/// Only compiled when the `alerting` feature is enabled.
#[cfg(feature = "alerting")]
pub struct WebhookSink {
    /// Endpoint the payload is posted to.
    url: String,
    /// Alerts below this severity are ignored.
    min_severity: AlertSeverity,
    /// Extra HTTP headers added to every request.
    headers: HashMap<String, String>,
    /// Payload layout used for the request body.
    format: WebhookFormat,
    /// HTTP client used to issue requests.
    client: reqwest::Client,
    /// Per-request timeout.
    timeout: Duration,
}
443
/// Payload layout a webhook sink produces.
///
/// Only compiled when the `alerting` feature is enabled.
#[cfg(feature = "alerting")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WebhookFormat {
    /// The alert's own JSON serialization.
    Json,
    /// Slack attachment payload.
    Slack,
    /// Microsoft Teams `MessageCard` payload.
    MsTeams,
    /// PagerDuty event payload.
    PagerDuty,
}
457
#[cfg(feature = "alerting")]
impl WebhookSink {
    /// Creates a sink posting to `url`.
    ///
    /// Defaults: plain JSON payload, no extra headers, every severity
    /// accepted, 10 second request timeout.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            min_severity: AlertSeverity::Info,
            headers: HashMap::new(),
            format: WebhookFormat::Json,
            client: reqwest::Client::new(),
            timeout: Duration::from_secs(10),
        }
    }

    /// Creates a sink that posts Slack-formatted payloads to `url`.
    pub fn slack(url: impl Into<String>) -> Self {
        Self::new(url).with_format(WebhookFormat::Slack)
    }

    /// Creates a sink that posts Microsoft Teams payloads to `url`.
    pub fn teams(url: impl Into<String>) -> Self {
        Self::new(url).with_format(WebhookFormat::MsTeams)
    }

    /// Creates a sink that posts PagerDuty events to the fixed enqueue URL.
    ///
    /// The routing key is kept in the `X-Routing-Key` header entry, which is
    /// where the PagerDuty payload formatter reads it from.
    pub fn pagerduty(routing_key: impl Into<String>) -> Self {
        Self::new("https://events.pagerduty.com/v2/enqueue")
            .with_format(WebhookFormat::PagerDuty)
            .with_header("X-Routing-Key", routing_key.into())
    }

    /// Sets the minimum severity this sink accepts and returns the sink.
    pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
        self.min_severity = min;
        self
    }

    /// Selects the payload layout and returns the sink.
    pub fn with_format(mut self, format: WebhookFormat) -> Self {
        self.format = format;
        self
    }

    /// Adds an HTTP header; an existing header with the same key is replaced.
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key.into(), value.into());
        self
    }

    /// Sets the per-request timeout and returns the sink.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Renders `alert` into the request body for the configured format.
    fn format_payload(&self, alert: &Alert) -> String {
        match self.format {
            WebhookFormat::Json => alert.to_json(),
            WebhookFormat::Slack => self.format_slack(alert),
            WebhookFormat::MsTeams => self.format_teams(alert),
            WebhookFormat::PagerDuty => self.format_pagerduty(alert),
        }
    }

    /// Renders a Slack attachment: severity color, `[SEVERITY] title`,
    /// description text, and `Source` / `Time` (epoch milliseconds) fields.
    fn format_slack(&self, alert: &Alert) -> String {
        let desc = alert.description.as_deref().unwrap_or("");
        let source = alert.source.as_deref().unwrap_or("unknown");

        format!(
            r#"{{"attachments":[{{"color":"{}","title":"[{}] {}","text":"{}","fields":[{{"title":"Source","value":"{}","short":true}},{{"title":"Time","value":"{}","short":true}}]}}]}}"#,
            alert.severity.color(),
            alert.severity.as_str(),
            escape_json(&alert.title),
            escape_json(desc),
            escape_json(source),
            alert.timestamp_millis(),
        )
    }

    /// Renders a Teams `MessageCard`; the theme color is the severity color
    /// without its leading `#`.
    fn format_teams(&self, alert: &Alert) -> String {
        let desc = alert.description.as_deref().unwrap_or("");
        let source = alert.source.as_deref().unwrap_or("unknown");

        format!(
            r#"{{"@type":"MessageCard","@context":"http://schema.org/extensions","themeColor":"{}","title":"[{}] {}","text":"{}","sections":[{{"facts":[{{"name":"Source","value":"{}"}},{{"name":"Severity","value":"{}"}}]}}]}}"#,
            alert.severity.color().trim_start_matches('#'),
            alert.severity.as_str(),
            escape_json(&alert.title),
            escape_json(desc),
            escape_json(source),
            alert.severity.as_str(),
        )
    }

    /// Renders a PagerDuty `trigger` event.
    ///
    /// The dedup key falls back to the alert title and the source falls back
    /// to `ringkernel`. The routing key is read from the `X-Routing-Key`
    /// header entry (empty when absent).
    fn format_pagerduty(&self, alert: &Alert) -> String {
        let severity_pd = match alert.severity {
            AlertSeverity::Info => "info",
            AlertSeverity::Warning => "warning",
            AlertSeverity::Error => "error",
            AlertSeverity::Critical => "critical",
        };

        let routing_key = self
            .headers
            .get("X-Routing-Key")
            .map(String::as_str)
            .unwrap_or("");
        let dedup = alert.dedup_key.as_deref().unwrap_or(&alert.title);
        let source = alert.source.as_deref().unwrap_or("ringkernel");

        format!(
            r#"{{"routing_key":"{}","event_action":"trigger","dedup_key":"{}","payload":{{"summary":"{}","source":"{}","severity":"{}","timestamp":"{}"}}}}"#,
            // Escaped like every other interpolated string so an unusual key
            // cannot break the JSON document.
            escape_json(routing_key),
            escape_json(dedup),
            escape_json(&alert.title),
            escape_json(source),
            severity_pd,
            chrono_timestamp(alert.timestamp),
        )
    }
}
575
576#[cfg(feature = "alerting")]
/// Formats `time` as an ISO 8601 UTC timestamp, `YYYY-MM-DDTHH:MM:SS.mmmZ`.
///
/// Times before the Unix epoch are clamped to the epoch. The calendar date is
/// derived with integer arithmetic only (proleptic Gregorian calendar), so no
/// date/time crate is required.
fn chrono_timestamp(time: SystemTime) -> String {
    let since_epoch = time.duration_since(UNIX_EPOCH).unwrap_or_default();
    let total_secs = since_epoch.as_secs();
    let millis = since_epoch.subsec_millis();

    let days = total_secs / 86_400;
    let secs_of_day = total_secs % 86_400;
    let (hour, minute, second) = (
        secs_of_day / 3_600,
        (secs_of_day % 3_600) / 60,
        secs_of_day % 60,
    );

    // Days-to-civil conversion: shift the epoch to 0000-03-01 so that the leap
    // day is the last day of the (March-based) year, then work in 400-year eras.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let march_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_month + 2) / 5 + 1;
    let month = if march_month < 10 {
        march_month + 3
    } else {
        march_month - 9
    };
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
        year, month, day, hour, minute, second, millis
    )
}
585
#[cfg(feature = "alerting")]
#[async_trait]
impl AlertSink for WebhookSink {
    /// POSTs the formatted alert to the configured URL.
    ///
    /// Alerts below the configured minimum severity are dropped silently.
    ///
    /// # Errors
    ///
    /// Returns [`AlertSinkError::ConnectionError`] when the request itself
    /// fails and [`AlertSinkError::Other`] when the response status is not a
    /// success status.
    async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
        if !self.accepts_severity(alert.severity) {
            return Ok(());
        }

        let body = self.format_payload(alert);
        let base = self
            .client
            .post(&self.url)
            .header("Content-Type", "application/json")
            .body(body)
            .timeout(self.timeout);

        // Configured headers are applied after the defaults above.
        let request = self
            .headers
            .iter()
            .fold(base, |builder, (name, value)| builder.header(name, value));

        let response = match request.send().await {
            Ok(response) => response,
            Err(e) => {
                return Err(AlertSinkError::ConnectionError(format!(
                    "Webhook request failed: {}",
                    e
                )));
            }
        };

        let status = response.status();
        if !status.is_success() {
            return Err(AlertSinkError::Other(format!(
                "Webhook returned status: {}",
                status
            )));
        }
        Ok(())
    }

    /// Always returns `"WebhookSink"`.
    fn sink_name(&self) -> &str {
        "WebhookSink"
    }

    /// Returns `true` when `severity` is at or above the configured minimum.
    fn accepts_severity(&self, severity: AlertSeverity) -> bool {
        self.min_severity <= severity
    }

    /// Always succeeds; no request is made.
    async fn health_check(&self) -> AlertSinkResult<()> {
        Ok(())
    }
}
633
/// Settings for alert deduplication.
#[derive(Debug, Clone)]
pub struct DeduplicationConfig {
    /// How long a deduplication key stays remembered.
    pub window: Duration,
    /// Maximum number of keys remembered at once.
    pub max_entries: usize,
}

impl Default for DeduplicationConfig {
    /// Returns a 300 second window and room for 10000 keys.
    fn default() -> Self {
        const DEFAULT_WINDOW: Duration = Duration::from_secs(300);
        const DEFAULT_MAX_ENTRIES: usize = 10_000;

        DeduplicationConfig {
            window: DEFAULT_WINDOW,
            max_entries: DEFAULT_MAX_ENTRIES,
        }
    }
}
655
656pub struct AlertRouter {
658 sinks: Vec<Arc<dyn AlertSink>>,
660 dedup_cache: RwLock<HashMap<String, Instant>>,
662 dedup_config: DeduplicationConfig,
664 alert_count: AtomicU64,
666 dedup_count: AtomicU64,
668}
669
670impl AlertRouter {
671 pub fn new() -> Self {
673 Self {
674 sinks: Vec::new(),
675 dedup_cache: RwLock::new(HashMap::new()),
676 dedup_config: DeduplicationConfig::default(),
677 alert_count: AtomicU64::new(0),
678 dedup_count: AtomicU64::new(0),
679 }
680 }
681
682 pub fn with_deduplication(mut self, config: DeduplicationConfig) -> Self {
684 self.dedup_config = config;
685 self
686 }
687
688 pub fn with_sink<S: AlertSink + 'static>(mut self, sink: S) -> Self {
690 self.sinks.push(Arc::new(sink));
691 self
692 }
693
694 pub fn with_sink_arc(mut self, sink: Arc<dyn AlertSink>) -> Self {
696 self.sinks.push(sink);
697 self
698 }
699
700 pub async fn send(&self, alert: Alert) {
702 self.alert_count.fetch_add(1, Ordering::Relaxed);
703
704 if let Some(dedup_key) = &alert.dedup_key {
706 let now = Instant::now();
707 let mut cache = self.dedup_cache.write();
708
709 cache.retain(|_, instant| now.duration_since(*instant) < self.dedup_config.window);
711
712 if let Some(last_seen) = cache.get(dedup_key) {
714 if now.duration_since(*last_seen) < self.dedup_config.window {
715 self.dedup_count.fetch_add(1, Ordering::Relaxed);
716 return; }
718 }
719
720 if cache.len() < self.dedup_config.max_entries {
722 cache.insert(dedup_key.clone(), now);
723 }
724 }
725
726 for sink in &self.sinks {
728 if sink.accepts_severity(alert.severity) {
729 if let Err(e) = sink.send(&alert).await {
730 tracing::error!("Alert sink {} failed: {}", sink.sink_name(), e);
731 }
732 }
733 }
734 }
735
736 pub fn send_sync(&self, alert: Alert) {
738 self.alert_count.fetch_add(1, Ordering::Relaxed);
741
742 let source = alert.source.as_deref().unwrap_or("unknown");
743 let msg = format!("[ALERT] {} | {} | {}", alert.severity, source, alert.title);
744
745 match alert.severity {
746 AlertSeverity::Info => tracing::info!("{}", msg),
747 AlertSeverity::Warning => tracing::warn!("{}", msg),
748 AlertSeverity::Error | AlertSeverity::Critical => tracing::error!("{}", msg),
749 }
750 }
751
752 pub fn stats(&self) -> AlertRouterStats {
754 AlertRouterStats {
755 total_alerts: self.alert_count.load(Ordering::Relaxed),
756 deduplicated: self.dedup_count.load(Ordering::Relaxed),
757 sinks: self.sinks.len(),
758 }
759 }
760}
761
762impl Default for AlertRouter {
763 fn default() -> Self {
764 Self::new()
765 }
766}
767
/// Snapshot of router counters.
#[derive(Debug, Clone)]
pub struct AlertRouterStats {
    /// Number of alerts passed to the router.
    pub total_alerts: u64,
    /// Number of alerts suppressed as duplicates.
    pub deduplicated: u64,
    /// Number of registered sinks.
    pub sinks: usize,
}
778
#[cfg(test)]
mod tests {
    use super::*;

    /// Severities compare from least to most severe.
    #[test]
    fn test_alert_severity_ordering() {
        let ascending = [
            AlertSeverity::Info,
            AlertSeverity::Warning,
            AlertSeverity::Error,
            AlertSeverity::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    /// Builder methods populate the corresponding fields.
    #[test]
    fn test_alert_creation() {
        let built = Alert::new(AlertSeverity::Warning, "Test alert")
            .with_description("This is a test")
            .with_source("test_kernel")
            .with_metadata("key1", "value1")
            .with_tag("test");

        assert_eq!(built.severity, AlertSeverity::Warning);
        assert_eq!(built.title, "Test alert");
        assert_eq!(built.description.as_deref(), Some("This is a test"));
        assert_eq!(built.source.as_deref(), Some("test_kernel"));
        assert_eq!(
            built.metadata.get("key1").map(String::as_str),
            Some("value1")
        );
        assert!(built.tags.iter().any(|tag| tag == "test"));
    }

    /// The JSON form mentions severity, title and source.
    #[test]
    fn test_alert_json() {
        let rendered = Alert::new(AlertSeverity::Error, "Test")
            .with_source("kernel_1")
            .to_json();

        for needle in ["ERROR", "Test", "kernel_1"] {
            assert!(rendered.contains(needle));
        }
    }

    /// A sent alert is stored and can be read back.
    #[tokio::test]
    async fn test_in_memory_sink() {
        let memory = InMemorySink::new(10);

        memory
            .send(&Alert::new(AlertSeverity::Warning, "Test alert"))
            .await
            .unwrap();

        assert_eq!(memory.count(), 1);
        let stored = memory.alerts();
        assert_eq!(stored[0].title, "Test alert");
    }

    /// Alerts below the severity filter are not stored.
    #[tokio::test]
    async fn test_in_memory_sink_severity_filter() {
        let memory = InMemorySink::new(10).with_severity_filter(AlertSeverity::Error);

        // Below the filter: ignored.
        let below = Alert::new(AlertSeverity::Info, "Info");
        memory.send(&below).await.unwrap();
        assert_eq!(memory.count(), 0);

        // At the filter: stored.
        let at_threshold = Alert::new(AlertSeverity::Error, "Error");
        memory.send(&at_threshold).await.unwrap();
        assert_eq!(memory.count(), 1);
    }

    /// The router forwards alerts to its sink and counts them.
    #[tokio::test]
    async fn test_alert_router() {
        let memory = Arc::new(InMemorySink::new(100));
        let router = AlertRouter::new().with_sink_arc(memory.clone());

        let first = Alert::new(AlertSeverity::Warning, "Alert 1");
        router.send(first).await;
        let second = Alert::new(AlertSeverity::Error, "Alert 2");
        router.send(second).await;

        assert_eq!(memory.count(), 2);
        assert_eq!(router.stats().total_alerts, 2);
    }

    /// Repeated alerts with the same dedup key are delivered once.
    #[tokio::test]
    async fn test_alert_deduplication() {
        let memory = Arc::new(InMemorySink::new(100));
        let dedup = DeduplicationConfig {
            window: Duration::from_secs(60),
            max_entries: 100,
        };
        let router = AlertRouter::new()
            .with_deduplication(dedup)
            .with_sink_arc(memory.clone());

        for _ in 0..5 {
            let repeated =
                Alert::new(AlertSeverity::Warning, "Repeated alert").with_dedup_key("same-key");
            router.send(repeated).await;
        }

        // Only the first alert gets through; the other four are suppressed.
        assert_eq!(memory.count(), 1);
        assert_eq!(router.stats().deduplicated, 4);
    }

    /// The log sink honours its severity filter and sending succeeds.
    #[tokio::test]
    async fn test_log_sink() {
        let logger = LogSink::new().with_severity_filter(AlertSeverity::Warning);

        let below = Alert::new(AlertSeverity::Info, "Info");
        assert!(!logger.accepts_severity(below.severity));

        let at_threshold = Alert::new(AlertSeverity::Warning, "Warning");
        assert!(logger.accepts_severity(at_threshold.severity));

        logger.send(&at_threshold).await.unwrap();
    }
}
901
902pub struct AlertTrigger {
908 pub name: String,
910 pub metric: String,
912 pub threshold: f64,
914 pub operator: ThresholdOperator,
916 pub severity: AlertSeverity,
918 pub hold_duration: Duration,
920 pub cooldown: Duration,
922 last_triggered: Option<Instant>,
924 condition_start: Option<Instant>,
926}
927
/// Comparison used to test a sample against a threshold.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ThresholdOperator {
    /// Sample is strictly greater than the threshold.
    GreaterThan,
    /// Sample is greater than or equal to the threshold.
    GreaterThanOrEqual,
    /// Sample is strictly less than the threshold.
    LessThan,
    /// Sample is less than or equal to the threshold.
    LessThanOrEqual,
    /// Sample equals the threshold.
    Equal,
}
942
943impl AlertTrigger {
944 pub fn new(
946 name: impl Into<String>,
947 metric: impl Into<String>,
948 operator: ThresholdOperator,
949 threshold: f64,
950 severity: AlertSeverity,
951 ) -> Self {
952 Self {
953 name: name.into(),
954 metric: metric.into(),
955 threshold,
956 operator,
957 severity,
958 hold_duration: Duration::from_secs(0),
959 cooldown: Duration::from_secs(60),
960 last_triggered: None,
961 condition_start: None,
962 }
963 }
964
965 pub fn with_hold_duration(mut self, duration: Duration) -> Self {
967 self.hold_duration = duration;
968 self
969 }
970
971 pub fn with_cooldown(mut self, duration: Duration) -> Self {
973 self.cooldown = duration;
974 self
975 }
976
977 pub fn evaluate(&mut self, value: f64) -> Option<Alert> {
981 let condition_met = match self.operator {
982 ThresholdOperator::GreaterThan => value > self.threshold,
983 ThresholdOperator::GreaterThanOrEqual => value >= self.threshold,
984 ThresholdOperator::LessThan => value < self.threshold,
985 ThresholdOperator::LessThanOrEqual => value <= self.threshold,
986 ThresholdOperator::Equal => (value - self.threshold).abs() < f64::EPSILON,
987 };
988
989 if !condition_met {
990 self.condition_start = None;
991 return None;
992 }
993
994 let now = Instant::now();
996 if self.condition_start.is_none() {
997 self.condition_start = Some(now);
998 }
999
1000 if let Some(start) = self.condition_start {
1002 if now.duration_since(start) < self.hold_duration {
1003 return None; }
1005 }
1006
1007 if let Some(last) = self.last_triggered {
1009 if now.duration_since(last) < self.cooldown {
1010 return None; }
1012 }
1013
1014 self.last_triggered = Some(now);
1016 self.condition_start = None;
1017
1018 Some(Alert::new(
1019 self.severity,
1020 format!(
1021 "[{}] {} {} {:.2} (threshold: {:.2})",
1022 self.name,
1023 self.metric,
1024 operator_symbol(self.operator),
1025 value,
1026 self.threshold
1027 ),
1028 ))
1029 }
1030}
1031
1032fn operator_symbol(op: ThresholdOperator) -> &'static str {
1033 match op {
1034 ThresholdOperator::GreaterThan => ">",
1035 ThresholdOperator::GreaterThanOrEqual => ">=",
1036 ThresholdOperator::LessThan => "<",
1037 ThresholdOperator::LessThanOrEqual => "<=",
1038 ThresholdOperator::Equal => "==",
1039 }
1040}
1041
1042#[derive(Debug, Clone)]
1044pub struct AlertRoutingRule {
1045 pub name: String,
1047 pub min_severity: AlertSeverity,
1049 pub message_contains: String,
1051 pub sink_indices: Vec<usize>,
1053}
1054
1055impl AlertRoutingRule {
1056 pub fn new(name: impl Into<String>, min_severity: AlertSeverity) -> Self {
1058 Self {
1059 name: name.into(),
1060 min_severity,
1061 message_contains: String::new(),
1062 sink_indices: Vec::new(),
1063 }
1064 }
1065
1066 pub fn with_message_filter(mut self, contains: impl Into<String>) -> Self {
1068 self.message_contains = contains.into();
1069 self
1070 }
1071
1072 pub fn route_to(mut self, indices: Vec<usize>) -> Self {
1074 self.sink_indices = indices;
1075 self
1076 }
1077
1078 pub fn matches(&self, alert: &Alert) -> bool {
1080 if (alert.severity as u32) < (self.min_severity as u32) {
1081 return false;
1082 }
1083 if !self.message_contains.is_empty() && !alert.title.contains(&self.message_contains) {
1084 return false;
1085 }
1086 true
1087 }
1088}
1089
#[cfg(test)]
mod trigger_tests {
    use super::*;

    /// A sample above the threshold fires; one below does not.
    #[test]
    fn test_threshold_trigger_fires() {
        let mut latency_trigger = AlertTrigger::new(
            "high_latency",
            "p99_latency_ms",
            ThresholdOperator::GreaterThan,
            100.0,
            AlertSeverity::Warning,
        )
        .with_cooldown(Duration::from_millis(0));

        // Below the threshold: nothing fires.
        assert!(latency_trigger.evaluate(50.0).is_none());

        // Above the threshold: an alert with the configured severity fires.
        let fired = latency_trigger.evaluate(150.0);
        assert!(fired.is_some());
        assert_eq!(fired.unwrap().severity, AlertSeverity::Warning);
    }

    /// A second breach inside the cooldown does not fire again.
    #[test]
    fn test_threshold_trigger_cooldown() {
        let mut guarded = AlertTrigger::new(
            "test",
            "metric",
            ThresholdOperator::GreaterThan,
            10.0,
            AlertSeverity::Critical,
        )
        .with_cooldown(Duration::from_secs(60));

        let first = guarded.evaluate(20.0);
        assert!(first.is_some());

        let second = guarded.evaluate(20.0);
        assert!(second.is_none());
    }

    /// A rule needs both the severity and the title filter to match.
    #[test]
    fn test_routing_rule_matches() {
        let oncall = AlertRoutingRule::new("oncall", AlertSeverity::Critical)
            .with_message_filter("OOM")
            .route_to(vec![0]);

        let critical_oom = Alert::new(AlertSeverity::Critical, "GPU OOM detected");
        assert!(oncall.matches(&critical_oom));

        // Severity too low.
        let warning = Alert::new(AlertSeverity::Warning, "GPU OOM warning");
        assert!(!oncall.matches(&warning));

        // Title does not contain the filter text.
        let critical_other = Alert::new(AlertSeverity::Critical, "High latency");
        assert!(!oncall.matches(&critical_other));
    }

    /// Spot-checks the comparison operators.
    #[test]
    fn test_all_operators() {
        let fires = |op, val, thresh| {
            let mut probe = AlertTrigger::new("t", "m", op, thresh, AlertSeverity::Info)
                .with_cooldown(Duration::from_millis(0));
            probe.evaluate(val).is_some()
        };

        assert!(fires(ThresholdOperator::GreaterThan, 10.0, 5.0));
        assert!(!fires(ThresholdOperator::GreaterThan, 5.0, 10.0));
        assert!(fires(ThresholdOperator::LessThan, 5.0, 10.0));
        assert!(fires(ThresholdOperator::GreaterThanOrEqual, 10.0, 10.0));
        assert!(fires(ThresholdOperator::LessThanOrEqual, 10.0, 10.0));
    }
}