codex_memory/monitoring/health.rs

1use super::{ComponentHealth, HealthStatus, SystemHealth};
2use anyhow::Result;
3use chrono::Utc;
4use serde::{Deserialize, Serialize};
5use sqlx::PgPool;
6use std::collections::HashMap;
7use std::sync::Arc;
8use std::time::{Duration, Instant, SystemTime};
9use tracing::{debug, error, info, warn};
10
11#[derive(Debug, Clone)]
12pub struct HealthChecker {
13    db_pool: Arc<PgPool>,
14    start_time: SystemTime,
15    component_thresholds: HealthThresholds,
16}
17
/// Limits used to classify component health.
///
/// Units are mixed: `max_error_rate` is a fraction (`0.05` means 5%), whereas the
/// `*_percent` and utilization fields are percentages on a 0–100 scale.
#[derive(Debug, Clone)]
pub struct HealthThresholds {
    /// Database response time, in milliseconds, above which the database is degraded.
    pub max_response_time_ms: u64,
    /// Maximum acceptable error rate, as a fraction. Not consulted by the checks in this module.
    pub max_error_rate: f64,
    /// Maximum acceptable memory usage, in percent. Not consulted by the checks in this module.
    pub max_memory_usage_percent: f64,
    /// CPU usage, in percent, above which system resources are degraded.
    pub max_cpu_usage_percent: f64,
    /// Connection-pool utilization, in percent of the pool maximum, above which the pool is
    /// degraded.
    pub max_connection_pool_utilization: f64,
}

