use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use sqlx::SqlitePool;
use tokio::process::Command;
use crate::camera_url;
use crate::config::Config;
use crate::models::{Camera, RecordingGap};
const MAX_FILL_TIMEOUT_S: u64 = 4 * 3600;
const MAX_GAPS_PER_SWEEP: i64 = 20;
pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
if !cfg.anr_enabled {
tracing::info!("anr: disabled (HELDAR_ANR_ENABLED=false)");
return;
}
let interval_s = cfg.anr_interval_s.max(10);
tracing::info!(
interval_s,
max_gap_hours = cfg.anr_max_gap_hours,
max_attempts = cfg.anr_max_attempts,
"anr: edge re-fill started"
);
let mut tick = tokio::time::interval(Duration::from_secs(interval_s));
loop {
tick.tick().await;
if let Err(e) = sweep(&pool, &cfg).await {
tracing::error!(error = %e, "anr: sweep failed");
}
}
}
async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
let max_attempts = cfg.anr_max_attempts.max(1);
let cutoff = Utc::now() - chrono::Duration::hours(cfg.anr_max_gap_hours.max(1));
let gaps: Vec<RecordingGap> = sqlx::query_as::<_, RecordingGap>(
"SELECT g.* FROM recording_gaps g
JOIN cameras c ON c.id = g.camera_id
WHERE g.fill_state = 'pending'
AND g.fill_attempts < ?
AND g.gap_start >= ?
AND c.anr_enabled = 1
ORDER BY g.gap_start DESC
LIMIT ?",
)
.bind(max_attempts)
.bind(cutoff)
.bind(MAX_GAPS_PER_SWEEP)
.fetch_all(pool)
.await?;
for gap in gaps {
let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
.bind(&gap.camera_id)
.fetch_optional(pool)
.await?
{
Some(c) => c,
None => continue, };
match fill_gap(cfg, &cam, &gap).await {
Ok(()) => {
tracing::info!(camera = %gap.camera_id, gap = %gap.id, "anr: gap re-filled from camera storage");
mark_filled(pool, &gap.id).await;
}
Err(e) => {
tracing::warn!(camera = %gap.camera_id, gap = %gap.id, error = %e, "anr: fill attempt failed");
bump_attempt(pool, &gap.id, max_attempts).await;
}
}
}
Ok(())
}
async fn fill_gap(cfg: &Config, cam: &Camera, gap: &RecordingGap) -> anyhow::Result<()> {
let url = camera_url::anr_replay_url(cam, gap.gap_start, gap.gap_end).ok_or_else(|| {
anyhow::anyhow!(
"no replay URL: set anr_replay_url_template, or address+credentials for the default"
)
})?;
let dir = cfg.camera_recordings_dir(&cam.id);
tokio::fs::create_dir_all(&dir)
.await
.map_err(|e| anyhow::anyhow!("creating recordings dir: {e}"))?;
let fname = format!("{}.mp4", gap.gap_start.format("%Y%m%d_%H%M%S"));
let final_path = dir.join(&fname);
if tokio::fs::try_exists(&final_path).await.unwrap_or(false) {
return Ok(());
}
let part_path = dir.join(format!("{fname}.part"));
let _ = tokio::fs::remove_file(&part_path).await;
let dur = gap.gap_seconds.max(1) as u64;
let audio_args: &[&str] = if cam.record_audio {
&["-c:a", "copy"]
} else {
&["-an"]
};
let mut cmd = Command::new(&cfg.ffmpeg_bin);
cmd.kill_on_drop(true)
.env("TZ", "UTC")
.args(["-nostdin", "-hide_banner", "-loglevel", "warning"])
.args(["-rtsp_transport", "tcp"])
.args(["-timeout", "15000000"]) .args(["-i", &url])
.args(["-t", &dur.to_string()]) .args(["-c", "copy"]) .args(audio_args)
.args(["-movflags", "+faststart"])
.arg(&part_path)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::piped());
let job_timeout = Duration::from_secs((dur + 60).min(MAX_FILL_TIMEOUT_S));
let outcome = tokio::time::timeout(job_timeout, cmd.output()).await;
let out = match outcome {
Err(_) => {
let _ = tokio::fs::remove_file(&part_path).await;
anyhow::bail!("replay capture timed out after {}s", job_timeout.as_secs());
}
Ok(Err(e)) => {
let _ = tokio::fs::remove_file(&part_path).await;
anyhow::bail!("spawning ffmpeg: {e}");
}
Ok(Ok(out)) => out,
};
if !out.status.success() {
let err = camera_url::mask_url(String::from_utf8_lossy(&out.stderr).trim());
let _ = tokio::fs::remove_file(&part_path).await;
anyhow::bail!("ffmpeg replay failed: {err}");
}
let size = tokio::fs::metadata(&part_path)
.await
.map(|m| m.len())
.unwrap_or(0);
if size == 0 {
let _ = tokio::fs::remove_file(&part_path).await;
anyhow::bail!(
"replay produced an empty file (camera likely has no footage for this window)"
);
}
tokio::fs::rename(&part_path, &final_path)
.await
.map_err(|e| anyhow::anyhow!("finalizing {fname}: {e}"))?;
Ok(())
}
async fn mark_filled(pool: &SqlitePool, gap_id: &str) {
let now = Utc::now();
let _ = sqlx::query(
"UPDATE recording_gaps
SET fill_state = 'filled', filled_at = ?, last_attempt_at = ?,
fill_attempts = fill_attempts + 1
WHERE id = ?",
)
.bind(now)
.bind(now)
.bind(gap_id)
.execute(pool)
.await;
}
async fn bump_attempt(pool: &SqlitePool, gap_id: &str, max_attempts: i64) {
let _ = sqlx::query(
"UPDATE recording_gaps
SET fill_attempts = fill_attempts + 1,
last_attempt_at = ?,
fill_state = CASE WHEN fill_attempts + 1 >= ? THEN 'failed' ELSE 'pending' END
WHERE id = ?",
)
.bind(Utc::now())
.bind(max_attempts)
.bind(gap_id)
.execute(pool)
.await;
}