arcly-stream 0.1.5

An open, extensible live-media streaming kernel: lock-free zero-copy frame fan-out, an instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, and WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime-, config-, and metrics-free.
Documentation
//! Liveness/readiness health checks.
//!
//! A host registers named [`HealthCheck`]s (a probe of the disk store, an
//! upstream relay, the transcode pool, …) and exposes the aggregate over its own
//! HTTP surface — the engine stays transport-free, mirroring `arcly-http`'s
//! `HealthRegistry` pattern.

use async_trait::async_trait;
use dashmap::DashMap;
use std::sync::Arc;
use std::time::Instant;

/// Health of a single sub-system, as reported by one check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HealthStatus {
    /// Everything works as expected.
    Healthy,
    /// Still serving, but impaired; carries a free-form reason.
    Degraded(String),
    /// Not serving at all; carries a free-form reason.
    Unhealthy(String),
}

impl HealthStatus {
    /// Returns `true` for [`HealthStatus::Healthy`] and `false` for every other variant.
    pub fn is_healthy(&self) -> bool {
        *self == Self::Healthy
    }

    /// Static lowercase name of the variant: `"healthy"`, `"degraded"` or `"unhealthy"`.
    ///
    /// The attached reason (if any) is not part of the label.
    pub fn label(&self) -> &'static str {
        match *self {
            Self::Healthy => "healthy",
            Self::Degraded(..) => "degraded",
            Self::Unhealthy(..) => "unhealthy",
        }
    }
}

/// One named probe of a sub-system, registered with a [`HealthRegistry`].
///
/// Implementations should be cheap and non-blocking: [`HealthRegistry::run_all`]
/// awaits each registered check in turn, so a slow probe delays the whole report.
#[async_trait]
pub trait HealthCheck: Send + Sync + 'static {
    /// Run the probe and return the sub-system's current status.
    async fn check(&self) -> HealthStatus;
}

/// A built-in check that compares a live count (for example the number of
/// publishers taken from an engine snapshot) against a soft cap.
///
/// The result is [`HealthStatus::Healthy`] below 90% of the cap,
/// [`HealthStatus::Degraded`] from 90% up to (but not including) the cap, and
/// [`HealthStatus::Unhealthy`] once the count reaches the cap.
pub struct CapacityCheck {
    /// Samples the current count on every probe.
    current: Arc<dyn Fn() -> usize + Send + Sync>,
    /// The cap; always at least 1 (see [`CapacityCheck::new`]).
    soft_limit: usize,
}

impl CapacityCheck {
    /// Probe `current()` against `soft_limit` (degraded ≥ 90%, unhealthy ≥ 100%).
    ///
    /// A `soft_limit` of `0` is clamped to `1`.
    pub fn new(current: impl Fn() -> usize + Send + Sync + 'static, soft_limit: usize) -> Self {
        Self {
            current: Arc::new(current),
            soft_limit: soft_limit.max(1),
        }
    }
}

#[async_trait]
impl HealthCheck for CapacityCheck {
    async fn check(&self) -> HealthStatus {
        let n = (self.current)();
        let limit = self.soft_limit;
        // Smallest count that is at least 90% of `limit`, i.e. ceil(0.9 * limit).
        // `n >= degraded_from` is exactly equivalent to `n * 100 / limit >= 90`,
        // but avoids the `n * 100` product, which can overflow `usize` (a panic
        // in debug builds, a wrapped and wrong result in release by default) for
        // very large counts or an "effectively unlimited" cap like `usize::MAX`.
        let degraded_from = limit - limit / 10;
        if n >= limit {
            HealthStatus::Unhealthy(format!("at capacity: {n}/{limit}"))
        } else if n >= degraded_from {
            HealthStatus::Degraded(format!("near capacity: {n}/{limit}"))
        } else {
            HealthStatus::Healthy
        }
    }
}

/// Registry of named [`HealthCheck`]s together with the instant it was
/// created, which is used to report uptime.
///
/// Cloning is cheap: every clone shares the same set of checks (behind an
/// `Arc`) and the same start instant.
#[derive(Clone)]
pub struct HealthRegistry {
    checks: Arc<DashMap<&'static str, Arc<dyn HealthCheck>>>,
    started_at: Instant,
}

impl Default for HealthRegistry {
    /// Same as [`HealthRegistry::new`].
    fn default() -> Self {
        HealthRegistry::new()
    }
}

/// Aggregate result of one [`HealthRegistry::run_all`] pass.
///
/// Derives `PartialEq`/`Eq` so whole reports can be compared, e.g. in tests or
/// to detect a change between two polls.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HealthReport {
    /// Worst status across all checks (overall readiness); `Healthy` when no
    /// checks are registered.
    pub overall: HealthStatus,
    /// Per-check results as `(name, status)` pairs, sorted by name.
    pub checks: Vec<(&'static str, HealthStatus)>,
    /// Whole seconds elapsed since the registry was created.
    pub uptime_secs: u64,
}

impl HealthRegistry {
    /// Create an empty registry; the uptime clock starts now.
    pub fn new() -> Self {
        Self {
            checks: Arc::new(DashMap::new()),
            started_at: Instant::now(),
        }
    }

    /// Register a check under `name`, replacing any check previously
    /// registered under the same name.
    pub fn register(&self, name: &'static str, check: impl HealthCheck) {
        self.checks.insert(name, Arc::new(check));
    }

