Skip to main content

amaters_server/
health.rs

1//! Health check endpoint
2//!
3//! Provides health status information for monitoring and orchestration systems.
4//! Supports deep health probes, readiness/liveness separation, health history
5//! tracking, and dependency health aggregation.
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
15use tokio::io::{AsyncReadExt, AsyncWriteExt};
16use tokio::net::TcpListener;
17use tracing::{debug, warn};
18
19// ---------------------------------------------------------------------------
20// Health status types
21// ---------------------------------------------------------------------------
22
/// Overall health state of the server or of one of its components.
///
/// Serialized in lowercase through serde, so `ShuttingDown` becomes
/// `"shuttingdown"`.
///
/// The declaration order is significant: `HealthChecker` stores a status as
/// `status as u64` and decodes it again with hard-coded discriminants
/// (`0` = `Healthy`, `1` = `Starting`, `2` = `ShuttingDown`, `3` = `Unhealthy`,
/// `4` = `Degraded`). Reordering or inserting variants requires updating that
/// decoder as well.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
    /// Server is healthy and ready to serve requests
    Healthy,
    /// Server is starting up
    Starting,
    /// Server is shutting down
    ShuttingDown,
    /// Server has encountered an error
    Unhealthy,
    /// Server is operational but degraded
    Degraded,
}
38
/// Health report for one named server component (for example storage,
/// network or cluster), as embedded in readiness and full health responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    /// Component name
    pub name: String,
    /// Component status
    pub status: HealthStatus,
    /// Optional human-readable detail about the status
    pub message: Option<String>,
    /// Time the status was sampled, in seconds since the UNIX epoch
    pub last_check: u64,
}
51
52// ---------------------------------------------------------------------------
53// Deep health probe types
54// ---------------------------------------------------------------------------
55
/// Outcome of a single deep health probe run (see `DeepHealthCheck::check`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthProbeResult {
    /// Probe status
    pub status: ProbeStatus,
    /// Wall-clock duration of the probe, in (fractional) milliseconds
    pub latency_ms: f64,
    /// Human-readable description of the outcome or of the failure
    pub message: String,
}
66
/// Three-level outcome of a probe or dependency check.
///
/// Unlike `HealthStatus` it has no lifecycle states (starting / shutting
/// down). Serialized in lowercase through serde.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ProbeStatus {
    /// Everything is fine
    Healthy,
    /// Operational but degraded
    Degraded,
    /// Not operational
    Unhealthy,
}
78
79impl ProbeStatus {
80    /// Convert to a HealthStatus
81    pub fn to_health_status(self) -> HealthStatus {
82        match self {
83            ProbeStatus::Healthy => HealthStatus::Healthy,
84            ProbeStatus::Degraded => HealthStatus::Degraded,
85            ProbeStatus::Unhealthy => HealthStatus::Unhealthy,
86        }
87    }
88
89    /// Return the worse of two statuses
90    pub fn worse(self, other: ProbeStatus) -> ProbeStatus {
91        match (self, other) {
92            (ProbeStatus::Unhealthy, _) | (_, ProbeStatus::Unhealthy) => ProbeStatus::Unhealthy,
93            (ProbeStatus::Degraded, _) | (_, ProbeStatus::Degraded) => ProbeStatus::Degraded,
94            _ => ProbeStatus::Healthy,
95        }
96    }
97}
98
/// Trait for deep health check probes.
///
/// Implementors must be `Send + Sync`; `HealthChecker` stores them as
/// `Arc<dyn DeepHealthCheck>` both for registered probes and for dependency
/// checkers.
#[async_trait]
pub trait DeepHealthCheck: Send + Sync {
    /// Execute the health check and return the result.
    ///
    /// The method has no error channel: failures are reported through the
    /// `status` and `message` fields of the returned [`HealthProbeResult`].
    async fn check(&self) -> HealthProbeResult;
}
105
106// ---------------------------------------------------------------------------
107// Built-in probes
108// ---------------------------------------------------------------------------
109
/// Storage probe: checks that the storage directory can be written to, read
/// from and cleaned up by round-tripping a small probe file.
pub struct StorageProbe {
    /// Directory in which the probe file is created
    storage_path: std::path::PathBuf,
}

impl StorageProbe {
    /// Build a probe that exercises the directory at `storage_path`.
    pub fn new(storage_path: std::path::PathBuf) -> Self {
        StorageProbe { storage_path }
    }
}
123
124#[async_trait]
125impl DeepHealthCheck for StorageProbe {
126    async fn check(&self) -> HealthProbeResult {
127        let start = Instant::now();
128        let test_file = self.storage_path.join(".health_probe_test");
129
130        // Write
131        let write_result = tokio::fs::write(&test_file, b"health_probe").await;
132        if let Err(e) = write_result {
133            return HealthProbeResult {
134                status: ProbeStatus::Unhealthy,
135                latency_ms: start.elapsed().as_secs_f64() * 1000.0,
136                message: format!("storage write failed: {e}"),
137            };
138        }
139
140        // Read back
141        let read_result = tokio::fs::read(&test_file).await;
142        match read_result {
143            Ok(data) if data == b"health_probe" => {}
144            Ok(_) => {
145                let _ = tokio::fs::remove_file(&test_file).await;
146                return HealthProbeResult {
147                    status: ProbeStatus::Unhealthy,
148                    latency_ms: start.elapsed().as_secs_f64() * 1000.0,
149                    message: "storage read returned unexpected data".to_string(),
150                };
151            }
152            Err(e) => {
153                let _ = tokio::fs::remove_file(&test_file).await;
154                return HealthProbeResult {
155                    status: ProbeStatus::Unhealthy,
156                    latency_ms: start.elapsed().as_secs_f64() * 1000.0,
157                    message: format!("storage read failed: {e}"),
158                };
159            }
160        }
161
162        // Delete
163        if let Err(e) = tokio::fs::remove_file(&test_file).await {
164            return HealthProbeResult {
165                status: ProbeStatus::Degraded,
166                latency_ms: start.elapsed().as_secs_f64() * 1000.0,
167                message: format!("storage cleanup failed (non-critical): {e}"),
168            };
169        }
170
171        HealthProbeResult {
172            status: ProbeStatus::Healthy,
173            latency_ms: start.elapsed().as_secs_f64() * 1000.0,
174            message: "storage read/write/delete OK".to_string(),
175        }
176    }
177}
178
/// WAL probe: checks that a file inside the write-ahead log directory can be
/// opened for appending.
pub struct WalProbe {
    /// Directory that holds the write-ahead log
    wal_path: std::path::PathBuf,
}

