// arcly-http 0.4.0
//
// Enterprise-grade NestJS-inspired web framework on axum: zero-lock DI, declarative controllers, multi-tenant data routing, transactional outbox, ABAC, and a self-documenting OpenAPI surface.
// Documentation
//! Health-check registry and `/healthz` / `/readyz` route handlers.
//!
//! Register checks from any plugin or application code via `global().register(...)`.
//! The plugin calls `add_get("/healthz", ...)` and `add_get("/readyz", ...)` to
//! expose them, both backed by `global().run_all()`.

use std::collections::BTreeMap;
use std::sync::{Arc, OnceLock};
use std::time::Instant;

use axum::response::Response;
use dashmap::DashMap;
use futures::future::BoxFuture;
use serde::Serialize;

// ─── Public surface ──────────────────────────────────────────────────────────

/// Outcome reported by a single health check.
///
/// Serialized with lowercase variant names (see the `serde` attribute below).
/// The probe response built in `run_health_response` aggregates these per-check
/// values into one overall status and HTTP code.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
    /// The sub-system is fully operational.
    Healthy,
    /// The sub-system works but is impaired; the string carries the reason.
    ///
    /// Also produced by `HealthRegistry::run_all` when a check exceeds its time
    /// budget. A degraded check changes the overall status text to
    /// `"degraded"` but the probe still answers HTTP 200.
    Degraded(String),
    /// The sub-system is not usable; the string presumably carries the reason
    /// (confirm against implementors).
    ///
    /// Any unhealthy check makes the probe answer HTTP 503.
    Unhealthy(String),
}

/// One named sub-system health probe.
///
/// The registry stores implementors as `Arc<dyn HealthCheck>`, hence the
/// `Send + Sync + 'static` bounds.
pub trait HealthCheck: Send + Sync + 'static {
    /// Probes the sub-system and resolves to its current [`HealthStatus`].
    ///
    /// The returned future may borrow from `self`. `HealthRegistry::run_all`
    /// wraps every call in a timeout and reports `Degraded` if the future does
    /// not resolve in time, so implementations do not need their own deadline.
    fn check(&self) -> BoxFuture<'_, HealthStatus>;
}

/// Process-wide health-check registry. Accessible via `global()`.
pub struct HealthRegistry {
    /// Registered checks keyed by name; inserting under an existing name
    /// replaces the previous entry.
    checks: DashMap<&'static str, Arc<dyn HealthCheck>>,
    /// Instant at which this registry was constructed; `uptime_secs` is
    /// measured from here.
    started_at: Instant,
}

impl HealthRegistry {
    fn new() -> Self {
        Self {
            checks: DashMap::new(),
            started_at: Instant::now(),
        }
    }

    /// Register a named health check. Idempotent — re-registration replaces the
    /// previous check for that name.
    pub fn register(&self, name: &'static str, check: impl HealthCheck) {
        self.checks.insert(name, Arc::new(check));
    }

    /// Run all registered checks concurrently and return a map of their results.
    pub async fn run_all(&self) -> BTreeMap<&'static str, HealthStatus> {
        let mut results = BTreeMap::new();
        // Collect futures first so we don't hold the DashMap ref across await.
        let futs: Vec<(&'static str, Arc<dyn HealthCheck>)> = self
            .checks
            .iter()
            .map(|e| (*e.key(), Arc::clone(e.value())))
            .collect();

        for (name, check) in futs {
            // A hung dependency must degrade the probe, not hang it: probe
            // handlers feed K8s with short periods, so each check gets a
            // bounded slice of time.
            let status = match tokio::time::timeout(CHECK_TIMEOUT, check.check()).await {
                Ok(s) => s,
                Err(_) => HealthStatus::Degraded("health check timed out".into()),
            };
            results.insert(name, status);
        }
        results
    }

    pub fn uptime_secs(&self) -> u64 {
        self.started_at.elapsed().as_secs()
    }
}

/// Returns the single process-wide [`HealthRegistry`].
///
/// The registry is built lazily by the first caller; every later call hands
/// back a reference to that same instance.
pub fn global() -> &'static HealthRegistry {
    static INSTANCE: OnceLock<HealthRegistry> = OnceLock::new();
    let registry: &'static HealthRegistry = INSTANCE.get_or_init(HealthRegistry::new);
    registry
}

/// Time budget (two seconds) granted to each individual health check.
///
/// A check that has not resolved within this window is reported as degraded
/// by `HealthRegistry::run_all` instead of stalling the probe.
const CHECK_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(2_000);

