1use async_trait::async_trait;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::net::SocketAddr;
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
15use tokio::io::{AsyncReadExt, AsyncWriteExt};
16use tokio::net::TcpListener;
17use tracing::{debug, warn};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "lowercase")]
26pub enum HealthStatus {
27 Healthy,
29 Starting,
31 ShuttingDown,
33 Unhealthy,
35 Degraded,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ComponentHealth {
42 pub name: String,
44 pub status: HealthStatus,
46 pub message: Option<String>,
48 pub last_check: u64,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct HealthProbeResult {
59 pub status: ProbeStatus,
61 pub latency_ms: f64,
63 pub message: String,
65}
66
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
69#[serde(rename_all = "lowercase")]
70pub enum ProbeStatus {
71 Healthy,
73 Degraded,
75 Unhealthy,
77}
78
79impl ProbeStatus {
80 pub fn to_health_status(self) -> HealthStatus {
82 match self {
83 ProbeStatus::Healthy => HealthStatus::Healthy,
84 ProbeStatus::Degraded => HealthStatus::Degraded,
85 ProbeStatus::Unhealthy => HealthStatus::Unhealthy,
86 }
87 }
88
89 pub fn worse(self, other: ProbeStatus) -> ProbeStatus {
91 match (self, other) {
92 (ProbeStatus::Unhealthy, _) | (_, ProbeStatus::Unhealthy) => ProbeStatus::Unhealthy,
93 (ProbeStatus::Degraded, _) | (_, ProbeStatus::Degraded) => ProbeStatus::Degraded,
94 _ => ProbeStatus::Healthy,
95 }
96 }
97}
98
99#[async_trait]
101pub trait DeepHealthCheck: Send + Sync {
102 async fn check(&self) -> HealthProbeResult;
104}
105
/// Deep probe that writes, reads back and deletes a scratch file in a storage
/// directory.
pub struct StorageProbe {
    /// Directory the scratch file is created in.
    storage_path: std::path::PathBuf,
}

impl StorageProbe {
    /// Builds a probe that exercises the directory `storage_path`.
    pub fn new(storage_path: std::path::PathBuf) -> Self {
        StorageProbe { storage_path }
    }
}
123
124#[async_trait]
125impl DeepHealthCheck for StorageProbe {
126 async fn check(&self) -> HealthProbeResult {
127 let start = Instant::now();
128 let test_file = self.storage_path.join(".health_probe_test");
129
130 let write_result = tokio::fs::write(&test_file, b"health_probe").await;
132 if let Err(e) = write_result {
133 return HealthProbeResult {
134 status: ProbeStatus::Unhealthy,
135 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
136 message: format!("storage write failed: {e}"),
137 };
138 }
139
140 let read_result = tokio::fs::read(&test_file).await;
142 match read_result {
143 Ok(data) if data == b"health_probe" => {}
144 Ok(_) => {
145 let _ = tokio::fs::remove_file(&test_file).await;
146 return HealthProbeResult {
147 status: ProbeStatus::Unhealthy,
148 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
149 message: "storage read returned unexpected data".to_string(),
150 };
151 }
152 Err(e) => {
153 let _ = tokio::fs::remove_file(&test_file).await;
154 return HealthProbeResult {
155 status: ProbeStatus::Unhealthy,
156 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
157 message: format!("storage read failed: {e}"),
158 };
159 }
160 }
161
162 if let Err(e) = tokio::fs::remove_file(&test_file).await {
164 return HealthProbeResult {
165 status: ProbeStatus::Degraded,
166 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
167 message: format!("storage cleanup failed (non-critical): {e}"),
168 };
169 }
170
171 HealthProbeResult {
172 status: ProbeStatus::Healthy,
173 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
174 message: "storage read/write/delete OK".to_string(),
175 }
176 }
177}
178
/// Deep probe that checks a write-ahead-log directory accepts appends.
pub struct WalProbe {
    /// Directory the scratch file is created in.
    wal_path: std::path::PathBuf,
}

impl WalProbe {
    /// Builds a probe that exercises the directory `wal_path`.
    pub fn new(wal_path: std::path::PathBuf) -> Self {
        WalProbe { wal_path }
    }
}
191
192#[async_trait]
193impl DeepHealthCheck for WalProbe {
194 async fn check(&self) -> HealthProbeResult {
195 let start = Instant::now();
196 let test_file = self.wal_path.join(".wal_health_probe");
197
198 let result = tokio::fs::OpenOptions::new()
200 .create(true)
201 .append(true)
202 .truncate(false)
203 .open(&test_file)
204 .await;
205
206 match result {
207 Ok(_file) => {
208 let _ = tokio::fs::remove_file(&test_file).await;
209 HealthProbeResult {
210 status: ProbeStatus::Healthy,
211 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
212 message: "WAL directory is appendable".to_string(),
213 }
214 }
215 Err(e) => HealthProbeResult {
216 status: ProbeStatus::Unhealthy,
217 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
218 message: format!("WAL append test failed: {e}"),
219 },
220 }
221 }
222}
223
#[cfg(any(target_os = "macos", target_os = "linux"))]
unsafe extern "C" {
    /// POSIX `statvfs(3)`: fills `buf` with information about the filesystem
    /// containing `path` and returns 0, or returns -1 with `errno` set.
    /// `buf` must point to writable memory at least as large as the platform's
    /// `struct statvfs`.
    #[link_name = "statvfs"]
    fn statvfs_raw(path: *const std::ffi::c_char, buf: *mut u8) -> std::ffi::c_int;
}

/// Deep probe that compares the free disk space at a path against a threshold.
pub struct DiskSpaceProbe {
    /// A path on the filesystem to inspect.
    path: std::path::PathBuf,
    /// Bytes that must be available for the probe to report `Healthy`.
    min_free_bytes: u64,
}

impl DiskSpaceProbe {
    /// Creates a probe for the filesystem containing `path`, using
    /// `min_free_bytes` as the healthy threshold.
    pub fn new(path: std::path::PathBuf, min_free_bytes: u64) -> Self {
        Self {
            path,
            min_free_bytes,
        }
    }

    /// Bytes available to unprivileged users on the filesystem holding `path`.
    fn available_space(&self) -> Result<u64, String> {
        self.available_space_impl()
    }

    /// `statvfs(3)`-based implementation shared by macOS and Linux.
    ///
    /// The leading fields of `struct statvfs` are declared with each
    /// platform's real C types. `fsblkcnt_t` is 32-bit on Darwin,
    /// `unsigned long` on glibc and always 64-bit on musl. Declaring every
    /// field as `u64` would read `f_bavail` from the wrong offset on macOS and
    /// on 32-bit glibc.
    #[cfg(any(target_os = "macos", target_os = "linux"))]
    fn available_space_impl(&self) -> Result<u64, String> {
        use std::ffi::{c_ulong, CString};
        use std::os::unix::ffi::OsStrExt;

        #[cfg(target_os = "macos")]
        type FsBlkCnt = std::ffi::c_uint;
        #[cfg(all(target_os = "linux", target_env = "musl"))]
        type FsBlkCnt = u64;
        #[cfg(all(target_os = "linux", not(target_env = "musl")))]
        type FsBlkCnt = c_ulong;

        /// Leading fields of `struct statvfs`; only these are read. The trailing
        /// padding is deliberately oversized so libc can write the remainder
        /// of the struct without overflowing this buffer.
        #[repr(C)]
        struct StatvfsPrefix {
            _f_bsize: c_ulong,
            f_frsize: c_ulong,
            _f_blocks: FsBlkCnt,
            _f_bfree: FsBlkCnt,
            f_bavail: FsBlkCnt,
            _rest: [u64; 16],
        }

        let c_path = CString::new(self.path.as_os_str().as_bytes())
            .map_err(|e| format!("invalid path: {e}"))?;

        // SAFETY: `StatvfsPrefix` consists only of integers, for which the
        // all-zero bit pattern is a valid value.
        let mut buf: StatvfsPrefix = unsafe { std::mem::zeroed() };
        // SAFETY: `c_path` is a NUL-terminated string that outlives the call,
        // and `buf` is a writable, properly aligned buffer larger than the
        // platform's `struct statvfs`.
        let ret = unsafe {
            statvfs_raw(
                c_path.as_ptr(),
                (&mut buf as *mut StatvfsPrefix).cast::<u8>(),
            )
        };
        if ret != 0 {
            return Err(format!(
                "statvfs failed: {}",
                std::io::Error::last_os_error()
            ));
        }

        // `f_bavail` counts blocks of `f_frsize` bytes available to
        // unprivileged users. Field widths differ per target, so these
        // conversions are no-ops on some targets and widenings on others.
        #[allow(clippy::useless_conversion)]
        let available = u64::from(buf.f_bavail).saturating_mul(u64::from(buf.f_frsize));
        Ok(available)
    }

    /// Fallback for platforms without the `statvfs` binding. It only checks
    /// that `path` exists and then reports unlimited space (`u64::MAX`), so it
    /// can never report low disk space.
    #[cfg(not(any(target_os = "macos", target_os = "linux")))]
    fn available_space_impl(&self) -> Result<u64, String> {
        if self.path.exists() {
            Ok(u64::MAX)
        } else {
            Err("path does not exist".to_string())
        }
    }
}
336
337#[async_trait]
338impl DeepHealthCheck for DiskSpaceProbe {
339 async fn check(&self) -> HealthProbeResult {
340 let start = Instant::now();
341
342 match self.available_space() {
343 Ok(available) => {
344 let latency_ms = start.elapsed().as_secs_f64() * 1000.0;
345 if available >= self.min_free_bytes {
346 HealthProbeResult {
347 status: ProbeStatus::Healthy,
348 latency_ms,
349 message: format!(
350 "disk space OK: {} bytes available (threshold: {})",
351 available, self.min_free_bytes
352 ),
353 }
354 } else if available >= self.min_free_bytes / 4 {
355 HealthProbeResult {
356 status: ProbeStatus::Degraded,
357 latency_ms,
358 message: format!(
359 "disk space low: {} bytes available (threshold: {})",
360 available, self.min_free_bytes
361 ),
362 }
363 } else {
364 HealthProbeResult {
365 status: ProbeStatus::Unhealthy,
366 latency_ms,
367 message: format!(
368 "disk space critically low: {} bytes available (threshold: {})",
369 available, self.min_free_bytes
370 ),
371 }
372 }
373 }
374 Err(e) => HealthProbeResult {
375 status: ProbeStatus::Unhealthy,
376 latency_ms: start.elapsed().as_secs_f64() * 1000.0,
377 message: format!("disk space check failed: {e}"),
378 },
379 }
380 }
381}
382
383#[derive(Debug, Clone, Serialize, Deserialize)]
389pub struct HealthSnapshot {
390 pub timestamp: u64,
392 pub status: HealthStatus,
394 pub alive: bool,
396 pub ready: bool,
398}
399
400#[derive(Debug)]
402pub struct HealthHistory {
403 buffer: Vec<Option<HealthSnapshot>>,
405 write_pos: usize,
407 total_written: usize,
409 capacity: usize,
411}
412
413impl HealthHistory {
414 pub fn new(capacity: usize) -> Self {
416 let capacity = capacity.max(1); Self {
418 buffer: (0..capacity).map(|_| None).collect(),
419 write_pos: 0,
420 total_written: 0,
421 capacity,
422 }
423 }
424
425 pub fn record(&mut self, snapshot: HealthSnapshot) {
427 self.buffer[self.write_pos] = Some(snapshot);
428 self.write_pos = (self.write_pos + 1) % self.capacity;
429 self.total_written += 1;
430 }
431
432 pub fn snapshots(&self) -> Vec<HealthSnapshot> {
434 let count = self.total_written.min(self.capacity);
435 let mut result = Vec::with_capacity(count);
436
437 if self.total_written < self.capacity {
438 for s in self.buffer.iter().take(self.write_pos).flatten() {
440 result.push(s.clone());
441 }
442 } else {
443 for i in 0..self.capacity {
445 let idx = (self.write_pos + i) % self.capacity;
446 if let Some(s) = &self.buffer[idx] {
447 result.push(s.clone());
448 }
449 }
450 }
451
452 result
453 }
454
455 pub fn uptime_percent(&self) -> f64 {
458 let snaps = self.snapshots();
459 if snaps.is_empty() {
460 return 100.0;
461 }
462 let alive_count = snaps.iter().filter(|s| s.alive).count();
463 (alive_count as f64 / snaps.len() as f64) * 100.0
464 }
465}
466
467#[derive(Debug, Clone, Serialize, Deserialize)]
473pub struct DependencyHealth {
474 pub name: String,
476 pub status: ProbeStatus,
478 pub latency_ms: f64,
480 pub last_checked: u64,
482 pub message: String,
484}
485
486#[derive(Debug, Clone, Serialize, Deserialize)]
492pub struct LivenessResponse {
493 pub alive: bool,
495 pub status: HealthStatus,
497 pub uptime_seconds: u64,
499 pub timestamp: u64,
501}
502
503#[derive(Debug, Clone, Serialize, Deserialize)]
505pub struct ReadinessResponse {
506 pub ready: bool,
508 pub status: HealthStatus,
510 pub components: Vec<ComponentHealth>,
512 pub dependencies: Vec<DependencyHealth>,
514 pub timestamp: u64,
516}
517
518#[derive(Debug, Clone, Serialize, Deserialize)]
524pub struct HealthCheckResponse {
525 pub status: HealthStatus,
527 pub version: String,
529 pub uptime_seconds: u64,
531 pub components: Vec<ComponentHealth>,
533 pub dependencies: Vec<DependencyHealth>,
535 pub probes: HashMap<String, HealthProbeResult>,
537 pub uptime_percent: f64,
539 pub timestamp: u64,
541}
542
543#[derive(Clone)]
552pub struct HealthChecker {
553 inner: Arc<HealthCheckerInner>,
554}
555
556struct HealthCheckerInner {
557 start_time: AtomicU64,
559 status: AtomicU64,
561 storage_healthy: AtomicBool,
563 network_healthy: AtomicBool,
565 cluster_enabled: AtomicBool,
567 cluster_healthy: AtomicBool,
569 probes: RwLock<HashMap<String, Arc<dyn DeepHealthCheck>>>,
571 dependency_checkers: RwLock<HashMap<String, Arc<dyn DeepHealthCheck>>>,
573 dependency_health: RwLock<HashMap<String, DependencyHealth>>,
575 history: RwLock<HealthHistory>,
577}
578
/// Current wall-clock time in whole seconds since the Unix epoch, or `0` if
/// the system clock reads earlier than the epoch.
fn now_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
585
586impl HealthChecker {
587 pub fn new() -> Self {
589 Self::with_history_capacity(10)
590 }
591
592 pub fn with_history_capacity(capacity: usize) -> Self {
594 Self {
595 inner: Arc::new(HealthCheckerInner {
596 start_time: AtomicU64::new(now_secs()),
597 status: AtomicU64::new(HealthStatus::Starting as u64),
598 storage_healthy: AtomicBool::new(false),
599 network_healthy: AtomicBool::new(false),
600 cluster_enabled: AtomicBool::new(false),
601 cluster_healthy: AtomicBool::new(false),
602 probes: RwLock::new(HashMap::new()),
603 dependency_checkers: RwLock::new(HashMap::new()),
604 dependency_health: RwLock::new(HashMap::new()),
605 history: RwLock::new(HealthHistory::new(capacity)),
606 }),
607 }
608 }
609
610 pub fn set_status(&self, status: HealthStatus) {
614 self.inner.status.store(status as u64, Ordering::SeqCst);
615 }
616
617 pub fn status(&self) -> HealthStatus {
619 match self.inner.status.load(Ordering::SeqCst) {
620 0 => HealthStatus::Healthy,
621 1 => HealthStatus::Starting,
622 2 => HealthStatus::ShuttingDown,
623 3 => HealthStatus::Unhealthy,
624 4 => HealthStatus::Degraded,
625 _ => HealthStatus::Unhealthy,
626 }
627 }
628
629 pub fn set_storage_healthy(&self, healthy: bool) {
631 self.inner.storage_healthy.store(healthy, Ordering::SeqCst);
632 }
633
634 pub fn set_network_healthy(&self, healthy: bool) {
636 self.inner.network_healthy.store(healthy, Ordering::SeqCst);
637 }
638
639 pub fn set_cluster_enabled(&self, enabled: bool) {
641 self.inner.cluster_enabled.store(enabled, Ordering::SeqCst);
642 }
643
644 pub fn set_cluster_healthy(&self, healthy: bool) {
646 self.inner.cluster_healthy.store(healthy, Ordering::SeqCst);
647 }
648
649 pub fn uptime_seconds(&self) -> u64 {
651 let now = now_secs();
652 let start = self.inner.start_time.load(Ordering::SeqCst);
653 now.saturating_sub(start)
654 }
655
656 pub fn is_alive(&self) -> bool {
661 matches!(
662 self.status(),
663 HealthStatus::Healthy | HealthStatus::Starting | HealthStatus::Degraded
664 )
665 }
666
667 pub fn is_ready(&self) -> bool {
670 let status = self.status();
671 let base_ok = matches!(status, HealthStatus::Healthy | HealthStatus::Degraded);
672 base_ok
673 && self.inner.storage_healthy.load(Ordering::SeqCst)
674 && self.inner.network_healthy.load(Ordering::SeqCst)
675 }
676
677 pub fn liveness_response(&self) -> LivenessResponse {
679 LivenessResponse {
680 alive: self.is_alive(),
681 status: self.status(),
682 uptime_seconds: self.uptime_seconds(),
683 timestamp: now_secs(),
684 }
685 }
686
687 pub fn readiness_response(&self) -> ReadinessResponse {
689 let components = self.build_component_list();
690 let dependencies: Vec<DependencyHealth> = self
691 .inner
692 .dependency_health
693 .read()
694 .values()
695 .cloned()
696 .collect();
697
698 ReadinessResponse {
699 ready: self.is_ready(),
700 status: self.status(),
701 components,
702 dependencies,
703 timestamp: now_secs(),
704 }
705 }
706
707 pub fn register_probe(&self, name: impl Into<String>, probe: Arc<dyn DeepHealthCheck>) {
711 self.inner.probes.write().insert(name.into(), probe);
712 }
713
714 pub async fn run_probes(&self) -> HashMap<String, HealthProbeResult> {
716 let probes: Vec<(String, Arc<dyn DeepHealthCheck>)> = {
717 let guard = self.inner.probes.read();
718 guard
719 .iter()
720 .map(|(k, v)| (k.clone(), Arc::clone(v)))
721 .collect()
722 };
723
724 let mut results = HashMap::with_capacity(probes.len());
725 for (name, probe) in probes {
726 let result = probe.check().await;
727 results.insert(name, result);
728 }
729 results
730 }
731
732 pub fn register_dependency(&self, name: impl Into<String>, checker: Arc<dyn DeepHealthCheck>) {
736 let name = name.into();
737 self.inner
738 .dependency_checkers
739 .write()
740 .insert(name.clone(), checker);
741 self.inner.dependency_health.write().insert(
743 name.clone(),
744 DependencyHealth {
745 name,
746 status: ProbeStatus::Unhealthy,
747 latency_ms: 0.0,
748 last_checked: 0,
749 message: "not yet checked".to_string(),
750 },
751 );
752 }
753
754 pub async fn check_dependencies(&self) -> ProbeStatus {
757 let checkers: Vec<(String, Arc<dyn DeepHealthCheck>)> = {
758 let guard = self.inner.dependency_checkers.read();
759 guard
760 .iter()
761 .map(|(k, v)| (k.clone(), Arc::clone(v)))
762 .collect()
763 };
764
765 let mut worst = ProbeStatus::Healthy;
766 let now = now_secs();
767
768 for (name, checker) in checkers {
769 let result = checker.check().await;
770 worst = worst.worse(result.status);
771 let dep = DependencyHealth {
772 name: name.clone(),
773 status: result.status,
774 latency_ms: result.latency_ms,
775 last_checked: now,
776 message: result.message,
777 };
778 self.inner.dependency_health.write().insert(name, dep);
779 }
780
781 worst
782 }
783
784 pub fn aggregated_dependency_status(&self) -> ProbeStatus {
786 let guard = self.inner.dependency_health.read();
787 guard
788 .values()
789 .fold(ProbeStatus::Healthy, |acc, d| acc.worse(d.status))
790 }
791
792 pub fn record_snapshot(&self) {
796 let snapshot = HealthSnapshot {
797 timestamp: now_secs(),
798 status: self.status(),
799 alive: self.is_alive(),
800 ready: self.is_ready(),
801 };
802 self.inner.history.write().record(snapshot);
803 }
804
805 pub fn health_history(&self) -> Vec<HealthSnapshot> {
807 self.inner.history.read().snapshots()
808 }
809
810 pub fn uptime_percent(&self) -> f64 {
812 self.inner.history.read().uptime_percent()
813 }
814
815 pub fn get_health(&self) -> HealthCheckResponse {
819 let components = self.build_component_list();
820 let dependencies: Vec<DependencyHealth> = self
821 .inner
822 .dependency_health
823 .read()
824 .values()
825 .cloned()
826 .collect();
827 let probes = HashMap::new(); let uptime_pct = self.uptime_percent();
829
830 HealthCheckResponse {
831 status: self.status(),
832 version: env!("CARGO_PKG_VERSION").to_string(),
833 uptime_seconds: self.uptime_seconds(),
834 components,
835 dependencies,
836 probes,
837 uptime_percent: uptime_pct,
838 timestamp: now_secs(),
839 }
840 }
841
842 pub async fn get_health_deep(&self) -> HealthCheckResponse {
844 let components = self.build_component_list();
845 let dependencies: Vec<DependencyHealth> = self
846 .inner
847 .dependency_health
848 .read()
849 .values()
850 .cloned()
851 .collect();
852 let probes = self.run_probes().await;
853 let uptime_pct = self.uptime_percent();
854
855 HealthCheckResponse {
856 status: self.status(),
857 version: env!("CARGO_PKG_VERSION").to_string(),
858 uptime_seconds: self.uptime_seconds(),
859 components,
860 dependencies,
861 probes,
862 uptime_percent: uptime_pct,
863 timestamp: now_secs(),
864 }
865 }
866
867 pub fn get_health_json(&self) -> Result<String, serde_json::Error> {
869 serde_json::to_string_pretty(&self.get_health())
870 }
871
872 fn build_component_list(&self) -> Vec<ComponentHealth> {
875 let now = now_secs();
876
877 let storage_status = if self.inner.storage_healthy.load(Ordering::SeqCst) {
878 HealthStatus::Healthy
879 } else {
880 HealthStatus::Unhealthy
881 };
882
883 let network_status = if self.inner.network_healthy.load(Ordering::SeqCst) {
884 HealthStatus::Healthy
885 } else {
886 HealthStatus::Unhealthy
887 };
888
889 let cluster_healthy = self.inner.cluster_healthy.load(Ordering::SeqCst);
890 let cluster_enabled = self.inner.cluster_enabled.load(Ordering::SeqCst);
891 let cluster_status = if cluster_enabled {
892 if cluster_healthy {
893 HealthStatus::Healthy
894 } else {
895 HealthStatus::Unhealthy
896 }
897 } else {
898 HealthStatus::Starting };
900
901 let cluster_message = if cluster_enabled {
902 if cluster_healthy {
903 "cluster active".to_string()
904 } else {
905 "cluster unhealthy".to_string()
906 }
907 } else {
908 "cluster disabled (standalone mode)".to_string()
909 };
910
911 vec![
912 ComponentHealth {
913 name: "storage".to_string(),
914 status: storage_status,
915 message: None,
916 last_check: now,
917 },
918 ComponentHealth {
919 name: "network".to_string(),
920 status: network_status,
921 message: None,
922 last_check: now,
923 },
924 ComponentHealth {
925 name: "cluster".to_string(),
926 status: cluster_status,
927 message: Some(cluster_message),
928 last_check: now,
929 },
930 ]
931 }
932}
933
934impl Default for HealthChecker {
935 fn default() -> Self {
936 Self::new()
937 }
938}
939
940pub struct HealthHttpServer {
953 checker: Arc<HealthChecker>,
954 bind_addr: SocketAddr,
955 shutdown: Arc<AtomicBool>,
956}
957
958pub struct HealthHttpHandle {
960 shutdown: Arc<AtomicBool>,
961 port: u16,
962 join_handle: tokio::task::JoinHandle<Result<(), std::io::Error>>,
963}
964
965impl HealthHttpHandle {
966 pub fn stop(&self) {
968 self.shutdown.store(true, Ordering::SeqCst);
969 }
970
971 pub fn port(&self) -> u16 {
973 self.port
974 }
975
976 pub async fn join(self) -> Result<(), std::io::Error> {
978 match self.join_handle.await {
979 Ok(inner) => inner,
980 Err(e) => Err(std::io::Error::other(e)),
981 }
982 }
983}
984
985impl HealthHttpServer {
986 pub fn new(checker: Arc<HealthChecker>, bind_addr: SocketAddr) -> Self {
990 Self {
991 checker,
992 bind_addr,
993 shutdown: Arc::new(AtomicBool::new(false)),
994 }
995 }
996
997 pub async fn start(self) -> Result<HealthHttpHandle, std::io::Error> {
1002 let listener = TcpListener::bind(self.bind_addr).await?;
1003 let local_addr = listener.local_addr()?;
1004 let port = local_addr.port();
1005 let shutdown = Arc::clone(&self.shutdown);
1006 let checker = Arc::clone(&self.checker);
1007
1008 let shutdown_flag = Arc::clone(&shutdown);
1009 let join_handle =
1010 tokio::spawn(async move { Self::accept_loop(listener, checker, shutdown_flag).await });
1011
1012 Ok(HealthHttpHandle {
1013 shutdown,
1014 port,
1015 join_handle,
1016 })
1017 }
1018
1019 async fn accept_loop(
1021 listener: TcpListener,
1022 checker: Arc<HealthChecker>,
1023 shutdown: Arc<AtomicBool>,
1024 ) -> Result<(), std::io::Error> {
1025 loop {
1026 if shutdown.load(Ordering::SeqCst) {
1027 debug!("health HTTP server shutting down");
1028 break;
1029 }
1030
1031 let accept_result =
1033 tokio::time::timeout(Duration::from_millis(200), listener.accept()).await;
1034
1035 match accept_result {
1036 Ok(Ok((stream, _addr))) => {
1037 let checker = Arc::clone(&checker);
1038 tokio::spawn(async move {
1039 if let Err(e) = Self::handle_connection(stream, &checker).await {
1040 warn!("health HTTP connection error: {e}");
1041 }
1042 });
1043 }
1044 Ok(Err(e)) => {
1045 warn!("health HTTP accept error: {e}");
1046 }
1047 Err(_) => {
1048 }
1050 }
1051 }
1052 Ok(())
1053 }
1054
1055 async fn handle_connection(
1057 mut stream: tokio::net::TcpStream,
1058 checker: &HealthChecker,
1059 ) -> Result<(), std::io::Error> {
1060 let mut buf = [0u8; 4096];
1061 let n = stream.read(&mut buf).await?;
1062 if n == 0 {
1063 return Ok(());
1064 }
1065
1066 let request = String::from_utf8_lossy(&buf[..n]);
1067 let (method, path) = Self::parse_request_line(&request);
1068
1069 let (status_code, status_text, body) = match method {
1070 "GET" => Self::route(path, checker),
1071 _ => (
1072 405,
1073 "Method Not Allowed",
1074 r#"{"error":"method not allowed"}"#.to_string(),
1075 ),
1076 };
1077
1078 let response = format!(
1079 "HTTP/1.1 {} {}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
1080 status_code,
1081 status_text,
1082 body.len(),
1083 body
1084 );
1085
1086 stream.write_all(response.as_bytes()).await?;
1087 stream.flush().await?;
1088 Ok(())
1089 }
1090
1091 fn parse_request_line(request: &str) -> (&str, &str) {
1094 let first_line = request.lines().next().unwrap_or("");
1095 let mut parts = first_line.split_whitespace();
1096 let method = parts.next().unwrap_or("");
1097 let path = parts.next().unwrap_or("");
1098 (method, path)
1099 }
1100
1101 fn route(path: &str, checker: &HealthChecker) -> (u16, &'static str, String) {
1103 match path {
1104 "/health" => Self::handle_health(checker),
1105 "/healthz" => Self::handle_healthz(checker),
1106 "/readyz" => Self::handle_readyz(checker),
1107 "/livez" => Self::handle_livez(checker),
1108 "/metrics" => Self::handle_metrics(checker),
1109 _ => (404, "Not Found", r#"{"error":"not found"}"#.to_string()),
1110 }
1111 }
1112
1113 fn handle_health(checker: &HealthChecker) -> (u16, &'static str, String) {
1115 let health = checker.get_health();
1116 let status_code = match health.status {
1117 HealthStatus::Healthy | HealthStatus::Degraded => 200,
1118 _ => 503,
1119 };
1120 let status_text = if status_code == 200 {
1121 "OK"
1122 } else {
1123 "Service Unavailable"
1124 };
1125 let body = serde_json::to_string(&health)
1126 .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1127 (status_code, status_text, body)
1128 }
1129
1130 fn handle_healthz(checker: &HealthChecker) -> (u16, &'static str, String) {
1132 let alive = checker.is_alive();
1133 let status_code = if alive { 200 } else { 503 };
1134 let status_text = if alive { "OK" } else { "Service Unavailable" };
1135 let body = format!(r#"{{"alive":{alive}}}"#);
1136 (status_code, status_text, body)
1137 }
1138
1139 fn handle_readyz(checker: &HealthChecker) -> (u16, &'static str, String) {
1141 let ready = checker.is_ready();
1142 let status_code = if ready { 200 } else { 503 };
1143 let status_text = if ready { "OK" } else { "Service Unavailable" };
1144 let body = format!(r#"{{"ready":{ready}}}"#);
1145 (status_code, status_text, body)
1146 }
1147
1148 fn handle_livez(checker: &HealthChecker) -> (u16, &'static str, String) {
1150 let resp = checker.liveness_response();
1151 let status_code = if resp.alive { 200 } else { 503 };
1152 let status_text = if resp.alive {
1153 "OK"
1154 } else {
1155 "Service Unavailable"
1156 };
1157 let body = serde_json::to_string(&resp)
1158 .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1159 (status_code, status_text, body)
1160 }
1161
1162 fn handle_metrics(checker: &HealthChecker) -> (u16, &'static str, String) {
1164 let history = checker.health_history();
1165 let uptime_percent = checker.uptime_percent();
1166 let uptime_seconds = checker.uptime_seconds();
1167
1168 #[derive(Serialize)]
1169 struct MetricsResponse {
1170 uptime_seconds: u64,
1171 uptime_percent: f64,
1172 history_count: usize,
1173 history: Vec<HealthSnapshot>,
1174 }
1175
1176 let resp = MetricsResponse {
1177 uptime_seconds,
1178 uptime_percent,
1179 history_count: history.len(),
1180 history,
1181 };
1182
1183 let body = serde_json::to_string(&resp)
1184 .unwrap_or_else(|e| format!(r#"{{"error":"serialization failed: {e}"}}"#));
1185 (200, "OK", body)
1186 }
1187}
1188
1189#[cfg(test)]
1194#[path = "health_tests.rs"]
1195mod tests;