heldar_kernel/services/
anr.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use chrono::Utc;
19use sqlx::SqlitePool;
20use tokio::process::Command;
21
22use crate::camera_url;
23use crate::config::Config;
24use crate::models::{Camera, RecordingGap};
25
26const MAX_FILL_TIMEOUT_S: u64 = 4 * 3600;
28const MAX_GAPS_PER_SWEEP: i64 = 20;
30
31pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
32 if !cfg.anr_enabled {
33 tracing::info!("anr: disabled (HELDAR_ANR_ENABLED=false)");
34 return;
35 }
36 let interval_s = cfg.anr_interval_s.max(10);
37 tracing::info!(
38 interval_s,
39 max_gap_hours = cfg.anr_max_gap_hours,
40 max_attempts = cfg.anr_max_attempts,
41 "anr: edge re-fill started"
42 );
43 let mut tick = tokio::time::interval(Duration::from_secs(interval_s));
44 loop {
45 tick.tick().await;
46 if let Err(e) = sweep(&pool, &cfg).await {
47 tracing::error!(error = %e, "anr: sweep failed");
48 }
49 }
50}
51
52async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
54 let max_attempts = cfg.anr_max_attempts.max(1);
55 let cutoff = Utc::now() - chrono::Duration::hours(cfg.anr_max_gap_hours.max(1));
56 let gaps: Vec<RecordingGap> = sqlx::query_as::<_, RecordingGap>(
57 "SELECT g.* FROM recording_gaps g
58 JOIN cameras c ON c.id = g.camera_id
59 WHERE g.fill_state = 'pending'
60 AND g.fill_attempts < ?
61 AND g.gap_start >= ?
62 AND c.anr_enabled = 1
63 ORDER BY g.gap_start DESC
64 LIMIT ?",
65 )
66 .bind(max_attempts)
67 .bind(cutoff)
68 .bind(MAX_GAPS_PER_SWEEP)
69 .fetch_all(pool)
70 .await?;
71
72 for gap in gaps {
73 let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
74 .bind(&gap.camera_id)
75 .fetch_optional(pool)
76 .await?
77 {
78 Some(c) => c,
79 None => continue, };
81 match fill_gap(cfg, &cam, &gap).await {
82 Ok(()) => {
83 tracing::info!(camera = %gap.camera_id, gap = %gap.id, "anr: gap re-filled from camera storage");
84 mark_filled(pool, &gap.id).await;
85 }
86 Err(e) => {
87 tracing::warn!(camera = %gap.camera_id, gap = %gap.id, error = %e, "anr: fill attempt failed");
88 bump_attempt(pool, &gap.id, max_attempts).await;
89 }
90 }
91 }
92 Ok(())
93}
94
95async fn fill_gap(cfg: &Config, cam: &Camera, gap: &RecordingGap) -> anyhow::Result<()> {
97 let url = camera_url::anr_replay_url(cam, gap.gap_start, gap.gap_end).ok_or_else(|| {
98 anyhow::anyhow!(
99 "no replay URL: set anr_replay_url_template, or address+credentials for the default"
100 )
101 })?;
102
103 let dir = cfg.camera_recordings_dir(&cam.id);
104 tokio::fs::create_dir_all(&dir)
105 .await
106 .map_err(|e| anyhow::anyhow!("creating recordings dir: {e}"))?;
107
108 let fname = format!("{}.mp4", gap.gap_start.format("%Y%m%d_%H%M%S"));
110 let final_path = dir.join(&fname);
111 if tokio::fs::try_exists(&final_path).await.unwrap_or(false) {
112 return Ok(());
114 }
115 let part_path = dir.join(format!("{fname}.part"));
117 let _ = tokio::fs::remove_file(&part_path).await;
118
119 let dur = gap.gap_seconds.max(1) as u64;
120 let audio_args: &[&str] = if cam.record_audio {
121 &["-c:a", "copy"]
122 } else {
123 &["-an"]
124 };
125 let mut cmd = Command::new(&cfg.ffmpeg_bin);
126 cmd.kill_on_drop(true)
127 .env("TZ", "UTC")
128 .args(["-nostdin", "-hide_banner", "-loglevel", "warning"])
129 .args(["-rtsp_transport", "tcp"])
130 .args(["-timeout", "15000000"]) .args(["-i", &url])
132 .args(["-t", &dur.to_string()]) .args(["-c", "copy"]) .args(audio_args)
135 .args(["-movflags", "+faststart"])
136 .arg(&part_path)
137 .stdin(std::process::Stdio::null())
138 .stdout(std::process::Stdio::null())
139 .stderr(std::process::Stdio::piped());
140
141 let job_timeout = Duration::from_secs((dur + 60).min(MAX_FILL_TIMEOUT_S));
142 let outcome = tokio::time::timeout(job_timeout, cmd.output()).await;
143
144 let out = match outcome {
145 Err(_) => {
146 let _ = tokio::fs::remove_file(&part_path).await;
147 anyhow::bail!("replay capture timed out after {}s", job_timeout.as_secs());
148 }
149 Ok(Err(e)) => {
150 let _ = tokio::fs::remove_file(&part_path).await;
151 anyhow::bail!("spawning ffmpeg: {e}");
152 }
153 Ok(Ok(out)) => out,
154 };
155 if !out.status.success() {
156 let err = camera_url::mask_url(String::from_utf8_lossy(&out.stderr).trim());
157 let _ = tokio::fs::remove_file(&part_path).await;
158 anyhow::bail!("ffmpeg replay failed: {err}");
159 }
160
161 let size = tokio::fs::metadata(&part_path)
162 .await
163 .map(|m| m.len())
164 .unwrap_or(0);
165 if size == 0 {
166 let _ = tokio::fs::remove_file(&part_path).await;
167 anyhow::bail!(
168 "replay produced an empty file (camera likely has no footage for this window)"
169 );
170 }
171 tokio::fs::rename(&part_path, &final_path)
172 .await
173 .map_err(|e| anyhow::anyhow!("finalizing {fname}: {e}"))?;
174 Ok(())
175}
176
177async fn mark_filled(pool: &SqlitePool, gap_id: &str) {
178 let now = Utc::now();
179 let _ = sqlx::query(
180 "UPDATE recording_gaps
181 SET fill_state = 'filled', filled_at = ?, last_attempt_at = ?,
182 fill_attempts = fill_attempts + 1
183 WHERE id = ?",
184 )
185 .bind(now)
186 .bind(now)
187 .bind(gap_id)
188 .execute(pool)
189 .await;
190}
191
192async fn bump_attempt(pool: &SqlitePool, gap_id: &str, max_attempts: i64) {
195 let _ = sqlx::query(
196 "UPDATE recording_gaps
197 SET fill_attempts = fill_attempts + 1,
198 last_attempt_at = ?,
199 fill_state = CASE WHEN fill_attempts + 1 >= ? THEN 'failed' ELSE 'pending' END
200 WHERE id = ?",
201 )
202 .bind(Utc::now())
203 .bind(max_attempts)
204 .bind(gap_id)
205 .execute(pool)
206 .await;
207}