/// Flipped by the launch path the moment a shutdown signal arrives —
/// **before** the HTTP drain starts. `/readyz` answers 503 from then on so
/// the load balancer stops routing new traffic to a pod that is going away,
/// while `/healthz` (liveness) does not consult this flag so the supervisor
/// doesn't kill the drain. One atomic; writers do a release store and
/// readers pay a single acquire load.
static DRAINING: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false);

/// Sets or clears the draining flag read by [`is_draining`].
///
/// Hidden from the public docs: it is meant to be driven by the framework's
/// launch/shutdown path rather than by application code.
#[doc(hidden)]
pub fn set_draining(draining: bool) {
    use std::sync::atomic::Ordering;

    DRAINING.store(draining, Ordering::Release);
}

/// `true` while the draining flag is set, i.e. after `set_draining(true)` has
/// been called (normally on receipt of a shutdown signal) and until it is
/// cleared again.
pub fn is_draining() -> bool {
    use std::sync::atomic::Ordering;

    DRAINING.load(Ordering::Acquire)
}

// ─── Route handlers ──────────────────────────────────────────────────────────

/// JSON body emitted by the probe endpoints (built in `run_health_response`).
#[derive(Serialize)]
struct HealthResponse<'a> {
    /// Aggregate verdict: `"unhealthy"`, `"degraded"` or `"healthy"`.
    status: &'a str,
    /// Per-check results keyed by check name; each value is the serialized
    /// `HealthStatus` of that check.
    checks: BTreeMap<&'static str, serde_json::Value>,
    /// Whole seconds since the registry was created.
    uptime_secs: u64,
}

/// Runs every registered check and renders the aggregate as a JSON response.
///
/// The overall status is `"unhealthy"` if any check is unhealthy, otherwise
/// `"degraded"` if any check is degraded, otherwise `"healthy"`. Only the
/// unhealthy case maps to HTTP 503; degraded and healthy both answer 200.
async fn run_health_response() -> Response {
    let registry = global();
    let outcomes = registry.run_all().await;

    let has_unhealthy = outcomes
        .values()
        .any(|status| matches!(status, HealthStatus::Unhealthy(_)));
    let has_degraded = outcomes
        .values()
        .any(|status| matches!(status, HealthStatus::Degraded(_)));

    // Per-check payloads; a value that fails to serialize falls back to JSON null.
    let checks: BTreeMap<&'static str, serde_json::Value> = outcomes
        .iter()
        .map(|(name, status)| (*name, serde_json::to_value(status).unwrap_or_default()))
        .collect();

    let (overall, http_status) = match (has_unhealthy, has_degraded) {
        (true, _) => ("unhealthy", 503u16),
        (false, true) => ("degraded", 200),
        (false, false) => ("healthy", 200),
    };

    let payload = HealthResponse {
        status: overall,
        checks,
        uptime_secs: registry.uptime_secs(),
    };
    let body = serde_json::to_vec(&payload).unwrap_or_default();

    Response::builder()
        .status(http_status)
        .header("Content-Type", "application/json")
        .body(axum::body::Body::from(body))
        .unwrap()
}

/// Builds a route handler that ignores the request context and answers with
/// the aggregated health response.
fn probe_handler() -> impl Fn(crate::web::context::RequestContext) -> BoxFuture<'static, Response>
       + Send
       + Sync
       + Clone
       + 'static {
    |_request| -> BoxFuture<'static, Response> { Box::pin(run_health_response()) }
}

/// Handler for `GET /healthz` — liveness probe.
///
/// Delegates to `probe_handler`, so the response is the aggregated result of
/// every registered check. The draining flag is not consulted here.
///
/// NOTE(review): because the same aggregate is used, a single unhealthy check
/// makes this endpoint answer 503 as well — confirm that is the intended
/// liveness behavior.
pub fn healthz_handler(
) -> impl Fn(crate::web::context::RequestContext) -> BoxFuture<'static, Response>
       + Send
       + Sync
       + Clone
       + 'static {
    probe_handler()
}

/// Handler for `GET /readyz` — readiness probe.
///
/// While the process is not draining this behaves like the aggregated health
/// response. As soon as [`is_draining`] reports `true` it short-circuits to a
/// fixed 503 `{"status":"draining"}` body without running any checks, so the
/// load balancer stops sending new traffic while in-flight requests finish.
pub fn readyz_handler(
) -> impl Fn(crate::web::context::RequestContext) -> BoxFuture<'static, Response>
       + Send
       + Sync
       + Clone
       + 'static {
    |_request| {
        Box::pin(async {
            if !is_draining() {
                return run_health_response().await;
            }

            // Shutdown in progress: answer immediately with a static body.
            Response::builder()
                .status(503)
                .header("Content-Type", "application/json")
                .body(axum::body::Body::from(r#"{"status":"draining"}"#))
                .expect("static draining response")
        })
    }
}