rexis_rag/observability/
alerting.rs

1//! # Alerting System
2//!
3//! Intelligent alerting with threshold monitoring, notification channels,
4//! and automated response capabilities for RRAG system health.
5
6use super::metrics::{Metric, MetricValue, MetricsCollector};
7use crate::{RragError, RragResult};
8use chrono::{DateTime, Duration, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Alerting configuration
///
/// Controls the evaluation loop, notification routing, and grouping /
/// escalation behavior of the alert manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
    /// Master switch for the alerting subsystem.
    pub enabled: bool,
    /// How often alert rules are re-evaluated, in seconds.
    pub evaluation_interval_seconds: u64,
    /// Maximum number of buffered alerts.
    // NOTE(review): not consumed in this module — verify downstream use.
    pub alert_buffer_size: usize,
    /// Channels that notifications are delivered through.
    pub notification_channels: Vec<NotificationChannelConfig>,
    /// Severity assigned when a rule does not specify one.
    pub default_severity: AlertSeverity,
    /// Whether related alerts are grouped, and the grouping window.
    // NOTE(review): grouping/escalation settings are not consumed in this
    // module — presumably read elsewhere; verify downstream.
    pub alert_grouping_enabled: bool,
    pub alert_grouping_window_minutes: u32,
    /// Whether unhandled alerts escalate, and after what delay.
    pub escalation_enabled: bool,
    pub escalation_delay_minutes: u32,
}
27
28impl Default for AlertConfig {
29    fn default() -> Self {
30        Self {
31            enabled: true,
32            evaluation_interval_seconds: 30,
33            alert_buffer_size: 1000,
34            notification_channels: vec![NotificationChannelConfig {
35                name: "console".to_string(),
36                channel_type: NotificationChannelType::Console,
37                enabled: true,
38                config: HashMap::new(),
39            }],
40            default_severity: AlertSeverity::Medium,
41            alert_grouping_enabled: true,
42            alert_grouping_window_minutes: 5,
43            escalation_enabled: false,
44            escalation_delay_minutes: 30,
45        }
46    }
47}
48
/// Notification channel configuration
///
/// Declarative description of one delivery channel; turned into a concrete
/// `NotificationChannel` implementation by the alert manager at startup.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationChannelConfig {
    /// Channel name, referenced by `AlertRule::notification_channels`.
    pub name: String,
    /// Which transport to instantiate.
    pub channel_type: NotificationChannelType,
    /// Disabled channels are skipped during setup.
    pub enabled: bool,
    /// Free-form settings; webhooks read the `url` and `header_*` entries.
    pub config: HashMap<String, String>,
}
57
/// Transport used to deliver alert notifications.
///
/// Only `Console` and `Webhook` are implemented in this module; the other
/// variants are accepted in configuration but logged as unimplemented.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum NotificationChannelType {
    Console,
    Email,
    Slack,
    Webhook,
    SMS,
    PagerDuty,
}
67
/// Alert severity levels
///
/// Explicit discriminants keep the derived `Ord` meaningful:
/// `Low < Medium < High < Critical`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AlertSeverity {
    Low = 1,
    Medium = 2,
    High = 3,
    Critical = 4,
}
76
77impl std::fmt::Display for AlertSeverity {
78    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79        match self {
80            Self::Low => write!(f, "LOW"),
81            Self::Medium => write!(f, "MEDIUM"),
82            Self::High => write!(f, "HIGH"),
83            Self::Critical => write!(f, "CRITICAL"),
84        }
85    }
86}
87
/// Alert conditions for triggering alerts
///
/// Evaluated by `AlertEvaluator` against the recorded metric history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertCondition {
    /// Fires when every sample of `metric_name` recorded during the last
    /// `duration_minutes` satisfies `operator` against `value`.
    Threshold {
        metric_name: String,
        operator: ComparisonOperator,
        value: f64,
        duration_minutes: u32,
    },
    /// Fires when the per-minute rate of change between the earliest and
    /// latest samples inside `window_minutes` satisfies `operator` against
    /// `rate_per_minute`.
    RateOfChange {
        metric_name: String,
        operator: ComparisonOperator,
        rate_per_minute: f64,
        window_minutes: u32,
    },
    /// Fires when the latest sample's z-score against the baseline window
    /// exceeds `sensitivity` (measured in standard deviations).
    Anomaly {
        metric_name: String,
        sensitivity: f64,
        baseline_minutes: u32,
    },
    /// Combines sub-conditions with AND/OR; nesting another `Composite`
    /// inside is rejected at evaluation time.
    Composite {
        conditions: Vec<AlertCondition>,
        logic: LogicOperator,
    },
}
113
/// Comparison applied when checking a metric value or rate against a
/// reference threshold. `Equal`/`NotEqual` compare within `f64::EPSILON`
/// rather than exact float equality.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ComparisonOperator {
    GreaterThan,
    LessThan,
    GreaterThanOrEqual,
    LessThanOrEqual,
    Equal,
    NotEqual,
}
123
/// Boolean combinator for composite alert conditions: `And` requires every
/// sub-condition to fire, `Or` requires at least one.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogicOperator {
    And,
    Or,
}
129
/// Alert rule definition
///
/// Pairs a trigger `AlertCondition` with routing metadata (severity,
/// channels, tags) and lifecycle settings (cooldown, auto-resolve).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
    /// Unique rule id; also used as the key for active-alert tracking.
    pub id: String,
    /// Human-readable rule name, included in notifications.
    pub name: String,
    /// Description embedded in the notification message.
    pub description: String,
    /// Condition that triggers the alert when it evaluates to true.
    pub condition: AlertCondition,
    /// Severity attached to notifications raised by this rule.
    pub severity: AlertSeverity,
    /// Disabled rules are skipped during evaluation.
    pub enabled: bool,
    /// Names of the notification channels to deliver through.
    pub notification_channels: Vec<String>,
    /// Arbitrary key/value tags copied onto each notification.
    pub tags: HashMap<String, String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    /// Minimum time between re-notifications for the same rule, in minutes.
    pub cooldown_minutes: u32,
    // NOTE(review): the auto-resolve fields are not consumed in this module —
    // presumably read by the resolution path; verify downstream.
    pub auto_resolve: bool,
    pub auto_resolve_after_minutes: Option<u32>,
}
147
148impl AlertRule {
149    pub fn new(
150        id: impl Into<String>,
151        name: impl Into<String>,
152        condition: AlertCondition,
153        severity: AlertSeverity,
154    ) -> Self {
155        let now = Utc::now();
156        Self {
157            id: id.into(),
158            name: name.into(),
159            description: String::new(),
160            condition,
161            severity,
162            enabled: true,
163            notification_channels: vec!["console".to_string()],
164            tags: HashMap::new(),
165            created_at: now,
166            updated_at: now,
167            cooldown_minutes: 5,
168            auto_resolve: true,
169            auto_resolve_after_minutes: Some(30),
170        }
171    }
172
173    pub fn with_description(mut self, description: impl Into<String>) -> Self {
174        self.description = description.into();
175        self
176    }
177
178    pub fn with_channels(mut self, channels: Vec<String>) -> Self {
179        self.notification_channels = channels;
180        self
181    }
182
183    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
184        self.tags.insert(key.into(), value.into());
185        self
186    }
187
188    pub fn with_cooldown(mut self, minutes: u32) -> Self {
189        self.cooldown_minutes = minutes;
190        self
191    }
192}
193
/// Alert notification
///
/// A concrete alert instance produced when a rule fires; carries everything
/// a notification channel needs to render and deliver the alert.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertNotification {
    /// Unique id for this notification instance (UUID v4).
    pub id: String,
    /// Id of the rule that produced this alert.
    pub rule_id: String,
    /// Display name of the originating rule.
    pub rule_name: String,
    /// Severity inherited from the rule.
    pub severity: AlertSeverity,
    /// Current lifecycle state of the alert.
    pub status: AlertStatus,
    /// Human-readable summary shown to operators.
    pub message: String,
    /// Additional structured context attached to the alert.
    pub details: HashMap<String, serde_json::Value>,
    /// When the rule fired; also the anchor for cooldown checks.
    pub triggered_at: DateTime<Utc>,
    /// When the alert resolved, if it has.
    pub resolved_at: Option<DateTime<Utc>>,
    /// When the alert was acknowledged, if it has been.
    pub acknowledged_at: Option<DateTime<Utc>>,
    /// Who acknowledged the alert, if anyone.
    pub acknowledged_by: Option<String>,
    /// Channel names this notification is routed to.
    pub notification_channels: Vec<String>,
    /// Tags copied from the originating rule.
    pub tags: HashMap<String, String>,
}
211
/// Lifecycle state of an alert notification.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum AlertStatus {
    /// Condition is firing and a notification was produced.
    Triggered,
    /// An operator has acknowledged the alert.
    Acknowledged,
    /// The condition cleared and the alert was resolved.
    Resolved,
    /// The alert is muted and should not notify.
    Suppressed,
}
219
/// Notification channel trait
///
/// Abstraction over an alert delivery transport (console, webhook, ...).
/// Implementations must be `Send + Sync` so they can be shared with the
/// background evaluation task.
#[async_trait::async_trait]
pub trait NotificationChannel: Send + Sync {
    /// Delivers a single notification; delivery errors are surfaced to the
    /// caller, which logs them.
    async fn send_notification(&self, notification: &AlertNotification) -> RragResult<()>;
    /// The transport kind this channel implements.
    fn channel_type(&self) -> NotificationChannelType;
    /// The configured channel name used for routing.
    fn name(&self) -> &str;
    /// Best-effort health probe for the underlying transport.
    async fn is_healthy(&self) -> bool;
}
228
/// Console notification channel
///
/// Writes alerts to the process log via `tracing`; used as the default
/// channel and always considered healthy.
pub struct ConsoleNotificationChannel {
    // Channel name reported through `NotificationChannel::name`.
    name: String,
}
233
234impl ConsoleNotificationChannel {
235    pub fn new(name: impl Into<String>) -> Self {
236        Self { name: name.into() }
237    }
238}
239
240#[async_trait::async_trait]
241impl NotificationChannel for ConsoleNotificationChannel {
242    async fn send_notification(&self, notification: &AlertNotification) -> RragResult<()> {
243        let status_symbol = match notification.status {
244            AlertStatus::Triggered => "🚨",
245            AlertStatus::Acknowledged => "✅",
246            AlertStatus::Resolved => "✅",
247            AlertStatus::Suppressed => "🔇",
248        };
249
250        let severity_color = match notification.severity {
251            AlertSeverity::Critical => "\x1b[31m", // Red
252            AlertSeverity::High => "\x1b[33m",     // Yellow
253            AlertSeverity::Medium => "\x1b[36m",   // Cyan
254            AlertSeverity::Low => "\x1b[32m",      // Green
255        };
256
257        tracing::debug!(
258            "{} {}[{}]\x1b[0m {} - {} ({})",
259            status_symbol,
260            severity_color,
261            notification.severity,
262            notification.rule_name,
263            notification.message,
264            notification.triggered_at.format("%Y-%m-%d %H:%M:%S UTC")
265        );
266
267        if !notification.details.is_empty() {
268            tracing::debug!("   Details: {:?}", notification.details);
269        }
270
271        Ok(())
272    }
273
274    fn channel_type(&self) -> NotificationChannelType {
275        NotificationChannelType::Console
276    }
277
278    fn name(&self) -> &str {
279        &self.name
280    }
281
282    async fn is_healthy(&self) -> bool {
283        true // Console is always available
284    }
285}
286
/// Webhook notification channel
///
/// POSTs alert payloads as JSON to a configured URL. The HTTP client only
/// exists when the `http` feature is enabled; otherwise sends are logged
/// no-ops.
pub struct WebhookNotificationChannel {
    // Channel name reported through `NotificationChannel::name`.
    name: String,
    // Target endpoint for the POST request.
    url: String,
    // Extra HTTP headers attached to every request.
    headers: HashMap<String, String>,
    #[cfg(feature = "http")]
    client: reqwest::Client,
}
295
296impl WebhookNotificationChannel {
297    pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
298        Self {
299            name: name.into(),
300            url: url.into(),
301            headers: HashMap::new(),
302            #[cfg(feature = "http")]
303            client: reqwest::Client::new(),
304        }
305    }
306
307    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
308        self.headers.insert(key.into(), value.into());
309        self
310    }
311}
312
/// Webhook delivery: serializes the alert to JSON and POSTs it to the
/// configured URL.
#[async_trait::async_trait]
impl NotificationChannel for WebhookNotificationChannel {
    async fn send_notification(&self, notification: &AlertNotification) -> RragResult<()> {
        #[cfg(feature = "http")]
        {
            // Flatten the notification into a stable JSON payload shape
            // for external receivers.
            let payload = serde_json::json!({
                "alert_id": notification.id,
                "rule_name": notification.rule_name,
                "severity": notification.severity,
                "status": notification.status,
                "message": notification.message,
                "details": notification.details,
                "triggered_at": notification.triggered_at,
                "tags": notification.tags
            });

            let mut request = self.client.post(&self.url).json(&payload);

            // Apply any configured custom headers (e.g. auth tokens).
            for (key, value) in &self.headers {
                request = request.header(key, value);
            }

            // Treat both transport failures and non-2xx responses as errors.
            request
                .send()
                .await
                .map_err(|e| RragError::network("webhook_notification", Box::new(e)))?
                .error_for_status()
                .map_err(|e| RragError::network("webhook_notification", Box::new(e)))?;

            Ok(())
        }
        #[cfg(not(feature = "http"))]
        {
            // Without an HTTP client this is a logged no-op that still
            // reports success so alert processing continues.
            tracing::warn!(
                "HTTP feature not enabled, webhook notification to {} skipped",
                self.url
            );
            Ok(())
        }
    }

