1use super::metrics::{Metric, MetricValue, MetricsCollector};
7use crate::{RragError, RragResult};
8use chrono::{DateTime, Duration, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Top-level configuration for the alerting subsystem.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertConfig {
    /// Master switch for alert evaluation.
    pub enabled: bool,
    /// How often the evaluation loop wakes up to re-check all rules.
    pub evaluation_interval_seconds: u64,
    /// Capacity hint for buffered alerts.
    pub alert_buffer_size: usize,
    /// Channels that alerts can be routed to (referenced by name from rules).
    pub notification_channels: Vec<NotificationChannelConfig>,
    /// Severity assigned when a rule does not specify one.
    pub default_severity: AlertSeverity,
    /// Whether related alerts are grouped together.
    pub alert_grouping_enabled: bool,
    /// Time window used when grouping alerts.
    pub alert_grouping_window_minutes: u32,
    /// Whether unacknowledged alerts escalate.
    pub escalation_enabled: bool,
    /// Delay before an alert escalates.
    pub escalation_delay_minutes: u32,
}
27
28impl Default for AlertConfig {
29 fn default() -> Self {
30 Self {
31 enabled: true,
32 evaluation_interval_seconds: 30,
33 alert_buffer_size: 1000,
34 notification_channels: vec![NotificationChannelConfig {
35 name: "console".to_string(),
36 channel_type: NotificationChannelType::Console,
37 enabled: true,
38 config: HashMap::new(),
39 }],
40 default_severity: AlertSeverity::Medium,
41 alert_grouping_enabled: true,
42 alert_grouping_window_minutes: 5,
43 escalation_enabled: false,
44 escalation_delay_minutes: 30,
45 }
46 }
47}
48
/// Configuration for a single notification channel instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationChannelConfig {
    /// Unique channel name; alert rules reference channels by this name.
    pub name: String,
    /// Transport kind (console, webhook, ...).
    pub channel_type: NotificationChannelType,
    /// Disabled channels are skipped during setup.
    pub enabled: bool,
    /// Free-form transport settings (e.g. "url", "header_*" for webhooks).
    pub config: HashMap<String, String>,
}
57
/// Supported notification transports.
///
/// Only `Console` and `Webhook` are implemented in this module; the
/// manager logs a warning and skips the others during channel setup.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum NotificationChannelType {
    Console,
    Email,
    Slack,
    Webhook,
    SMS,
    PagerDuty,
}
67
/// Alert severity levels, ordered from least (`Low`) to most (`Critical`)
/// severe. The explicit discriminants back the derived `Ord` so severities
/// can be compared directly.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AlertSeverity {
    Low = 1,
    Medium = 2,
    High = 3,
    Critical = 4,
}
76
77impl std::fmt::Display for AlertSeverity {
78 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 match self {
80 Self::Low => write!(f, "LOW"),
81 Self::Medium => write!(f, "MEDIUM"),
82 Self::High => write!(f, "HIGH"),
83 Self::Critical => write!(f, "CRITICAL"),
84 }
85 }
86}
87
/// Condition that decides whether an alert rule fires.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertCondition {
    /// Fires when every sample in the last `duration_minutes` satisfies
    /// `operator` against `value`.
    Threshold {
        metric_name: String,
        operator: ComparisonOperator,
        value: f64,
        duration_minutes: u32,
    },
    /// Fires when the average per-minute rate of change over the last
    /// `window_minutes` satisfies `operator` against `rate_per_minute`.
    RateOfChange {
        metric_name: String,
        operator: ComparisonOperator,
        rate_per_minute: f64,
        window_minutes: u32,
    },
    /// Fires when the latest sample deviates from the baseline window's
    /// mean by more than `sensitivity` standard deviations.
    Anomaly {
        metric_name: String,
        sensitivity: f64,
        baseline_minutes: u32,
    },
    /// Combines sub-conditions with AND/OR logic. Note: the evaluator
    /// rejects composites nested inside composites.
    Composite {
        conditions: Vec<AlertCondition>,
        logic: LogicOperator,
    },
}
113
/// Comparison applied between an observed value and a configured threshold.
/// `Equal`/`NotEqual` are evaluated with an epsilon tolerance for floats.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ComparisonOperator {
    GreaterThan,
    LessThan,
    GreaterThanOrEqual,
    LessThanOrEqual,
    Equal,
    NotEqual,
}
123
/// Boolean combinator for `AlertCondition::Composite` sub-conditions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LogicOperator {
    And,
    Or,
}
129
/// A named, configurable alerting rule evaluated by the manager loop.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
    /// Stable identifier; also the key in the manager's rule map.
    pub id: String,
    /// Human-readable rule name shown in notifications.
    pub name: String,
    /// Free-text description included in alert messages.
    pub description: String,
    /// Condition that triggers this rule.
    pub condition: AlertCondition,
    pub severity: AlertSeverity,
    /// Disabled rules are skipped by the evaluation loop.
    pub enabled: bool,
    /// Names of channels to notify (must match configured channel names).
    pub notification_channels: Vec<String>,
    pub tags: HashMap<String, String>,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
    /// Minimum time between repeat notifications for the same rule.
    pub cooldown_minutes: u32,
    /// Whether the alert resolves itself once the condition clears.
    pub auto_resolve: bool,
    pub auto_resolve_after_minutes: Option<u32>,
}
147
148impl AlertRule {
149 pub fn new(
150 id: impl Into<String>,
151 name: impl Into<String>,
152 condition: AlertCondition,
153 severity: AlertSeverity,
154 ) -> Self {
155 let now = Utc::now();
156 Self {
157 id: id.into(),
158 name: name.into(),
159 description: String::new(),
160 condition,
161 severity,
162 enabled: true,
163 notification_channels: vec!["console".to_string()],
164 tags: HashMap::new(),
165 created_at: now,
166 updated_at: now,
167 cooldown_minutes: 5,
168 auto_resolve: true,
169 auto_resolve_after_minutes: Some(30),
170 }
171 }
172
173 pub fn with_description(mut self, description: impl Into<String>) -> Self {
174 self.description = description.into();
175 self
176 }
177
178 pub fn with_channels(mut self, channels: Vec<String>) -> Self {
179 self.notification_channels = channels;
180 self
181 }
182
183 pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
184 self.tags.insert(key.into(), value.into());
185 self
186 }
187
188 pub fn with_cooldown(mut self, minutes: u32) -> Self {
189 self.cooldown_minutes = minutes;
190 self
191 }
192}
193
/// A concrete alert instance produced when a rule fires, carried through
/// its lifecycle (triggered -> acknowledged/resolved) and sent to channels.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertNotification {
    /// Unique id for this alert instance (UUID).
    pub id: String,
    /// Id of the rule that produced this alert.
    pub rule_id: String,
    pub rule_name: String,
    pub severity: AlertSeverity,
    pub status: AlertStatus,
    pub message: String,
    /// Extra structured context attached to the alert.
    pub details: HashMap<String, serde_json::Value>,
    pub triggered_at: DateTime<Utc>,
    pub resolved_at: Option<DateTime<Utc>>,
    pub acknowledged_at: Option<DateTime<Utc>>,
    /// Who acknowledged the alert, if anyone.
    pub acknowledged_by: Option<String>,
    /// Channel names this alert is routed to.
    pub notification_channels: Vec<String>,
    pub tags: HashMap<String, String>,
}
211
/// Lifecycle state of an alert notification.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum AlertStatus {
    Triggered,
    Acknowledged,
    Resolved,
    Suppressed,
}
219
/// Transport abstraction for delivering alert notifications.
///
/// Implementations must be `Send + Sync` so channels can be shared with
/// the async evaluation loop.
#[async_trait::async_trait]
pub trait NotificationChannel: Send + Sync {
    /// Delivers `notification` over the underlying transport.
    async fn send_notification(&self, notification: &AlertNotification) -> RragResult<()>;
    /// The transport kind this channel implements.
    fn channel_type(&self) -> NotificationChannelType;
    /// Channel name used by rules to route notifications.
    fn name(&self) -> &str;
    /// Lightweight health probe for the transport.
    async fn is_healthy(&self) -> bool;
}
228
/// Notification channel that writes alerts to the process log/console.
pub struct ConsoleNotificationChannel {
    // Channel name used for routing.
    name: String,
}
233
234impl ConsoleNotificationChannel {
235 pub fn new(name: impl Into<String>) -> Self {
236 Self { name: name.into() }
237 }
238}
239
240#[async_trait::async_trait]
241impl NotificationChannel for ConsoleNotificationChannel {
242 async fn send_notification(&self, notification: &AlertNotification) -> RragResult<()> {
243 let status_symbol = match notification.status {
244 AlertStatus::Triggered => "🚨",
245 AlertStatus::Acknowledged => "✅",
246 AlertStatus::Resolved => "✅",
247 AlertStatus::Suppressed => "🔇",
248 };
249
250 let severity_color = match notification.severity {
251 AlertSeverity::Critical => "\x1b[31m", AlertSeverity::High => "\x1b[33m", AlertSeverity::Medium => "\x1b[36m", AlertSeverity::Low => "\x1b[32m", };
256
257 tracing::debug!(
258 "{} {}[{}]\x1b[0m {} - {} ({})",
259 status_symbol,
260 severity_color,
261 notification.severity,
262 notification.rule_name,
263 notification.message,
264 notification.triggered_at.format("%Y-%m-%d %H:%M:%S UTC")
265 );
266
267 if !notification.details.is_empty() {
268 tracing::debug!(" Details: {:?}", notification.details);
269 }
270
271 Ok(())
272 }
273
274 fn channel_type(&self) -> NotificationChannelType {
275 NotificationChannelType::Console
276 }
277
278 fn name(&self) -> &str {
279 &self.name
280 }
281
282 async fn is_healthy(&self) -> bool {
283 true }
285}
286
/// Notification channel that POSTs alerts as JSON to an HTTP endpoint.
/// The HTTP client only exists when the "http" feature is enabled.
pub struct WebhookNotificationChannel {
    // Channel name used for routing.
    name: String,
    // Destination URL for webhook POSTs.
    url: String,
    // Extra headers attached to every request.
    headers: HashMap<String, String>,
    #[cfg(feature = "http")]
    client: reqwest::Client,
}
295
296impl WebhookNotificationChannel {
297 pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
298 Self {
299 name: name.into(),
300 url: url.into(),
301 headers: HashMap::new(),
302 #[cfg(feature = "http")]
303 client: reqwest::Client::new(),
304 }
305 }
306
307 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
308 self.headers.insert(key.into(), value.into());
309 self
310 }
311}
312
#[async_trait::async_trait]
impl NotificationChannel for WebhookNotificationChannel {
    /// POSTs the alert as a JSON payload to the configured URL.
    ///
    /// Without the "http" feature this is a no-op that logs a warning and
    /// still returns `Ok(())` (deliberate best-effort behavior).
    async fn send_notification(&self, notification: &AlertNotification) -> RragResult<()> {
        #[cfg(feature = "http")]
        {
            // Flatten the alert into a stable JSON shape for receivers.
            let payload = serde_json::json!({
                "alert_id": notification.id,
                "rule_name": notification.rule_name,
                "severity": notification.severity,
                "status": notification.status,
                "message": notification.message,
                "details": notification.details,
                "triggered_at": notification.triggered_at,
                "tags": notification.tags
            });

            let mut request = self.client.post(&self.url).json(&payload);

            // Attach any configured custom headers.
            for (key, value) in &self.headers {
                request = request.header(key, value);
            }

            // Both transport errors and non-2xx responses are surfaced.
            request
                .send()
                .await
                .map_err(|e| RragError::network("webhook_notification", Box::new(e)))?
                .error_for_status()
                .map_err(|e| RragError::network("webhook_notification", Box::new(e)))?;

            Ok(())
        }
        #[cfg(not(feature = "http"))]
        {
            tracing::warn!(
                "HTTP feature not enabled, webhook notification to {} skipped",
                self.url
            );
            Ok(())
        }
    }

