1use async_trait::async_trait;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
15use tokio::io::{AsyncReadExt, AsyncWriteExt};
16use tokio::net::TcpListener;
17use tracing::{debug, warn};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "lowercase")]
26pub enum HealthStatus {
27 Healthy,
29 Starting,
31 ShuttingDown,
33 Unhealthy,
35 Degraded,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ComponentHealth {
42 pub name: String,
44 pub status: HealthStatus,
46 pub message: Option<String>,
48 pub last_check: u64,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct HealthProbeResult {
59 pub status: ProbeStatus,
61 pub latency_ms: f64,
63 pub message: String,
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "lowercase")]
70pub enum ProbeStatus {
71 Healthy,
73 Degraded,
75 Unhealthy,
77}
78
79impl ProbeStatus {
80 pub fn to_health_status(self) -> HealthStatus {
82 match self {
83 ProbeStatus::Healthy => HealthStatus::Healthy,
84 ProbeStatus::Degraded => HealthStatus::Degraded,
85 ProbeStatus::Unhealthy => HealthStatus::Unhealthy,
86 }
87 }
88
89 pub fn worse(self, other: ProbeStatus) -> ProbeStatus {
91 match (self, other) {
92 (ProbeStatus::Unhealthy, _) | (_, ProbeStatus::Unhealthy) => ProbeStatus::Unhealthy,
93 (ProbeStatus::Degraded, _) | (_, ProbeStatus::Degraded) => ProbeStatus::Degraded,
94 _ => ProbeStatus::Healthy,
95 }
96 }
97}
98
99#[async_trait]
101pub trait DeepHealthCheck: Send + Sync {
102 async fn check(&self) -> HealthProbeResult;
104}
105
/// Probe that exercises a storage directory by writing, reading back and
/// deleting a small marker file.
pub struct StorageProbe {
    /// Directory in which the marker file is created.
    storage_path: std::path::PathBuf,
}

impl StorageProbe {
    /// Creates a probe that targets `storage_path`.
    pub fn new(storage_path: std::path::PathBuf) -> Self {
        StorageProbe { storage_path }
    }
}
123
124#[async_trait]
125impl DeepHealthCheck for StorageProbe {
126 async fn check(&self) -> HealthProbeResult {
127 let start = Instant::now();
128 let test_file = self.storage_path.join(".health_probe_test");
129
130 let write_result = tokio::fs::write(&test_file, b"health_probe").await;
132 if let Err(e) = write_result {
133 return HealthProbeResult {
134 status: ProbeStatus::Unhealthy,
135 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
136 message: format!("storage write failed: {e}"),
137 };
138 }
139
140 let read_result = tokio::fs::read(&test_file).await;
142 match read_result {
143 Ok(data) if data == b"health_probe" => {}
144 Ok(_) => {
145 let _ = tokio::fs::remove_file(&test_file).await;
146 return HealthProbeResult {
147 status: ProbeStatus::Unhealthy,
148 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
149 message: "storage read returned unexpected data".to_string(),
150 };
151 }
152 Err(e) => {
153 let _ = tokio::fs::remove_file(&test_file).await;
154 return HealthProbeResult {
155 status: ProbeStatus::Unhealthy,
156 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
157 message: format!("storage read failed: {e}"),
158 };
159 }
160 }
161
162 if let Err(e) = tokio::fs::remove_file(&test_file).await {
164 return HealthProbeResult {
165 status: ProbeStatus::Degraded,
166 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
167 message: format!("storage cleanup failed (non-critical): {e}"),
168 };
169 }
170
171 HealthProbeResult {
172 status: ProbeStatus::Healthy,
173 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
174 message: "storage read/write/delete OK".to_string(),
175 }
176 }
177}
178
/// Probe that checks a file can be opened for appending inside the WAL directory.
pub struct WalProbe {
    /// Directory in which the marker file is created.
    wal_path: std::path::PathBuf,
}

impl WalProbe {
    /// Creates a probe that targets `wal_path`.
    pub fn new(wal_path: std::path::PathBuf) -> Self {
        WalProbe { wal_path }
    }
}
191
192#[async_trait]
193impl DeepHealthCheck for WalProbe {
194 async fn check(&self) -> HealthProbeResult {
195 let start = Instant::now();
196 let test_file = self.wal_path.join(".wal_health_probe");
197
198 let result = tokio::fs::OpenOptions::new()
200 .create(true)
201 .append(true)
202 .truncate(false)
203 .open(&test_file)
204 .await;
205
206 match result {
207 Ok(_file) => {
208 let _ = tokio::fs::remove_file(&test_file).await;
209 HealthProbeResult {
210 status: ProbeStatus::Healthy,
211 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
212 message: "WAL directory is appendable".to_string(),
213 }
214 }
215 Err(e) => HealthProbeResult {
216 status: ProbeStatus::Unhealthy,
217 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
218 message: format!("WAL append test failed: {e}"),
219 },
220 }
221 }
222}
223
#[cfg(any(target_os = "macos", target_os = "linux"))]
unsafe extern "C" {
    /// Raw binding to POSIX `statvfs(3)`.
    ///
    /// `path` must be a NUL-terminated string and `buf` must point to writable
    /// memory at least as large as the platform's `struct statvfs`.
    #[link_name = "statvfs"]
    fn statvfs_raw(path: *const std::ffi::c_char, buf: *mut u8) -> std::ffi::c_int;
}

/// Mirror of `struct statvfs` from macOS `<sys/statvfs.h>`.
///
/// On Darwin `fsblkcnt_t` and `fsfilcnt_t` are 32-bit (`unsigned int`), so the
/// block and inode counters must not be declared as `u64`; doing so shifts
/// `f_bavail` onto the inode counters.
#[cfg(target_os = "macos")]
#[repr(C)]
struct StatvfsBuf {
    _f_bsize: std::ffi::c_ulong,
    f_frsize: std::ffi::c_ulong,
    _f_blocks: std::ffi::c_uint,
    _f_bfree: std::ffi::c_uint,
    f_bavail: std::ffi::c_uint,
    _f_files: std::ffi::c_uint,
    _f_ffree: std::ffi::c_uint,
    _f_favail: std::ffi::c_uint,
    _f_fsid: std::ffi::c_ulong,
    _f_flag: std::ffi::c_ulong,
    _f_namemax: std::ffi::c_ulong,
    // Spare room past the documented fields so the callee can never write
    // beyond the buffer.
    _slack: [u64; 8],
}

#[cfg(target_os = "macos")]
impl StatvfsBuf {
    /// Bytes available to unprivileged users: `f_bavail * f_frsize`, saturating.
    fn available_bytes(&self) -> u64 {
        u64::from(self.f_bavail).saturating_mul(self.f_frsize as u64)
    }
}

/// Mirror of the leading fields of `struct statvfs` on Linux, followed by
/// padding that covers the rest of the structure.
///
/// NOTE(review): every field is declared 64 bits wide, which assumes a 64-bit
/// target — confirm before building for 32-bit Linux.
#[cfg(target_os = "linux")]
#[repr(C)]
struct StatvfsBuf {
    _f_bsize: u64,
    f_frsize: u64,
    _f_blocks: u64,
    _f_bfree: u64,
    f_bavail: u64,
    _pad: [u64; 11],
}

#[cfg(target_os = "linux")]
impl StatvfsBuf {
    /// Bytes available to unprivileged users: `f_bavail * f_frsize`, saturating.
    fn available_bytes(&self) -> u64 {
        self.f_bavail.saturating_mul(self.f_frsize)
    }
}

/// Probe that compares the free space of the filesystem containing `path`
/// against a minimum threshold.
pub struct DiskSpaceProbe {
    /// Path on the filesystem to inspect.
    path: std::path::PathBuf,
    /// Free-space threshold in bytes.
    min_free_bytes: u64,
}

impl DiskSpaceProbe {
    /// Creates a probe for the filesystem containing `path` with a threshold
    /// of `min_free_bytes`.
    pub fn new(path: std::path::PathBuf, min_free_bytes: u64) -> Self {
        Self {
            path,
            min_free_bytes,
        }
    }

    /// Returns the number of bytes available on the probed filesystem.
    ///
    /// # Errors
    ///
    /// Returns a descriptive message when the path cannot be inspected.
    fn available_space(&self) -> Result<u64, String> {
        self.available_space_impl()
    }

    /// `statvfs`-based implementation shared by macOS and Linux.
    #[cfg(any(target_os = "macos", target_os = "linux"))]
    fn available_space_impl(&self) -> Result<u64, String> {
        use std::ffi::CString;
        use std::os::unix::ffi::OsStrExt;

        // Fails when the path contains an interior NUL byte.
        let c_path = CString::new(self.path.as_os_str().as_bytes())
            .map_err(|e| format!("invalid path: {e}"))?;

        // SAFETY: `StatvfsBuf` consists solely of integers, for which the
        // all-zero bit pattern is a valid value.
        let mut buf: StatvfsBuf = unsafe { std::mem::zeroed() };

        // SAFETY: `c_path` is a valid NUL-terminated string that outlives the
        // call, and `buf` is writable memory laid out like (and at least as
        // large as) the platform's `struct statvfs`.
        let ret = unsafe { statvfs_raw(c_path.as_ptr(), &mut buf as *mut StatvfsBuf as *mut u8) };
        if ret != 0 {
            return Err(format!(
                "statvfs failed: {}",
                std::io::Error::last_os_error()
            ));
        }

        Ok(buf.available_bytes())
    }

    /// Fallback for platforms without a `statvfs` binding: reports unlimited
    /// space when the path exists, and an error otherwise.
    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
    fn available_space_impl(&self) -> Result<u64, String> {
        if self.path.exists() {
            Ok(u64::MAX)
        } else {
            Err("path does not exist".to_string())
        }
    }
}
336
337#[async_trait]
338impl DeepHealthCheck for DiskSpaceProbe {
339 async fn check(&self) -> HealthProbeResult {
340 let start = Instant::now();
341
342 match self.available_space() {
343 Ok(available) => {
344 let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
345 if available >= self.min_free_bytes {
346 HealthProbeResult {
347 status: ProbeStatus::Healthy,
348 latency_ms,
349 message: format!(
350 "disk space OK: {} bytes available (threshold: {})",
351 available, self.min_free_bytes
352 ),
353 }
354 } else if available >= self.min_free_bytes / 4 {
355 HealthProbeResult {
356 status: ProbeStatus::Degraded,
357 latency_ms,
358 message: format!(
359 "disk space low: {} bytes available (threshold: {})",
360 available, self.min_free_bytes
361 ),
362 }
363 } else {
364 HealthProbeResult {
365 status: ProbeStatus::Unhealthy,
366 latency_ms,
367 message: format!(
368 "disk space critically low: {} bytes available (threshold: {})",
369 available, self.min_free_bytes
370 ),
371 }
372 }
373 }
374 Err(e) => HealthProbeResult {
375 status: ProbeStatus::Unhealthy,
376 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
377 message: format!("disk space check failed: {e}"),
378 },
379 }
380 }
381}
382
383#[derive(Debug, Clone, Serialize, Deserialize)]
389pub struct HealthSnapshot {
390 pub timestamp: u64,
392 pub status: HealthStatus,
394 pub alive: bool,
396 pub ready: bool,
398}
399
400#[derive(Debug)]
402pub struct HealthHistory {
403 buffer: Vec<Option<HealthSnapshot>>,
405 write_pos: usize,
407 total_written: usize,
409 capacity: usize,
411}
412
413impl HealthHistory {
414 pub fn new(capacity: usize) -> Self {
416 let capacity = capacity.max(1); Self {
418 buffer: (0..capacity).map(|_| None).collect(),
419 write_pos: 0,
420 total_written: 0,
421 capacity,
422 }
423 }
424
425 pub fn record(&mut self, snapshot: HealthSnapshot) {
427 self.buffer[self.write_pos] = Some(snapshot);
428 self.write_pos = (self.write_pos + 1) % self.capacity;
429 self.total_written += 1;
430 }
431
432 pub fn snapshots(&self) -> Vec<HealthSnapshot> {
434 let count = self.total_written.min(self.capacity);
435 let mut result = Vec::with_capacity(count);
436
437 if self.total_written < self.capacity {
438 for s in self.buffer.iter().take(self.write_pos).flatten() {
440 result.push(s.clone());
441 }
442 } else {
443 for i in 0..self.capacity {
445 let idx = (self.write_pos + i) % self.capacity;
446 if let Some(s) = &self.buffer[idx] {
447 result.push(s.clone());
448 }
449 }
450 }
451
452 result
453 }
454
455 pub fn uptime_percent(&self) -> f64 {
458 let snaps = self.snapshots();
459 if snaps.is_empty() {
460 return 100.0;
461 }
462 let alive_count = snaps.iter().filter(|s| s.alive).count();
463 (alive_count as f64 / snaps.len() as f64) * 100.0
464 }
465}
466
467#[derive(Debug, Clone, Serialize, Deserialize)]
473pub struct DependencyHealth {
474 pub name: String,
476 pub status: ProbeStatus,
478 pub latency_ms: f64,
480 pub last_checked: u64,
482 pub message: String,
484}
485
486#[derive(Debug, Clone, Serialize, Deserialize)]
492pub struct LivenessResponse {
493 pub alive: bool,
495 pub status: HealthStatus,
497 pub uptime_seconds: u64,
499 pub timestamp: u64,
501}
502
503#[derive(Debug, Clone, Serialize, Deserialize)]
505pub struct ReadinessResponse {
506 pub ready: bool,
508 pub status: HealthStatus,
510 pub components: Vec<ComponentHealth>,
512 pub dependencies: Vec<DependencyHealth>,
514 pub timestamp: u64,
516}
517
518#[derive(Debug, Clone, Serialize, Deserialize)]
524pub struct HealthCheckResponse {
525 pub status: HealthStatus,
527 pub version: String,
529 pub uptime_seconds: u64,
531 pub components: Vec<ComponentHealth>,
533 pub dependencies: Vec<DependencyHealth>,
535 pub probes: HashMap<String, HealthProbeResult>,
537 pub uptime_percent: f64,
539 pub timestamp: u64,
541}
542
543#[derive(Clone)]
552pub struct HealthChecker {
553 inner: Arc<HealthCheckerInner>,
554}
555
556struct HealthCheckerInner {
557 start_time: AtomicU64,
559 status: AtomicU64,
561 storage_healthy: AtomicBool,
563 network_healthy: AtomicBool,
565 cluster_enabled: AtomicBool,
567 cluster_healthy: AtomicBool,
569 probes: RwLock<HashMap<String, Arc<dyn DeepHealthCheck>>>,
571 dependency_checkers: RwLock<HashMap<String, Arc<dyn DeepHealthCheck>>>,
573 dependency_health: RwLock<HashMap<String, DependencyHealth>>,
575 history: RwLock<HealthHistory>,
577}
578
/// Returns the current wall-clock time as whole seconds since the Unix epoch,
/// or `0` when the system clock reads earlier than the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
585
586impl HealthChecker {
587 pub fn new() -> Self {
589 Self::with_history_capacity(10)
590 }
591
592 pub fn with_history_capacity(capacity: usize) -> Self {
594 Self {
595 inner: Arc::new(HealthCheckerInner {
596 start_time: AtomicU64::new(now_secs()),
597 status: AtomicU64::new(HealthStatus::Starting as u64),
598 storage_healthy: AtomicBool::new(false),
599 network_healthy: AtomicBool::new(false),
600 cluster_enabled: AtomicBool::new(false),
601 cluster_healthy: AtomicBool::new(false),
602 probes: RwLock::new(HashMap::new()),
603 dependency_checkers: RwLock::new(HashMap::new()),
604 dependency_health: RwLock::new(HashMap::new()),
605 history: RwLock::new(HealthHistory::new(capacity)),
606 }),
607 }
608 }
609
610 pub fn set_status(&self, status: HealthStatus) {
614 self.inner.status.store(status as u64, Ordering::SeqCst);
615 }
616
617 pub fn status(&self) -> HealthStatus {
619 match self.inner.status.load(Ordering::SeqCst) {
620 0 => HealthStatus::Healthy,
621 1 => HealthStatus::Starting,
622 2 => HealthStatus::ShuttingDown,
623 3 => HealthStatus::Unhealthy,
624 4 => HealthStatus::Degraded,
625 _ => HealthStatus::Unhealthy,
626 }
627 }
628
629 pub fn set_storage_healthy(&self, healthy: bool) {
631 self.inner.storage_healthy.store(healthy, Ordering::SeqCst);
632 }
633
634 pub fn set_network_healthy(&self, healthy: bool) {
636 self.inner.network_healthy.store(healthy, Ordering::SeqCst);
637 }
638
639 pub fn set_cluster_enabled(&self, enabled: bool) {
641 self.inner.cluster_enabled.store(enabled, Ordering::SeqCst);
642 }
643
644 pub fn set_cluster_healthy(&self, healthy: bool) {
646 self.inner.cluster_healthy.store(healthy, Ordering::SeqCst);
647 }
648
649 pub fn uptime_seconds(&self) -> u64 {
651 let now = now_secs();
652 let start = self.inner.start_time.load(Ordering::SeqCst);
653 now.saturating_sub(start)
654 }
655
656 pub fn is_alive(&self) -> bool {
661 matches!(
662 self.status(),
663 HealthStatus::Healthy | HealthStatus::Starting | HealthStatus::Degraded
664 )
665 }
666
667 pub fn is_ready(&self) -> bool {
670 let status = self.status();
671 let base_ok = matches!(status, HealthStatus::Healthy | HealthStatus::Degraded);
672 base_ok
673 && self.inner.storage_healthy.load(Ordering::SeqCst)
674 && self.inner.network_healthy.load(Ordering::SeqCst)
675 }
676
677 pub fn liveness_response(&self) -> LivenessResponse {
679 LivenessResponse {
680 alive: self.is_alive(),
681 status: self.status(),
682 uptime_seconds: self.uptime_seconds(),
683 timestamp: now_secs(),
684 }
685 }
686
687 pub fn readiness_response(&self) -> ReadinessResponse {
689 let components = self.build_component_list();
690 let dependencies: Vec<DependencyHealth> = self
691 .inner
692 .dependency_health
693 .read()
694 .values()
695 .cloned()
696 .collect();
697
698 ReadinessResponse {
699 ready: self.is_ready(),
700 status: self.status(),
701 components,
702 dependencies,
703 timestamp: now_secs(),
704 }
705 }
706
707 pub fn register_probe(&self, name: impl Into<String>, probe: Arc<dyn DeepHealthCheck>) {
711 self.inner.probes.write().insert(name.into(), probe);
712 }
713
714 pub async fn run_probes(&self) -> HashMap<String, HealthProbeResult> {
716 let probes: Vec<(String, Arc<dyn DeepHealthCheck>)> = {
717 let guard = self.inner.probes.read();
718 guard
719 .iter()
720 .map(|(k, v)| (k.clone(), Arc::clone(v)))
721 .collect()
722 };
723
724 let mut results = HashMap::with_capacity(probes.len());
725 for (name, probe) in probes {
726 let result = probe.check().await;
727 results.insert(name, result);
728 }
729 results
730 }
731
732 pub fn register_dependency(&self, name: impl Into<String>, checker: Arc<dyn DeepHealthCheck>) {
736 let name = name.into();
737 self.inner
738 .dependency_checkers
739 .write()
740 .insert(name.clone(), checker);
741 self.inner.dependency_health.write().insert(
743 name.clone(),
744 DependencyHealth {
745 name,
746 status: ProbeStatus::Unhealthy,
747 latency_ms: 0.0,
748 last_checked: 0,
749 message: "not yet checked".to_string(),
750 },
751 );
752 }
753
754 pub async fn check_dependencies(&self) -> ProbeStatus {
757 let checkers: Vec<(String, Arc<dyn DeepHealthCheck>)> = {
758 let guard = self.inner.dependency_checkers.read();
759 guard
760 .iter()
761 .map(|(k, v)| (k.clone(), Arc::clone(v)))
762 .collect()
763 };
764
765 let mut worst = ProbeStatus::Healthy;
766 let now = now_secs();
767
768 for (name, checker) in checkers {
769 let result = checker.check().await;
770 worst = worst.worse(result.status);
771 let dep = DependencyHealth {
772 name: name.clone(),
773 status: result.status,
774 latency_ms: result.latency_ms,
775 last_checked: now,
776 message: result.message,
777 };
778 self.inner.dependency_health.write().insert(name, dep);
779 }
780
781 worst
782 }
783
784 pub fn aggregated_dependency_status(&self) -> ProbeStatus {
786 let guard = self.inner.dependency_health.read();
787 guard
788 .values()
789 .fold(ProbeStatus::Healthy, |acc, d| acc.worse(d.status))
790 }
791
792 pub fn record_snapshot(&self) {
796 let snapshot = HealthSnapshot {
797 timestamp: now_secs(),
798 status: self.status(),
799 alive: self.is_alive(),
800 ready: self.is_ready(),
801 };
802 self.inner.history.write().record(snapshot);
803 }
804
805 pub fn health_history(&self) -> Vec<HealthSnapshot> {
807 self.inner.history.read().snapshots()
808 }
809
810 pub fn uptime_percent(&self) -> f64 {
812 self.inner.history.read().uptime_percent()
813 }
814
815 pub fn get_health(&self) -> HealthCheckResponse {
819 let components = self.build_component_list();
820 let dependencies: Vec<DependencyHealth> = self
821 .inner
822 .dependency_health
823 .read()
824 .values()
825 .cloned()
826 .collect();
827 let probes = HashMap::new(); let uptime_pct = self.uptime_percent();
829
830 HealthCheckResponse {
831 status: self.status(),
832 version: env!("CARGO_PKG_VERSION").to_string(),
833 uptime_seconds: self.uptime_seconds(),
834 components,
835 dependencies,
836 probes,
837 uptime_percent: uptime_pct,
838 timestamp: now_secs(),
839 }
840 }
841
842 pub async fn get_health_deep(&self) -> HealthCheckResponse {
844 let components = self.build_component_list();
845 let dependencies: Vec<DependencyHealth> = self
846 .inner
847 .dependency_health
848 .read()
849 .values()
850 .cloned()
851 .collect();
852 let probes = self.run_probes().await;
853 let uptime_pct = self.uptime_percent();
854
855 HealthCheckResponse {
856 status: self.status(),
857 version: env!("CARGO_PKG_VERSION").to_string(),
858 uptime_seconds: self.uptime_seconds(),
859 components,
860 dependencies,
861 probes,
862 uptime_percent: uptime_pct,
863 timestamp: now_secs(),
864 }
865 }
866
867 pub fn get_health_json(&self) -> Result<String, serde_json::Error> {
869 serde_json::to_string_pretty(&self.get_health())
870 }
871
872 fn build_component_list(&self) -> Vec<ComponentHealth> {
875 let now = now_secs();
876
877 let storage_status = if self.inner.storage_healthy.load(Ordering::SeqCst) {
878 HealthStatus::Healthy
879 } else {
880 HealthStatus::Unhealthy
881 };
882
883 let network_status = if self.inner.network_healthy.load(Ordering::SeqCst) {
884 HealthStatus::Healthy
885 } else {
886 HealthStatus::Unhealthy
887 };
888
889 let cluster_healthy = self.inner.cluster_healthy.load(Ordering::SeqCst);
890 let cluster_enabled = self.inner.cluster_enabled.load(Ordering::SeqCst);
891 let cluster_status = if cluster_enabled {
892 if cluster_healthy {
893 HealthStatus::Healthy
894 } else {
895 HealthStatus::Unhealthy
896 }
897 } else {
898 HealthStatus::Starting };
900
901 let cluster_message = if cluster_enabled {
902 if cluster_healthy {
903 "cluster active".to_string()
904 } else {
905 "cluster unhealthy".to_string()
906 }
907 } else {
908 "cluster disabled (standalone mode)".to_string()
909 };
910
911 vec![
912 ComponentHealth {
913 name: "storage".to_string(),
914 status: storage_status,
915 message: None,
916 last_check: now,
917 },
918 ComponentHealth {
919 name: "network".to_string(),
920 status: network_status,
921 message: None,
922 last_check: now,
923 },
924 ComponentHealth {
925 name: "cluster".to_string(),
926 status: cluster_status,
927 message: Some(cluster_message),
928 last_check: now,
929 },
930 ]
931 }
932}
933
934impl Default for HealthChecker {
935 fn default() -> Self {
936 Self::new()
937 }
938}
939
940pub struct HealthHttpServer {
953 checker: Arc<HealthChecker>,
954 bind_addr: SocketAddr,
955 shutdown: Arc<AtomicBool>,
956}
957
958pub struct HealthHttpHandle {
960 shutdown: Arc<AtomicBool>,
961 port: u16,
962 join_handle: tokio::task::JoinHandle<Result<(), std::io::Error>>,
963}
964
965impl HealthHttpHandle {
966 pub fn stop(&self) {
968 self.shutdown.store(true, Ordering::SeqCst);
969 }
970
971 pub fn port(&self) -> u16 {
973 self.port
974 }
975
976 pub async fn join(self) -> Result<(), std::io::Error> {
978 match self.join_handle.await {
979 Ok(inner) => inner,
980 Err(e) => Err(std::io::Error::other(e)),
981 }
982 }
983}
984
985impl HealthHttpServer {
986 pub fn new(checker: Arc<HealthChecker>, bind_addr: SocketAddr) -> Self {
990 Self {
991 checker,
992 bind_addr,
993 shutdown: Arc::new(AtomicBool::new(false)),
994 }
995 }
996
997 pub async fn start(self) -> Result<HealthHttpHandle, std::io::Error> {
1002 let listener = TcpListener::bind(self.bind_addr).await?;
1003 let local_addr = listener.local_addr()?;
1004 let port = local_addr.port();
1005 let shutdown = Arc::clone(&self.shutdown);
1006 let checker = Arc::clone(&self.checker);
1007
1008 let shutdown_flag = Arc::clone(&shutdown);
1009 let join_handle =
1010 tokio::spawn(async move { Self::accept_loop(listener, checker, shutdown_flag).await });
1011
1012 Ok(HealthHttpHandle {
1013 shutdown,
1014 port,
1015 join_handle,
1016 })
1017 }
1018
1019 async fn accept_loop(
1021 listener: TcpListener,
1022 checker: Arc<HealthChecker>,
1023 shutdown: Arc<AtomicBool>,
1024 ) -> Result<(), std::io::Error> {
1025 loop {
1026 if shutdown.load(Ordering::SeqCst) {
1027 debug!("health HTTP server shutting down");
1028 break;
1029 }
1030
1031 let accept_result =
1033 tokio::time::timeout(Duration::from_millis(200), listener.accept()).await;
1034
1035 match accept_result {
1036 Ok(Ok((stream, _addr))) => {
1037 let checker = Arc::clone(&checker);
1038 tokio::spawn(async move {
1039 if let Err(e) = Self::handle_connection(stream, &checker).await {
1040 warn!("health HTTP connection error: {e}");
1041 }
1042 });
1043 }
1044 Ok(Err(e)) => {
1045 warn!("health HTTP accept error: {e}");
1046 }
1047 Err(_) => {
1048 }
1050 }
1051 }
1052 Ok(())
1053 }
1054
1055 async fn handle_connection(
1057 mut stream: tokio::net::TcpStream,
1058 checker: &HealthChecker,
1059 ) -> Result<(), std::io::Error> {
1060 let mut buf = [0u8; 4096];
1061 let n = stream.read(&mut buf).await?;
1062 if n == 0 {
1063 return Ok(());
1064 }
1065
1066 let request = String::from_utf8_lossy(&buf[..n]);
1067 let (method, path) = Self::parse_request_line(&request);
1068
1069 let (status_code, status_text, body) = match method {
1070 "GET" => Self::route(path, checker),
1071 _ => (
1072 405,
1073 "Method Not Allowed",
1074 r#"{"error":"method not allowed"}"#.to_string(),
1075 ),
1076 };
1077
1078 let response = format!(
1079 "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1080 status_code,
1081 status_text,
1082 body.len(),
1083 body
1084 );
1085
1086 stream.write_all(response.as_bytes()).await?;
1087 stream.flush().await?;
1088 Ok(())
1089 }
1090
1091 fn parse_request_line(request: &str) -> (&str, &str) {
1094 let first_line = request.lines().next().unwrap_or("");
1095 let mut parts = first_line.split_whitespace();
1096 let method = parts.next().unwrap_or("");
1097 let path = parts.next().unwrap_or("");
1098 (method, path)
1099 }
1100
1101 fn route(path: &str, checker: &HealthChecker) -> (u16, &'static str, String) {
1103 match path {
1104 "/health" => Self::handle_health(checker),
1105 "/healthz" => Self::handle_healthz(checker),
1106 "/readyz" => Self::handle_readyz(checker),
1107 "/livez" => Self::handle_livez(checker),
1108 "/metrics" => Self::handle_metrics(checker),
1109 _ => (404, "Not Found", r#"{"error":"not found"}"#.to_string()),
1110 }
1111 }
1112
1113 fn handle_health(checker: &HealthChecker) -> (u16, &'static str, String) {
1115 let health = checker.get_health();
1116 let status_code = match health.status {
1117 HealthStatus::Healthy | HealthStatus::Degraded => 200,
1118 _ => 503,
1119 };
1120 let status_text = if status_code == 200 {
1121 "OK"
1122 } else {
1123 "Service Unavailable"
1124 };
1125 let body = serde_json::to_string(&health)
1126 .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1127 (status_code, status_text, body)
1128 }
1129
1130 fn handle_healthz(checker: &HealthChecker) -> (u16, &'static str, String) {
1132 let alive = checker.is_alive();
1133 let status_code = if alive { 200 } else { 503 };
1134 let status_text = if alive { "OK" } else { "Service Unavailable" };
1135 let body = format!(r#"{{"alive":{alive}}}"#);
1136 (status_code, status_text, body)
1137 }
1138
1139 fn handle_readyz(checker: &HealthChecker) -> (u16, &'static str, String) {
1141 let ready = checker.is_ready();
1142 let status_code = if ready { 200 } else { 503 };
1143 let status_text = if ready { "OK" } else { "Service Unavailable" };
1144 let body = format!(r#"{{"ready":{ready}}}"#);
1145 (status_code, status_text, body)
1146 }
1147
1148 fn handle_livez(checker: &HealthChecker) -> (u16, &'static str, String) {
1150 let resp = checker.liveness_response();
1151 let status_code = if resp.alive { 200 } else { 503 };
1152 let status_text = if resp.alive {
1153 "OK"
1154 } else {
1155 "Service Unavailable"
1156 };
1157 let body = serde_json::to_string(&resp)
1158 .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1159 (status_code, status_text, body)
1160 }
1161
1162 fn handle_metrics(checker: &HealthChecker) -> (u16, &'static str, String) {
1164 let history = checker.health_history();
1165 let uptime_percent = checker.uptime_percent();
1166 let uptime_seconds = checker.uptime_seconds();
1167
1168 #[derive(Serialize)]
1169 struct MetricsResponse {
1170 uptime_seconds: u64,
1171 uptime_percent: f64,
1172 history_count: usize,
1173 history: Vec<HealthSnapshot>,
1174 }
1175
1176 let resp = MetricsResponse {
1177 uptime_seconds,
1178 uptime_percent,
1179 history_count: history.len(),
1180 history,
1181 };
1182
1183 let body = serde_json::to_string(&resp)
1184 .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1185 (200, "OK", body)
1186 }
1187}
1188
1189#[cfg(test)]
1194mod tests {
1195 use super::*;
1196 use std::thread::sleep;
1197
1198 #[test]
1201 fn test_health_checker_creation() {
1202 let checker = HealthChecker::new();
1203 assert_eq!(checker.status(), HealthStatus::Starting);
1204 assert!(!checker.is_ready());
1205 assert!(checker.is_alive());
1206 }
1207
1208 #[test]
1209 fn test_set_status() {
1210 let checker = HealthChecker::new();
1211
1212 checker.set_status(HealthStatus::Healthy);
1213 assert_eq!(checker.status(), HealthStatus::Healthy);
1214
1215 checker.set_status(HealthStatus::ShuttingDown);
1216 assert_eq!(checker.status(), HealthStatus::ShuttingDown);
1217
1218 checker.set_status(HealthStatus::Unhealthy);
1219 assert_eq!(checker.status(), HealthStatus::Unhealthy);
1220 }
1221
1222 #[test]
1223 fn test_component_health() {
1224 let checker = HealthChecker::new();
1225
1226 checker.set_storage_healthy(true);
1227 checker.set_network_healthy(true);
1228 checker.set_cluster_healthy(true);
1229 checker.set_status(HealthStatus::Healthy);
1230
1231 assert!(checker.is_ready());
1232 assert!(checker.is_alive());
1233 }
1234
1235 #[test]
1236 fn test_not_ready_when_components_unhealthy() {
1237 let checker = HealthChecker::new();
1238
1239 checker.set_status(HealthStatus::Healthy);
1240 checker.set_storage_healthy(false); assert!(!checker.is_ready());
1243 }
1244
1245 #[test]
1246 fn test_uptime() {
1247 let checker = HealthChecker::new();
1248 sleep(Duration::from_millis(100));
1249
1250 let uptime = checker.uptime_seconds();
1251 assert!(uptime < 1000); }
1254
1255 #[test]
1256 fn test_health_response() {
1257 let checker = HealthChecker::new();
1258 checker.set_storage_healthy(true);
1259 checker.set_network_healthy(true);
1260 checker.set_status(HealthStatus::Healthy);
1261
1262 let health = checker.get_health();
1263 assert_eq!(health.status, HealthStatus::Healthy);
1264 assert_eq!(health.components.len(), 3);
1265 assert_eq!(health.version, env!("CARGO_PKG_VERSION"));
1266 }
1267
1268 #[test]
1269 fn test_health_json() {
1270 let checker = HealthChecker::new();
1271 let json = checker.get_health_json();
1272 assert!(json.is_ok());
1273
1274 let json_str = json.expect("JSON serialization failed");
1275 assert!(json_str.contains("status"));
1276 assert!(json_str.contains("version"));
1277 assert!(json_str.contains("components"));
1278 }
1279
1280 #[test]
1281 fn test_is_alive() {
1282 let checker = HealthChecker::new();
1283
1284 checker.set_status(HealthStatus::Starting);
1285 assert!(checker.is_alive());
1286
1287 checker.set_status(HealthStatus::Healthy);
1288 assert!(checker.is_alive());
1289
1290 checker.set_status(HealthStatus::ShuttingDown);
1291 assert!(!checker.is_alive());
1292
1293 checker.set_status(HealthStatus::Unhealthy);
1294 assert!(!checker.is_alive());
1295 }
1296
1297 struct AlwaysHealthyProbe;
1301
1302 #[async_trait]
1303 impl DeepHealthCheck for AlwaysHealthyProbe {
1304 async fn check(&self) -> HealthProbeResult {
1305 HealthProbeResult {
1306 status: ProbeStatus::Healthy,
1307 latency_ms: 0.1,
1308 message: "always healthy".to_string(),
1309 }
1310 }
1311 }
1312
1313 struct AlwaysUnhealthyProbe;
1315
1316 #[async_trait]
1317 impl DeepHealthCheck for AlwaysUnhealthyProbe {
1318 async fn check(&self) -> HealthProbeResult {
1319 HealthProbeResult {
1320 status: ProbeStatus::Unhealthy,
1321 latency_ms: 5.0,
1322 message: "always unhealthy".to_string(),
1323 }
1324 }
1325 }
1326
1327 struct AlwaysDegradedProbe;
1329
1330 #[async_trait]
1331 impl DeepHealthCheck for AlwaysDegradedProbe {
1332 async fn check(&self) -> HealthProbeResult {
1333 HealthProbeResult {
1334 status: ProbeStatus::Degraded,
1335 latency_ms: 2.0,
1336 message: "always degraded".to_string(),
1337 }
1338 }
1339 }
1340
1341 #[tokio::test]
1342 async fn test_deep_probe_execution_and_result_reporting() {
1343 let checker = HealthChecker::new();
1344 checker.register_probe("test_healthy", Arc::new(AlwaysHealthyProbe));
1345 checker.register_probe("test_unhealthy", Arc::new(AlwaysUnhealthyProbe));
1346
1347 let results = checker.run_probes().await;
1348 assert_eq!(results.len(), 2);
1349
1350 let healthy = results.get("test_healthy").expect("missing healthy probe");
1351 assert_eq!(healthy.status, ProbeStatus::Healthy);
1352 assert_eq!(healthy.message, "always healthy");
1353
1354 let unhealthy = results
1355 .get("test_unhealthy")
1356 .expect("missing unhealthy probe");
1357 assert_eq!(unhealthy.status, ProbeStatus::Unhealthy);
1358 assert_eq!(unhealthy.message, "always unhealthy");
1359 }
1360
1361 #[tokio::test]
1362 async fn test_storage_probe_passes_with_valid_storage() {
1363 let dir = std::env::temp_dir().join("amaters_health_test_storage");
1364 let _ = std::fs::create_dir_all(&dir);
1365
1366 let probe = StorageProbe::new(dir.clone());
1367 let result = probe.check().await;
1368
1369 assert_eq!(result.status, ProbeStatus::Healthy);
1370 assert!(result.latency_ms >= 0.0);
1371 assert!(result.message.contains("OK"));
1372
1373 let _ = std::fs::remove_dir_all(&dir);
1374 }
1375
1376 #[tokio::test]
1377 async fn test_storage_probe_fails_with_invalid_path() {
1378 let probe = StorageProbe::new(std::path::PathBuf::from(
1379 "/nonexistent_path_for_health_check_test_12345",
1380 ));
1381 let result = probe.check().await;
1382 assert_eq!(result.status, ProbeStatus::Unhealthy);
1383 }
1384
1385 #[tokio::test]
1386 async fn test_wal_probe_passes() {
1387 let dir = std::env::temp_dir().join("amaters_health_test_wal");
1388 let _ = std::fs::create_dir_all(&dir);
1389
1390 let probe = WalProbe::new(dir.clone());
1391 let result = probe.check().await;
1392
1393 assert_eq!(result.status, ProbeStatus::Healthy);
1394 assert!(result.message.contains("appendable"));
1395
1396 let _ = std::fs::remove_dir_all(&dir);
1397 }
1398
1399 #[tokio::test]
1400 async fn test_disk_space_probe_healthy() {
1401 let probe = DiskSpaceProbe::new(std::env::temp_dir(), 1);
1403 let result = probe.check().await;
1404 assert_eq!(result.status, ProbeStatus::Healthy);
1405 }
1406
/// Asserts that a freshly constructed checker is alive but not ready, both
/// through the boolean accessors and through the response structs.
#[test]
fn test_liveness_vs_readiness_during_startup() {
    let fresh = HealthChecker::new();

    let (alive, ready) = (fresh.is_alive(), fresh.is_ready());
    assert!(alive);
    assert!(!ready);

    let liveness = fresh.liveness_response();
    assert!(liveness.alive);

    let readiness = fresh.readiness_response();
    assert!(!readiness.ready);
}
1422
/// Asserts that a checker switched to `ShuttingDown` is neither alive nor
/// ready, both through the boolean accessors and through the response structs.
#[test]
fn test_liveness_vs_readiness_during_shutdown() {
    let stopping = HealthChecker::new();
    stopping.set_status(HealthStatus::ShuttingDown);

    let (alive, ready) = (stopping.is_alive(), stopping.is_ready());
    assert!(!alive);
    assert!(!ready);

    let liveness = stopping.liveness_response();
    assert!(!liveness.alive);

    let readiness = stopping.readiness_response();
    assert!(!readiness.ready);
}
1437
/// Asserts that readiness only flips to `true` once the status is `Healthy`
/// and both the storage and network flags have been set.
#[test]
fn test_readiness_requires_components() {
    let checker = HealthChecker::new();

    // Healthy status alone is not enough.
    checker.set_status(HealthStatus::Healthy);
    let ready_with_status_only = checker.is_ready();
    assert!(!ready_with_status_only);

    // Storage healthy, network still unset.
    checker.set_storage_healthy(true);
    let ready_without_network = checker.is_ready();
    assert!(!ready_without_network);

    // Everything set.
    checker.set_network_healthy(true);
    let ready_with_everything = checker.is_ready();
    assert!(ready_with_everything);
}
1451
/// Records five snapshots into a history of capacity three and asserts that
/// only the last three (timestamps 2, 3, 4) remain, oldest first.
#[test]
fn test_health_history_ring_buffer_correctness() {
    let mut history = HealthHistory::new(3);

    (0..5u64).for_each(|timestamp| {
        history.record(HealthSnapshot {
            timestamp,
            status: HealthStatus::Healthy,
            alive: true,
            ready: true,
        });
    });

    let snaps = history.snapshots();
    assert_eq!(snaps.len(), 3);
    for (slot, expected) in (2u64..=4).enumerate() {
        assert_eq!(snaps[slot].timestamp, expected);
    }
}
1475
/// Records two snapshots into a history of capacity ten and asserts that
/// both are returned in insertion order.
#[test]
fn test_health_history_partial_fill() {
    let mut history = HealthHistory::new(10);

    let entries = vec![
        (100, HealthStatus::Healthy, true),
        (200, HealthStatus::Unhealthy, false),
    ];
    for (timestamp, status, up) in entries {
        history.record(HealthSnapshot {
            timestamp,
            status,
            alive: up,
            ready: up,
        });
    }

    let snaps = history.snapshots();
    assert_eq!(snaps.len(), 2);
    assert_eq!(snaps[0].timestamp, 100);
    assert_eq!(snaps[1].timestamp, 200);
}
1498
/// Asserts that a history made only of alive snapshots reports 100% uptime.
#[test]
fn test_uptime_percentage_all_alive() {
    let alive_at = |timestamp| HealthSnapshot {
        timestamp,
        status: HealthStatus::Healthy,
        alive: true,
        ready: true,
    };

    let mut history = HealthHistory::new(5);
    for ts in 0..5 {
        history.record(alive_at(ts));
    }

    let deviation = (history.uptime_percent() - 100.0).abs();
    assert!(deviation < f64::EPSILON);
}
1515
/// Asserts that three alive snapshots followed by one non-alive snapshot
/// yield an uptime of 75%.
#[test]
fn test_uptime_percentage_partial() {
    let mut history = HealthHistory::new(4);

    // Timestamps 0..=2 are alive and healthy; timestamp 3 is the outage.
    for ts in 0..4 {
        let up = ts < 3;
        let status = if up {
            HealthStatus::Healthy
        } else {
            HealthStatus::Unhealthy
        };
        history.record(HealthSnapshot {
            timestamp: ts,
            status,
            alive: up,
            ready: up,
        });
    }

    let deviation = (history.uptime_percent() - 75.0).abs();
    assert!(deviation < 0.01);
}
1538
/// Asserts that a history with no recorded snapshots reports 100% uptime.
#[test]
fn test_uptime_percentage_empty_is_100() {
    let empty = HealthHistory::new(10);

    let pct = empty.uptime_percent();
    let deviation = (pct - 100.0).abs();

    assert!(deviation < f64::EPSILON);
}
1544
/// Records two snapshots while the checker is healthy and one after it is
/// set to `Unhealthy`, then asserts that the history holds three entries and
/// the uptime is two thirds.
#[test]
fn test_health_checker_uptime_percent_and_history() {
    let checker = HealthChecker::new();
    checker.set_status(HealthStatus::Healthy);
    checker.set_storage_healthy(true);
    checker.set_network_healthy(true);

    for _ in 0..2 {
        checker.record_snapshot();
    }

    checker.set_status(HealthStatus::Unhealthy);
    checker.record_snapshot();

    let recorded = checker.health_history();
    assert_eq!(recorded.len(), 3);

    let expected = 100.0 * 2.0 / 3.0;
    let actual = checker.uptime_percent();
    assert!((actual - expected).abs() < 0.01);
}
1565
1566 #[tokio::test]
1569 async fn test_dependency_aggregation_one_unhealthy() {
1570 let checker = HealthChecker::new();
1571 checker.register_dependency("dep_ok", Arc::new(AlwaysHealthyProbe));
1572 checker.register_dependency("dep_bad", Arc::new(AlwaysUnhealthyProbe));
1573
1574 let worst = checker.check_dependencies().await;
1575 assert_eq!(worst, ProbeStatus::Unhealthy);
1576
1577 assert_eq!(
1579 checker.aggregated_dependency_status(),
1580 ProbeStatus::Unhealthy
1581 );
1582 }
1583
1584 #[tokio::test]
1585 async fn test_dependency_aggregation_all_healthy() {
1586 let checker = HealthChecker::new();
1587 checker.register_dependency("dep_a", Arc::new(AlwaysHealthyProbe));
1588 checker.register_dependency("dep_b", Arc::new(AlwaysHealthyProbe));
1589
1590 let worst = checker.check_dependencies().await;
1591 assert_eq!(worst, ProbeStatus::Healthy);
1592 }
1593
1594 #[tokio::test]
1595 async fn test_dependency_health_in_readiness_response() {
1596 let checker = HealthChecker::new();
1597 checker.set_status(HealthStatus::Healthy);
1598 checker.set_storage_healthy(true);
1599 checker.set_network_healthy(true);
1600 checker.register_dependency("cache", Arc::new(AlwaysHealthyProbe));
1601
1602 let _ = checker.check_dependencies().await;
1603
1604 let resp = checker.readiness_response();
1605 assert!(resp.ready);
1606 assert_eq!(resp.dependencies.len(), 1);
1607 assert_eq!(resp.dependencies[0].name, "cache");
1608 assert_eq!(resp.dependencies[0].status, ProbeStatus::Healthy);
1609 }
1610
/// Asserts that a `Degraded` checker with healthy storage and network is
/// both alive and ready.
#[test]
fn test_degraded_state_alive_and_ready() {
    let degraded = HealthChecker::new();
    degraded.set_status(HealthStatus::Degraded);
    degraded.set_storage_healthy(true);
    degraded.set_network_healthy(true);

    let alive = degraded.is_alive();
    assert!(alive);

    let ready = degraded.is_ready();
    assert!(ready);
}
1624
1625 #[tokio::test]
1626 async fn test_degraded_dependency_aggregation() {
1627 let checker = HealthChecker::new();
1628 checker.register_dependency("dep_ok", Arc::new(AlwaysHealthyProbe));
1629 checker.register_dependency("dep_degraded", Arc::new(AlwaysDegradedProbe));
1630
1631 let worst = checker.check_dependencies().await;
1632 assert_eq!(worst, ProbeStatus::Degraded);
1633 }
1634
1635 #[tokio::test]
1638 async fn test_concurrent_health_checks() {
1639 let checker = HealthChecker::new();
1640 checker.set_status(HealthStatus::Healthy);
1641 checker.set_storage_healthy(true);
1642 checker.set_network_healthy(true);
1643 checker.register_probe("probe_a", Arc::new(AlwaysHealthyProbe));
1644 checker.register_dependency("dep_a", Arc::new(AlwaysHealthyProbe));
1645
1646 let checker_clone1 = checker.clone();
1648 let checker_clone2 = checker.clone();
1649 let checker_clone3 = checker.clone();
1650
1651 let (r1, r2, r3) = tokio::join!(
1652 async move { checker_clone1.run_probes().await },
1653 async move { checker_clone2.check_dependencies().await },
1654 async move {
1655 checker_clone3.record_snapshot();
1656 checker_clone3.health_history()
1657 },
1658 );
1659
1660 assert_eq!(r1.len(), 1);
1661 assert_eq!(r2, ProbeStatus::Healthy);
1662 assert!(!r3.is_empty());
1663 }
1664
/// Exhaustively checks `ProbeStatus::worse` over all nine ordered pairs.
///
/// The table lists each unordered pair once together with the expected
/// result; every row is asserted in both argument orders, so cases such as
/// `Unhealthy.worse(Healthy)` and `Degraded.worse(Degraded)` are covered too.
#[test]
fn test_probe_status_worse() {
    let cases = [
        (ProbeStatus::Healthy, ProbeStatus::Healthy, ProbeStatus::Healthy),
        (ProbeStatus::Healthy, ProbeStatus::Degraded, ProbeStatus::Degraded),
        (ProbeStatus::Healthy, ProbeStatus::Unhealthy, ProbeStatus::Unhealthy),
        (ProbeStatus::Degraded, ProbeStatus::Degraded, ProbeStatus::Degraded),
        (ProbeStatus::Degraded, ProbeStatus::Unhealthy, ProbeStatus::Unhealthy),
        (ProbeStatus::Unhealthy, ProbeStatus::Unhealthy, ProbeStatus::Unhealthy),
    ];

    for &(lhs, rhs, expected) in cases.iter() {
        assert_eq!(lhs.worse(rhs), expected, "{:?}.worse({:?})", lhs, rhs);
        assert_eq!(rhs.worse(lhs), expected, "{:?}.worse({:?})", rhs, lhs);
    }
}
1690
1691 #[tokio::test]
1694 async fn test_get_health_deep_includes_probes() {
1695 let checker = HealthChecker::new();
1696 checker.register_probe("deep_test", Arc::new(AlwaysHealthyProbe));
1697
1698 let resp = checker.get_health_deep().await;
1699 assert_eq!(resp.probes.len(), 1);
1700 let probe_result = resp.probes.get("deep_test").expect("missing probe result");
1701 assert_eq!(probe_result.status, ProbeStatus::Healthy);
1702 }
1703
1704 async fn start_test_server(checker: HealthChecker) -> HealthHttpHandle {
1707 let addr: SocketAddr = "127.0.0.1:0".parse().expect("valid addr");
1708 HealthHttpServer::new(Arc::new(checker), addr)
1709 .start()
1710 .await
1711 .expect("failed to start health HTTP server")
1712 }
1713
1714 async fn http_request(port: u16, method: &str, path: &str) -> (u16, String) {
1715 let mut stream = tokio::net::TcpStream::connect(format!("127.0.0.1:{port}"))
1716 .await
1717 .expect("failed to connect");
1718 let req =
1719 format!("{method} {path} HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n");
1720 stream.write_all(req.as_bytes()).await.expect("write");
1721 let mut resp = String::new();
1722 stream.read_to_string(&mut resp).await.expect("read");
1723 let line = resp.lines().next().unwrap_or("");
1724 let code: u16 = line
1725 .split_whitespace()
1726 .nth(1)
1727 .and_then(|s| s.parse().ok())
1728 .unwrap_or(0);
1729 let body = resp.split("\r\n\r\n").nth(1).unwrap_or("").to_string();
1730 (code, body)
1731 }
1732
1733 async fn http_get(port: u16, path: &str) -> (u16, String) {
1734 http_request(port, "GET", path).await
1735 }
1736
1737 #[tokio::test]
1738 async fn test_health_http_server_starts() {
1739 let checker = HealthChecker::new();
1740 let handle = start_test_server(checker).await;
1741 let port = handle.port();
1742 assert!(port > 0);
1743
1744 let result = tokio::net::TcpStream::connect(format!("127.0.0.1:{port}")).await;
1746 assert!(result.is_ok());
1747
1748 handle.stop();
1749 let _ = handle.join().await;
1750 }
1751
1752 #[tokio::test]
1753 async fn test_health_endpoint() {
1754 let checker = HealthChecker::new();
1755 checker.set_status(HealthStatus::Healthy);
1756 checker.set_storage_healthy(true);
1757 checker.set_network_healthy(true);
1758
1759 let handle = start_test_server(checker).await;
1760 let port = handle.port();
1761
1762 let (status, body) = http_get(port, "/health").await;
1763 assert_eq!(status, 200);
1764 assert!(body.contains("\"status\":\"healthy\""));
1765 assert!(body.contains("\"version\""));
1766 assert!(body.contains("\"components\""));
1767
1768 handle.stop();
1769 let _ = handle.join().await;
1770 }
1771
1772 #[tokio::test]
1773 async fn test_healthz_endpoint() {
1774 let checker = HealthChecker::new();
1775 checker.set_status(HealthStatus::Healthy);
1776
1777 let handle = start_test_server(checker).await;
1778 let port = handle.port();
1779
1780 let (status, body) = http_get(port, "/healthz").await;
1781 assert_eq!(status, 200);
1782 assert!(body.contains("\"alive\":true"));
1783
1784 handle.stop();
1785 let _ = handle.join().await;
1786 }
1787
1788 #[tokio::test]
1789 async fn test_healthz_unhealthy() {
1790 let checker = HealthChecker::new();
1791 checker.set_status(HealthStatus::Unhealthy);
1792
1793 let handle = start_test_server(checker).await;
1794 let port = handle.port();
1795
1796 let (status, body) = http_get(port, "/healthz").await;
1797 assert_eq!(status, 503);
1798 assert!(body.contains("\"alive\":false"));
1799
1800 handle.stop();
1801 let _ = handle.join().await;
1802 }
1803
1804 #[tokio::test]
1805 async fn test_readyz_endpoint() {
1806 let checker = HealthChecker::new();
1807 checker.set_status(HealthStatus::Healthy);
1808 checker.set_storage_healthy(true);
1809 checker.set_network_healthy(true);
1810
1811 let handle = start_test_server(checker).await;
1812 let port = handle.port();
1813
1814 let (status, body) = http_get(port, "/readyz").await;
1815 assert_eq!(status, 200);
1816 assert!(body.contains("\"ready\":true"));
1817
1818 handle.stop();
1819 let _ = handle.join().await;
1820 }
1821
1822 #[tokio::test]
1823 async fn test_readyz_not_ready() {
1824 let checker = HealthChecker::new();
1825 let handle = start_test_server(checker).await;
1828 let port = handle.port();
1829
1830 let (status, body) = http_get(port, "/readyz").await;
1831 assert_eq!(status, 503);
1832 assert!(body.contains("\"ready\":false"));
1833
1834 handle.stop();
1835 let _ = handle.join().await;
1836 }
1837
1838 #[tokio::test]
1839 async fn test_livez_endpoint() {
1840 let checker = HealthChecker::new();
1841 checker.set_status(HealthStatus::Healthy);
1842
1843 let handle = start_test_server(checker).await;
1844 let port = handle.port();
1845
1846 let (status, body) = http_get(port, "/livez").await;
1847 assert_eq!(status, 200);
1848 assert!(body.contains("\"alive\":true"));
1849 assert!(body.contains("\"uptime_seconds\""));
1850
1851 handle.stop();
1852 let _ = handle.join().await;
1853 }
1854
1855 #[tokio::test]
1856 async fn test_metrics_endpoint() {
1857 let checker = HealthChecker::new();
1858 checker.set_status(HealthStatus::Healthy);
1859 checker.set_storage_healthy(true);
1860 checker.set_network_healthy(true);
1861 checker.record_snapshot();
1862 checker.record_snapshot();
1863
1864 let handle = start_test_server(checker).await;
1865 let port = handle.port();
1866
1867 let (status, body) = http_get(port, "/metrics").await;
1868 assert_eq!(status, 200);
1869 assert!(body.contains("\"uptime_seconds\""));
1870 assert!(body.contains("\"uptime_percent\""));
1871 assert!(body.contains("\"history_count\":2"));
1872 assert!(body.contains("\"history\""));
1873
1874 handle.stop();
1875 let _ = handle.join().await;
1876 }
1877
1878 #[tokio::test]
1879 async fn test_unknown_path_404() {
1880 let checker = HealthChecker::new();
1881
1882 let handle = start_test_server(checker).await;
1883 let port = handle.port();
1884
1885 let (status, body) = http_get(port, "/unknown").await;
1886 assert_eq!(status, 404);
1887 assert!(body.contains("not found"));
1888
1889 handle.stop();
1890 let _ = handle.join().await;
1891 }
1892
1893 #[tokio::test]
1894 async fn test_non_get_method_405() {
1895 let checker = HealthChecker::new();
1896
1897 let handle = start_test_server(checker).await;
1898 let port = handle.port();
1899
1900 let (status, body) = http_request(port, "POST", "/health").await;
1901 assert_eq!(status, 405);
1902 assert!(body.contains("method not allowed"));
1903
1904 handle.stop();
1905 let _ = handle.join().await;
1906 }
1907
1908 #[tokio::test]
1909 async fn test_concurrent_http_requests() {
1910 let checker = HealthChecker::new();
1911 checker.set_status(HealthStatus::Healthy);
1912 checker.set_storage_healthy(true);
1913 checker.set_network_healthy(true);
1914
1915 let handle = start_test_server(checker).await;
1916 let port = handle.port();
1917
1918 let mut tasks = Vec::new();
1920 for i in 0..10 {
1921 let path = match i % 4 {
1922 0 => "/health",
1923 1 => "/healthz",
1924 2 => "/readyz",
1925 _ => "/livez",
1926 };
1927 tasks.push(tokio::spawn(async move { http_get(port, path).await }));
1928 }
1929
1930 for task in tasks {
1931 let (status, _body) = task.await.expect("task panicked");
1932 assert_eq!(status, 200);
1933 }
1934
1935 handle.stop();
1936 let _ = handle.join().await;
1937 }
1938
1939 #[tokio::test]
1940 async fn test_server_shutdown() {
1941 let checker = HealthChecker::new();
1942
1943 let handle = start_test_server(checker).await;
1944 let port = handle.port();
1945
1946 let (status, _) = http_get(port, "/healthz").await;
1948 assert_eq!(status, 200);
1949
1950 handle.stop();
1952 let result = handle.join().await;
1953 assert!(result.is_ok());
1954
1955 tokio::time::sleep(Duration::from_millis(300)).await;
1957 let connect_result = tokio::time::timeout(
1958 Duration::from_millis(500),
1959 tokio::net::TcpStream::connect(format!("127.0.0.1:{port}")),
1960 )
1961 .await;
1962
1963 match connect_result {
1965 Err(_) => {} Ok(Err(_)) => {} Ok(Ok(_)) => {
1968 }
1971 }
1972 }
1973}