// codex_memory/monitoring/connection_monitor.rs

use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tokio::time::{interval, MissedTickBehavior};
use tracing::{error, info, warn};

use crate::memory::connection::ConnectionPool;

11/// Connection pool monitoring and alerting system
12/// Implements HIGH-004 requirement for 70% utilization alerting
13#[derive(Debug, Clone)]
14pub struct ConnectionMonitor {
15    pool: Arc<ConnectionPool>,
16    config: MonitoringConfig,
17    metrics: Arc<RwLock<ConnectionMetrics>>,
18    alert_history: Arc<RwLock<Vec<AlertEvent>>>,
19}
20
/// Tuning knobs for the connection pool monitor.
///
/// Thresholds are expressed as percentages (e.g. `70.0` means 70%) and are
/// compared with `>=` against the pool's utilization percentage.
#[derive(Clone, Debug)]
pub struct MonitoringConfig {
    /// Seconds between consecutive health checks.
    pub check_interval_seconds: u64,
    /// Utilization percentage at which a warning is raised (70% per HIGH-004).
    pub warning_threshold: f32,
    /// Utilization percentage at which the pool is considered critical.
    pub critical_threshold: f32,
    /// Maximum number of alerts retained in the alert history.
    pub max_alert_history: usize,
    /// Minimum number of seconds between two alerts of the same level.
    pub alert_cooldown_seconds: u64,
    /// When true, every health check logs the current pool status.
    pub enable_detailed_logging: bool,
}

impl Default for MonitoringConfig {
    /// Returns the defaults: check every 30 s, warn at 70% (HIGH-004), go
    /// critical at 90%, keep up to 1000 alerts, suppress duplicate alerts for
    /// 5 minutes, and log every check.
    fn default() -> Self {
        let check_interval_seconds = 30;
        let warning_threshold = 70.0; // HIGH-004 requirement
        let critical_threshold = 90.0;
        let max_alert_history = 1000;
        let alert_cooldown_seconds = 5 * 60;
        let enable_detailed_logging = true;

        Self {
            check_interval_seconds,
            warning_threshold,
            critical_threshold,
            max_alert_history,
            alert_cooldown_seconds,
            enable_detailed_logging,
        }
    }
}

