1use crate::{DatabasePool, Result};
53use serde::{Deserialize, Serialize};
54use sqlx::Row;
55use std::collections::HashMap;
56use std::time::{Duration, Instant};
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
60pub enum HealthStatus {
61 Healthy,
63 Degraded,
65 Unhealthy,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct ComponentHealth {
72 pub name: String,
74 pub status: HealthStatus,
76 pub message: String,
78 pub duration_ms: u64,
80 #[serde(skip_serializing_if = "Option::is_none")]
82 pub metadata: Option<HashMap<String, String>>,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct HealthReport {
88 pub status: HealthStatus,
90 pub message: String,
92 pub components: Vec<ComponentHealth>,
94 pub total_duration_ms: u64,
96 pub timestamp: i64,
98}
99
100impl HealthReport {
101 pub fn is_healthy(&self) -> bool {
103 self.status == HealthStatus::Healthy
104 }
105
106 pub fn is_degraded(&self) -> bool {
108 self.status == HealthStatus::Degraded
109 }
110
111 pub fn is_unhealthy(&self) -> bool {
113 self.status == HealthStatus::Unhealthy
114 }
115
116 pub fn unhealthy_components(&self) -> Vec<&ComponentHealth> {
118 self.components
119 .iter()
120 .filter(|c| c.status == HealthStatus::Unhealthy)
121 .collect()
122 }
123
124 pub fn degraded_components(&self) -> Vec<&ComponentHealth> {
126 self.components
127 .iter()
128 .filter(|c| c.status == HealthStatus::Degraded)
129 .collect()
130 }
131}
132
/// Tunables for the database health checker.
#[derive(Debug, Clone)]
pub struct HealthCheckConfig {
    /// Intended upper bound for the liveness ping.
    ///
    /// NOTE(review): no code visible in this file reads this value — confirm where it is enforced.
    pub ping_timeout: Duration,
    /// Pool utilization (a fraction, e.g. `0.8`) at or above which the pool is reported as degraded.
    pub pool_degraded_threshold: f64,
    /// Pool utilization (a fraction, e.g. `0.95`) at or above which the pool is reported as unhealthy.
    pub pool_unhealthy_threshold: f64,
    /// Names of tables that must exist in the `public` schema.
    pub required_tables: Vec<String>,
    /// Whether the full check also probes replication lag.
    pub check_replication_lag: bool,
    /// Replication lag, in seconds, above which the replication component is reported as degraded.
    pub max_replication_lag_secs: i64,
}
149
150impl Default for HealthCheckConfig {
151 fn default() -> Self {
152 Self {
153 ping_timeout: Duration::from_secs(5),
154 pool_degraded_threshold: 0.8,
155 pool_unhealthy_threshold: 0.95,
156 required_tables: vec![
157 "users".to_string(),
158 "workflows".to_string(),
159 "executions".to_string(),
160 "user_quotas".to_string(),
161 ],
162 check_replication_lag: false,
163 max_replication_lag_secs: 10,
164 }
165 }
166}
167
168pub struct HealthCheck {
170 pool: DatabasePool,
171 config: HealthCheckConfig,
172}
173
174impl HealthCheck {
175 pub fn new(pool: DatabasePool) -> Self {
177 Self {
178 pool,
179 config: HealthCheckConfig::default(),
180 }
181 }
182
183 pub fn with_config(pool: DatabasePool, config: HealthCheckConfig) -> Self {
185 Self { pool, config }
186 }
187
188 pub async fn is_alive(&self) -> Result<bool> {
193 let result = sqlx::query("SELECT 1").fetch_one(self.pool.pool()).await;
194
195 Ok(result.is_ok())
196 }
197
198 pub async fn check(&self) -> Result<HealthReport> {
203 let start = Instant::now();
204 let mut components = Vec::new();
205
206 components.push(self.check_database_connectivity().await);
208
209 components.push(self.check_connection_pool().await);
211
212 components.push(self.check_required_tables().await);
214
215 if self.config.check_replication_lag {
217 components.push(self.check_replication_lag().await);
218 }
219
220 components.push(self.check_disk_space().await);
222
223 let has_unhealthy = components
225 .iter()
226 .any(|c| c.status == HealthStatus::Unhealthy);
227 let has_degraded = components
228 .iter()
229 .any(|c| c.status == HealthStatus::Degraded);
230
231 let (status, message) = if has_unhealthy {
232 (
233 HealthStatus::Unhealthy,
234 "One or more critical components are unhealthy".to_string(),
235 )
236 } else if has_degraded {
237 (
238 HealthStatus::Degraded,
239 "System is operational but degraded".to_string(),
240 )
241 } else {
242 (HealthStatus::Healthy, "All systems operational".to_string())
243 };
244
245 Ok(HealthReport {
246 status,
247 message,
248 components,
249 total_duration_ms: start.elapsed().as_millis() as u64,
250 timestamp: chrono::Utc::now().timestamp(),
251 })
252 }
253
254 pub async fn export_metrics(&self) -> Result<HashMap<String, f64>> {
256 let report = self.check().await?;
257 let mut metrics = HashMap::new();
258
259 let health_value = match report.status {
261 HealthStatus::Healthy => 1.0,
262 HealthStatus::Degraded => 0.5,
263 HealthStatus::Unhealthy => 0.0,
264 };
265 metrics.insert("overall_health".to_string(), health_value);
266
267 for component in &report.components {
269 let value = match component.status {
270 HealthStatus::Healthy => 1.0,
271 HealthStatus::Degraded => 0.5,
272 HealthStatus::Unhealthy => 0.0,
273 };
274 metrics.insert(format!("component_{}", component.name), value);
275 metrics.insert(
276 format!("component_{}_duration_ms", component.name),
277 component.duration_ms as f64,
278 );
279 }
280
281 metrics.insert(
283 "check_duration_ms".to_string(),
284 report.total_duration_ms as f64,
285 );
286
287 Ok(metrics)
288 }
289
290 async fn check_database_connectivity(&self) -> ComponentHealth {
293 let start = Instant::now();
294 let result = sqlx::query("SELECT version()")
295 .fetch_one(self.pool.pool())
296 .await;
297
298 let duration_ms = start.elapsed().as_millis() as u64;
299
300 match result {
301 Ok(_) => ComponentHealth {
302 name: "database".to_string(),
303 status: HealthStatus::Healthy,
304 message: "Database is responding".to_string(),
305 duration_ms,
306 metadata: None,
307 },
308 Err(e) => ComponentHealth {
309 name: "database".to_string(),
310 status: HealthStatus::Unhealthy,
311 message: format!("Database connection failed: {e}"),
312 duration_ms,
313 metadata: None,
314 },
315 }
316 }
317
318 async fn check_connection_pool(&self) -> ComponentHealth {
319 let start = Instant::now();
320 let metrics = self.pool.metrics();
321 let duration_ms = start.elapsed().as_millis() as u64;
322
323 let utilization = metrics.stats.utilization();
324 let mut metadata = HashMap::new();
325 metadata.insert("utilization".to_string(), format!("{utilization:.2}"));
326 metadata.insert(
327 "active_connections".to_string(),
328 metrics.stats.size.to_string(),
329 );
330 metadata.insert(
331 "idle_connections".to_string(),
332 metrics.stats.num_idle.to_string(),
333 );
334
335 let (status, message) = if utilization >= self.config.pool_unhealthy_threshold {
336 (
337 HealthStatus::Unhealthy,
338 format!(
339 "Pool is at critical capacity ({:.1}% utilization)",
340 utilization * 100.0
341 ),
342 )
343 } else if utilization >= self.config.pool_degraded_threshold {
344 (
345 HealthStatus::Degraded,
346 format!(
347 "Pool is under high load ({:.1}% utilization)",
348 utilization * 100.0
349 ),
350 )
351 } else {
352 (
353 HealthStatus::Healthy,
354 format!("Pool is healthy ({:.1}% utilization)", utilization * 100.0),
355 )
356 };
357
358 ComponentHealth {
359 name: "connection_pool".to_string(),
360 status,
361 message,
362 duration_ms,
363 metadata: Some(metadata),
364 }
365 }
366
367 async fn check_required_tables(&self) -> ComponentHealth {
368 let start = Instant::now();
369 let mut missing_tables = Vec::new();
370
371 for table in &self.config.required_tables {
372 let result = sqlx::query(
373 "SELECT EXISTS (
374 SELECT FROM information_schema.tables
375 WHERE table_schema = 'public' AND table_name = $1
376 )",
377 )
378 .bind(table)
379 .fetch_one(self.pool.pool())
380 .await;
381
382 match result {
383 Ok(row) => {
384 let exists: bool = row.get(0);
385 if !exists {
386 missing_tables.push(table.clone());
387 }
388 }
389 Err(_) => {
390 missing_tables.push(table.clone());
391 }
392 }
393 }
394
395 let duration_ms = start.elapsed().as_millis() as u64;
396
397 if missing_tables.is_empty() {
398 ComponentHealth {
399 name: "tables".to_string(),
400 status: HealthStatus::Healthy,
401 message: "All required tables exist".to_string(),
402 duration_ms,
403 metadata: None,
404 }
405 } else {
406 let mut metadata = HashMap::new();
407 metadata.insert("missing_tables".to_string(), missing_tables.join(", "));
408
409 ComponentHealth {
410 name: "tables".to_string(),
411 status: HealthStatus::Unhealthy,
412 message: format!("Missing tables: {}", missing_tables.join(", ")),
413 duration_ms,
414 metadata: Some(metadata),
415 }
416 }
417 }
418
419 async fn check_replication_lag(&self) -> ComponentHealth {
420 let start = Instant::now();
421
422 let result = sqlx::query(
424 "SELECT CASE WHEN pg_is_in_recovery() THEN
425 EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::bigint
426 ELSE 0 END as lag_seconds",
427 )
428 .fetch_one(self.pool.pool())
429 .await;
430
431 let duration_ms = start.elapsed().as_millis() as u64;
432
433 match result {
434 Ok(row) => {
435 let lag_secs: i64 = row.get("lag_seconds");
436 let mut metadata = HashMap::new();
437 metadata.insert("lag_seconds".to_string(), lag_secs.to_string());
438
439 let (status, message) = if lag_secs == 0 {
440 (
441 HealthStatus::Healthy,
442 "Not a replica or no replication lag".to_string(),
443 )
444 } else if lag_secs > self.config.max_replication_lag_secs {
445 (
446 HealthStatus::Degraded,
447 format!("Replication lag is {lag_secs} seconds"),
448 )
449 } else {
450 (
451 HealthStatus::Healthy,
452 format!("Replication lag is {lag_secs} seconds"),
453 )
454 };
455
456 ComponentHealth {
457 name: "replication".to_string(),
458 status,
459 message,
460 duration_ms,
461 metadata: Some(metadata),
462 }
463 }
464 Err(e) => ComponentHealth {
465 name: "replication".to_string(),
466 status: HealthStatus::Degraded,
467 message: format!("Could not check replication lag: {e}"),
468 duration_ms,
469 metadata: None,
470 },
471 }
472 }
473
474 async fn check_disk_space(&self) -> ComponentHealth {
475 let start = Instant::now();
476
477 let result = sqlx::query(
478 "SELECT
479 pg_database_size(current_database()) as db_size,
480 pg_size_pretty(pg_database_size(current_database())) as db_size_pretty",
481 )
482 .fetch_one(self.pool.pool())
483 .await;
484
485 let duration_ms = start.elapsed().as_millis() as u64;
486
487 match result {
488 Ok(row) => {
489 let db_size: i64 = row.get("db_size");
490 let db_size_pretty: String = row.get("db_size_pretty");
491
492 let mut metadata = HashMap::new();
493 metadata.insert("database_size".to_string(), db_size_pretty.clone());
494 metadata.insert("database_size_bytes".to_string(), db_size.to_string());
495
496 ComponentHealth {
497 name: "disk_space".to_string(),
498 status: HealthStatus::Healthy,
499 message: format!("Database size: {db_size_pretty}"),
500 duration_ms,
501 metadata: Some(metadata),
502 }
503 }
504 Err(e) => ComponentHealth {
505 name: "disk_space".to_string(),
506 status: HealthStatus::Degraded,
507 message: format!("Could not check disk space: {e}"),
508 duration_ms,
509 metadata: None,
510 },
511 }
512 }
513}
514
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a metadata-free component fixture.
    fn fixture(
        name: &str,
        status: HealthStatus,
        message: &str,
        duration_ms: u64,
    ) -> ComponentHealth {
        ComponentHealth {
            name: name.to_string(),
            status,
            message: message.to_string(),
            duration_ms,
            metadata: None,
        }
    }

    /// The stock configuration carries the documented defaults.
    #[test]
    fn test_default_health_check_config() {
        let defaults = HealthCheckConfig::default();

        assert_eq!(defaults.ping_timeout, Duration::from_secs(5));
        assert_eq!(defaults.pool_degraded_threshold, 0.8);
        assert_eq!(defaults.pool_unhealthy_threshold, 0.95);
        assert!(!defaults.required_tables.is_empty());
        assert!(!defaults.check_replication_lag);
    }

    /// Exactly one status predicate holds for a healthy report.
    #[test]
    fn test_health_status() {
        let healthy = HealthReport {
            status: HealthStatus::Healthy,
            message: "All good".to_string(),
            components: vec![],
            total_duration_ms: 100,
            timestamp: 0,
        };

        assert!(healthy.is_healthy());
        assert!(!healthy.is_degraded());
        assert!(!healthy.is_unhealthy());
    }

    /// The component filters pick out exactly the entries with the matching status.
    #[test]
    fn test_health_report_filters() {
        let components = vec![
            fixture("db", HealthStatus::Healthy, "OK", 10),
            fixture("pool", HealthStatus::Degraded, "High load", 5),
            fixture("disk", HealthStatus::Unhealthy, "Low space", 8),
        ];
        let mixed = HealthReport {
            status: HealthStatus::Degraded,
            message: "Some issues".to_string(),
            components,
            total_duration_ms: 23,
            timestamp: 0,
        };

        assert_eq!(mixed.degraded_components().len(), 1);
        assert_eq!(mixed.unhealthy_components().len(), 1);
        assert_eq!(mixed.degraded_components()[0].name, "pool");
        assert_eq!(mixed.unhealthy_components()[0].name, "disk");
    }
}