Skip to main content

heldar_kernel/services/
anr.rs

//! ANR (Automatic Network Replenishment) edge re-fill.
//!
//! Many IP cameras keep their own onboard SD/NVR recording that survives a network outage. When the
//! kernel loses the live RTSP stream the indexer records a [`recording_gaps`](crate::models::RecordingGap)
//! row; this loop (spawned from `main` only when `HELDAR_ANR_ENABLED` is set) tries to re-fetch the
//! missed footage from the camera's onboard storage by recording its REPLAY stream into the camera's
//! normal recordings dir (so the indexer then folds it back into the timeline).
//!
//! It is BEST-EFFORT and CAMERA-DEPENDENT: it only works when the camera retained the footage and
//! exposes a replay/playback endpoint. The replay URL comes from the per-camera
//! `anr_replay_url_template` (or the default Hikvision RTSP playback endpoint) — see
//! [`crate::camera_url::anr_replay_url`]. The re-filled file is named by the gap's START time (UTC
//! strftime, matching the recorder) so it lands at the right place on the timeline rather than "now".

15use std::sync::Arc;
16use std::time::Duration;
17
18use chrono::Utc;
19use sqlx::SqlitePool;
20use tokio::process::Command;
21
22use crate::camera_url;
23use crate::config::Config;
24use crate::models::{Camera, RecordingGap};
25
/// Hard ceiling, in seconds, on one replay-capture ffmpeg job no matter how long the gap is, so a
/// stuck replay stream can't wedge the sweep (4 hours).
const MAX_FILL_TIMEOUT_S: u64 = 4 * 60 * 60;
/// Maximum number of pending gaps loaded and attempted in one sweep, bounding the work of a tick.
const MAX_GAPS_PER_SWEEP: i64 = 20;
30
31pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
32    if !cfg.anr_enabled {
33        tracing::info!("anr: disabled (HELDAR_ANR_ENABLED=false)");
34        return;
35    }
36    let interval_s = cfg.anr_interval_s.max(10);
37    tracing::info!(
38        interval_s,
39        max_gap_hours = cfg.anr_max_gap_hours,
40        max_attempts = cfg.anr_max_attempts,
41        "anr: edge re-fill started"
42    );
43    let mut tick = tokio::time::interval(Duration::from_secs(interval_s));
44    loop {
45        tick.tick().await;
46        if let Err(e) = sweep(&pool, &cfg).await {
47            tracing::error!(error = %e, "anr: sweep failed");
48        }
49    }
50}
51
52/// Pick pending gaps (young enough, attempts left, on ANR-enabled cameras) and try to fill each.
53async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
54    let max_attempts = cfg.anr_max_attempts.max(1);
55    let cutoff = Utc::now() - chrono::Duration::hours(cfg.anr_max_gap_hours.max(1));
56    let gaps: Vec<RecordingGap> = sqlx::query_as::<_, RecordingGap>(
57        "SELECT g.* FROM recording_gaps g
58           JOIN cameras c ON c.id = g.camera_id
59          WHERE g.fill_state = 'pending'
60            AND g.fill_attempts < ?
61            AND g.gap_start >= ?
62            AND c.anr_enabled = 1
63          ORDER BY g.gap_start DESC
64          LIMIT ?",
65    )
66    .bind(max_attempts)
67    .bind(cutoff)
68    .bind(MAX_GAPS_PER_SWEEP)
69    .fetch_all(pool)
70    .await?;
71
72    for gap in gaps {
73        let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
74            .bind(&gap.camera_id)
75            .fetch_optional(pool)
76            .await?
77        {
78            Some(c) => c,
79            None => continue, // camera vanished mid-sweep
80        };
81        match fill_gap(cfg, &cam, &gap).await {
82            Ok(()) => {
83                tracing::info!(camera = %gap.camera_id, gap = %gap.id, "anr: gap re-filled from camera storage");
84                mark_filled(pool, &gap.id).await;
85            }
86            Err(e) => {
87                tracing::warn!(camera = %gap.camera_id, gap = %gap.id, error = %e, "anr: fill attempt failed");
88                bump_attempt(pool, &gap.id, max_attempts).await;
89            }
90        }
91    }
92    Ok(())
93}
94
95/// Record the camera's replay stream for the gap window into the recordings dir, named by gap start.
96async fn fill_gap(cfg: &Config, cam: &Camera, gap: &RecordingGap) -> anyhow::Result<()> {
97    let url = camera_url::anr_replay_url(cam, gap.gap_start, gap.gap_end).ok_or_else(|| {
98        anyhow::anyhow!(
99            "no replay URL: set anr_replay_url_template, or address+credentials for the default"
100        )
101    })?;
102
103    let dir = cfg.camera_recordings_dir(&cam.id);
104    tokio::fs::create_dir_all(&dir)
105        .await
106        .map_err(|e| anyhow::anyhow!("creating recordings dir: {e}"))?;
107
108    // Name by the gap's START (UTC strftime, like the recorder) so the indexer places it correctly.
109    let fname = format!("{}.mp4", gap.gap_start.format("%Y%m%d_%H%M%S"));
110    let final_path = dir.join(&fname);
111    if tokio::fs::try_exists(&final_path).await.unwrap_or(false) {
112        // Already present (a prior fill, or the recorder named a file at the same second): done.
113        return Ok(());
114    }
115    // Write to a non-`.mp4` temp path so the indexer ignores it until the fill completes, then rename.
116    let part_path = dir.join(format!("{fname}.part"));
117    let _ = tokio::fs::remove_file(&part_path).await;
118
119    let dur = gap.gap_seconds.max(1) as u64;
120    let audio_args: &[&str] = if cam.record_audio {
121        &["-c:a", "copy"]
122    } else {
123        &["-an"]
124    };
125    let mut cmd = Command::new(&cfg.ffmpeg_bin);
126    cmd.kill_on_drop(true)
127        .env("TZ", "UTC")
128        .args(["-nostdin", "-hide_banner", "-loglevel", "warning"])
129        .args(["-rtsp_transport", "tcp"])
130        .args(["-timeout", "15000000"]) // 15s RTSP socket I/O timeout -> exit on stall
131        .args(["-i", &url])
132        .args(["-t", &dur.to_string()]) // bound to the gap length (safety cap vs replay end)
133        .args(["-c", "copy"]) // stream-copy (no decode)
134        .args(audio_args)
135        .args(["-movflags", "+faststart"])
136        .arg(&part_path)
137        .stdin(std::process::Stdio::null())
138        .stdout(std::process::Stdio::null())
139        .stderr(std::process::Stdio::piped());
140
141    let job_timeout = Duration::from_secs((dur + 60).min(MAX_FILL_TIMEOUT_S));
142    let outcome = tokio::time::timeout(job_timeout, cmd.output()).await;
143
144    let out = match outcome {
145        Err(_) => {
146            let _ = tokio::fs::remove_file(&part_path).await;
147            anyhow::bail!("replay capture timed out after {}s", job_timeout.as_secs());
148        }
149        Ok(Err(e)) => {
150            let _ = tokio::fs::remove_file(&part_path).await;
151            anyhow::bail!("spawning ffmpeg: {e}");
152        }
153        Ok(Ok(out)) => out,
154    };
155    if !out.status.success() {
156        let err = camera_url::mask_url(String::from_utf8_lossy(&out.stderr).trim());
157        let _ = tokio::fs::remove_file(&part_path).await;
158        anyhow::bail!("ffmpeg replay failed: {err}");
159    }
160
161    let size = tokio::fs::metadata(&part_path)
162        .await
163        .map(|m| m.len())
164        .unwrap_or(0);
165    if size == 0 {
166        let _ = tokio::fs::remove_file(&part_path).await;
167        anyhow::bail!(
168            "replay produced an empty file (camera likely has no footage for this window)"
169        );
170    }
171    tokio::fs::rename(&part_path, &final_path)
172        .await
173        .map_err(|e| anyhow::anyhow!("finalizing {fname}: {e}"))?;
174    Ok(())
175}
176
177async fn mark_filled(pool: &SqlitePool, gap_id: &str) {
178    let now = Utc::now();
179    let _ = sqlx::query(
180        "UPDATE recording_gaps
181            SET fill_state = 'filled', filled_at = ?, last_attempt_at = ?,
182                fill_attempts = fill_attempts + 1
183          WHERE id = ?",
184    )
185    .bind(now)
186    .bind(now)
187    .bind(gap_id)
188    .execute(pool)
189    .await;
190}
191
192/// Bump the attempt counter; mark `failed` once attempts are exhausted so the gap drops out of the
193/// pending queue (the retry endpoint can reset it to `pending`).
194async fn bump_attempt(pool: &SqlitePool, gap_id: &str, max_attempts: i64) {
195    let _ = sqlx::query(
196        "UPDATE recording_gaps
197            SET fill_attempts = fill_attempts + 1,
198                last_attempt_at = ?,
199                fill_state = CASE WHEN fill_attempts + 1 >= ? THEN 'failed' ELSE 'pending' END
200          WHERE id = ?",
201    )
202    .bind(Utc::now())
203    .bind(max_attempts)
204    .bind(gap_id)
205    .execute(pool)
206    .await;
207}