Skip to main content

arcly_stream/
health.rs

1//! Liveness/readiness health checks.
2//!
3//! A host registers named [`HealthCheck`]s (a probe of the disk store, an
4//! upstream relay, the transcode pool, …) and exposes the aggregate over its own
5//! HTTP surface — the engine stays transport-free, mirroring `arcly-http`'s
6//! `HealthRegistry` pattern.
7
8use async_trait::async_trait;
9use dashmap::DashMap;
10use std::sync::Arc;
11use std::time::Instant;
12
/// Health state reported for a single sub-system.
///
/// The two non-healthy variants carry a human-readable reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
    /// Everything is working normally.
    Healthy,
    /// Still serving, but impaired; the payload explains why.
    Degraded(String),
    /// Not serving; the payload explains why.
    Unhealthy(String),
}

impl HealthStatus {
    /// Returns `true` for [`HealthStatus::Healthy`] and `false` for every
    /// other variant.
    pub fn is_healthy(&self) -> bool {
        // Exhaustive on purpose: a new variant must decide explicitly whether
        // it counts as healthy.
        match self {
            Self::Healthy => true,
            Self::Degraded(_) | Self::Unhealthy(_) => false,
        }
    }

    /// Static lowercase name of the variant: `"healthy"`, `"degraded"` or
    /// `"unhealthy"`. The attached reason, if any, is not included.
    pub fn label(&self) -> &'static str {
        match self {
            Self::Healthy => "healthy",
            Self::Degraded(_) => "degraded",
            Self::Unhealthy(_) => "unhealthy",
        }
    }
}
39
/// One named probe of a single sub-system.
///
/// Implementations should be cheap and non-blocking: [`HealthRegistry::run_all`]
/// awaits the registered checks one after another, so a slow probe delays the
/// whole report. The `Send + Sync + 'static` bounds let a check be stored as
/// `Arc<dyn HealthCheck>` inside the registry.
#[async_trait]
pub trait HealthCheck: Send + Sync + 'static {
    /// Run the probe once and return the resulting status.
    async fn check(&self) -> HealthStatus;
}
46
/// Built-in check that compares a live count (typically the number of
/// publishers) against a soft cap, degrading as the count nears the cap.
/// Handy when wired to an engine snapshot.
pub struct CapacityCheck {
    /// Callback invoked on every probe to obtain the current count.
    current: Arc<dyn Fn() -> usize + Send + Sync>,
    /// Soft cap the count is measured against; never zero (see [`CapacityCheck::new`]).
    soft_limit: usize,
}

