1use async_trait::async_trait;
29use parking_lot::RwLock;
30use std::collections::HashMap;
31use std::fmt;
32use std::sync::atomic::{AtomicU64, Ordering};
33use std::sync::Arc;
34use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
35
/// Severity level attached to an alert.
///
/// Variants are declared in ascending order of urgency with explicit `u8`
/// discriminants, so the derived ordering satisfies
/// `Info < Warning < Error < Critical`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum AlertSeverity {
    /// Lowest severity (discriminant `0`).
    Info = 0,
    /// Discriminant `1`.
    Warning = 1,
    /// Discriminant `2`.
    Error = 2,
    /// Highest severity (discriminant `3`).
    Critical = 3,
}

impl AlertSeverity {
    /// Returns the upper-case label for this severity
    /// (`"INFO"`, `"WARNING"`, `"ERROR"` or `"CRITICAL"`).
    pub fn as_str(&self) -> &'static str {
        match *self {
            AlertSeverity::Critical => "CRITICAL",
            AlertSeverity::Error => "ERROR",
            AlertSeverity::Warning => "WARNING",
            AlertSeverity::Info => "INFO",
        }
    }

    /// Returns the `#RRGGBB` hex colour associated with this severity.
    pub fn color(&self) -> &'static str {
        match *self {
            AlertSeverity::Critical => "#9C27B0",
            AlertSeverity::Error => "#F44336",
            AlertSeverity::Warning => "#FF9800",
            AlertSeverity::Info => "#2196F3",
        }
    }
}

