Skip to main content

heldar_kernel/services/
health.rs

1//! Health monitor: downgrades cameras that claim to be recording but have stopped producing
2//! segments (a stalled-but-connected stream), emitting an event on the transition.
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use chrono::{DateTime, Utc};
8use serde_json::json;
9use sqlx::SqlitePool;
10
11use crate::config::Config;
12use crate::repo;
13
14pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
15    let mut tick = tokio::time::interval(Duration::from_secs(cfg.health_interval_s.max(5)));
16    loop {
17        tick.tick().await;
18        if let Err(e) = check_once(&pool).await {
19            tracing::error!(error = %e, "health: check failed");
20        }
21    }
22}
23
24/// (camera_id, last_segment_at, last_started_at, segment_seconds)
25type StaleRow = (String, Option<DateTime<Utc>>, Option<DateTime<Utc>>, i64);
26
27async fn check_once(pool: &SqlitePool) -> anyhow::Result<()> {
28    let rows: Vec<StaleRow> = sqlx::query_as(
29        "SELECT cs.camera_id, cs.last_segment_at, cs.last_started_at, c.segment_seconds
30         FROM camera_status cs
31         JOIN cameras c ON c.id = cs.camera_id
32         WHERE cs.state = 'recording'",
33    )
34    .fetch_all(pool)
35    .await?;
36
37    let now = Utc::now();
38    for (camera_id, last_seg, last_start, seg_s) in rows {
39        let threshold = (seg_s.max(10) * 3).max(30);
40        let seg_age = last_seg.map(|t| (now - t).num_seconds());
41        let start_age = last_start.map(|t| (now - t).num_seconds());
42
43        let recent_segment = seg_age.map(|a| a <= threshold).unwrap_or(false);
44        let recently_started = start_age.map(|a| a <= threshold).unwrap_or(false);
45        if recent_segment || recently_started {
46            continue;
47        }
48
49        let msg = format!("no segments for >{threshold}s while recording");
50        let _ = repo::set_state(pool, &camera_id, "error", Some(&msg)).await;
51        let _ = repo::log_event(
52            pool,
53            Some(&camera_id),
54            "recorder_error",
55            "warning",
56            json!({ "reason": "stale", "threshold_seconds": threshold, "last_segment_age_s": seg_age }),
57        )
58        .await;
59        tracing::warn!(%camera_id, threshold, "health: camera stale, marked error");
60    }
61    Ok(())
62}