oxify_storage/health.rs

//! Health check system for production monitoring
//!
//! This module provides comprehensive health checks for the database and storage layer.
//! It's designed to integrate with container orchestration systems (Kubernetes, Docker)
//! and monitoring platforms (Prometheus, Datadog).
//!
//! # Features
//!
//! - **Connection Health**: Verify database connectivity and responsiveness
//! - **Pool Health**: Check connection pool utilization and availability
//! - **Table Checks**: Verify critical tables exist and are accessible
//! - **Replication Lag**: Monitor replication delay (if applicable)
//! - **Disk Space**: Check available storage space
//! - **Overall Status**: Aggregate health status with severity levels
//!
//! # Example
//!
//! ```ignore
//! use oxify_storage::{DatabasePool, HealthCheck, HealthStatus};
//!
//! let pool = DatabasePool::new(config).await?;
//! let health_check = HealthCheck::new(pool.clone());
//!
//! // Quick check (liveness probe)
//! let is_alive = health_check.is_alive().await?;
//! if !is_alive {
//!     panic!("Database is not responding!");
//! }
//!
//! // Detailed check (readiness probe)
//! let health = health_check.check().await?;
//! match health.status {
//!     HealthStatus::Healthy => {
//!         println!("System is healthy");
//!     }
//!     HealthStatus::Degraded => {
//!         println!("System is degraded: {}", health.message);
//!     }
//!     HealthStatus::Unhealthy => {
//!         eprintln!("System is unhealthy: {}", health.message);
//!         // Don't accept new requests
//!     }
//! }
//!
//! // Export for monitoring
//! let metrics = health_check.export_metrics().await?;
//! for (key, value) in metrics {
//!     prometheus::gauge(format!("oxify_health_{}", key), value);
//! }
//! ```
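//!
//! The checks map naturally onto HTTP probe endpoints. Below is a minimal
//! sketch assuming an `axum` server; the routes, handlers, and `Arc` state
//! are illustrative and not part of this crate:
//!
//! ```ignore
//! use std::sync::Arc;
//! use axum::{extract::State, http::StatusCode, routing::get, Router};
//!
//! async fn livez(State(hc): State<Arc<HealthCheck>>) -> StatusCode {
//!     match hc.is_alive().await {
//!         Ok(true) => StatusCode::OK,
//!         _ => StatusCode::SERVICE_UNAVAILABLE,
//!     }
//! }
//!
//! async fn readyz(State(hc): State<Arc<HealthCheck>>) -> StatusCode {
//!     match hc.check().await {
//!         // Degraded still accepts traffic; only Unhealthy fails readiness.
//!         Ok(report) if !report.is_unhealthy() => StatusCode::OK,
//!         _ => StatusCode::SERVICE_UNAVAILABLE,
//!     }
//! }
//!
//! let app = Router::new()
//!     .route("/livez", get(livez))
//!     .route("/readyz", get(readyz))
//!     .with_state(Arc::new(health_check));
//! ```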

use crate::{DatabasePool, Result};
use serde::{Deserialize, Serialize};
use sqlx::Row;
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Overall health status of the system
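///
/// A common mapping onto HTTP probe responses (an illustrative convention,
/// not enforced by this crate) treats `Degraded` as still ready:
///
/// ```ignore
/// let http_code = match status {
///     HealthStatus::Healthy | HealthStatus::Degraded => 200,
///     HealthStatus::Unhealthy => 503,
/// };
/// ```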
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum HealthStatus {
    /// System is fully operational
    Healthy,
    /// System is operational but with degraded performance
    Degraded,
    /// System is not operational
    Unhealthy,
}

/// Individual component health check result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    /// Component name (e.g., "database", "connection_pool")
    pub name: String,
    /// Health status of this component
    pub status: HealthStatus,
    /// Human-readable message
    pub message: String,
    /// Check duration in milliseconds
    pub duration_ms: u64,
    /// Additional metadata
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<HashMap<String, String>>,
}

/// Comprehensive health check result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthReport {
    /// Overall system health status
    pub status: HealthStatus,
    /// Overall message
    pub message: String,
    /// Individual component checks
    pub components: Vec<ComponentHealth>,
    /// Total check duration in milliseconds
    pub total_duration_ms: u64,
    /// Unix timestamp (seconds since epoch) of the check
    pub timestamp: i64,
}

impl HealthReport {
    /// Check if the system is healthy
    pub fn is_healthy(&self) -> bool {
        self.status == HealthStatus::Healthy
    }

    /// Check if the system is degraded
    pub fn is_degraded(&self) -> bool {
        self.status == HealthStatus::Degraded
    }

    /// Check if the system is unhealthy
    pub fn is_unhealthy(&self) -> bool {
        self.status == HealthStatus::Unhealthy
    }

    /// Get all unhealthy components
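    ///
    /// Handy for alerting on just the failing components, e.g.:
    ///
    /// ```ignore
    /// for c in report.unhealthy_components() {
    ///     eprintln!("component {} is unhealthy: {}", c.name, c.message);
    /// }
    /// ```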
    pub fn unhealthy_components(&self) -> Vec<&ComponentHealth> {
        self.components
            .iter()
            .filter(|c| c.status == HealthStatus::Unhealthy)
            .collect()
    }

    /// Get all degraded components
    pub fn degraded_components(&self) -> Vec<&ComponentHealth> {
        self.components
            .iter()
            .filter(|c| c.status == HealthStatus::Degraded)
            .collect()
    }
}

/// Health check configuration
#[derive(Debug, Clone)]
pub struct HealthCheckConfig {
    /// Timeout for database ping (default: 5 seconds)
    pub ping_timeout: Duration,
    /// Pool utilization threshold for degraded status (default: 0.8)
    pub pool_degraded_threshold: f64,
    /// Pool utilization threshold for unhealthy status (default: 0.95)
    pub pool_unhealthy_threshold: f64,
    /// Tables to check for existence (default: critical tables)
    pub required_tables: Vec<String>,
    /// Enable replication lag check (default: false)
    pub check_replication_lag: bool,
    /// Maximum acceptable replication lag in seconds (default: 10)
    pub max_replication_lag_secs: i64,
}

impl Default for HealthCheckConfig {
    fn default() -> Self {
        Self {
            ping_timeout: Duration::from_secs(5),
            pool_degraded_threshold: 0.8,
            pool_unhealthy_threshold: 0.95,
            required_tables: vec![
                "users".to_string(),
                "workflows".to_string(),
                "executions".to_string(),
                "user_quotas".to_string(),
            ],
            check_replication_lag: false,
            max_replication_lag_secs: 10,
        }
    }
}

/// Health check service
pub struct HealthCheck {
    pool: DatabasePool,
    config: HealthCheckConfig,
}

impl HealthCheck {
    /// Create a new health check service with default configuration
    pub fn new(pool: DatabasePool) -> Self {
        Self {
            pool,
            config: HealthCheckConfig::default(),
        }
    }

    /// Create a new health check service with custom configuration
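    ///
    /// For example, to tighten the pool threshold and enable the replica
    /// lag check while keeping the remaining defaults:
    ///
    /// ```ignore
    /// let config = HealthCheckConfig {
    ///     pool_degraded_threshold: 0.7,
    ///     check_replication_lag: true,
    ///     ..HealthCheckConfig::default()
    /// };
    /// let health_check = HealthCheck::with_config(pool, config);
    /// ```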
    pub fn with_config(pool: DatabasePool, config: HealthCheckConfig) -> Self {
        Self { pool, config }
    }

    /// Quick liveness check - just verifies database is responding
    ///
    /// This is suitable for Kubernetes liveness probes.
    /// It only checks if the database connection works, bounded by the
    /// configured `ping_timeout`.
    pub async fn is_alive(&self) -> Result<bool> {
        // Bound the ping with `ping_timeout` so a hung connection cannot
        // stall the liveness probe (assumes a tokio runtime, as used by
        // sqlx's default runtime feature).
        let ping = sqlx::query("SELECT 1").fetch_one(self.pool.pool());
        match tokio::time::timeout(self.config.ping_timeout, ping).await {
            Ok(result) => Ok(result.is_ok()),
            Err(_elapsed) => Ok(false),
        }
    }

    /// Comprehensive health check
    ///
    /// This is suitable for Kubernetes readiness probes.
    /// It checks all system components and returns detailed status.
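    ///
    /// [`HealthReport`] derives `Serialize`, so the result can be returned
    /// directly as a JSON body (sketch; assumes `serde_json` is available):
    ///
    /// ```ignore
    /// let report = health_check.check().await?;
    /// let body = serde_json::to_string_pretty(&report)?;
    /// ```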
    pub async fn check(&self) -> Result<HealthReport> {
        let start = Instant::now();
        let mut components = Vec::new();

        // Check 1: Database connectivity
        components.push(self.check_database_connectivity().await);

        // Check 2: Connection pool health
        components.push(self.check_connection_pool().await);

        // Check 3: Required tables existence
        components.push(self.check_required_tables().await);

        // Check 4: Replication lag (if enabled)
        if self.config.check_replication_lag {
            components.push(self.check_replication_lag().await);
        }

        // Check 5: Database size and disk space
        components.push(self.check_disk_space().await);

        // Determine overall status: any unhealthy component makes the whole
        // system unhealthy; otherwise any degraded component degrades it.
        let has_unhealthy = components
            .iter()
            .any(|c| c.status == HealthStatus::Unhealthy);
        let has_degraded = components
            .iter()
            .any(|c| c.status == HealthStatus::Degraded);

        let (status, message) = if has_unhealthy {
            (
                HealthStatus::Unhealthy,
                "One or more critical components are unhealthy".to_string(),
            )
        } else if has_degraded {
            (
                HealthStatus::Degraded,
                "System is operational but degraded".to_string(),
            )
        } else {
            (HealthStatus::Healthy, "All systems operational".to_string())
        };

        Ok(HealthReport {
            status,
            message,
            components,
            total_duration_ms: start.elapsed().as_millis() as u64,
            timestamp: chrono::Utc::now().timestamp(),
        })
    }

    /// Export health metrics in a format suitable for monitoring systems
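    ///
    /// Keys follow the pattern `overall_health`, `component_<name>`, and
    /// `component_<name>_duration_ms`; status values are encoded as
    /// 1.0 (healthy), 0.5 (degraded), and 0.0 (unhealthy). A sketch of
    /// forwarding them to a gauge-style sink (`sink.gauge` is illustrative):
    ///
    /// ```ignore
    /// for (key, value) in health_check.export_metrics().await? {
    ///     sink.gauge(&format!("oxify_health_{key}"), value);
    /// }
    /// ```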
    pub async fn export_metrics(&self) -> Result<HashMap<String, f64>> {
        let report = self.check().await?;
        let mut metrics = HashMap::new();

        // Overall health (1 = healthy, 0.5 = degraded, 0 = unhealthy)
        let health_value = match report.status {
            HealthStatus::Healthy => 1.0,
            HealthStatus::Degraded => 0.5,
            HealthStatus::Unhealthy => 0.0,
        };
        metrics.insert("overall_health".to_string(), health_value);

        // Component health
        for component in &report.components {
            let value = match component.status {
                HealthStatus::Healthy => 1.0,
                HealthStatus::Degraded => 0.5,
                HealthStatus::Unhealthy => 0.0,
            };
            metrics.insert(format!("component_{}", component.name), value);
            metrics.insert(
                format!("component_{}_duration_ms", component.name),
                component.duration_ms as f64,
            );
        }

        // Total check duration
        metrics.insert(
            "check_duration_ms".to_string(),
            report.total_duration_ms as f64,
        );

        Ok(metrics)
    }

    // Individual health checks

    async fn check_database_connectivity(&self) -> ComponentHealth {
        let start = Instant::now();
        let result = sqlx::query("SELECT version()")
            .fetch_one(self.pool.pool())
            .await;

        let duration_ms = start.elapsed().as_millis() as u64;

        match result {
            Ok(_) => ComponentHealth {
                name: "database".to_string(),
                status: HealthStatus::Healthy,
                message: "Database is responding".to_string(),
                duration_ms,
                metadata: None,
            },
            Err(e) => ComponentHealth {
                name: "database".to_string(),
                status: HealthStatus::Unhealthy,
                message: format!("Database connection failed: {e}"),
                duration_ms,
                metadata: None,
            },
        }
    }

    async fn check_connection_pool(&self) -> ComponentHealth {
        let start = Instant::now();
        let metrics = self.pool.metrics();
        let duration_ms = start.elapsed().as_millis() as u64;

        let utilization = metrics.stats.utilization();
        let mut metadata = HashMap::new();
        metadata.insert("utilization".to_string(), format!("{utilization:.2}"));
        metadata.insert(
            "active_connections".to_string(),
            metrics.stats.size.to_string(),
        );
        metadata.insert(
            "idle_connections".to_string(),
            metrics.stats.num_idle.to_string(),
        );

        let (status, message) = if utilization >= self.config.pool_unhealthy_threshold {
            (
                HealthStatus::Unhealthy,
                format!(
                    "Pool is at critical capacity ({:.1}% utilization)",
                    utilization * 100.0
                ),
            )
        } else if utilization >= self.config.pool_degraded_threshold {
            (
                HealthStatus::Degraded,
                format!(
                    "Pool is under high load ({:.1}% utilization)",
                    utilization * 100.0
                ),
            )
        } else {
            (
                HealthStatus::Healthy,
                format!("Pool is healthy ({:.1}% utilization)", utilization * 100.0),
            )
        };

        ComponentHealth {
            name: "connection_pool".to_string(),
            status,
            message,
            duration_ms,
            metadata: Some(metadata),
        }
    }

    async fn check_required_tables(&self) -> ComponentHealth {
        let start = Instant::now();
        let mut missing_tables = Vec::new();

        for table in &self.config.required_tables {
            let result = sqlx::query(
                "SELECT EXISTS (
                    SELECT FROM information_schema.tables
                    WHERE table_schema = 'public' AND table_name = $1
                )",
            )
            .bind(table)
            .fetch_one(self.pool.pool())
            .await;

            match result {
                Ok(row) => {
                    let exists: bool = row.get(0);
                    if !exists {
                        missing_tables.push(table.clone());
                    }
                }
                Err(_) => {
                    // Treat a failed lookup as missing: the table is
                    // effectively inaccessible either way.
                    missing_tables.push(table.clone());
                }
            }
        }

        let duration_ms = start.elapsed().as_millis() as u64;

        if missing_tables.is_empty() {
            ComponentHealth {
                name: "tables".to_string(),
                status: HealthStatus::Healthy,
                message: "All required tables exist".to_string(),
                duration_ms,
                metadata: None,
            }
        } else {
            let mut metadata = HashMap::new();
            metadata.insert("missing_tables".to_string(), missing_tables.join(", "));

            ComponentHealth {
                name: "tables".to_string(),
                status: HealthStatus::Unhealthy,
                message: format!("Missing tables: {}", missing_tables.join(", ")),
                duration_ms,
                metadata: Some(metadata),
            }
        }
    }

    async fn check_replication_lag(&self) -> ComponentHealth {
        let start = Instant::now();

        // On a replica, compute seconds since the last replayed transaction;
        // on a primary this reports 0. COALESCE guards against the NULL that
        // pg_last_xact_replay_timestamp() returns before anything is replayed.
        let result = sqlx::query(
            "SELECT CASE WHEN pg_is_in_recovery() THEN
                COALESCE(EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::bigint, 0)
             ELSE 0 END as lag_seconds",
        )
        .fetch_one(self.pool.pool())
        .await;

        let duration_ms = start.elapsed().as_millis() as u64;

        match result {
            Ok(row) => {
                let lag_secs: i64 = row.get("lag_seconds");
                let mut metadata = HashMap::new();
                metadata.insert("lag_seconds".to_string(), lag_secs.to_string());

                let (status, message) = if lag_secs == 0 {
                    (
                        HealthStatus::Healthy,
                        "Not a replica or no replication lag".to_string(),
                    )
                } else if lag_secs > self.config.max_replication_lag_secs {
                    (
                        HealthStatus::Degraded,
                        format!(
                            "Replication lag is {lag_secs} seconds (max allowed: {} seconds)",
                            self.config.max_replication_lag_secs
                        ),
                    )
                } else {
                    (
                        HealthStatus::Healthy,
                        format!("Replication lag is {lag_secs} seconds"),
                    )
                };

                ComponentHealth {
                    name: "replication".to_string(),
                    status,
                    message,
                    duration_ms,
                    metadata: Some(metadata),
                }
            }
            Err(e) => ComponentHealth {
                name: "replication".to_string(),
                status: HealthStatus::Degraded,
                message: format!("Could not check replication lag: {e}"),
                duration_ms,
                metadata: None,
            },
        }
    }

    async fn check_disk_space(&self) -> ComponentHealth {
        let start = Instant::now();

        // Postgres cannot report filesystem free space through SQL, so this
        // check surfaces the current database size instead; pair it with
        // node-level disk metrics for true free-space alerting.
        let result = sqlx::query(
            "SELECT
                pg_database_size(current_database()) as db_size,
                pg_size_pretty(pg_database_size(current_database())) as db_size_pretty",
        )
        .fetch_one(self.pool.pool())
        .await;

        let duration_ms = start.elapsed().as_millis() as u64;

        match result {
            Ok(row) => {
                let db_size: i64 = row.get("db_size");
                let db_size_pretty: String = row.get("db_size_pretty");

                let mut metadata = HashMap::new();
                metadata.insert("database_size".to_string(), db_size_pretty.clone());
                metadata.insert("database_size_bytes".to_string(), db_size.to_string());

                ComponentHealth {
                    name: "disk_space".to_string(),
                    status: HealthStatus::Healthy,
                    message: format!("Database size: {db_size_pretty}"),
                    duration_ms,
                    metadata: Some(metadata),
                }
            }
            Err(e) => ComponentHealth {
                name: "disk_space".to_string(),
                status: HealthStatus::Degraded,
                message: format!("Could not check disk space: {e}"),
                duration_ms,
                metadata: None,
            },
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_health_check_config() {
        let config = HealthCheckConfig::default();
        assert_eq!(config.ping_timeout, Duration::from_secs(5));
        assert_eq!(config.pool_degraded_threshold, 0.8);
        assert_eq!(config.pool_unhealthy_threshold, 0.95);
        assert!(!config.required_tables.is_empty());
        assert!(!config.check_replication_lag);
    }

    #[test]
    fn test_health_status() {
        let report = HealthReport {
            status: HealthStatus::Healthy,
            message: "All good".to_string(),
            components: vec![],
            total_duration_ms: 100,
            timestamp: 0,
        };

        assert!(report.is_healthy());
        assert!(!report.is_degraded());
        assert!(!report.is_unhealthy());
    }

    #[test]
    fn test_health_report_filters() {
        let report = HealthReport {
            status: HealthStatus::Degraded,
            message: "Some issues".to_string(),
            components: vec![
                ComponentHealth {
                    name: "db".to_string(),
                    status: HealthStatus::Healthy,
                    message: "OK".to_string(),
                    duration_ms: 10,
                    metadata: None,
                },
                ComponentHealth {
                    name: "pool".to_string(),
                    status: HealthStatus::Degraded,
                    message: "High load".to_string(),
                    duration_ms: 5,
                    metadata: None,
                },
                ComponentHealth {
                    name: "disk".to_string(),
                    status: HealthStatus::Unhealthy,
                    message: "Low space".to_string(),
                    duration_ms: 8,
                    metadata: None,
                },
            ],
            total_duration_ms: 23,
            timestamp: 0,
        };

        assert_eq!(report.degraded_components().len(), 1);
        assert_eq!(report.unhealthy_components().len(), 1);
        assert_eq!(report.degraded_components()[0].name, "pool");
        assert_eq!(report.unhealthy_components()[0].name, "disk");
    }
}