    fn channel_type(&self) -> NotificationChannelType {
        NotificationChannelType::Webhook
    }

    fn name(&self) -> &str {
        &self.name
    }

    /// Probes the endpoint with a HEAD request when HTTP is available;
    /// otherwise reports healthy.
    /// NOTE(review): endpoints that reject HEAD will report unhealthy even
    /// though POSTs would succeed — confirm acceptable for your receivers.
    async fn is_healthy(&self) -> bool {
        #[cfg(feature = "http")]
        {
            self.client.head(&self.url).send().await.is_ok()
        }
        #[cfg(not(feature = "http"))]
        {
            true
        }
    }
}
375
/// Evaluates alert conditions against an in-memory, per-metric sample
/// history of `(timestamp, value)` pairs.
pub struct AlertEvaluator {
    // Bounded sample history keyed by metric name.
    metrics_history: Arc<RwLock<HashMap<String, Vec<(DateTime<Utc>, f64)>>>>,
    // Maximum samples retained per metric.
    max_history_size: usize,
}
381
382impl AlertEvaluator {
383 pub fn new(max_history_size: usize) -> Self {
384 Self {
385 metrics_history: Arc::new(RwLock::new(HashMap::new())),
386 max_history_size,
387 }
388 }
389
390 pub async fn update_metric(&self, metric_name: String, value: f64) {
391 let mut history = self.metrics_history.write().await;
392 let entry = history.entry(metric_name).or_insert_with(Vec::new);
393
394 entry.push((Utc::now(), value));
395
396 if entry.len() > self.max_history_size {
398 entry.drain(0..entry.len() - self.max_history_size);
399 }
400 }
401
402 pub async fn evaluate_condition(&self, condition: &AlertCondition) -> RragResult<bool> {
403 match condition {
404 AlertCondition::Threshold {
405 metric_name,
406 operator,
407 value,
408 duration_minutes,
409 } => {
410 self.evaluate_threshold(metric_name, operator, *value, *duration_minutes)
411 .await
412 }
413 AlertCondition::RateOfChange {
414 metric_name,
415 operator,
416 rate_per_minute,
417 window_minutes,
418 } => {
419 self.evaluate_rate_of_change(
420 metric_name,
421 operator,
422 *rate_per_minute,
423 *window_minutes,
424 )
425 .await
426 }
427 AlertCondition::Anomaly {
428 metric_name,
429 sensitivity,
430 baseline_minutes,
431 } => {
432 self.evaluate_anomaly(metric_name, *sensitivity, *baseline_minutes)
433 .await
434 }
435 AlertCondition::Composite { conditions, logic } => {
436 self.evaluate_composite(conditions, logic).await
437 }
438 }
439 }
440
441 async fn evaluate_threshold(
442 &self,
443 metric_name: &str,
444 operator: &ComparisonOperator,
445 threshold: f64,
446 duration_minutes: u32,
447 ) -> RragResult<bool> {
448 let history = self.metrics_history.read().await;
449 let values = history.get(metric_name).ok_or_else(|| {
450 RragError::agent(
451 "alert_evaluator",
452 format!("Metric not found: {}", metric_name),
453 )
454 })?;
455
456 if values.is_empty() {
457 return Ok(false);
458 }
459
460 let cutoff_time = Utc::now() - Duration::minutes(duration_minutes as i64);
461 let recent_values: Vec<_> = values
462 .iter()
463 .filter(|(timestamp, _)| *timestamp >= cutoff_time)
464 .map(|(_, value)| *value)
465 .collect();
466
467 if recent_values.is_empty() {
468 return Ok(false);
469 }
470
471 Ok(recent_values.iter().all(|&value| match operator {
473 ComparisonOperator::GreaterThan => value > threshold,
474 ComparisonOperator::LessThan => value < threshold,
475 ComparisonOperator::GreaterThanOrEqual => value >= threshold,
476 ComparisonOperator::LessThanOrEqual => value <= threshold,
477 ComparisonOperator::Equal => (value - threshold).abs() < f64::EPSILON,
478 ComparisonOperator::NotEqual => (value - threshold).abs() >= f64::EPSILON,
479 }))
480 }
481
482 async fn evaluate_rate_of_change(
483 &self,
484 metric_name: &str,
485 operator: &ComparisonOperator,
486 rate_threshold: f64,
487 window_minutes: u32,
488 ) -> RragResult<bool> {
489 let history = self.metrics_history.read().await;
490 let values = history.get(metric_name).ok_or_else(|| {
491 RragError::agent(
492 "alert_evaluator",
493 format!("Metric not found: {}", metric_name),
494 )
495 })?;
496
497 if values.len() < 2 {
498 return Ok(false);
499 }
500
501 let cutoff_time = Utc::now() - Duration::minutes(window_minutes as i64);
502 let recent_values: Vec<_> = values
503 .iter()
504 .filter(|(timestamp, _)| *timestamp >= cutoff_time)
505 .collect();
506
507 if recent_values.len() < 2 {
508 return Ok(false);
509 }
510
511 let (earliest_time, earliest_value) = recent_values.first().unwrap();
512 let (latest_time, latest_value) = recent_values.last().unwrap();
513
514 let time_diff_minutes = (*latest_time - *earliest_time).num_minutes() as f64;
515 if time_diff_minutes <= 0.0 {
516 return Ok(false);
517 }
518
519 let rate_of_change = (latest_value - earliest_value) / time_diff_minutes;
520
521 Ok(match operator {
522 ComparisonOperator::GreaterThan => rate_of_change > rate_threshold,
523 ComparisonOperator::LessThan => rate_of_change < rate_threshold,
524 ComparisonOperator::GreaterThanOrEqual => rate_of_change >= rate_threshold,
525 ComparisonOperator::LessThanOrEqual => rate_of_change <= rate_threshold,
526 ComparisonOperator::Equal => (rate_of_change - rate_threshold).abs() < f64::EPSILON,
527 ComparisonOperator::NotEqual => (rate_of_change - rate_threshold).abs() >= f64::EPSILON,
528 })
529 }
530
531 async fn evaluate_anomaly(
532 &self,
533 metric_name: &str,
534 sensitivity: f64,
535 baseline_minutes: u32,
536 ) -> RragResult<bool> {
537 let history = self.metrics_history.read().await;
538 let values = history.get(metric_name).ok_or_else(|| {
539 RragError::agent(
540 "alert_evaluator",
541 format!("Metric not found: {}", metric_name),
542 )
543 })?;
544
545 if values.len() < 10 {
546 return Ok(false); }
548
549 let cutoff_time = Utc::now() - Duration::minutes(baseline_minutes as i64);
550 let baseline_values: Vec<f64> = values
551 .iter()
552 .filter(|(timestamp, _)| *timestamp >= cutoff_time)
553 .map(|(_, value)| *value)
554 .collect();
555
556 if baseline_values.len() < 5 {
557 return Ok(false);
558 }
559
560 let mean = baseline_values.iter().sum::<f64>() / baseline_values.len() as f64;
562 let variance = baseline_values
563 .iter()
564 .map(|value| (value - mean).powi(2))
565 .sum::<f64>()
566 / baseline_values.len() as f64;
567 let std_dev = variance.sqrt();
568
569 let current_value = values.last().unwrap().1;
570 let z_score = (current_value - mean) / std_dev;
571
572 Ok(z_score.abs() > sensitivity)
573 }
574
575 async fn evaluate_composite(
576 &self,
577 conditions: &[AlertCondition],
578 logic: &LogicOperator,
579 ) -> RragResult<bool> {
580 let mut results = Vec::new();
581 for condition in conditions {
582 let result = match condition {
583 AlertCondition::Threshold {
584 metric_name,
585 operator,
586 value,
587 duration_minutes,
588 } => {
589 self.evaluate_threshold(metric_name, operator, *value, *duration_minutes)
590 .await?
591 }
592 AlertCondition::RateOfChange {
593 metric_name,
594 operator,
595 rate_per_minute,
596 window_minutes,
597 } => {
598 self.evaluate_rate_of_change(
599 metric_name,
600 operator,
601 *rate_per_minute,
602 *window_minutes,
603 )
604 .await?
605 }
606 AlertCondition::Anomaly {
607 metric_name,
608 sensitivity,
609 baseline_minutes,
610 } => {
611 self.evaluate_anomaly(metric_name, *sensitivity, *baseline_minutes)
612 .await?
613 }
614 AlertCondition::Composite { .. } => {
615 return Err(RragError::config(
617 "alert_condition",
618 "non-nested composite",
619 "nested composite",
620 ));
621 }
622 };
623 results.push(result);
624 }
625
626 Ok(match logic {
627 LogicOperator::And => results.iter().all(|&result| result),
628 LogicOperator::Or => results.iter().any(|&result| result),
629 })
630 }
631}
632
/// Orchestrates alert rules: runs the periodic evaluation loop, tracks
/// active alerts, and fans notifications out to configured channels.
pub struct AlertManager {
    config: AlertConfig,
    // Source of metric samples fed into the evaluator each cycle.
    metrics_collector: Arc<MetricsCollector>,
    // Rules keyed by rule id.
    alert_rules: Arc<RwLock<HashMap<String, AlertRule>>>,
    // Currently-firing alerts keyed by rule id.
    active_alerts: Arc<RwLock<HashMap<String, AlertNotification>>>,
    // Channels keyed by channel name.
    notification_channels: Arc<RwLock<HashMap<String, Box<dyn NotificationChannel>>>>,
    evaluator: Arc<AlertEvaluator>,
    // Handle of the spawned evaluation task (Some only while running).
    evaluation_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    is_running: Arc<RwLock<bool>>,
}
644
impl AlertManager {
    /// Builds a manager, wiring up the configured notification channels
    /// and installing the built-in default rules.
    ///
    /// The evaluation loop is NOT started here; call [`Self::start`].
    pub async fn new(
        config: AlertConfig,
        metrics_collector: Arc<MetricsCollector>,
    ) -> RragResult<Self> {
        let manager = Self {
            config: config.clone(),
            metrics_collector,
            alert_rules: Arc::new(RwLock::new(HashMap::new())),
            active_alerts: Arc::new(RwLock::new(HashMap::new())),
            notification_channels: Arc::new(RwLock::new(HashMap::new())),
            // Evaluator keeps up to 1000 samples per metric.
            evaluator: Arc::new(AlertEvaluator::new(1000)),
            evaluation_handle: Arc::new(RwLock::new(None)),
            is_running: Arc::new(RwLock::new(false)),
        };

        manager.setup_notification_channels().await?;

        manager.setup_default_rules().await?;

        Ok(manager)
    }