    /// Run every registered check and aggregate the results.
    ///
    /// Checks are awaited one after another, in ascending name order.
    /// `overall` is the worst observed status (Unhealthy ≻ Degraded ≻ Healthy).
    /// When several checks share the worst severity, the one whose name sorts
    /// first supplies `overall`'s reason. The report is therefore deterministic
    /// regardless of the map's internal iteration order. `checks` is sorted by
    /// name.
    pub async fn run_all(&self) -> HealthReport {
        // Snapshot first so no DashMap shard guard is held across `.await`.
        let mut entries: Vec<(&'static str, Arc<dyn HealthCheck>)> = self
            .checks
            .iter()
            .map(|r| (*r.key(), Arc::clone(r.value())))
            .collect();
        // DashMap iteration order is unspecified; sorting up front makes both
        // the run order and the tie-break inside `worst` (which keeps the
        // earlier status) stable. Names are unique map keys, so an unstable
        // sort is sufficient.
        entries.sort_unstable_by_key(|(name, _)| *name);

        let mut results = Vec::with_capacity(entries.len());
        let mut overall = HealthStatus::Healthy;
        for (name, check) in entries {
            let status = check.check().await;
            overall = worst(overall, &status);
            results.push((name, status));
        }
        HealthReport {
            overall,
            checks: results,
            uptime_secs: self.started_at.elapsed().as_secs(),
        }
    }
}

/// Return whichever of `acc` and `next` is more severe, keeping `acc` on a tie.
///
/// Severity order: Healthy < Degraded < Unhealthy.
fn worst(acc: HealthStatus, next: &HealthStatus) -> HealthStatus {
    let severity = |status: &HealthStatus| -> u8 {
        match status {
            HealthStatus::Healthy => 0,
            HealthStatus::Degraded(_) => 1,
            HealthStatus::Unhealthy(_) => 2,
        }
    };
    if severity(next) <= severity(&acc) {
        acc
    } else {
        next.clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// A check that always reports the same status.
    struct Fixed(HealthStatus);

    #[async_trait]
    impl HealthCheck for Fixed {
        async fn check(&self) -> HealthStatus {
            self.0.clone()
        }
    }

    #[tokio::test]
    async fn aggregates_to_worst_status() {
        let reg = HealthRegistry::new();
        reg.register("a", Fixed(HealthStatus::Healthy));
        reg.register("b", Fixed(HealthStatus::Degraded("warm".into())));
        let report = reg.run_all().await;
        assert_eq!(report.overall, HealthStatus::Degraded("warm".into()));
        assert_eq!(report.checks.len(), 2);

        reg.register("c", Fixed(HealthStatus::Unhealthy("down".into())));
        assert_eq!(
            reg.run_all().await.overall,
            HealthStatus::Unhealthy("down".into())
        );
    }

    #[tokio::test]
    async fn report_lists_checks_sorted_by_name() {
        let reg = HealthRegistry::new();
        reg.register("zeta", Fixed(HealthStatus::Healthy));
        reg.register("alpha", Fixed(HealthStatus::Healthy));
        reg.register("mid", Fixed(HealthStatus::Healthy));
        let report = reg.run_all().await;
        let names: Vec<&str> = report.checks.iter().map(|(n, _)| *n).collect();
        assert_eq!(names, ["alpha", "mid", "zeta"]);
        assert!(report.overall.is_healthy());
    }

    #[tokio::test]
    async fn register_replaces_existing_check() {
        let reg = HealthRegistry::new();
        reg.register("a", Fixed(HealthStatus::Unhealthy("down".into())));
        reg.register("a", Fixed(HealthStatus::Healthy));
        let report = reg.run_all().await;
        assert_eq!(report.checks.len(), 1);
        assert!(report.overall.is_healthy());
    }

    #[tokio::test]
    async fn empty_registry_is_healthy() {
        let report = HealthRegistry::default().run_all().await;
        assert!(report.checks.is_empty());
        assert!(report.overall.is_healthy());
    }

    #[test]
    fn labels_are_lowercase_variant_names() {
        assert_eq!(HealthStatus::Healthy.label(), "healthy");
        assert_eq!(HealthStatus::Degraded("x".into()).label(), "degraded");
        assert_eq!(HealthStatus::Unhealthy("y".into()).label(), "unhealthy");
    }

    #[tokio::test]
    async fn capacity_check_degrades_then_fails() {
        let n = Arc::new(AtomicUsize::new(0));
        let n2 = Arc::clone(&n);
        let check = CapacityCheck::new(move || n2.load(Ordering::Relaxed), 10);
        assert!(check.check().await.is_healthy());
        n.store(9, Ordering::Relaxed);
        assert!(matches!(check.check().await, HealthStatus::Degraded(_)));
        n.store(10, Ordering::Relaxed);
        assert!(matches!(check.check().await, HealthStatus::Unhealthy(_)));
    }

    #[tokio::test]
    async fn capacity_threshold_on_non_round_limit() {
        let n = Arc::new(AtomicUsize::new(13));
        let n2 = Arc::clone(&n);
        let check = CapacityCheck::new(move || n2.load(Ordering::Relaxed), 15);
        // 13/15 ≈ 86.7% stays healthy; 14/15 ≈ 93.3% is degraded.
        assert!(check.check().await.is_healthy());
        n.store(14, Ordering::Relaxed);
        assert!(matches!(check.check().await, HealthStatus::Degraded(_)));
    }

    #[tokio::test]
    async fn zero_soft_limit_is_clamped_to_one() {
        let idle = CapacityCheck::new(|| 0, 0);
        assert!(idle.check().await.is_healthy());
        let full = CapacityCheck::new(|| 1, 0);
        assert!(matches!(full.check().await, HealthStatus::Unhealthy(_)));
    }
}