ant_quic/monitoring/
alerting.rs

1//! Production Alerting System
2//!
3//! This module implements intelligent alerting for NAT traversal operations
4//! with anomaly detection, escalation policies, and multi-channel notifications.
5
6use std::{
7    collections::HashMap,
8    sync::Arc,
9    time::{Duration, Instant, SystemTime},
10};
11
12use tokio::{
13    sync::{RwLock, Mutex},
14    time::interval,
15};
16use tracing::{debug, info, warn};
17
18use crate::monitoring::{
19    MonitoringError, NatTraversalAttempt, NatTraversalResult,
20};
21
22/// Production alert manager with intelligent escalation
23pub struct ProductionAlertManager {
24    /// Alert configuration
25    config: AlertingConfig,
26    /// Rule engine for alert evaluation
27    rule_engine: Arc<AlertRuleEngine>,
28    /// Notification dispatcher
29    notification_dispatcher: Arc<NotificationDispatcher>,
30    /// Alert state manager
31    state_manager: Arc<AlertStateManager>,
32    /// Escalation manager
33    escalation_manager: Arc<EscalationManager>,
34    /// Alert suppression engine
35    suppression_engine: Arc<SuppressionEngine>,
36    /// Background tasks
37    tasks: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
38}
39
40impl ProductionAlertManager {
41    /// Create new production alert manager
42    pub async fn new(config: AlertingConfig) -> Result<Self, MonitoringError> {
43        let rule_engine = Arc::new(AlertRuleEngine::new(config.rules.clone()));
44        let notification_dispatcher = Arc::new(NotificationDispatcher::new(config.notifications.clone()));
45        let state_manager = Arc::new(AlertStateManager::new());
46        let escalation_manager = Arc::new(EscalationManager::new(config.escalation.clone()));
47        let suppression_engine = Arc::new(SuppressionEngine::new(config.suppression.clone()));
48        
49        Ok(Self {
50            config,
51            rule_engine,
52            notification_dispatcher,
53            state_manager,
54            escalation_manager,
55            suppression_engine,
56            tasks: Arc::new(Mutex::new(Vec::new())),
57        })
58    }
59    
60    /// Start alert manager
61    pub async fn start(&self) -> Result<(), MonitoringError> {
62        info!("Starting production alert manager");
63        
64        // Start background tasks
65        self.start_rule_evaluation_task().await?;
66        self.start_escalation_task().await?;
67        self.start_suppression_cleanup_task().await?;
68        self.start_health_monitoring_task().await?;
69        
70        info!("Production alert manager started");
71        Ok(())
72    }
73    
74    /// Stop alert manager
75    pub async fn stop(&self) -> Result<(), MonitoringError> {
76        info!("Stopping production alert manager");
77        
78        // Stop background tasks
79        let mut tasks = self.tasks.lock().await;
80        for task in tasks.drain(..) {
81            task.abort();
82        }
83        
84        info!("Production alert manager stopped");
85        Ok(())
86    }
87    
88    /// Evaluate NAT traversal attempt for alerts
89    pub async fn evaluate_nat_attempt(&self, attempt: &NatTraversalAttempt) -> Result<(), MonitoringError> {
90        // Create evaluation context
91        let context = AlertEvaluationContext {
92            event_type: AlertEventType::NatAttempt,
93            timestamp: attempt.timestamp,
94            attempt_info: Some(attempt.clone()),
95            result_info: None,
96            metrics: HashMap::new(),
97        };
98        
99        // Evaluate rules
100        self.rule_engine.evaluate_rules(&context).await?;
101        
102        Ok(())
103    }
104    
105    /// Evaluate NAT traversal result for alerts
106    pub async fn evaluate_nat_result(&self, result: &NatTraversalResult) -> Result<(), MonitoringError> {
107        // Create evaluation context
108        let mut metrics = HashMap::new();
109        metrics.insert("duration_ms".to_string(), result.duration.as_millis() as f64);
110        metrics.insert("success".to_string(), if result.success { 1.0 } else { 0.0 });
111        
112        let perf = &result.performance_metrics;
113        metrics.insert("connection_time_ms".to_string(), perf.connection_time_ms as f64);
114        metrics.insert("candidates_tried".to_string(), perf.candidates_tried as f64);
115        
116        let context = AlertEvaluationContext {
117            event_type: AlertEventType::NatResult,
118            timestamp: SystemTime::now(),
119            attempt_info: None,
120            result_info: Some(result.clone()),
121            metrics,
122        };
123        
124        // Evaluate rules
125        self.rule_engine.evaluate_rules(&context).await?;
126        
127        Ok(())
128    }
129    
130    /// Get alert manager status
131    pub async fn get_status(&self) -> String {
132        let active_alerts = self.state_manager.get_active_alert_count().await;
133        let suppressed_alerts = self.suppression_engine.get_suppressed_count().await;
134        
135        format!("Active: {}, Suppressed: {}", active_alerts, suppressed_alerts)
136    }
137    
138    /// Manually trigger alert
139    pub async fn trigger_alert(&self, alert: Alert) -> Result<(), MonitoringError> {
140        self.process_alert(alert).await
141    }
142    
143    /// Process triggered alert
144    async fn process_alert(&self, alert: Alert) -> Result<(), MonitoringError> {
145        // Check if alert should be suppressed
146        if self.suppression_engine.should_suppress(&alert).await {
147            debug!("Alert {} suppressed", alert.id);
148            return Ok(());
149        }
150        
151        // Update alert state
152        let alert_state = self.state_manager.update_alert_state(alert.clone()).await?;
153        
154        // Check if alert needs escalation
155        if self.escalation_manager.should_escalate(&alert_state).await {
156            self.escalation_manager.escalate_alert(&alert).await?;
157        }
158        
159        // Send notifications
160        self.notification_dispatcher.dispatch_alert(&alert).await?;
161        
162        info!("Processed alert: {} (severity: {:?})", alert.title, alert.severity);
163        Ok(())
164    }
165    
166    /// Start rule evaluation background task
167    async fn start_rule_evaluation_task(&self) -> Result<(), MonitoringError> {
168        let rule_engine = self.rule_engine.clone();
169        let interval_duration = self.config.evaluation_interval;
170        
171        let task = tokio::spawn(async move {
172            let mut interval = interval(interval_duration);
173            
174            loop {
175                interval.tick().await;
176                
177                // Evaluate time-based rules
178                let context = AlertEvaluationContext {
179                    event_type: AlertEventType::Scheduled,
180                    timestamp: SystemTime::now(),
181                    attempt_info: None,
182                    result_info: None,
183                    metrics: HashMap::new(),
184                };
185                
186                if let Err(e) = rule_engine.evaluate_scheduled_rules(&context).await {
187                    warn!("Scheduled rule evaluation failed: {}", e);
188                }
189            }
190        });
191        
192        self.tasks.lock().await.push(task);
193        Ok(())
194    }
195    
196    /// Start escalation background task
197    async fn start_escalation_task(&self) -> Result<(), MonitoringError> {
198        let escalation_manager = self.escalation_manager.clone();
199        let state_manager = self.state_manager.clone();
200        
201        let task = tokio::spawn(async move {
202            let mut interval = interval(Duration::from_secs(60)); // Check every minute
203            
204            loop {
205                interval.tick().await;
206                
207                let active_alerts = state_manager.get_active_alerts().await;
208                for alert_state in active_alerts {
209                    if escalation_manager.should_escalate(&alert_state).await {
210                        if let Err(e) = escalation_manager.escalate_alert(&alert_state.alert).await {
211                            warn!("Alert escalation failed: {}", e);
212                        }
213                    }
214                }
215            }
216        });
217        
218        self.tasks.lock().await.push(task);
219        Ok(())
220    }
221    
222    /// Start suppression cleanup task
223    async fn start_suppression_cleanup_task(&self) -> Result<(), MonitoringError> {
224        let suppression_engine = self.suppression_engine.clone();
225        
226        let task = tokio::spawn(async move {
227            let mut interval = interval(Duration::from_secs(300)); // Cleanup every 5 minutes
228            
229            loop {
230                interval.tick().await;
231                
232                if let Err(e) = suppression_engine.cleanup_expired_suppressions().await {
233                    warn!("Suppression cleanup failed: {}", e);
234                }
235            }
236        });
237        
238        self.tasks.lock().await.push(task);
239        Ok(())
240    }
241    
242    /// Start health monitoring task
243    async fn start_health_monitoring_task(&self) -> Result<(), MonitoringError> {
244        let notification_dispatcher = self.notification_dispatcher.clone();
245        
246        let task = tokio::spawn(async move {
247            let mut interval = interval(Duration::from_secs(60)); // Health check every minute
248            
249            loop {
250                interval.tick().await;
251                
252                if let Err(e) = notification_dispatcher.health_check().await {
253                    warn!("Notification system health check failed: {}", e);
254                }
255            }
256        });
257        
258        self.tasks.lock().await.push(task);
259        Ok(())
260    }
261}
262
263/// Alerting configuration
264#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
265pub struct AlertingConfig {
266    /// Alert rules
267    pub rules: Vec<AlertRule>,
268    /// Notification channels
269    pub notifications: NotificationConfig,
270    /// Escalation policies
271    pub escalation: EscalationConfig,
272    /// Suppression settings
273    pub suppression: SuppressionConfig,
274    /// Rule evaluation interval
275    pub evaluation_interval: Duration,
276    /// Alert deduplication window
277    pub deduplication_window: Duration,
278}
279
280impl Default for AlertingConfig {
281    fn default() -> Self {
282        Self {
283            rules: vec![
284                AlertRule::default_success_rate_rule(),
285                AlertRule::default_latency_rule(),
286                AlertRule::default_error_rate_rule(),
287            ],
288            notifications: NotificationConfig::default(),
289            escalation: EscalationConfig::default(),
290            suppression: SuppressionConfig::default(),
291            evaluation_interval: Duration::from_secs(30),
292            deduplication_window: Duration::from_secs(300),
293        }
294    }
295}
296
297/// Alert rule definition
298#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
299pub struct AlertRule {
300    /// Rule identifier
301    pub id: String,
302    /// Rule name
303    pub name: String,
304    /// Rule description
305    pub description: String,
306    /// Alert severity
307    pub severity: AlertSeverity,
308    /// Rule condition
309    pub condition: AlertCondition,
310    /// Evaluation frequency
311    pub evaluation_frequency: Duration,
312    /// Alert labels
313    pub labels: HashMap<String, String>,
314    /// Annotations
315    pub annotations: HashMap<String, String>,
316}
317
318impl AlertRule {
319    /// Default success rate alert rule
320    fn default_success_rate_rule() -> Self {
321        Self {
322            id: "nat_success_rate_low".to_string(),
323            name: "NAT Success Rate Low".to_string(),
324            description: "NAT traversal success rate is below threshold".to_string(),
325            severity: AlertSeverity::Warning,
326            condition: AlertCondition::Threshold {
327                metric: "nat_success_rate".to_string(),
328                operator: ThresholdOperator::LessThan,
329                value: 0.8,
330                duration: Duration::from_secs(300),
331            },
332            evaluation_frequency: Duration::from_secs(60),
333            labels: HashMap::from([
334                ("component".to_string(), "nat_traversal".to_string()),
335                ("type".to_string(), "success_rate".to_string()),
336            ]),
337            annotations: HashMap::from([
338                ("summary".to_string(), "NAT traversal success rate below 80%".to_string()),
339                ("runbook".to_string(), "https://docs.example.com/runbooks/nat-success-rate".to_string()),
340            ]),
341        }
342    }
343    
344    /// Default latency alert rule
345    fn default_latency_rule() -> Self {
346        Self {
347            id: "nat_latency_high".to_string(),
348            name: "NAT Latency High".to_string(),
349            description: "NAT traversal latency is above threshold".to_string(),
350            severity: AlertSeverity::Warning,
351            condition: AlertCondition::Threshold {
352                metric: "nat_duration_p95".to_string(),
353                operator: ThresholdOperator::GreaterThan,
354                value: 5000.0, // 5 seconds
355                duration: Duration::from_secs(300),
356            },
357            evaluation_frequency: Duration::from_secs(60),
358            labels: HashMap::from([
359                ("component".to_string(), "nat_traversal".to_string()),
360                ("type".to_string(), "latency".to_string()),
361            ]),
362            annotations: HashMap::from([
363                ("summary".to_string(), "NAT traversal P95 latency above 5s".to_string()),
364            ]),
365        }
366    }
367    
368    /// Default error rate alert rule
369    fn default_error_rate_rule() -> Self {
370        Self {
371            id: "nat_error_rate_high".to_string(),
372            name: "NAT Error Rate High".to_string(),
373            description: "NAT traversal error rate is above threshold".to_string(),
374            severity: AlertSeverity::Critical,
375            condition: AlertCondition::Threshold {
376                metric: "nat_error_rate".to_string(),
377                operator: ThresholdOperator::GreaterThan,
378                value: 0.1, // 10% error rate
379                duration: Duration::from_secs(180),
380            },
381            evaluation_frequency: Duration::from_secs(30),
382            labels: HashMap::from([
383                ("component".to_string(), "nat_traversal".to_string()),
384                ("type".to_string(), "error_rate".to_string()),
385            ]),
386            annotations: HashMap::from([
387                ("summary".to_string(), "NAT traversal error rate above 10%".to_string()),
388                ("priority".to_string(), "high".to_string()),
389            ]),
390        }
391    }
392}
393
394/// Alert condition types
395#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
396pub enum AlertCondition {
397    /// Simple threshold condition
398    Threshold {
399        metric: String,
400        operator: ThresholdOperator,
401        value: f64,
402        duration: Duration,
403    },
404    /// Rate of change condition
405    RateOfChange {
406        metric: String,
407        rate_threshold: f64,
408        duration: Duration,
409    },
410    /// Anomaly detection condition
411    Anomaly {
412        metric: String,
413        sensitivity: f64,
414        baseline_duration: Duration,
415    },
416    /// Complex expression condition
417    Expression {
418        expression: String,
419        duration: Duration,
420    },
421}
422
423/// Threshold operators
424#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
425pub enum ThresholdOperator {
426    GreaterThan,
427    LessThan,
428    Equal,
429    NotEqual,
430    GreaterThanOrEqual,
431    LessThanOrEqual,
432}
433
434/// Alert severity levels
435#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
436pub enum AlertSeverity {
437    Info,
438    Warning,
439    Critical,
440    Fatal,
441}
442
443/// Alert structure
444#[derive(Debug, Clone)]
445pub struct Alert {
446    /// Unique alert identifier
447    pub id: String,
448    /// Alert title
449    pub title: String,
450    /// Alert description
451    pub description: String,
452    /// Alert severity
453    pub severity: AlertSeverity,
454    /// Alert state
455    pub state: AlertState,
456    /// Timestamp when alert was triggered
457    pub triggered_at: SystemTime,
458    /// Labels associated with alert
459    pub labels: HashMap<String, String>,
460    /// Additional annotations
461    pub annotations: HashMap<String, String>,
462    /// Alert source rule
463    pub source_rule: String,
464    /// Current metric value
465    pub current_value: Option<f64>,
466    /// Threshold value
467    pub threshold_value: Option<f64>,
468}
469
/// Lifecycle state of an alert.
///
/// A plain tag with no payload, so it is `Copy`, `Eq` and `Hash` and can be
/// used as a set or map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AlertState {
    /// The alert has fired
    Triggered,
    /// The alert has been acknowledged
    Acknowledged,
    /// The alert has been resolved
    Resolved,
    /// The alert is suppressed
    Suppressed,
}
478
479/// Alert rule engine
480struct AlertRuleEngine {
481    rules: Vec<AlertRule>,
482    rule_states: Arc<RwLock<HashMap<String, RuleState>>>,
483}
484
485impl AlertRuleEngine {
486    fn new(rules: Vec<AlertRule>) -> Self {
487        Self {
488            rules,
489            rule_states: Arc::new(RwLock::new(HashMap::new())),
490        }
491    }
492    
493    async fn evaluate_rules(&self, context: &AlertEvaluationContext) -> Result<(), MonitoringError> {
494        for rule in &self.rules {
495            if let Err(e) = self.evaluate_single_rule(rule, context).await {
496                warn!("Rule evaluation failed for {}: {}", rule.id, e);
497            }
498        }
499        Ok(())
500    }
501    
502    async fn evaluate_scheduled_rules(&self, context: &AlertEvaluationContext) -> Result<(), MonitoringError> {
503        // Evaluate time-based rules (would query metrics backend)
504        for rule in &self.rules {
505            // Mock evaluation for demonstration
506            if self.should_evaluate_rule(rule, context.timestamp).await {
507                self.evaluate_single_rule(rule, context).await?;
508            }
509        }
510        Ok(())
511    }
512    
513    async fn evaluate_single_rule(&self, rule: &AlertRule, context: &AlertEvaluationContext) -> Result<(), MonitoringError> {
514        let should_alert = match &rule.condition {
515            AlertCondition::Threshold { metric, operator, value, duration: _ } => {
516                self.evaluate_threshold_condition(metric, operator, *value, context).await?
517            }
518            AlertCondition::RateOfChange { metric, rate_threshold, .. } => {
519                self.evaluate_rate_condition(metric, *rate_threshold, context).await?
520            }
521            AlertCondition::Anomaly { metric, sensitivity, .. } => {
522                self.evaluate_anomaly_condition(metric, *sensitivity, context).await?
523            }
524            AlertCondition::Expression { expression, .. } => {
525                self.evaluate_expression_condition(expression, context).await?
526            }
527        };
528        
529        if should_alert {
530            let alert = self.create_alert_from_rule(rule, context).await;
531            // Would send alert to alert manager for processing
532            debug!("Rule {} triggered alert: {}", rule.id, alert.title);
533        }
534        
535        Ok(())
536    }
537    
538    async fn evaluate_threshold_condition(
539        &self,
540        metric: &str,
541        operator: &ThresholdOperator,
542        threshold: f64,
543        context: &AlertEvaluationContext,
544    ) -> Result<bool, MonitoringError> {
545        let current_value = context.metrics.get(metric).copied().unwrap_or(0.0);
546        
547        let result = match operator {
548            ThresholdOperator::GreaterThan => current_value > threshold,
549            ThresholdOperator::LessThan => current_value < threshold,
550            ThresholdOperator::Equal => (current_value - threshold).abs() < f64::EPSILON,
551            ThresholdOperator::NotEqual => (current_value - threshold).abs() > f64::EPSILON,
552            ThresholdOperator::GreaterThanOrEqual => current_value >= threshold,
553            ThresholdOperator::LessThanOrEqual => current_value <= threshold,
554        };
555        
556        Ok(result)
557    }
558    
559    async fn evaluate_rate_condition(&self, _metric: &str, _rate_threshold: f64, _context: &AlertEvaluationContext) -> Result<bool, MonitoringError> {
560        // Would calculate rate of change
561        Ok(false)
562    }
563    
564    async fn evaluate_anomaly_condition(&self, _metric: &str, _sensitivity: f64, _context: &AlertEvaluationContext) -> Result<bool, MonitoringError> {
565        // Would use anomaly detection algorithms
566        Ok(false)
567    }
568    
569    async fn evaluate_expression_condition(&self, _expression: &str, _context: &AlertEvaluationContext) -> Result<bool, MonitoringError> {
570        // Would evaluate complex expressions
571        Ok(false)
572    }
573    
574    async fn create_alert_from_rule(&self, rule: &AlertRule, context: &AlertEvaluationContext) -> Alert {
575        Alert {
576            id: format!("{}_{}", rule.id, context.timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()),
577            title: rule.name.clone(),
578            description: rule.description.clone(),
579            severity: rule.severity.clone(),
580            state: AlertState::Triggered,
581            triggered_at: context.timestamp,
582            labels: rule.labels.clone(),
583            annotations: rule.annotations.clone(),
584            source_rule: rule.id.clone(),
585            current_value: None,
586            threshold_value: None,
587        }
588    }
589    
590    async fn should_evaluate_rule(&self, _rule: &AlertRule, _timestamp: SystemTime) -> bool {
591        // Would check if enough time has passed since last evaluation
592        // For now, always evaluate
593        true
594    }
595}
596
/// Bookkeeping kept for one rule between evaluations.
#[derive(Debug)]
struct RuleState {
    /// Instant of the most recent evaluation
    last_evaluation: Instant,
    /// Number of violations seen in a row
    consecutive_violations: u32,
    /// Instant of the most recent alert, if any
    last_alert: Option<Instant>,
}
604
605/// Alert evaluation context
606struct AlertEvaluationContext {
607    event_type: AlertEventType,
608    timestamp: SystemTime,
609    attempt_info: Option<NatTraversalAttempt>,
610    result_info: Option<NatTraversalResult>,
611    metrics: HashMap<String, f64>,
612}
613
/// Source of an evaluation context.
enum AlertEventType {
    /// A NAT traversal attempt
    NatAttempt,
    /// A NAT traversal result
    NatResult,
    /// A scheduled evaluation
    Scheduled,
}
620
621/// Notification configuration
622#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
623pub struct NotificationConfig {
624    /// Notification channels
625    pub channels: Vec<NotificationChannel>,
626    /// Default channel for alerts
627    pub default_channel: String,
628    /// Rate limiting configuration
629    pub rate_limiting: RateLimitConfig,
630}
631
632impl Default for NotificationConfig {
633    fn default() -> Self {
634        Self {
635            channels: vec![
636                NotificationChannel::Slack {
637                    id: "default".to_string(),
638                    webhook_url: "https://hooks.slack.com/services/...".to_string(),
639                    channel: "#alerts".to_string(),
640                },
641            ],
642            default_channel: "default".to_string(),
643            rate_limiting: RateLimitConfig::default(),
644        }
645    }
646}
647
648/// Notification channels
649#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
650pub enum NotificationChannel {
651    Email {
652        id: String,
653        smtp_server: String,
654        recipients: Vec<String>,
655    },
656    Slack {
657        id: String,
658        webhook_url: String,
659        channel: String,
660    },
661    PagerDuty {
662        id: String,
663        service_key: String,
664    },
665    Webhook {
666        id: String,
667        url: String,
668        headers: HashMap<String, String>,
669    },
670    SMS {
671        id: String,
672        provider: String,
673        numbers: Vec<String>,
674    },
675}
676
677/// Rate limiting configuration
678#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
679pub struct RateLimitConfig {
680    /// Maximum alerts per time window
681    pub max_alerts_per_window: u32,
682    /// Time window duration
683    pub window_duration: Duration,
684    /// Burst allowance
685    pub burst_allowance: u32,
686}
687
688impl Default for RateLimitConfig {
689    fn default() -> Self {
690        Self {
691            max_alerts_per_window: 10,
692            window_duration: Duration::from_secs(60),
693            burst_allowance: 3,
694        }
695    }
696}
697
698/// Escalation configuration
699#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
700pub struct EscalationConfig {
701    /// Escalation policies
702    pub policies: Vec<EscalationPolicy>,
703    /// Default escalation time
704    pub default_escalation_time: Duration,
705}
706
707impl Default for EscalationConfig {
708    fn default() -> Self {
709        Self {
710            policies: vec![
711                EscalationPolicy {
712                    severity: AlertSeverity::Critical,
713                    escalation_time: Duration::from_secs(300), // 5 minutes
714                    escalation_channels: vec!["pagerduty".to_string()],
715                },
716                EscalationPolicy {
717                    severity: AlertSeverity::Fatal,
718                    escalation_time: Duration::from_secs(60), // 1 minute
719                    escalation_channels: vec!["pagerduty".to_string(), "sms".to_string()],
720                },
721            ],
722            default_escalation_time: Duration::from_secs(600), // 10 minutes
723        }
724    }
725}
726
727/// Escalation policy
728#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
729pub struct EscalationPolicy {
730    /// Alert severity this policy applies to
731    pub severity: AlertSeverity,
732    /// Time before escalation
733    pub escalation_time: Duration,
734    /// Channels to escalate to
735    pub escalation_channels: Vec<String>,
736}
737
738/// Suppression configuration
739#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
740pub struct SuppressionConfig {
741    /// Suppression rules
742    pub rules: Vec<SuppressionRule>,
743    /// Default suppression time
744    pub default_suppression_time: Duration,
745}
746
747impl Default for SuppressionConfig {
748    fn default() -> Self {
749        Self {
750            rules: vec![],
751            default_suppression_time: Duration::from_secs(300), // 5 minutes
752        }
753    }
754}
755
756/// Suppression rule
757#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
758pub struct SuppressionRule {
759    /// Rule identifier
760    pub id: String,
761    /// Labels to match for suppression
762    pub label_matchers: HashMap<String, String>,
763    /// Suppression duration
764    pub duration: Duration,
765    /// Reason for suppression
766    pub reason: String,
767}
768
769/// Notification dispatcher
770struct NotificationDispatcher {
771    config: NotificationConfig,
772    rate_limiter: Arc<RateLimiter>,
773}
774
775impl NotificationDispatcher {
776    fn new(config: NotificationConfig) -> Self {
777        Self {
778            rate_limiter: Arc::new(RateLimiter::new(config.rate_limiting.clone())),
779            config,
780        }
781    }
782    
783    async fn dispatch_alert(&self, alert: &Alert) -> Result<(), MonitoringError> {
784        // Check rate limiting
785        if !self.rate_limiter.allow_alert().await {
786            warn!("Alert rate limited: {}", alert.title);
787            return Ok(());
788        }
789        
790        // Send to appropriate channels based on severity
791        let channels = self.select_channels_for_alert(alert);
792        
793        for channel_id in channels {
794            if let Some(channel) = self.find_channel(&channel_id) {
795                if let Err(e) = self.send_to_channel(channel, alert).await {
796                    warn!("Failed to send alert to channel {}: {}", channel_id, e);
797                }
798            }
799        }
800        
801        Ok(())
802    }
803    
804    fn select_channels_for_alert(&self, alert: &Alert) -> Vec<String> {
805        // Select channels based on severity and labels
806        match alert.severity {
807            AlertSeverity::Info => vec![self.config.default_channel.clone()],
808            AlertSeverity::Warning => vec![self.config.default_channel.clone()],
809            AlertSeverity::Critical => vec![self.config.default_channel.clone(), "pagerduty".to_string()],
810            AlertSeverity::Fatal => vec![self.config.default_channel.clone(), "pagerduty".to_string(), "sms".to_string()],
811        }
812    }
813    
814    fn find_channel(&self, channel_id: &str) -> Option<&NotificationChannel> {
815        self.config.channels.iter().find(|ch| match ch {
816            NotificationChannel::Email { id, .. } => id == channel_id,
817            NotificationChannel::Slack { id, .. } => id == channel_id,
818            NotificationChannel::PagerDuty { id, .. } => id == channel_id,
819            NotificationChannel::Webhook { id, .. } => id == channel_id,
820            NotificationChannel::SMS { id, .. } => id == channel_id,
821        })
822    }
823    
824    async fn send_to_channel(&self, channel: &NotificationChannel, alert: &Alert) -> Result<(), MonitoringError> {
825        match channel {
826            NotificationChannel::Slack { webhook_url, channel, .. } => {
827                self.send_slack_notification(webhook_url, channel, alert).await
828            }
829            NotificationChannel::Email { recipients, .. } => {
830                self.send_email_notification(recipients, alert).await
831            }
832            NotificationChannel::PagerDuty { service_key, .. } => {
833                self.send_pagerduty_notification(service_key, alert).await
834            }
835            NotificationChannel::Webhook { url, headers, .. } => {
836                self.send_webhook_notification(url, headers, alert).await
837            }
838            NotificationChannel::SMS { numbers, .. } => {
839                self.send_sms_notification(numbers, alert).await
840            }
841        }
842    }
843    
844    async fn send_slack_notification(&self, _webhook_url: &str, _channel: &str, alert: &Alert) -> Result<(), MonitoringError> {
845        info!("Sending Slack notification for alert: {}", alert.title);
846        // Would implement actual Slack webhook call
847        Ok(())
848    }
849    
850    async fn send_email_notification(&self, _recipients: &[String], alert: &Alert) -> Result<(), MonitoringError> {
851        info!("Sending email notification for alert: {}", alert.title);
852        // Would implement actual email sending
853        Ok(())
854    }
855    
856    async fn send_pagerduty_notification(&self, _service_key: &str, alert: &Alert) -> Result<(), MonitoringError> {
857        info!("Sending PagerDuty notification for alert: {}", alert.title);
858        // Would implement actual PagerDuty API call
859        Ok(())
860    }
861    
862    async fn send_webhook_notification(&self, _url: &str, _headers: &HashMap<String, String>, alert: &Alert) -> Result<(), MonitoringError> {
863        info!("Sending webhook notification for alert: {}", alert.title);
864        // Would implement actual HTTP webhook call
865        Ok(())
866    }
867    
868    async fn send_sms_notification(&self, _numbers: &[String], alert: &Alert) -> Result<(), MonitoringError> {
869        info!("Sending SMS notification for alert: {}", alert.title);
870        // Would implement actual SMS sending
871        Ok(())
872    }
873    
874    async fn health_check(&self) -> Result<(), MonitoringError> {
875        // Health check for notification channels
876        debug!("Notification system health check passed");
877        Ok(())
878    }
879}
880
881/// Rate limiter for notifications
882struct RateLimiter {
883    config: RateLimitConfig,
884    window_start: Arc<RwLock<Instant>>,
885    current_count: Arc<RwLock<u32>>,
886}
887
888impl RateLimiter {
889    fn new(config: RateLimitConfig) -> Self {
890        Self {
891            config,
892            window_start: Arc::new(RwLock::new(Instant::now())),
893            current_count: Arc::new(RwLock::new(0)),
894        }
895    }
896    
897    async fn allow_alert(&self) -> bool {
898        let now = Instant::now();
899        let mut window_start = self.window_start.write().await;
900        let mut current_count = self.current_count.write().await;
901        
902        // Check if we need to reset the window
903        if now.duration_since(*window_start) >= self.config.window_duration {
904            *window_start = now;
905            *current_count = 0;
906        }
907        
908        // Check if we're within limits
909        if *current_count < self.config.max_alerts_per_window {
910            *current_count += 1;
911            true
912        } else {
913            false
914        }
915    }
916}
917
918/// Alert state manager
919struct AlertStateManager {
920    active_alerts: Arc<RwLock<HashMap<String, AlertStateInfo>>>,
921}
922
923impl AlertStateManager {
924    fn new() -> Self {
925        Self {
926            active_alerts: Arc::new(RwLock::new(HashMap::new())),
927        }
928    }
929    
930    async fn update_alert_state(&self, alert: Alert) -> Result<AlertStateInfo, MonitoringError> {
931        let mut active_alerts = self.active_alerts.write().await;
932        
933        let state_info = AlertStateInfo {
934            alert: alert.clone(),
935            first_triggered: SystemTime::now(),
936            last_updated: SystemTime::now(),
937            escalation_level: 0,
938            acknowledgments: Vec::new(),
939        };
940        
941        active_alerts.insert(alert.id.clone(), state_info.clone());
942        Ok(state_info)
943    }
944    
945    async fn get_active_alert_count(&self) -> usize {
946        let active_alerts = self.active_alerts.read().await;
947        active_alerts.len()
948    }
949    
950    async fn get_active_alerts(&self) -> Vec<AlertStateInfo> {
951        let active_alerts = self.active_alerts.read().await;
952        active_alerts.values().cloned().collect()
953    }
954}
955
/// Alert state information
///
/// One tracked alert together with its bookkeeping, stored per alert id in
/// `AlertStateManager::active_alerts`.
#[derive(Debug, Clone)]
struct AlertStateInfo {
    /// The alert this record describes.
    alert: Alert,
    /// Trigger timestamp; `EscalationManager::should_escalate` measures the
    /// alert's age from this value.
    first_triggered: SystemTime,
    /// Timestamp written when the record is stored by
    /// `AlertStateManager::update_alert_state`.
    last_updated: SystemTime,
    /// Escalation step; set to 0 when the record is created.
    /// NOTE(review): nothing in this part of the file advances it — confirm
    /// where escalation levels are incremented.
    escalation_level: u32,
    /// Acknowledgments recorded against the alert; empty on creation.
    acknowledgments: Vec<AlertAcknowledgment>,
}
965
/// Alert acknowledgment
///
/// A single entry in `AlertStateInfo::acknowledgments`.
#[derive(Debug, Clone)]
struct AlertAcknowledgment {
    /// Presumably identifies who acknowledged the alert — TODO confirm format
    /// (name, login, id) against the code that creates acknowledgments.
    user: String,
    /// Presumably the time of the acknowledgment — TODO confirm.
    timestamp: SystemTime,
    /// Optional free-text note attached to the acknowledgment.
    message: Option<String>,
}
973
974/// Escalation manager
975struct EscalationManager {
976    config: EscalationConfig,
977}
978
979impl EscalationManager {
980    fn new(config: EscalationConfig) -> Self {
981        Self { config }
982    }
983    
984    async fn should_escalate(&self, alert_state: &AlertStateInfo) -> bool {
985        let elapsed = alert_state.first_triggered.elapsed().unwrap_or_default();
986        
987        // Find applicable escalation policy
988        for policy in &self.config.policies {
989            if policy.severity == alert_state.alert.severity {
990                return elapsed >= policy.escalation_time;
991            }
992        }
993        
994        // Use default escalation time
995        elapsed >= self.config.default_escalation_time
996    }
997    
998    async fn escalate_alert(&self, alert: &Alert) -> Result<(), MonitoringError> {
999        info!("Escalating alert: {} (severity: {:?})", alert.title, alert.severity);
1000        
1001        // Find escalation policy and send to escalation channels
1002        for policy in &self.config.policies {
1003            if policy.severity == alert.severity {
1004                for channel in &policy.escalation_channels {
1005                    info!("Escalating to channel: {}", channel);
1006                    // Would send escalation notification
1007                }
1008                break;
1009            }
1010        }
1011        
1012        Ok(())
1013    }
1014}
1015
1016/// Suppression engine
1017struct SuppressionEngine {
1018    config: SuppressionConfig,
1019    active_suppressions: Arc<RwLock<HashMap<String, SuppressionInfo>>>,
1020}
1021
1022impl SuppressionEngine {
1023    fn new(config: SuppressionConfig) -> Self {
1024        Self {
1025            config,
1026            active_suppressions: Arc::new(RwLock::new(HashMap::new())),
1027        }
1028    }
1029    
1030    async fn should_suppress(&self, alert: &Alert) -> bool {
1031        let suppressions = self.active_suppressions.read().await;
1032        
1033        for suppression in suppressions.values() {
1034            if self.alert_matches_suppression(alert, &suppression.rule) {
1035                return true;
1036            }
1037        }
1038        
1039        false
1040    }
1041    
1042    fn alert_matches_suppression(&self, alert: &Alert, rule: &SuppressionRule) -> bool {
1043        // Check if alert labels match suppression rule
1044        for (key, value) in &rule.label_matchers {
1045            if alert.labels.get(key) != Some(value) {
1046                return false;
1047            }
1048        }
1049        true
1050    }
1051    
1052    async fn get_suppressed_count(&self) -> usize {
1053        let suppressions = self.active_suppressions.read().await;
1054        suppressions.len()
1055    }
1056    
1057    async fn cleanup_expired_suppressions(&self) -> Result<(), MonitoringError> {
1058        let mut suppressions = self.active_suppressions.write().await;
1059        let now = SystemTime::now();
1060        
1061        suppressions.retain(|_, suppression| {
1062            now.duration_since(suppression.created_at).unwrap_or_default() < suppression.rule.duration
1063        });
1064        
1065        Ok(())
1066    }
1067}
1068
/// Suppression information
///
/// One registered suppression, as stored in
/// `SuppressionEngine::active_suppressions`.
#[derive(Debug, Clone)]
struct SuppressionInfo {
    /// The rule to match alerts against; its `label_matchers` select alerts and
    /// its `duration` bounds how long the suppression lives.
    rule: SuppressionRule,
    /// Creation time; `cleanup_expired_suppressions` drops the entry once
    /// `rule.duration` has elapsed since this instant.
    created_at: SystemTime,
}
1075
#[cfg(test)]
mod tests {
    use super::*;