    /// Instantiates the enabled channels from config. Only Console and
    /// Webhook are implemented; other types log a warning and are skipped.
    /// Webhook channels require a "url" config key; "header_*" keys become
    /// custom HTTP headers.
    async fn setup_notification_channels(&self) -> RragResult<()> {
        let mut channels = self.notification_channels.write().await;

        for channel_config in &self.config.notification_channels {
            if !channel_config.enabled {
                continue;
            }

            let channel: Box<dyn NotificationChannel> = match channel_config.channel_type {
                NotificationChannelType::Console => {
                    Box::new(ConsoleNotificationChannel::new(&channel_config.name))
                }
                NotificationChannelType::Webhook => {
                    if let Some(url) = channel_config.config.get("url") {
                        let mut webhook =
                            WebhookNotificationChannel::new(&channel_config.name, url);

                        // "header_<name>" config entries become HTTP headers.
                        for (key, value) in &channel_config.config {
                            if key.starts_with("header_") {
                                let header_name = key.strip_prefix("header_").unwrap();
                                webhook = webhook.with_header(header_name, value);
                            }
                        }

                        Box::new(webhook)
                    } else {
                        return Err(RragError::config("webhook_channel", "url", "missing"));
                    }
                }
                _ => {
                    tracing::warn!(
                        "Notification channel type {:?} not yet implemented",
                        channel_config.channel_type
                    );
                    continue;
                }
            };

            channels.insert(channel_config.name.clone(), channel);
        }

        Ok(())
    }

