arcly_http_core/observability/
health.rs1use std::collections::BTreeMap;
8use std::sync::{Arc, OnceLock};
9use std::time::Instant;
10
11use crate::http::Response;
12use dashmap::DashMap;
13use futures::future::BoxFuture;
14use serde::Serialize;
15
16#[derive(Debug, Clone, Serialize)]
19#[serde(rename_all = "lowercase")]
20pub enum HealthStatus {
21 Healthy,
22 Degraded(String),
23 Unhealthy(String),
24}
25
26pub trait HealthCheck: Send + Sync + 'static {
28 fn check(&self) -> BoxFuture<'_, HealthStatus>;
29}
30
31pub struct HealthRegistry {
33 checks: DashMap<&'static str, Arc<dyn HealthCheck>>,
34 started_at: Instant,
35}
36
37impl HealthRegistry {
38 fn new() -> Self {
39 Self {
40 checks: DashMap::new(),
41 started_at: Instant::now(),
42 }
43 }
44
45 pub fn register(&self, name: &'static str, check: impl HealthCheck) {
48 self.checks.insert(name, Arc::new(check));
49 }
50
51 pub async fn run_all(&self) -> BTreeMap<&'static str, HealthStatus> {
53 let mut results = BTreeMap::new();
54 let futs: Vec<(&'static str, Arc<dyn HealthCheck>)> = self
56 .checks
57 .iter()
58 .map(|e| (*e.key(), Arc::clone(e.value())))
59 .collect();
60
61 for (name, check) in futs {
62 let status = match tokio::time::timeout(CHECK_TIMEOUT, check.check()).await {
66 Ok(s) => s,
67 Err(_) => HealthStatus::Degraded("health check timed out".into()),
68 };
69 results.insert(name, status);
70 }
71 results
72 }
73
74 pub fn uptime_secs(&self) -> u64 {
75 self.started_at.elapsed().as_secs()
76 }
77}
78
79pub fn global() -> &'static HealthRegistry {
81 static REGISTRY: OnceLock<HealthRegistry> = OnceLock::new();
82 REGISTRY.get_or_init(HealthRegistry::new)
83}
84
/// Longest time a single health check may run before it is reported as
/// degraded with the reason "health check timed out".
const CHECK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2_000);
87
/// Process-wide "draining" flag; starts out `false`.
static DRAINING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);

/// Sets the draining flag: `true` marks the process as draining, `false`
/// returns it to normal service.
///
/// While the flag is set, the readiness handler answers 503 without running
/// any health checks.
#[doc(hidden)]
pub fn set_draining(draining: bool) {
    use std::sync::atomic::Ordering;
    // Release pairs with the Acquire load in `is_draining`.
    DRAINING.store(draining, Ordering::Release);
}

/// Returns the value most recently stored by [`set_draining`] (`false` if it
/// has never been called).
pub fn is_draining() -> bool {
    use std::sync::atomic::Ordering;
    DRAINING.load(Ordering::Acquire)
}
104
105#[derive(Serialize)]
108struct HealthResponse<'a> {
109 status: &'a str,
110 checks: BTreeMap<&'static str, serde_json::Value>,
111 uptime_secs: u64,
112}
113
114async fn run_health_response() -> Response {
115 let registry = global();
116 let results = registry.run_all().await;
117
118 let mut any_unhealthy = false;
119 let mut any_degraded = false;
120 let mut checks: BTreeMap<&'static str, serde_json::Value> = BTreeMap::new();
121 for (name, status) in &results {
122 match status {
123 HealthStatus::Unhealthy(_) => any_unhealthy = true,
124 HealthStatus::Degraded(_) => any_degraded = true,
125 HealthStatus::Healthy => {}
126 }
127 checks.insert(name, serde_json::to_value(status).unwrap_or_default());
128 }
129
130 let overall = if any_unhealthy {
131 "unhealthy"
132 } else if any_degraded {
133 "degraded"
134 } else {
135 "healthy"
136 };
137 let http_status = if any_unhealthy { 503u16 } else { 200 };
138
139 let body = serde_json::to_vec(&HealthResponse {
140 status: overall,
141 checks,
142 uptime_secs: registry.uptime_secs(),
143 })
144 .unwrap_or_default();
145
146 Response::builder()
147 .status(http_status)
148 .header("Content-Type", "application/json")
149 .body(crate::http::Body::from(body))
150 .unwrap()
151}
152
153fn probe_handler() -> impl Fn(crate::web::context::RequestContext) -> BoxFuture<'static, Response>
154 + Send
155 + Sync
156 + Clone
157 + 'static {
158 |_ctx| Box::pin(run_health_response())
159}
160
161pub fn healthz_handler(
163) -> impl Fn(crate::web::context::RequestContext) -> BoxFuture<'static, Response>
164 + Send
165 + Sync
166 + Clone
167 + 'static {
168 probe_handler()
169}
170
171pub fn readyz_handler(
178) -> impl Fn(crate::web::context::RequestContext) -> BoxFuture<'static, Response>
179 + Send
180 + Sync
181 + Clone
182 + 'static {
183 |_ctx| {
184 Box::pin(async {
185 if is_draining() {
186 return Response::builder()
187 .status(503)
188 .header("Content-Type", "application/json")
189 .body(crate::http::Body::from(r#"{"status":"draining"}"#))
190 .expect("static draining response");
191 }
192 run_health_response().await
193 })
194 }
195}