impl Default for HealthThresholds {
    /// Conservative defaults: 1 s database latency, 5% error rate, 80% memory,
    /// 90% CPU and 80% connection-pool utilization.
    fn default() -> Self {
        let one_second_ms: u64 = 1_000;
        let five_percent_fraction = 0.05;
        Self {
            max_connection_pool_utilization: 80.0,
            max_cpu_usage_percent: 90.0,
            max_memory_usage_percent: 80.0,
            max_error_rate: five_percent_fraction,
            max_response_time_ms: one_second_ms,
        }
    }
}
38
39impl HealthChecker {
40    pub fn new(db_pool: Arc<PgPool>) -> Self {
41        Self {
42            db_pool,
43            start_time: SystemTime::now(),
44            component_thresholds: HealthThresholds::default(),
45        }
46    }
47
48    pub fn with_thresholds(mut self, thresholds: HealthThresholds) -> Self {
49        self.component_thresholds = thresholds;
50        self
51    }
52
53    /// Perform comprehensive system health check
54    pub async fn check_system_health(&self) -> Result<SystemHealth> {
55        let start_check = Instant::now();
56        let mut components = HashMap::new();
57
58        // Check database health
59        let db_health = self.check_database_health().await;
60        components.insert("database".to_string(), db_health);
61
62        // Check memory health
63        let memory_health = self.check_memory_health().await;
64        components.insert("memory_system".to_string(), memory_health);
65
66        // Check connection pool health
67        let pool_health = self.check_connection_pool_health().await;
68        components.insert("connection_pool".to_string(), pool_health);
69
70        // Check system resources
71        let system_health = self.check_system_resources().await;
72        components.insert("system_resources".to_string(), system_health);
73
74        // Determine overall health status
75        let overall_status = self.determine_overall_status(&components);
76
77        let uptime = self
78            .start_time
79            .elapsed()
80            .unwrap_or_else(|_| Duration::from_secs(0))
81            .as_secs();
82
83        let memory_usage = self.get_memory_usage().await.unwrap_or(0);
84        let cpu_usage = self.get_cpu_usage().await.unwrap_or(0.0);
85
86        let health = SystemHealth {
87            status: overall_status,
88            timestamp: Utc::now(),
89            components,
90            uptime_seconds: uptime,
91            memory_usage_bytes: memory_usage,
92            cpu_usage_percent: cpu_usage,
93        };
94
95        let check_duration = start_check.elapsed().as_millis();
96        debug!("System health check completed in {}ms", check_duration);
97
98        Ok(health)
99    }
100
101    /// Check database connectivity and performance
102    async fn check_database_health(&self) -> ComponentHealth {
103        let start = Instant::now();
104        let mut status = HealthStatus::Healthy;
105        let mut message = None;
106        let mut error_count = 0;
107
108        // Test basic connectivity
109        match sqlx::query("SELECT 1 as health_check")
110            .fetch_one(self.db_pool.as_ref())
111            .await
112        {
113            Ok(_) => {
114                debug!("Database connectivity check passed");
115            }
116            Err(e) => {
117                status = HealthStatus::Unhealthy;
118                message = Some(format!("Database connection failed: {e}"));
119                error_count += 1;
120                error!("Database health check failed: {}", e);
121            }
122        }
123
124        // Test database performance with a more complex query
125        if status == HealthStatus::Healthy {
126            match sqlx::query("SELECT COUNT(*) FROM memories WHERE status = 'active'")
127                .fetch_one(self.db_pool.as_ref())
128                .await
129            {
130                Ok(_) => {
131                    let response_time = start.elapsed().as_millis() as u64;
132                    if response_time > self.component_thresholds.max_response_time_ms {
133                        status = HealthStatus::Degraded;
134                        message = Some(format!("Slow database response: {response_time}ms"));
135                        warn!("Database response time degraded: {}ms", response_time);
136                    }
137                }
138                Err(e) => {
139                    status = HealthStatus::Degraded;
140                    message = Some(format!("Database query performance issue: {e}"));
141                    error_count += 1;
142                    warn!("Database performance check failed: {}", e);
143                }
144            }
145        }
146
147        let response_time_ms = start.elapsed().as_millis() as u64;
148
149        ComponentHealth {
150            status,
151            message,
152            last_checked: Utc::now(),
153            response_time_ms: Some(response_time_ms),
154            error_count,
155        }
156    }
157
158    /// Check memory system health
159    async fn check_memory_health(&self) -> ComponentHealth {
160        let start = Instant::now();
161        let mut status = HealthStatus::Healthy;
162        let mut message = None;
163        let mut error_count = 0;
164
165        // Check memory tier distribution
166        match sqlx::query_as::<_, (String, i64)>(
167            "SELECT tier, COUNT(*) FROM memories WHERE status = 'active' GROUP BY tier",
168        )
169        .fetch_all(self.db_pool.as_ref())
170        .await
171        {
172            Ok(tier_counts) => {
173                let total: i64 = tier_counts.iter().map(|(_, count)| count).sum();
174
175                // Check for memory pressure (too many memories in working tier)
176                if let Some((_, working_count)) =
177                    tier_counts.iter().find(|(tier, _)| tier == "working")
178                {
179                    let working_ratio = *working_count as f64 / total as f64;
180                    if working_ratio > 0.7 {
181                        // More than 70% in working tier
182                        status = HealthStatus::Degraded;
183                        message = Some(format!(
184                            "Memory pressure detected: {:.1}% in working tier",
185                            working_ratio * 100.0
186                        ));
187                        warn!(
188                            "Memory pressure: {:.1}% of memories in working tier",
189                            working_ratio * 100.0
190                        );
191                    }
192                }
193
194                info!(
195                    "Memory tier distribution check passed: {} active memories",
196                    total
197                );
198            }
199            Err(e) => {
200                status = HealthStatus::Degraded;
201                message = Some(format!("Memory tier check failed: {e}"));
202                error_count += 1;
203                warn!("Memory tier health check failed: {}", e);
204            }
205        }
206
207        // Check for recent migration failures
208        match sqlx::query_scalar::<_, i64>(
209            "SELECT COUNT(*) FROM migration_history WHERE success = false AND migrated_at > NOW() - INTERVAL '1 hour'"
210        )
211        .fetch_one(self.db_pool.as_ref())
212        .await
213        {
214            Ok(failure_count) => {
215                if failure_count > 10 {
216                    status = HealthStatus::Degraded;
217                    message = Some(format!("High migration failure rate: {failure_count} failures in last hour"));
218                    warn!("High migration failure rate: {} failures in last hour", failure_count);
219                }
220            }
221            Err(e) => {
222                warn!("Failed to check migration failures: {}", e);
223                error_count += 1;
224            }
225        }
226
227        let response_time_ms = start.elapsed().as_millis() as u64;
228
229        ComponentHealth {
230            status,
231            message,
232            last_checked: Utc::now(),
233            response_time_ms: Some(response_time_ms),
234            error_count,
235        }
236    }
237
238    /// Check connection pool health
239    async fn check_connection_pool_health(&self) -> ComponentHealth {
240        let start = Instant::now();
241        let mut status = HealthStatus::Healthy;
242        let mut message = None;
243
244        // Get connection pool statistics
245        let pool_size = self.db_pool.size();
246        let idle_connections = self.db_pool.num_idle();
247        let max_size = 100; // Would get from config in production
248
249        let utilization = if max_size > 0 {
250            ((pool_size as usize - idle_connections) as f64 / max_size as f64) * 100.0
251        } else {
252            0.0
253        };
254
255        if utilization > self.component_thresholds.max_connection_pool_utilization {
256            status = HealthStatus::Degraded;
257            message = Some(format!(
258                "High connection pool utilization: {utilization:.1}%"
259            ));
260            warn!("Connection pool utilization high: {:.1}%", utilization);
261        } else if utilization > 90.0 {
262            status = HealthStatus::Unhealthy;
263            message = Some(format!(
264                "Critical connection pool utilization: {utilization:.1}%"
265            ));
266            error!("Connection pool utilization critical: {:.1}%", utilization);
267        }
268
269        let response_time_ms = start.elapsed().as_millis() as u64;
270
271        info!(
272            "Connection pool health: {}/{} connections used ({:.1}% utilization)",
273            pool_size as usize - idle_connections,
274            max_size,
275            utilization
276        );
277
278        ComponentHealth {
279            status,
280            message,
281            last_checked: Utc::now(),
282            response_time_ms: Some(response_time_ms),
283            error_count: 0,
284        }
285    }
286
287    /// Check system resource health
288    async fn check_system_resources(&self) -> ComponentHealth {
289        let start = Instant::now();
290        let mut status = HealthStatus::Healthy;
291        let mut message = None;
292
293        let memory_usage = self.get_memory_usage().await.unwrap_or(0);
294        let cpu_usage = self.get_cpu_usage().await.unwrap_or(0.0);
295
296        // Check memory usage (simplified - would use actual system monitoring in production)
297        let memory_usage_mb = memory_usage / (1024 * 1024);
298        if memory_usage_mb > 1024 {
299            // Simplified threshold
300            status = HealthStatus::Degraded;
301            message = Some(format!("High memory usage: {memory_usage_mb}MB"));
302        }
303
304        // Check CPU usage
305        if cpu_usage > self.component_thresholds.max_cpu_usage_percent {
306            status = HealthStatus::Degraded;
307            let cpu_message = format!("High CPU usage: {cpu_usage:.1}%");
308            message = match message {
309                Some(existing) => Some(format!("{existing}; {cpu_message}")),
310                None => Some(cpu_message),
311            };
312        }
313
314        let response_time_ms = start.elapsed().as_millis() as u64;
315
316        ComponentHealth {
317            status,
318            message,
319            last_checked: Utc::now(),
320            response_time_ms: Some(response_time_ms),
321            error_count: 0,
322        }
323    }
324
325    /// Determine overall system health from component health
326    fn determine_overall_status(
327        &self,
328        components: &HashMap<String, ComponentHealth>,
329    ) -> HealthStatus {
330        let mut has_unhealthy = false;
331        let mut has_degraded = false;
332
333        for (component_name, health) in components {
334            match health.status {
335                HealthStatus::Unhealthy => {
336                    has_unhealthy = true;
337                    error!(
338                        "Component {} is unhealthy: {:?}",
339                        component_name, health.message
340                    );
341                }
342                HealthStatus::Degraded => {
343                    has_degraded = true;
344                    warn!(
345                        "Component {} is degraded: {:?}",
346                        component_name, health.message
347                    );
348                }
349                HealthStatus::Healthy => {
350                    debug!("Component {} is healthy", component_name);
351                }
352            }
353        }
354
355        if has_unhealthy {
356            HealthStatus::Unhealthy
357        } else if has_degraded {
358            HealthStatus::Degraded
359        } else {
360            HealthStatus::Healthy
361        }
362    }
363
364    /// Get current memory usage (simplified implementation)
365    async fn get_memory_usage(&self) -> Result<u64> {
366        // In production, would use system monitoring APIs
367        // For now, return a placeholder value
368        Ok(512 * 1024 * 1024) // 512MB
369    }
370
371    /// Get current CPU usage (simplified implementation)
372    async fn get_cpu_usage(&self) -> Result<f64> {
373        // In production, would use system monitoring APIs
374        // For now, return a placeholder value
375        Ok(25.0) // 25%
376    }
377}
378
379/// Simple health check endpoint response
380#[derive(Debug, Serialize, Deserialize)]
381pub struct SimpleHealthResponse {
382    pub status: String,
383    pub timestamp: String,
384    pub uptime_seconds: u64,
385}
386
387impl From<&SystemHealth> for SimpleHealthResponse {
388    fn from(health: &SystemHealth) -> Self {
389        Self {
390            status: match health.status {
391                HealthStatus::Healthy => "healthy".to_string(),
392                HealthStatus::Degraded => "degraded".to_string(),
393                HealthStatus::Unhealthy => "unhealthy".to_string(),
394            },
395            timestamp: health.timestamp.to_rfc3339(),
396            uptime_seconds: health.uptime_seconds,
397        }
398    }
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `SystemHealth` with the given status and otherwise fixed sample values.
    fn sample_health(status: HealthStatus) -> SystemHealth {
        SystemHealth {
            status,
            timestamp: Utc::now(),
            components: HashMap::new(),
            uptime_seconds: 3600,
            memory_usage_bytes: 1024 * 1024,
            cpu_usage_percent: 25.0,
        }
    }

    #[test]
    fn test_health_thresholds_default() {
        let thresholds = HealthThresholds::default();
        assert_eq!(thresholds.max_response_time_ms, 1000);
        assert_eq!(thresholds.max_error_rate, 0.05);
        assert_eq!(thresholds.max_memory_usage_percent, 80.0);
        assert_eq!(thresholds.max_cpu_usage_percent, 90.0);
        assert_eq!(thresholds.max_connection_pool_utilization, 80.0);
    }

    #[test]
    fn test_simple_health_response_conversion() {
        let health = sample_health(HealthStatus::Healthy);

        let simple: SimpleHealthResponse = (&health).into();
        assert_eq!(simple.status, "healthy");
        assert_eq!(simple.uptime_seconds, 3600);
        assert_eq!(simple.timestamp, health.timestamp.to_rfc3339());
    }

    #[test]
    fn test_simple_health_response_non_healthy_labels() {
        let degraded = SimpleHealthResponse::from(&sample_health(HealthStatus::Degraded));
        assert_eq!(degraded.status, "degraded");

        let unhealthy = SimpleHealthResponse::from(&sample_health(HealthStatus::Unhealthy));
        assert_eq!(unhealthy.status, "unhealthy");
    }
}
427}