impl fmt::Display for AlertSeverity {
    /// Writes the same label that [`AlertSeverity::as_str`] returns.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
81
82#[derive(Debug, Clone)]
88pub struct Alert {
89 pub id: u64,
91 pub severity: AlertSeverity,
93 pub title: String,
95 pub description: Option<String>,
97 pub source: Option<String>,
99 pub timestamp: SystemTime,
101 pub dedup_key: Option<String>,
103 pub metadata: HashMap<String, String>,
105 pub tags: Vec<String>,
107}
108
109impl Alert {
110 pub fn new(severity: AlertSeverity, title: impl Into<String>) -> Self {
112 static ALERT_ID: AtomicU64 = AtomicU64::new(1);
113 Self {
114 id: ALERT_ID.fetch_add(1, Ordering::Relaxed),
115 severity,
116 title: title.into(),
117 description: None,
118 source: None,
119 timestamp: SystemTime::now(),
120 dedup_key: None,
121 metadata: HashMap::new(),
122 tags: Vec::new(),
123 }
124 }
125
126 pub fn with_description(mut self, description: impl Into<String>) -> Self {
128 self.description = Some(description.into());
129 self
130 }
131
132 pub fn with_source(mut self, source: impl Into<String>) -> Self {
134 self.source = Some(source.into());
135 self
136 }
137
138 pub fn with_dedup_key(mut self, key: impl Into<String>) -> Self {
140 self.dedup_key = Some(key.into());
141 self
142 }
143
144 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
146 self.metadata.insert(key.into(), value.into());
147 self
148 }
149
150 pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
152 self.tags.push(tag.into());
153 self
154 }
155
156 pub fn with_tags<I, S>(mut self, tags: I) -> Self
158 where
159 I: IntoIterator<Item = S>,
160 S: Into<String>,
161 {
162 self.tags.extend(tags.into_iter().map(Into::into));
163 self
164 }
165
166 pub fn timestamp_millis(&self) -> u64 {
168 self.timestamp
169 .duration_since(UNIX_EPOCH)
170 .unwrap_or_default()
171 .as_millis() as u64
172 }
173
174 pub fn to_json(&self) -> String {
176 let source_str = self
177 .source
178 .as_ref()
179 .map(|s| format!(r#","source":"{}""#, escape_json(s)))
180 .unwrap_or_default();
181
182 let desc_str = self
183 .description
184 .as_ref()
185 .map(|s| format!(r#","description":"{}""#, escape_json(s)))
186 .unwrap_or_default();
187
188 let metadata_str = if self.metadata.is_empty() {
189 String::new()
190 } else {
191 let pairs: Vec<String> = self
192 .metadata
193 .iter()
194 .map(|(k, v)| format!(r#""{}":"{}""#, escape_json(k), escape_json(v)))
195 .collect();
196 format!(r#","metadata":{{{}}}"#, pairs.join(","))
197 };
198
199 let tags_str = if self.tags.is_empty() {
200 String::new()
201 } else {
202 let tags: Vec<String> = self
203 .tags
204 .iter()
205 .map(|t| format!(r#""{}""#, escape_json(t)))
206 .collect();
207 format!(r#","tags":[{}]"#, tags.join(","))
208 };
209
210 format!(
211 r#"{{"id":{},"severity":"{}","title":"{}","timestamp":{}{}{}{}{}}}"#,
212 self.id,
213 self.severity.as_str(),
214 escape_json(&self.title),
215 self.timestamp_millis(),
216 source_str,
217 desc_str,
218 metadata_str,
219 tags_str,
220 )
221 }
222}
223
/// Escapes `s` so it can be placed between double quotes in a JSON document.
///
/// Backslash and double quote are backslash-escaped, the common whitespace
/// controls use their short forms (`\n`, `\r`, `\t`, `\b`, `\f`), and every
/// other control character below U+0020 is written as `\u00XX`, because JSON
/// forbids raw control characters inside strings. All other characters are
/// copied through unchanged.
fn escape_json(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            '\u{08}' => escaped.push_str("\\b"),
            '\u{0C}' => escaped.push_str("\\f"),
            c if u32::from(c) < 0x20 => {
                escaped.push_str(&format!("\\u{:04x}", u32::from(c)));
            }
            c => escaped.push(c),
        }
    }
    escaped
}
232
/// Failure reported by an alert sink. Every variant carries a
/// human-readable detail message.
#[derive(Clone, Debug)]
pub enum AlertSinkError {
    /// Rendered with the prefix `Connection error: `.
    ConnectionError(String),
    /// Rendered with the prefix `Rate limited: `.
    RateLimited(String),
    /// Rendered with the prefix `Config error: `.
    ConfigError(String),
    /// Any other failure; rendered with the prefix `Alert sink error: `.
    Other(String),
}

impl fmt::Display for AlertSinkError {
    /// Writes a variant-specific prefix followed by the carried message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AlertSinkError::Other(detail) => {
                f.write_fmt(format_args!("Alert sink error: {}", detail))
            }
            AlertSinkError::ConfigError(detail) => {
                f.write_fmt(format_args!("Config error: {}", detail))
            }
            AlertSinkError::RateLimited(detail) => {
                f.write_fmt(format_args!("Rate limited: {}", detail))
            }
            AlertSinkError::ConnectionError(detail) => {
                f.write_fmt(format_args!("Connection error: {}", detail))
            }
        }
    }
}

impl std::error::Error for AlertSinkError {}

/// Shorthand for a `Result` whose error type is [`AlertSinkError`].
pub type AlertSinkResult<T> = Result<T, AlertSinkError>;
265
266#[async_trait]
268pub trait AlertSink: Send + Sync {
269 async fn send(&self, alert: &Alert) -> AlertSinkResult<()>;
271
272 fn sink_name(&self) -> &str;
274
275 fn accepts_severity(&self, severity: AlertSeverity) -> bool;
277
278 async fn health_check(&self) -> AlertSinkResult<()>;
280}
281
282pub struct LogSink {
288 min_severity: AlertSeverity,
290}
291
292impl LogSink {
293 pub fn new() -> Self {
295 Self {
296 min_severity: AlertSeverity::Info,
297 }
298 }
299
300 pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
302 self.min_severity = min;
303 self
304 }
305}
306
307impl Default for LogSink {
308 fn default() -> Self {
309 Self::new()
310 }
311}
312
313#[async_trait]
314impl AlertSink for LogSink {
315 async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
316 if alert.severity < self.min_severity {
317 return Ok(());
318 }
319
320 let source = alert.source.as_deref().unwrap_or("unknown");
321 let msg = format!(
322 "[ALERT] {} | {} | {} | {}",
323 alert.severity,
324 source,
325 alert.title,
326 alert.description.as_deref().unwrap_or("")
327 );
328
329 match alert.severity {
330 AlertSeverity::Info => tracing::info!("{}", msg),
331 AlertSeverity::Warning => tracing::warn!("{}", msg),
332 AlertSeverity::Error => tracing::error!("{}", msg),
333 AlertSeverity::Critical => tracing::error!(target: "critical", "{}", msg),
334 }
335
336 Ok(())
337 }
338
339 fn sink_name(&self) -> &str {
340 "LogSink"
341 }
342
343 fn accepts_severity(&self, severity: AlertSeverity) -> bool {
344 severity >= self.min_severity
345 }
346
347 async fn health_check(&self) -> AlertSinkResult<()> {
348 Ok(())
349 }
350}
351
352pub struct InMemorySink {
358 alerts: RwLock<Vec<Alert>>,
359 max_alerts: usize,
360 min_severity: AlertSeverity,
361}
362
363impl InMemorySink {
364 pub fn new(max_alerts: usize) -> Self {
366 Self {
367 alerts: RwLock::new(Vec::new()),
368 max_alerts,
369 min_severity: AlertSeverity::Info,
370 }
371 }
372
373 pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
375 self.min_severity = min;
376 self
377 }
378
379 pub fn alerts(&self) -> Vec<Alert> {
381 self.alerts.read().clone()
382 }
383
384 pub fn count(&self) -> usize {
386 self.alerts.read().len()
387 }
388
389 pub fn clear(&self) {
391 self.alerts.write().clear();
392 }
393}
394
395#[async_trait]
396impl AlertSink for InMemorySink {
397 async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
398 if alert.severity < self.min_severity {
399 return Ok(());
400 }
401
402 let mut alerts = self.alerts.write();
403 if alerts.len() >= self.max_alerts {
404 alerts.remove(0);
405 }
406 alerts.push(alert.clone());
407 Ok(())
408 }
409
410 fn sink_name(&self) -> &str {
411 "InMemorySink"
412 }
413
414 fn accepts_severity(&self, severity: AlertSeverity) -> bool {
415 severity >= self.min_severity
416 }
417
418 async fn health_check(&self) -> AlertSinkResult<()> {
419 Ok(())
420 }
421}
422
/// Alert sink that POSTs a JSON payload to an HTTP endpoint.
///
/// Only compiled when the `alerting` feature is enabled.
#[cfg(feature = "alerting")]
pub struct WebhookSink {
    /// Endpoint the payload is posted to.
    url: String,
    /// Alerts below this severity are not sent.
    min_severity: AlertSeverity,
    /// Extra HTTP headers added to every request.
    headers: HashMap<String, String>,
    /// Payload layout used when serializing an alert.
    format: WebhookFormat,
    /// HTTP client used to issue requests.
    client: reqwest::Client,
    /// Timeout applied to each request.
    timeout: Duration,
}
443
/// Payload layout produced by a `WebhookSink`.
///
/// Only compiled when the `alerting` feature is enabled.
#[cfg(feature = "alerting")]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WebhookFormat {
    /// The alert's own JSON serialization.
    Json,
    /// Slack attachment payload.
    Slack,
    /// Microsoft Teams `MessageCard` payload.
    MsTeams,
    /// PagerDuty event payload.
    PagerDuty,
}
457
#[cfg(feature = "alerting")]
impl WebhookSink {
    /// Creates a sink posting to `url`, with the plain JSON format, no extra
    /// headers, a minimum severity of `Info` and a 10 second request timeout.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            min_severity: AlertSeverity::Info,
            headers: HashMap::new(),
            format: WebhookFormat::Json,
            client: reqwest::Client::new(),
            timeout: Duration::from_secs(10),
        }
    }

    /// Creates a sink posting Slack-formatted payloads to `url`.
    pub fn slack(url: impl Into<String>) -> Self {
        Self::new(url).with_format(WebhookFormat::Slack)
    }

    /// Creates a sink posting Microsoft Teams `MessageCard` payloads to `url`.
    pub fn teams(url: impl Into<String>) -> Self {
        Self::new(url).with_format(WebhookFormat::MsTeams)
    }

    /// Creates a sink posting PagerDuty events to
    /// `https://events.pagerduty.com/v2/enqueue`.
    ///
    /// The routing key is stored under the `X-Routing-Key` header entry and is
    /// read back from there when the PagerDuty payload is built.
    pub fn pagerduty(routing_key: impl Into<String>) -> Self {
        Self::new("https://events.pagerduty.com/v2/enqueue")
            .with_format(WebhookFormat::PagerDuty)
            .with_header("X-Routing-Key", routing_key.into())
    }

    /// Sets the minimum severity an alert needs in order to be sent.
    pub fn with_severity_filter(mut self, min: AlertSeverity) -> Self {
        self.min_severity = min;
        self
    }

    /// Selects the payload layout.
    pub fn with_format(mut self, format: WebhookFormat) -> Self {
        self.format = format;
        self
    }

    /// Adds an HTTP header sent with every request; an existing header with
    /// the same key is overwritten.
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.headers.insert(key.into(), value.into());
        self
    }

    /// Sets the per-request timeout.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Serializes `alert` according to the configured [`WebhookFormat`].
    fn format_payload(&self, alert: &Alert) -> String {
        match self.format {
            WebhookFormat::Json => alert.to_json(),
            WebhookFormat::Slack => self.format_slack(alert),
            WebhookFormat::MsTeams => self.format_teams(alert),
            WebhookFormat::PagerDuty => self.format_pagerduty(alert),
        }
    }

    /// Builds a Slack attachment payload.
    ///
    /// A missing description becomes an empty text, a missing source becomes
    /// `"unknown"`, and the "Time" field carries the epoch-millisecond timestamp.
    fn format_slack(&self, alert: &Alert) -> String {
        let desc = alert.description.as_deref().unwrap_or("");
        let source = alert.source.as_deref().unwrap_or("unknown");

        format!(
            r#"{{"attachments":[{{"color":"{}","title":"[{}] {}","text":"{}","fields":[{{"title":"Source","value":"{}","short":true}},{{"title":"Time","value":"{}","short":true}}]}}]}}"#,
            alert.severity.color(),
            alert.severity.as_str(),
            escape_json(&alert.title),
            escape_json(desc),
            escape_json(source),
            alert.timestamp_millis(),
        )
    }

    /// Builds a Microsoft Teams `MessageCard` payload.
    ///
    /// The theme colour is the severity colour without its leading `#`.
    fn format_teams(&self, alert: &Alert) -> String {
        let desc = alert.description.as_deref().unwrap_or("");
        let source = alert.source.as_deref().unwrap_or("unknown");

        format!(
            r#"{{"@type":"MessageCard","@context":"http://schema.org/extensions","themeColor":"{}","title":"[{}] {}","text":"{}","sections":[{{"facts":[{{"name":"Source","value":"{}"}},{{"name":"Severity","value":"{}"}}]}}]}}"#,
            alert.severity.color().trim_start_matches('#'),
            alert.severity.as_str(),
            escape_json(&alert.title),
            escape_json(desc),
            escape_json(source),
            alert.severity.as_str(),
        )
    }

    /// Builds a PagerDuty `trigger` event payload.
    ///
    /// The deduplication key falls back to the alert title and the source
    /// falls back to `"ringkernel"`. The routing key is taken from the
    /// `X-Routing-Key` header entry (empty when absent) and is JSON-escaped
    /// like every other interpolated string, so a key containing quotes or
    /// backslashes cannot corrupt the payload.
    fn format_pagerduty(&self, alert: &Alert) -> String {
        let severity_pd = match alert.severity {
            AlertSeverity::Info => "info",
            AlertSeverity::Warning => "warning",
            AlertSeverity::Error => "error",
            AlertSeverity::Critical => "critical",
        };

        let routing_key = self
            .headers
            .get("X-Routing-Key")
            .map(String::as_str)
            .unwrap_or("");
        let dedup = alert.dedup_key.as_deref().unwrap_or(&alert.title);
        let source = alert.source.as_deref().unwrap_or("ringkernel");

        format!(
            r#"{{"routing_key":"{}","event_action":"trigger","dedup_key":"{}","payload":{{"summary":"{}","source":"{}","severity":"{}","timestamp":"{}"}}}}"#,
            escape_json(routing_key),
            escape_json(dedup),
            escape_json(&alert.title),
            escape_json(source),
            severity_pd,
            chrono_timestamp(alert.timestamp),
        )
    }
}
575
/// Formats `time` as an RFC 3339 / ISO 8601 UTC timestamp with millisecond
/// precision, e.g. `2023-11-14T22:13:20.000Z`.
///
/// A time earlier than the Unix epoch is rendered as the epoch itself.
/// Only the standard library is used; the calendar date is derived from the
/// day count with the usual days-to-civil conversion for the proleptic
/// Gregorian calendar.
#[cfg(feature = "alerting")]
fn chrono_timestamp(time: SystemTime) -> String {
    let since_epoch = time.duration_since(UNIX_EPOCH).unwrap_or_default();
    let total_secs = since_epoch.as_secs();
    let millis = since_epoch.subsec_millis();

    let days = total_secs / 86_400;
    let secs_of_day = total_secs % 86_400;
    let hour = secs_of_day / 3_600;
    let minute = (secs_of_day % 3_600) / 60;
    let second = secs_of_day % 60;

    // Shift the epoch to 0000-03-01 so leap days fall at the end of each
    // "year", then split the day count into 400-year eras.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let shifted_month = (5 * day_of_year + 2) / 153; // 0 = March … 11 = February
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
    let month = if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    };
    // January and February belong to the next civil year in the shifted calendar.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}.{:03}Z",
        year, month, day, hour, minute, second, millis
    )
}
585
#[cfg(feature = "alerting")]
#[async_trait]
impl AlertSink for WebhookSink {
    /// POSTs the formatted alert to the configured URL.
    ///
    /// Alerts below the minimum severity are skipped with `Ok(())`. A
    /// transport failure is reported as `ConnectionError`; a non-success HTTP
    /// status is reported as `Other`.
    async fn send(&self, alert: &Alert) -> AlertSinkResult<()> {
        if !self.accepts_severity(alert.severity) {
            return Ok(());
        }

        let body = self.format_payload(alert);
        let base = self
            .client
            .post(&self.url)
            .header("Content-Type", "application/json")
            .body(body)
            .timeout(self.timeout);

        // Configured headers are applied after the defaults above.
        let request = self
            .headers
            .iter()
            .fold(base, |pending, (name, value)| pending.header(name, value));

        let response = match request.send().await {
            Ok(response) => response,
            Err(cause) => {
                return Err(AlertSinkError::ConnectionError(format!(
                    "Webhook request failed: {}",
                    cause
                )));
            }
        };

        let status = response.status();
        if !status.is_success() {
            return Err(AlertSinkError::Other(format!(
                "Webhook returned status: {}",
                status
            )));
        }
        Ok(())
    }