    /// Installs the built-in rules: high CPU, high memory, rapidly rising
    /// error rate, and slow search responses.
    async fn setup_default_rules(&self) -> RragResult<()> {
        let mut rules = self.alert_rules.write().await;

        // CPU > 80% sustained for 5 minutes.
        let cpu_rule = AlertRule::new(
            "high_cpu_usage",
            "High CPU Usage",
            AlertCondition::Threshold {
                metric_name: "system_cpu_usage_percent".to_string(),
                operator: ComparisonOperator::GreaterThan,
                value: 80.0,
                duration_minutes: 5,
            },
            AlertSeverity::High,
        )
        .with_description("CPU usage is above 80% for more than 5 minutes");

        // Memory > 85% sustained for 5 minutes.
        let memory_rule = AlertRule::new(
            "high_memory_usage",
            "High Memory Usage",
            AlertCondition::Threshold {
                metric_name: "system_memory_usage_percent".to_string(),
                operator: ComparisonOperator::GreaterThan,
                value: 85.0,
                duration_minutes: 5,
            },
            AlertSeverity::High,
        )
        .with_description("Memory usage is above 85% for more than 5 minutes");

        // Failed-query count rising faster than 10/minute over 10 minutes.
        let error_rate_rule = AlertRule::new(
            "high_error_rate",
            "High Error Rate",
            AlertCondition::RateOfChange {
                metric_name: "search_queries_failed".to_string(),
                operator: ComparisonOperator::GreaterThan,
                rate_per_minute: 10.0,
                window_minutes: 10,
            },
            AlertSeverity::Critical,
        )
        .with_description("Error rate is increasing rapidly");

        // Search latency > 1s sustained for 3 minutes.
        let slow_response_rule = AlertRule::new(
            "slow_response_time",
            "Slow Response Time",
            AlertCondition::Threshold {
                metric_name: "search_processing_time_ms".to_string(),
                operator: ComparisonOperator::GreaterThan,
                value: 1000.0,
                duration_minutes: 3,
            },
            AlertSeverity::Medium,
        )
        .with_description("Search response time is above 1 second");

        rules.insert("high_cpu_usage".to_string(), cpu_rule);
        rules.insert("high_memory_usage".to_string(), memory_rule);
        rules.insert("high_error_rate".to_string(), error_rate_rule);
        rules.insert("slow_response_time".to_string(), slow_response_rule);

        Ok(())
    }