50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct ConnectionMetrics {
52    pub timestamp: chrono::DateTime<chrono::Utc>,
53    pub pool_stats: PoolStatsSnapshot,
54    pub health_status: String,
55    pub alert_level: AlertLevel,
56    pub uptime_seconds: u64,
57    pub total_checks: u64,
58    pub alert_count: u64,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct PoolStatsSnapshot {
63    pub size: u32,
64    pub idle: u32,
65    pub active_connections: u32,
66    pub max_size: u32,
67    pub utilization_percentage: f32,
68    pub waiting_for_connection: u32,
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
72pub enum AlertLevel {
73    Healthy,
74    Warning,
75    Critical,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct AlertEvent {
80    pub timestamp: chrono::DateTime<chrono::Utc>,
81    pub level: AlertLevel,
82    pub message: String,
83    pub pool_stats: PoolStatsSnapshot,
84    pub resolved: bool,
85}
86
87impl ConnectionMonitor {
88    pub fn new(pool: Arc<ConnectionPool>, config: MonitoringConfig) -> Self {
89        Self {
90            pool,
91            config,
92            metrics: Arc::new(RwLock::new(ConnectionMetrics {
93                timestamp: chrono::Utc::now(),
94                pool_stats: PoolStatsSnapshot::default(),
95                health_status: "Starting".to_string(),
96                alert_level: AlertLevel::Healthy,
97                uptime_seconds: 0,
98                total_checks: 0,
99                alert_count: 0,
100            })),
101            alert_history: Arc::new(RwLock::new(Vec::new())),
102        }
103    }
104
105    /// Start the monitoring service
106    pub async fn start_monitoring(&self) -> Result<()> {
107        info!("🔍 Starting connection pool monitoring service");
108        info!("  Warning threshold: {}%", self.config.warning_threshold);
109        info!("  Critical threshold: {}%", self.config.critical_threshold);
110        info!("  Check interval: {}s", self.config.check_interval_seconds);
111
112        let start_time = Instant::now();
113        let mut interval = interval(Duration::from_secs(self.config.check_interval_seconds));
114
115        loop {
116            interval.tick().await;
117
118            match self.perform_health_check(start_time.elapsed()).await {
119                Ok(_) => {
120                    // Health check successful
121                }
122                Err(e) => {
123                    error!("Health check failed: {}", e);
124                    // Continue monitoring even if individual checks fail
125                }
126            }
127        }
128    }
129
130    async fn perform_health_check(&self, uptime: Duration) -> Result<()> {
131        let pool_stats = self.pool.get_pool_stats().await;
132        let timestamp = chrono::Utc::now();
133
134        // Convert to snapshot format
135        let stats_snapshot = PoolStatsSnapshot {
136            size: pool_stats.size,
137            idle: pool_stats.idle,
138            active_connections: pool_stats.active_connections,
139            max_size: pool_stats.max_size,
140            utilization_percentage: pool_stats.utilization_percentage(),
141            waiting_for_connection: pool_stats.waiting_for_connection,
142        };
143
144        // Determine alert level
145        let alert_level = self.determine_alert_level(&stats_snapshot);
146        let health_status = self.generate_health_status(&stats_snapshot, &alert_level);
147
148        // Update metrics
149        {
150            let mut metrics = self.metrics.write().await;
151            metrics.timestamp = timestamp;
152            metrics.pool_stats = stats_snapshot.clone();
153            metrics.health_status = health_status.clone();
154            metrics.alert_level = alert_level.clone();
155            metrics.uptime_seconds = uptime.as_secs();
156            metrics.total_checks += 1;
157        }
158
159        // Handle alerting
160        if let Some(alert) = self.should_alert(&alert_level, &stats_snapshot).await? {
161            self.send_alert(alert).await?;
162        }
163
164        // Log status if detailed logging is enabled
165        if self.config.enable_detailed_logging {
166            self.log_status(&stats_snapshot, &alert_level).await;
167        }
168
169        Ok(())
170    }
171
172    fn determine_alert_level(&self, stats: &PoolStatsSnapshot) -> AlertLevel {
173        let utilization = stats.utilization_percentage;
174
175        if utilization >= self.config.critical_threshold {
176            AlertLevel::Critical
177        } else if utilization >= self.config.warning_threshold {
178            AlertLevel::Warning
179        } else {
180            AlertLevel::Healthy
181        }
182    }
183
184    fn generate_health_status(
185        &self,
186        stats: &PoolStatsSnapshot,
187        alert_level: &AlertLevel,
188    ) -> String {
189        match alert_level {
190            AlertLevel::Healthy => format!(
191                "HEALTHY: Pool at {:.1}% utilization ({}/{} connections active)",
192                stats.utilization_percentage, stats.active_connections, stats.max_size
193            ),
194            AlertLevel::Warning => format!(
195                "WARNING: Pool at {:.1}% utilization ({}/{} connections active) - Approaching capacity",
196                stats.utilization_percentage, stats.active_connections, stats.max_size
197            ),
198            AlertLevel::Critical => format!(
199                "CRITICAL: Pool at {:.1}% utilization ({}/{} connections active) - Pool saturated!",
200                stats.utilization_percentage, stats.active_connections, stats.max_size
201            ),
202        }
203    }
204
205    async fn should_alert(
206        &self,
207        level: &AlertLevel,
208        stats: &PoolStatsSnapshot,
209    ) -> Result<Option<AlertEvent>> {
210        if matches!(level, AlertLevel::Healthy) {
211            return Ok(None);
212        }
213
214        let alert_history = self.alert_history.read().await;
215        let now = chrono::Utc::now();
216
217        // Check for recent similar alerts (cooldown period)
218        if let Some(last_alert) = alert_history.iter().rev().find(|a| a.level == *level) {
219            let time_since_last = now.signed_duration_since(last_alert.timestamp);
220            if time_since_last.num_seconds() < self.config.alert_cooldown_seconds as i64 {
221                return Ok(None); // Still in cooldown period
222            }
223        }
224
225        // Create new alert
226        let message = match level {
227            AlertLevel::Warning => format!(
228                "Connection pool utilization at {:.1}% (threshold: {}%) - {} active of {} max connections",
229                stats.utilization_percentage,
230                self.config.warning_threshold,
231                stats.active_connections,
232                stats.max_size
233            ),
234            AlertLevel::Critical => format!(
235                "Connection pool critically saturated at {:.1}% (threshold: {}%) - {} active of {} max connections. Immediate attention required!",
236                stats.utilization_percentage,
237                self.config.critical_threshold,
238                stats.active_connections,
239                stats.max_size
240            ),
241            AlertLevel::Healthy => unreachable!(),
242        };
243
244        Ok(Some(AlertEvent {
245            timestamp: now,
246            level: level.clone(),
247            message,
248            pool_stats: stats.clone(),
249            resolved: false,
250        }))
251    }
252
253    async fn send_alert(&self, alert: AlertEvent) -> Result<()> {
254        // Log the alert
255        match alert.level {
256            AlertLevel::Warning => warn!("🚨 POOL WARNING: {}", alert.message),
257            AlertLevel::Critical => error!("🚨🚨 POOL CRITICAL: {}", alert.message),
258            AlertLevel::Healthy => unreachable!(),
259        }
260
261        // Update alert count
262        {
263            let mut metrics = self.metrics.write().await;
264            metrics.alert_count += 1;
265        }
266
267        // Add to alert history
268        {
269            let mut history = self.alert_history.write().await;
270            history.push(alert.clone());
271
272            // Maintain history size limit
273            if history.len() > self.config.max_alert_history {
274                history.remove(0);
275            }
276        }
277
278        // In a production system, you would integrate with:
279        // - PagerDuty, OpsGenie, or similar alerting systems
280        // - Slack/Teams notifications
281        // - Email alerts
282        // - Metrics systems like Prometheus/Grafana
283
284        info!("Alert sent: {} - {}", alert.level as u8, alert.message);
285
286        Ok(())
287    }
288
289    async fn log_status(&self, stats: &PoolStatsSnapshot, level: &AlertLevel) {
290        match level {
291            AlertLevel::Healthy => {
292                if self.config.enable_detailed_logging {
293                    info!(
294                        "Pool Status: {:.1}% utilization - {}/{} active connections",
295                        stats.utilization_percentage, stats.active_connections, stats.max_size
296                    );
297                }
298            }
299            AlertLevel::Warning => {
300                warn!(
301                    "Pool Status: {:.1}% utilization - {}/{} active connections (WARNING LEVEL)",
302                    stats.utilization_percentage, stats.active_connections, stats.max_size
303                );
304            }
305            AlertLevel::Critical => {
306                error!(
307                    "Pool Status: {:.1}% utilization - {}/{} active connections (CRITICAL LEVEL)",
308                    stats.utilization_percentage, stats.active_connections, stats.max_size
309                );
310            }
311        }
312    }
313
314    /// Get current metrics
315    pub async fn get_metrics(&self) -> ConnectionMetrics {
316        self.metrics.read().await.clone()
317    }
318
319    /// Get alert history
320    pub async fn get_alert_history(&self, limit: Option<usize>) -> Vec<AlertEvent> {
321        let history = self.alert_history.read().await;
322        let limit = limit.unwrap_or(history.len());
323        history.iter().rev().take(limit).cloned().collect()
324    }
325
326    /// Get current pool health summary
327    pub async fn get_health_summary(&self) -> PoolHealthSummary {
328        let metrics = self.get_metrics().await;
329        let recent_alerts = self.get_alert_history(Some(10)).await;
330
331        let critical_alerts = recent_alerts
332            .iter()
333            .filter(|a| matches!(a.level, AlertLevel::Critical))
334            .count();
335        let warning_alerts = recent_alerts
336            .iter()
337            .filter(|a| matches!(a.level, AlertLevel::Warning))
338            .count();
339
340        PoolHealthSummary {
341            current_status: metrics.health_status,
342            current_utilization: metrics.pool_stats.utilization_percentage,
343            alert_level: metrics.alert_level,
344            uptime_hours: metrics.uptime_seconds / 3600,
345            total_alerts: metrics.alert_count,
346            recent_critical_alerts: critical_alerts as u64,
347            recent_warning_alerts: warning_alerts as u64,
348            last_check: metrics.timestamp,
349        }
350    }
351}
352
353impl Default for PoolStatsSnapshot {
354    fn default() -> Self {
355        Self {
356            size: 0,
357            idle: 0,
358            active_connections: 0,
359            max_size: 0,
360            utilization_percentage: 0.0,
361            waiting_for_connection: 0,
362        }
363    }
364}
365
366#[derive(Debug, Clone, Serialize, Deserialize)]
367pub struct PoolHealthSummary {
368    pub current_status: String,
369    pub current_utilization: f32,
370    pub alert_level: AlertLevel,
371    pub uptime_hours: u64,
372    pub total_alerts: u64,
373    pub recent_critical_alerts: u64,
374    pub recent_warning_alerts: u64,
375    pub last_check: chrono::DateTime<chrono::Utc>,
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a snapshot that only sets the utilization percentage.
    fn stats_at(utilization_percentage: f32) -> PoolStatsSnapshot {
        PoolStatsSnapshot {
            utilization_percentage,
            ..Default::default()
        }
    }

    #[test]
    fn test_alert_level_determination() {
        let config = MonitoringConfig::default();

        assert_eq!(
            determine_alert_level(&config, &stats_at(50.0)),
            AlertLevel::Healthy
        );
        assert_eq!(
            determine_alert_level(&config, &stats_at(75.0)),
            AlertLevel::Warning
        );
        assert_eq!(
            determine_alert_level(&config, &stats_at(95.0)),
            AlertLevel::Critical
        );
    }

    /// HIGH-004: thresholds are inclusive, so exactly 70% must already warn
    /// and exactly 90% must already be critical.
    #[test]
    fn test_alert_level_threshold_boundaries() {
        let config = MonitoringConfig::default();

        assert_eq!(
            determine_alert_level(&config, &stats_at(69.9)),
            AlertLevel::Healthy
        );
        assert_eq!(
            determine_alert_level(&config, &stats_at(70.0)),
            AlertLevel::Warning
        );
        assert_eq!(
            determine_alert_level(&config, &stats_at(89.9)),
            AlertLevel::Warning
        );
        assert_eq!(
            determine_alert_level(&config, &stats_at(90.0)),
            AlertLevel::Critical
        );
        assert_eq!(
            determine_alert_level(&config, &stats_at(100.0)),
            AlertLevel::Critical
        );
    }

    #[test]
    fn test_alert_level_respects_custom_thresholds() {
        let config = MonitoringConfig {
            warning_threshold: 50.0,
            critical_threshold: 80.0,
            ..MonitoringConfig::default()
        };

        assert_eq!(
            determine_alert_level(&config, &stats_at(49.0)),
            AlertLevel::Healthy
        );
        assert_eq!(
            determine_alert_level(&config, &stats_at(55.0)),
            AlertLevel::Warning
        );
        assert_eq!(
            determine_alert_level(&config, &stats_at(80.0)),
            AlertLevel::Critical
        );
    }

    #[test]
    fn test_health_status_generation() {
        let stats = PoolStatsSnapshot {
            utilization_percentage: 75.0,
            active_connections: 75,
            max_size: 100,
            ..Default::default()
        };

        let status = generate_health_status(&stats, &AlertLevel::Warning);
        assert!(status.contains("WARNING"));
        assert!(status.contains("75.0%"));
        assert!(status.contains("75/100"));
    }

    #[test]
    fn test_health_status_for_every_level() {
        let stats = PoolStatsSnapshot {
            utilization_percentage: 42.0,
            active_connections: 21,
            max_size: 50,
            ..Default::default()
        };

        assert_eq!(
            generate_health_status(&stats, &AlertLevel::Healthy),
            "HEALTHY: Pool at 42.0% utilization (21/50 connections active)"
        );
        assert_eq!(
            generate_health_status(&stats, &AlertLevel::Warning),
            "WARNING: Pool at 42.0% utilization (21/50 connections active) - Approaching capacity"
        );
        assert_eq!(
            generate_health_status(&stats, &AlertLevel::Critical),
            "CRITICAL: Pool at 42.0% utilization (21/50 connections active) - Pool saturated!"
        );
    }
}

433// Helper functions for tests
434fn determine_alert_level(config: &MonitoringConfig, stats: &PoolStatsSnapshot) -> AlertLevel {
435    let utilization = stats.utilization_percentage;
436
437    if utilization >= config.critical_threshold {
438        AlertLevel::Critical
439    } else if utilization >= config.warning_threshold {
440        AlertLevel::Warning
441    } else {
442        AlertLevel::Healthy
443    }
444}
445
446fn generate_health_status(stats: &PoolStatsSnapshot, alert_level: &AlertLevel) -> String {
447    match alert_level {
448        AlertLevel::Healthy => format!(
449            "HEALTHY: Pool at {:.1}% utilization ({}/{} connections active)",
450            stats.utilization_percentage, stats.active_connections, stats.max_size
451        ),
452        AlertLevel::Warning => format!(
453            "WARNING: Pool at {:.1}% utilization ({}/{} connections active) - Approaching capacity",
454            stats.utilization_percentage, stats.active_connections, stats.max_size
455        ),
456        AlertLevel::Critical => format!(
457            "CRITICAL: Pool at {:.1}% utilization ({}/{} connections active) - Pool saturated!",
458            stats.utilization_percentage, stats.active_connections, stats.max_size
459        ),
460    }
461}