Skip to main content

nidus_http/
health.rs

1//! Health and readiness registry helpers.
2
3use std::{collections::BTreeMap, future::Future, pin::Pin, sync::Arc, time::Duration};
4
5use axum::{Json, Router, response::IntoResponse, routing::get};
6use http::StatusCode;
7use serde::Serialize;
8use tokio::time::timeout;
9
10type HealthFuture = Pin<Box<dyn Future<Output = HealthStatus> + Send>>;
11type HealthCheck = Arc<dyn Fn() -> HealthFuture + Send + Sync>;
12
13/// Result of a liveness or readiness check.
14///
15/// Return [`HealthStatus::up`] for healthy dependencies and
16/// [`HealthStatus::down`] with a safe diagnostic message for unhealthy ones.
17/// Messages are included in health JSON by default and can be suppressed with
18/// [`HealthRegistry::hide_details`].
19#[derive(Clone, Debug, Eq, PartialEq)]
20pub struct HealthStatus {
21    status: HealthState,
22    message: Option<String>,
23}
24
25impl HealthStatus {
26    /// Creates an up health status.
27    pub fn up() -> Self {
28        Self {
29            status: HealthState::Up,
30            message: None,
31        }
32    }
33
34    /// Creates a down health status with a safe diagnostic message.
35    ///
36    /// Keep the message operational and non-sensitive because it is exposed in
37    /// response bodies unless the registry uses [`HealthRegistry::hide_details`].
38    pub fn down(message: impl Into<String>) -> Self {
39        Self {
40            status: HealthState::Down,
41            message: Some(message.into()),
42        }
43    }
44
45    /// Returns whether the check is up.
46    pub const fn is_up(&self) -> bool {
47        matches!(self.status, HealthState::Up)
48    }
49}
50
51/// Health check state.
52#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)]
53#[serde(rename_all = "lowercase")]
54pub enum HealthState {
55    /// The dependency is healthy.
56    Up,
57    /// The dependency is unhealthy.
58    Down,
59}
60
61/// Registry for liveness and readiness checks.
62///
63/// The registry produces two routes: `/health/live` and `/health/ready`.
64/// With no registered checks, each route returns `200 OK` and
65/// `{ "status": "up", "checks": {} }`. When any check returns down or times
66/// out, the route returns `503 Service Unavailable`.
67///
68/// Checks are in-process async closures; this helper does not provide service
69/// discovery or external health storage.
70///
71/// ```
72/// use std::time::Duration;
73/// use nidus_http::health::{HealthRegistry, HealthStatus};
74///
75/// let health = HealthRegistry::new()
76///     .ready_check_sync("database", || HealthStatus::up())
77///     .live_check("worker", || async { HealthStatus::up() })
78///     .timeout(Duration::from_secs(1));
79///
80/// let routes = health.routes();
81/// # let _: axum::Router = routes;
82/// ```
83#[derive(Clone)]
84pub struct HealthRegistry {
85    live_checks: Vec<NamedHealthCheck>,
86    ready_checks: Vec<NamedHealthCheck>,
87    timeout: Duration,
88    expose_details: bool,
89}
90
91impl HealthRegistry {
92    /// Creates a registry with always-up live/ready routes and no dependencies.
93    ///
94    /// The default per-check timeout is two seconds and diagnostic messages are
95    /// exposed in responses.
96    pub fn new() -> Self {
97        Self {
98            live_checks: Vec::new(),
99            ready_checks: Vec::new(),
100            timeout: Duration::from_secs(2),
101            expose_details: true,
102        }
103    }
104
105    /// Adds a liveness check.
106    ///
107    /// Liveness checks should answer "should this process be restarted?" and
108    /// usually avoid dependencies that can recover independently.
109    pub fn live_check<F, Fut>(mut self, name: impl Into<String>, check: F) -> Self
110    where
111        F: Fn() -> Fut + Send + Sync + 'static,
112        Fut: Future<Output = HealthStatus> + Send + 'static,
113    {
114        self.live_checks.push(NamedHealthCheck::new(name, check));
115        self
116    }
117
118    /// Adds a synchronous liveness check.
119    pub fn live_check_sync<F>(self, name: impl Into<String>, check: F) -> Self
120    where
121        F: Fn() -> HealthStatus + Send + Sync + 'static,
122    {
123        self.live_check(name, move || {
124            let status = check();
125            async move { status }
126        })
127    }
128
129    /// Adds a readiness check.
130    ///
131    /// Readiness checks should answer "can this process serve traffic now?" and
132    /// commonly include database, queue, or cache dependencies.
133    pub fn ready_check<F, Fut>(mut self, name: impl Into<String>, check: F) -> Self
134    where
135        F: Fn() -> Fut + Send + Sync + 'static,
136        Fut: Future<Output = HealthStatus> + Send + 'static,
137    {
138        self.ready_checks.push(NamedHealthCheck::new(name, check));
139        self
140    }
141
142    /// Adds a synchronous readiness check.
143    pub fn ready_check_sync<F>(self, name: impl Into<String>, check: F) -> Self
144    where
145        F: Fn() -> HealthStatus + Send + Sync + 'static,
146    {
147        self.ready_check(name, move || {
148            let status = check();
149            async move { status }
150        })
151    }
152
153    /// Sets the timeout for each health check.
154    ///
155    /// A timed-out check is reported as down with `check timed out`.
156    pub fn timeout(mut self, timeout_duration: Duration) -> Self {
157        self.timeout = timeout_duration;
158        self
159    }
160
161    /// Hides diagnostic messages from health response bodies.
162    ///
163    /// Status values and check names remain visible; only per-check messages are
164    /// omitted.
165    pub fn hide_details(mut self) -> Self {
166        self.expose_details = false;
167        self
168    }
169
170    /// Returns Axum routes for `/health/live` and `/health/ready`.
171    pub fn routes(self) -> Router {
172        let live = self.clone();
173        let ready = self;
174        Router::new()
175            .route("/health/live", get(move || live.clone().run_live()))
176            .route("/health/ready", get(move || ready.clone().run_ready()))
177    }
178
179    async fn run_live(self) -> axum::response::Response {
180        let checks = self.live_checks.clone();
181        self.run_checks(checks).await.into_response()
182    }
183
184    async fn run_ready(self) -> axum::response::Response {
185        let checks = self.ready_checks.clone();
186        self.run_checks(checks).await.into_response()
187    }
188
189    async fn run_checks(self, checks: Vec<NamedHealthCheck>) -> (StatusCode, Json<HealthBody>) {
190        if checks.is_empty() {
191            return (
192                StatusCode::OK,
193                Json(HealthBody {
194                    status: HealthState::Up,
195                    checks: BTreeMap::new(),
196                }),
197            );
198        }
199
200        let mut handles = Vec::with_capacity(checks.len());
201        for check in checks {
202            let timeout_duration = self.timeout;
203            let name = check.name.clone();
204            let handle = tokio::spawn(async move {
205                let result = timeout(timeout_duration, (check.check)()).await;
206                let status = result.unwrap_or_else(|_| HealthStatus::down("check timed out"));
207                (check.name, status)
208            });
209            handles.push((name, handle));
210        }
211
212        let mut body_checks = BTreeMap::new();
213        let mut all_up = true;
214        for (name, handle) in handles {
215            let (name, status) = match handle.await {
216                Ok(result) => result,
217                Err(error) => {
218                    let message = if error.is_panic() {
219                        "check panicked"
220                    } else {
221                        "check join failed"
222                    };
223                    (name, HealthStatus::down(message))
224                }
225            };
226            all_up &= status.is_up();
227            body_checks.insert(
228                name,
229                HealthCheckBody {
230                    status: status.status,
231                    message: if self.expose_details {
232                        status.message
233                    } else {
234                        None
235                    },
236                },
237            );
238        }
239
240        let status = if all_up {
241            StatusCode::OK
242        } else {
243            StatusCode::SERVICE_UNAVAILABLE
244        };
245        (
246            status,
247            Json(HealthBody {
248                status: if all_up {
249                    HealthState::Up
250                } else {
251                    HealthState::Down
252                },
253                checks: body_checks,
254            }),
255        )
256    }
257}
258
259impl Default for HealthRegistry {
260    fn default() -> Self {
261        Self::new()
262    }
263}
264
265#[derive(Clone)]
266struct NamedHealthCheck {
267    name: String,
268    check: HealthCheck,
269}
270
271impl NamedHealthCheck {
272    fn new<F, Fut>(name: impl Into<String>, check: F) -> Self
273    where
274        F: Fn() -> Fut + Send + Sync + 'static,
275        Fut: Future<Output = HealthStatus> + Send + 'static,
276    {
277        Self {
278            name: name.into(),
279            check: Arc::new(move || Box::pin(check())),
280        }
281    }
282}
283
284#[derive(Debug, Serialize)]
285struct HealthBody {
286    status: HealthState,
287    checks: BTreeMap<String, HealthCheckBody>,
288}
289
290#[derive(Debug, Serialize)]
291struct HealthCheckBody {
292    status: HealthState,
293    #[serde(skip_serializing_if = "Option::is_none")]
294    message: Option<String>,
295}