impl CapacityCheck {
    /// Build a check that measures `current()` against `soft_limit`
    /// (degraded at ≥ 90% of the limit).
    ///
    /// A `soft_limit` of `0` is raised to `1`, because the probe divides by
    /// this value.
    pub fn new(current: impl Fn() -> usize + Send + Sync + 'static, soft_limit: usize) -> Self {
        let soft_limit = std::cmp::max(soft_limit, 1);
        let current: Arc<dyn Fn() -> usize + Send + Sync> = Arc::new(current);
        Self {
            current,
            soft_limit,
        }
    }
}
63
64#[async_trait]
65impl HealthCheck for CapacityCheck {
66    async fn check(&self) -> HealthStatus {
67        let n = (self.current)();
68        let pct = n * 100 / self.soft_limit;
69        if n >= self.soft_limit {
70            HealthStatus::Unhealthy(format!("at capacity: {n}/{}", self.soft_limit))
71        } else if pct >= 90 {
72            HealthStatus::Degraded(format!("near capacity: {n}/{}", self.soft_limit))
73        } else {
74            HealthStatus::Healthy
75        }
76    }
77}
78
79/// A registry of named health checks plus process uptime.
80#[derive(Clone)]
81pub struct HealthRegistry {
82    checks: Arc<DashMap<&'static str, Arc<dyn HealthCheck>>>,
83    started_at: Instant,
84}
85
86impl Default for HealthRegistry {
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
/// Aggregate health across all registered checks, as produced by
/// [`HealthRegistry::run_all`].
#[derive(Debug, Clone)]
pub struct HealthReport {
    /// Worst status across all checks (overall readiness):
    /// `Unhealthy` outranks `Degraded`, which outranks `Healthy`.
    /// `Healthy` when no checks are registered.
    pub overall: HealthStatus,
    /// Per-check results as `(name, status)` pairs, sorted by name.
    pub checks: Vec<(&'static str, HealthStatus)>,
    /// Uptime in whole seconds, measured from the creation of the registry.
    pub uptime_secs: u64,
}
102
103impl HealthRegistry {
104    /// An empty registry; uptime starts now.
105    pub fn new() -> Self {
106        Self {
107            checks: Arc::new(DashMap::new()),
108            started_at: Instant::now(),
109        }
110    }
111
112    /// Register (or replace) a named check.
113    pub fn register(&self, name: &'static str, check: impl HealthCheck) {
114        self.checks.insert(name, Arc::new(check));
115    }
116
117    /// Run every check and aggregate. `overall` is the worst observed status
118    /// (Unhealthy ≻ Degraded ≻ Healthy).
119    pub async fn run_all(&self) -> HealthReport {
120        let mut results = Vec::with_capacity(self.checks.len());
121        let mut overall = HealthStatus::Healthy;
122        // Snapshot first so we don't hold DashMap guards across `.await`.
123        let entries: Vec<(&'static str, Arc<dyn HealthCheck>)> = self
124            .checks
125            .iter()
126            .map(|r| (*r.key(), Arc::clone(r.value())))
127            .collect();
128        for (name, check) in entries {
129            let status = check.check().await;
130            overall = worst(overall, &status);
131            results.push((name, status));
132        }
133        results.sort_by_key(|(n, _)| *n);
134        HealthReport {
135            overall,
136            checks: results,
137            uptime_secs: self.started_at.elapsed().as_secs(),
138        }
139    }
140}
141
142fn worst(acc: HealthStatus, next: &HealthStatus) -> HealthStatus {
143    fn rank(s: &HealthStatus) -> u8 {
144        match s {
145            HealthStatus::Healthy => 0,
146            HealthStatus::Degraded(_) => 1,
147            HealthStatus::Unhealthy(_) => 2,
148        }
149    }
150    if rank(next) > rank(&acc) {
151        next.clone()
152    } else {
153        acc
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Test double: a check that always reports the status it was built with.
    struct Fixed(HealthStatus);

    #[async_trait]
    impl HealthCheck for Fixed {
        async fn check(&self) -> HealthStatus {
            self.0.clone()
        }
    }

    /// `overall` tracks the worst registered status as worse checks are added.
    #[tokio::test]
    async fn aggregates_to_worst_status() {
        let registry = HealthRegistry::new();
        registry.register("a", Fixed(HealthStatus::Healthy));
        registry.register("b", Fixed(HealthStatus::Degraded("warm".into())));

        let first = registry.run_all().await;
        assert_eq!(first.overall, HealthStatus::Degraded("warm".into()));
        assert_eq!(first.checks.len(), 2);

        registry.register("c", Fixed(HealthStatus::Unhealthy("down".into())));
        let second = registry.run_all().await;
        assert_eq!(second.overall, HealthStatus::Unhealthy("down".into()));
    }

    /// With a limit of 10: 0 is healthy, 9 (90%) is degraded, 10 is unhealthy.
    #[tokio::test]
    async fn capacity_check_degrades_then_fails() {
        let count = Arc::new(AtomicUsize::new(0));
        let check = {
            // The closure owns its own handle to the shared counter.
            let count = Arc::clone(&count);
            CapacityCheck::new(move || count.load(Ordering::Relaxed), 10)
        };

        assert!(check.check().await.is_healthy());

        count.store(9, Ordering::Relaxed);
        assert!(matches!(check.check().await, HealthStatus::Degraded(_)));

        count.store(10, Ordering::Relaxed);
        assert!(matches!(check.check().await, HealthStatus::Unhealthy(_)));
    }
}
197}