    fn channel_type(&self) -> NotificationChannelType {
        NotificationChannelType::Webhook
    }

    fn name(&self) -> &str {
        &self.name
    }

    async fn is_healthy(&self) -> bool {
        #[cfg(feature = "http")]
        {
            // Simple health check - try to connect to the webhook URL
            self.client.head(&self.url).send().await.is_ok()
        }
        #[cfg(not(feature = "http"))]
        {
            // Without HTTP feature, assume healthy
            true
        }
    }
}
375
/// Alert condition evaluator
///
/// Keeps a bounded in-memory time series per metric and evaluates alert
/// conditions against it.
pub struct AlertEvaluator {
    // Per-metric (timestamp, value) samples, newest last.
    metrics_history: Arc<RwLock<HashMap<String, Vec<(DateTime<Utc>, f64)>>>>,
    // Maximum samples retained per metric.
    max_history_size: usize,
}
381
382impl AlertEvaluator {
383    pub fn new(max_history_size: usize) -> Self {
384        Self {
385            metrics_history: Arc::new(RwLock::new(HashMap::new())),
386            max_history_size,
387        }
388    }
389
390    pub async fn update_metric(&self, metric_name: String, value: f64) {
391        let mut history = self.metrics_history.write().await;
392        let entry = history.entry(metric_name).or_insert_with(Vec::new);
393
394        entry.push((Utc::now(), value));
395
396        // Keep only recent data
397        if entry.len() > self.max_history_size {
398            entry.drain(0..entry.len() - self.max_history_size);
399        }
400    }
401
402    pub async fn evaluate_condition(&self, condition: &AlertCondition) -> RragResult<bool> {
403        match condition {
404            AlertCondition::Threshold {
405                metric_name,
406                operator,
407                value,
408                duration_minutes,
409            } => {
410                self.evaluate_threshold(metric_name, operator, *value, *duration_minutes)
411                    .await
412            }
413            AlertCondition::RateOfChange {
414                metric_name,
415                operator,
416                rate_per_minute,
417                window_minutes,
418            } => {
419                self.evaluate_rate_of_change(
420                    metric_name,
421                    operator,
422                    *rate_per_minute,
423                    *window_minutes,
424                )
425                .await
426            }
427            AlertCondition::Anomaly {
428                metric_name,
429                sensitivity,
430                baseline_minutes,
431            } => {
432                self.evaluate_anomaly(metric_name, *sensitivity, *baseline_minutes)
433                    .await
434            }
435            AlertCondition::Composite { conditions, logic } => {
436                self.evaluate_composite(conditions, logic).await
437            }
438        }
439    }
440
441    async fn evaluate_threshold(
442        &self,
443        metric_name: &str,
444        operator: &ComparisonOperator,
445        threshold: f64,
446        duration_minutes: u32,
447    ) -> RragResult<bool> {
448        let history = self.metrics_history.read().await;
449        let values = history.get(metric_name).ok_or_else(|| {
450            RragError::agent(
451                "alert_evaluator",
452                format!("Metric not found: {}", metric_name),
453            )
454        })?;
455
456        if values.is_empty() {
457            return Ok(false);
458        }
459
460        let cutoff_time = Utc::now() - Duration::minutes(duration_minutes as i64);
461        let recent_values: Vec<_> = values
462            .iter()
463            .filter(|(timestamp, _)| *timestamp >= cutoff_time)
464            .map(|(_, value)| *value)
465            .collect();
466
467        if recent_values.is_empty() {
468            return Ok(false);
469        }
470
471        // All values in the duration must satisfy the condition
472        Ok(recent_values.iter().all(|&value| match operator {
473            ComparisonOperator::GreaterThan => value > threshold,
474            ComparisonOperator::LessThan => value < threshold,
475            ComparisonOperator::GreaterThanOrEqual => value >= threshold,
476            ComparisonOperator::LessThanOrEqual => value <= threshold,
477            ComparisonOperator::Equal => (value - threshold).abs() < f64::EPSILON,
478            ComparisonOperator::NotEqual => (value - threshold).abs() >= f64::EPSILON,
479        }))
480    }
481
482    async fn evaluate_rate_of_change(
483        &self,
484        metric_name: &str,
485        operator: &ComparisonOperator,
486        rate_threshold: f64,
487        window_minutes: u32,
488    ) -> RragResult<bool> {
489        let history = self.metrics_history.read().await;
490        let values = history.get(metric_name).ok_or_else(|| {
491            RragError::agent(
492                "alert_evaluator",
493                format!("Metric not found: {}", metric_name),
494            )
495        })?;
496
497        if values.len() < 2 {
498            return Ok(false);
499        }
500
501        let cutoff_time = Utc::now() - Duration::minutes(window_minutes as i64);
502        let recent_values: Vec<_> = values
503            .iter()
504            .filter(|(timestamp, _)| *timestamp >= cutoff_time)
505            .collect();
506
507        if recent_values.len() < 2 {
508            return Ok(false);
509        }
510
511        let (earliest_time, earliest_value) = recent_values.first().unwrap();
512        let (latest_time, latest_value) = recent_values.last().unwrap();
513
514        let time_diff_minutes = (*latest_time - *earliest_time).num_minutes() as f64;
515        if time_diff_minutes <= 0.0 {
516            return Ok(false);
517        }
518
519        let rate_of_change = (latest_value - earliest_value) / time_diff_minutes;
520
521        Ok(match operator {
522            ComparisonOperator::GreaterThan => rate_of_change > rate_threshold,
523            ComparisonOperator::LessThan => rate_of_change < rate_threshold,
524            ComparisonOperator::GreaterThanOrEqual => rate_of_change >= rate_threshold,
525            ComparisonOperator::LessThanOrEqual => rate_of_change <= rate_threshold,
526            ComparisonOperator::Equal => (rate_of_change - rate_threshold).abs() < f64::EPSILON,
527            ComparisonOperator::NotEqual => (rate_of_change - rate_threshold).abs() >= f64::EPSILON,
528        })
529    }
530
531    async fn evaluate_anomaly(
532        &self,
533        metric_name: &str,
534        sensitivity: f64,
535        baseline_minutes: u32,
536    ) -> RragResult<bool> {
537        let history = self.metrics_history.read().await;
538        let values = history.get(metric_name).ok_or_else(|| {
539            RragError::agent(
540                "alert_evaluator",
541                format!("Metric not found: {}", metric_name),
542            )
543        })?;
544
545        if values.len() < 10 {
546            return Ok(false); // Need sufficient data for anomaly detection
547        }
548
549        let cutoff_time = Utc::now() - Duration::minutes(baseline_minutes as i64);
550        let baseline_values: Vec<f64> = values
551            .iter()
552            .filter(|(timestamp, _)| *timestamp >= cutoff_time)
553            .map(|(_, value)| *value)
554            .collect();
555
556        if baseline_values.len() < 5 {
557            return Ok(false);
558        }
559
560        // Simple anomaly detection using standard deviation
561        let mean = baseline_values.iter().sum::<f64>() / baseline_values.len() as f64;
562        let variance = baseline_values
563            .iter()
564            .map(|value| (value - mean).powi(2))
565            .sum::<f64>()
566            / baseline_values.len() as f64;
567        let std_dev = variance.sqrt();
568
569        let current_value = values.last().unwrap().1;
570        let z_score = (current_value - mean) / std_dev;
571
572        Ok(z_score.abs() > sensitivity)
573    }
574
575    async fn evaluate_composite(
576        &self,
577        conditions: &[AlertCondition],
578        logic: &LogicOperator,
579    ) -> RragResult<bool> {
580        let mut results = Vec::new();
581        for condition in conditions {
582            let result = match condition {
583                AlertCondition::Threshold {
584                    metric_name,
585                    operator,
586                    value,
587                    duration_minutes,
588                } => {
589                    self.evaluate_threshold(metric_name, operator, *value, *duration_minutes)
590                        .await?
591                }
592                AlertCondition::RateOfChange {
593                    metric_name,
594                    operator,
595                    rate_per_minute,
596                    window_minutes,
597                } => {
598                    self.evaluate_rate_of_change(
599                        metric_name,
600                        operator,
601                        *rate_per_minute,
602                        *window_minutes,
603                    )
604                    .await?
605                }
606                AlertCondition::Anomaly {
607                    metric_name,
608                    sensitivity,
609                    baseline_minutes,
610                } => {
611                    self.evaluate_anomaly(metric_name, *sensitivity, *baseline_minutes)
612                        .await?
613                }
614                AlertCondition::Composite { .. } => {
615                    // Prevent infinite recursion by limiting depth
616                    return Err(RragError::config(
617                        "alert_condition",
618                        "non-nested composite",
619                        "nested composite",
620                    ));
621                }
622            };
623            results.push(result);
624        }
625
626        Ok(match logic {
627            LogicOperator::And => results.iter().all(|&result| result),
628            LogicOperator::Or => results.iter().any(|&result| result),
629        })
630    }
631}
632
/// Main alert manager
///
/// Owns the rule set, active alerts, and notification channels, and drives
/// the periodic evaluation loop on a background tokio task.
pub struct AlertManager {
    config: AlertConfig,
    /// Source of the metrics fed into the evaluator each cycle.
    metrics_collector: Arc<MetricsCollector>,
    /// Alert rules keyed by rule id.
    alert_rules: Arc<RwLock<HashMap<String, AlertRule>>>,
    /// Currently firing alerts keyed by the originating rule id.
    active_alerts: Arc<RwLock<HashMap<String, AlertNotification>>>,
    /// Instantiated delivery channels keyed by channel name.
    notification_channels: Arc<RwLock<HashMap<String, Box<dyn NotificationChannel>>>>,
    /// Holds metric history and performs condition evaluation.
    evaluator: Arc<AlertEvaluator>,
    /// Join handle of the background evaluation task, when running.
    evaluation_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Flag the evaluation loop polls to decide when to exit.
    is_running: Arc<RwLock<bool>>,
}
644
645impl AlertManager {
646    pub async fn new(
647        config: AlertConfig,
648        metrics_collector: Arc<MetricsCollector>,
649    ) -> RragResult<Self> {
650        let manager = Self {
651            config: config.clone(),
652            metrics_collector,
653            alert_rules: Arc::new(RwLock::new(HashMap::new())),
654            active_alerts: Arc::new(RwLock::new(HashMap::new())),
655            notification_channels: Arc::new(RwLock::new(HashMap::new())),
656            evaluator: Arc::new(AlertEvaluator::new(1000)),
657            evaluation_handle: Arc::new(RwLock::new(None)),
658            is_running: Arc::new(RwLock::new(false)),
659        };
660
661        // Initialize default notification channels
662        manager.setup_notification_channels().await?;
663
664        // Add default alert rules
665        manager.setup_default_rules().await?;
666
667        Ok(manager)
668    }
669
670    async fn setup_notification_channels(&self) -> RragResult<()> {
671        let mut channels = self.notification_channels.write().await;
672
673        for channel_config in &self.config.notification_channels {
674            if !channel_config.enabled {
675                continue;
676            }
677
678            let channel: Box<dyn NotificationChannel> = match channel_config.channel_type {
679                NotificationChannelType::Console => {
680                    Box::new(ConsoleNotificationChannel::new(&channel_config.name))
681                }
682                NotificationChannelType::Webhook => {
683                    if let Some(url) = channel_config.config.get("url") {
684                        let mut webhook =
685                            WebhookNotificationChannel::new(&channel_config.name, url);
686
687                        // Add any custom headers
688                        for (key, value) in &channel_config.config {
689                            if key.starts_with("header_") {
690                                let header_name = key.strip_prefix("header_").unwrap();
691                                webhook = webhook.with_header(header_name, value);
692                            }
693                        }
694
695                        Box::new(webhook)
696                    } else {
697                        return Err(RragError::config("webhook_channel", "url", "missing"));
698                    }
699                }
700                _ => {
701                    tracing::warn!(
702                        "Notification channel type {:?} not yet implemented",
703                        channel_config.channel_type
704                    );
705                    continue;
706                }
707            };
708
709            channels.insert(channel_config.name.clone(), channel);
710        }
711
712        Ok(())
713    }
714
715    async fn setup_default_rules(&self) -> RragResult<()> {
716        let mut rules = self.alert_rules.write().await;
717
718        // High CPU usage alert
719        let cpu_rule = AlertRule::new(
720            "high_cpu_usage",
721            "High CPU Usage",
722            AlertCondition::Threshold {
723                metric_name: "system_cpu_usage_percent".to_string(),
724                operator: ComparisonOperator::GreaterThan,
725                value: 80.0,
726                duration_minutes: 5,
727            },
728            AlertSeverity::High,
729        )
730        .with_description("CPU usage is above 80% for more than 5 minutes");
731
732        // High memory usage alert
733        let memory_rule = AlertRule::new(
734            "high_memory_usage",
735            "High Memory Usage",
736            AlertCondition::Threshold {
737                metric_name: "system_memory_usage_percent".to_string(),
738                operator: ComparisonOperator::GreaterThan,
739                value: 85.0,
740                duration_minutes: 5,
741            },
742            AlertSeverity::High,
743        )
744        .with_description("Memory usage is above 85% for more than 5 minutes");
745
746        // High error rate alert
747        let error_rate_rule = AlertRule::new(
748            "high_error_rate",
749            "High Error Rate",
750            AlertCondition::RateOfChange {
751                metric_name: "search_queries_failed".to_string(),
752                operator: ComparisonOperator::GreaterThan,
753                rate_per_minute: 10.0,
754                window_minutes: 10,
755            },
756            AlertSeverity::Critical,
757        )
758        .with_description("Error rate is increasing rapidly");
759
760        // Slow response time alert
761        let slow_response_rule = AlertRule::new(
762            "slow_response_time",
763            "Slow Response Time",
764            AlertCondition::Threshold {
765                metric_name: "search_processing_time_ms".to_string(),
766                operator: ComparisonOperator::GreaterThan,
767                value: 1000.0,
768                duration_minutes: 3,
769            },
770            AlertSeverity::Medium,
771        )
772        .with_description("Search response time is above 1 second");
773
774        rules.insert("high_cpu_usage".to_string(), cpu_rule);
775        rules.insert("high_memory_usage".to_string(), memory_rule);
776        rules.insert("high_error_rate".to_string(), error_rate_rule);
777        rules.insert("slow_response_time".to_string(), slow_response_rule);
778
779        Ok(())
780    }
781
782    pub async fn start(&self) -> RragResult<()> {
783        let mut running = self.is_running.write().await;
784        if *running {
785            return Err(RragError::config(
786                "alert_manager",
787                "stopped",
788                "already running",
789            ));
790        }
791
792        let handle = self.start_evaluation_loop().await?;
793        {
794            let mut eval_handle = self.evaluation_handle.write().await;
795            *eval_handle = Some(handle);
796        }
797
798        *running = true;
799        tracing::info!("Alert manager started");
800        Ok(())
801    }
802
803    pub async fn stop(&self) -> RragResult<()> {
804        let mut running = self.is_running.write().await;
805        if !*running {
806            return Ok(());
807        }
808
809        {
810            let mut eval_handle = self.evaluation_handle.write().await;
811            if let Some(handle) = eval_handle.take() {
812                handle.abort();
813            }
814        }
815
816        *running = false;
817        tracing::info!("Alert manager stopped");
818        Ok(())
819    }
820
821    pub async fn is_healthy(&self) -> bool {
822        *self.is_running.read().await
823    }
824
    /// Spawns the background task that periodically feeds collected metrics
    /// into the evaluator and evaluates every enabled alert rule.
    ///
    /// The task exits when `is_running` becomes `false` (or when `stop`
    /// aborts its handle).
    async fn start_evaluation_loop(&self) -> RragResult<tokio::task::JoinHandle<()>> {
        // Clone the shared state the spawned task needs; the task must be
        // 'static, so it cannot borrow from `self`.
        let config = self.config.clone();
        let alert_rules = self.alert_rules.clone();
        let active_alerts = self.active_alerts.clone();
        let notification_channels = self.notification_channels.clone();
        let evaluator = self.evaluator.clone();
        let metrics_collector = self.metrics_collector.clone();
        let is_running = self.is_running.clone();

        let handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
                config.evaluation_interval_seconds,
            ));

            // `start` holds the `is_running` write lock until after it sets
            // the flag to true, so this first read blocks until then and
            // observes `true`.
            while *is_running.read().await {
                interval.tick().await;

                // Update metrics in evaluator
                let all_metrics = metrics_collector.get_all_metrics().await;
                for metric in all_metrics {
                    if let Some(value) = Self::extract_metric_value(&metric) {
                        evaluator.update_metric(metric.name, value).await;
                    }
                }

                // Evaluate all alert rules
                let rules = alert_rules.read().await;
                for (rule_id, rule) in rules.iter() {
                    if !rule.enabled {
                        continue;
                    }

                    // A triggered condition raises (or re-raises, after
                    // cooldown) the alert; a quiet condition resolves it.
                    match evaluator.evaluate_condition(&rule.condition).await {
                        Ok(triggered) => {
                            if triggered {
                                Self::handle_alert_triggered(
                                    rule_id,
                                    rule,
                                    &active_alerts,
                                    &notification_channels,
                                )
                                .await;
                            } else {
                                Self::handle_alert_resolved(
                                    rule_id,
                                    rule,
                                    &active_alerts,
                                    &notification_channels,
                                )
                                .await;
                            }
                        }
                        Err(e) => {
                            // Evaluation errors (e.g. unknown metric) are
                            // logged and do not stop the loop.
                            tracing::error!("Failed to evaluate alert rule {}: {}", rule_id, e);
                        }
                    }
                }
            }
        });

        Ok(handle)
    }
