Skip to main content

amaters_server/
health.rs

1//! Health check endpoint
2//!
3//! Provides health status information for monitoring and orchestration systems.
4//! Supports deep health probes, readiness/liveness separation, health history
5//! tracking, and dependency health aggregation.
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
15use tokio::io::{AsyncReadExt, AsyncWriteExt};
16use tokio::net::TcpListener;
17use tracing::{debug, warn};
18
19// ---------------------------------------------------------------------------
20// Health status types
21// ---------------------------------------------------------------------------
22
/// Overall health state of the server or of one of its components.
///
/// Serialized through serde with `rename_all = "lowercase"`, so the variants
/// appear on the wire as `"healthy"`, `"starting"`, `"shuttingdown"`,
/// `"unhealthy"` and `"degraded"`.
///
/// NOTE: `HealthChecker` stores this enum as `status as u64` and decodes it
/// by numeric value in `HealthChecker::status`, so the declaration order of
/// the variants below must stay in sync with that decoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
    /// Server is healthy and ready to serve requests
    Healthy,
    /// Server is starting up
    Starting,
    /// Server is shutting down
    ShuttingDown,
    /// Server has encountered an error
    Unhealthy,
    /// Server is operational but degraded
    Degraded,
}
38
/// Health of a single internal component (e.g. storage, network, cluster)
/// as reported in readiness and full health responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    /// Component name
    pub name: String,
    /// Component status
    pub status: HealthStatus,
    /// Optional human-readable detail; `None` when there is nothing to add
    pub message: Option<String>,
    /// Last check timestamp (seconds since UNIX epoch)
    pub last_check: u64,
}
51
52// ---------------------------------------------------------------------------
53// Deep health probe types
54// ---------------------------------------------------------------------------
55
/// Result of a single execution of a deep health probe
/// (see [`DeepHealthCheck::check`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthProbeResult {
    /// Probe status
    pub status: ProbeStatus,
    /// Wall-clock time the probe took, in milliseconds (fractional)
    pub latency_ms: f64,
    /// Human-readable message describing the outcome or the failure
    pub message: String,
}
66
/// Outcome of a probe or dependency check.
///
/// Unlike [`HealthStatus`] this has no lifecycle states (starting / shutting
/// down); it only grades how well the probed resource is working.
/// Serialized in lowercase (`"healthy"`, `"degraded"`, `"unhealthy"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ProbeStatus {
    /// Everything is fine
    Healthy,
    /// Operational but degraded
    Degraded,
    /// Not operational
    Unhealthy,
}
78
79impl ProbeStatus {
80    /// Convert to a HealthStatus
81    pub fn to_health_status(self) -> HealthStatus {
82        match self {
83            ProbeStatus::Healthy => HealthStatus::Healthy,
84            ProbeStatus::Degraded => HealthStatus::Degraded,
85            ProbeStatus::Unhealthy => HealthStatus::Unhealthy,
86        }
87    }
88
89    /// Return the worse of two statuses
90    pub fn worse(self, other: ProbeStatus) -> ProbeStatus {
91        match (self, other) {
92            (ProbeStatus::Unhealthy, _) | (_, ProbeStatus::Unhealthy) => ProbeStatus::Unhealthy,
93            (ProbeStatus::Degraded, _) | (_, ProbeStatus::Degraded) => ProbeStatus::Degraded,
94            _ => ProbeStatus::Healthy,
95        }
96    }
97}
98
/// Trait for deep health check probes.
///
/// A probe actively exercises a resource and reports the outcome as a
/// [`HealthProbeResult`]. Failures are expressed through the result's
/// `status` and `message`; `check` itself has no error return.
///
/// The `Send + Sync` bound lets implementations be stored as
/// `Arc<dyn DeepHealthCheck>` inside `HealthChecker`.
#[async_trait]
pub trait DeepHealthCheck: Send + Sync {
    /// Execute the health check and return the result
    async fn check(&self) -> HealthProbeResult;
}
105
106// ---------------------------------------------------------------------------
107// Built-in probes
108// ---------------------------------------------------------------------------
109
/// Deep probe for the storage directory: it checks that the directory can be
/// written to and read from by writing a test key, reading it back, and
/// deleting it again.
pub struct StorageProbe {
    /// Directory inside which the probe creates its temporary test file
    storage_path: std::path::PathBuf,
}