impl WalProbe {
    /// Build a probe that exercises the WAL directory at `wal_path`.
    pub fn new(wal_path: std::path::PathBuf) -> Self {
        WalProbe { wal_path }
    }
}
191
192#[async_trait]
193impl DeepHealthCheck for WalProbe {
194    async fn check(&self) -> HealthProbeResult {
195        let start = Instant::now();
196        let test_file = self.wal_path.join(".wal_health_probe");
197
198        // Try to append
199        let result = tokio::fs::OpenOptions::new()
200            .create(true)
201            .append(true)
202            .truncate(false)
203            .open(&test_file)
204            .await;
205
206        match result {
207            Ok(_file) => {
208                let _ = tokio::fs::remove_file(&test_file).await;
209                HealthProbeResult {
210                    status: ProbeStatus::Healthy,
211                    latency_ms: start.elapsed().as_secs_f64() * 1000.0,
212                    message: "WAL directory is appendable".to_string(),
213                }
214            }
215            Err(e) => HealthProbeResult {
216                status: ProbeStatus::Unhealthy,
217                latency_ms: start.elapsed().as_secs_f64() * 1000.0,
218                message: format!("WAL append test failed: {e}"),
219            },
220        }
221    }
222}
223
// Raw statvfs binding — avoids depending on the `libc` crate.
//
// The output buffer is declared as `*mut u8` rather than a typed struct, so
// every caller is responsible for passing a pointer to a writable buffer
// that is at least as large as, and laid out like, the platform's
// `struct statvfs`. `path` must point to a NUL-terminated string.
#[cfg(any(target_os = "macos", target_os = "linux"))]
unsafe extern "C" {
    // Bound to the C symbol `statvfs`; a return value of 0 is treated as
    // success by the callers in this file.
    #[link_name = "statvfs"]
    fn statvfs_raw(path: *const std::ffi::c_char, buf: *mut u8) -> std::ffi::c_int;
}
230
/// Disk space probe — compares the space available on the filesystem that
/// contains `path` against a configured threshold.
pub struct DiskSpaceProbe {
    /// Path whose containing filesystem is inspected
    path: std::path::PathBuf,
    /// Free-space threshold in bytes; at or above it the probe is healthy
    min_free_bytes: u64,
}
238
239impl DiskSpaceProbe {
    /// Create a new disk space probe.
    ///
    /// `min_free_bytes` is the free-space threshold applied by the probe's
    /// `check`:
    ///
    /// * available >= `min_free_bytes` → healthy
    /// * available >= `min_free_bytes / 4` (integer division) → degraded
    /// * anything lower, or a failed lookup → unhealthy
    pub fn new(path: std::path::PathBuf, min_free_bytes: u64) -> Self {
        Self {
            path,
            min_free_bytes,
        }
    }
250
    /// Get the available space, in bytes, on the filesystem containing `path`.
    ///
    /// Delegates to the `available_space_impl` variant selected by `cfg` for
    /// the target OS. On macOS and Linux that variant calls the C library's
    /// `statvfs` through a hand-written FFI declaration instead of the `libc`
    /// crate; on other platforms it only checks that the path exists.
    ///
    /// Errors are returned as human-readable strings.
    fn available_space(&self) -> Result<u64, String> {
        self.available_space_impl()
    }
258
259    #[cfg(target_os = "macos")]
260    fn available_space_impl(&self) -> Result<u64, String> {
261        use std::ffi::CString;
262        use std::os::unix::ffi::OsStrExt;
263
264        let c_path = CString::new(self.path.as_os_str().as_bytes())
265            .map_err(|e| format!("invalid path: {e}"))?;
266
267        // macOS statvfs layout (LP64). We only need f_frsize and f_bavail.
268        // struct statvfs is 64 bytes on macOS (all u64 fields after the
269        // initial f_bsize). We allocate a generous buffer.
270        #[repr(C)]
271        struct Statvfs {
272            f_bsize: u64,
273            f_frsize: u64,
274            f_blocks: u64,
275            f_bfree: u64,
276            f_bavail: u64,
277            // remaining fields not needed
278            _pad: [u64; 11],
279        }
280
281        let mut buf: Statvfs = unsafe { std::mem::zeroed() };
282        // macOS syscall: statvfs(2)
283        let ret = unsafe { statvfs_raw(c_path.as_ptr(), &mut buf as *mut Statvfs as *mut u8) };
284        if ret != 0 {
285            return Err(format!(
286                "statvfs failed: {}",
287                std::io::Error::last_os_error()
288            ));
289        }
290
291        let available = buf.f_bavail.saturating_mul(buf.f_frsize);
292        Ok(available)
293    }
294
    /// Linux implementation: queries `statvfs` for the filesystem containing
    /// `self.path` and returns `f_bavail * f_frsize` (saturating), i.e. the
    /// number of bytes available to unprivileged users.
    ///
    /// Returns an error message if the path contains an interior NUL byte or
    /// if `statvfs` returns a non-zero value (the OS error is included).
    #[cfg(target_os = "linux")]
    fn available_space_impl(&self) -> Result<u64, String> {
        use std::ffi::CString;
        use std::os::unix::ffi::OsStrExt;

        let c_path = CString::new(self.path.as_os_str().as_bytes())
            .map_err(|e| format!("invalid path: {e}"))?;

        // Hand-written mirror of the leading fields of `struct statvfs`; only
        // `f_frsize` and `f_bavail` are read, `_pad` over-allocates for the
        // remaining fields.
        // NOTE(review): declaring every field as `u64` presumes a 64-bit
        // target where `unsigned long` and `fsblkcnt_t` are 8 bytes wide —
        // confirm before building for 32-bit Linux.
        #[repr(C)]
        struct Statvfs {
            f_bsize: u64,
            f_frsize: u64,
            f_blocks: u64,
            f_bfree: u64,
            f_bavail: u64,
            _pad: [u64; 11],
        }

        // SAFETY: `Statvfs` consists solely of integers, so the all-zero bit
        // pattern is a valid value.
        let mut buf: Statvfs = unsafe { std::mem::zeroed() };
        // SAFETY: `c_path` is NUL-terminated and outlives the call; `buf` is
        // a writable 128-byte buffer (assumed to be at least as large as the
        // C struct — see the note above).
        let ret = unsafe { statvfs_raw(c_path.as_ptr(), &mut buf as *mut Statvfs as *mut u8) };
        if ret != 0 {
            return Err(format!(
                "statvfs failed: {}",
                std::io::Error::last_os_error()
            ));
        }

        // Saturate instead of wrapping if the product exceeds u64::MAX.
        let available = buf.f_bavail.saturating_mul(buf.f_frsize);
        Ok(available)
    }
325
326    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
327    fn available_space_impl(&self) -> Result<u64, String> {
328        // On unsupported platforms, just verify the directory exists.
329        if self.path.exists() {
330            Ok(u64::MAX)
331        } else {
332            Err("path does not exist".to_string())
333        }
334    }
335}
336
337#[async_trait]
338impl DeepHealthCheck for DiskSpaceProbe {
339    async fn check(&self) -> HealthProbeResult {
340        let start = Instant::now();
341
342        match self.available_space() {
343            Ok(available) => {
344                let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
345                if available >= self.min_free_bytes {
346                    HealthProbeResult {
347                        status: ProbeStatus::Healthy,
348                        latency_ms,
349                        message: format!(
350                            "disk space OK: {} bytes available (threshold: {})",
351                            available, self.min_free_bytes
352                        ),
353                    }
354                } else if available >= self.min_free_bytes / 4 {
355                    HealthProbeResult {
356                        status: ProbeStatus::Degraded,
357                        latency_ms,
358                        message: format!(
359                            "disk space low: {} bytes available (threshold: {})",
360                            available, self.min_free_bytes
361                        ),
362                    }
363                } else {
364                    HealthProbeResult {
365                        status: ProbeStatus::Unhealthy,
366                        latency_ms,
367                        message: format!(
368                            "disk space critically low: {} bytes available (threshold: {})",
369                            available, self.min_free_bytes
370                        ),
371                    }
372                }
373            }
374            Err(e) => HealthProbeResult {
375                status: ProbeStatus::Unhealthy,
376                latency_ms: start.elapsed().as_secs_f64() * 1000.0,
377                message: format!("disk space check failed: {e}"),
378            },
379        }
380    }
381}
382
383// ---------------------------------------------------------------------------
384// Health history
385// ---------------------------------------------------------------------------
386
/// A point-in-time health snapshot, as stored in `HealthHistory`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthSnapshot {
    /// Timestamp (seconds since UNIX epoch)
    pub timestamp: u64,
    /// Overall status at this point
    pub status: HealthStatus,
    /// Whether the server was alive; this is the field counted by
    /// `HealthHistory::uptime_percent`
    pub alive: bool,
    /// Whether the server was ready
    pub ready: bool,
}
399
/// Fixed-capacity ring buffer of health snapshots: once full, each new
/// snapshot overwrites the oldest one.
#[derive(Debug)]
pub struct HealthHistory {
    /// Fixed-size buffer of snapshots (ring buffer); `None` marks a slot that
    /// has never been written
    buffer: Vec<Option<HealthSnapshot>>,
    /// Index of the slot the next snapshot will be written to
    write_pos: usize,
    /// Number of entries written (may exceed capacity — used for stats)
    total_written: usize,
    /// Number of slots in `buffer`; always at least 1
    capacity: usize,
}
412
413impl HealthHistory {
414    /// Create a new history buffer with the given capacity
415    pub fn new(capacity: usize) -> Self {
416        let capacity = capacity.max(1); // at least 1
417        Self {
418            buffer: (0..capacity).map(|_| None).collect(),
419            write_pos: 0,
420            total_written: 0,
421            capacity,
422        }
423    }
424
425    /// Record a new snapshot
426    pub fn record(&mut self, snapshot: HealthSnapshot) {
427        self.buffer[self.write_pos] = Some(snapshot);
428        self.write_pos = (self.write_pos + 1) % self.capacity;
429        self.total_written += 1;
430    }
431
432    /// Return all recorded snapshots in chronological order
433    pub fn snapshots(&self) -> Vec<HealthSnapshot> {
434        let count = self.total_written.min(self.capacity);
435        let mut result = Vec::with_capacity(count);
436
437        if self.total_written < self.capacity {
438            // Haven't wrapped yet — entries are 0..write_pos
439            for s in self.buffer.iter().take(self.write_pos).flatten() {
440                result.push(s.clone());
441            }
442        } else {
443            // Wrapped — oldest is at write_pos, read around
444            for i in 0..self.capacity {
445                let idx = (self.write_pos + i) % self.capacity;
446                if let Some(s) = &self.buffer[idx] {
447                    result.push(s.clone());
448                }
449            }
450        }
451
452        result
453    }
454
455    /// Calculate uptime percentage from the history buffer.
456    /// "Up" means the snapshot's `alive` field is true.
457    pub fn uptime_percent(&self) -> f64 {
458        let snaps = self.snapshots();
459        if snaps.is_empty() {
460            return 100.0;
461        }
462        let alive_count = snaps.iter().filter(|s| s.alive).count();
463        (alive_count as f64 / snaps.len() as f64) * 100.0
464    }
465}
466
467// ---------------------------------------------------------------------------
468// Dependency health
469// ---------------------------------------------------------------------------
470
/// Cached health information for a single registered dependency.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DependencyHealth {
    /// Dependency name (the key it was registered under)
    pub name: String,
    /// Status reported by the most recent check
    pub status: ProbeStatus,
    /// Latency of the most recent check, in milliseconds
    pub latency_ms: f64,
    /// Last checked timestamp (seconds since UNIX epoch); `0` until the
    /// dependency has been checked for the first time
    pub last_checked: u64,
    /// Human-readable message from the most recent check
    pub message: String,
}
485
486// ---------------------------------------------------------------------------
487// Liveness / readiness responses
488// ---------------------------------------------------------------------------
489
/// Liveness check response (lightweight: no component or dependency detail).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LivenessResponse {
    /// Whether the process is alive
    pub alive: bool,
    /// Current overall status
    pub status: HealthStatus,
    /// Uptime in seconds
    pub uptime_seconds: u64,
    /// Time the response was built (seconds since UNIX epoch)
    pub timestamp: u64,
}
502
/// Readiness check response (includes component and dependency detail).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReadinessResponse {
    /// Whether the server is ready to serve traffic
    pub ready: bool,
    /// Current overall status
    pub status: HealthStatus,
    /// Component statuses
    pub components: Vec<ComponentHealth>,
    /// Cached dependency statuses (not re-checked when the response is built)
    pub dependencies: Vec<DependencyHealth>,
    /// Time the response was built (seconds since UNIX epoch)
    pub timestamp: u64,
}
517
518// ---------------------------------------------------------------------------
519// Overall health check response (enhanced)
520// ---------------------------------------------------------------------------
521
/// Overall health check response, combining status, components, cached
/// dependency results, deep probe results and history statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResponse {
    /// Overall status
    pub status: HealthStatus,
    /// Server version (the crate's `CARGO_PKG_VERSION` at build time)
    pub version: String,
    /// Uptime in seconds
    pub uptime_seconds: u64,
    /// Component health statuses
    pub components: Vec<ComponentHealth>,
    /// Cached dependency health statuses
    pub dependencies: Vec<DependencyHealth>,
    /// Deep probe results (keyed by probe name); empty when the response was
    /// built by the synchronous `get_health`, which does not run probes
    pub probes: HashMap<String, HealthProbeResult>,
    /// Uptime percentage computed from the health history buffer
    pub uptime_percent: f64,
    /// Time the response was built (seconds since UNIX epoch)
    pub timestamp: u64,
}
542
543// ---------------------------------------------------------------------------
544// HealthChecker
545// ---------------------------------------------------------------------------
546
/// Health checker
///
/// Tracks the health of various server components, runs deep probes,
/// maintains health history, and aggregates dependency health.
///
/// Cloning is cheap: every clone holds the same `Arc` and therefore observes
/// and mutates the same underlying state.
#[derive(Clone)]
pub struct HealthChecker {
    /// Shared state behind all clones of this checker
    inner: Arc<HealthCheckerInner>,
}
555
/// State shared by all clones of a `HealthChecker`.
struct HealthCheckerInner {
    /// Server start time (seconds since UNIX epoch, captured at construction)
    start_time: AtomicU64,
    /// Overall status, stored as `HealthStatus as u64`
    status: AtomicU64,
    /// Storage health flag (starts out `false`)
    storage_healthy: AtomicBool,
    /// Network health flag (starts out `false`)
    network_healthy: AtomicBool,
    /// Whether cluster mode is enabled
    cluster_enabled: AtomicBool,
    /// Cluster health (only meaningful when cluster_enabled is true)
    cluster_healthy: AtomicBool,
    /// Registered deep health probes, keyed by probe name
    probes: RwLock<HashMap<String, Arc<dyn DeepHealthCheck>>>,
    /// Registered dependency checkers, keyed by dependency name
    dependency_checkers: RwLock<HashMap<String, Arc<dyn DeepHealthCheck>>>,
    /// Most recent result per dependency, keyed by dependency name
    dependency_health: RwLock<HashMap<String, DependencyHealth>>,
    /// Ring buffer of recorded health snapshots
    history: RwLock<HealthHistory>,
}
578
/// Current wall-clock time as whole seconds since the UNIX epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
585
586impl HealthChecker {
587    /// Create a new health checker with default history capacity (10)
588    pub fn new() -> Self {
589        Self::with_history_capacity(10)
590    }
591
592    /// Create a new health checker with the given history capacity
593    pub fn with_history_capacity(capacity: usize) -> Self {
594        Self {
595            inner: Arc::new(HealthCheckerInner {
596                start_time: AtomicU64::new(now_secs()),
597                status: AtomicU64::new(HealthStatus::Starting as u64),
598                storage_healthy: AtomicBool::new(false),
599                network_healthy: AtomicBool::new(false),
600                cluster_enabled: AtomicBool::new(false),
601                cluster_healthy: AtomicBool::new(false),
602                probes: RwLock::new(HashMap::new()),
603                dependency_checkers: RwLock::new(HashMap::new()),
604                dependency_health: RwLock::new(HashMap::new()),
605                history: RwLock::new(HealthHistory::new(capacity)),
606            }),
607        }
608    }
609
610    // ---- status getters/setters (same API as before) ----
611
612    /// Set overall status
613    pub fn set_status(&self, status: HealthStatus) {
614        self.inner.status.store(status as u64, Ordering::SeqCst);
615    }
616
617    /// Get current overall status
618    pub fn status(&self) -> HealthStatus {
619        match self.inner.status.load(Ordering::SeqCst) {
620            0 => HealthStatus::Healthy,
621            1 => HealthStatus::Starting,
622            2 => HealthStatus::ShuttingDown,
623            3 => HealthStatus::Unhealthy,
624            4 => HealthStatus::Degraded,
625            _ => HealthStatus::Unhealthy,
626        }
627    }
628
629    /// Mark storage as healthy
630    pub fn set_storage_healthy(&self, healthy: bool) {
631        self.inner.storage_healthy.store(healthy, Ordering::SeqCst);
632    }
633
634    /// Mark network as healthy
635    pub fn set_network_healthy(&self, healthy: bool) {
636        self.inner.network_healthy.store(healthy, Ordering::SeqCst);
637    }
638
639    /// Mark cluster mode as enabled
640    pub fn set_cluster_enabled(&self, enabled: bool) {
641        self.inner.cluster_enabled.store(enabled, Ordering::SeqCst);
642    }
643
644    /// Mark cluster as healthy
645    pub fn set_cluster_healthy(&self, healthy: bool) {
646        self.inner.cluster_healthy.store(healthy, Ordering::SeqCst);
647    }
648
649    /// Get uptime in seconds
650    pub fn uptime_seconds(&self) -> u64 {
651        let now = now_secs();
652        let start = self.inner.start_time.load(Ordering::SeqCst);
653        now.saturating_sub(start)
654    }
655
656    // ---- liveness / readiness ----
657
658    /// Check if server is alive (not shutting down or unhealthy).
659    /// This is a lightweight check suitable for liveness probes.
660    pub fn is_alive(&self) -> bool {
661        matches!(
662            self.status(),
663            HealthStatus::Healthy | HealthStatus::Starting | HealthStatus::Degraded
664        )
665    }
666
667    /// Check if server is ready to serve traffic.
668    /// Returns false during startup, shutdown, and recovery.
669    pub fn is_ready(&self) -> bool {
670        let status = self.status();
671        let base_ok = matches!(status, HealthStatus::Healthy | HealthStatus::Degraded);
672        base_ok
673            && self.inner.storage_healthy.load(Ordering::SeqCst)
674            && self.inner.network_healthy.load(Ordering::SeqCst)
675    }
676
677    /// Build a liveness response (lightweight, fast)
678    pub fn liveness_response(&self) -> LivenessResponse {
679        LivenessResponse {
680            alive: self.is_alive(),
681            status: self.status(),
682            uptime_seconds: self.uptime_seconds(),
683            timestamp: now_secs(),
684        }
685    }
686
687    /// Build a readiness response (includes component and dependency info)
688    pub fn readiness_response(&self) -> ReadinessResponse {
689        let components = self.build_component_list();
690        let dependencies: Vec<DependencyHealth> = self
691            .inner
692            .dependency_health
693            .read()
694            .values()
695            .cloned()
696            .collect();
697
698        ReadinessResponse {
699            ready: self.is_ready(),
700            status: self.status(),
701            components,
702            dependencies,
703            timestamp: now_secs(),
704        }
705    }
706
707    // ---- deep probes ----
708
709    /// Register a deep health probe under the given name
710    pub fn register_probe(&self, name: impl Into<String>, probe: Arc<dyn DeepHealthCheck>) {
711        self.inner.probes.write().insert(name.into(), probe);
712    }
713
714    /// Run all registered deep probes and return their results
715    pub async fn run_probes(&self) -> HashMap<String, HealthProbeResult> {
716        let probes: Vec<(String, Arc<dyn DeepHealthCheck>)> = {
717            let guard = self.inner.probes.read();
718            guard
719                .iter()
720                .map(|(k, v)| (k.clone(), Arc::clone(v)))
721                .collect()
722        };
723
724        let mut results = HashMap::with_capacity(probes.len());
725        for (name, probe) in probes {
726            let result = probe.check().await;
727            results.insert(name, result);
728        }
729        results
730    }
731
732    // ---- dependency health ----
733
734    /// Register a dependency health checker
735    pub fn register_dependency(&self, name: impl Into<String>, checker: Arc<dyn DeepHealthCheck>) {
736        let name = name.into();
737        self.inner
738            .dependency_checkers
739            .write()
740            .insert(name.clone(), checker);
741        // Initialize with unknown state
742        self.inner.dependency_health.write().insert(
743            name.clone(),
744            DependencyHealth {
745                name,
746                status: ProbeStatus::Unhealthy,
747                latency_ms: 0.0,
748                last_checked: 0,
749                message: "not yet checked".to_string(),
750            },
751        );
752    }
753
754    /// Run all dependency checks and update cached results.
755    /// Returns the aggregated worst status.
756    pub async fn check_dependencies(&self) -> ProbeStatus {
757        let checkers: Vec<(String, Arc<dyn DeepHealthCheck>)> = {
758            let guard = self.inner.dependency_checkers.read();
759            guard
760                .iter()
761                .map(|(k, v)| (k.clone(), Arc::clone(v)))
762                .collect()
763        };
764
765        let mut worst = ProbeStatus::Healthy;
766        let now = now_secs();
767
768        for (name, checker) in checkers {
769            let result = checker.check().await;
770            worst = worst.worse(result.status);
771            let dep = DependencyHealth {
772                name: name.clone(),
773                status: result.status,
774                latency_ms: result.latency_ms,
775                last_checked: now,
776                message: result.message,
777            };
778            self.inner.dependency_health.write().insert(name, dep);
779        }
780
781        worst
782    }
783
784    /// Get the current aggregated dependency status (from cached results)
785    pub fn aggregated_dependency_status(&self) -> ProbeStatus {
786        let guard = self.inner.dependency_health.read();
787        guard
788            .values()
789            .fold(ProbeStatus::Healthy, |acc, d| acc.worse(d.status))
790    }
791
792    // ---- health history ----
793
794    /// Record a snapshot of the current health state into the history buffer
795    pub fn record_snapshot(&self) {
796        let snapshot = HealthSnapshot {
797            timestamp: now_secs(),
798            status: self.status(),
799            alive: self.is_alive(),
800            ready: self.is_ready(),
801        };
802        self.inner.history.write().record(snapshot);
803    }
804
805    /// Get the health check history as a chronologically ordered list
806    pub fn health_history(&self) -> Vec<HealthSnapshot> {
807        self.inner.history.read().snapshots()
808    }
809
810    /// Get the uptime percentage from the history buffer
811    pub fn uptime_percent(&self) -> f64 {
812        self.inner.history.read().uptime_percent()
813    }
814
815    // ---- full health response ----
816
817    /// Get full health check response (enhanced with probes, deps, history)
818    pub fn get_health(&self) -> HealthCheckResponse {
819        let components = self.build_component_list();
820        let dependencies: Vec<DependencyHealth> = self
821            .inner
822            .dependency_health
823            .read()
824            .values()
825            .cloned()
826            .collect();
827        let probes = HashMap::new(); // synchronous path — probes not run
828        let uptime_pct = self.uptime_percent();
829
830        HealthCheckResponse {
831            status: self.status(),
832            version: env!("CARGO_PKG_VERSION").to_string(),
833            uptime_seconds: self.uptime_seconds(),
834            components,
835            dependencies,
836            probes,
837            uptime_percent: uptime_pct,
838            timestamp: now_secs(),
839        }
840    }
841
842    /// Get full health check response including deep probe results (async)
843    pub async fn get_health_deep(&self) -> HealthCheckResponse {
844        let components = self.build_component_list();
845        let dependencies: Vec<DependencyHealth> = self
846            .inner
847            .dependency_health
848            .read()
849            .values()
850            .cloned()
851            .collect();
852        let probes = self.run_probes().await;
853        let uptime_pct = self.uptime_percent();
854
855        HealthCheckResponse {
856            status: self.status(),
857            version: env!("CARGO_PKG_VERSION").to_string(),
858            uptime_seconds: self.uptime_seconds(),
859            components,
860            dependencies,
861            probes,
862            uptime_percent: uptime_pct,
863            timestamp: now_secs(),
864        }
865    }
866
867    /// Format health as JSON
868    pub fn get_health_json(&self) -> Result<String, serde_json::Error> {
869        serde_json::to_string_pretty(&self.get_health())
870    }
871
872    // ---- helpers ----
873
874    fn build_component_list(&self) -> Vec<ComponentHealth> {
875        let now = now_secs();
876
877        let storage_status = if self.inner.storage_healthy.load(Ordering::SeqCst) {
878            HealthStatus::Healthy
879        } else {
880            HealthStatus::Unhealthy
881        };
882
883        let network_status = if self.inner.network_healthy.load(Ordering::SeqCst) {
884            HealthStatus::Healthy
885        } else {
886            HealthStatus::Unhealthy
887        };
888
889        let cluster_healthy = self.inner.cluster_healthy.load(Ordering::SeqCst);
890        let cluster_enabled = self.inner.cluster_enabled.load(Ordering::SeqCst);
891        let cluster_status = if cluster_enabled {
892            if cluster_healthy {
893                HealthStatus::Healthy
894            } else {
895                HealthStatus::Unhealthy
896            }
897        } else {
898            HealthStatus::Starting // Cluster is optional, not enabled
899        };
900
901        let cluster_message = if cluster_enabled {
902            if cluster_healthy {
903                "cluster active".to_string()
904            } else {
905                "cluster unhealthy".to_string()
906            }
907        } else {
908            "cluster disabled (standalone mode)".to_string()
909        };
910
911        vec![
912            ComponentHealth {
913                name: "storage".to_string(),
914                status: storage_status,
915                message: None,
916                last_check: now,
917            },
918            ComponentHealth {
919                name: "network".to_string(),
920                status: network_status,
921                message: None,
922                last_check: now,
923            },
924            ComponentHealth {
925                name: "cluster".to_string(),
926                status: cluster_status,
927                message: Some(cluster_message),
928                last_check: now,
929            },
930        ]
931    }
932}
933
934impl Default for HealthChecker {
935    fn default() -> Self {
936        Self::new()
937    }
938}
939
940// ---------------------------------------------------------------------------
941// HTTP health check server
942// ---------------------------------------------------------------------------
943
944/// Lightweight HTTP server that exposes health check endpoints.
945///
946/// Routes:
947/// - `GET /health`  — full health status (JSON)
948/// - `GET /healthz` — simple alive check (200 or 503)
949/// - `GET /readyz`  — readiness check (200 or 503)
950/// - `GET /livez`   — liveness check (200 or 503)
951/// - `GET /metrics` — health metrics (history, uptime percentage)
952pub struct HealthHttpServer {
953    checker: Arc<HealthChecker>,
954    bind_addr: SocketAddr,
955    shutdown: Arc<AtomicBool>,
956}
957
958/// Handle returned by [`HealthHttpServer::start`] to control the running server.
959pub struct HealthHttpHandle {
960    shutdown: Arc<AtomicBool>,
961    port: u16,
962    join_handle: tokio::task::JoinHandle<Result<(), std::io::Error>>,
963}
964
965impl HealthHttpHandle {
966    /// Signal the HTTP health server to stop accepting new connections.
967    pub fn stop(&self) {
968        self.shutdown.store(true, Ordering::SeqCst);
969    }
970
971    /// Return the port the server is listening on.
972    pub fn port(&self) -> u16 {
973        self.port
974    }
975
976    /// Wait for the server task to finish after calling [`stop`](Self::stop).
977    pub async fn join(self) -> Result<(), std::io::Error> {
978        match self.join_handle.await {
979            Ok(inner) => inner,
980            Err(e) => Err(std::io::Error::other(e)),
981        }
982    }
983}
984
985impl HealthHttpServer {
986    /// Create a new health HTTP server.
987    ///
988    /// `bind_addr` is the address to listen on (e.g. `0.0.0.0:8081`).
989    pub fn new(checker: Arc<HealthChecker>, bind_addr: SocketAddr) -> Self {
990        Self {
991            checker,
992            bind_addr,
993            shutdown: Arc::new(AtomicBool::new(false)),
994        }
995    }
996
997    /// Start the server in a background tokio task.
998    ///
999    /// Returns a [`HealthHttpHandle`] that can be used to query the port and
1000    /// signal shutdown.
1001    pub async fn start(self) -> Result<HealthHttpHandle, std::io::Error> {
1002        let listener = TcpListener::bind(self.bind_addr).await?;
1003        let local_addr = listener.local_addr()?;
1004        let port = local_addr.port();
1005        let shutdown = Arc::clone(&self.shutdown);
1006        let checker = Arc::clone(&self.checker);
1007
1008        let shutdown_flag = Arc::clone(&shutdown);
1009        let join_handle =
1010            tokio::spawn(async move { Self::accept_loop(listener, checker, shutdown_flag).await });
1011
1012        Ok(HealthHttpHandle {
1013            shutdown,
1014            port,
1015            join_handle,
1016        })
1017    }
1018
1019    /// Main accept loop.
1020    async fn accept_loop(
1021        listener: TcpListener,
1022        checker: Arc<HealthChecker>,
1023        shutdown: Arc<AtomicBool>,
1024    ) -> Result<(), std::io::Error> {
1025        loop {
1026            if shutdown.load(Ordering::SeqCst) {
1027                debug!("health HTTP server shutting down");
1028                break;
1029            }
1030
1031            // Use a short timeout so we can check the shutdown flag periodically.
1032            let accept_result =
1033                tokio::time::timeout(Duration::from_millis(200), listener.accept()).await;
1034
1035            match accept_result {
1036                Ok(Ok((stream, _addr))) => {
1037                    let checker = Arc::clone(&checker);
1038                    tokio::spawn(async move {
1039                        if let Err(e) = Self::handle_connection(stream, &checker).await {
1040                            warn!("health HTTP connection error: {e}");
1041                        }
1042                    });
1043                }
1044                Ok(Err(e)) => {
1045                    warn!("health HTTP accept error: {e}");
1046                }
1047                Err(_) => {
1048                    // Timeout — loop back to check shutdown flag
1049                }
1050            }
1051        }
1052        Ok(())
1053    }
1054
1055    /// Handle a single TCP connection: read the HTTP request, route, respond.
1056    async fn handle_connection(
1057        mut stream: tokio::net::TcpStream,
1058        checker: &HealthChecker,
1059    ) -> Result<(), std::io::Error> {
1060        let mut buf = [0u8; 4096];
1061        let n = stream.read(&mut buf).await?;
1062        if n == 0 {
1063            return Ok(());
1064        }
1065
1066        let request = String::from_utf8_lossy(&buf[..n]);
1067        let (method, path) = Self::parse_request_line(&request);
1068
1069        let (status_code, status_text, body) = match method {
1070            "GET" => Self::route(path, checker),
1071            _ => (
1072                405,
1073                "Method Not Allowed",
1074                r#"{"error":"method not allowed"}"#.to_string(),
1075            ),
1076        };
1077
1078        let response = format!(
1079            "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1080            status_code,
1081            status_text,
1082            body.len(),
1083            body
1084        );
1085
1086        stream.write_all(response.as_bytes()).await?;
1087        stream.flush().await?;
1088        Ok(())
1089    }
1090
1091    /// Parse the request line (first line of the HTTP request).
1092    /// Returns (method, path). Defaults to ("", "") on malformed input.
1093    fn parse_request_line(request: &str) -> (&str, &str) {
1094        let first_line = request.lines().next().unwrap_or("");
1095        let mut parts = first_line.split_whitespace();
1096        let method = parts.next().unwrap_or("");
1097        let path = parts.next().unwrap_or("");
1098        (method, path)
1099    }
1100
1101    /// Route a GET request to the appropriate handler.
1102    fn route(path: &str, checker: &HealthChecker) -> (u16, &'static str, String) {
1103        match path {
1104            "/health" => Self::handle_health(checker),
1105            "/healthz" => Self::handle_healthz(checker),
1106            "/readyz" => Self::handle_readyz(checker),
1107            "/livez" => Self::handle_livez(checker),
1108            "/metrics" => Self::handle_metrics(checker),
1109            _ => (404, "Not Found", r#"{"error":"not found"}"#.to_string()),
1110        }
1111    }
1112
1113    /// `GET /health` — full health status JSON.
1114    fn handle_health(checker: &HealthChecker) -> (u16, &'static str, String) {
1115        let health = checker.get_health();
1116        let status_code = match health.status {
1117            HealthStatus::Healthy | HealthStatus::Degraded => 200,
1118            _ => 503,
1119        };
1120        let status_text = if status_code == 200 {
1121            "OK"
1122        } else {
1123            "Service Unavailable"
1124        };
1125        let body = serde_json::to_string(&health)
1126            .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1127        (status_code, status_text, body)
1128    }
1129
1130    /// `GET /healthz` — simple alive check.
1131    fn handle_healthz(checker: &HealthChecker) -> (u16, &'static str, String) {
1132        let alive = checker.is_alive();
1133        let status_code = if alive { 200 } else { 503 };
1134        let status_text = if alive { "OK" } else { "Service Unavailable" };
1135        let body = format!(r#"{{"alive":{alive}}}"#);
1136        (status_code, status_text, body)
1137    }
1138
1139    /// `GET /readyz` — readiness check.
1140    fn handle_readyz(checker: &HealthChecker) -> (u16, &'static str, String) {
1141        let ready = checker.is_ready();
1142        let status_code = if ready { 200 } else { 503 };
1143        let status_text = if ready { "OK" } else { "Service Unavailable" };
1144        let body = format!(r#"{{"ready":{ready}}}"#);
1145        (status_code, status_text, body)
1146    }
1147
1148    /// `GET /livez` — liveness check.
1149    fn handle_livez(checker: &HealthChecker) -> (u16, &'static str, String) {
1150        let resp = checker.liveness_response();
1151        let status_code = if resp.alive { 200 } else { 503 };
1152        let status_text = if resp.alive {
1153            "OK"
1154        } else {
1155            "Service Unavailable"
1156        };
1157        let body = serde_json::to_string(&resp)
1158            .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1159        (status_code, status_text, body)
1160    }
1161
1162    /// `GET /metrics` — health history and uptime metrics.
1163    fn handle_metrics(checker: &HealthChecker) -> (u16, &'static str, String) {
1164        let history = checker.health_history();
1165        let uptime_percent = checker.uptime_percent();
1166        let uptime_seconds = checker.uptime_seconds();
1167
1168        #[derive(Serialize)]
1169        struct MetricsResponse {
1170            uptime_seconds: u64,
1171            uptime_percent: f64,
1172            history_count: usize,
1173            history: Vec<HealthSnapshot>,
1174        }
1175
1176        let resp = MetricsResponse {
1177            uptime_seconds,
1178            uptime_percent,
1179            history_count: history.len(),
1180            history,
1181        };
1182
1183        let body = serde_json::to_string(&resp)
1184            .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1185        (200, "OK", body)
1186    }
1187}
1188
1189// ---------------------------------------------------------------------------
1190// Tests
1191// ---------------------------------------------------------------------------
1192
1193#[cfg(test)]
1194mod tests {
1195    use super::*;
1196    use std::thread::sleep;
1197
1198    // ---- Original tests (preserved) ----
1199
1200    #[test]
1201    fn test_health_checker_creation() {
1202        let checker = HealthChecker::new();
1203        assert_eq!(checker.status(), HealthStatus::Starting);
1204        assert!(!checker.is_ready());
1205        assert!(checker.is_alive());
1206    }
1207
1208    #[test]
1209    fn test_set_status() {
1210        let checker = HealthChecker::new();
1211
1212        checker.set_status(HealthStatus::Healthy);
1213        assert_eq!(checker.status(), HealthStatus::Healthy);
1214
1215        checker.set_status(HealthStatus::ShuttingDown);
1216        assert_eq!(checker.status(), HealthStatus::ShuttingDown);
1217
1218        checker.set_status(HealthStatus::Unhealthy);
1219        assert_eq!(checker.status(), HealthStatus::Unhealthy);
1220    }
1221
1222    #[test]
1223    fn test_component_health() {
1224        let checker = HealthChecker::new();
1225
1226        checker.set_storage_healthy(true);
1227        checker.set_network_healthy(true);
1228        checker.set_cluster_healthy(true);
1229        checker.set_status(HealthStatus::Healthy);
1230
1231        assert!(checker.is_ready());
1232        assert!(checker.is_alive());
1233    }
1234
1235    #[test]
1236    fn test_not_ready_when_components_unhealthy() {
1237        let checker = HealthChecker::new();
1238
1239        checker.set_status(HealthStatus::Healthy);
1240        checker.set_storage_healthy(false); // Storage not healthy
1241
1242        assert!(!checker.is_ready());
1243    }
1244
1245    #[test]
1246    fn test_uptime() {
1247        let checker = HealthChecker::new();
1248        sleep(Duration::from_millis(100));
1249
1250        let uptime = checker.uptime_seconds();
1251        // Uptime should be a reasonable value (u64 is always >= 0)
1252        assert!(uptime < 1000); // Should be less than 1000 seconds
1253    }
1254
1255    #[test]
1256    fn test_health_response() {
1257        let checker = HealthChecker::new();
1258        checker.set_storage_healthy(true);
1259        checker.set_network_healthy(true);
1260        checker.set_status(HealthStatus::Healthy);
1261
1262        let health = checker.get_health();
1263        assert_eq!(health.status, HealthStatus::Healthy);
1264        assert_eq!(health.components.len(), 3);
1265        assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
1266    }
1267
1268    #[test]
1269    fn test_health_json() {
1270        let checker = HealthChecker::new();
1271        let json = checker.get_health_json();
1272        assert!(json.is_ok());
1273
1274        let json_str = json.expect("JSON serialization failed");
1275        assert!(json_str.contains("status"));
1276        assert!(json_str.contains("version"));
1277        assert!(json_str.contains("components"));
1278    }
1279
1280    #[test]
1281    fn test_is_alive() {
1282        let checker = HealthChecker::new();
1283
1284        checker.set_status(HealthStatus::Starting);
1285        assert!(checker.is_alive());
1286
1287        checker.set_status(HealthStatus::Healthy);
1288        assert!(checker.is_alive());
1289
1290        checker.set_status(HealthStatus::ShuttingDown);
1291        assert!(!checker.is_alive());
1292
1293        checker.set_status(HealthStatus::Unhealthy);
1294        assert!(!checker.is_alive());
1295    }
1296
1297    // ---- Deep probe tests ----
1298
1299    /// A simple test probe that always returns healthy
1300    struct AlwaysHealthyProbe;
1301
1302    #[async_trait]
1303    impl DeepHealthCheck for AlwaysHealthyProbe {
1304        async fn check(&self) -> HealthProbeResult {
1305            HealthProbeResult {
1306                status: ProbeStatus::Healthy,
1307                latency_ms: 0.1,
1308                message: "always healthy".to_string(),
1309            }
1310        }
1311    }
1312
1313    /// A probe that returns unhealthy
1314    struct AlwaysUnhealthyProbe;
1315
1316    #[async_trait]
1317    impl DeepHealthCheck for AlwaysUnhealthyProbe {
1318        async fn check(&self) -> HealthProbeResult {
1319            HealthProbeResult {
1320                status: ProbeStatus::Unhealthy,
1321                latency_ms: 5.0,
1322                message: "always unhealthy".to_string(),
1323            }
1324        }
1325    }
1326
1327    /// A probe that returns degraded
1328    struct AlwaysDegradedProbe;
1329
1330    #[async_trait]
1331    impl DeepHealthCheck for AlwaysDegradedProbe {
1332        async fn check(&self) -> HealthProbeResult {
1333            HealthProbeResult {
1334                status: ProbeStatus::Degraded,
1335                latency_ms: 2.0,
1336                message: "always degraded".to_string(),
1337            }
1338        }
1339    }
1340
1341    #[tokio::test]
1342    async fn test_deep_probe_execution_and_result_reporting() {
1343        let checker = HealthChecker::new();
1344        checker.register_probe("test_healthy", Arc::new(AlwaysHealthyProbe));
1345        checker.register_probe("test_unhealthy", Arc::new(AlwaysUnhealthyProbe));
1346
1347        let results = checker.run_probes().await;
1348        assert_eq!(results.len(), 2);
1349
1350        let healthy = results.get("test_healthy").expect("missing healthy probe");
1351        assert_eq!(healthy.status, ProbeStatus::Healthy);
1352        assert_eq!(healthy.message, "always healthy");
1353
1354        let unhealthy = results
1355            .get("test_unhealthy")
1356            .expect("missing unhealthy probe");
1357        assert_eq!(unhealthy.status, ProbeStatus::Unhealthy);
1358        assert_eq!(unhealthy.message, "always unhealthy");
1359    }
1360
1361    #[tokio::test]
1362    async fn test_storage_probe_passes_with_valid_storage() {
1363        let dir = std::env::temp_dir().join("amaters_health_test_storage");
1364        let _ = std::fs::create_dir_all(&dir);
1365
1366        let probe = StorageProbe::new(dir.clone());
1367        let result = probe.check().await;
1368
1369        assert_eq!(result.status, ProbeStatus::Healthy);
1370        assert!(result.latency_ms >= 0.0);
1371        assert!(result.message.contains("OK"));
1372
1373        let _ = std::fs::remove_dir_all(&dir);
1374    }
1375
1376    #[tokio::test]
1377    async fn test_storage_probe_fails_with_invalid_path() {
1378        let probe = StorageProbe::new(std::path::PathBuf::from(
1379            "/nonexistent_path_for_health_check_test_12345",
1380        ));
1381        let result = probe.check().await;
1382        assert_eq!(result.status, ProbeStatus::Unhealthy);
1383    }
1384
1385    #[tokio::test]
1386    async fn test_wal_probe_passes() {
1387        let dir = std::env::temp_dir().join("amaters_health_test_wal");
1388        let _ = std::fs::create_dir_all(&dir);
1389
1390        let probe = WalProbe::new(dir.clone());
1391        let result = probe.check().await;
1392
1393        assert_eq!(result.status, ProbeStatus::Healthy);
1394        assert!(result.message.contains("appendable"));
1395
1396        let _ = std::fs::remove_dir_all(&dir);
1397    }
1398
1399    #[tokio::test]
1400    async fn test_disk_space_probe_healthy() {
1401        // Threshold of 1 byte — should always pass on a running system
1402        let probe = DiskSpaceProbe::new(std::env::temp_dir(), 1);
1403        let result = probe.check().await;
1404        assert_eq!(result.status, ProbeStatus::Healthy);
1405    }
1406
1407    // ---- Liveness vs readiness ----
1408
1409    #[test]
1410    fn test_liveness_vs_readiness_during_startup() {
1411        let checker = HealthChecker::new();
1412        // Starting state: alive but not ready
1413        assert!(checker.is_alive());
1414        assert!(!checker.is_ready());
1415
1416        let live_resp = checker.liveness_response();
1417        assert!(live_resp.alive);
1418
1419        let ready_resp = checker.readiness_response();
1420        assert!(!ready_resp.ready);
1421    }
1422
1423    #[test]
1424    fn test_liveness_vs_readiness_during_shutdown() {
1425        let checker = HealthChecker::new();
1426        checker.set_status(HealthStatus::ShuttingDown);
1427
1428        assert!(!checker.is_alive());
1429        assert!(!checker.is_ready());
1430
1431        let live_resp = checker.liveness_response();
1432        assert!(!live_resp.alive);
1433
1434        let ready_resp = checker.readiness_response();
1435        assert!(!ready_resp.ready);
1436    }
1437
1438    #[test]
1439    fn test_readiness_requires_components() {
1440        let checker = HealthChecker::new();
1441        checker.set_status(HealthStatus::Healthy);
1442        // Storage and network still false
1443        assert!(!checker.is_ready());
1444
1445        checker.set_storage_healthy(true);
1446        assert!(!checker.is_ready()); // network still down
1447
1448        checker.set_network_healthy(true);
1449        assert!(checker.is_ready()); // now ready
1450    }
1451
1452    // ---- Health history ring buffer ----
1453
1454    #[test]
1455    fn test_health_history_ring_buffer_correctness() {
1456        let mut history = HealthHistory::new(3);
1457
1458        // Record 5 entries — buffer should keep last 3
1459        for i in 0..5u64 {
1460            history.record(HealthSnapshot {
1461                timestamp: i,
1462                status: HealthStatus::Healthy,
1463                alive: true,
1464                ready: true,
1465            });
1466        }
1467
1468        let snaps = history.snapshots();
1469        assert_eq!(snaps.len(), 3);
1470        // Oldest should be timestamp 2
1471        assert_eq!(snaps[0].timestamp, 2);
1472        assert_eq!(snaps[1].timestamp, 3);
1473        assert_eq!(snaps[2].timestamp, 4);
1474    }
1475
1476    #[test]
1477    fn test_health_history_partial_fill() {
1478        let mut history = HealthHistory::new(10);
1479
1480        history.record(HealthSnapshot {
1481            timestamp: 100,
1482            status: HealthStatus::Healthy,
1483            alive: true,
1484            ready: true,
1485        });
1486        history.record(HealthSnapshot {
1487            timestamp: 200,
1488            status: HealthStatus::Unhealthy,
1489            alive: false,
1490            ready: false,
1491        });
1492
1493        let snaps = history.snapshots();
1494        assert_eq!(snaps.len(), 2);
1495        assert_eq!(snaps[0].timestamp, 100);
1496        assert_eq!(snaps[1].timestamp, 200);
1497    }
1498
1499    // ---- Uptime percentage ----
1500
1501    #[test]
1502    fn test_uptime_percentage_all_alive() {
1503        let mut history = HealthHistory::new(5);
1504        for i in 0..5 {
1505            history.record(HealthSnapshot {
1506                timestamp: i,
1507                status: HealthStatus::Healthy,
1508                alive: true,
1509                ready: true,
1510            });
1511        }
1512        let pct = history.uptime_percent();
1513        assert!((pct - 100.0).abs() < f64::EPSILON);
1514    }
1515
1516    #[test]
1517    fn test_uptime_percentage_partial() {
1518        let mut history = HealthHistory::new(4);
1519        // 3 alive, 1 dead => 75%
1520        for i in 0..3 {
1521            history.record(HealthSnapshot {
1522                timestamp: i,
1523                status: HealthStatus::Healthy,
1524                alive: true,
1525                ready: true,
1526            });
1527        }
1528        history.record(HealthSnapshot {
1529            timestamp: 3,
1530            status: HealthStatus::Unhealthy,
1531            alive: false,
1532            ready: false,
1533        });
1534
1535        let pct = history.uptime_percent();
1536        assert!((pct - 75.0).abs() < 0.01);
1537    }
1538
1539    #[test]
1540    fn test_uptime_percentage_empty_is_100() {
1541        let history = HealthHistory::new(10);
1542        assert!((history.uptime_percent() - 100.0).abs() < f64::EPSILON);
1543    }
1544
1545    #[test]
1546    fn test_health_checker_uptime_percent_and_history() {
1547        let checker = HealthChecker::new();
1548        checker.set_status(HealthStatus::Healthy);
1549        checker.set_storage_healthy(true);
1550        checker.set_network_healthy(true);
1551
1552        checker.record_snapshot();
1553        checker.record_snapshot();
1554
1555        checker.set_status(HealthStatus::Unhealthy);
1556        checker.record_snapshot();
1557
1558        let history = checker.health_history();
1559        assert_eq!(history.len(), 3);
1560
1561        // 2 alive, 1 not alive
1562        let pct = checker.uptime_percent();
1563        assert!((pct - 100.0 * 2.0 / 3.0).abs() < 0.01);
1564    }
1565
1566    // ---- Dependency aggregation ----
1567
1568    #[tokio::test]
1569    async fn test_dependency_aggregation_one_unhealthy() {
1570        let checker = HealthChecker::new();
1571        checker.register_dependency("dep_ok", Arc::new(AlwaysHealthyProbe));
1572        checker.register_dependency("dep_bad", Arc::new(AlwaysUnhealthyProbe));
1573
1574        let worst = checker.check_dependencies().await;
1575        assert_eq!(worst, ProbeStatus::Unhealthy);
1576
1577        // Aggregated should also be unhealthy
1578        assert_eq!(
1579            checker.aggregated_dependency_status(),
1580            ProbeStatus::Unhealthy
1581        );
1582    }
1583
1584    #[tokio::test]
1585    async fn test_dependency_aggregation_all_healthy() {
1586        let checker = HealthChecker::new();
1587        checker.register_dependency("dep_a", Arc::new(AlwaysHealthyProbe));
1588        checker.register_dependency("dep_b", Arc::new(AlwaysHealthyProbe));
1589
1590        let worst = checker.check_dependencies().await;
1591        assert_eq!(worst, ProbeStatus::Healthy);
1592    }
1593
1594    #[tokio::test]
1595    async fn test_dependency_health_in_readiness_response() {
1596        let checker = HealthChecker::new();
1597        checker.set_status(HealthStatus::Healthy);
1598        checker.set_storage_healthy(true);
1599        checker.set_network_healthy(true);
1600        checker.register_dependency("cache", Arc::new(AlwaysHealthyProbe));
1601
1602        let _ = checker.check_dependencies().await;
1603
1604        let resp = checker.readiness_response();
1605        assert!(resp.ready);
1606        assert_eq!(resp.dependencies.len(), 1);
1607        assert_eq!(resp.dependencies[0].name, "cache");
1608        assert_eq!(resp.dependencies[0].status, ProbeStatus::Healthy);
1609    }
1610
1611    // ---- Degraded state ----
1612
1613    #[test]
1614    fn test_degraded_state_alive_and_ready() {
1615        let checker = HealthChecker::new();
1616        checker.set_status(HealthStatus::Degraded);
1617        checker.set_storage_healthy(true);
1618        checker.set_network_healthy(true);
1619
1620        // Degraded is alive and can be ready
1621        assert!(checker.is_alive());
1622        assert!(checker.is_ready());
1623    }
1624
1625    #[tokio::test]
1626    async fn test_degraded_dependency_aggregation() {
1627        let checker = HealthChecker::new();
1628        checker.register_dependency("dep_ok", Arc::new(AlwaysHealthyProbe));
1629        checker.register_dependency("dep_degraded", Arc::new(AlwaysDegradedProbe));
1630
1631        let worst = checker.check_dependencies().await;
1632        assert_eq!(worst, ProbeStatus::Degraded);
1633    }
1634
1635    // ---- Concurrent health checks ----
1636
1637    #[tokio::test]
1638    async fn test_concurrent_health_checks() {
1639        let checker = HealthChecker::new();
1640        checker.set_status(HealthStatus::Healthy);
1641        checker.set_storage_healthy(true);
1642        checker.set_network_healthy(true);
1643        checker.register_probe("probe_a", Arc::new(AlwaysHealthyProbe));
1644        checker.register_dependency("dep_a", Arc::new(AlwaysHealthyProbe));
1645
1646        // Run multiple operations concurrently
1647        let checker_clone1 = checker.clone();
1648        let checker_clone2 = checker.clone();
1649        let checker_clone3 = checker.clone();
1650
1651        let (r1, r2, r3) = tokio::join!(
1652            async move { checker_clone1.run_probes().await },
1653            async move { checker_clone2.check_dependencies().await },
1654            async move {
1655                checker_clone3.record_snapshot();
1656                checker_clone3.health_history()
1657            },
1658        );
1659
1660        assert_eq!(r1.len(), 1);
1661        assert_eq!(r2, ProbeStatus::Healthy);
1662        assert!(!r3.is_empty());
1663    }
1664
1665    // ---- ProbeStatus::worse ----
1666
1667    #[test]
1668    fn test_probe_status_worse() {
1669        assert_eq!(
1670            ProbeStatus::Healthy.worse(ProbeStatus::Healthy),
1671            ProbeStatus::Healthy
1672        );
1673        assert_eq!(
1674            ProbeStatus::Healthy.worse(ProbeStatus::Degraded),
1675            ProbeStatus::Degraded
1676        );
1677        assert_eq!(
1678            ProbeStatus::Degraded.worse(ProbeStatus::Healthy),
1679            ProbeStatus::Degraded
1680        );
1681        assert_eq!(
1682            ProbeStatus::Healthy.worse(ProbeStatus::Unhealthy),
1683            ProbeStatus::Unhealthy
1684        );
1685        assert_eq!(
1686            ProbeStatus::Degraded.worse(ProbeStatus::Unhealthy),
1687            ProbeStatus::Unhealthy
1688        );
1689    }
1690
1691    // ---- Deep health response ----
1692
1693    #[tokio::test]
1694    async fn test_get_health_deep_includes_probes() {
1695        let checker = HealthChecker::new();
1696        checker.register_probe("deep_test", Arc::new(AlwaysHealthyProbe));
1697
1698        let resp = checker.get_health_deep().await;
1699        assert_eq!(resp.probes.len(), 1);
1700        let probe_result = resp.probes.get("deep_test").expect("missing probe result");
1701        assert_eq!(probe_result.status, ProbeStatus::Healthy);
1702    }
1703
1704    // ---- HTTP health server tests ----
1705
1706    async fn start_test_server(checker: HealthChecker) -> HealthHttpHandle {
1707        let addr: SocketAddr = "127.0.0.1:0".parse().expect("valid addr");
1708        HealthHttpServer::new(Arc::new(checker), addr)
1709            .start()
1710            .await
1711            .expect("failed to start health HTTP server")
1712    }
1713
1714    async fn http_request(port: u16, method: &str, path: &str) -> (u16, String) {
1715        let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
1716            .await
1717            .expect("failed to connect");
1718        let req =
1719            format!("{method} {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
1720        stream.write_all(req.as_bytes()).await.expect("write");
1721        let mut resp = String::new();
1722        stream.read_to_string(&mut resp).await.expect("read");
1723        let line = resp.lines().next().unwrap_or("");
1724        let code: u16 = line
1725            .split_whitespace()
1726            .nth(1)
1727            .and_then(|s| s.parse().ok())
1728            .unwrap_or(0);
1729        let body = resp.split("\r\n\r\n").nth(1).unwrap_or("").to_string();
1730        (code, body)
1731    }
1732
1733    async fn http_get(port: u16, path: &str) -> (u16, String) {
1734        http_request(port, "GET", path).await
1735    }
1736
1737    #[tokio::test]
1738    async fn test_health_http_server_starts() {
1739        let checker = HealthChecker::new();
1740        let handle = start_test_server(checker).await;
1741        let port = handle.port();
1742        assert!(port > 0);
1743
1744        // Verify we can connect
1745        let result = tokio::net::TcpStream::connect(format!("127.0.0.1:{port}")).await;
1746        assert!(result.is_ok());
1747
1748        handle.stop();
1749        let _ = handle.join().await;
1750    }
1751
1752    #[tokio::test]
1753    async fn test_health_endpoint() {
1754        let checker = HealthChecker::new();
1755        checker.set_status(HealthStatus::Healthy);
1756        checker.set_storage_healthy(true);
1757        checker.set_network_healthy(true);
1758
1759        let handle = start_test_server(checker).await;
1760        let port = handle.port();
1761
1762        let (status, body) = http_get(port, "/health").await;
1763        assert_eq!(status, 200);
1764        assert!(body.contains("\"status\":\"healthy\""));
1765        assert!(body.contains("\"version\""));
1766        assert!(body.contains("\"components\""));
1767
1768        handle.stop();
1769        let _ = handle.join().await;
1770    }
1771
1772    #[tokio::test]
1773    async fn test_healthz_endpoint() {
1774        let checker = HealthChecker::new();
1775        checker.set_status(HealthStatus::Healthy);
1776
1777        let handle = start_test_server(checker).await;
1778        let port = handle.port();
1779
1780        let (status, body) = http_get(port, "/healthz").await;
1781        assert_eq!(status, 200);
1782        assert!(body.contains("\"alive\":true"));
1783
1784        handle.stop();
1785        let _ = handle.join().await;
1786    }
1787
1788    #[tokio::test]
1789    async fn test_healthz_unhealthy() {
1790        let checker = HealthChecker::new();
1791        checker.set_status(HealthStatus::Unhealthy);
1792
1793        let handle = start_test_server(checker).await;
1794        let port = handle.port();
1795
1796        let (status, body) = http_get(port, "/healthz").await;
1797        assert_eq!(status, 503);
1798        assert!(body.contains("\"alive\":false"));
1799
1800        handle.stop();
1801        let _ = handle.join().await;
1802    }
1803
1804    #[tokio::test]
1805    async fn test_readyz_endpoint() {
1806        let checker = HealthChecker::new();
1807        checker.set_status(HealthStatus::Healthy);
1808        checker.set_storage_healthy(true);
1809        checker.set_network_healthy(true);
1810
1811        let handle = start_test_server(checker).await;
1812        let port = handle.port();
1813
1814        let (status, body) = http_get(port, "/readyz").await;
1815        assert_eq!(status, 200);
1816        assert!(body.contains("\"ready\":true"));
1817
1818        handle.stop();
1819        let _ = handle.join().await;
1820    }
1821
1822    #[tokio::test]
1823    async fn test_readyz_not_ready() {
1824        let checker = HealthChecker::new();
1825        // Starting status — not ready (storage/network not set)
1826
1827        let handle = start_test_server(checker).await;
1828        let port = handle.port();
1829
1830        let (status, body) = http_get(port, "/readyz").await;
1831        assert_eq!(status, 503);
1832        assert!(body.contains("\"ready\":false"));
1833
1834        handle.stop();
1835        let _ = handle.join().await;
1836    }
1837
1838    #[tokio::test]
1839    async fn test_livez_endpoint() {
1840        let checker = HealthChecker::new();
1841        checker.set_status(HealthStatus::Healthy);
1842
1843        let handle = start_test_server(checker).await;
1844        let port = handle.port();
1845
1846        let (status, body) = http_get(port, "/livez").await;
1847        assert_eq!(status, 200);
1848        assert!(body.contains("\"alive\":true"));
1849        assert!(body.contains("\"uptime_seconds\""));
1850
1851        handle.stop();
1852        let _ = handle.join().await;
1853    }
1854
1855    #[tokio::test]
1856    async fn test_metrics_endpoint() {
1857        let checker = HealthChecker::new();
1858        checker.set_status(HealthStatus::Healthy);
1859        checker.set_storage_healthy(true);
1860        checker.set_network_healthy(true);
1861        checker.record_snapshot();
1862        checker.record_snapshot();
1863
1864        let handle = start_test_server(checker).await;
1865        let port = handle.port();
1866
1867        let (status, body) = http_get(port, "/metrics").await;
1868        assert_eq!(status, 200);
1869        assert!(body.contains("\"uptime_seconds\""));
1870        assert!(body.contains("\"uptime_percent\""));
1871        assert!(body.contains("\"history_count\":2"));
1872        assert!(body.contains("\"history\""));
1873
1874        handle.stop();
1875        let _ = handle.join().await;
1876    }
1877
1878    #[tokio::test]
1879    async fn test_unknown_path_404() {
1880        let checker = HealthChecker::new();
1881
1882        let handle = start_test_server(checker).await;
1883        let port = handle.port();
1884
1885        let (status, body) = http_get(port, "/unknown").await;
1886        assert_eq!(status, 404);
1887        assert!(body.contains("not found"));
1888
1889        handle.stop();
1890        let _ = handle.join().await;
1891    }
1892
1893    #[tokio::test]
1894    async fn test_non_get_method_405() {
1895        let checker = HealthChecker::new();
1896
1897        let handle = start_test_server(checker).await;
1898        let port = handle.port();
1899
1900        let (status, body) = http_request(port, "POST", "/health").await;
1901        assert_eq!(status, 405);
1902        assert!(body.contains("method not allowed"));
1903
1904        handle.stop();
1905        let _ = handle.join().await;
1906    }
1907
1908    #[tokio::test]
1909    async fn test_concurrent_http_requests() {
1910        let checker = HealthChecker::new();
1911        checker.set_status(HealthStatus::Healthy);
1912        checker.set_storage_healthy(true);
1913        checker.set_network_healthy(true);
1914
1915        let handle = start_test_server(checker).await;
1916        let port = handle.port();
1917
1918        // Fire 10 concurrent requests across different endpoints
1919        let mut tasks = Vec::new();
1920        for i in 0..10 {
1921            let path = match i % 4 {
1922                0 => "/health",
1923                1 => "/healthz",
1924                2 => "/readyz",
1925                _ => "/livez",
1926            };
1927            tasks.push(tokio::spawn(async move { http_get(port, path).await }));
1928        }
1929
1930        for task in tasks {
1931            let (status, _body) = task.await.expect("task panicked");
1932            assert_eq!(status, 200);
1933        }
1934
1935        handle.stop();
1936        let _ = handle.join().await;
1937    }
1938
1939    #[tokio::test]
1940    async fn test_server_shutdown() {
1941        let checker = HealthChecker::new();
1942
1943        let handle = start_test_server(checker).await;
1944        let port = handle.port();
1945
1946        // Verify server is listening
1947        let (status, _) = http_get(port, "/healthz").await;
1948        assert_eq!(status, 200);
1949
1950        // Signal shutdown
1951        handle.stop();
1952        let result = handle.join().await;
1953        assert!(result.is_ok());
1954
1955        // After shutdown, connection should fail (with a small delay for cleanup)
1956        tokio::time::sleep(Duration::from_millis(300)).await;
1957        let connect_result = tokio::time::timeout(
1958            Duration::from_millis(500),
1959            tokio::net::TcpStream::connect(format!("127.0.0.1:{port}")),
1960        )
1961        .await;
1962
1963        // Either timeout or connection refused — both are acceptable
1964        match connect_result {
1965            Err(_) => {}     // timeout — server stopped
1966            Ok(Err(_)) => {} // connection refused — server stopped
1967            Ok(Ok(_)) => {
1968                // Connection succeeded — this can happen if the OS hasn't fully
1969                // released the port yet; we just verify the server task exited.
1970            }
1971        }
1972    }
1973}