sklears_compose/automated_alerting.rs

//! Automated Alerting System for Pipeline Monitoring
//!
//! This module provides comprehensive alerting capabilities for pipeline monitoring,
//! including real-time anomaly detection, threshold-based alerts, and integration
//! with external notification systems.
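//!
//! # Example
//!
//! A minimal setup sketch with the types from this module in scope. The rule
//! values are illustrative, and the `metrics` slice is assumed to come from the
//! monitoring layer (its construction is omitted here):
//!
//! ```ignore
//! use std::collections::HashMap;
//! use std::time::Duration;
//!
//! let alerter = AutomatedAlerter::new(AlertConfig::default());
//!
//! // Register a notification channel and a threshold rule.
//! alerter.add_channel("console", Box::new(ConsoleAlertChannel::new("console")))?;
//! alerter.add_rule(AlertRule {
//!     id: "high_latency".to_string(),
//!     name: "High latency".to_string(),
//!     description: "p95 latency above 500 ms".to_string(),
//!     condition: AlertCondition::Threshold {
//!         metric: "latency_p95_ms".to_string(),
//!         operator: ThresholdOperator::GreaterThan,
//!         value: 500.0,
//!         window: Duration::from_secs(60),
//!         min_points: 1,
//!     },
//!     severity: AlertSeverity::Warning,
//!     channels: vec!["console".to_string()],
//!     cooldown: Duration::from_secs(300),
//!     enabled: true,
//!     labels: HashMap::new(),
//!     priority: 1,
//! })?;
//!
//! // Evaluate the collected metrics against every enabled rule.
//! let fired = alerter.process_metrics(&metrics)?;
//! ```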

use crate::monitoring::{Anomaly, AnomalySeverity, Metric};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sklears_core::error::{Result as SklResult, SklearsError};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Automated alerting system for pipeline monitoring
pub struct AutomatedAlerter {
    /// Alert configuration
    config: AlertConfig,
    /// Alert rules
    rules: RwLock<HashMap<String, AlertRule>>,
    /// Alert channels
    channels: RwLock<HashMap<String, Box<dyn AlertChannel>>>,
    /// Alert history
    alert_history: Arc<Mutex<VecDeque<AlertEvent>>>,
    /// Silenced alerts
    silenced_alerts: RwLock<HashMap<String, SilencePeriod>>,
    /// Active alerts
    active_alerts: RwLock<HashMap<String, ActiveAlert>>,
    /// Statistics
    stats: RwLock<AlertStats>,
}

/// Alert system configuration
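///
/// A sketch of overriding a few fields while keeping the defaults
/// (struct-update syntax):
///
/// ```ignore
/// let config = AlertConfig {
///     max_history: 1_000,
///     grouping_window: std::time::Duration::from_secs(120),
///     ..AlertConfig::default()
/// };
/// ```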
#[derive(Debug, Clone)]
pub struct AlertConfig {
    /// Maximum alert history to retain
    pub max_history: usize,
    /// Default alert cooldown period
    pub default_cooldown: Duration,
    /// Enable alert grouping
    pub enable_grouping: bool,
    /// Alert grouping window
    pub grouping_window: Duration,
    /// Maximum alerts per grouping window
    pub max_alerts_per_group: usize,
    /// Enable alert escalation
    pub enable_escalation: bool,
    /// Escalation levels
    pub escalation_levels: Vec<EscalationLevel>,
}

/// Alert escalation level
#[derive(Debug, Clone)]
pub struct EscalationLevel {
    /// Escalation trigger time
    pub trigger_after: Duration,
    /// Additional channels to notify
    pub channels: Vec<String>,
    /// Escalation severity threshold
    pub severity_threshold: AlertSeverity,
}

/// Alert rule definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
    /// Rule ID
    pub id: String,
    /// Rule name
    pub name: String,
    /// Rule description
    pub description: String,
    /// Rule condition
    pub condition: AlertCondition,
    /// Alert severity
    pub severity: AlertSeverity,
    /// Target channels
    pub channels: Vec<String>,
    /// Cooldown period between alerts
    pub cooldown: Duration,
    /// Rule enabled status
    pub enabled: bool,
    /// Alert labels
    pub labels: HashMap<String, String>,
    /// Rule priority
    pub priority: u32,
}

/// Alert condition types
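///
/// Conditions can be nested. A sketch of a composite rule that fires when
/// either an error-rate threshold or an anomaly check trips (values are
/// illustrative):
///
/// ```ignore
/// let condition = AlertCondition::Composite {
///     operator: LogicalOperator::Or,
///     conditions: vec![
///         AlertCondition::Threshold {
///             metric: "error_rate".to_string(),
///             operator: ThresholdOperator::GreaterThan,
///             value: 0.05,
///             window: Duration::from_secs(300),
///             min_points: 5,
///         },
///         AlertCondition::Anomaly {
///             metric: "throughput".to_string(),
///             sensitivity: 3.0,
///             training_window: Duration::from_secs(3600),
///         },
///     ],
/// };
/// ```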
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum AlertCondition {
    /// Threshold-based condition
    Threshold {
        /// Metric name to monitor
        metric: String,
        /// Threshold operator
        operator: ThresholdOperator,
        /// Threshold value
        value: f64,
        /// Time window for evaluation
        window: Duration,
        /// Minimum data points required
        min_points: usize,
    },
    /// Rate-based condition
    Rate {
        /// Metric name to monitor
        metric: String,
        /// Rate threshold (per second)
        rate_threshold: f64,
        /// Time window for rate calculation
        window: Duration,
    },
    /// Anomaly detection condition
    Anomaly {
        /// Metric name to monitor
        metric: String,
        /// Anomaly detection sensitivity
        sensitivity: f64,
        /// Training window size
        training_window: Duration,
    },
    /// Composite condition
    Composite {
        /// Logical operator
        operator: LogicalOperator,
        /// Sub-conditions
        conditions: Vec<AlertCondition>,
    },
    /// Pattern-based condition
    Pattern {
        /// Pattern to match
        pattern: String,
        /// Field to search in
        field: PatternField,
        /// Case sensitive matching
        case_sensitive: bool,
    },
}

/// Threshold operators
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ThresholdOperator {
    /// Greater than
    GreaterThan,
    /// Greater than or equal
    GreaterThanOrEqual,
    /// Less than
    LessThan,
    /// Less than or equal
    LessThanOrEqual,
    /// Equal to
    Equal,
    /// Not equal to
    NotEqual,
}

/// Logical operators for composite conditions
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogicalOperator {
    /// All sub-conditions must be true
    And,
    /// At least one sub-condition must be true
    Or,
    /// The single sub-condition must be false
    Not,
}

/// Pattern field types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatternField {
    /// Pipeline name
    PipelineName,
    /// Stage name
    StageName,
    /// Error message
    ErrorMessage,
    /// Log message
    LogMessage,
    /// Custom field
    Custom(String),
}

/// Alert severity levels
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum AlertSeverity {
    /// Informational alert
    Info,
    /// Warning alert
    Warning,
    /// Critical alert
    Critical,
    /// Emergency alert
    Emergency,
}

impl std::fmt::Display for AlertSeverity {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            AlertSeverity::Info => write!(f, "INFO"),
            AlertSeverity::Warning => write!(f, "WARNING"),
            AlertSeverity::Critical => write!(f, "CRITICAL"),
            AlertSeverity::Emergency => write!(f, "EMERGENCY"),
        }
    }
}

/// Alert event
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertEvent {
    /// Alert ID
    pub id: String,
    /// Rule ID that triggered this alert
    pub rule_id: String,
    /// Alert severity
    pub severity: AlertSeverity,
    /// Alert message
    pub message: String,
    /// Alert timestamp
    pub timestamp: DateTime<Utc>,
    /// Associated pipeline
    pub pipeline: Option<String>,
    /// Associated stage
    pub stage: Option<String>,
    /// Alert labels
    pub labels: HashMap<String, String>,
    /// Alert metadata
    pub metadata: HashMap<String, String>,
    /// Metric value that triggered the alert
    pub trigger_value: Option<f64>,
    /// Alert status
    pub status: AlertStatus,
}

/// Alert status
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AlertStatus {
    /// Alert is firing
    Firing,
    /// Alert has been acknowledged
    Acknowledged,
    /// Alert has been resolved
    Resolved,
    /// Alert has been silenced
    Silenced,
}

/// Active alert tracking
#[derive(Debug, Clone)]
pub struct ActiveAlert {
    /// Original alert event
    pub event: AlertEvent,
    /// First firing time
    pub first_fired: DateTime<Utc>,
    /// Last update time
    pub last_updated: DateTime<Utc>,
    /// Fire count
    pub fire_count: u32,
    /// Escalation level
    pub escalation_level: usize,
    /// Channels notified
    pub channels_notified: Vec<String>,
}

/// Silence period for alerts
#[derive(Debug, Clone)]
pub struct SilencePeriod {
    /// Silence start time
    pub start: DateTime<Utc>,
    /// Silence end time
    pub end: DateTime<Utc>,
    /// Silence reason
    pub reason: String,
    /// User who created the silence
    pub created_by: String,
    /// Silenced rule patterns
    pub rule_patterns: Vec<String>,
}

/// Alert statistics
#[derive(Debug, Clone, Default)]
pub struct AlertStats {
    /// Total alerts fired
    pub total_alerts: u64,
    /// Alerts by severity
    pub alerts_by_severity: HashMap<AlertSeverity, u64>,
    /// Alerts by rule
    pub alerts_by_rule: HashMap<String, u64>,
    /// False positive rate
    pub false_positive_rate: f64,
    /// Average resolution time
    pub avg_resolution_time: Duration,
    /// Current active alerts
    pub active_alert_count: usize,
}

/// Alert channel trait for different notification methods
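///
/// Implementors only need to handle delivery. A sketch of a custom in-memory
/// channel (`MemoryChannel` is a hypothetical example type, not part of this
/// module):
///
/// ```ignore
/// #[derive(Debug, Default)]
/// struct MemoryChannel {
///     alerts: std::sync::Mutex<Vec<AlertEvent>>,
/// }
///
/// impl AlertChannel for MemoryChannel {
///     fn send_alert(&self, alert: &AlertEvent) -> SklResult<()> {
///         self.alerts.lock().unwrap().push(alert.clone());
///         Ok(())
///     }
///
///     fn name(&self) -> &str {
///         "memory"
///     }
///
///     fn health_check(&self) -> SklResult<()> {
///         Ok(())
///     }
///
///     fn config(&self) -> HashMap<String, String> {
///         HashMap::new()
///     }
/// }
/// ```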
pub trait AlertChannel: Send + Sync + std::fmt::Debug {
    /// Send an alert
    fn send_alert(&self, alert: &AlertEvent) -> SklResult<()>;

    /// Get channel name
    fn name(&self) -> &str;

    /// Check if channel is healthy
    fn health_check(&self) -> SklResult<()>;

    /// Get channel configuration
    fn config(&self) -> HashMap<String, String>;
}

/// Console alert channel for debugging
#[derive(Debug)]
pub struct ConsoleAlertChannel {
    /// Channel name
    name: String,
    /// Configuration
    config: HashMap<String, String>,
}

/// Email alert channel
#[derive(Debug)]
pub struct EmailAlertChannel {
    /// Channel name
    name: String,
    /// SMTP server configuration
    smtp_server: String,
    /// SMTP port
    smtp_port: u16,
    /// Sender email
    from_email: String,
    /// Recipient emails
    to_emails: Vec<String>,
    /// Configuration
    config: HashMap<String, String>,
}

/// Webhook alert channel
#[derive(Debug)]
pub struct WebhookAlertChannel {
    /// Channel name
    name: String,
    /// Webhook URL
    url: String,
    /// HTTP headers
    headers: HashMap<String, String>,
    /// Request timeout
    timeout: Duration,
    /// Configuration
    config: HashMap<String, String>,
}

/// Slack alert channel
#[derive(Debug)]
pub struct SlackAlertChannel {
    /// Channel name
    name: String,
    /// Slack webhook URL
    webhook_url: String,
    /// Slack channel
    channel: String,
    /// Bot username
    username: String,
    /// Configuration
    config: HashMap<String, String>,
}

impl AutomatedAlerter {
    /// Create a new automated alerter
    #[must_use]
    pub fn new(config: AlertConfig) -> Self {
        Self {
            config,
            rules: RwLock::new(HashMap::new()),
            channels: RwLock::new(HashMap::new()),
            alert_history: Arc::new(Mutex::new(VecDeque::new())),
            silenced_alerts: RwLock::new(HashMap::new()),
            active_alerts: RwLock::new(HashMap::new()),
            stats: RwLock::new(AlertStats::default()),
        }
    }

    /// Add an alert rule
    pub fn add_rule(&self, rule: AlertRule) -> SklResult<()> {
        let mut rules = self.rules.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for rules".to_string())
        })?;

        rules.insert(rule.id.clone(), rule);
        Ok(())
    }

    /// Remove an alert rule
    pub fn remove_rule(&self, rule_id: &str) -> SklResult<()> {
        let mut rules = self.rules.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for rules".to_string())
        })?;

        rules.remove(rule_id);
        Ok(())
    }

    /// Add an alert channel
    pub fn add_channel(&self, name: &str, channel: Box<dyn AlertChannel>) -> SklResult<()> {
        let mut channels = self.channels.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for channels".to_string())
        })?;

        channels.insert(name.to_string(), channel);
        Ok(())
    }

    /// Remove an alert channel
    pub fn remove_channel(&self, name: &str) -> SklResult<()> {
        let mut channels = self.channels.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for channels".to_string())
        })?;

        channels.remove(name);
        Ok(())
    }

    /// Process metrics and check for alerts
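    ///
    /// Returns the alert events fired during this evaluation pass. A brief
    /// sketch (`metrics` is assumed to come from the monitoring layer):
    ///
    /// ```ignore
    /// let fired = alerter.process_metrics(&metrics)?;
    /// for alert in &fired {
    ///     println!("{} [{}] {}", alert.severity, alert.rule_id, alert.message);
    /// }
    /// ```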
    pub fn process_metrics(&self, metrics: &[Metric]) -> SklResult<Vec<AlertEvent>> {
        let mut triggered_alerts = Vec::new();

        let rules = self.rules.read().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire read lock for rules".to_string())
        })?;

        for (rule_id, rule) in rules.iter() {
            if !rule.enabled {
                continue;
            }

            // Check if rule is silenced
            if self.is_silenced(rule_id)? {
                continue;
            }

            // Check if rule is in cooldown
            if self.is_in_cooldown(rule_id)? {
                continue;
            }

            // Evaluate rule condition
            if self.evaluate_condition(&rule.condition, metrics)? {
                let alert = self.create_alert_event(rule, metrics)?;
                triggered_alerts.push(alert);
            }
        }

        // Process triggered alerts
        for alert in &triggered_alerts {
            self.handle_alert(alert.clone())?;
        }

        Ok(triggered_alerts)
    }

    /// Process anomalies from monitoring system
    pub fn process_anomalies(&self, anomalies: &[Anomaly]) -> SklResult<Vec<AlertEvent>> {
        let mut triggered_alerts = Vec::new();

        for anomaly in anomalies {
            // Create alert based on anomaly severity
            let alert_severity = match anomaly.severity {
                AnomalySeverity::Low => AlertSeverity::Info,
                AnomalySeverity::Medium => AlertSeverity::Warning,
                AnomalySeverity::High => AlertSeverity::Critical,
                AnomalySeverity::Critical => AlertSeverity::Emergency,
            };

            let alert_event = AlertEvent {
                id: uuid::Uuid::new_v4().to_string(),
                rule_id: "anomaly_detection".to_string(),
                severity: alert_severity,
                message: format!("Anomaly detected: {}", anomaly.description),
                timestamp: Utc::now(),
                pipeline: Some(anomaly.pipeline_name.clone()),
                stage: None,
                labels: {
                    let mut labels = HashMap::new();
                    labels.insert("type".to_string(), "anomaly".to_string());
                    labels.insert("metric".to_string(), anomaly.metric_name.clone());
                    labels
                },
                metadata: HashMap::new(),
                trigger_value: None,
                status: AlertStatus::Firing,
            };

            triggered_alerts.push(alert_event);
        }

        // Process triggered alerts
        for alert in &triggered_alerts {
            self.handle_alert(alert.clone())?;
        }

        Ok(triggered_alerts)
    }

    /// Create silence period
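    ///
    /// A sketch: mute every rule whose id contains "latency" for one hour
    /// during a maintenance window (values are illustrative):
    ///
    /// ```ignore
    /// let silence_id = alerter.create_silence(
    ///     vec!["latency".to_string()],
    ///     Duration::from_secs(3600),
    ///     "Planned maintenance".to_string(),
    ///     "ops-oncall".to_string(),
    /// )?;
    /// // Later, lift the silence early if needed:
    /// alerter.remove_silence(&silence_id)?;
    /// ```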
    pub fn create_silence(
        &self,
        rule_patterns: Vec<String>,
        duration: Duration,
        reason: String,
        created_by: String,
    ) -> SklResult<String> {
        let silence_id = uuid::Uuid::new_v4().to_string();
        let now = Utc::now();

        let silence = SilencePeriod {
            start: now,
            end: now
                + chrono::Duration::from_std(duration).map_err(|_| {
                    SklearsError::InvalidInput("Invalid duration for silence period".to_string())
                })?,
            reason,
            created_by,
            rule_patterns,
        };

        let mut silenced = self.silenced_alerts.write().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire write lock for silenced alerts".to_string(),
            )
        })?;

        silenced.insert(silence_id.clone(), silence);
        Ok(silence_id)
    }

    /// Remove silence period
    pub fn remove_silence(&self, silence_id: &str) -> SklResult<()> {
        let mut silenced = self.silenced_alerts.write().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire write lock for silenced alerts".to_string(),
            )
        })?;

        silenced.remove(silence_id);
        Ok(())
    }

    /// Acknowledge an alert
    pub fn acknowledge_alert(&self, alert_id: &str, acknowledged_by: &str) -> SklResult<()> {
        let mut active_alerts = self.active_alerts.write().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire write lock for active alerts".to_string(),
            )
        })?;

        if let Some(active_alert) = active_alerts.get_mut(alert_id) {
            active_alert.event.status = AlertStatus::Acknowledged;
            active_alert
                .event
                .metadata
                .insert("acknowledged_by".to_string(), acknowledged_by.to_string());
            active_alert
                .event
                .metadata
                .insert("acknowledged_at".to_string(), Utc::now().to_rfc3339());
            active_alert.last_updated = Utc::now();
        }

        Ok(())
    }

    /// Resolve an alert
    pub fn resolve_alert(&self, alert_id: &str, resolved_by: &str) -> SklResult<()> {
        // Remove the alert inside a short scope so the write lock is released
        // before update_resolution_stats re-reads active_alerts; holding the
        // write lock across that call would deadlock.
        let removed = {
            let mut active_alerts = self.active_alerts.write().map_err(|_| {
                SklearsError::InvalidOperation(
                    "Failed to acquire write lock for active alerts".to_string(),
                )
            })?;
            active_alerts.remove(alert_id)
        };

        if let Some(mut active_alert) = removed {
            active_alert.event.status = AlertStatus::Resolved;
            active_alert
                .event
                .metadata
                .insert("resolved_by".to_string(), resolved_by.to_string());
            active_alert
                .event
                .metadata
                .insert("resolved_at".to_string(), Utc::now().to_rfc3339());

            // Add to history (clone the event; the alert is still used below)
            self.add_to_history(active_alert.event.clone())?;

            // Update statistics
            self.update_resolution_stats(&active_alert)?;
        }

        Ok(())
    }

    /// Get alert statistics
    pub fn get_stats(&self) -> SklResult<AlertStats> {
        let stats = self.stats.read().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire read lock for stats".to_string())
        })?;

        Ok(stats.clone())
    }

    /// Get active alerts
    pub fn get_active_alerts(&self) -> SklResult<Vec<ActiveAlert>> {
        let active_alerts = self.active_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for active alerts".to_string(),
            )
        })?;

        Ok(active_alerts.values().cloned().collect())
    }

    /// Get alert history
    pub fn get_alert_history(&self, limit: Option<usize>) -> SklResult<Vec<AlertEvent>> {
        let history = self.alert_history.lock().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire lock for alert history".to_string())
        })?;

        let alerts: Vec<AlertEvent> = history.iter().cloned().collect();

        if let Some(limit) = limit {
            Ok(alerts.into_iter().take(limit).collect())
        } else {
            Ok(alerts)
        }
    }

    /// Evaluate alert condition
    fn evaluate_condition(
        &self,
        condition: &AlertCondition,
        metrics: &[Metric],
    ) -> SklResult<bool> {
        match condition {
            AlertCondition::Threshold {
                metric,
                operator,
                value,
                window,
                min_points,
            } => self.evaluate_threshold_condition(
                metric,
                operator,
                *value,
                *window,
                *min_points,
                metrics,
            ),
            AlertCondition::Rate {
                metric,
                rate_threshold,
                window,
            } => self.evaluate_rate_condition(metric, *rate_threshold, *window, metrics),
            AlertCondition::Anomaly {
                metric,
                sensitivity,
                training_window,
            } => self.evaluate_anomaly_condition(metric, *sensitivity, *training_window, metrics),
            AlertCondition::Composite {
                operator,
                conditions,
            } => self.evaluate_composite_condition(operator, conditions, metrics),
            AlertCondition::Pattern {
                pattern,
                field,
                case_sensitive,
            } => self.evaluate_pattern_condition(pattern, field, *case_sensitive, metrics),
        }
    }

    /// Evaluate threshold condition
    fn evaluate_threshold_condition(
        &self,
        metric_name: &str,
        operator: &ThresholdOperator,
        threshold: f64,
        window: Duration,
        min_points: usize,
        metrics: &[Metric],
    ) -> SklResult<bool> {
        let cutoff_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            - window.as_secs();

        let relevant_metrics: Vec<&Metric> = metrics
            .iter()
            .filter(|m| m.name == metric_name && m.timestamp >= cutoff_time)
            .collect();

        if relevant_metrics.len() < min_points {
            return Ok(false);
        }

        // Use the latest value for threshold comparison
        if let Some(latest_metric) = relevant_metrics.last() {
            Ok(self.compare_value(latest_metric.value, operator, threshold))
        } else {
            Ok(false)
        }
    }

    /// Evaluate rate condition
    fn evaluate_rate_condition(
        &self,
        metric_name: &str,
        rate_threshold: f64,
        window: Duration,
        metrics: &[Metric],
    ) -> SklResult<bool> {
        let cutoff_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            - window.as_secs();

        let relevant_metrics: Vec<&Metric> = metrics
            .iter()
            .filter(|m| m.name == metric_name && m.timestamp >= cutoff_time)
            .collect();

        if relevant_metrics.len() < 2 {
            return Ok(false);
        }

        // Calculate rate of change
        let first = relevant_metrics.first().unwrap();
        let last = relevant_metrics.last().unwrap();

        let time_diff = last.timestamp - first.timestamp;
        if time_diff == 0 {
            return Ok(false);
        }

        let rate = (last.value - first.value) / time_diff as f64;
        Ok(rate > rate_threshold)
    }

    /// Evaluate anomaly condition
    fn evaluate_anomaly_condition(
        &self,
        metric_name: &str,
        sensitivity: f64,
        training_window: Duration,
        metrics: &[Metric],
    ) -> SklResult<bool> {
        let cutoff_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs()
            - training_window.as_secs();

        let relevant_metrics: Vec<&Metric> = metrics
            .iter()
            .filter(|m| m.name == metric_name && m.timestamp >= cutoff_time)
            .collect();

        if relevant_metrics.len() < 10 {
            return Ok(false);
        }

        // Simple anomaly detection using standard deviation
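        // The latest point is flagged when its z-score exceeds the sensitivity:
        //   z = (x_latest - mean) / std_dev, fire when |z| > sensitivity.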
        let values: Vec<f64> = relevant_metrics.iter().map(|m| m.value).collect();
        let mean = values.iter().sum::<f64>() / values.len() as f64;
        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
        let std_dev = variance.sqrt();

        if let Some(latest_metric) = relevant_metrics.last() {
            let z_score = (latest_metric.value - mean) / std_dev;
            Ok(z_score.abs() > sensitivity)
        } else {
            Ok(false)
        }
    }

    /// Evaluate composite condition
    fn evaluate_composite_condition(
        &self,
        operator: &LogicalOperator,
        conditions: &[AlertCondition],
        metrics: &[Metric],
    ) -> SklResult<bool> {
        match operator {
            LogicalOperator::And => {
                for condition in conditions {
                    if !self.evaluate_condition(condition, metrics)? {
                        return Ok(false);
                    }
                }
                Ok(true)
            }
            LogicalOperator::Or => {
                for condition in conditions {
                    if self.evaluate_condition(condition, metrics)? {
                        return Ok(true);
                    }
                }
                Ok(false)
            }
            LogicalOperator::Not => {
                if conditions.len() != 1 {
                    return Err(SklearsError::InvalidInput(
                        "NOT operator requires exactly one condition".to_string(),
                    ));
                }
                Ok(!self.evaluate_condition(&conditions[0], metrics)?)
            }
        }
    }

    /// Evaluate pattern condition
    fn evaluate_pattern_condition(
        &self,
        pattern: &str,
        field: &PatternField,
        case_sensitive: bool,
        metrics: &[Metric],
    ) -> SklResult<bool> {
        for metric in metrics {
            let text_to_search = match field {
                PatternField::PipelineName => &metric.pipeline_name,
                PatternField::StageName => {
                    if let Some(ref stage_name) = metric.stage_name {
                        stage_name
                    } else {
                        continue;
                    }
                }
                PatternField::ErrorMessage => {
                    // Would need error message field in Metric
                    continue;
                }
                PatternField::LogMessage => {
                    // Would need log message field in Metric
                    continue;
                }
                PatternField::Custom(field_name) => {
                    if let Some(value) = metric.metadata.get(field_name) {
                        value
                    } else {
                        continue;
                    }
                }
            };

            let matches = if case_sensitive {
                text_to_search.contains(pattern)
            } else {
                text_to_search
                    .to_lowercase()
                    .contains(&pattern.to_lowercase())
            };

            if matches {
                return Ok(true);
            }
        }

        Ok(false)
    }

    /// Compare value using threshold operator
    fn compare_value(&self, value: f64, operator: &ThresholdOperator, threshold: f64) -> bool {
        match operator {
            ThresholdOperator::GreaterThan => value > threshold,
            ThresholdOperator::GreaterThanOrEqual => value >= threshold,
            ThresholdOperator::LessThan => value < threshold,
            ThresholdOperator::LessThanOrEqual => value <= threshold,
            ThresholdOperator::Equal => (value - threshold).abs() < f64::EPSILON,
            ThresholdOperator::NotEqual => (value - threshold).abs() >= f64::EPSILON,
        }
    }

    /// Create alert event from rule and metrics
    fn create_alert_event(&self, rule: &AlertRule, metrics: &[Metric]) -> SklResult<AlertEvent> {
        let trigger_value = metrics
            .iter()
            .find(|m| self.metric_matches_rule(m, rule))
            .map(|m| m.value);

        let pipeline = metrics
            .iter()
            .find(|m| self.metric_matches_rule(m, rule))
            .map(|m| m.pipeline_name.clone());

        let stage = metrics
            .iter()
            .find(|m| self.metric_matches_rule(m, rule))
            .and_then(|m| m.stage_name.clone());

        Ok(AlertEvent {
            id: uuid::Uuid::new_v4().to_string(),
            rule_id: rule.id.clone(),
            severity: rule.severity.clone(),
            message: format!("Alert triggered: {}", rule.description),
            timestamp: Utc::now(),
            pipeline,
            stage,
            labels: rule.labels.clone(),
            metadata: HashMap::new(),
            trigger_value,
            status: AlertStatus::Firing,
        })
    }

    /// Check if metric matches rule conditions
    fn metric_matches_rule(&self, _metric: &Metric, _rule: &AlertRule) -> bool {
        // Simplified implementation - would need more sophisticated matching
        true
    }

    /// Handle triggered alert
    fn handle_alert(&self, alert: AlertEvent) -> SklResult<()> {
        // Check if alert should be grouped
        if self.config.enable_grouping {
            if let Some(existing_alert_id) = self.find_groupable_alert(&alert)? {
                self.update_grouped_alert(existing_alert_id, &alert)?;
                return Ok(());
            }
        }

        // Add to active alerts
        let active_alert = ActiveAlert {
            event: alert.clone(),
            first_fired: alert.timestamp,
            last_updated: alert.timestamp,
            fire_count: 1,
            escalation_level: 0,
            channels_notified: Vec::new(),
        };

        {
            let mut active_alerts = self.active_alerts.write().map_err(|_| {
                SklearsError::InvalidOperation(
                    "Failed to acquire write lock for active alerts".to_string(),
                )
            })?;
            active_alerts.insert(alert.id.clone(), active_alert);
        }

        // Send notifications
        self.send_alert_notifications(&alert)?;

        // Update statistics
        self.update_stats(&alert)?;

        Ok(())
    }

    /// Send alert notifications to configured channels
    fn send_alert_notifications(&self, alert: &AlertEvent) -> SklResult<()> {
        let rules = self.rules.read().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire read lock for rules".to_string())
        })?;

        let channels = self.channels.read().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire read lock for channels".to_string())
        })?;

        if let Some(rule) = rules.get(&alert.rule_id) {
            for channel_name in &rule.channels {
                if let Some(channel) = channels.get(channel_name) {
                    if let Err(e) = channel.send_alert(alert) {
                        eprintln!("Failed to send alert to channel {channel_name}: {e:?}");
                    }
                }
            }
        }

        Ok(())
    }

    /// Check if rule is currently silenced
    fn is_silenced(&self, rule_id: &str) -> SklResult<bool> {
        let silenced = self.silenced_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for silenced alerts".to_string(),
            )
        })?;

        let now = Utc::now();

        for silence in silenced.values() {
            if now >= silence.start && now <= silence.end {
                for pattern in &silence.rule_patterns {
                    if rule_id.contains(pattern) {
                        return Ok(true);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Check if rule is in cooldown period
    fn is_in_cooldown(&self, rule_id: &str) -> SklResult<bool> {
        let active_alerts = self.active_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for active alerts".to_string(),
            )
        })?;

        let rules = self.rules.read().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire read lock for rules".to_string())
        })?;

        if let Some(rule) = rules.get(rule_id) {
            let now = Utc::now();

            for active_alert in active_alerts.values() {
                if active_alert.event.rule_id == rule_id {
                    let cooldown_end = active_alert.last_updated
                        + chrono::Duration::from_std(rule.cooldown).unwrap_or_default();

                    if now < cooldown_end {
                        return Ok(true);
                    }
                }
            }
        }

        Ok(false)
    }

    /// Find groupable alert
    fn find_groupable_alert(&self, alert: &AlertEvent) -> SklResult<Option<String>> {
        let active_alerts = self.active_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for active alerts".to_string(),
            )
        })?;

        let now = alert.timestamp;

        for (alert_id, active_alert) in active_alerts.iter() {
            let time_diff = now - active_alert.first_fired;
            let window_duration =
                chrono::Duration::from_std(self.config.grouping_window).unwrap_or_default();

            if time_diff <= window_duration
                && active_alert.event.rule_id == alert.rule_id
                && active_alert.event.pipeline == alert.pipeline
            {
                return Ok(Some(alert_id.clone()));
            }
        }

        Ok(None)
    }

    /// Update grouped alert
    fn update_grouped_alert(&self, alert_id: String, new_alert: &AlertEvent) -> SklResult<()> {
        let mut active_alerts = self.active_alerts.write().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire write lock for active alerts".to_string(),
            )
        })?;

        if let Some(active_alert) = active_alerts.get_mut(&alert_id) {
            active_alert.fire_count += 1;
            active_alert.last_updated = new_alert.timestamp;

            // Update severity if higher
            if new_alert.severity > active_alert.event.severity {
                active_alert.event.severity = new_alert.severity.clone();
            }
        }

        Ok(())
    }

    /// Add alert to history
    fn add_to_history(&self, alert: AlertEvent) -> SklResult<()> {
        let mut history = self.alert_history.lock().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire lock for alert history".to_string())
        })?;

        history.push_back(alert);

        // Maintain history size limit
        while history.len() > self.config.max_history {
            history.pop_front();
        }

        Ok(())
    }

    /// Update alert statistics
    fn update_stats(&self, alert: &AlertEvent) -> SklResult<()> {
        let mut stats = self.stats.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for stats".to_string())
        })?;

        stats.total_alerts += 1;
        *stats
            .alerts_by_severity
            .entry(alert.severity.clone())
            .or_insert(0) += 1;
        *stats
            .alerts_by_rule
            .entry(alert.rule_id.clone())
            .or_insert(0) += 1;

        let active_alerts = self.active_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for active alerts".to_string(),
            )
        })?;
        stats.active_alert_count = active_alerts.len();

        Ok(())
    }

    /// Update resolution statistics
    fn update_resolution_stats(&self, active_alert: &ActiveAlert) -> SklResult<()> {
        let mut stats = self.stats.write().map_err(|_| {
            SklearsError::InvalidOperation("Failed to acquire write lock for stats".to_string())
        })?;

        let resolution_time = Utc::now() - active_alert.first_fired;
        let resolution_duration = Duration::from_millis(resolution_time.num_milliseconds() as u64);

        // Simplified: store the most recent resolution time rather than a running average
        stats.avg_resolution_time = resolution_duration;

        let active_alerts = self.active_alerts.read().map_err(|_| {
            SklearsError::InvalidOperation(
                "Failed to acquire read lock for active alerts".to_string(),
            )
        })?;
        stats.active_alert_count = active_alerts.len();

        Ok(())
    }
}

/// Implementation for console alert channel
impl ConsoleAlertChannel {
    /// Create new console alert channel
    #[must_use]
    pub fn new(name: &str) -> Self {
        Self {
            name: name.to_string(),
            config: HashMap::new(),
        }
    }
}

impl AlertChannel for ConsoleAlertChannel {
    fn send_alert(&self, alert: &AlertEvent) -> SklResult<()> {
        println!(
            "🚨 ALERT [{}] {}: {}",
            alert.severity, alert.rule_id, alert.message
        );

        if let Some(pipeline) = &alert.pipeline {
            println!("   Pipeline: {pipeline}");
        }

        if let Some(stage) = &alert.stage {
            println!("   Stage: {stage}");
        }

        if let Some(value) = alert.trigger_value {
            println!("   Trigger Value: {value}");
        }

        println!("   Timestamp: {}", alert.timestamp);

        Ok(())
    }

    fn name(&self) -> &str {
        &self.name
    }

    fn health_check(&self) -> SklResult<()> {
        Ok(())
    }

    fn config(&self) -> HashMap<String, String> {
        self.config.clone()
    }
}

impl Default for AlertConfig {
    fn default() -> Self {
        Self {
            max_history: 10000,
            default_cooldown: Duration::from_secs(300), // 5 minutes
            enable_grouping: true,
            grouping_window: Duration::from_secs(60), // 1 minute
            max_alerts_per_group: 10,
            enable_escalation: false,
            escalation_levels: Vec::new(),
        }
    }
}

#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::SystemTime;

    #[test]
    fn test_automated_alerter_creation() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let stats = alerter.get_stats().unwrap();
        assert_eq!(stats.total_alerts, 0);
        assert_eq!(stats.active_alert_count, 0);
    }

    #[test]
    fn test_alert_rule_management() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let rule = AlertRule {
            id: "test_rule".to_string(),
            name: "Test Rule".to_string(),
            description: "Test alert rule".to_string(),
            condition: AlertCondition::Threshold {
                metric: "test_metric".to_string(),
                operator: ThresholdOperator::GreaterThan,
                value: 100.0,
                window: Duration::from_secs(60),
                min_points: 1,
            },
            severity: AlertSeverity::Warning,
            channels: vec!["console".to_string()],
            cooldown: Duration::from_secs(300),
            enabled: true,
            labels: HashMap::new(),
            priority: 1,
        };

        alerter.add_rule(rule).unwrap();

        let rules = alerter.rules.read().unwrap();
        assert!(rules.contains_key("test_rule"));
    }

    #[test]
    fn test_alert_channel_management() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let channel = Box::new(ConsoleAlertChannel::new("test_console"));
        alerter.add_channel("console", channel).unwrap();

        let channels = alerter.channels.read().unwrap();
        assert!(channels.contains_key("console"));
    }

    #[test]
    fn test_console_alert_channel() {
        let channel = ConsoleAlertChannel::new("test");

        let alert = AlertEvent {
            id: "test_alert".to_string(),
            rule_id: "test_rule".to_string(),
            severity: AlertSeverity::Warning,
            message: "Test alert message".to_string(),
            timestamp: Utc::now(),
            pipeline: Some("test_pipeline".to_string()),
            stage: Some("test_stage".to_string()),
            labels: HashMap::new(),
            metadata: HashMap::new(),
            trigger_value: Some(150.0),
            status: AlertStatus::Firing,
        };

        assert!(channel.send_alert(&alert).is_ok());
        assert!(channel.health_check().is_ok());
        assert_eq!(channel.name(), "test");
    }

    #[test]
    fn test_threshold_condition_evaluation() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let condition = AlertCondition::Threshold {
            metric: "cpu_usage".to_string(),
            operator: ThresholdOperator::GreaterThan,
            value: 80.0,
            window: Duration::from_secs(60),
            min_points: 1,
        };

        let metrics = vec![Metric {
            name: "cpu_usage".to_string(),
            value: 90.0,
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs(),
            pipeline_name: "test_pipeline".to_string(),
            stage_name: None,
            execution_id: None,
            metadata: HashMap::new(),
        }];

        assert!(alerter.evaluate_condition(&condition, &metrics).unwrap());
    }

    #[test]
    fn test_alert_silence() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let silence_id = alerter
            .create_silence(
                vec!["test_rule".to_string()],
                Duration::from_secs(3600),
                "Maintenance window".to_string(),
                "admin".to_string(),
            )
            .unwrap();

        assert!(!silence_id.is_empty());
        assert!(alerter.is_silenced("test_rule").unwrap());

        alerter.remove_silence(&silence_id).unwrap();
        assert!(!alerter.is_silenced("test_rule").unwrap());
    }

    #[test]
    fn test_alert_acknowledgment() {
        let config = AlertConfig::default();
        let alerter = AutomatedAlerter::new(config);

        let alert = AlertEvent {
            id: "test_alert".to_string(),
            rule_id: "test_rule".to_string(),
            severity: AlertSeverity::Warning,
            message: "Test alert".to_string(),
            timestamp: Utc::now(),
            pipeline: None,
            stage: None,
            labels: HashMap::new(),
            metadata: HashMap::new(),
            trigger_value: None,
            status: AlertStatus::Firing,
        };

        // Add as active alert
        let active_alert = ActiveAlert {
            event: alert.clone(),
            first_fired: alert.timestamp,
            last_updated: alert.timestamp,
            fire_count: 1,
            escalation_level: 0,
            channels_notified: Vec::new(),
        };

        {
            let mut active_alerts = alerter.active_alerts.write().unwrap();
            active_alerts.insert(alert.id.clone(), active_alert);
        }

        alerter.acknowledge_alert(&alert.id, "admin").unwrap();

        let active_alerts = alerter.active_alerts.read().unwrap();
        let acknowledged_alert = active_alerts.get(&alert.id).unwrap();
        assert_eq!(acknowledged_alert.event.status, AlertStatus::Acknowledged);
    }
}