use super::{
    AlertCondition, AlertRule, AlertSeverity, HealthStatus, PerformanceMetrics, SystemHealth,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{error, info, warn};

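/// A single alert instance produced when an [`AlertRule`] fires, carrying the
/// observed value, the configured threshold, and trigger/resolution timestamps.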
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    pub id: String,
    pub rule_name: String,
    pub severity: AlertSeverity,
    pub condition: AlertCondition,
    pub message: String,
    pub value: f64,
    pub threshold: f64,
    pub triggered_at: DateTime<Utc>,
    pub resolved_at: Option<DateTime<Utc>>,
    pub metadata: HashMap<String, String>,
}

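/// Evaluates alert rules against [`SystemHealth`] snapshots, tracks currently
/// active alerts, keeps a resolution history, and fans notifications out to the
/// configured channels.
///
/// Illustrative usage (a sketch; assumes a `health` snapshot of type
/// [`SystemHealth`] is produced elsewhere by the monitoring layer):
///
/// ```ignore
/// let mut manager = AlertManager::new();
/// manager.evaluate_alerts(&health, None);
/// for alert in manager.get_active_alerts() {
///     println!("{}: {}", alert.rule_name, alert.message);
/// }
/// ```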
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertManager {
    rules: Vec<AlertRule>,
    active_alerts: HashMap<String, Alert>,
    alert_history: Vec<Alert>,
    notification_channels: Vec<NotificationChannel>,
}

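/// A destination for alert notifications, with a channel-specific JSON
/// configuration blob and an on/off switch.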
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationChannel {
    pub name: String,
    pub channel_type: ChannelType,
    pub config: serde_json::Value,
    pub enabled: bool,
}

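/// Supported notification transports. Only `Log` emits output today; the other
/// variants are logged as "would send" placeholders in `send_notification`.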
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChannelType {
    Log,
    Webhook,
    Email,
    Slack,
}

impl Default for AlertManager {
    fn default() -> Self {
        Self::new()
    }
}

impl AlertManager {
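    /// Creates a manager with the default rule set and a single log-based
    /// notification channel enabled.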
    pub fn new() -> Self {
        Self {
            rules: Self::default_alert_rules(),
            active_alerts: HashMap::new(),
            alert_history: Vec::new(),
            notification_channels: vec![NotificationChannel {
                name: "log".to_string(),
                channel_type: ChannelType::Log,
                config: serde_json::json!({}),
                enabled: true,
            }],
        }
    }

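    /// Built-in rules covering memory pressure, error rate, response time,
    /// connection-pool saturation, migration failures, and disk usage.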
    fn default_alert_rules() -> Vec<AlertRule> {
        vec![
            AlertRule {
                name: "high_memory_pressure".to_string(),
                condition: AlertCondition::MemoryPressure,
                threshold: 80.0,
                severity: AlertSeverity::Warning,
                enabled: true,
            },
            AlertRule {
                name: "critical_memory_pressure".to_string(),
                condition: AlertCondition::MemoryPressure,
                threshold: 95.0,
                severity: AlertSeverity::Critical,
                enabled: true,
            },
            AlertRule {
                name: "high_error_rate".to_string(),
                condition: AlertCondition::HighErrorRate,
                threshold: 5.0,
                severity: AlertSeverity::Warning,
                enabled: true,
            },
            AlertRule {
                name: "critical_error_rate".to_string(),
                condition: AlertCondition::HighErrorRate,
                threshold: 10.0,
                severity: AlertSeverity::Critical,
                enabled: true,
            },
            AlertRule {
                name: "slow_response_time".to_string(),
                condition: AlertCondition::SlowResponse,
                threshold: 1000.0,
                severity: AlertSeverity::Warning,
                enabled: true,
            },
            AlertRule {
                name: "connection_pool_saturation".to_string(),
                condition: AlertCondition::ConnectionPoolSaturation,
                threshold: 90.0,
                severity: AlertSeverity::Critical,
                enabled: true,
            },
            AlertRule {
                name: "migration_failures".to_string(),
                condition: AlertCondition::MigrationFailures,
                threshold: 10.0,
                severity: AlertSeverity::Warning,
                enabled: true,
            },
            AlertRule {
                name: "disk_usage".to_string(),
                condition: AlertCondition::DiskUsage,
                threshold: 85.0,
                severity: AlertSeverity::Warning,
                enabled: true,
            },
        ]
    }

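    /// Evaluates every enabled rule against the given health snapshot, raising
    /// new alerts, resolving cleared ones, and sending notifications for both
    /// transitions. `_metrics` is accepted for future use and currently ignored.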
    pub fn evaluate_alerts(
        &mut self,
        health: &SystemHealth,
        _metrics: Option<&PerformanceMetrics>,
    ) {
        let now = Utc::now();

        for rule in &self.rules {
            if !rule.enabled {
                continue;
            }

            let alert_id = format!("{}_{}", rule.name, rule.condition.to_string());
            let should_trigger = self.evaluate_condition(rule, health);
            let is_active = self.active_alerts.contains_key(&alert_id);

            match (should_trigger, is_active) {
                (true, false) => {
                    let value = self.get_condition_value(rule, health);
                    let alert = Alert {
                        id: alert_id.clone(),
                        rule_name: rule.name.clone(),
                        severity: rule.severity.clone(),
                        condition: rule.condition.clone(),
                        message: self.format_alert_message(rule, value),
                        value,
                        threshold: rule.threshold,
                        triggered_at: now,
                        resolved_at: None,
                        metadata: self.get_alert_metadata(rule, health),
                    };

                    self.active_alerts.insert(alert_id.clone(), alert.clone());
                    self.alert_history.push(alert.clone());
                    self.send_notification(&alert, true);

                    match alert.severity {
                        AlertSeverity::Critical => error!("CRITICAL ALERT: {}", alert.message),
                        AlertSeverity::Warning => warn!("WARNING ALERT: {}", alert.message),
                        AlertSeverity::Info => info!("INFO ALERT: {}", alert.message),
                    }
                }
                (false, true) => {
                    if let Some(mut alert) = self.active_alerts.remove(&alert_id) {
                        alert.resolved_at = Some(now);
                        self.send_notification(&alert, false);
                        info!("RESOLVED ALERT: {}", alert.message);

                        if let Some(history_alert) = self
                            .alert_history
                            .iter_mut()
                            .find(|a| a.id == alert_id && a.resolved_at.is_none())
                        {
                            history_alert.resolved_at = Some(now);
                        }
                    }
                }
                _ => {
                    // Either still firing or still clear: no state change needed.
                }
            }
        }
    }

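    /// Returns `true` when the rule's condition is currently met by the health
    /// snapshot. Component-based conditions key off the status or error counts
    /// of specific entries in `health.components`.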
    fn evaluate_condition(&self, rule: &AlertRule, health: &SystemHealth) -> bool {
        match rule.condition {
            AlertCondition::MemoryPressure => health
                .components
                .get("memory_system")
                .map(|component| {
                    matches!(
                        component.status,
                        HealthStatus::Degraded | HealthStatus::Unhealthy
                    )
                })
                .unwrap_or(false),
            AlertCondition::HighErrorRate => {
                let total_errors: u64 = health.components.values().map(|c| c.error_count).sum();
                (total_errors as f64) > rule.threshold
            }
            AlertCondition::SlowResponse => health.components.values().any(|component| {
                component
                    .response_time_ms
                    .map(|rt| rt as f64 > rule.threshold)
                    .unwrap_or(false)
            }),
            AlertCondition::ConnectionPoolSaturation => health
                .components
                .get("connection_pool")
                .map(|component| {
                    matches!(
                        component.status,
                        HealthStatus::Degraded | HealthStatus::Unhealthy
                    )
                })
                .unwrap_or(false),
            AlertCondition::MigrationFailures => health
                .components
                .get("memory_system")
                .map(|component| component.error_count > rule.threshold as u64)
                .unwrap_or(false),
            AlertCondition::DiskUsage => {
                // Memory usage is used as a stand-in for disk usage, measured
                // against a 1 GiB budget.
                health.memory_usage_bytes
                    > (rule.threshold / 100.0 * 1024.0 * 1024.0 * 1024.0) as u64
            }
        }
    }

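    /// Derives the numeric reading reported alongside an alert: summed error
    /// counts, the slowest component response time, or usage expressed as a
    /// percentage of a 1 GiB budget, depending on the condition.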
    fn get_condition_value(&self, rule: &AlertRule, health: &SystemHealth) -> f64 {
        match rule.condition {
            AlertCondition::MemoryPressure => {
                (health.memory_usage_bytes as f64) / (1024.0 * 1024.0 * 1024.0) * 100.0
            }
            AlertCondition::HighErrorRate => health
                .components
                .values()
                .map(|c| c.error_count as f64)
                .sum(),
            AlertCondition::SlowResponse => health
                .components
                .values()
                .filter_map(|c| c.response_time_ms)
                .max()
                .unwrap_or(0) as f64,
            AlertCondition::ConnectionPoolSaturation => {
                // Detailed pool utilization is not exposed via SystemHealth, so a
                // fixed placeholder value is reported.
                75.0
            }
            AlertCondition::MigrationFailures => health
                .components
                .get("memory_system")
                .map(|c| c.error_count as f64)
                .unwrap_or(0.0),
            AlertCondition::DiskUsage => {
                (health.memory_usage_bytes as f64) / (1024.0 * 1024.0 * 1024.0) * 100.0
            }
        }
    }

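    /// Renders a human-readable message pairing the observed value with the
    /// rule's threshold, in units appropriate to the condition.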
    fn format_alert_message(&self, rule: &AlertRule, value: f64) -> String {
        match rule.condition {
            AlertCondition::MemoryPressure => {
                format!(
                    "High memory pressure detected: {:.1}% (threshold: {:.1}%)",
                    value, rule.threshold
                )
            }
            AlertCondition::HighErrorRate => {
                format!(
                    "High error rate: {:.0} errors (threshold: {:.0})",
                    value, rule.threshold
                )
            }
            AlertCondition::SlowResponse => {
                format!(
                    "Slow response time: {:.0}ms (threshold: {:.0}ms)",
                    value, rule.threshold
                )
            }
            AlertCondition::ConnectionPoolSaturation => {
                format!(
                    "Connection pool saturation: {:.1}% (threshold: {:.1}%)",
                    value, rule.threshold
                )
            }
            AlertCondition::MigrationFailures => {
                format!(
                    "Migration failures: {:.0} failures (threshold: {:.0})",
                    value, rule.threshold
                )
            }
            AlertCondition::DiskUsage => {
                format!(
                    "High disk usage: {:.1}% (threshold: {:.1}%)",
                    value, rule.threshold
                )
            }
        }
    }

    fn get_alert_metadata(
        &self,
        _rule: &AlertRule,
        health: &SystemHealth,
    ) -> HashMap<String, String> {
        let mut metadata = HashMap::new();
        metadata.insert("timestamp".to_string(), health.timestamp.to_rfc3339());
        metadata.insert(
            "uptime_seconds".to_string(),
            health.uptime_seconds.to_string(),
        );
        metadata.insert("system_status".to_string(), format!("{:?}", health.status));
        metadata
    }

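    /// Dispatches a trigger or resolution notification to every enabled channel.
    /// Only the `Log` channel emits output today; the webhook, email, and Slack
    /// variants log a "would send" placeholder.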
    fn send_notification(&self, alert: &Alert, is_trigger: bool) {
        for channel in &self.notification_channels {
            if !channel.enabled {
                continue;
            }

            match channel.channel_type {
                ChannelType::Log => {
                    let action = if is_trigger { "TRIGGERED" } else { "RESOLVED" };
                    let log_message =
                        format!("[ALERT {}] {} - {}", action, alert.rule_name, alert.message);

                    match alert.severity {
                        AlertSeverity::Critical => error!("{}", log_message),
                        AlertSeverity::Warning => warn!("{}", log_message),
                        AlertSeverity::Info => info!("{}", log_message),
                    }
                }
                ChannelType::Webhook => {
                    info!(
                        "Would send webhook notification for alert: {}",
                        alert.rule_name
                    );
                }
                ChannelType::Email => {
                    info!(
                        "Would send email notification for alert: {}",
                        alert.rule_name
                    );
                }
                ChannelType::Slack => {
                    info!(
                        "Would send Slack notification for alert: {}",
                        alert.rule_name
                    );
                }
            }
        }
    }

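    /// Returns references to all currently unresolved alerts.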
    pub fn get_active_alerts(&self) -> Vec<&Alert> {
        self.active_alerts.values().collect()
    }

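    /// Returns historical alerts, newest first, optionally truncated to `limit`.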
    pub fn get_alert_history(&self, limit: Option<usize>) -> Vec<&Alert> {
        let mut history: Vec<_> = self.alert_history.iter().collect();
        history.sort_by(|a, b| b.triggered_at.cmp(&a.triggered_at));

        if let Some(limit) = limit {
            history.into_iter().take(limit).collect()
        } else {
            history
        }
    }

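    /// Adds a rule, replacing any existing rule with the same name.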
    pub fn add_rule(&mut self, rule: AlertRule) {
        if let Some(existing) = self.rules.iter_mut().find(|r| r.name == rule.name) {
            *existing = rule;
            info!("Updated alert rule: {}", existing.name);
        } else {
            info!("Added new alert rule: {}", rule.name);
            self.rules.push(rule);
        }
    }

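    /// Removes the rule with the given name, returning `true` if one was removed.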
    pub fn remove_rule(&mut self, rule_name: &str) -> bool {
        let initial_len = self.rules.len();
        self.rules.retain(|rule| rule.name != rule_name);
        let removed = self.rules.len() < initial_len;

        if removed {
            info!("Removed alert rule: {}", rule_name);
        }

        removed
    }

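    /// Drops history entries whose trigger time is older than `max_age_hours`.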
    pub fn cleanup_old_alerts(&mut self, max_age_hours: u32) {
        let cutoff = Utc::now() - chrono::Duration::hours(max_age_hours as i64);
        let initial_len = self.alert_history.len();

        self.alert_history
            .retain(|alert| alert.triggered_at > cutoff);

        let removed = initial_len - self.alert_history.len();
        if removed > 0 {
            info!("Cleaned up {} old alerts from history", removed);
        }
    }
}

impl AlertCondition {
    /// Stable snake_case identifier used when composing alert IDs.
    fn to_string(&self) -> String {
        match self {
            AlertCondition::MemoryPressure => "memory_pressure".to_string(),
            AlertCondition::HighErrorRate => "high_error_rate".to_string(),
            AlertCondition::SlowResponse => "slow_response".to_string(),
            AlertCondition::ConnectionPoolSaturation => "connection_pool_saturation".to_string(),
            AlertCondition::MigrationFailures => "migration_failures".to_string(),
            AlertCondition::DiskUsage => "disk_usage".to_string(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitoring::ComponentHealth;
    use std::collections::HashMap;

    #[test]
    fn test_alert_manager_creation() {
        let manager = AlertManager::new();
        assert!(!manager.rules.is_empty());
        assert!(manager.active_alerts.is_empty());
        assert!(!manager.notification_channels.is_empty());
    }

    #[test]
    fn test_alert_rule_management() {
        let mut manager = AlertManager::new();
        let initial_count = manager.rules.len();

        let new_rule = AlertRule {
            name: "test_rule".to_string(),
            condition: AlertCondition::HighErrorRate,
            threshold: 15.0,
            severity: AlertSeverity::Warning,
            enabled: true,
        };

        manager.add_rule(new_rule);
        assert_eq!(manager.rules.len(), initial_count + 1);

        let removed = manager.remove_rule("test_rule");
        assert!(removed);
        assert_eq!(manager.rules.len(), initial_count);
    }

    #[test]
    fn test_alert_evaluation() {
        let mut manager = AlertManager::new();

        let mut components = HashMap::new();
        components.insert(
            "memory_system".to_string(),
            ComponentHealth {
                status: HealthStatus::Degraded,
                message: Some("Test degradation".to_string()),
                last_checked: Utc::now(),
                response_time_ms: Some(500),
                error_count: 15,
            },
        );

        let health = SystemHealth {
            status: HealthStatus::Degraded,
            timestamp: Utc::now(),
            components,
            uptime_seconds: 3600,
            memory_usage_bytes: 1024 * 1024 * 1024,
            cpu_usage_percent: 75.0,
        };

        let initial_alerts = manager.active_alerts.len();
        manager.evaluate_alerts(&health, None);

        assert!(manager.active_alerts.len() > initial_alerts);
        assert!(!manager.alert_history.is_empty());
    }

    #[test]
    fn test_alert_cleanup() {
        let mut manager = AlertManager::new();

        let old_alert = Alert {
            id: "test_alert".to_string(),
            rule_name: "test_rule".to_string(),
            severity: AlertSeverity::Warning,
            condition: AlertCondition::HighErrorRate,
            message: "Test alert".to_string(),
            value: 10.0,
            threshold: 5.0,
            triggered_at: Utc::now() - chrono::Duration::hours(25),
            resolved_at: None,
            metadata: HashMap::new(),
        };

        manager.alert_history.push(old_alert);
        assert_eq!(manager.alert_history.len(), 1);

        manager.cleanup_old_alerts(24);
        assert_eq!(manager.alert_history.len(), 0);
    }
}