// codex_memory/monitoring/health.rs

1use super::repository::{MonitoringRepository, PostgresMonitoringRepository};
2use super::{ComponentHealth, HealthStatus, SystemHealth};
3use anyhow::Result;
4use chrono::Utc;
5use serde::{Deserialize, Serialize};
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, Instant, SystemTime};
9use tracing::{debug, error, info, warn};
10
11#[derive(Debug, Clone)]
12pub struct HealthChecker {
13    repository: Arc<dyn MonitoringRepository>,
14    start_time: SystemTime,
15    component_thresholds: HealthThresholds,
16}
17
/// Tunable limits consulted by the individual health probes.
#[derive(Clone, Debug)]
pub struct HealthThresholds {
    /// Database round-trip time, in milliseconds, above which the database
    /// component is reported as degraded.
    pub max_response_time_ms: u64,
    /// Maximum tolerated error rate, expressed as a fraction (the default
    /// `0.05` is 5%). Not consulted by any probe visible in this file.
    pub max_error_rate: f64,
    /// Maximum tolerated memory usage in percent. Not consulted by any probe
    /// visible in this file.
    pub max_memory_usage_percent: f64,
    /// CPU usage, in percent, above which system resources are reported as
    /// degraded.
    pub max_cpu_usage_percent: f64,
    /// Connection pool utilization, in percent, above which the pool is
    /// reported as degraded.
    pub max_connection_pool_utilization: f64,
}
26
27impl Default for HealthThresholds {
28    fn default() -> Self {
29        Self {
30            max_response_time_ms: 1000, // 1 second
31            max_error_rate: 0.05,       // 5%
32            max_memory_usage_percent: 80.0,
33            max_cpu_usage_percent: 90.0,
34            max_connection_pool_utilization: 80.0,
35        }
36    }
37}
38
39impl HealthChecker {
40    pub fn new(db_pool: Arc<sqlx::PgPool>) -> Self {
41        let repository = Arc::new(PostgresMonitoringRepository::new(db_pool));
42        Self {
43            repository,
44            start_time: SystemTime::now(),
45            component_thresholds: HealthThresholds::default(),
46        }
47    }
48
49    pub fn with_repository(repository: Arc<dyn MonitoringRepository>) -> Self {
50        Self {
51            repository,
52            start_time: SystemTime::now(),
53            component_thresholds: HealthThresholds::default(),
54        }
55    }
56
57    pub fn with_thresholds(mut self, thresholds: HealthThresholds) -> Self {
58        self.component_thresholds = thresholds;
59        self
60    }
61
62    /// Perform comprehensive system health check
63    pub async fn check_system_health(&self) -> Result<SystemHealth> {
64        let start_check = Instant::now();
65        let mut components = HashMap::new();
66
67        // Check database health
68        let db_health = self.check_database_health().await;
69        components.insert("database".to_string(), db_health);
70
71        // Check memory health
72        let memory_health = self.check_memory_health().await;
73        components.insert("memory_system".to_string(), memory_health);
74
75        // Check connection pool health
76        let pool_health = self.check_connection_pool_health().await;
77        components.insert("connection_pool".to_string(), pool_health);
78
79        // Check system resources
80        let system_health = self.check_system_resources().await;
81        components.insert("system_resources".to_string(), system_health);
82
83        // Determine overall health status
84        let overall_status = self.determine_overall_status(&components);
85
86        let uptime = self
87            .start_time
88            .elapsed()
89            .unwrap_or_else(|_| Duration::from_secs(0))
90            .as_secs();
91
92        let memory_usage = self.get_memory_usage().await.unwrap_or(0);
93        let cpu_usage = self.get_cpu_usage().await.unwrap_or(0.0);
94
95        let health = SystemHealth {
96            status: overall_status,
97            timestamp: Utc::now(),
98            components,
99            uptime_seconds: uptime,
100            memory_usage_bytes: memory_usage,
101            cpu_usage_percent: cpu_usage,
102        };
103
104        let check_duration = start_check.elapsed().as_millis();
105        debug!("System health check completed in {}ms", check_duration);
106
107        Ok(health)
108    }
109
110    /// Check database connectivity and performance
111    async fn check_database_health(&self) -> ComponentHealth {
112        let start = Instant::now();
113        let mut status = HealthStatus::Healthy;
114        let mut message = None;
115        let mut error_count = 0;
116
117        // Test database connectivity and performance
118        match self.repository.health_check().await {
119            Ok(_) => {
120                debug!("Database connectivity check passed");
121                let response_time = start.elapsed().as_millis() as u64;
122                if response_time > self.component_thresholds.max_response_time_ms {
123                    status = HealthStatus::Degraded;
124                    message = Some(format!("Slow database response: {response_time}ms"));
125                    warn!("Database response time degraded: {}ms", response_time);
126                }
127            }
128            Err(e) => {
129                status = HealthStatus::Unhealthy;
130                message = Some(format!("Database health check failed: {e}"));
131                error_count += 1;
132                error!("Database health check failed: {}", e);
133            }
134        }
135
136        let response_time_ms = start.elapsed().as_millis() as u64;
137
138        ComponentHealth {
139            status,
140            message,
141            last_checked: Utc::now(),
142            response_time_ms: Some(response_time_ms),
143            error_count,
144        }
145    }
146
147    /// Check memory system health
148    async fn check_memory_health(&self) -> ComponentHealth {
149        let start = Instant::now();
150        let mut status = HealthStatus::Healthy;
151        let mut message = None;
152        let mut error_count = 0;
153
154        // Check memory tier distribution
155        match self.repository.get_memory_tier_distribution().await {
156            Ok(tier_counts) => {
157                let total: i64 = tier_counts.values().sum();
158
159                // Check for memory pressure (too many memories in working tier)
160                if let Some(working_count) = tier_counts.get("working") {
161                    let working_ratio = *working_count as f64 / total as f64;
162                    if working_ratio > 0.7 {
163                        // More than 70% in working tier
164                        status = HealthStatus::Degraded;
165                        message = Some(format!(
166                            "Memory pressure detected: {:.1}% in working tier",
167                            working_ratio * 100.0
168                        ));
169                        warn!(
170                            "Memory pressure: {:.1}% of memories in working tier",
171                            working_ratio * 100.0
172                        );
173                    }
174                }
175
176                info!(
177                    "Memory tier distribution check passed: {} active memories",
178                    total
179                );
180            }
181            Err(e) => {
182                status = HealthStatus::Degraded;
183                message = Some(format!("Memory tier check failed: {e}"));
184                error_count += 1;
185                warn!("Memory tier health check failed: {}", e);
186            }
187        }
188
189        // Check for recent migration failures
190        match self.repository.check_migration_failures(1).await {
191            Ok(failure_count) => {
192                if failure_count > 10 {
193                    status = HealthStatus::Degraded;
194                    message = Some(format!(
195                        "High migration failure rate: {failure_count} failures in last hour"
196                    ));
197                    warn!(
198                        "High migration failure rate: {} failures in last hour",
199                        failure_count
200                    );
201                }
202            }
203            Err(e) => {
204                warn!("Failed to check migration failures: {}", e);
205                error_count += 1;
206            }
207        }
208
209        let response_time_ms = start.elapsed().as_millis() as u64;
210
211        ComponentHealth {
212            status,
213            message,
214            last_checked: Utc::now(),
215            response_time_ms: Some(response_time_ms),
216            error_count,
217        }
218    }
219
220    /// Check connection pool health
221    async fn check_connection_pool_health(&self) -> ComponentHealth {
222        let start = Instant::now();
223        let mut status = HealthStatus::Healthy;
224        let mut message = None;
225
226        // Get connection pool statistics
227        let pool_stats = match self.repository.get_connection_pool_stats().await {
228            Ok(stats) => stats,
229            Err(e) => {
230                status = HealthStatus::Degraded;
231                message = Some(format!("Failed to get connection pool stats: {e}"));
232                warn!("Failed to get connection pool statistics: {}", e);
233                return ComponentHealth {
234                    status,
235                    message,
236                    last_checked: chrono::Utc::now(),
237                    response_time_ms: Some(start.elapsed().as_millis() as u64),
238                    error_count: 1,
239                };
240            }
241        };
242
243        let max_size = 100; // Would get from config in production
244        let utilization = if max_size > 0 {
245            (pool_stats.active_connections as f64 / max_size as f64) * 100.0
246        } else {
247            0.0
248        };
249
250        if utilization > self.component_thresholds.max_connection_pool_utilization {
251            status = HealthStatus::Degraded;
252            message = Some(format!(
253                "High connection pool utilization: {utilization:.1}%"
254            ));
255            warn!("Connection pool utilization high: {:.1}%", utilization);
256        } else if utilization > 90.0 {
257            status = HealthStatus::Unhealthy;
258            message = Some(format!(
259                "Critical connection pool utilization: {utilization:.1}%"
260            ));
261            error!("Connection pool utilization critical: {:.1}%", utilization);
262        }
263
264        let response_time_ms = start.elapsed().as_millis() as u64;
265
266        info!(
267            "Connection pool health: {}/{} connections used ({:.1}% utilization)",
268            pool_stats.active_connections, max_size, utilization
269        );
270
271        ComponentHealth {
272            status,
273            message,
274            last_checked: Utc::now(),
275            response_time_ms: Some(response_time_ms),
276            error_count: 0,
277        }
278    }
279
280    /// Check system resource health
281    async fn check_system_resources(&self) -> ComponentHealth {
282        let start = Instant::now();
283        let mut status = HealthStatus::Healthy;
284        let mut message = None;
285
286        let memory_usage = self.get_memory_usage().await.unwrap_or(0);
287        let cpu_usage = self.get_cpu_usage().await.unwrap_or(0.0);
288
289        // Check memory usage (simplified - would use actual system monitoring in production)
290        let memory_usage_mb = memory_usage / (1024 * 1024);
291        if memory_usage_mb > 1024 {
292            // Simplified threshold
293            status = HealthStatus::Degraded;
294            message = Some(format!("High memory usage: {memory_usage_mb}MB"));
295        }
296
297        // Check CPU usage
298        if cpu_usage > self.component_thresholds.max_cpu_usage_percent {
299            status = HealthStatus::Degraded;
300            let cpu_message = format!("High CPU usage: {cpu_usage:.1}%");
301            message = match message {
302                Some(existing) => Some(format!("{existing}; {cpu_message}")),
303                None => Some(cpu_message),
304            };
305        }
306
307        let response_time_ms = start.elapsed().as_millis() as u64;
308
309        ComponentHealth {
310            status,
311            message,
312            last_checked: Utc::now(),
313            response_time_ms: Some(response_time_ms),
314            error_count: 0,
315        }
316    }
317
318    /// Determine overall system health from component health
319    fn determine_overall_status(
320        &self,
321        components: &HashMap<String, ComponentHealth>,
322    ) -> HealthStatus {
323        let mut has_unhealthy = false;
324        let mut has_degraded = false;
325
326        for (component_name, health) in components {
327            match health.status {
328                HealthStatus::Unhealthy => {
329                    has_unhealthy = true;
330                    error!(
331                        "Component {} is unhealthy: {:?}",
332                        component_name, health.message
333                    );
334                }
335                HealthStatus::Degraded => {
336                    has_degraded = true;
337                    warn!(
338                        "Component {} is degraded: {:?}",
339                        component_name, health.message
340                    );
341                }
342                HealthStatus::Healthy => {
343                    debug!("Component {} is healthy", component_name);
344                }
345            }
346        }
347
348        if has_unhealthy {
349            HealthStatus::Unhealthy
350        } else if has_degraded {
351            HealthStatus::Degraded
352        } else {
353            HealthStatus::Healthy
354        }
355    }
356
357    /// Get current memory usage (simplified implementation)
358    async fn get_memory_usage(&self) -> Result<u64> {
359        // In production, would use system monitoring APIs
360        // For now, return a placeholder value
361        Ok(512 * 1024 * 1024) // 512MB
362    }
363
364    /// Get current CPU usage (simplified implementation)
365    async fn get_cpu_usage(&self) -> Result<f64> {
366        // In production, would use system monitoring APIs
367        // For now, return a placeholder value
368        Ok(25.0) // 25%
369    }
370}
371
372/// Simple health check endpoint response
373#[derive(Debug, Serialize, Deserialize)]
374pub struct SimpleHealthResponse {
375    pub status: String,
376    pub timestamp: String,
377    pub uptime_seconds: u64,
378}
379
380impl From<&SystemHealth> for SimpleHealthResponse {
381    fn from(health: &SystemHealth) -> Self {
382        Self {
383            status: match health.status {
384                HealthStatus::Healthy => "healthy".to_string(),
385                HealthStatus::Degraded => "degraded".to_string(),
386                HealthStatus::Unhealthy => "unhealthy".to_string(),
387            },
388            timestamp: health.timestamp.to_rfc3339(),
389            uptime_seconds: health.uptime_seconds,
390        }
391    }
392}
393
#[cfg(test)]
mod tests {
    use super::*;

    /// The default thresholds expose a 1000 ms response limit and a 5% error rate.
    #[test]
    fn test_health_thresholds_default() {
        let HealthThresholds {
            max_response_time_ms,
            max_error_rate,
            ..
        } = HealthThresholds::default();

        assert_eq!(max_response_time_ms, 1000);
        assert_eq!(max_error_rate, 0.05);
    }

    /// Converting a healthy report keeps the uptime and yields the "healthy" label.
    #[test]
    fn test_simple_health_response_conversion() {
        let report = SystemHealth {
            status: HealthStatus::Healthy,
            timestamp: Utc::now(),
            components: HashMap::new(),
            uptime_seconds: 3600,
            memory_usage_bytes: 1024 * 1024,
            cpu_usage_percent: 25.0,
        };

        let response = SimpleHealthResponse::from(&report);

        assert_eq!(response.status, "healthy");
        assert_eq!(response.uptime_seconds, 3600);
    }
}