    /// Always `"WebhookSink"`.
    fn sink_name(&self) -> &str {
        "WebhookSink"
    }

    /// True when `severity` is at or above the configured minimum.
    fn accepts_severity(&self, severity: AlertSeverity) -> bool {
        self.min_severity <= severity
    }

    /// Always succeeds; no request is made.
    async fn health_check(&self) -> AlertSinkResult<()> {
        Ok(())
    }
}
633
/// Settings controlling alert deduplication.
#[derive(Clone, Debug)]
pub struct DeduplicationConfig {
    /// Length of the deduplication window.
    pub window: Duration,
    /// Upper bound on the number of tracked deduplication keys.
    pub max_entries: usize,
}

impl Default for DeduplicationConfig {
    /// A 300-second window with room for 10000 keys.
    fn default() -> Self {
        let window = Duration::from_secs(300);
        let max_entries = 10000;
        Self {
            window,
            max_entries,
        }
    }
}
655
656pub struct AlertRouter {
658 sinks: Vec<Arc<dyn AlertSink>>,
660 dedup_cache: RwLock<HashMap<String, Instant>>,
662 dedup_config: DeduplicationConfig,
664 alert_count: AtomicU64,
666 dedup_count: AtomicU64,
668}
669
670impl AlertRouter {
671 pub fn new() -> Self {
673 Self {
674 sinks: Vec::new(),
675 dedup_cache: RwLock::new(HashMap::new()),
676 dedup_config: DeduplicationConfig::default(),
677 alert_count: AtomicU64::new(0),
678 dedup_count: AtomicU64::new(0),
679 }
680 }
681
682 pub fn with_deduplication(mut self, config: DeduplicationConfig) -> Self {
684 self.dedup_config = config;
685 self
686 }
687
688 pub fn with_sink<S: AlertSink + 'static>(mut self, sink: S) -> Self {
690 self.sinks.push(Arc::new(sink));
691 self
692 }
693
694 pub fn with_sink_arc(mut self, sink: Arc<dyn AlertSink>) -> Self {
696 self.sinks.push(sink);
697 self
698 }
699
700 pub async fn send(&self, alert: Alert) {
702 self.alert_count.fetch_add(1, Ordering::Relaxed);
703
704 if let Some(dedup_key) = &alert.dedup_key {
706 let now = Instant::now();
707 let mut cache = self.dedup_cache.write();
708
709 cache.retain(|_, instant| now.duration_since(*instant) < self.dedup_config.window);
711
712 if let Some(last_seen) = cache.get(dedup_key) {
714 if now.duration_since(*last_seen) < self.dedup_config.window {
715 self.dedup_count.fetch_add(1, Ordering::Relaxed);
716 return; }
718 }
719
720 if cache.len() < self.dedup_config.max_entries {
722 cache.insert(dedup_key.clone(), now);
723 }
724 }
725
726 for sink in &self.sinks {
728 if sink.accepts_severity(alert.severity) {
729 if let Err(e) = sink.send(&alert).await {
730 tracing::error!("Alert sink {} failed: {}", sink.sink_name(), e);
731 }
732 }
733 }
734 }
735
736 pub fn send_sync(&self, alert: Alert) {
738 self.alert_count.fetch_add(1, Ordering::Relaxed);
741
742 let source = alert.source.as_deref().unwrap_or("unknown");
743 let msg = format!("[ALERT] {} | {} | {}", alert.severity, source, alert.title);
744
745 match alert.severity {
746 AlertSeverity::Info => tracing::info!("{}", msg),
747 AlertSeverity::Warning => tracing::warn!("{}", msg),
748 AlertSeverity::Error | AlertSeverity::Critical => tracing::error!("{}", msg),
749 }
750 }
751
752 pub fn stats(&self) -> AlertRouterStats {
754 AlertRouterStats {
755 total_alerts: self.alert_count.load(Ordering::Relaxed),
756 deduplicated: self.dedup_count.load(Ordering::Relaxed),
757 sinks: self.sinks.len(),
758 }
759 }
760}
761
762impl Default for AlertRouter {
763 fn default() -> Self {
764 Self::new()
765 }
766}
767
/// Snapshot of router counters.
#[derive(Clone, Debug)]
pub struct AlertRouterStats {
    /// Number of alerts handed to the router.
    pub total_alerts: u64,
    /// Number of alerts suppressed as duplicates.
    pub deduplicated: u64,
    /// Number of registered sinks.
    pub sinks: usize,
}
778
#[cfg(test)]
mod tests {
    use super::*;