impl StorageProbe {
    /// Build a storage probe that runs its checks inside `storage_path`.
    pub fn new(storage_path: std::path::PathBuf) -> Self {
        StorageProbe { storage_path }
    }
}
123
124#[async_trait]
125impl DeepHealthCheck for StorageProbe {
126    async fn check(&self) -> HealthProbeResult {
127        let start = Instant::now();
128        let test_file = self.storage_path.join(".health_probe_test");
129
130        // Write
131        let write_result = tokio::fs::write(&test_file, b"health_probe").await;
132        if let Err(e) = write_result {
133            return HealthProbeResult {
134                status: ProbeStatus::Unhealthy,
135                latency_ms: start.elapsed().as_secs_f64() * 1000.0,
136                message: format!("storage write failed: {e}"),
137            };
138        }
139
140        // Read back
141        let read_result = tokio::fs::read(&test_file).await;
142        match read_result {
143            Ok(data) if data == b"health_probe" => {}
144            Ok(_) => {
145                let _ = tokio::fs::remove_file(&test_file).await;
146                return HealthProbeResult {
147                    status: ProbeStatus::Unhealthy,
148                    latency_ms: start.elapsed().as_secs_f64() * 1000.0,
149                    message: "storage read returned unexpected data".to_string(),
150                };
151            }
152            Err(e) => {
153                let _ = tokio::fs::remove_file(&test_file).await;
154                return HealthProbeResult {
155                    status: ProbeStatus::Unhealthy,
156                    latency_ms: start.elapsed().as_secs_f64() * 1000.0,
157                    message: format!("storage read failed: {e}"),
158                };
159            }
160        }
161
162        // Delete
163        if let Err(e) = tokio::fs::remove_file(&test_file).await {
164            return HealthProbeResult {
165                status: ProbeStatus::Degraded,
166                latency_ms: start.elapsed().as_secs_f64() * 1000.0,
167                message: format!("storage cleanup failed (non-critical): {e}"),
168            };
169        }
170
171        HealthProbeResult {
172            status: ProbeStatus::Healthy,
173            latency_ms: start.elapsed().as_secs_f64() * 1000.0,
174            message: "storage read/write/delete OK".to_string(),
175        }
176    }
177}
178
/// Deep probe for the write-ahead log: checks that a file inside the WAL
/// directory can be opened for appending.
pub struct WalProbe {
    /// Directory holding the write-ahead log
    wal_path: std::path::PathBuf,
}

impl WalProbe {
    /// Build a WAL probe that runs its check inside `wal_path`.
    pub fn new(wal_path: std::path::PathBuf) -> Self {
        WalProbe { wal_path }
    }
}
191
192#[async_trait]
193impl DeepHealthCheck for WalProbe {
194    async fn check(&self) -> HealthProbeResult {
195        let start = Instant::now();
196        let test_file = self.wal_path.join(".wal_health_probe");
197
198        // Try to append
199        let result = tokio::fs::OpenOptions::new()
200            .create(true)
201            .append(true)
202            .truncate(false)
203            .open(&test_file)
204            .await;
205
206        match result {
207            Ok(_file) => {
208                let _ = tokio::fs::remove_file(&test_file).await;
209                HealthProbeResult {
210                    status: ProbeStatus::Healthy,
211                    latency_ms: start.elapsed().as_secs_f64() * 1000.0,
212                    message: "WAL directory is appendable".to_string(),
213                }
214            }
215            Err(e) => HealthProbeResult {
216                status: ProbeStatus::Unhealthy,
217                latency_ms: start.elapsed().as_secs_f64() * 1000.0,
218                message: format!("WAL append test failed: {e}"),
219            },
220        }
221    }
222}
223
// Raw statvfs(3) binding — avoids depending on the `libc` crate.
//
// The output buffer is passed as an untyped byte pointer; each
// platform-specific caller below supplies a `#[repr(C)]` struct mirroring
// that platform's `struct statvfs`.
#[cfg(any(target_os = "macos", target_os = "linux"))]
unsafe extern "C" {
    #[link_name = "statvfs"]
    fn statvfs_raw(path: *const std::ffi::c_char, buf: *mut u8) -> std::ffi::c_int;
}

/// Disk space probe — checks available disk space against a threshold.
pub struct DiskSpaceProbe {
    /// Path to check disk space for
    path: std::path::PathBuf,
    /// Minimum required free bytes
    min_free_bytes: u64,
}

impl DiskSpaceProbe {
    /// Create a new disk space probe.
    ///
    /// `min_free_bytes` is the free-space threshold for the filesystem that
    /// contains `path`. The probe's `check` reports:
    /// - healthy when at least `min_free_bytes` are available,
    /// - degraded when less than `min_free_bytes` but at least
    ///   `min_free_bytes / 4` are available,
    /// - unhealthy below `min_free_bytes / 4` or when the query itself fails.
    pub fn new(path: std::path::PathBuf, min_free_bytes: u64) -> Self {
        Self {
            path,
            min_free_bytes,
        }
    }

    /// Get available space (in bytes) on the filesystem containing `path`.
    ///
    /// Calls the C library's `statvfs` directly on macOS and Linux so the
    /// `libc` crate is not needed. Returns `Err` with a description when the
    /// path cannot be converted to a C string or the call fails.
    fn available_space(&self) -> Result<u64, String> {
        self.available_space_impl()
    }

    #[cfg(target_os = "macos")]
    fn available_space_impl(&self) -> Result<u64, String> {
        use std::ffi::{CString, c_ulong};
        use std::os::unix::ffi::OsStrExt;

        let c_path = CString::new(self.path.as_os_str().as_bytes())
            .map_err(|e| format!("invalid path: {e}"))?;

        // macOS `struct statvfs` (<sys/statvfs.h>). Unlike Linux, the block
        // and inode counters (`fsblkcnt_t` / `fsfilcnt_t`) are 32-bit
        // `unsigned int`s; declaring them as u64 would make `f_bavail` alias
        // the inode counters and yield a meaningless number.
        // Fields that are never read are kept so the offsets stay correct.
        #[allow(dead_code)]
        #[repr(C)]
        struct Statvfs {
            f_bsize: c_ulong,
            f_frsize: c_ulong,
            f_blocks: u32,
            f_bfree: u32,
            f_bavail: u32,
            f_files: u32,
            f_ffree: u32,
            f_favail: u32,
            f_fsid: c_ulong,
            f_flag: c_ulong,
            f_namemax: c_ulong,
            // Slack so the buffer is never smaller than the C struct.
            _pad: [u64; 8],
        }

        let mut buf = Statvfs {
            f_bsize: 0,
            f_frsize: 0,
            f_blocks: 0,
            f_bfree: 0,
            f_bavail: 0,
            f_files: 0,
            f_ffree: 0,
            f_favail: 0,
            f_fsid: 0,
            f_flag: 0,
            f_namemax: 0,
            _pad: [0; 8],
        };
        // SAFETY: `c_path` is a valid NUL-terminated string that outlives the
        // call, and `buf` is a writable, suitably aligned buffer at least as
        // large as the platform's `struct statvfs`.
        let ret = unsafe { statvfs_raw(c_path.as_ptr(), &mut buf as *mut Statvfs as *mut u8) };
        if ret != 0 {
            return Err(format!(
                "statvfs failed: {}",
                std::io::Error::last_os_error()
            ));
        }

        // Free blocks available to unprivileged users × fragment size.
        let available = u64::from(buf.f_bavail).saturating_mul(u64::from(buf.f_frsize));
        Ok(available)
    }

