use crate::monitoring::{Anomaly, AnomalySeverity, Metric};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sklears_core::error::{Result as SklResult, SklearsError};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

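/// Automated alerting engine that evaluates alert rules against pipeline
/// metrics and anomalies, routes triggered alerts to notification channels,
/// and tracks active alerts, silences, and statistics.
///
/// A minimal usage sketch (not compiled; the import path and the `metrics`
/// slice are illustrative assumptions, and error handling is elided):
///
/// ```ignore
/// let alerter = AutomatedAlerter::new(AlertConfig::default());
/// alerter
///     .add_channel("console", Box::new(ConsoleAlertChannel::new("console")))
///     .unwrap();
/// alerter
///     .add_rule(AlertRule {
///         id: "high_cpu".to_string(),
///         name: "High CPU".to_string(),
///         description: "CPU usage above 90%".to_string(),
///         condition: AlertCondition::Threshold {
///             metric: "cpu_usage".to_string(),
///             operator: ThresholdOperator::GreaterThan,
///             value: 90.0,
///             window: Duration::from_secs(60),
///             min_points: 1,
///         },
///         severity: AlertSeverity::Critical,
///         channels: vec!["console".to_string()],
///         cooldown: Duration::from_secs(300),
///         enabled: true,
///         labels: HashMap::new(),
///         priority: 1,
///     })
///     .unwrap();
///
/// // `metrics` is whatever slice of `Metric` values your collector produced.
/// let fired = alerter.process_metrics(&metrics).unwrap();
/// ```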
pub struct AutomatedAlerter {
    config: AlertConfig,
    rules: RwLock<HashMap<String, AlertRule>>,
    channels: RwLock<HashMap<String, Box<dyn AlertChannel>>>,
    alert_history: Arc<Mutex<VecDeque<AlertEvent>>>,
    silenced_alerts: RwLock<HashMap<String, SilencePeriod>>,
    active_alerts: RwLock<HashMap<String, ActiveAlert>>,
    stats: RwLock<AlertStats>,
}

#[derive(Debug, Clone)]
pub struct AlertConfig {
    pub max_history: usize,
    pub default_cooldown: Duration,
    pub enable_grouping: bool,
    pub grouping_window: Duration,
    pub max_alerts_per_group: usize,
    pub enable_escalation: bool,
    pub escalation_levels: Vec<EscalationLevel>,
}

#[derive(Debug, Clone)]
pub struct EscalationLevel {
    pub trigger_after: Duration,
    pub channels: Vec<String>,
    pub severity_threshold: AlertSeverity,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
    pub id: String,
    pub name: String,
    pub description: String,
    pub condition: AlertCondition,
    pub severity: AlertSeverity,
    pub channels: Vec<String>,
    pub cooldown: Duration,
    pub enabled: bool,
    pub labels: HashMap<String, String>,
    pub priority: u32,
}

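/// Conditions that can trigger an alert rule. Conditions are serialized with
/// an internally tagged `type` field and can be nested via `Composite`.
///
/// A construction sketch (not compiled; metric names and values are
/// illustrative only):
///
/// ```ignore
/// let condition = AlertCondition::Composite {
///     operator: LogicalOperator::And,
///     conditions: vec![
///         AlertCondition::Threshold {
///             metric: "error_rate".to_string(),
///             operator: ThresholdOperator::GreaterThan,
///             value: 0.05,
///             window: Duration::from_secs(300),
///             min_points: 5,
///         },
///         AlertCondition::Pattern {
///             pattern: "payments".to_string(),
///             field: PatternField::PipelineName,
///             case_sensitive: false,
///         },
///     ],
/// };
/// ```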
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AlertCondition {
    Threshold {
        metric: String,
        operator: ThresholdOperator,
        value: f64,
        window: Duration,
        min_points: usize,
    },
    Rate {
        metric: String,
        rate_threshold: f64,
        window: Duration,
    },
    Anomaly {
        metric: String,
        sensitivity: f64,
        training_window: Duration,
    },
    Composite {
        operator: LogicalOperator,
        conditions: Vec<AlertCondition>,
    },
    Pattern {
        pattern: String,
        field: PatternField,
        case_sensitive: bool,
    },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ThresholdOperator {
    GreaterThan,
    GreaterThanOrEqual,
    LessThan,
    LessThanOrEqual,
    Equal,
    NotEqual,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogicalOperator {
    And,
    Or,
    Not,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatternField {
    PipelineName,
    StageName,
    ErrorMessage,
    LogMessage,
    Custom(String),
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
    Emergency,
}

impl std::fmt::Display for AlertSeverity {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AlertSeverity::Info => write!(f, "INFO"),
            AlertSeverity::Warning => write!(f, "WARNING"),
            AlertSeverity::Critical => write!(f, "CRITICAL"),
            AlertSeverity::Emergency => write!(f, "EMERGENCY"),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertEvent {
    pub id: String,
    pub rule_id: String,
    pub severity: AlertSeverity,
    pub message: String,
    pub timestamp: DateTime<Utc>,
    pub pipeline: Option<String>,
    pub stage: Option<String>,
    pub labels: HashMap<String, String>,
    pub metadata: HashMap<String, String>,
    pub trigger_value: Option<f64>,
    pub status: AlertStatus,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AlertStatus {
    Firing,
    Acknowledged,
    Resolved,
    Silenced,
}

#[derive(Debug, Clone)]
pub struct ActiveAlert {
    pub event: AlertEvent,
    pub first_fired: DateTime<Utc>,
    pub last_updated: DateTime<Utc>,
    pub fire_count: u32,
    pub escalation_level: usize,
    pub channels_notified: Vec<String>,
}

#[derive(Debug, Clone)]
pub struct SilencePeriod {
    pub start: DateTime<Utc>,
    pub end: DateTime<Utc>,
    pub reason: String,
    pub created_by: String,
    pub rule_patterns: Vec<String>,
}

#[derive(Debug, Clone, Default)]
pub struct AlertStats {
    pub total_alerts: u64,
    pub alerts_by_severity: HashMap<AlertSeverity, u64>,
    pub alerts_by_rule: HashMap<String, u64>,
    pub false_positive_rate: f64,
    pub avg_resolution_time: Duration,
    pub active_alert_count: usize,
}

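/// Delivery backend for alert notifications. Implementations must be `Send`
/// and `Sync` so the alerter can fan out notifications from any thread.
///
/// A hedged sketch of a custom channel (not compiled; `MyPagerChannel` and its
/// delivery call are placeholders for whatever transport you actually use):
///
/// ```ignore
/// #[derive(Debug)]
/// struct MyPagerChannel {
///     name: String,
/// }
///
/// impl AlertChannel for MyPagerChannel {
///     fn send_alert(&self, alert: &AlertEvent) -> SklResult<()> {
///         // Hand the alert to the paging system here.
///         println!("paging on-call: [{}] {}", alert.severity, alert.message);
///         Ok(())
///     }
///
///     fn name(&self) -> &str {
///         &self.name
///     }
///
///     fn health_check(&self) -> SklResult<()> {
///         Ok(())
///     }
///
///     fn config(&self) -> HashMap<String, String> {
///         HashMap::new()
///     }
/// }
/// ```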
pub trait AlertChannel: Send + Sync + std::fmt::Debug {
    fn send_alert(&self, alert: &AlertEvent) -> SklResult<()>;

    fn name(&self) -> &str;

    fn health_check(&self) -> SklResult<()>;

    fn config(&self) -> HashMap<String, String>;
}

#[derive(Debug)]
pub struct ConsoleAlertChannel {
    name: String,
    config: HashMap<String, String>,
}

#[derive(Debug)]
pub struct EmailAlertChannel {
    name: String,
    smtp_server: String,
    smtp_port: u16,
    from_email: String,
    to_emails: Vec<String>,
    config: HashMap<String, String>,
}

#[derive(Debug)]
pub struct WebhookAlertChannel {
    name: String,
    url: String,
    headers: HashMap<String, String>,
    timeout: Duration,
    config: HashMap<String, String>,
}

#[derive(Debug)]
pub struct SlackAlertChannel {
    name: String,
    webhook_url: String,
    channel: String,
    username: String,
    config: HashMap<String, String>,
}

impl AutomatedAlerter {
    #[must_use]
    pub fn new(config: AlertConfig) -> Self {
        Self {
            config,
            rules: RwLock::new(HashMap::new()),
            channels: RwLock::new(HashMap::new()),
            alert_history: Arc::new(Mutex::new(VecDeque::new())),
            silenced_alerts: RwLock::new(HashMap::new()),
            active_alerts: RwLock::new(HashMap::new()),
            stats: RwLock::new(AlertStats::default()),
        }
    }

    pub fn add_rule(&self, rule: AlertRule) -> SklResult<()> {
        let mut rules = self.rules.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for rules".to_string())
        })?;

        rules.insert(rule.id.clone(), rule);
        Ok(())
    }

    pub fn remove_rule(&self, rule_id: &str) -> SklResult<()> {
        let mut rules = self.rules.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for rules".to_string())
        })?;

        rules.remove(rule_id);
        Ok(())
    }

    pub fn add_channel(&self, name: &str, channel: Box<dyn AlertChannel>) -> SklResult<()> {
        let mut channels = self.channels.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for channels".to_string())
        })?;

        channels.insert(name.to_string(), channel);
        Ok(())
    }

    pub fn remove_channel(&self, name: &str) -> SklResult<()> {
        let mut channels = self.channels.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for channels".to_string())
        })?;

        channels.remove(name);
        Ok(())
    }

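    /// Evaluates every enabled rule against the supplied metrics, skipping
    /// rules that are silenced or still in their cooldown window, and
    /// dispatches any triggered alerts to the rule's channels.
    ///
    /// Call sketch (not compiled; `metrics` is whatever slice of `Metric`
    /// values your collector produces):
    ///
    /// ```ignore
    /// let fired = alerter.process_metrics(&metrics)?;
    /// for alert in &fired {
    ///     println!("fired: {} ({})", alert.message, alert.severity);
    /// }
    /// ```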
    pub fn process_metrics(&self, metrics: &[Metric]) -> SklResult<Vec<AlertEvent>> {
        let mut triggered_alerts = Vec::new();

        // Snapshot the rules so no lock is held while the helpers below
        // re-acquire the rules/active-alerts locks.
        let rules_snapshot: Vec<AlertRule> = {
            let rules = self.rules.read().map_err(|_| {
                SklearsError::InvalidOperation("Failed to acquire read lock for rules".to_string())
            })?;
            rules.values().cloned().collect()
        };

        for rule in &rules_snapshot {
            if !rule.enabled {
                continue;
            }

            if self.is_silenced(&rule.id)? {
                continue;
            }

            if self.is_in_cooldown(&rule.id)? {
                continue;
            }

            if self.evaluate_condition(&rule.condition, metrics)? {
                let alert = self.create_alert_event(rule, metrics)?;
                triggered_alerts.push(alert);
            }
        }

        for alert in &triggered_alerts {
            self.handle_alert(alert.clone())?;
        }

        Ok(triggered_alerts)
    }

    pub fn process_anomalies(&self, anomalies: &[Anomaly]) -> SklResult<Vec<AlertEvent>> {
        let mut triggered_alerts = Vec::new();

        for anomaly in anomalies {
            let alert_severity = match anomaly.severity {
                AnomalySeverity::Low => AlertSeverity::Info,
                AnomalySeverity::Medium => AlertSeverity::Warning,
                AnomalySeverity::High => AlertSeverity::Critical,
                AnomalySeverity::Critical => AlertSeverity::Emergency,
            };

            let alert_event = AlertEvent {
                id: uuid::Uuid::new_v4().to_string(),
                rule_id: "anomaly_detection".to_string(),
                severity: alert_severity,
                message: format!("Anomaly detected: {}", anomaly.description),
                timestamp: Utc::now(),
                pipeline: Some(anomaly.pipeline_name.clone()),
                stage: None,
                labels: {
                    let mut labels = HashMap::new();
                    labels.insert("type".to_string(), "anomaly".to_string());
                    labels.insert("metric".to_string(), anomaly.metric_name.clone());
                    labels
                },
                metadata: HashMap::new(),
                trigger_value: None,
                status: AlertStatus::Firing,
            };

            triggered_alerts.push(alert_event);
        }

        for alert in &triggered_alerts {
            self.handle_alert(alert.clone())?;
        }

        Ok(triggered_alerts)
    }

    pub fn create_silence(
        &self,
        rule_patterns: Vec<String>,
        duration: Duration,
        reason: String,
        created_by: String,
    ) -> SklResult<String> {
        let silence_id = uuid::Uuid::new_v4().to_string();
        let now = Utc::now();

        let silence = SilencePeriod {
            start: now,
            end: now
                + chrono::Duration::from_std(duration).map_err(|_| {
                    SklearsError::InvalidInput("Invalid duration for silence period".to_string())
                })?,
            reason,
            created_by,
            rule_patterns,
        };

        let mut silenced = self.silenced_alerts.write().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire write lock for silenced alerts".to_string(),
            )
        })?;

        silenced.insert(silence_id.clone(), silence);
        Ok(silence_id)
    }

    pub fn remove_silence(&self, silence_id: &str) -> SklResult<()> {
        let mut silenced = self.silenced_alerts.write().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire write lock for silenced alerts".to_string(),
            )
        })?;

        silenced.remove(silence_id);
        Ok(())
    }

    pub fn acknowledge_alert(&self, alert_id: &str, acknowledged_by: &str) -> SklResult<()> {
        let mut active_alerts = self.active_alerts.write().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire write lock for active alerts".to_string(),
            )
        })?;

        if let Some(active_alert) = active_alerts.get_mut(alert_id) {
            active_alert.event.status = AlertStatus::Acknowledged;
            active_alert
                .event
                .metadata
                .insert("acknowledged_by".to_string(), acknowledged_by.to_string());
            active_alert
                .event
                .metadata
                .insert("acknowledged_at".to_string(), Utc::now().to_rfc3339());
            active_alert.last_updated = Utc::now();
        }

        Ok(())
    }

    pub fn resolve_alert(&self, alert_id: &str, resolved_by: &str) -> SklResult<()> {
        // Remove the alert inside a scope so the write lock is released before
        // the history and stats helpers below take their own locks.
        let removed = {
            let mut active_alerts = self.active_alerts.write().map_err(|_| {
                SklearsError::InvalidOperation(
                    "Failed to acquire write lock for active alerts".to_string(),
                )
            })?;
            active_alerts.remove(alert_id)
        };

        if let Some(mut active_alert) = removed {
            active_alert.event.status = AlertStatus::Resolved;
            active_alert
                .event
                .metadata
                .insert("resolved_by".to_string(), resolved_by.to_string());
            active_alert
                .event
                .metadata
                .insert("resolved_at".to_string(), Utc::now().to_rfc3339());

            self.add_to_history(active_alert.event.clone())?;

            self.update_resolution_stats(&active_alert)?;
        }

        Ok(())
    }

    pub fn get_stats(&self) -> SklResult<AlertStats> {
        let stats = self.stats.read().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire read lock for stats".to_string())
        })?;

        Ok(stats.clone())
    }

    pub fn get_active_alerts(&self) -> SklResult<Vec<ActiveAlert>> {
        let active_alerts = self.active_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for active alerts".to_string(),
            )
        })?;

        Ok(active_alerts.values().cloned().collect())
    }

    pub fn get_alert_history(&self, limit: Option<usize>) -> SklResult<Vec<AlertEvent>> {
        let history = self.alert_history.lock().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire lock for alert history".to_string())
        })?;

        let alerts: Vec<AlertEvent> = history.iter().cloned().collect();

        if let Some(limit) = limit {
            Ok(alerts.into_iter().take(limit).collect())
        } else {
            Ok(alerts)
        }
    }

    fn evaluate_condition(
        &self,
        condition: &AlertCondition,
        metrics: &[Metric],
    ) -> SklResult<bool> {
        match condition {
            AlertCondition::Threshold {
                metric,
                operator,
                value,
                window,
                min_points,
            } => self.evaluate_threshold_condition(
                metric,
                operator,
                *value,
                *window,
                *min_points,
                metrics,
            ),
            AlertCondition::Rate {
                metric,
                rate_threshold,
                window,
            } => self.evaluate_rate_condition(metric, *rate_threshold, *window, metrics),
            AlertCondition::Anomaly {
                metric,
                sensitivity,
                training_window,
            } => self.evaluate_anomaly_condition(metric, *sensitivity, *training_window, metrics),
            AlertCondition::Composite {
                operator,
                conditions,
            } => self.evaluate_composite_condition(operator, conditions, metrics),
            AlertCondition::Pattern {
                pattern,
                field,
                case_sensitive,
            } => self.evaluate_pattern_condition(pattern, field, *case_sensitive, metrics),
        }
    }

    fn evaluate_threshold_condition(
        &self,
        metric_name: &str,
        operator: &ThresholdOperator,
        threshold: f64,
        window: Duration,
        min_points: usize,
        metrics: &[Metric],
    ) -> SklResult<bool> {
        let cutoff_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            .saturating_sub(window.as_secs());

        let relevant_metrics: Vec<&Metric> = metrics
            .iter()
            .filter(|m| m.name == metric_name && m.timestamp >= cutoff_time)
            .collect();

        if relevant_metrics.len() < min_points {
            return Ok(false);
        }

        if let Some(latest_metric) = relevant_metrics.last() {
            Ok(self.compare_value(latest_metric.value, operator, threshold))
        } else {
            Ok(false)
        }
    }

    fn evaluate_rate_condition(
        &self,
        metric_name: &str,
        rate_threshold: f64,
        window: Duration,
        metrics: &[Metric],
    ) -> SklResult<bool> {
        let cutoff_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            .saturating_sub(window.as_secs());

        let relevant_metrics: Vec<&Metric> = metrics
            .iter()
            .filter(|m| m.name == metric_name && m.timestamp >= cutoff_time)
            .collect();

        if relevant_metrics.len() < 2 {
            return Ok(false);
        }

        let first = relevant_metrics.first().unwrap();
        let last = relevant_metrics.last().unwrap();

        // Saturating subtraction guards against metrics arriving out of order.
        let time_diff = last.timestamp.saturating_sub(first.timestamp);
        if time_diff == 0 {
            return Ok(false);
        }

        let rate = (last.value - first.value) / time_diff as f64;
        Ok(rate > rate_threshold)
    }

    fn evaluate_anomaly_condition(
        &self,
        metric_name: &str,
        sensitivity: f64,
        training_window: Duration,
        metrics: &[Metric],
    ) -> SklResult<bool> {
        let cutoff_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            .saturating_sub(training_window.as_secs());

        let relevant_metrics: Vec<&Metric> = metrics
            .iter()
            .filter(|m| m.name == metric_name && m.timestamp >= cutoff_time)
            .collect();

        if relevant_metrics.len() < 10 {
            return Ok(false);
        }

        let values: Vec<f64> = relevant_metrics.iter().map(|m| m.value).collect();
        let mean = values.iter().sum::<f64>() / values.len() as f64;
        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
        let std_dev = variance.sqrt();

        // A flat series has no spread to measure against, so it is never anomalous.
        if std_dev == 0.0 {
            return Ok(false);
        }

        if let Some(latest_metric) = relevant_metrics.last() {
            let z_score = (latest_metric.value - mean) / std_dev;
            Ok(z_score.abs() > sensitivity)
        } else {
            Ok(false)
        }
    }

    fn evaluate_composite_condition(
        &self,
        operator: &LogicalOperator,
        conditions: &[AlertCondition],
        metrics: &[Metric],
    ) -> SklResult<bool> {
        match operator {
            LogicalOperator::And => {
                for condition in conditions {
                    if !self.evaluate_condition(condition, metrics)? {
                        return Ok(false);
                    }
                }
                Ok(true)
            }
            LogicalOperator::Or => {
                for condition in conditions {
                    if self.evaluate_condition(condition, metrics)? {
                        return Ok(true);
                    }
                }
                Ok(false)
            }
            LogicalOperator::Not => {
                if conditions.len() != 1 {
                    return Err(SklearsError::InvalidInput(
                        "NOT operator requires exactly one condition".to_string(),
                    ));
                }
                Ok(!self.evaluate_condition(&conditions[0], metrics)?)
            }
        }
    }

    fn evaluate_pattern_condition(
        &self,
        pattern: &str,
        field: &PatternField,
        case_sensitive: bool,
        metrics: &[Metric],
    ) -> SklResult<bool> {
        for metric in metrics {
            let text_to_search = match field {
                PatternField::PipelineName => &metric.pipeline_name,
                PatternField::StageName => {
                    if let Some(ref stage_name) = metric.stage_name {
                        stage_name
                    } else {
                        continue;
                    }
                }
                PatternField::ErrorMessage => {
                    // Error messages are not carried on metrics, so this field
                    // cannot match here.
                    continue;
                }
                PatternField::LogMessage => {
                    // Log messages are not carried on metrics, so this field
                    // cannot match here.
                    continue;
                }
                PatternField::Custom(field_name) => {
                    if let Some(value) = metric.metadata.get(field_name) {
                        value
                    } else {
                        continue;
                    }
                }
            };

            let matches = if case_sensitive {
                text_to_search.contains(pattern)
            } else {
                text_to_search
                    .to_lowercase()
                    .contains(&pattern.to_lowercase())
            };

            if matches {
                return Ok(true);
            }
        }

        Ok(false)
    }

    fn compare_value(&self, value: f64, operator: &ThresholdOperator, threshold: f64) -> bool {
        match operator {
            ThresholdOperator::GreaterThan => value > threshold,
            ThresholdOperator::GreaterThanOrEqual => value >= threshold,
            ThresholdOperator::LessThan => value < threshold,
            ThresholdOperator::LessThanOrEqual => value <= threshold,
            ThresholdOperator::Equal => (value - threshold).abs() < f64::EPSILON,
            ThresholdOperator::NotEqual => (value - threshold).abs() >= f64::EPSILON,
        }
    }

    fn create_alert_event(&self, rule: &AlertRule, metrics: &[Metric]) -> SklResult<AlertEvent> {
        let trigger_value = metrics
            .iter()
            .find(|m| self.metric_matches_rule(m, rule))
            .map(|m| m.value);

        let pipeline = metrics
            .iter()
            .find(|m| self.metric_matches_rule(m, rule))
            .map(|m| m.pipeline_name.clone());

        let stage = metrics
            .iter()
            .find(|m| self.metric_matches_rule(m, rule))
            .and_then(|m| m.stage_name.clone());

        Ok(AlertEvent {
            id: uuid::Uuid::new_v4().to_string(),
            rule_id: rule.id.clone(),
            severity: rule.severity.clone(),
            message: format!("Alert triggered: {}", rule.description),
            timestamp: Utc::now(),
            pipeline,
            stage,
            labels: rule.labels.clone(),
            metadata: HashMap::new(),
            trigger_value,
            status: AlertStatus::Firing,
        })
    }

    fn metric_matches_rule(&self, _metric: &Metric, _rule: &AlertRule) -> bool {
        // Simplified matching: every metric is treated as relevant to the rule.
        true
    }

    fn handle_alert(&self, alert: AlertEvent) -> SklResult<()> {
        if self.config.enable_grouping {
            if let Some(existing_alert_id) = self.find_groupable_alert(&alert)? {
                self.update_grouped_alert(existing_alert_id, &alert)?;
                return Ok(());
            }
        }

        let active_alert = ActiveAlert {
            event: alert.clone(),
            first_fired: alert.timestamp,
            last_updated: alert.timestamp,
            fire_count: 1,
            escalation_level: 0,
            channels_notified: Vec::new(),
        };

        {
            let mut active_alerts = self.active_alerts.write().map_err(|_| {
                SklearsError::InvalidOperation(
                    "Failed to acquire write lock for active alerts".to_string(),
                )
            })?;
            active_alerts.insert(alert.id.clone(), active_alert);
        }

        self.send_alert_notifications(&alert)?;

        self.update_stats(&alert)?;

        Ok(())
    }

    fn send_alert_notifications(&self, alert: &AlertEvent) -> SklResult<()> {
        let rules = self.rules.read().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire read lock for rules".to_string())
        })?;

        let channels = self.channels.read().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire read lock for channels".to_string())
        })?;

        if let Some(rule) = rules.get(&alert.rule_id) {
            for channel_name in &rule.channels {
                if let Some(channel) = channels.get(channel_name) {
                    if let Err(e) = channel.send_alert(alert) {
                        eprintln!("Failed to send alert to channel {channel_name}: {e:?}");
                    }
                }
            }
        }

        Ok(())
    }

    fn is_silenced(&self, rule_id: &str) -> SklResult<bool> {
        let silenced = self.silenced_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for silenced alerts".to_string(),
            )
        })?;

        let now = Utc::now();

        for silence in silenced.values() {
            if now >= silence.start && now <= silence.end {
                for pattern in &silence.rule_patterns {
                    if rule_id.contains(pattern) {
                        return Ok(true);
                    }
                }
            }
        }

        Ok(false)
    }

    fn is_in_cooldown(&self, rule_id: &str) -> SklResult<bool> {
        let active_alerts = self.active_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for active alerts".to_string(),
            )
        })?;

        let rules = self.rules.read().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire read lock for rules".to_string())
        })?;

        if let Some(rule) = rules.get(rule_id) {
            let now = Utc::now();

            for active_alert in active_alerts.values() {
                if active_alert.event.rule_id == rule_id {
                    let cooldown_end = active_alert.last_updated
                        + chrono::Duration::from_std(rule.cooldown).unwrap_or_default();

                    if now < cooldown_end {
                        return Ok(true);
                    }
                }
            }
        }

        Ok(false)
    }

    fn find_groupable_alert(&self, alert: &AlertEvent) -> SklResult<Option<String>> {
        let active_alerts = self.active_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for active alerts".to_string(),
            )
        })?;

        let now = alert.timestamp;

        for (alert_id, active_alert) in active_alerts.iter() {
            let time_diff = now - active_alert.first_fired;
            let window_duration =
                chrono::Duration::from_std(self.config.grouping_window).unwrap_or_default();

            if time_diff <= window_duration
                && active_alert.event.rule_id == alert.rule_id
                && active_alert.event.pipeline == alert.pipeline
            {
                return Ok(Some(alert_id.clone()));
            }
        }

        Ok(None)
    }

    fn update_grouped_alert(&self, alert_id: String, new_alert: &AlertEvent) -> SklResult<()> {
        let mut active_alerts = self.active_alerts.write().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire write lock for active alerts".to_string(),
            )
        })?;

        if let Some(active_alert) = active_alerts.get_mut(&alert_id) {
            active_alert.fire_count += 1;
            active_alert.last_updated = new_alert.timestamp;

            if new_alert.severity > active_alert.event.severity {
                active_alert.event.severity = new_alert.severity.clone();
            }
        }

        Ok(())
    }

    fn add_to_history(&self, alert: AlertEvent) -> SklResult<()> {
        let mut history = self.alert_history.lock().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire lock for alert history".to_string())
        })?;

        history.push_back(alert);

        while history.len() > self.config.max_history {
            history.pop_front();
        }

        Ok(())
    }

    fn update_stats(&self, alert: &AlertEvent) -> SklResult<()> {
        let mut stats = self.stats.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for stats".to_string())
        })?;

        stats.total_alerts += 1;
        *stats
            .alerts_by_severity
            .entry(alert.severity.clone())
            .or_insert(0) += 1;
        *stats
            .alerts_by_rule
            .entry(alert.rule_id.clone())
            .or_insert(0) += 1;

        let active_alerts = self.active_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for active alerts".to_string(),
            )
        })?;
        stats.active_alert_count = active_alerts.len();

        Ok(())
    }

    fn update_resolution_stats(&self, active_alert: &ActiveAlert) -> SklResult<()> {
        let mut stats = self.stats.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for stats".to_string())
        })?;

        let resolution_time = Utc::now() - active_alert.first_fired;
        let resolution_duration =
            Duration::from_millis(resolution_time.num_milliseconds().max(0) as u64);

        // Note: this records the most recent resolution time; a true running
        // average would also need a count of resolved alerts.
        stats.avg_resolution_time = resolution_duration;

        let active_alerts = self.active_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for active alerts".to_string(),
            )
        })?;
        stats.active_alert_count = active_alerts.len();

        Ok(())
    }
}

impl ConsoleAlertChannel {
    #[must_use]
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
            config: HashMap::new(),
        }
    }
}

impl AlertChannel for ConsoleAlertChannel {
    fn send_alert(&self, alert: &AlertEvent) -> SklResult<()> {
        println!(
            "🚨 ALERT [{}] {}: {}",
            alert.severity, alert.rule_id, alert.message
        );

        if let Some(pipeline) = &alert.pipeline {
            println!(" Pipeline: {pipeline}");
        }

        if let Some(stage) = &alert.stage {
            println!(" Stage: {stage}");
        }

        if let Some(value) = alert.trigger_value {
            println!(" Trigger Value: {value}");
        }

        println!(" Timestamp: {}", alert.timestamp);

        Ok(())
    }

    fn name(&self) -> &str {
        &self.name
    }

    fn health_check(&self) -> SklResult<()> {
        Ok(())
    }

    fn config(&self) -> HashMap<String, String> {
        self.config.clone()
    }
}

impl Default for AlertConfig {
    fn default() -> Self {
        Self {
            max_history: 10000,
            default_cooldown: Duration::from_secs(300),
            enable_grouping: true,
            grouping_window: Duration::from_secs(60),
            max_alerts_per_group: 10,
            enable_escalation: false,
            escalation_levels: Vec::new(),
        }
    }
}

#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::SystemTime;

    #[test]
    fn test_automated_alerter_creation() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let stats = alerter.get_stats().unwrap();
        assert_eq!(stats.total_alerts, 0);
        assert_eq!(stats.active_alert_count, 0);
    }

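    // End-to-end sketch: a threshold rule plus a console channel, driven
    // through `process_metrics`. Rule and metric names are illustrative.
    #[test]
    fn test_process_metrics_triggers_alert() {
        let alerter = AutomatedAlerter::new(AlertConfig::default());
        alerter
            .add_channel("console", Box::new(ConsoleAlertChannel::new("console")))
            .unwrap();
        alerter
            .add_rule(AlertRule {
                id: "high_cpu".to_string(),
                name: "High CPU".to_string(),
                description: "CPU usage above 80%".to_string(),
                condition: AlertCondition::Threshold {
                    metric: "cpu_usage".to_string(),
                    operator: ThresholdOperator::GreaterThan,
                    value: 80.0,
                    window: Duration::from_secs(60),
                    min_points: 1,
                },
                severity: AlertSeverity::Warning,
                channels: vec!["console".to_string()],
                cooldown: Duration::from_secs(300),
                enabled: true,
                labels: HashMap::new(),
                priority: 1,
            })
            .unwrap();

        let metrics = vec![Metric {
            name: "cpu_usage".to_string(),
            value: 95.0,
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            pipeline_name: "test_pipeline".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        }];

        let fired = alerter.process_metrics(&metrics).unwrap();
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].rule_id, "high_cpu");

        let stats = alerter.get_stats().unwrap();
        assert_eq!(stats.total_alerts, 1);
        assert_eq!(stats.active_alert_count, 1);
    }
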
    #[test]
    fn test_alert_rule_management() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let rule = AlertRule {
            id: "test_rule".to_string(),
            name: "Test Rule".to_string(),
            description: "Test alert rule".to_string(),
            condition: AlertCondition::Threshold {
                metric: "test_metric".to_string(),
                operator: ThresholdOperator::GreaterThan,
                value: 100.0,
                window: Duration::from_secs(60),
                min_points: 1,
            },
            severity: AlertSeverity::Warning,
            channels: vec!["console".to_string()],
            cooldown: Duration::from_secs(300),
            enabled: true,
            labels: HashMap::new(),
            priority: 1,
        };

        alerter.add_rule(rule).unwrap();

        let rules = alerter.rules.read().unwrap();
        assert!(rules.contains_key("test_rule"));
    }

    #[test]
    fn test_alert_channel_management() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let channel = Box::new(ConsoleAlertChannel::new("test_console"));
        alerter.add_channel("console", channel).unwrap();

        let channels = alerter.channels.read().unwrap();
        assert!(channels.contains_key("console"));
    }

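    // A hedged check of the cooldown bookkeeping: once an alert for a rule is
    // active, `is_in_cooldown` reports true until the rule's cooldown elapses.
    // Rule and alert names here are illustrative.
    #[test]
    fn test_rule_cooldown() {
        let alerter = AutomatedAlerter::new(AlertConfig::default());

        alerter
            .add_rule(AlertRule {
                id: "cooldown_rule".to_string(),
                name: "Cooldown Rule".to_string(),
                description: "Rule with a long cooldown".to_string(),
                condition: AlertCondition::Threshold {
                    metric: "latency_ms".to_string(),
                    operator: ThresholdOperator::GreaterThan,
                    value: 500.0,
                    window: Duration::from_secs(60),
                    min_points: 1,
                },
                severity: AlertSeverity::Warning,
                channels: Vec::new(),
                cooldown: Duration::from_secs(300),
                enabled: true,
                labels: HashMap::new(),
                priority: 1,
            })
            .unwrap();

        let event = AlertEvent {
            id: "cooldown_alert".to_string(),
            rule_id: "cooldown_rule".to_string(),
            severity: AlertSeverity::Warning,
            message: "Latency above threshold".to_string(),
            timestamp: Utc::now(),
            pipeline: None,
            stage: None,
            labels: HashMap::new(),
            metadata: HashMap::new(),
            trigger_value: Some(750.0),
            status: AlertStatus::Firing,
        };

        {
            let mut active_alerts = alerter.active_alerts.write().unwrap();
            active_alerts.insert(
                event.id.clone(),
                ActiveAlert {
                    event: event.clone(),
                    first_fired: event.timestamp,
                    last_updated: event.timestamp,
                    fire_count: 1,
                    escalation_level: 0,
                    channels_notified: Vec::new(),
                },
            );
        }

        assert!(alerter.is_in_cooldown("cooldown_rule").unwrap());
        assert!(!alerter.is_in_cooldown("unknown_rule").unwrap());
    }
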
    #[test]
    fn test_console_alert_channel() {
        let channel = ConsoleAlertChannel::new("test");

        let alert = AlertEvent {
            id: "test_alert".to_string(),
            rule_id: "test_rule".to_string(),
            severity: AlertSeverity::Warning,
            message: "Test alert message".to_string(),
            timestamp: Utc::now(),
            pipeline: Some("test_pipeline".to_string()),
            stage: Some("test_stage".to_string()),
            labels: HashMap::new(),
            metadata: HashMap::new(),
            trigger_value: Some(150.0),
            status: AlertStatus::Firing,
        };

        assert!(channel.send_alert(&alert).is_ok());
        assert!(channel.health_check().is_ok());
        assert_eq!(channel.name(), "test");
    }

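    // Checks the rate condition: two samples 10 seconds apart rising by 50
    // give a rate of 5.0/s, which clears a threshold of 2.0 but not 10.0.
    // Metric names and values are illustrative.
    #[test]
    fn test_rate_condition_evaluation() {
        let alerter = AutomatedAlerter::new(AlertConfig::default());

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        let make_metric = |value: f64, timestamp: u64| Metric {
            name: "queue_depth".to_string(),
            value,
            timestamp,
            pipeline_name: "test_pipeline".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        };

        // Oldest sample first so the first/last pair spans the full interval.
        let metrics = vec![make_metric(10.0, now - 10), make_metric(60.0, now)];

        let rising = AlertCondition::Rate {
            metric: "queue_depth".to_string(),
            rate_threshold: 2.0,
            window: Duration::from_secs(60),
        };
        assert!(alerter.evaluate_condition(&rising, &metrics).unwrap());

        let too_strict = AlertCondition::Rate {
            metric: "queue_depth".to_string(),
            rate_threshold: 10.0,
            window: Duration::from_secs(60),
        };
        assert!(!alerter.evaluate_condition(&too_strict, &metrics).unwrap());
    }
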
    #[test]
    fn test_threshold_condition_evaluation() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let condition = AlertCondition::Threshold {
            metric: "cpu_usage".to_string(),
            operator: ThresholdOperator::GreaterThan,
            value: 80.0,
            window: Duration::from_secs(60),
            min_points: 1,
        };

        let metrics = vec![Metric {
            name: "cpu_usage".to_string(),
            value: 90.0,
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            pipeline_name: "test_pipeline".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        }];

        assert!(alerter.evaluate_condition(&condition, &metrics).unwrap());
    }

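    // Exercises `Composite` conditions built from the same threshold evaluation
    // as above: an AND of mixed results, an OR, and a NOT of a failing check.
    #[test]
    fn test_composite_condition_evaluation() {
        let alerter = AutomatedAlerter::new(AlertConfig::default());

        let metrics = vec![Metric {
            name: "cpu_usage".to_string(),
            value: 90.0,
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            pipeline_name: "test_pipeline".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        }];

        let above_80 = AlertCondition::Threshold {
            metric: "cpu_usage".to_string(),
            operator: ThresholdOperator::GreaterThan,
            value: 80.0,
            window: Duration::from_secs(60),
            min_points: 1,
        };
        let above_95 = AlertCondition::Threshold {
            metric: "cpu_usage".to_string(),
            operator: ThresholdOperator::GreaterThan,
            value: 95.0,
            window: Duration::from_secs(60),
            min_points: 1,
        };

        let both = AlertCondition::Composite {
            operator: LogicalOperator::And,
            conditions: vec![above_80.clone(), above_95.clone()],
        };
        assert!(!alerter.evaluate_condition(&both, &metrics).unwrap());

        let either = AlertCondition::Composite {
            operator: LogicalOperator::Or,
            conditions: vec![above_80, above_95.clone()],
        };
        assert!(alerter.evaluate_condition(&either, &metrics).unwrap());

        let negated = AlertCondition::Composite {
            operator: LogicalOperator::Not,
            conditions: vec![above_95],
        };
        assert!(alerter.evaluate_condition(&negated, &metrics).unwrap());
    }
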
    #[test]
    fn test_alert_silence() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let silence_id = alerter
            .create_silence(
                vec!["test_rule".to_string()],
                Duration::from_secs(3600),
                "Maintenance window".to_string(),
                "admin".to_string(),
            )
            .unwrap();

        assert!(!silence_id.is_empty());
        assert!(alerter.is_silenced("test_rule").unwrap());

        alerter.remove_silence(&silence_id).unwrap();
        assert!(!alerter.is_silenced("test_rule").unwrap());
    }

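    // Exercises the pattern condition against the pipeline name, both
    // case-insensitively and case-sensitively. Names are illustrative.
    #[test]
    fn test_pattern_condition_evaluation() {
        let alerter = AutomatedAlerter::new(AlertConfig::default());

        let metrics = vec![Metric {
            name: "rows_processed".to_string(),
            value: 1.0,
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            pipeline_name: "Payments_ETL".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        }];

        let insensitive = AlertCondition::Pattern {
            pattern: "payments".to_string(),
            field: PatternField::PipelineName,
            case_sensitive: false,
        };
        assert!(alerter.evaluate_condition(&insensitive, &metrics).unwrap());

        let sensitive = AlertCondition::Pattern {
            pattern: "payments".to_string(),
            field: PatternField::PipelineName,
            case_sensitive: true,
        };
        assert!(!alerter.evaluate_condition(&sensitive, &metrics).unwrap());
    }
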
    #[test]
    fn test_alert_acknowledgment() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let alert = AlertEvent {
            id: "test_alert".to_string(),
            rule_id: "test_rule".to_string(),
            severity: AlertSeverity::Warning,
            message: "Test alert".to_string(),
            timestamp: Utc::now(),
            pipeline: None,
            stage: None,
            labels: HashMap::new(),
            metadata: HashMap::new(),
            trigger_value: None,
            status: AlertStatus::Firing,
        };

        let active_alert = ActiveAlert {
            event: alert.clone(),
            first_fired: alert.timestamp,
            last_updated: alert.timestamp,
            fire_count: 1,
            escalation_level: 0,
            channels_notified: Vec::new(),
        };

        {
            let mut active_alerts = alerter.active_alerts.write().unwrap();
            active_alerts.insert(alert.id.clone(), active_alert);
        }

        alerter.acknowledge_alert(&alert.id, "admin").unwrap();

        let active_alerts = alerter.active_alerts.read().unwrap();
        let acknowledged_alert = active_alerts.get(&alert.id).unwrap();
        assert_eq!(acknowledged_alert.event.status, AlertStatus::Acknowledged);
    }
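
    // Mirrors the acknowledgment test above: resolving an active alert should
    // move it out of the active set and into the history with Resolved status.
    #[test]
    fn test_alert_resolution() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let alert = AlertEvent {
            id: "resolve_me".to_string(),
            rule_id: "test_rule".to_string(),
            severity: AlertSeverity::Warning,
            message: "Test alert".to_string(),
            timestamp: Utc::now(),
            pipeline: None,
            stage: None,
            labels: HashMap::new(),
            metadata: HashMap::new(),
            trigger_value: None,
            status: AlertStatus::Firing,
        };

        {
            let mut active_alerts = alerter.active_alerts.write().unwrap();
            active_alerts.insert(
                alert.id.clone(),
                ActiveAlert {
                    event: alert.clone(),
                    first_fired: alert.timestamp,
                    last_updated: alert.timestamp,
                    fire_count: 1,
                    escalation_level: 0,
                    channels_notified: Vec::new(),
                },
            );
        }

        alerter.resolve_alert(&alert.id, "admin").unwrap();

        assert!(alerter.get_active_alerts().unwrap().is_empty());

        let history = alerter.get_alert_history(None).unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].status, AlertStatus::Resolved);
        assert_eq!(
            history[0].metadata.get("resolved_by").map(String::as_str),
            Some("admin")
        );
    }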
}