Skip to main content

arcly_http_core/observability/
health.rs

//! Health-check registry and `/healthz` / `/readyz` route handlers.
//!
//! Register checks from any plugin or application code via `global().register(...)`.
//! The plugin exposes them by calling `add_get("/healthz", ...)` and
//! `add_get("/readyz", ...)`; both routes are backed by `global().run_all()`, and
//! `/readyz` additionally answers 503 once shutdown draining has begun.
6
7use std::collections::BTreeMap;
8use std::sync::{Arc, OnceLock};
9use std::time::Instant;
10
11use crate::http::Response;
12use dashmap::DashMap;
13use futures::future::BoxFuture;
14use serde::Serialize;
15
16// ─── Public surface ──────────────────────────────────────────────────────────
17
18#[derive(Debug, Clone, Serialize)]
19#[serde(rename_all = "lowercase")]
20pub enum HealthStatus {
21    Healthy,
22    Degraded(String),
23    Unhealthy(String),
24}
25
26/// One named sub-system health probe.
27pub trait HealthCheck: Send + Sync + 'static {
28    fn check(&self) -> BoxFuture<'_, HealthStatus>;
29}
30
31/// Process-wide health-check registry. Accessible via `global()`.
32pub struct HealthRegistry {
33    checks: DashMap<&'static str, Arc<dyn HealthCheck>>,
34    started_at: Instant,
35}
36
37impl HealthRegistry {
38    fn new() -> Self {
39        Self {
40            checks: DashMap::new(),
41            started_at: Instant::now(),
42        }
43    }
44
45    /// Register a named health check. Idempotent — re-registration replaces the
46    /// previous check for that name.
47    pub fn register(&self, name: &'static str, check: impl HealthCheck) {
48        self.checks.insert(name, Arc::new(check));
49    }
50
51    /// Run all registered checks concurrently and return a map of their results.
52    pub async fn run_all(&self) -> BTreeMap<&'static str, HealthStatus> {
53        let mut results = BTreeMap::new();
54        // Collect futures first so we don't hold the DashMap ref across await.
55        let futs: Vec<(&'static str, Arc<dyn HealthCheck>)> = self
56            .checks
57            .iter()
58            .map(|e| (*e.key(), Arc::clone(e.value())))
59            .collect();
60
61        for (name, check) in futs {
62            // A hung dependency must degrade the probe, not hang it: probe
63            // handlers feed K8s with short periods, so each check gets a
64            // bounded slice of time.
65            let status = match tokio::time::timeout(CHECK_TIMEOUT, check.check()).await {
66                Ok(s) => s,
67                Err(_) => HealthStatus::Degraded("health check timed out".into()),
68            };
69            results.insert(name, status);
70        }
71        results
72    }
73
74    pub fn uptime_secs(&self) -> u64 {
75        self.started_at.elapsed().as_secs()
76    }
77}
78
79/// The one process-wide health registry. Initialized on first access.
80pub fn global() -> &'static HealthRegistry {
81    static REGISTRY: OnceLock<HealthRegistry> = OnceLock::new();
82    REGISTRY.get_or_init(HealthRegistry::new)
83}
84
85/// Per-check budget inside the probe handlers.
86const CHECK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2);
87
88/// Flipped by the launch path the moment a shutdown signal arrives —
89/// **before** the HTTP drain starts. `/readyz` answers 503 from then on so
90/// the load balancer stops routing new traffic to a pod that is going away,
91/// while `/healthz` (liveness) stays green so the supervisor doesn't kill
92/// the drain. One atomic; readers pay a single relaxed load.
93static DRAINING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);
94
95#[doc(hidden)]
96pub fn set_draining(draining: bool) {
97    DRAINING.store(draining, std::sync::atomic::Ordering::Release);
98}
99
100/// `true` once a shutdown signal has been received.
101pub fn is_draining() -> bool {
102    DRAINING.load(std::sync::atomic::Ordering::Acquire)
103}
104
105// ─── Route handlers ──────────────────────────────────────────────────────────
106
107#[derive(Serialize)]
108struct HealthResponse<'a> {
109    status: &'a str,
110    checks: BTreeMap<&'static str, serde_json::Value>,
111    uptime_secs: u64,
112}
113
114async fn run_health_response() -> Response {
115    let registry = global();
116    let results = registry.run_all().await;
117
118    let mut any_unhealthy = false;
119    let mut any_degraded = false;
120    let mut checks: BTreeMap<&'static str, serde_json::Value> = BTreeMap::new();
121    for (name, status) in &results {
122        match status {
123            HealthStatus::Unhealthy(_) => any_unhealthy = true,
124            HealthStatus::Degraded(_) => any_degraded = true,
125            HealthStatus::Healthy => {}
126        }
127        checks.insert(name, serde_json::to_value(status).unwrap_or_default());
128    }
129
130    let overall = if any_unhealthy {
131        "unhealthy"
132    } else if any_degraded {
133        "degraded"
134    } else {
135        "healthy"
136    };
137    let http_status = if any_unhealthy { 503u16 } else { 200 };
138
139    let body = serde_json::to_vec(&HealthResponse {
140        status: overall,
141        checks,
142        uptime_secs: registry.uptime_secs(),
143    })
144    .unwrap_or_default();
145
146    Response::builder()
147        .status(http_status)
148        .header("Content-Type", "application/json")
149        .body(crate::http::Body::from(body))
150        .unwrap()
151}
152
153fn probe_handler() -> impl Fn(crate::web::context::RequestContext) -> BoxFuture<'static, Response>
154       + Send
155       + Sync
156       + Clone
157       + 'static {
158    |_ctx| Box::pin(run_health_response())
159}
160
161/// Handler for `GET /healthz` — liveness probe.
162pub fn healthz_handler(
163) -> impl Fn(crate::web::context::RequestContext) -> BoxFuture<'static, Response>
164       + Send
165       + Sync
166       + Clone
167       + 'static {
168    probe_handler()
169}
170
171/// Handler for `GET /readyz` — readiness probe.
172///
173/// Unlike `/healthz`, readiness goes **503 the moment shutdown begins**
174/// (see [`is_draining`]): the LB must stop sending new traffic while
175/// in-flight requests drain, but liveness must stay green so the
176/// supervisor doesn't kill the draining process.
177pub fn readyz_handler(
178) -> impl Fn(crate::web::context::RequestContext) -> BoxFuture<'static, Response>
179       + Send
180       + Sync
181       + Clone
182       + 'static {
183    |_ctx| {
184        Box::pin(async {
185            if is_draining() {
186                return Response::builder()
187                    .status(503)
188                    .header("Content-Type", "application/json")
189                    .body(crate::http::Body::from(r#"{"status":"draining"}"#))
190                    .expect("static draining response");
191            }
192            run_health_response().await
193        })
194    }
195}