    /// Starts the background evaluation loop.
    ///
    /// # Errors
    /// Returns a config error if the manager is already running.
    pub async fn start(&self) -> RragResult<()> {
        let mut running = self.is_running.write().await;
        if *running {
            return Err(RragError::config(
                "alert_manager",
                "stopped",
                "already running",
            ));
        }

        let handle = self.start_evaluation_loop().await?;
        {
            let mut eval_handle = self.evaluation_handle.write().await;
            *eval_handle = Some(handle);
        }

        *running = true;
        tracing::info!("Alert manager started");
        Ok(())
    }

    /// Stops the evaluation loop by aborting the spawned task.
    /// Stopping an already-stopped manager is a no-op.
    pub async fn stop(&self) -> RragResult<()> {
        let mut running = self.is_running.write().await;
        if !*running {
            return Ok(());
        }

        {
            let mut eval_handle = self.evaluation_handle.write().await;
            if let Some(handle) = eval_handle.take() {
                // Abort rather than join: the loop holds no state that
                // needs a graceful drain.
                handle.abort();
            }
        }

        *running = false;
        tracing::info!("Alert manager stopped");
        Ok(())
    }

    /// Health here simply means "the evaluation loop was started".
    pub async fn is_healthy(&self) -> bool {
        *self.is_running.read().await
    }