    #[cfg(target_os = "linux")]
    fn available_space_impl(&self) -> Result<u64, String> {
        use std::ffi::CString;
        use std::os::unix::ffi::OsStrExt;

        let c_path = CString::new(self.path.as_os_str().as_bytes())
            .map_err(|e| format!("invalid path: {e}"))?;

        // Leading fields of `struct statvfs` as laid out on 64-bit Linux,
        // where `unsigned long` and `fsblkcnt_t` are both 64 bits wide.
        // NOTE(review): 32-bit Linux targets use a different layout — confirm
        // before building for them.
        // Fields that are never read are kept so the offsets stay correct.
        #[allow(dead_code)]
        #[repr(C)]
        struct Statvfs {
            f_bsize: u64,
            f_frsize: u64,
            f_blocks: u64,
            f_bfree: u64,
            f_bavail: u64,
            // Remaining fields are not needed; the padding only guarantees
            // the buffer is at least as large as the C struct.
            _pad: [u64; 11],
        }

        let mut buf = Statvfs {
            f_bsize: 0,
            f_frsize: 0,
            f_blocks: 0,
            f_bfree: 0,
            f_bavail: 0,
            _pad: [0; 11],
        };
        // SAFETY: `c_path` is a valid NUL-terminated string that outlives the
        // call, and `buf` is a writable, suitably aligned 128-byte buffer,
        // which is at least as large as `struct statvfs` on 64-bit Linux.
        let ret = unsafe { statvfs_raw(c_path.as_ptr(), &mut buf as *mut Statvfs as *mut u8) };
        if ret != 0 {
            return Err(format!(
                "statvfs failed: {}",
                std::io::Error::last_os_error()
            ));
        }

        // Free blocks available to unprivileged users × fragment size.
        let available = buf.f_bavail.saturating_mul(buf.f_frsize);
        Ok(available)
    }

    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
    fn available_space_impl(&self) -> Result<u64, String> {
        // On unsupported platforms, just verify the directory exists and
        // report "unlimited" space so the probe never trips the threshold.
        if self.path.exists() {
            Ok(u64::MAX)
        } else {
            Err("path does not exist".to_string())
        }
    }
}
336
337#[async_trait]
338impl DeepHealthCheck for DiskSpaceProbe {
339    async fn check(&self) -> HealthProbeResult {
340        let start = Instant::now();
341
342        match self.available_space() {
343            Ok(available) => {
344                let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
345                if available >= self.min_free_bytes {
346                    HealthProbeResult {
347                        status: ProbeStatus::Healthy,
348                        latency_ms,
349                        message: format!(
350                            "disk space OK: {} bytes available (threshold: {})",
351                            available, self.min_free_bytes
352                        ),
353                    }
354                } else if available >= self.min_free_bytes / 4 {
355                    HealthProbeResult {
356                        status: ProbeStatus::Degraded,
357                        latency_ms,
358                        message: format!(
359                            "disk space low: {} bytes available (threshold: {})",
360                            available, self.min_free_bytes
361                        ),
362                    }
363                } else {
364                    HealthProbeResult {
365                        status: ProbeStatus::Unhealthy,
366                        latency_ms,
367                        message: format!(
368                            "disk space critically low: {} bytes available (threshold: {})",
369                            available, self.min_free_bytes
370                        ),
371                    }
372                }
373            }
374            Err(e) => HealthProbeResult {
375                status: ProbeStatus::Unhealthy,
376                latency_ms: start.elapsed().as_secs_f64() * 1000.0,
377                message: format!("disk space check failed: {e}"),
378            },
379        }
380    }
381}
382
383// ---------------------------------------------------------------------------
384// Health history
385// ---------------------------------------------------------------------------
386
/// A point-in-time health snapshot, as stored in [`HealthHistory`].
///
/// `HealthChecker::record_snapshot` fills it from the checker's current
/// `status()`, `is_alive()` and `is_ready()` values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthSnapshot {
    /// Timestamp (seconds since UNIX epoch)
    pub timestamp: u64,
    /// Overall status at this point
    pub status: HealthStatus,
    /// Whether the server was alive
    pub alive: bool,
    /// Whether the server was ready
    pub ready: bool,
}
399
/// Ring buffer for health check history.
///
/// Holds at most `capacity` snapshots; once full, each new snapshot
/// overwrites the oldest one.
#[derive(Debug)]
pub struct HealthHistory {
    /// Fixed-size buffer of snapshots (ring buffer); `None` marks a slot
    /// that has never been written
    buffer: Vec<Option<HealthSnapshot>>,
    /// Current write position (index of the slot the next snapshot goes to)
    write_pos: usize,
    /// Number of entries written (may exceed capacity — used for stats)
    total_written: usize,
    /// Capacity (always at least 1, equal to `buffer.len()`)
    capacity: usize,
}
412
413impl HealthHistory {
414    /// Create a new history buffer with the given capacity
415    pub fn new(capacity: usize) -> Self {
416        let capacity = capacity.max(1); // at least 1
417        Self {
418            buffer: (0..capacity).map(|_| None).collect(),
419            write_pos: 0,
420            total_written: 0,
421            capacity,
422        }
423    }
424
425    /// Record a new snapshot
426    pub fn record(&mut self, snapshot: HealthSnapshot) {
427        self.buffer[self.write_pos] = Some(snapshot);
428        self.write_pos = (self.write_pos + 1) % self.capacity;
429        self.total_written += 1;
430    }
431
432    /// Return all recorded snapshots in chronological order
433    pub fn snapshots(&self) -> Vec<HealthSnapshot> {
434        let count = self.total_written.min(self.capacity);
435        let mut result = Vec::with_capacity(count);
436
437        if self.total_written < self.capacity {
438            // Haven't wrapped yet — entries are 0..write_pos
439            for s in self.buffer.iter().take(self.write_pos).flatten() {
440                result.push(s.clone());
441            }
442        } else {
443            // Wrapped — oldest is at write_pos, read around
444            for i in 0..self.capacity {
445                let idx = (self.write_pos + i) % self.capacity;
446                if let Some(s) = &self.buffer[idx] {
447                    result.push(s.clone());
448                }
449            }
450        }
451
452        result
453    }
454
455    /// Calculate uptime percentage from the history buffer.
456    /// "Up" means the snapshot's `alive` field is true.
457    pub fn uptime_percent(&self) -> f64 {
458        let snaps = self.snapshots();
459        if snaps.is_empty() {
460            return 100.0;
461        }
462        let alive_count = snaps.iter().filter(|s| s.alive).count();
463        (alive_count as f64 / snaps.len() as f64) * 100.0
464    }
465}
466
467// ---------------------------------------------------------------------------
468// Dependency health
469// ---------------------------------------------------------------------------
470
/// Health information for a single dependency.
///
/// `HealthChecker::register_dependency` creates a placeholder entry
/// (`Unhealthy`, `last_checked == 0`, message `"not yet checked"`), which is
/// replaced with real data by `HealthChecker::check_dependencies`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DependencyHealth {
    /// Dependency name
    pub name: String,
    /// Current status
    pub status: ProbeStatus,
    /// Latency of the most recent check in milliseconds
    pub latency_ms: f64,
    /// Last checked timestamp (seconds since UNIX epoch); 0 if never checked
    pub last_checked: u64,
    /// Human-readable message
    pub message: String,
}
485
486// ---------------------------------------------------------------------------
487// Liveness / readiness responses
488// ---------------------------------------------------------------------------
489
/// Liveness check response (lightweight).
///
/// Built by `HealthChecker::liveness_response` from in-memory state only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LivenessResponse {
    /// Whether the process is alive
    pub alive: bool,
    /// Current status
    pub status: HealthStatus,
    /// Uptime in seconds
    pub uptime_seconds: u64,
    /// Timestamp at which the response was built (seconds since UNIX epoch)
    pub timestamp: u64,
}
502
/// Readiness check response (includes component and dependency detail).
///
/// Built by `HealthChecker::readiness_response`; the dependency entries are
/// the cached results of the most recent dependency checks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadinessResponse {
    /// Whether the server is ready to serve traffic
    pub ready: bool,
    /// Current status
    pub status: HealthStatus,
    /// Component statuses
    pub components: Vec<ComponentHealth>,
    /// Dependency statuses
    pub dependencies: Vec<DependencyHealth>,
    /// Timestamp at which the response was built (seconds since UNIX epoch)
    pub timestamp: u64,
}
517
518// ---------------------------------------------------------------------------
519// Overall health check response (enhanced)
520// ---------------------------------------------------------------------------
521
/// Overall health check response.
///
/// Produced by `HealthChecker::get_health` (no probes run) and
/// `HealthChecker::get_health_deep` (all registered probes run).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResponse {
    /// Overall status
    pub status: HealthStatus,
    /// Server version (the crate's `CARGO_PKG_VERSION` at build time)
    pub version: String,
    /// Uptime in seconds
    pub uptime_seconds: u64,
    /// Component health statuses
    pub components: Vec<ComponentHealth>,
    /// Dependency health statuses
    pub dependencies: Vec<DependencyHealth>,
    /// Deep probe results (keyed by probe name); empty when the response
    /// comes from the synchronous `get_health` path
    pub probes: HashMap<String, HealthProbeResult>,
    /// Uptime percentage from history
    pub uptime_percent: f64,
    /// Current timestamp (seconds since UNIX epoch)
    pub timestamp: u64,
}
542
543// ---------------------------------------------------------------------------
544// HealthChecker
545// ---------------------------------------------------------------------------
546
/// Health checker
///
/// Tracks the health of various server components, runs deep probes,
/// maintains health history, and aggregates dependency health.
///
/// All state lives behind an `Arc`, so cloning a `HealthChecker` yields
/// another handle to the same underlying state rather than a copy of it.
#[derive(Clone)]
pub struct HealthChecker {
    /// Shared state; every clone of the checker points at the same instance
    inner: Arc<HealthCheckerInner>,
}
555
/// Shared state behind a [`HealthChecker`] handle.
///
/// Simple flags are atomics; the maps and the history sit behind
/// `parking_lot::RwLock`s.
struct HealthCheckerInner {
    /// Server start time (seconds since UNIX epoch, captured when the
    /// checker is constructed)
    start_time: AtomicU64,
    /// Overall status (a `HealthStatus` encoded as u64 via `as u64`)
    status: AtomicU64,
    /// Storage health
    storage_healthy: AtomicBool,
    /// Network health
    network_healthy: AtomicBool,
    /// Whether cluster mode is enabled
    cluster_enabled: AtomicBool,
    /// Cluster health (only meaningful when cluster_enabled is true)
    cluster_healthy: AtomicBool,
    /// Registered deep health probes, keyed by probe name
    probes: RwLock<HashMap<String, Arc<dyn DeepHealthCheck>>>,
    /// Registered dependency checkers, keyed by dependency name
    dependency_checkers: RwLock<HashMap<String, Arc<dyn DeepHealthCheck>>>,
    /// Cached dependency health results, keyed by dependency name
    dependency_health: RwLock<HashMap<String, DependencyHealth>>,
    /// Health history
    history: RwLock<HealthHistory>,
}
578
/// Current wall-clock time as whole seconds since the UNIX epoch.
///
/// Falls back to 0 if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
585
586impl HealthChecker {
587    /// Create a new health checker with default history capacity (10)
588    pub fn new() -> Self {
589        Self::with_history_capacity(10)
590    }
591
592    /// Create a new health checker with the given history capacity
593    pub fn with_history_capacity(capacity: usize) -> Self {
594        Self {
595            inner: Arc::new(HealthCheckerInner {
596                start_time: AtomicU64::new(now_secs()),
597                status: AtomicU64::new(HealthStatus::Starting as u64),
598                storage_healthy: AtomicBool::new(false),
599                network_healthy: AtomicBool::new(false),
600                cluster_enabled: AtomicBool::new(false),
601                cluster_healthy: AtomicBool::new(false),
602                probes: RwLock::new(HashMap::new()),
603                dependency_checkers: RwLock::new(HashMap::new()),
604                dependency_health: RwLock::new(HashMap::new()),
605                history: RwLock::new(HealthHistory::new(capacity)),
606            }),
607        }
608    }
609
610    // ---- status getters/setters (same API as before) ----
611
612    /// Set overall status
613    pub fn set_status(&self, status: HealthStatus) {
614        self.inner.status.store(status as u64, Ordering::SeqCst);
615    }
616
617    /// Get current overall status
618    pub fn status(&self) -> HealthStatus {
619        match self.inner.status.load(Ordering::SeqCst) {
620            0 => HealthStatus::Healthy,
621            1 => HealthStatus::Starting,
622            2 => HealthStatus::ShuttingDown,
623            3 => HealthStatus::Unhealthy,
624            4 => HealthStatus::Degraded,
625            _ => HealthStatus::Unhealthy,
626        }
627    }
628
629    /// Mark storage as healthy
630    pub fn set_storage_healthy(&self, healthy: bool) {
631        self.inner.storage_healthy.store(healthy, Ordering::SeqCst);
632    }
633
634    /// Mark network as healthy
635    pub fn set_network_healthy(&self, healthy: bool) {
636        self.inner.network_healthy.store(healthy, Ordering::SeqCst);
637    }
638
639    /// Mark cluster mode as enabled
640    pub fn set_cluster_enabled(&self, enabled: bool) {
641        self.inner.cluster_enabled.store(enabled, Ordering::SeqCst);
642    }
643
644    /// Mark cluster as healthy
645    pub fn set_cluster_healthy(&self, healthy: bool) {
646        self.inner.cluster_healthy.store(healthy, Ordering::SeqCst);
647    }
648
649    /// Get uptime in seconds
650    pub fn uptime_seconds(&self) -> u64 {
651        let now = now_secs();
652        let start = self.inner.start_time.load(Ordering::SeqCst);
653        now.saturating_sub(start)
654    }
655
656    // ---- liveness / readiness ----
657
658    /// Check if server is alive (not shutting down or unhealthy).
659    /// This is a lightweight check suitable for liveness probes.
660    pub fn is_alive(&self) -> bool {
661        matches!(
662            self.status(),
663            HealthStatus::Healthy | HealthStatus::Starting | HealthStatus::Degraded
664        )
665    }
666
667    /// Check if server is ready to serve traffic.
668    /// Returns false during startup, shutdown, and recovery.
669    pub fn is_ready(&self) -> bool {
670        let status = self.status();
671        let base_ok = matches!(status, HealthStatus::Healthy | HealthStatus::Degraded);
672        base_ok
673            && self.inner.storage_healthy.load(Ordering::SeqCst)
674            && self.inner.network_healthy.load(Ordering::SeqCst)
675    }
676
677    /// Build a liveness response (lightweight, fast)
678    pub fn liveness_response(&self) -> LivenessResponse {
679        LivenessResponse {
680            alive: self.is_alive(),
681            status: self.status(),
682            uptime_seconds: self.uptime_seconds(),
683            timestamp: now_secs(),
684        }
685    }
686
687    /// Build a readiness response (includes component and dependency info)
688    pub fn readiness_response(&self) -> ReadinessResponse {
689        let components = self.build_component_list();
690        let dependencies: Vec<DependencyHealth> = self
691            .inner
692            .dependency_health
693            .read()
694            .values()
695            .cloned()
696            .collect();
697
698        ReadinessResponse {
699            ready: self.is_ready(),
700            status: self.status(),
701            components,
702            dependencies,
703            timestamp: now_secs(),
704        }
705    }
706
707    // ---- deep probes ----
708
709    /// Register a deep health probe under the given name
710    pub fn register_probe(&self, name: impl Into<String>, probe: Arc<dyn DeepHealthCheck>) {
711        self.inner.probes.write().insert(name.into(), probe);
712    }
713
714    /// Run all registered deep probes and return their results
715    pub async fn run_probes(&self) -> HashMap<String, HealthProbeResult> {
716        let probes: Vec<(String, Arc<dyn DeepHealthCheck>)> = {
717            let guard = self.inner.probes.read();
718            guard
719                .iter()
720                .map(|(k, v)| (k.clone(), Arc::clone(v)))
721                .collect()
722        };
723
724        let mut results = HashMap::with_capacity(probes.len());
725        for (name, probe) in probes {
726            let result = probe.check().await;
727            results.insert(name, result);
728        }
729        results
730    }
731
732    // ---- dependency health ----
733
734    /// Register a dependency health checker
735    pub fn register_dependency(&self, name: impl Into<String>, checker: Arc<dyn DeepHealthCheck>) {
736        let name = name.into();
737        self.inner
738            .dependency_checkers
739            .write()
740            .insert(name.clone(), checker);
741        // Initialize with unknown state
742        self.inner.dependency_health.write().insert(
743            name.clone(),
744            DependencyHealth {
745                name,
746                status: ProbeStatus::Unhealthy,
747                latency_ms: 0.0,
748                last_checked: 0,
749                message: "not yet checked".to_string(),
750            },
751        );
752    }
753
754    /// Run all dependency checks and update cached results.
755    /// Returns the aggregated worst status.
756    pub async fn check_dependencies(&self) -> ProbeStatus {
757        let checkers: Vec<(String, Arc<dyn DeepHealthCheck>)> = {
758            let guard = self.inner.dependency_checkers.read();
759            guard
760                .iter()
761                .map(|(k, v)| (k.clone(), Arc::clone(v)))
762                .collect()
763        };
764
765        let mut worst = ProbeStatus::Healthy;
766        let now = now_secs();
767
768        for (name, checker) in checkers {
769            let result = checker.check().await;
770            worst = worst.worse(result.status);
771            let dep = DependencyHealth {
772                name: name.clone(),
773                status: result.status,
774                latency_ms: result.latency_ms,
775                last_checked: now,
776                message: result.message,
777            };
778            self.inner.dependency_health.write().insert(name, dep);
779        }
780
781        worst
782    }
783
784    /// Get the current aggregated dependency status (from cached results)
785    pub fn aggregated_dependency_status(&self) -> ProbeStatus {
786        let guard = self.inner.dependency_health.read();
787        guard
788            .values()
789            .fold(ProbeStatus::Healthy, |acc, d| acc.worse(d.status))
790    }
791
792    // ---- health history ----
793
794    /// Record a snapshot of the current health state into the history buffer
795    pub fn record_snapshot(&self) {
796        let snapshot = HealthSnapshot {
797            timestamp: now_secs(),
798            status: self.status(),
799            alive: self.is_alive(),
800            ready: self.is_ready(),
801        };
802        self.inner.history.write().record(snapshot);
803    }
804
805    /// Get the health check history as a chronologically ordered list
806    pub fn health_history(&self) -> Vec<HealthSnapshot> {
807        self.inner.history.read().snapshots()
808    }
809
810    /// Get the uptime percentage from the history buffer
811    pub fn uptime_percent(&self) -> f64 {
812        self.inner.history.read().uptime_percent()
813    }
814
815    // ---- full health response ----
816
817    /// Get full health check response (enhanced with probes, deps, history)
818    pub fn get_health(&self) -> HealthCheckResponse {
819        let components = self.build_component_list();
820        let dependencies: Vec<DependencyHealth> = self
821            .inner
822            .dependency_health
823            .read()
824            .values()
825            .cloned()
826            .collect();
827        let probes = HashMap::new(); // synchronous path — probes not run
828        let uptime_pct = self.uptime_percent();
829
830        HealthCheckResponse {
831            status: self.status(),
832            version: env!("CARGO_PKG_VERSION").to_string(),
833            uptime_seconds: self.uptime_seconds(),
834            components,
835            dependencies,
836            probes,
837            uptime_percent: uptime_pct,
838            timestamp: now_secs(),
839        }
840    }
841
842    /// Get full health check response including deep probe results (async)
843    pub async fn get_health_deep(&self) -> HealthCheckResponse {
844        let components = self.build_component_list();
845        let dependencies: Vec<DependencyHealth> = self
846            .inner
847            .dependency_health
848            .read()
849            .values()
850            .cloned()
851            .collect();
852        let probes = self.run_probes().await;
853        let uptime_pct = self.uptime_percent();
854
855        HealthCheckResponse {
856            status: self.status(),
857            version: env!("CARGO_PKG_VERSION").to_string(),
858            uptime_seconds: self.uptime_seconds(),
859            components,
860            dependencies,
861            probes,
862            uptime_percent: uptime_pct,
863            timestamp: now_secs(),
864        }
865    }
866
867    /// Format health as JSON
868    pub fn get_health_json(&self) -> Result<String, serde_json::Error> {
869        serde_json::to_string_pretty(&self.get_health())
870    }
871
872    // ---- helpers ----
873
874    fn build_component_list(&self) -> Vec<ComponentHealth> {
875        let now = now_secs();
876
877        let storage_status = if self.inner.storage_healthy.load(Ordering::SeqCst) {
878            HealthStatus::Healthy
879        } else {
880            HealthStatus::Unhealthy
881        };
882
883        let network_status = if self.inner.network_healthy.load(Ordering::SeqCst) {
884            HealthStatus::Healthy
885        } else {
886            HealthStatus::Unhealthy
887        };
888
889        let cluster_healthy = self.inner.cluster_healthy.load(Ordering::SeqCst);
890        let cluster_enabled = self.inner.cluster_enabled.load(Ordering::SeqCst);
891        let cluster_status = if cluster_enabled {
892            if cluster_healthy {
893                HealthStatus::Healthy
894            } else {
895                HealthStatus::Unhealthy
896            }
897        } else {
898            HealthStatus::Starting // Cluster is optional, not enabled
899        };
900
901        let cluster_message = if cluster_enabled {
902            if cluster_healthy {
903                "cluster active".to_string()
904            } else {
905                "cluster unhealthy".to_string()
906            }
907        } else {
908            "cluster disabled (standalone mode)".to_string()
909        };
910
911        vec![
912            ComponentHealth {
913                name: "storage".to_string(),
914                status: storage_status,
915                message: None,
916                last_check: now,
917            },
918            ComponentHealth {
919                name: "network".to_string(),
920                status: network_status,
921                message: None,
922                last_check: now,
923            },
924            ComponentHealth {
925                name: "cluster".to_string(),
926                status: cluster_status,
927                message: Some(cluster_message),
928                last_check: now,
929            },
930        ]
931    }
932}
933
934impl Default for HealthChecker {
935    fn default() -> Self {
936        Self::new()
937    }
938}
939
940// ---------------------------------------------------------------------------
941// HTTP health check server
942// ---------------------------------------------------------------------------
943
944/// Lightweight HTTP server that exposes health check endpoints.
945///
946/// Routes:
947/// - `GET /health`  — full health status (JSON)
948/// - `GET /healthz` — simple alive check (200 or 503)
949/// - `GET /readyz`  — readiness check (200 or 503)
950/// - `GET /livez`   — liveness check (200 or 503)
951/// - `GET /metrics` — health metrics (history, uptime percentage)
952pub struct HealthHttpServer {
953    checker: Arc<HealthChecker>,
954    bind_addr: SocketAddr,
955    shutdown: Arc<AtomicBool>,
956}
957
958/// Handle returned by [`HealthHttpServer::start`] to control the running server.
959pub struct HealthHttpHandle {
960    shutdown: Arc<AtomicBool>,
961    port: u16,
962    join_handle: tokio::task::JoinHandle<Result<(), std::io::Error>>,
963}
964
965impl HealthHttpHandle {
966    /// Signal the HTTP health server to stop accepting new connections.
967    pub fn stop(&self) {
968        self.shutdown.store(true, Ordering::SeqCst);
969    }
970
971    /// Return the port the server is listening on.
972    pub fn port(&self) -> u16 {
973        self.port
974    }
975
976    /// Wait for the server task to finish after calling [`stop`](Self::stop).
977    pub async fn join(self) -> Result<(), std::io::Error> {
978        match self.join_handle.await {
979            Ok(inner) => inner,
980            Err(e) => Err(std::io::Error::other(e)),
981        }
982    }
983}
984
985impl HealthHttpServer {
986    /// Create a new health HTTP server.
987    ///
988    /// `bind_addr` is the address to listen on (e.g. `0.0.0.0:8081`).
989    pub fn new(checker: Arc<HealthChecker>, bind_addr: SocketAddr) -> Self {
990        Self {
991            checker,
992            bind_addr,
993            shutdown: Arc::new(AtomicBool::new(false)),
994        }
995    }
996
997    /// Start the server in a background tokio task.
998    ///
999    /// Returns a [`HealthHttpHandle`] that can be used to query the port and
1000    /// signal shutdown.
1001    pub async fn start(self) -> Result<HealthHttpHandle, std::io::Error> {
1002        let listener = TcpListener::bind(self.bind_addr).await?;
1003        let local_addr = listener.local_addr()?;
1004        let port = local_addr.port();
1005        let shutdown = Arc::clone(&self.shutdown);
1006        let checker = Arc::clone(&self.checker);
1007
1008        let shutdown_flag = Arc::clone(&shutdown);
1009        let join_handle =
1010            tokio::spawn(async move { Self::accept_loop(listener, checker, shutdown_flag).await });
1011
1012        Ok(HealthHttpHandle {
1013            shutdown,
1014            port,
1015            join_handle,
1016        })
1017    }
1018
1019    /// Main accept loop.
1020    async fn accept_loop(
1021        listener: TcpListener,
1022        checker: Arc<HealthChecker>,
1023        shutdown: Arc<AtomicBool>,
1024    ) -> Result<(), std::io::Error> {
1025        loop {
1026            if shutdown.load(Ordering::SeqCst) {
1027                debug!("health HTTP server shutting down");
1028                break;
1029            }
1030
1031            // Use a short timeout so we can check the shutdown flag periodically.
1032            let accept_result =
1033                tokio::time::timeout(Duration::from_millis(200), listener.accept()).await;
1034
1035            match accept_result {
1036                Ok(Ok((stream, _addr))) => {
1037                    let checker = Arc::clone(&checker);
1038                    tokio::spawn(async move {
1039                        if let Err(e) = Self::handle_connection(stream, &checker).await {
1040                            warn!("health HTTP connection error: {e}");
1041                        }
1042                    });
1043                }
1044                Ok(Err(e)) => {
1045                    warn!("health HTTP accept error: {e}");
1046                }
1047                Err(_) => {
1048                    // Timeout — loop back to check shutdown flag
1049                }
1050            }
1051        }
1052        Ok(())
1053    }
1054
1055    /// Handle a single TCP connection: read the HTTP request, route, respond.
1056    async fn handle_connection(
1057        mut stream: tokio::net::TcpStream,
1058        checker: &HealthChecker,
1059    ) -> Result<(), std::io::Error> {
1060        let mut buf = [0u8; 4096];
1061        let n = stream.read(&mut buf).await?;
1062        if n == 0 {
1063            return Ok(());
1064        }
1065
1066        let request = String::from_utf8_lossy(&buf[..n]);
1067        let (method, path) = Self::parse_request_line(&request);
1068
1069        let (status_code, status_text, body) = match method {
1070            "GET" => Self::route(path, checker),
1071            _ => (
1072                405,
1073                "Method Not Allowed",
1074                r#"{"error":"method not allowed"}"#.to_string(),
1075            ),
1076        };
1077
1078        let response = format!(
1079            "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1080            status_code,
1081            status_text,
1082            body.len(),
1083            body
1084        );
1085
1086        stream.write_all(response.as_bytes()).await?;
1087        stream.flush().await?;
1088        Ok(())
1089    }
1090
1091    /// Parse the request line (first line of the HTTP request).
1092    /// Returns (method, path). Defaults to ("", "") on malformed input.
1093    fn parse_request_line(request: &str) -> (&str, &str) {
1094        let first_line = request.lines().next().unwrap_or("");
1095        let mut parts = first_line.split_whitespace();
1096        let method = parts.next().unwrap_or("");
1097        let path = parts.next().unwrap_or("");
1098        (method, path)
1099    }
1100
1101    /// Route a GET request to the appropriate handler.
1102    fn route(path: &str, checker: &HealthChecker) -> (u16, &'static str, String) {
1103        match path {
1104            "/health" => Self::handle_health(checker),
1105            "/healthz" => Self::handle_healthz(checker),
1106            "/readyz" => Self::handle_readyz(checker),
1107            "/livez" => Self::handle_livez(checker),
1108            "/metrics" => Self::handle_metrics(checker),
1109            _ => (404, "Not Found", r#"{"error":"not found"}"#.to_string()),
1110        }
1111    }
1112
1113    /// `GET /health` — full health status JSON.
1114    fn handle_health(checker: &HealthChecker) -> (u16, &'static str, String) {
1115        let health = checker.get_health();
1116        let status_code = match health.status {
1117            HealthStatus::Healthy | HealthStatus::Degraded => 200,
1118            _ => 503,
1119        };
1120        let status_text = if status_code == 200 {
1121            "OK"
1122        } else {
1123            "Service Unavailable"
1124        };
1125        let body = serde_json::to_string(&health)
1126            .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1127        (status_code, status_text, body)
1128    }
1129
1130    /// `GET /healthz` — simple alive check.
1131    fn handle_healthz(checker: &HealthChecker) -> (u16, &'static str, String) {
1132        let alive = checker.is_alive();
1133        let status_code = if alive { 200 } else { 503 };
1134        let status_text = if alive { "OK" } else { "Service Unavailable" };
1135        let body = format!(r#"{{"alive":{alive}}}"#);
1136        (status_code, status_text, body)
1137    }
1138
1139    /// `GET /readyz` — readiness check.
1140    fn handle_readyz(checker: &HealthChecker) -> (u16, &'static str, String) {
1141        let ready = checker.is_ready();
1142        let status_code = if ready { 200 } else { 503 };
1143        let status_text = if ready { "OK" } else { "Service Unavailable" };
1144        let body = format!(r#"{{"ready":{ready}}}"#);
1145        (status_code, status_text, body)
1146    }
1147
1148    /// `GET /livez` — liveness check.
1149    fn handle_livez(checker: &HealthChecker) -> (u16, &'static str, String) {
1150        let resp = checker.liveness_response();
1151        let status_code = if resp.alive { 200 } else { 503 };
1152        let status_text = if resp.alive {
1153            "OK"
1154        } else {
1155            "Service Unavailable"
1156        };
1157        let body = serde_json::to_string(&resp)
1158            .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1159        (status_code, status_text, body)
1160    }
1161
1162    /// `GET /metrics` — health history and uptime metrics.
1163    fn handle_metrics(checker: &HealthChecker) -> (u16, &'static str, String) {
1164        let history = checker.health_history();
1165        let uptime_percent = checker.uptime_percent();
1166        let uptime_seconds = checker.uptime_seconds();
1167
1168        #[derive(Serialize)]
1169        struct MetricsResponse {
1170            uptime_seconds: u64,
1171            uptime_percent: f64,
1172            history_count: usize,
1173            history: Vec<HealthSnapshot>,
1174        }
1175
1176        let resp = MetricsResponse {
1177            uptime_seconds,
1178            uptime_percent,
1179            history_count: history.len(),
1180            history,
1181        };
1182
1183        let body = serde_json::to_string(&resp)
1184            .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1185        (200, "OK", body)
1186    }
1187}
1188
1189// ---------------------------------------------------------------------------
1190// Tests
1191// ---------------------------------------------------------------------------
1192
1193#[cfg(test)]
1194#[path = "health_tests.rs"]
1195mod tests;