    /// A manager built from the default configuration reports no active alerts.
    #[tokio::test]
    async fn test_alert_manager_creation() {
        let manager = ProductionAlertManager::new(AlertingConfig::default())
            .await
            .unwrap();

        let status = manager.get_status().await;
        assert!(status.contains("Active: 0"));
    }

    /// A metric of 0.5 trips a "less than 0.8" threshold condition.
    #[tokio::test]
    async fn test_threshold_evaluation() {
        let engine = AlertRuleEngine::new(vec![AlertRule::default_success_rate_rule()]);

        // 0.5 is below the 0.8 threshold checked further down.
        let metrics = HashMap::from([("nat_success_rate".to_string(), 0.5)]);

        let context = AlertEvaluationContext {
            event_type: AlertEventType::Scheduled,
            timestamp: SystemTime::now(),
            attempt_info: None,
            result_info: None,
            metrics,
        };

        let should_alert = engine
            .evaluate_threshold_condition(
                "nat_success_rate",
                &ThresholdOperator::LessThan,
                0.8,
                &context,
            )
            .await
            .unwrap();

        assert!(should_alert);
    }

    /// With a limit of two per window, the third alert in the window is rejected.
    #[tokio::test]
    async fn test_rate_limiter() {
        let limiter = RateLimiter::new(RateLimitConfig {
            max_alerts_per_window: 2,
            window_duration: Duration::from_secs(60),
            burst_allowance: 1,
        });

        // The first two alerts fit inside the window's budget.
        for _ in 0..2 {
            assert!(limiter.allow_alert().await);
        }

        // The budget is now exhausted, so the next alert is rate limited.
        assert!(!limiter.allow_alert().await);
    }
}