use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, Instant, SystemTime},
};

use tokio::{
    sync::{RwLock, Mutex},
    time::interval,
};
use tracing::{debug, info, warn};

use crate::monitoring::{
    MonitoringError, NatTraversalAttempt, NatTraversalResult,
};

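/// Production alert manager for NAT traversal monitoring.
///
/// Wires together rule evaluation, notification dispatch, alert state
/// tracking, escalation, and suppression, and owns the periodic background
/// tasks that drive them.
///
/// A minimal usage sketch (illustrative call site, not compiled as a doctest):
///
/// ```ignore
/// let manager = ProductionAlertManager::new(AlertingConfig::default()).await?;
/// manager.start().await?;
/// // ...feed NAT traversal events as they are observed...
/// manager.evaluate_nat_result(&result).await?;
/// manager.stop().await?;
/// ```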
pub struct ProductionAlertManager {
    config: AlertingConfig,
    rule_engine: Arc<AlertRuleEngine>,
    notification_dispatcher: Arc<NotificationDispatcher>,
    state_manager: Arc<AlertStateManager>,
    escalation_manager: Arc<EscalationManager>,
    suppression_engine: Arc<SuppressionEngine>,
    tasks: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}

impl ProductionAlertManager {
    pub async fn new(config: AlertingConfig) -> Result<Self, MonitoringError> {
        let rule_engine = Arc::new(AlertRuleEngine::new(config.rules.clone()));
        let notification_dispatcher = Arc::new(NotificationDispatcher::new(config.notifications.clone()));
        let state_manager = Arc::new(AlertStateManager::new());
        let escalation_manager = Arc::new(EscalationManager::new(config.escalation.clone()));
        let suppression_engine = Arc::new(SuppressionEngine::new(config.suppression.clone()));

        Ok(Self {
            config,
            rule_engine,
            notification_dispatcher,
            state_manager,
            escalation_manager,
            suppression_engine,
            tasks: Arc::new(Mutex::new(Vec::new())),
        })
    }

    pub async fn start(&self) -> Result<(), MonitoringError> {
        info!("Starting production alert manager");

        self.start_rule_evaluation_task().await?;
        self.start_escalation_task().await?;
        self.start_suppression_cleanup_task().await?;
        self.start_health_monitoring_task().await?;

        info!("Production alert manager started");
        Ok(())
    }

    pub async fn stop(&self) -> Result<(), MonitoringError> {
        info!("Stopping production alert manager");

        let mut tasks = self.tasks.lock().await;
        for task in tasks.drain(..) {
            task.abort();
        }

        info!("Production alert manager stopped");
        Ok(())
    }

    pub async fn evaluate_nat_attempt(&self, attempt: &NatTraversalAttempt) -> Result<(), MonitoringError> {
        let context = AlertEvaluationContext {
            event_type: AlertEventType::NatAttempt,
            timestamp: attempt.timestamp,
            attempt_info: Some(attempt.clone()),
            result_info: None,
            metrics: HashMap::new(),
        };

        self.rule_engine.evaluate_rules(&context).await?;

        Ok(())
    }

    pub async fn evaluate_nat_result(&self, result: &NatTraversalResult) -> Result<(), MonitoringError> {
        let mut metrics = HashMap::new();
        metrics.insert("duration_ms".to_string(), result.duration.as_millis() as f64);
        metrics.insert("success".to_string(), if result.success { 1.0 } else { 0.0 });

        let perf = &result.performance_metrics;
        metrics.insert("connection_time_ms".to_string(), perf.connection_time_ms as f64);
        metrics.insert("candidates_tried".to_string(), perf.candidates_tried as f64);

        let context = AlertEvaluationContext {
            event_type: AlertEventType::NatResult,
            timestamp: SystemTime::now(),
            attempt_info: None,
            result_info: Some(result.clone()),
            metrics,
        };

        self.rule_engine.evaluate_rules(&context).await?;

        Ok(())
    }

    pub async fn get_status(&self) -> String {
        let active_alerts = self.state_manager.get_active_alert_count().await;
        let suppressed_alerts = self.suppression_engine.get_suppressed_count().await;

        format!("Active: {}, Suppressed: {}", active_alerts, suppressed_alerts)
    }

    pub async fn trigger_alert(&self, alert: Alert) -> Result<(), MonitoringError> {
        self.process_alert(alert).await
    }

    async fn process_alert(&self, alert: Alert) -> Result<(), MonitoringError> {
        if self.suppression_engine.should_suppress(&alert).await {
            debug!("Alert {} suppressed", alert.id);
            return Ok(());
        }

        let alert_state = self.state_manager.update_alert_state(alert.clone()).await?;

        if self.escalation_manager.should_escalate(&alert_state).await {
            self.escalation_manager.escalate_alert(&alert).await?;
        }

        self.notification_dispatcher.dispatch_alert(&alert).await?;

        info!("Processed alert: {} (severity: {:?})", alert.title, alert.severity);
        Ok(())
    }

    async fn start_rule_evaluation_task(&self) -> Result<(), MonitoringError> {
        let rule_engine = self.rule_engine.clone();
        let interval_duration = self.config.evaluation_interval;

        let task = tokio::spawn(async move {
            let mut interval = interval(interval_duration);

            loop {
                interval.tick().await;

                let context = AlertEvaluationContext {
                    event_type: AlertEventType::Scheduled,
                    timestamp: SystemTime::now(),
                    attempt_info: None,
                    result_info: None,
                    metrics: HashMap::new(),
                };

                if let Err(e) = rule_engine.evaluate_scheduled_rules(&context).await {
                    warn!("Scheduled rule evaluation failed: {}", e);
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }

    async fn start_escalation_task(&self) -> Result<(), MonitoringError> {
        let escalation_manager = self.escalation_manager.clone();
        let state_manager = self.state_manager.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(60));

            loop {
                interval.tick().await;

                let active_alerts = state_manager.get_active_alerts().await;
                for alert_state in active_alerts {
                    if escalation_manager.should_escalate(&alert_state).await {
                        if let Err(e) = escalation_manager.escalate_alert(&alert_state.alert).await {
                            warn!("Alert escalation failed: {}", e);
                        }
                    }
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }

    async fn start_suppression_cleanup_task(&self) -> Result<(), MonitoringError> {
        let suppression_engine = self.suppression_engine.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(300));

            loop {
                interval.tick().await;

                if let Err(e) = suppression_engine.cleanup_expired_suppressions().await {
                    warn!("Suppression cleanup failed: {}", e);
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }

    async fn start_health_monitoring_task(&self) -> Result<(), MonitoringError> {
        let notification_dispatcher = self.notification_dispatcher.clone();

        let task = tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(60));

            loop {
                interval.tick().await;

                if let Err(e) = notification_dispatcher.health_check().await {
                    warn!("Notification system health check failed: {}", e);
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }
}

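/// Top-level configuration for the alerting subsystem: the rule set,
/// notification channels, escalation and suppression policies, plus the
/// evaluation interval and deduplication window.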
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AlertingConfig {
    pub rules: Vec<AlertRule>,
    pub notifications: NotificationConfig,
    pub escalation: EscalationConfig,
    pub suppression: SuppressionConfig,
    pub evaluation_interval: Duration,
    pub deduplication_window: Duration,
}

impl Default for AlertingConfig {
    fn default() -> Self {
        Self {
            rules: vec![
                AlertRule::default_success_rate_rule(),
                AlertRule::default_latency_rule(),
                AlertRule::default_error_rate_rule(),
            ],
            notifications: NotificationConfig::default(),
            escalation: EscalationConfig::default(),
            suppression: SuppressionConfig::default(),
            evaluation_interval: Duration::from_secs(30),
            deduplication_window: Duration::from_secs(300),
        }
    }
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AlertRule {
    pub id: String,
    pub name: String,
    pub description: String,
    pub severity: AlertSeverity,
    pub condition: AlertCondition,
    pub evaluation_frequency: Duration,
    pub labels: HashMap<String, String>,
    pub annotations: HashMap<String, String>,
}

impl AlertRule {
    fn default_success_rate_rule() -> Self {
        Self {
            id: "nat_success_rate_low".to_string(),
            name: "NAT Success Rate Low".to_string(),
            description: "NAT traversal success rate is below threshold".to_string(),
            severity: AlertSeverity::Warning,
            condition: AlertCondition::Threshold {
                metric: "nat_success_rate".to_string(),
                operator: ThresholdOperator::LessThan,
                value: 0.8,
                duration: Duration::from_secs(300),
            },
            evaluation_frequency: Duration::from_secs(60),
            labels: HashMap::from([
                ("component".to_string(), "nat_traversal".to_string()),
                ("type".to_string(), "success_rate".to_string()),
            ]),
            annotations: HashMap::from([
                ("summary".to_string(), "NAT traversal success rate below 80%".to_string()),
                ("runbook".to_string(), "https://docs.example.com/runbooks/nat-success-rate".to_string()),
            ]),
        }
    }

    fn default_latency_rule() -> Self {
        Self {
            id: "nat_latency_high".to_string(),
            name: "NAT Latency High".to_string(),
            description: "NAT traversal latency is above threshold".to_string(),
            severity: AlertSeverity::Warning,
            condition: AlertCondition::Threshold {
                metric: "nat_duration_p95".to_string(),
                operator: ThresholdOperator::GreaterThan,
                value: 5000.0,
                duration: Duration::from_secs(300),
            },
            evaluation_frequency: Duration::from_secs(60),
            labels: HashMap::from([
                ("component".to_string(), "nat_traversal".to_string()),
                ("type".to_string(), "latency".to_string()),
            ]),
            annotations: HashMap::from([
                ("summary".to_string(), "NAT traversal P95 latency above 5s".to_string()),
            ]),
        }
    }

    fn default_error_rate_rule() -> Self {
        Self {
            id: "nat_error_rate_high".to_string(),
            name: "NAT Error Rate High".to_string(),
            description: "NAT traversal error rate is above threshold".to_string(),
            severity: AlertSeverity::Critical,
            condition: AlertCondition::Threshold {
                metric: "nat_error_rate".to_string(),
                operator: ThresholdOperator::GreaterThan,
                value: 0.1,
                duration: Duration::from_secs(180),
            },
            evaluation_frequency: Duration::from_secs(30),
            labels: HashMap::from([
                ("component".to_string(), "nat_traversal".to_string()),
                ("type".to_string(), "error_rate".to_string()),
            ]),
            annotations: HashMap::from([
                ("summary".to_string(), "NAT traversal error rate above 10%".to_string()),
                ("priority".to_string(), "high".to_string()),
            ]),
        }
    }
}

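/// Condition a rule evaluates: a static threshold on a metric, a
/// rate-of-change check, an anomaly check against a baseline, or a
/// free-form expression.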
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum AlertCondition {
    Threshold {
        metric: String,
        operator: ThresholdOperator,
        value: f64,
        duration: Duration,
    },
    RateOfChange {
        metric: String,
        rate_threshold: f64,
        duration: Duration,
    },
    Anomaly {
        metric: String,
        sensitivity: f64,
        baseline_duration: Duration,
    },
    Expression {
        expression: String,
        duration: Duration,
    },
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum ThresholdOperator {
    GreaterThan,
    LessThan,
    Equal,
    NotEqual,
    GreaterThanOrEqual,
    LessThanOrEqual,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
    Fatal,
}

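/// A concrete alert instance produced when a rule's condition is met,
/// carrying the labels, annotations, and values used for routing and display.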
#[derive(Debug, Clone)]
pub struct Alert {
    pub id: String,
    pub title: String,
    pub description: String,
    pub severity: AlertSeverity,
    pub state: AlertState,
    pub triggered_at: SystemTime,
    pub labels: HashMap<String, String>,
    pub annotations: HashMap<String, String>,
    pub source_rule: String,
    pub current_value: Option<f64>,
    pub threshold_value: Option<f64>,
}

#[derive(Debug, Clone, PartialEq)]
pub enum AlertState {
    Triggered,
    Acknowledged,
    Resolved,
    Suppressed,
}

struct AlertRuleEngine {
    rules: Vec<AlertRule>,
    rule_states: Arc<RwLock<HashMap<String, RuleState>>>,
}

impl AlertRuleEngine {
    fn new(rules: Vec<AlertRule>) -> Self {
        Self {
            rules,
            rule_states: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    async fn evaluate_rules(&self, context: &AlertEvaluationContext) -> Result<(), MonitoringError> {
        for rule in &self.rules {
            if let Err(e) = self.evaluate_single_rule(rule, context).await {
                warn!("Rule evaluation failed for {}: {}", rule.id, e);
            }
        }
        Ok(())
    }

    async fn evaluate_scheduled_rules(&self, context: &AlertEvaluationContext) -> Result<(), MonitoringError> {
        for rule in &self.rules {
            if self.should_evaluate_rule(rule, context.timestamp).await {
                self.evaluate_single_rule(rule, context).await?;
            }
        }
        Ok(())
    }

    async fn evaluate_single_rule(&self, rule: &AlertRule, context: &AlertEvaluationContext) -> Result<(), MonitoringError> {
        let should_alert = match &rule.condition {
            AlertCondition::Threshold { metric, operator, value, duration: _ } => {
                self.evaluate_threshold_condition(metric, operator, *value, context).await?
            }
            AlertCondition::RateOfChange { metric, rate_threshold, .. } => {
                self.evaluate_rate_condition(metric, *rate_threshold, context).await?
            }
            AlertCondition::Anomaly { metric, sensitivity, .. } => {
                self.evaluate_anomaly_condition(metric, *sensitivity, context).await?
            }
            AlertCondition::Expression { expression, .. } => {
                self.evaluate_expression_condition(expression, context).await?
            }
        };

        if should_alert {
            let alert = self.create_alert_from_rule(rule, context).await;
            // The constructed alert is currently only logged; it is not dispatched from here.
            debug!("Rule {} triggered alert: {}", rule.id, alert.title);
        }

        Ok(())
    }

    async fn evaluate_threshold_condition(
        &self,
        metric: &str,
        operator: &ThresholdOperator,
        threshold: f64,
        context: &AlertEvaluationContext,
    ) -> Result<bool, MonitoringError> {
        let current_value = context.metrics.get(metric).copied().unwrap_or(0.0);

        let result = match operator {
            ThresholdOperator::GreaterThan => current_value > threshold,
            ThresholdOperator::LessThan => current_value < threshold,
            ThresholdOperator::Equal => (current_value - threshold).abs() < f64::EPSILON,
            ThresholdOperator::NotEqual => (current_value - threshold).abs() > f64::EPSILON,
            ThresholdOperator::GreaterThanOrEqual => current_value >= threshold,
            ThresholdOperator::LessThanOrEqual => current_value <= threshold,
        };

        Ok(result)
    }

    // Rate-of-change evaluation is not implemented yet; never fires.
    async fn evaluate_rate_condition(&self, _metric: &str, _rate_threshold: f64, _context: &AlertEvaluationContext) -> Result<bool, MonitoringError> {
        Ok(false)
    }

    // Anomaly evaluation is not implemented yet; never fires.
    async fn evaluate_anomaly_condition(&self, _metric: &str, _sensitivity: f64, _context: &AlertEvaluationContext) -> Result<bool, MonitoringError> {
        Ok(false)
    }

    // Expression evaluation is not implemented yet; never fires.
    async fn evaluate_expression_condition(&self, _expression: &str, _context: &AlertEvaluationContext) -> Result<bool, MonitoringError> {
        Ok(false)
    }

    async fn create_alert_from_rule(&self, rule: &AlertRule, context: &AlertEvaluationContext) -> Alert {
        Alert {
            id: format!("{}_{}", rule.id, context.timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()),
            title: rule.name.clone(),
            description: rule.description.clone(),
            severity: rule.severity.clone(),
            state: AlertState::Triggered,
            triggered_at: context.timestamp,
            labels: rule.labels.clone(),
            annotations: rule.annotations.clone(),
            source_rule: rule.id.clone(),
            current_value: None,
            threshold_value: None,
        }
    }

    // Placeholder: evaluate every rule on each scheduled pass.
    async fn should_evaluate_rule(&self, _rule: &AlertRule, _timestamp: SystemTime) -> bool {
        true
    }
}

#[derive(Debug)]
struct RuleState {
    last_evaluation: Instant,
    consecutive_violations: u32,
    last_alert: Option<Instant>,
}

struct AlertEvaluationContext {
    event_type: AlertEventType,
    timestamp: SystemTime,
    attempt_info: Option<NatTraversalAttempt>,
    result_info: Option<NatTraversalResult>,
    metrics: HashMap<String, f64>,
}

enum AlertEventType {
    NatAttempt,
    NatResult,
    Scheduled,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NotificationConfig {
    pub channels: Vec<NotificationChannel>,
    pub default_channel: String,
    pub rate_limiting: RateLimitConfig,
}

impl Default for NotificationConfig {
    fn default() -> Self {
        Self {
            channels: vec![
                NotificationChannel::Slack {
                    id: "default".to_string(),
                    webhook_url: "https://hooks.slack.com/services/...".to_string(),
                    channel: "#alerts".to_string(),
                },
            ],
            default_channel: "default".to_string(),
            rate_limiting: RateLimitConfig::default(),
        }
    }
}
647
648#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
650pub enum NotificationChannel {
651 Email {
652 id: String,
653 smtp_server: String,
654 recipients: Vec<String>,
655 },
656 Slack {
657 id: String,
658 webhook_url: String,
659 channel: String,
660 },
661 PagerDuty {
662 id: String,
663 service_key: String,
664 },
665 Webhook {
666 id: String,
667 url: String,
668 headers: HashMap<String, String>,
669 },
670 SMS {
671 id: String,
672 provider: String,
673 numbers: Vec<String>,
674 },
675}
676
677#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
679pub struct RateLimitConfig {
680 pub max_alerts_per_window: u32,
682 pub window_duration: Duration,
684 pub burst_allowance: u32,
686}
687
688impl Default for RateLimitConfig {
689 fn default() -> Self {
690 Self {
691 max_alerts_per_window: 10,
692 window_duration: Duration::from_secs(60),
693 burst_allowance: 3,
694 }
695 }
696}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EscalationConfig {
    pub policies: Vec<EscalationPolicy>,
    pub default_escalation_time: Duration,
}

impl Default for EscalationConfig {
    fn default() -> Self {
        Self {
            policies: vec![
                EscalationPolicy {
                    severity: AlertSeverity::Critical,
                    escalation_time: Duration::from_secs(300),
                    escalation_channels: vec!["pagerduty".to_string()],
                },
                EscalationPolicy {
                    severity: AlertSeverity::Fatal,
                    escalation_time: Duration::from_secs(60),
                    escalation_channels: vec!["pagerduty".to_string(), "sms".to_string()],
                },
            ],
            default_escalation_time: Duration::from_secs(600),
        }
    }
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EscalationPolicy {
    pub severity: AlertSeverity,
    pub escalation_time: Duration,
    pub escalation_channels: Vec<String>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SuppressionConfig {
    pub rules: Vec<SuppressionRule>,
    pub default_suppression_time: Duration,
}

impl Default for SuppressionConfig {
    fn default() -> Self {
        Self {
            rules: vec![],
            default_suppression_time: Duration::from_secs(300),
        }
    }
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SuppressionRule {
    pub id: String,
    pub label_matchers: HashMap<String, String>,
    pub duration: Duration,
    pub reason: String,
}

struct NotificationDispatcher {
    config: NotificationConfig,
    rate_limiter: Arc<RateLimiter>,
}

impl NotificationDispatcher {
    fn new(config: NotificationConfig) -> Self {
        Self {
            rate_limiter: Arc::new(RateLimiter::new(config.rate_limiting.clone())),
            config,
        }
    }

    async fn dispatch_alert(&self, alert: &Alert) -> Result<(), MonitoringError> {
        if !self.rate_limiter.allow_alert().await {
            warn!("Alert rate limited: {}", alert.title);
            return Ok(());
        }

        let channels = self.select_channels_for_alert(alert);

        for channel_id in channels {
            if let Some(channel) = self.find_channel(&channel_id) {
                if let Err(e) = self.send_to_channel(channel, alert).await {
                    warn!("Failed to send alert to channel {}: {}", channel_id, e);
                }
            }
        }

        Ok(())
    }

    fn select_channels_for_alert(&self, alert: &Alert) -> Vec<String> {
        match alert.severity {
            AlertSeverity::Info => vec![self.config.default_channel.clone()],
            AlertSeverity::Warning => vec![self.config.default_channel.clone()],
            AlertSeverity::Critical => vec![self.config.default_channel.clone(), "pagerduty".to_string()],
            AlertSeverity::Fatal => vec![self.config.default_channel.clone(), "pagerduty".to_string(), "sms".to_string()],
        }
    }

    fn find_channel(&self, channel_id: &str) -> Option<&NotificationChannel> {
        self.config.channels.iter().find(|ch| match ch {
            NotificationChannel::Email { id, .. } => id == channel_id,
            NotificationChannel::Slack { id, .. } => id == channel_id,
            NotificationChannel::PagerDuty { id, .. } => id == channel_id,
            NotificationChannel::Webhook { id, .. } => id == channel_id,
            NotificationChannel::SMS { id, .. } => id == channel_id,
        })
    }

    async fn send_to_channel(&self, channel: &NotificationChannel, alert: &Alert) -> Result<(), MonitoringError> {
        match channel {
            NotificationChannel::Slack { webhook_url, channel, .. } => {
                self.send_slack_notification(webhook_url, channel, alert).await
            }
            NotificationChannel::Email { recipients, .. } => {
                self.send_email_notification(recipients, alert).await
            }
            NotificationChannel::PagerDuty { service_key, .. } => {
                self.send_pagerduty_notification(service_key, alert).await
            }
            NotificationChannel::Webhook { url, headers, .. } => {
                self.send_webhook_notification(url, headers, alert).await
            }
            NotificationChannel::SMS { numbers, .. } => {
                self.send_sms_notification(numbers, alert).await
            }
        }
    }

    // The send_* methods below are logging stubs; no real delivery is performed here.
    async fn send_slack_notification(&self, _webhook_url: &str, _channel: &str, alert: &Alert) -> Result<(), MonitoringError> {
        info!("Sending Slack notification for alert: {}", alert.title);
        Ok(())
    }

    async fn send_email_notification(&self, _recipients: &[String], alert: &Alert) -> Result<(), MonitoringError> {
        info!("Sending email notification for alert: {}", alert.title);
        Ok(())
    }

    async fn send_pagerduty_notification(&self, _service_key: &str, alert: &Alert) -> Result<(), MonitoringError> {
        info!("Sending PagerDuty notification for alert: {}", alert.title);
        Ok(())
    }

    async fn send_webhook_notification(&self, _url: &str, _headers: &HashMap<String, String>, alert: &Alert) -> Result<(), MonitoringError> {
        info!("Sending webhook notification for alert: {}", alert.title);
        Ok(())
    }

    async fn send_sms_notification(&self, _numbers: &[String], alert: &Alert) -> Result<(), MonitoringError> {
        info!("Sending SMS notification for alert: {}", alert.title);
        Ok(())
    }

    // Placeholder health check; always reports healthy.
    async fn health_check(&self) -> Result<(), MonitoringError> {
        debug!("Notification system health check passed");
        Ok(())
    }
}

struct RateLimiter {
    config: RateLimitConfig,
    window_start: Arc<RwLock<Instant>>,
    current_count: Arc<RwLock<u32>>,
}

impl RateLimiter {
    fn new(config: RateLimitConfig) -> Self {
        Self {
            config,
            window_start: Arc::new(RwLock::new(Instant::now())),
            current_count: Arc::new(RwLock::new(0)),
        }
    }

    async fn allow_alert(&self) -> bool {
        let now = Instant::now();
        let mut window_start = self.window_start.write().await;
        let mut current_count = self.current_count.write().await;

        if now.duration_since(*window_start) >= self.config.window_duration {
            *window_start = now;
            *current_count = 0;
        }

        if *current_count < self.config.max_alerts_per_window {
            *current_count += 1;
            true
        } else {
            false
        }
    }
}

struct AlertStateManager {
    active_alerts: Arc<RwLock<HashMap<String, AlertStateInfo>>>,
}

impl AlertStateManager {
    fn new() -> Self {
        Self {
            active_alerts: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    async fn update_alert_state(&self, alert: Alert) -> Result<AlertStateInfo, MonitoringError> {
        let mut active_alerts = self.active_alerts.write().await;

        let state_info = AlertStateInfo {
            alert: alert.clone(),
            first_triggered: SystemTime::now(),
            last_updated: SystemTime::now(),
            escalation_level: 0,
            acknowledgments: Vec::new(),
        };

        active_alerts.insert(alert.id.clone(), state_info.clone());
        Ok(state_info)
    }

    async fn get_active_alert_count(&self) -> usize {
        let active_alerts = self.active_alerts.read().await;
        active_alerts.len()
    }

    async fn get_active_alerts(&self) -> Vec<AlertStateInfo> {
        let active_alerts = self.active_alerts.read().await;
        active_alerts.values().cloned().collect()
    }
}

#[derive(Debug, Clone)]
struct AlertStateInfo {
    alert: Alert,
    first_triggered: SystemTime,
    last_updated: SystemTime,
    escalation_level: u32,
    acknowledgments: Vec<AlertAcknowledgment>,
}

#[derive(Debug, Clone)]
struct AlertAcknowledgment {
    user: String,
    timestamp: SystemTime,
    message: Option<String>,
}

struct EscalationManager {
    config: EscalationConfig,
}

impl EscalationManager {
    fn new(config: EscalationConfig) -> Self {
        Self { config }
    }

    async fn should_escalate(&self, alert_state: &AlertStateInfo) -> bool {
        let elapsed = alert_state.first_triggered.elapsed().unwrap_or_default();

        for policy in &self.config.policies {
            if policy.severity == alert_state.alert.severity {
                return elapsed >= policy.escalation_time;
            }
        }

        elapsed >= self.config.default_escalation_time
    }

    async fn escalate_alert(&self, alert: &Alert) -> Result<(), MonitoringError> {
        info!("Escalating alert: {} (severity: {:?})", alert.title, alert.severity);

        for policy in &self.config.policies {
            if policy.severity == alert.severity {
                for channel in &policy.escalation_channels {
                    info!("Escalating to channel: {}", channel);
                }
                break;
            }
        }

        Ok(())
    }
}

struct SuppressionEngine {
    config: SuppressionConfig,
    active_suppressions: Arc<RwLock<HashMap<String, SuppressionInfo>>>,
}

impl SuppressionEngine {
    fn new(config: SuppressionConfig) -> Self {
        Self {
            config,
            active_suppressions: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    async fn should_suppress(&self, alert: &Alert) -> bool {
        let suppressions = self.active_suppressions.read().await;

        for suppression in suppressions.values() {
            if self.alert_matches_suppression(alert, &suppression.rule) {
                return true;
            }
        }

        false
    }

    fn alert_matches_suppression(&self, alert: &Alert, rule: &SuppressionRule) -> bool {
        for (key, value) in &rule.label_matchers {
            if alert.labels.get(key) != Some(value) {
                return false;
            }
        }
        true
    }

    async fn get_suppressed_count(&self) -> usize {
        let suppressions = self.active_suppressions.read().await;
        suppressions.len()
    }

    async fn cleanup_expired_suppressions(&self) -> Result<(), MonitoringError> {
        let mut suppressions = self.active_suppressions.write().await;
        let now = SystemTime::now();

        suppressions.retain(|_, suppression| {
            now.duration_since(suppression.created_at).unwrap_or_default() < suppression.rule.duration
        });

        Ok(())
    }
}

#[derive(Debug, Clone)]
struct SuppressionInfo {
    rule: SuppressionRule,
    created_at: SystemTime,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_alert_manager_creation() {
        let config = AlertingConfig::default();
        let manager = ProductionAlertManager::new(config).await.unwrap();

        let status = manager.get_status().await;
        assert!(status.contains("Active: 0"));
    }
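
    // A small sketch exercising the private suppression matcher directly; the
    // alert and rules below use illustrative values for this test only.
    #[test]
    fn test_suppression_label_matching() {
        let engine = SuppressionEngine::new(SuppressionConfig::default());

        let alert = Alert {
            id: "test_alert".to_string(),
            title: "Test".to_string(),
            description: "Test alert".to_string(),
            severity: AlertSeverity::Warning,
            state: AlertState::Triggered,
            triggered_at: SystemTime::now(),
            labels: HashMap::from([("component".to_string(), "nat_traversal".to_string())]),
            annotations: HashMap::new(),
            source_rule: "test_rule".to_string(),
            current_value: None,
            threshold_value: None,
        };

        // All label matchers present on the alert: suppression applies.
        let matching_rule = SuppressionRule {
            id: "suppress_nat".to_string(),
            label_matchers: HashMap::from([("component".to_string(), "nat_traversal".to_string())]),
            duration: Duration::from_secs(300),
            reason: "maintenance".to_string(),
        };
        assert!(engine.alert_matches_suppression(&alert, &matching_rule));

        // A matcher value that differs from the alert's label: no suppression.
        let non_matching_rule = SuppressionRule {
            id: "suppress_other".to_string(),
            label_matchers: HashMap::from([("component".to_string(), "relay".to_string())]),
            duration: Duration::from_secs(300),
            reason: "maintenance".to_string(),
        };
        assert!(!engine.alert_matches_suppression(&alert, &non_matching_rule));
    }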

    #[tokio::test]
    async fn test_threshold_evaluation() {
        let rules = vec![AlertRule::default_success_rate_rule()];
        let engine = AlertRuleEngine::new(rules);

        let mut metrics = HashMap::new();
        metrics.insert("nat_success_rate".to_string(), 0.5);

        let context = AlertEvaluationContext {
            event_type: AlertEventType::Scheduled,
            timestamp: SystemTime::now(),
            attempt_info: None,
            result_info: None,
            metrics,
        };

        let should_alert = engine.evaluate_threshold_condition(
            "nat_success_rate",
            &ThresholdOperator::LessThan,
            0.8,
            &context,
        ).await.unwrap();

        assert!(should_alert);
    }
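
    // Sketch of the escalation timing check against the default Critical
    // policy (escalate after 300 seconds); the alert state is built by hand
    // with illustrative values.
    #[tokio::test]
    async fn test_escalation_timing() {
        let manager = EscalationManager::new(EscalationConfig::default());

        let alert = Alert {
            id: "critical_alert".to_string(),
            title: "Critical".to_string(),
            description: "Critical test alert".to_string(),
            severity: AlertSeverity::Critical,
            state: AlertState::Triggered,
            triggered_at: SystemTime::now(),
            labels: HashMap::new(),
            annotations: HashMap::new(),
            source_rule: "test_rule".to_string(),
            current_value: None,
            threshold_value: None,
        };

        // Triggered 10 minutes ago: past the 300-second Critical escalation time.
        let stale_state = AlertStateInfo {
            alert: alert.clone(),
            first_triggered: SystemTime::now() - Duration::from_secs(600),
            last_updated: SystemTime::now(),
            escalation_level: 0,
            acknowledgments: Vec::new(),
        };
        assert!(manager.should_escalate(&stale_state).await);

        // Just triggered: not yet due for escalation.
        let fresh_state = AlertStateInfo {
            alert,
            first_triggered: SystemTime::now(),
            last_updated: SystemTime::now(),
            escalation_level: 0,
            acknowledgments: Vec::new(),
        };
        assert!(!manager.should_escalate(&fresh_state).await);
    }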

    #[tokio::test]
    async fn test_rate_limiter() {
        let config = RateLimitConfig {
            max_alerts_per_window: 2,
            window_duration: Duration::from_secs(60),
            burst_allowance: 1,
        };

        let limiter = RateLimiter::new(config);

        assert!(limiter.allow_alert().await);
        assert!(limiter.allow_alert().await);

        assert!(!limiter.allow_alert().await);
    }
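
    // Sketch of severity-based channel selection, using the default
    // NotificationConfig (whose default channel id is "default").
    #[test]
    fn test_channel_selection_by_severity() {
        let dispatcher = NotificationDispatcher::new(NotificationConfig::default());

        let mut alert = Alert {
            id: "sev_test".to_string(),
            title: "Severity routing".to_string(),
            description: "Checks channel selection".to_string(),
            severity: AlertSeverity::Warning,
            state: AlertState::Triggered,
            triggered_at: SystemTime::now(),
            labels: HashMap::new(),
            annotations: HashMap::new(),
            source_rule: "test_rule".to_string(),
            current_value: None,
            threshold_value: None,
        };

        // Warnings go only to the default channel.
        assert_eq!(dispatcher.select_channels_for_alert(&alert), vec!["default".to_string()]);

        // Critical alerts additionally page.
        alert.severity = AlertSeverity::Critical;
        assert_eq!(
            dispatcher.select_channels_for_alert(&alert),
            vec!["default".to_string(), "pagerduty".to_string()]
        );
    }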
}