    /// Spawns the periodic evaluation task. Each tick it feeds the latest
    /// metric values into the evaluator, then evaluates every enabled rule,
    /// triggering or resolving alerts accordingly.
    ///
    /// NOTE: `tokio::time::interval`'s first tick completes immediately, so
    /// one evaluation pass runs right after start.
    async fn start_evaluation_loop(&self) -> RragResult<tokio::task::JoinHandle<()>> {
        // Clone the shared handles the spawned task needs to own.
        let config = self.config.clone();
        let alert_rules = self.alert_rules.clone();
        let active_alerts = self.active_alerts.clone();
        let notification_channels = self.notification_channels.clone();
        let evaluator = self.evaluator.clone();
        let metrics_collector = self.metrics_collector.clone();
        let is_running = self.is_running.clone();

        let handle = tokio::spawn(async move {
            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
                config.evaluation_interval_seconds,
            ));

            // The loop also exits when `stop()` flips is_running, in
            // addition to being abortable via the JoinHandle.
            while *is_running.read().await {
                interval.tick().await;

                // Refresh the evaluator's history from the collector.
                let all_metrics = metrics_collector.get_all_metrics().await;
                for metric in all_metrics {
                    if let Some(value) = Self::extract_metric_value(&metric) {
                        evaluator.update_metric(metric.name, value).await;
                    }
                }

                // Evaluate every enabled rule against the fresh history.
                let rules = alert_rules.read().await;
                for (rule_id, rule) in rules.iter() {
                    if !rule.enabled {
                        continue;
                    }

                    match evaluator.evaluate_condition(&rule.condition).await {
                        Ok(triggered) => {
                            if triggered {
                                Self::handle_alert_triggered(
                                    rule_id,
                                    rule,
                                    &active_alerts,
                                    &notification_channels,
                                )
                                .await;
                            } else {
                                Self::handle_alert_resolved(
                                    rule_id,
                                    rule,
                                    &active_alerts,
                                    &notification_channels,
                                )
                                .await;
                            }
                        }
                        Err(e) => {
                            // E.g. the rule references a metric with no
                            // recorded history yet.
                            tracing::error!("Failed to evaluate alert rule {}: {}", rule_id, e);
                        }
                    }
                }
            }
        });

        Ok(handle)
    }

    /// Reduces a metric to a single f64 for threshold evaluation:
    /// counters/gauges/timers pass through; histograms and summaries
    /// collapse to their mean (0.0 when empty).
    fn extract_metric_value(metric: &Metric) -> Option<f64> {
        match &metric.value {
            MetricValue::Counter(value) => Some(*value as f64),
            MetricValue::Gauge(value) => Some(*value),
            MetricValue::Timer { duration_ms, .. } => Some(*duration_ms),
            MetricValue::Histogram { sum, count, .. } => {
                if *count > 0 {
                    Some(sum / *count as f64)
                } else {
                    Some(0.0)
                }
            }
            MetricValue::Summary { sum, count, .. } => {
                if *count > 0 {
                    Some(sum / *count as f64)
                } else {
                    Some(0.0)
                }
            }
        }
    }

    /// Records a firing rule as an active alert and notifies its channels,
    /// unless an existing alert for the rule is still inside its cooldown.
    async fn handle_alert_triggered(
        rule_id: &str,
        rule: &AlertRule,
        active_alerts: &Arc<RwLock<HashMap<String, AlertNotification>>>,
        notification_channels: &Arc<RwLock<HashMap<String, Box<dyn NotificationChannel>>>>,
    ) {
        let mut alerts = active_alerts.write().await;

        // Suppress re-notification while the previous alert for this rule
        // is within the cooldown window.
        if let Some(existing_alert) = alerts.get(rule_id) {
            let cooldown_duration = Duration::minutes(rule.cooldown_minutes as i64);
            if existing_alert.triggered_at + cooldown_duration > Utc::now() {
                return;
            }
        }

        let alert_notification = AlertNotification {
            id: uuid::Uuid::new_v4().to_string(),
            rule_id: rule_id.to_string(),
            rule_name: rule.name.clone(),
            severity: rule.severity,
            status: AlertStatus::Triggered,
            message: format!("Alert triggered: {}", rule.description),
            details: HashMap::new(),
            triggered_at: Utc::now(),
            resolved_at: None,
            acknowledged_at: None,
            acknowledged_by: None,
            notification_channels: rule.notification_channels.clone(),
            tags: rule.tags.clone(),
        };

        alerts.insert(rule_id.to_string(), alert_notification.clone());
        // Release the write lock before awaiting channel sends.
        drop(alerts);

        // Best-effort fan-out: a failed channel only logs an error.
        let channels = notification_channels.read().await;
        for channel_name in &rule.notification_channels {
            if let Some(channel) = channels.get(channel_name) {
                if let Err(e) = channel.send_notification(&alert_notification).await {
                    tracing::error!("Failed to send notification via {}: {}", channel_name, e);
                }
            }
        }
    }

    /// Removes the rule's active alert once its condition clears; if the
    /// rule auto-resolves and the alert was still Triggered, a resolution
    /// notification is sent.
    ///
    /// NOTE(review): an Acknowledged alert, or one on a rule with
    /// `auto_resolve == false`, is removed from the active map here WITHOUT
    /// any resolution notification — confirm this silent drop is intended.
    async fn handle_alert_resolved(
        rule_id: &str,
        rule: &AlertRule,
        active_alerts: &Arc<RwLock<HashMap<String, AlertNotification>>>,
        notification_channels: &Arc<RwLock<HashMap<String, Box<dyn NotificationChannel>>>>,
    ) {
        let mut alerts = active_alerts.write().await;

        if let Some(mut alert) = alerts.remove(rule_id) {
            if rule.auto_resolve && alert.status == AlertStatus::Triggered {
                alert.status = AlertStatus::Resolved;
                alert.resolved_at = Some(Utc::now());
                alert.message = format!("Alert resolved: {}", rule.description);

                // Release the write lock before awaiting channel sends.
                drop(alerts);

                let channels = notification_channels.read().await;
                for channel_name in &rule.notification_channels {
                    if let Some(channel) = channels.get(channel_name) {
                        if let Err(e) = channel.send_notification(&alert).await {
                            tracing::error!(
                                "Failed to send resolution notification via {}: {}",
                                channel_name,
                                e
                            );
                        }
                    }
                }
            }
        }
    }

    /// Adds (or replaces, by id) an alert rule.
    pub async fn add_alert_rule(&self, rule: AlertRule) -> RragResult<()> {
        let mut rules = self.alert_rules.write().await;
        rules.insert(rule.id.clone(), rule);
        Ok(())
    }

    /// Removes a rule and any active alert it produced. Removing an
    /// unknown id is a no-op.
    pub async fn remove_alert_rule(&self, rule_id: &str) -> RragResult<()> {
        let mut rules = self.alert_rules.write().await;
        rules.remove(rule_id);

        let mut alerts = self.active_alerts.write().await;
        alerts.remove(rule_id);

        Ok(())
    }

    /// Marks the active alert for `rule_id` as acknowledged by the given
    /// user. Acknowledging a rule with no active alert is a silent no-op.
    pub async fn acknowledge_alert(
        &self,
        rule_id: &str,
        acknowledged_by: impl Into<String>,
    ) -> RragResult<()> {
        let mut alerts = self.active_alerts.write().await;
        if let Some(alert) = alerts.get_mut(rule_id) {
            alert.status = AlertStatus::Acknowledged;
            alert.acknowledged_at = Some(Utc::now());
            alert.acknowledged_by = Some(acknowledged_by.into());

            tracing::info!("Alert {} acknowledged", rule_id);
        }
        Ok(())
    }

    /// Snapshot of all currently-active alerts (unordered).
    pub async fn get_active_alerts(&self) -> Vec<AlertNotification> {
        let alerts = self.active_alerts.read().await;
        alerts.values().cloned().collect()
    }

    /// Snapshot of all configured rules (unordered).
    pub async fn get_alert_rules(&self) -> Vec<AlertRule> {
        let rules = self.alert_rules.read().await;
        rules.values().cloned().collect()
    }

    /// Aggregated counts of active alerts by severity and status.
    pub async fn get_alert_stats(&self) -> AlertStats {
        let alerts = self.active_alerts.read().await;
        let rules = self.alert_rules.read().await;

        let total_alerts = alerts.len();
        let by_severity = alerts.values().fold(HashMap::new(), |mut acc, alert| {
            *acc.entry(alert.severity).or_insert(0) += 1;
            acc
        });

        let by_status = alerts.values().fold(HashMap::new(), |mut acc, alert| {
            *acc.entry(alert.status.clone()).or_insert(0) += 1;
            acc
        });

        AlertStats {
            total_active_alerts: total_alerts,
            total_rules: rules.len(),
            alerts_by_severity: by_severity,
            alerts_by_status: by_status,
            last_evaluation: Utc::now(),
        }
    }
}
1056
/// Point-in-time summary of alerting activity, as returned by
/// `AlertManager::get_alert_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertStats {
    pub total_active_alerts: usize,
    pub total_rules: usize,
    /// Active-alert counts keyed by severity.
    pub alerts_by_severity: HashMap<AlertSeverity, usize>,
    /// Active-alert counts keyed by lifecycle status.
    pub alerts_by_status: HashMap<AlertStatus, usize>,
    /// Timestamp at which this snapshot was taken.
    pub last_evaluation: DateTime<Utc>,
}
1066
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::metrics::MetricsConfig;

    /// Shared helper: a default-configured metrics collector.
    async fn create_test_metrics_collector() -> Arc<MetricsCollector> {
        Arc::new(
            MetricsCollector::new(MetricsConfig::default())
                .await
                .unwrap(),
        )
    }

    #[tokio::test]
    async fn test_console_notification_channel() {
        let channel = ConsoleNotificationChannel::new("test_console");
        assert_eq!(channel.name(), "test_console");
        assert_eq!(channel.channel_type(), NotificationChannelType::Console);
        assert!(channel.is_healthy().await);

        let notification = AlertNotification {
            id: "alert123".to_string(),
            rule_id: "rule123".to_string(),
            rule_name: "Test Alert".to_string(),
            severity: AlertSeverity::High,
            status: AlertStatus::Triggered,
            message: "Test alert message".to_string(),
            details: HashMap::new(),
            triggered_at: Utc::now(),
            resolved_at: None,
            acknowledged_at: None,
            acknowledged_by: None,
            notification_channels: vec!["test_console".to_string()],
            tags: HashMap::new(),
        };

        // Console delivery is infallible.
        channel.send_notification(&notification).await.unwrap();
    }

    #[tokio::test]
    async fn test_alert_evaluator() {
        let evaluator = AlertEvaluator::new(100);

        evaluator.update_metric("cpu_usage".to_string(), 50.0).await;
        evaluator.update_metric("cpu_usage".to_string(), 75.0).await;
        evaluator.update_metric("cpu_usage".to_string(), 85.0).await;

        // All samples are within the 1-minute window, but only the latest
        // exceeds 80 — threshold requires ALL samples to violate, so this
        // asserts the condition is evaluated over the full window.
        let condition = AlertCondition::Threshold {
            metric_name: "cpu_usage".to_string(),
            operator: ComparisonOperator::GreaterThan,
            value: 80.0,
            duration_minutes: 1,
        };

        let result = evaluator.evaluate_condition(&condition).await.unwrap();
        assert!(result);

        let rate_condition = AlertCondition::RateOfChange {
            metric_name: "cpu_usage".to_string(),
            operator: ComparisonOperator::GreaterThan,
            rate_per_minute: 10.0,
            window_minutes: 5,
        };

        let rate_result = evaluator.evaluate_condition(&rate_condition).await.unwrap();
        assert!(rate_result);
    }

    #[tokio::test]
    async fn test_alert_rule_creation() {
        let rule = AlertRule::new(
            "test_rule",
            "Test Alert Rule",
            AlertCondition::Threshold {
                metric_name: "test_metric".to_string(),
                operator: ComparisonOperator::GreaterThan,
                value: 100.0,
                duration_minutes: 5,
            },
            AlertSeverity::High,
        )
        .with_description("Test alert rule description")
        .with_tag("component", "test")
        .with_cooldown(10);

        assert_eq!(rule.id, "test_rule");
        assert_eq!(rule.name, "Test Alert Rule");
        assert_eq!(rule.severity, AlertSeverity::High);
        assert_eq!(rule.cooldown_minutes, 10);
        assert!(rule.tags.contains_key("component"));
        assert_eq!(rule.tags["component"], "test");
    }

    #[tokio::test]
    async fn test_alert_manager() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = AlertConfig::default();
        // All AlertManager methods take `&self`, so no `mut` is needed.
        let manager = AlertManager::new(config, metrics_collector).await.unwrap();

        assert!(!manager.is_healthy().await);

        manager.start().await.unwrap();
        assert!(manager.is_healthy().await);

        let custom_rule = AlertRule::new(
            "custom_rule",
            "Custom Test Rule",
            AlertCondition::Threshold {
                metric_name: "custom_metric".to_string(),
                operator: ComparisonOperator::GreaterThan,
                value: 50.0,
                duration_minutes: 1,
            },
            AlertSeverity::Medium,
        );

        manager.add_alert_rule(custom_rule).await.unwrap();

        let rules = manager.get_alert_rules().await;
        assert!(rules.iter().any(|r| r.id == "custom_rule"));

        // Default rules plus the custom one.
        let stats = manager.get_alert_stats().await;
        assert!(stats.total_rules > 0);

        manager.stop().await.unwrap();
        assert!(!manager.is_healthy().await);
    }

    #[test]
    fn test_alert_severity_ordering() {
        assert!(AlertSeverity::Critical > AlertSeverity::High);
        assert!(AlertSeverity::High > AlertSeverity::Medium);
        assert!(AlertSeverity::Medium > AlertSeverity::Low);
    }

    #[test]
    fn test_comparison_operators() {
        assert_eq!(
            ComparisonOperator::GreaterThan,
            ComparisonOperator::GreaterThan
        );
        assert_ne!(
            ComparisonOperator::GreaterThan,
            ComparisonOperator::LessThan
        );
    }

    #[tokio::test]
    async fn test_alert_acknowledgment() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = AlertConfig::default();
        let manager = AlertManager::new(config, metrics_collector).await.unwrap();

        let alert = AlertNotification {
            id: "test_alert".to_string(),
            rule_id: "test_rule".to_string(),
            rule_name: "Test Rule".to_string(),
            severity: AlertSeverity::High,
            status: AlertStatus::Triggered,
            message: "Test alert".to_string(),
            details: HashMap::new(),
            triggered_at: Utc::now(),
            resolved_at: None,
            acknowledged_at: None,
            acknowledged_by: None,
            notification_channels: vec![],
            tags: HashMap::new(),
        };

        // Seed an active alert directly (bypassing the evaluation loop).
        {
            let mut alerts = manager.active_alerts.write().await;
            alerts.insert("test_rule".to_string(), alert);
        }

        manager
            .acknowledge_alert("test_rule", "test_user")
            .await
            .unwrap();

        let active_alerts = manager.get_active_alerts().await;
        let acknowledged_alert = active_alerts
            .iter()
            .find(|a| a.rule_id == "test_rule")
            .unwrap();
        assert_eq!(acknowledged_alert.status, AlertStatus::Acknowledged);
        assert!(acknowledged_alert.acknowledged_at.is_some());
        assert_eq!(
            acknowledged_alert.acknowledged_by.as_ref().unwrap(),
            "test_user"
        );
    }
}