887
888    fn extract_metric_value(metric: &Metric) -> Option<f64> {
889        match &metric.value {
890            MetricValue::Counter(value) => Some(*value as f64),
891            MetricValue::Gauge(value) => Some(*value),
892            MetricValue::Timer { duration_ms, .. } => Some(*duration_ms),
893            MetricValue::Histogram { sum, count, .. } => {
894                if *count > 0 {
895                    Some(sum / *count as f64)
896                } else {
897                    Some(0.0)
898                }
899            }
900            MetricValue::Summary { sum, count, .. } => {
901                if *count > 0 {
902                    Some(sum / *count as f64)
903                } else {
904                    Some(0.0)
905                }
906            }
907        }
908    }
909
910    async fn handle_alert_triggered(
911        rule_id: &str,
912        rule: &AlertRule,
913        active_alerts: &Arc<RwLock<HashMap<String, AlertNotification>>>,
914        notification_channels: &Arc<RwLock<HashMap<String, Box<dyn NotificationChannel>>>>,
915    ) {
916        let mut alerts = active_alerts.write().await;
917
918        // Check if alert is already active (within cooldown)
919        if let Some(existing_alert) = alerts.get(rule_id) {
920            let cooldown_duration = Duration::minutes(rule.cooldown_minutes as i64);
921            if existing_alert.triggered_at + cooldown_duration > Utc::now() {
922                return; // Still in cooldown period
923            }
924        }
925
926        let alert_notification = AlertNotification {
927            id: uuid::Uuid::new_v4().to_string(),
928            rule_id: rule_id.to_string(),
929            rule_name: rule.name.clone(),
930            severity: rule.severity,
931            status: AlertStatus::Triggered,
932            message: format!("Alert triggered: {}", rule.description),
933            details: HashMap::new(),
934            triggered_at: Utc::now(),
935            resolved_at: None,
936            acknowledged_at: None,
937            acknowledged_by: None,
938            notification_channels: rule.notification_channels.clone(),
939            tags: rule.tags.clone(),
940        };
941
942        alerts.insert(rule_id.to_string(), alert_notification.clone());
943        drop(alerts);
944
945        // Send notifications
946        let channels = notification_channels.read().await;
947        for channel_name in &rule.notification_channels {
948            if let Some(channel) = channels.get(channel_name) {
949                if let Err(e) = channel.send_notification(&alert_notification).await {
950                    tracing::error!("Failed to send notification via {}: {}", channel_name, e);
951                }
952            }
953        }
954    }
955
956    async fn handle_alert_resolved(
957        rule_id: &str,
958        rule: &AlertRule,
959        active_alerts: &Arc<RwLock<HashMap<String, AlertNotification>>>,
960        notification_channels: &Arc<RwLock<HashMap<String, Box<dyn NotificationChannel>>>>,
961    ) {
962        let mut alerts = active_alerts.write().await;
963
964        if let Some(mut alert) = alerts.remove(rule_id) {
965            if rule.auto_resolve && alert.status == AlertStatus::Triggered {
966                alert.status = AlertStatus::Resolved;
967                alert.resolved_at = Some(Utc::now());
968                alert.message = format!("Alert resolved: {}", rule.description);
969
970                drop(alerts);
971
972                // Send resolution notifications
973                let channels = notification_channels.read().await;
974                for channel_name in &rule.notification_channels {
975                    if let Some(channel) = channels.get(channel_name) {
976                        if let Err(e) = channel.send_notification(&alert).await {
977                            tracing::error!(
978                                "Failed to send resolution notification via {}: {}",
979                                channel_name,
980                                e
981                            );
982                        }
983                    }
984                }
985            }
986        }
987    }
988
989    pub async fn add_alert_rule(&self, rule: AlertRule) -> RragResult<()> {
990        let mut rules = self.alert_rules.write().await;
991        rules.insert(rule.id.clone(), rule);
992        Ok(())
993    }
994
995    pub async fn remove_alert_rule(&self, rule_id: &str) -> RragResult<()> {
996        let mut rules = self.alert_rules.write().await;
997        rules.remove(rule_id);
998
999        // Also remove any active alerts for this rule
1000        let mut alerts = self.active_alerts.write().await;
1001        alerts.remove(rule_id);
1002
1003        Ok(())
1004    }
1005
1006    pub async fn acknowledge_alert(
1007        &self,
1008        rule_id: &str,
1009        acknowledged_by: impl Into<String>,
1010    ) -> RragResult<()> {
1011        let mut alerts = self.active_alerts.write().await;
1012        if let Some(alert) = alerts.get_mut(rule_id) {
1013            alert.status = AlertStatus::Acknowledged;
1014            alert.acknowledged_at = Some(Utc::now());
1015            alert.acknowledged_by = Some(acknowledged_by.into());
1016
1017            tracing::info!("Alert {} acknowledged", rule_id);
1018        }
1019        Ok(())
1020    }
1021
1022    pub async fn get_active_alerts(&self) -> Vec<AlertNotification> {
1023        let alerts = self.active_alerts.read().await;
1024        alerts.values().cloned().collect()
1025    }
1026
1027    pub async fn get_alert_rules(&self) -> Vec<AlertRule> {
1028        let rules = self.alert_rules.read().await;
1029        rules.values().cloned().collect()
1030    }
1031
1032    pub async fn get_alert_stats(&self) -> AlertStats {
1033        let alerts = self.active_alerts.read().await;
1034        let rules = self.alert_rules.read().await;
1035
1036        let total_alerts = alerts.len();
1037        let by_severity = alerts.values().fold(HashMap::new(), |mut acc, alert| {
1038            *acc.entry(alert.severity).or_insert(0) += 1;
1039            acc
1040        });
1041
1042        let by_status = alerts.values().fold(HashMap::new(), |mut acc, alert| {
1043            *acc.entry(alert.status.clone()).or_insert(0) += 1;
1044            acc
1045        });
1046
1047        AlertStats {
1048            total_active_alerts: total_alerts,
1049            total_rules: rules.len(),
1050            alerts_by_severity: by_severity,
1051            alerts_by_status: by_status,
1052            last_evaluation: Utc::now(),
1053        }
1054    }
1055}
1056
/// Point-in-time summary of alerting state, produced by
/// `AlertManager::get_alert_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertStats {
    // Number of alerts currently in the active set, regardless of status.
    pub total_active_alerts: usize,
    // Number of alert rules registered with the manager.
    pub total_rules: usize,
    // Active-alert counts keyed by severity.
    pub alerts_by_severity: HashMap<AlertSeverity, usize>,
    // Active-alert counts keyed by status (triggered, acknowledged, ...).
    pub alerts_by_status: HashMap<AlertStatus, usize>,
    // Timestamp at which this snapshot was taken.
    pub last_evaluation: DateTime<Utc>,
}
1066
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::metrics::MetricsConfig;

    /// Builds a default-configured `MetricsCollector` wrapped in an `Arc`,
    /// shared by the manager-level tests below.
    async fn create_test_metrics_collector() -> Arc<MetricsCollector> {
        Arc::new(
            MetricsCollector::new(MetricsConfig::default())
                .await
                .unwrap(),
        )
    }

    /// Console channel reports its name/type, is always healthy, and accepts
    /// a notification without error (output goes to the console).
    #[tokio::test]
    async fn test_console_notification_channel() {
        let channel = ConsoleNotificationChannel::new("test_console");
        assert_eq!(channel.name(), "test_console");
        assert_eq!(channel.channel_type(), NotificationChannelType::Console);
        assert!(channel.is_healthy().await);

        let notification = AlertNotification {
            id: "alert123".to_string(),
            rule_id: "rule123".to_string(),
            rule_name: "Test Alert".to_string(),
            severity: AlertSeverity::High,
            status: AlertStatus::Triggered,
            message: "Test alert message".to_string(),
            details: HashMap::new(),
            triggered_at: Utc::now(),
            resolved_at: None,
            acknowledged_at: None,
            acknowledged_by: None,
            notification_channels: vec!["test_console".to_string()],
            tags: HashMap::new(),
        };

        // This should print to console
        channel.send_notification(&notification).await.unwrap();
    }

    /// Threshold and rate-of-change conditions evaluate correctly against a
    /// small, strictly increasing metric series.
    #[tokio::test]
    async fn test_alert_evaluator() {
        let evaluator = AlertEvaluator::new(100);

        // Test threshold condition
        evaluator.update_metric("cpu_usage".to_string(), 50.0).await;
        evaluator.update_metric("cpu_usage".to_string(), 75.0).await;
        evaluator.update_metric("cpu_usage".to_string(), 85.0).await;

        let condition = AlertCondition::Threshold {
            metric_name: "cpu_usage".to_string(),
            operator: ComparisonOperator::GreaterThan,
            value: 80.0,
            duration_minutes: 1,
        };

        let result = evaluator.evaluate_condition(&condition).await.unwrap();
        assert!(result); // Latest value (85.0) is > 80.0

        // Test rate of change condition
        let rate_condition = AlertCondition::RateOfChange {
            metric_name: "cpu_usage".to_string(),
            operator: ComparisonOperator::GreaterThan,
            rate_per_minute: 10.0,
            window_minutes: 5,
        };

        let rate_result = evaluator.evaluate_condition(&rate_condition).await.unwrap();
        // Should detect increasing trend
        assert!(rate_result);
    }

    /// The `AlertRule` builder chain (`with_description` / `with_tag` /
    /// `with_cooldown`) populates every field as specified.
    #[tokio::test]
    async fn test_alert_rule_creation() {
        let rule = AlertRule::new(
            "test_rule",
            "Test Alert Rule",
            AlertCondition::Threshold {
                metric_name: "test_metric".to_string(),
                operator: ComparisonOperator::GreaterThan,
                value: 100.0,
                duration_minutes: 5,
            },
            AlertSeverity::High,
        )
        .with_description("Test alert rule description")
        .with_tag("component", "test")
        .with_cooldown(10);

        assert_eq!(rule.id, "test_rule");
        assert_eq!(rule.name, "Test Alert Rule");
        assert_eq!(rule.severity, AlertSeverity::High);
        assert_eq!(rule.cooldown_minutes, 10);
        assert!(rule.tags.contains_key("component"));
        assert_eq!(rule.tags["component"], "test");
    }

    /// Manager lifecycle: unhealthy before `start`, healthy after, accepts a
    /// custom rule (visible in rules and stats), and unhealthy again after
    /// `stop`.
    #[tokio::test]
    async fn test_alert_manager() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = AlertConfig::default();
        let mut manager = AlertManager::new(config, metrics_collector).await.unwrap();

        assert!(!manager.is_healthy().await);

        manager.start().await.unwrap();
        assert!(manager.is_healthy().await);

        // Test adding custom alert rule
        let custom_rule = AlertRule::new(
            "custom_rule",
            "Custom Test Rule",
            AlertCondition::Threshold {
                metric_name: "custom_metric".to_string(),
                operator: ComparisonOperator::GreaterThan,
                value: 50.0,
                duration_minutes: 1,
            },
            AlertSeverity::Medium,
        );

        manager.add_alert_rule(custom_rule).await.unwrap();

        let rules = manager.get_alert_rules().await;
        assert!(rules.iter().any(|r| r.id == "custom_rule"));

        let stats = manager.get_alert_stats().await;
        assert!(stats.total_rules > 0);

        manager.stop().await.unwrap();
        assert!(!manager.is_healthy().await);
    }

    /// Severity derives a total order: Critical > High > Medium > Low.
    #[test]
    fn test_alert_severity_ordering() {
        assert!(AlertSeverity::Critical > AlertSeverity::High);
        assert!(AlertSeverity::High > AlertSeverity::Medium);
        assert!(AlertSeverity::Medium > AlertSeverity::Low);
    }

    /// Comparison operators support equality testing between variants.
    #[test]
    fn test_comparison_operators() {
        assert_eq!(
            ComparisonOperator::GreaterThan,
            ComparisonOperator::GreaterThan
        );
        assert_ne!(
            ComparisonOperator::GreaterThan,
            ComparisonOperator::LessThan
        );
    }

    /// Acknowledging an active alert updates its status, acknowledgment
    /// timestamp, and acknowledger, while keeping it in the active set.
    #[tokio::test]
    async fn test_alert_acknowledgment() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = AlertConfig::default();
        let manager = AlertManager::new(config, metrics_collector).await.unwrap();

        // Create a mock active alert
        let alert = AlertNotification {
            id: "test_alert".to_string(),
            rule_id: "test_rule".to_string(),
            rule_name: "Test Rule".to_string(),
            severity: AlertSeverity::High,
            status: AlertStatus::Triggered,
            message: "Test alert".to_string(),
            details: HashMap::new(),
            triggered_at: Utc::now(),
            resolved_at: None,
            acknowledged_at: None,
            acknowledged_by: None,
            notification_channels: vec![],
            tags: HashMap::new(),
        };

        // Seed the alert directly into the active set (scoped so the write
        // lock is released before acknowledge_alert takes it again).
        {
            let mut alerts = manager.active_alerts.write().await;
            alerts.insert("test_rule".to_string(), alert);
        }

        manager
            .acknowledge_alert("test_rule", "test_user")
            .await
            .unwrap();

        let active_alerts = manager.get_active_alerts().await;
        let acknowledged_alert = active_alerts
            .iter()
            .find(|a| a.rule_id == "test_rule")
            .unwrap();
        assert_eq!(acknowledged_alert.status, AlertStatus::Acknowledged);
        assert!(acknowledged_alert.acknowledged_at.is_some());
        assert_eq!(
            acknowledged_alert.acknowledged_by.as_ref().unwrap(),
            "test_user"
        );
    }
}