    /// Severities compare in ascending order of urgency.
    #[test]
    fn test_alert_severity_ordering() {
        let ascending = [
            AlertSeverity::Info,
            AlertSeverity::Warning,
            AlertSeverity::Error,
            AlertSeverity::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    /// The builder methods populate the corresponding fields.
    #[test]
    fn test_alert_creation() {
        let built = Alert::new(AlertSeverity::Warning, "Test alert")
            .with_description("This is a test")
            .with_source("test_kernel")
            .with_metadata("key1", "value1")
            .with_tag("test");

        assert_eq!(built.severity, AlertSeverity::Warning);
        assert_eq!(built.title, "Test alert");
        assert_eq!(built.description.as_deref(), Some("This is a test"));
        assert_eq!(built.source.as_deref(), Some("test_kernel"));
        assert_eq!(
            built.metadata.get("key1").map(String::as_str),
            Some("value1")
        );
        assert!(built.tags.iter().any(|tag| tag == "test"));
    }

    /// The JSON output mentions severity, title and source.
    #[test]
    fn test_alert_json() {
        let rendered = Alert::new(AlertSeverity::Error, "Test")
            .with_source("kernel_1")
            .to_json();

        for fragment in ["ERROR", "Test", "kernel_1"] {
            assert!(rendered.contains(fragment));
        }
    }

    /// A sent alert is retained and can be read back.
    #[tokio::test]
    async fn test_in_memory_sink() {
        let memory = InMemorySink::new(10);

        memory
            .send(&Alert::new(AlertSeverity::Warning, "Test alert"))
            .await
            .unwrap();

        assert_eq!(memory.count(), 1);
        let stored = memory.alerts();
        assert_eq!(stored[0].title, "Test alert");
    }

    /// Alerts below the configured minimum severity are not stored.
    #[tokio::test]
    async fn test_in_memory_sink_severity_filter() {
        let memory = InMemorySink::new(10).with_severity_filter(AlertSeverity::Error);

        // Below the threshold: dropped.
        let below = Alert::new(AlertSeverity::Info, "Info");
        memory.send(&below).await.unwrap();
        assert_eq!(memory.count(), 0);

        // At the threshold: stored.
        let at_threshold = Alert::new(AlertSeverity::Error, "Error");
        memory.send(&at_threshold).await.unwrap();
        assert_eq!(memory.count(), 1);
    }

    /// The router forwards every alert to its sink and counts them.
    #[tokio::test]
    async fn test_alert_router() {
        let memory = Arc::new(InMemorySink::new(100));
        let router = AlertRouter::new().with_sink_arc(memory.clone());

        let first = Alert::new(AlertSeverity::Warning, "Alert 1");
        router.send(first).await;
        let second = Alert::new(AlertSeverity::Error, "Alert 2");
        router.send(second).await;

        assert_eq!(memory.count(), 2);
        assert_eq!(router.stats().total_alerts, 2);
    }

    /// Repeats of the same dedup key inside the window are suppressed.
    #[tokio::test]
    async fn test_alert_deduplication() {
        let memory = Arc::new(InMemorySink::new(100));
        let config = DeduplicationConfig {
            window: Duration::from_secs(60),
            max_entries: 100,
        };
        let router = AlertRouter::new()
            .with_deduplication(config)
            .with_sink_arc(memory.clone());

        // Five alerts sharing one key.
        for _ in 0..5 {
            let repeated =
                Alert::new(AlertSeverity::Warning, "Repeated alert").with_dedup_key("same-key");
            router.send(repeated).await;
        }

        // Only the first is delivered; the other four count as duplicates.
        assert_eq!(memory.count(), 1);
        assert_eq!(router.stats().deduplicated, 4);
    }

    /// The log sink honours its severity filter and accepts a send.
    #[tokio::test]
    async fn test_log_sink() {
        let logger = LogSink::new().with_severity_filter(AlertSeverity::Warning);

        let below = Alert::new(AlertSeverity::Info, "Info");
        assert!(!logger.accepts_severity(below.severity));

        let at_threshold = Alert::new(AlertSeverity::Warning, "Warning");
        assert!(logger.accepts_severity(at_threshold.severity));

        // Sending must succeed.
        logger.send(&at_threshold).await.unwrap();
    }
}