codex_memory/monitoring/alerts.rs
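//! Alerting for the memory system's monitoring layer.
//!
//! `AlertManager` owns a set of `AlertRule`s, evaluates them against the latest
//! `SystemHealth` snapshot, tracks active alerts and their history, and fans
//! notifications out to the configured channels (only the `Log` channel is
//! implemented; webhook, email, and Slack channels are placeholders).
//!
//! A minimal usage sketch, assuming the types are re-exported from the parent
//! `monitoring` module (not compiled as a doc test):
//!
//! ```ignore
//! let mut manager = AlertManager::new();
//! manager.evaluate_alerts(&health, None); // `health` comes from the health checker
//! let active = manager.get_active_alerts();
//! let history = manager.get_alert_history(Some(20)); // newest first
//! ```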

use super::{
    AlertCondition, AlertRule, AlertSeverity, HealthStatus, PerformanceMetrics, SystemHealth,
};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{error, info, warn};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    pub id: String,
    pub rule_name: String,
    pub severity: AlertSeverity,
    pub condition: AlertCondition,
    pub message: String,
    pub value: f64,
    pub threshold: f64,
    pub triggered_at: DateTime<Utc>,
    pub resolved_at: Option<DateTime<Utc>>,
    pub metadata: HashMap<String, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertManager {
    rules: Vec<AlertRule>,
    active_alerts: HashMap<String, Alert>,
    alert_history: Vec<Alert>,
    notification_channels: Vec<NotificationChannel>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NotificationChannel {
    pub name: String,
    pub channel_type: ChannelType,
    pub config: serde_json::Value,
    pub enabled: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ChannelType {
    Log,
    Webhook,
    Email,
    Slack,
}

impl Default for AlertManager {
    fn default() -> Self {
        Self::new()
    }
}

impl AlertManager {
    pub fn new() -> Self {
        Self {
            rules: Self::default_alert_rules(),
            active_alerts: HashMap::new(),
            alert_history: Vec::new(),
            notification_channels: vec![NotificationChannel {
                name: "log".to_string(),
                channel_type: ChannelType::Log,
                config: serde_json::json!({}),
                enabled: true,
            }],
        }
    }

    /// Default set of alert rules for the memory system
    fn default_alert_rules() -> Vec<AlertRule> {
        vec![
            AlertRule {
                name: "high_memory_pressure".to_string(),
                condition: AlertCondition::MemoryPressure,
                threshold: 80.0, // 80% memory usage
                severity: AlertSeverity::Warning,
                enabled: true,
            },
            AlertRule {
                name: "critical_memory_pressure".to_string(),
                condition: AlertCondition::MemoryPressure,
                threshold: 95.0, // 95% memory usage
                severity: AlertSeverity::Critical,
                enabled: true,
            },
            AlertRule {
                name: "high_error_rate".to_string(),
                condition: AlertCondition::HighErrorRate,
                threshold: 5.0, // 5% error rate
                severity: AlertSeverity::Warning,
                enabled: true,
            },
            AlertRule {
                name: "critical_error_rate".to_string(),
                condition: AlertCondition::HighErrorRate,
                threshold: 10.0, // 10% error rate
                severity: AlertSeverity::Critical,
                enabled: true,
            },
            AlertRule {
                name: "slow_response_time".to_string(),
                condition: AlertCondition::SlowResponse,
                threshold: 1000.0, // 1 second
                severity: AlertSeverity::Warning,
                enabled: true,
            },
            AlertRule {
                name: "connection_pool_saturation".to_string(),
                condition: AlertCondition::ConnectionPoolSaturation,
                threshold: 90.0, // 90% pool utilization
                severity: AlertSeverity::Critical,
                enabled: true,
            },
            AlertRule {
                name: "migration_failures".to_string(),
                condition: AlertCondition::MigrationFailures,
                threshold: 10.0, // 10 failures per hour
                severity: AlertSeverity::Warning,
                enabled: true,
            },
            AlertRule {
                name: "disk_usage".to_string(),
                condition: AlertCondition::DiskUsage,
                threshold: 85.0, // 85% disk usage
                severity: AlertSeverity::Warning,
                enabled: true,
            },
        ]
    }

    /// Evaluate all alert rules against current system state
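    ///
    /// A minimal sketch of the intended call pattern (not compiled as a doc
    /// test, since building a full `SystemHealth` is elided here):
    ///
    /// ```ignore
    /// let mut manager = AlertManager::new();
    /// // `health` would come from the monitoring layer's periodic health check.
    /// manager.evaluate_alerts(&health, None);
    /// for alert in manager.get_active_alerts() {
    ///     println!("{} -> {}", alert.rule_name, alert.message);
    /// }
    /// ```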
    pub fn evaluate_alerts(
        &mut self,
        health: &SystemHealth,
        _metrics: Option<&PerformanceMetrics>,
    ) {
        let now = Utc::now();

        for rule in &self.rules {
            if !rule.enabled {
                continue;
            }

            let alert_id = format!("{}_{}", rule.name, rule.condition.as_str());
            let should_trigger = self.evaluate_condition(rule, health);
            let is_active = self.active_alerts.contains_key(&alert_id);

            match (should_trigger, is_active) {
                (true, false) => {
                    // Trigger new alert
                    let value = self.get_condition_value(rule, health);
                    let alert = Alert {
                        id: alert_id.clone(),
                        rule_name: rule.name.clone(),
                        severity: rule.severity.clone(),
                        condition: rule.condition.clone(),
                        message: self.format_alert_message(rule, value),
                        value,
                        threshold: rule.threshold,
                        triggered_at: now,
                        resolved_at: None,
                        metadata: self.get_alert_metadata(rule, health),
                    };

                    self.active_alerts.insert(alert_id.clone(), alert.clone());
                    self.alert_history.push(alert.clone());
                    self.send_notification(&alert, true);

                    match alert.severity {
                        AlertSeverity::Critical => error!("CRITICAL ALERT: {}", alert.message),
                        AlertSeverity::Warning => warn!("WARNING ALERT: {}", alert.message),
                        AlertSeverity::Info => info!("INFO ALERT: {}", alert.message),
                    }
                }
                (false, true) => {
                    // Resolve active alert
                    if let Some(mut alert) = self.active_alerts.remove(&alert_id) {
                        alert.resolved_at = Some(now);
                        self.send_notification(&alert, false);
                        info!("RESOLVED ALERT: {}", alert.message);

                        // Update in history
                        if let Some(history_alert) = self
                            .alert_history
                            .iter_mut()
                            .find(|a| a.id == alert_id && a.resolved_at.is_none())
                        {
                            history_alert.resolved_at = Some(now);
                        }
                    }
                }
                _ => {
                    // No change needed
                }
            }
        }
    }

    /// Evaluate a specific alert condition
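    ///
    /// Note: `MemoryPressure` and `ConnectionPoolSaturation` are currently
    /// inferred from the corresponding component's `HealthStatus` rather than
    /// compared numerically against `rule.threshold`; the remaining conditions
    /// compare an observed value directly against the threshold.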
    fn evaluate_condition(&self, rule: &AlertRule, health: &SystemHealth) -> bool {
        match rule.condition {
            AlertCondition::MemoryPressure => {
                // Intended to catch working-tier overload; currently inferred from
                // the memory_system component's health status rather than from a
                // numeric comparison against the threshold.
                health.components.get("memory_system").map_or(false, |component| {
                    matches!(
                        component.status,
                        HealthStatus::Degraded | HealthStatus::Unhealthy
                    )
                })
            }
            AlertCondition::HighErrorRate => {
                // Sums raw error counts across all components; a true error rate
                // would also need a request total to divide by.
                let total_errors: u64 = health.components.values().map(|c| c.error_count).sum();
                (total_errors as f64) > rule.threshold
            }
            AlertCondition::SlowResponse => {
                // Check if any component has slow response times
                health.components.values().any(|component| {
                    component
                        .response_time_ms
                        .map(|rt| rt as f64 > rule.threshold)
                        .unwrap_or(false)
                })
            }
            AlertCondition::ConnectionPoolSaturation => {
                // Pool saturation is likewise inferred from the connection_pool
                // component's health status rather than from actual utilization.
                health.components.get("connection_pool").map_or(false, |component| {
                    matches!(
                        component.status,
                        HealthStatus::Degraded | HealthStatus::Unhealthy
                    )
                })
            }
            AlertCondition::MigrationFailures => {
                // Uses the memory_system component's error count as a proxy for
                // migration failures.
                health
                    .components
                    .get("memory_system")
                    .map_or(false, |component| component.error_count > rule.threshold as u64)
            }
            AlertCondition::DiskUsage => {
                // Simplified disk usage check: treats the threshold as a percentage
                // of an assumed 1 GiB capacity.
                health.memory_usage_bytes
                    > (rule.threshold / 100.0 * 1024.0 * 1024.0 * 1024.0) as u64
            }
        }
    }

    /// Get the current value for an alert condition
    fn get_condition_value(&self, rule: &AlertRule, health: &SystemHealth) -> f64 {
        match rule.condition {
            AlertCondition::MemoryPressure => {
                // Reported as a percentage of an assumed 1 GiB budget.
                (health.memory_usage_bytes as f64) / (1024.0 * 1024.0 * 1024.0) * 100.0
            }
            AlertCondition::HighErrorRate => health
                .components
                .values()
                .map(|c| c.error_count as f64)
                .sum(),
            AlertCondition::SlowResponse => health
                .components
                .values()
                .filter_map(|c| c.response_time_ms)
                .max()
                .unwrap_or(0) as f64,
            AlertCondition::ConnectionPoolSaturation => {
                // Would calculate actual pool utilization in production
                75.0 // Placeholder
            }
            AlertCondition::MigrationFailures => health
                .components
                .get("memory_system")
                .map(|c| c.error_count as f64)
                .unwrap_or(0.0),
            AlertCondition::DiskUsage => {
                // Same simplification as MemoryPressure: percentage of an assumed 1 GiB.
                (health.memory_usage_bytes as f64) / (1024.0 * 1024.0 * 1024.0) * 100.0
            }
        }
    }

    /// Format alert message
    fn format_alert_message(&self, rule: &AlertRule, value: f64) -> String {
        match rule.condition {
            AlertCondition::MemoryPressure => {
                format!(
                    "High memory pressure detected: {:.1}% (threshold: {:.1}%)",
                    value, rule.threshold
                )
            }
            AlertCondition::HighErrorRate => {
                format!(
                    "High error rate: {:.0} errors (threshold: {:.0})",
                    value, rule.threshold
                )
            }
            AlertCondition::SlowResponse => {
                format!(
                    "Slow response time: {:.0}ms (threshold: {:.0}ms)",
                    value, rule.threshold
                )
            }
            AlertCondition::ConnectionPoolSaturation => {
                format!(
                    "Connection pool saturation: {:.1}% (threshold: {:.1}%)",
                    value, rule.threshold
                )
            }
            AlertCondition::MigrationFailures => {
                format!(
                    "Migration failures: {:.0} failures (threshold: {:.0})",
                    value, rule.threshold
                )
            }
            AlertCondition::DiskUsage => {
                format!(
                    "High disk usage: {:.1}% (threshold: {:.1}%)",
                    value, rule.threshold
                )
            }
        }
    }

    /// Get metadata for alert
    fn get_alert_metadata(
        &self,
        _rule: &AlertRule,
        health: &SystemHealth,
    ) -> HashMap<String, String> {
        let mut metadata = HashMap::new();
        metadata.insert("timestamp".to_string(), health.timestamp.to_rfc3339());
        metadata.insert(
            "uptime_seconds".to_string(),
            health.uptime_seconds.to_string(),
        );
        metadata.insert("system_status".to_string(), format!("{:?}", health.status));
        metadata
    }

    /// Send notification through configured channels
    fn send_notification(&self, alert: &Alert, is_trigger: bool) {
        for channel in &self.notification_channels {
            if !channel.enabled {
                continue;
            }

            match channel.channel_type {
                ChannelType::Log => {
                    let action = if is_trigger { "TRIGGERED" } else { "RESOLVED" };
                    let log_message =
                        format!("[ALERT {}] {} - {}", action, alert.rule_name, alert.message);

                    match alert.severity {
                        AlertSeverity::Critical => error!("{}", log_message),
                        AlertSeverity::Warning => warn!("{}", log_message),
                        AlertSeverity::Info => info!("{}", log_message),
                    }
                }
                ChannelType::Webhook => {
                    // Would implement HTTP webhook in production
                    info!(
                        "Would send webhook notification for alert: {}",
                        alert.rule_name
                    );
                }
                ChannelType::Email => {
                    // Would implement email notification in production
                    info!(
                        "Would send email notification for alert: {}",
                        alert.rule_name
                    );
                }
                ChannelType::Slack => {
                    // Would implement Slack notification in production
                    info!(
                        "Would send Slack notification for alert: {}",
                        alert.rule_name
                    );
                }
            }
        }
    }

    /// Get all active alerts
    pub fn get_active_alerts(&self) -> Vec<&Alert> {
        self.active_alerts.values().collect()
    }

    /// Get alert history, most recent first, optionally limited to `limit` entries
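    ///
    /// Usage sketch (not compiled as a doc test):
    ///
    /// ```ignore
    /// let recent = manager.get_alert_history(Some(10)); // newest 10 alerts
    /// let all = manager.get_alert_history(None);        // full history, newest first
    /// ```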
    pub fn get_alert_history(&self, limit: Option<usize>) -> Vec<&Alert> {
        let mut history: Vec<_> = self.alert_history.iter().collect();
        history.sort_by(|a, b| b.triggered_at.cmp(&a.triggered_at));

        if let Some(limit) = limit {
            history.into_iter().take(limit).collect()
        } else {
            history
        }
    }

    /// Add or update alert rule
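    ///
    /// For example, to tighten the default memory-pressure warning (sketch, not
    /// compiled as a doc test; field values are illustrative):
    ///
    /// ```ignore
    /// manager.add_rule(AlertRule {
    ///     name: "high_memory_pressure".to_string(),
    ///     condition: AlertCondition::MemoryPressure,
    ///     threshold: 70.0,
    ///     severity: AlertSeverity::Warning,
    ///     enabled: true,
    /// });
    /// ```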
    pub fn add_rule(&mut self, rule: AlertRule) {
        if let Some(existing) = self.rules.iter_mut().find(|r| r.name == rule.name) {
            *existing = rule;
            info!("Updated alert rule: {}", existing.name);
        } else {
            info!("Added new alert rule: {}", rule.name);
            self.rules.push(rule);
        }
    }

    /// Remove alert rule
    pub fn remove_rule(&mut self, rule_name: &str) -> bool {
        let initial_len = self.rules.len();
        self.rules.retain(|rule| rule.name != rule_name);
        let removed = self.rules.len() < initial_len;

        if removed {
            info!("Removed alert rule: {}", rule_name);
        }

        removed
    }

    /// Clear old alerts from history
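    ///
    /// For example, to keep only the last day of history (sketch, not compiled
    /// as a doc test):
    ///
    /// ```ignore
    /// manager.cleanup_old_alerts(24); // drop alerts triggered more than 24h ago
    /// ```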
    pub fn cleanup_old_alerts(&mut self, max_age_hours: u32) {
        let cutoff = Utc::now() - chrono::Duration::hours(max_age_hours as i64);
        let initial_len = self.alert_history.len();

        self.alert_history
            .retain(|alert| alert.triggered_at > cutoff);

        let removed = initial_len - self.alert_history.len();
        if removed > 0 {
            info!("Cleaned up {} old alerts from history", removed);
        }
    }
}

impl AlertCondition {
    fn as_str(&self) -> &'static str {
        match self {
            AlertCondition::MemoryPressure => "memory_pressure",
            AlertCondition::HighErrorRate => "high_error_rate",
            AlertCondition::SlowResponse => "slow_response",
            AlertCondition::ConnectionPoolSaturation => "connection_pool_saturation",
            AlertCondition::MigrationFailures => "migration_failures",
            AlertCondition::DiskUsage => "disk_usage",
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitoring::ComponentHealth;
    use std::collections::HashMap;

    #[test]
    fn test_alert_manager_creation() {
        let manager = AlertManager::new();
        assert!(!manager.rules.is_empty());
        assert!(manager.active_alerts.is_empty());
        assert!(!manager.notification_channels.is_empty());
    }

    #[test]
    fn test_alert_rule_management() {
        let mut manager = AlertManager::new();
        let initial_count = manager.rules.len();

        let new_rule = AlertRule {
            name: "test_rule".to_string(),
            condition: AlertCondition::HighErrorRate,
            threshold: 15.0,
            severity: AlertSeverity::Warning,
            enabled: true,
        };

        manager.add_rule(new_rule);
        assert_eq!(manager.rules.len(), initial_count + 1);

        let removed = manager.remove_rule("test_rule");
        assert!(removed);
        assert_eq!(manager.rules.len(), initial_count);
    }

    #[test]
    fn test_alert_evaluation() {
        let mut manager = AlertManager::new();

        // Create a degraded health state
        let mut components = HashMap::new();
        components.insert(
            "memory_system".to_string(),
            ComponentHealth {
                status: HealthStatus::Degraded,
                message: Some("Test degradation".to_string()),
                last_checked: Utc::now(),
                response_time_ms: Some(500),
                error_count: 15,
            },
        );

        let health = SystemHealth {
            status: HealthStatus::Degraded,
            timestamp: Utc::now(),
            components,
            uptime_seconds: 3600,
            memory_usage_bytes: 1024 * 1024 * 1024, // 1GB
            cpu_usage_percent: 75.0,
        };

        let initial_alerts = manager.active_alerts.len();
        manager.evaluate_alerts(&health, None);

        // Should have triggered some alerts
        assert!(manager.active_alerts.len() > initial_alerts);
        assert!(!manager.alert_history.is_empty());
    }

    #[test]
    fn test_alert_cleanup() {
        let mut manager = AlertManager::new();

        // Add some old alerts to history
        let old_alert = Alert {
            id: "test_alert".to_string(),
            rule_name: "test_rule".to_string(),
            severity: AlertSeverity::Warning,
            condition: AlertCondition::HighErrorRate,
            message: "Test alert".to_string(),
            value: 10.0,
            threshold: 5.0,
            triggered_at: Utc::now() - chrono::Duration::hours(25), // 25 hours ago
            resolved_at: None,
            metadata: HashMap::new(),
        };

        manager.alert_history.push(old_alert);
        assert_eq!(manager.alert_history.len(), 1);

        manager.cleanup_old_alerts(24); // Remove alerts older than 24 hours
        assert_eq!(manager.alert_history.len(), 0);
    }
}