Skip to main content

heldar_kernel/services/
backup.rs

//! Backup subsystem: scheduled policy jobs, on-demand archive export, and the shared transfer
//! plumbing.
//!
//! The scheduler (spawned from `main` when `HELDAR_BACKUP_ENABLED` is set) ticks every
//! `HELDAR_BACKUP_SCHEDULER_INTERVAL_S` seconds, creates a `backup_job` for each due enabled policy,
//! and runs it under a process-wide concurrency [`Semaphore`] (also shared by manual triggers) with a
//! per-job timeout. A job resolves its segment files (camera selection + time window, optionally only
//! evidence-locked footage) and ships them:
//!   - `local` destinations are copied with `tokio::fs` into `{dest path}/{camera_id}/` (NAS mounts,
//!     no rclone involved).
//!   - `sftp` / `ftp` / `s3` destinations shell out to rclone (`HELDAR_RCLONE_BIN`). When rclone is not
//!     installed the job is marked `error` with a clear message — the build/tests never require it.
//!
//! On-demand archive export ([`create_archive`]) builds a `.zip` of the selected segments via
//! `/usr/bin/zip` into `HELDAR_ARCHIVE_DIR/{job_id}.zip` (served at `/media/archives`), enforcing
//! `HELDAR_ARCHIVE_MAX_BYTES`. It reuses `backup_jobs` with `kind='on_demand_archive'` + `output_url`.
16
17use std::path::Path;
18use std::process::Stdio;
19use std::sync::{Arc, OnceLock};
20use std::time::Duration;
21
22use chrono::{DateTime, Utc};
23use serde_json::Value;
24use sqlx::types::Json as SqlxJson;
25use sqlx::SqlitePool;
26use tokio::process::Command;
27use tokio::sync::Semaphore;
28use uuid::Uuid;
29
30use crate::config::Config;
31use crate::error::{AppError, AppResult};
32use crate::models::{BackupDestination, BackupJob, BackupPolicy, BackupTestResult, Segment};
33use crate::repo;
34use crate::state::AppState;
35
/// Absolute path of the `zip` executable used to build on-demand archives. It is fixed rather than
/// configurable because the target environment ships `/usr/bin/zip` (and `tar`).
const ZIP_BIN: &str = "/usr/bin/zip";
38
39/// Process-wide job concurrency gate, sized from config on first use and shared by the scheduler +
40/// manual triggers. A `OnceLock` (not reset on scheduler respawn) keeps the bound stable for the
41/// process lifetime.
42fn job_semaphore(cfg: &Config) -> Arc<Semaphore> {
43    static SEM: OnceLock<Arc<Semaphore>> = OnceLock::new();
44    SEM.get_or_init(|| Arc::new(Semaphore::new(cfg.backup_max_concurrent_jobs.max(1))))
45        .clone()
46}
47
48/// Background scheduler loop. Returns immediately (no respawn churn) when backups are disabled — but
49/// `main` already guards the spawn, mirroring the notifier.
50pub async fn run(state: AppState) {
51    if !state.cfg.backup_enabled {
52        tracing::info!("backup: scheduler disabled (HELDAR_BACKUP_ENABLED=false)");
53        return;
54    }
55    let interval_s = state.cfg.backup_scheduler_interval_s.max(5);
56    tracing::info!(
57        interval_s,
58        max_concurrent = state.cfg.backup_max_concurrent_jobs,
59        "backup: scheduler started"
60    );
61    let mut tick = tokio::time::interval(Duration::from_secs(interval_s));
62    loop {
63        tick.tick().await;
64        if let Err(e) = sweep(&state).await {
65            tracing::error!(error = %e, "backup: scheduler tick failed");
66        }
67    }
68}
69
70/// Create + dispatch a job for every due enabled policy.
71async fn sweep(state: &AppState) -> anyhow::Result<()> {
72    let now = Utc::now();
73    let policies: Vec<BackupPolicy> =
74        sqlx::query_as::<_, BackupPolicy>("SELECT * FROM backup_policies WHERE enabled = 1")
75            .fetch_all(&state.pool)
76            .await?;
77    for p in policies {
78        let due = match p.last_run_at {
79            None => true,
80            Some(last) => last + chrono::Duration::seconds(p.schedule_interval_s.max(1)) <= now,
81        };
82        if !due {
83            continue;
84        }
85        match create_policy_job(state, &p).await {
86            Ok(job_id) => spawn_job(state.clone(), job_id),
87            Err(e) => tracing::error!(policy = %p.id, error = %e, "backup: failed to create job"),
88        }
89    }
90    Ok(())
91}
92
93/// Insert a `policy` job from a policy and claim the policy (`last_run_at`/`last_job_id`) so the next
94/// tick does not re-trigger it. Returns the new job id.
95async fn create_policy_job(state: &AppState, p: &BackupPolicy) -> anyhow::Result<String> {
96    let now = Utc::now();
97    let job_id = format!("bkj_{}", Uuid::new_v4().simple());
98    let from_time = if p.lookback_hours > 0 {
99        Some(now - chrono::Duration::hours(p.lookback_hours))
100    } else {
101        None
102    };
103    let to_time = Some(now);
104    sqlx::query(
105        "INSERT INTO backup_jobs
106           (id, policy_id, destination_id, kind, camera_ids, from_time, to_time,
107            incident_lock_only, status, created_at)
108         VALUES (?, ?, ?, 'policy', ?, ?, ?, ?, 'pending', ?)",
109    )
110    .bind(&job_id)
111    .bind(&p.id)
112    .bind(&p.destination_id)
113    .bind(SqlxJson(p.camera_ids.0.clone()))
114    .bind(from_time)
115    .bind(to_time)
116    .bind(p.incident_lock_only)
117    .bind(now)
118    .execute(&state.pool)
119    .await?;
120    sqlx::query(
121        "UPDATE backup_policies SET last_run_at = ?, last_job_id = ?, updated_at = ? WHERE id = ?",
122    )
123    .bind(now)
124    .bind(&job_id)
125    .bind(now)
126    .bind(&p.id)
127    .execute(&state.pool)
128    .await?;
129    Ok(job_id)
130}
131
132/// Manually trigger a policy: create its job and dispatch it (returns the job id immediately).
133pub async fn trigger_policy(state: &AppState, policy: &BackupPolicy) -> anyhow::Result<String> {
134    let job_id = create_policy_job(state, policy).await?;
135    spawn_job(state.clone(), job_id.clone());
136    Ok(job_id)
137}
138
139/// Spawn a detached task that acquires a concurrency permit then executes the job under the timeout.
140fn spawn_job(state: AppState, job_id: String) {
141    let sem = job_semaphore(&state.cfg);
142    let timeout = Duration::from_secs(state.cfg.backup_job_timeout_s.max(30));
143    tokio::spawn(async move {
144        let _permit = match sem.acquire_owned().await {
145            Ok(p) => p,
146            Err(_) => return,
147        };
148        execute_job(&state, &job_id, timeout).await;
149    });
150}
151
152/// Execute a destination-backed job: resolve its segments, copy them, and record progress + outcome.
153async fn execute_job(state: &AppState, job_id: &str, timeout: Duration) {
154    let Some(job) = sqlx::query_as::<_, BackupJob>("SELECT * FROM backup_jobs WHERE id = ?")
155        .bind(job_id)
156        .fetch_optional(&state.pool)
157        .await
158        .ok()
159        .flatten()
160    else {
161        return;
162    };
163
164    let dest = match &job.destination_id {
165        Some(d) => {
166            sqlx::query_as::<_, BackupDestination>("SELECT * FROM backup_destinations WHERE id = ?")
167                .bind(d)
168                .fetch_optional(&state.pool)
169                .await
170                .ok()
171                .flatten()
172        }
173        None => None,
174    };
175    let Some(dest) = dest else {
176        set_job_error(state, job_id, "backup destination not found or removed").await;
177        return;
178    };
179    if !dest.enabled {
180        set_job_error(state, job_id, "backup destination is disabled").await;
181        return;
182    }
183
184    let camera_ids = json_to_string_vec(&job.camera_ids.0);
185    let segments = match resolve_segments(
186        &state.pool,
187        &camera_ids,
188        job.from_time,
189        job.to_time,
190        job.incident_lock_only,
191    )
192    .await
193    {
194        Ok(s) => s,
195        Err(e) => {
196            set_job_error(state, job_id, &format!("resolving segments: {e}")).await;
197            return;
198        }
199    };
200
201    let files_total = segments.len() as i64;
202    let _ = sqlx::query(
203        "UPDATE backup_jobs SET status = 'running', files_total = ?, started_at = ? WHERE id = ?",
204    )
205    .bind(files_total)
206    .bind(Utc::now())
207    .bind(job_id)
208    .execute(&state.pool)
209    .await;
210
211    if segments.is_empty() {
212        let _ = sqlx::query(
213            "UPDATE backup_jobs SET status = 'completed', finished_at = ? WHERE id = ?",
214        )
215        .bind(Utc::now())
216        .bind(job_id)
217        .execute(&state.pool)
218        .await;
219        return;
220    }
221
222    // Read-lock the source segments so retention cannot prune them mid-transfer; always released
223    // after the (possibly timed-out) transfer future settles.
224    let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
225    repo::set_segments_locked(&state.pool, &seg_ids, true).await;
226    let outcome =
227        tokio::time::timeout(timeout, copy_segments(state, job_id, &dest, &segments)).await;
228    repo::set_segments_locked(&state.pool, &seg_ids, false).await;
229
230    match outcome {
231        Err(_) => set_job_error(state, job_id, "backup job timed out").await,
232        Ok(Err(e)) => set_job_error(state, job_id, &e.to_string()).await,
233        Ok(Ok((copied, bytes))) => {
234            let _ = sqlx::query(
235                "UPDATE backup_jobs SET status = 'completed', files_copied = ?, bytes_copied = ?, finished_at = ? WHERE id = ?",
236            )
237            .bind(copied as i64)
238            .bind(bytes as i64)
239            .bind(Utc::now())
240            .bind(job_id)
241            .execute(&state.pool)
242            .await;
243            tracing::info!(job = job_id, files = copied, bytes, "backup: job completed");
244        }
245    }
246}
247
248async fn set_job_error(state: &AppState, job_id: &str, msg: &str) {
249    tracing::warn!(job = job_id, error = msg, "backup: job failed");
250    let _ = sqlx::query(
251        "UPDATE backup_jobs SET status = 'error', error = ?, finished_at = ? WHERE id = ?",
252    )
253    .bind(msg)
254    .bind(Utc::now())
255    .bind(job_id)
256    .execute(&state.pool)
257    .await;
258}
259
260/// Dispatch the transfer by destination kind. Returns (files_copied, bytes_copied).
261async fn copy_segments(
262    state: &AppState,
263    job_id: &str,
264    dest: &BackupDestination,
265    segments: &[Segment],
266) -> anyhow::Result<(u64, u64)> {
267    match dest.kind.as_str() {
268        "local" => copy_local(state, job_id, dest, segments).await,
269        "sftp" | "ftp" | "s3" => copy_rclone(state, job_id, dest, segments).await,
270        other => anyhow::bail!("unknown backup destination kind `{other}`"),
271    }
272}
273
274/// Local / NAS-mount destination: std fs copy into `{path}/{camera_id}/{file}`.
275async fn copy_local(
276    state: &AppState,
277    job_id: &str,
278    dest: &BackupDestination,
279    segments: &[Segment],
280) -> anyhow::Result<(u64, u64)> {
281    let base = cfg_str(&dest.config.0, "path");
282    if base.is_empty() {
283        anyhow::bail!("local destination has no `path` configured");
284    }
285    let base = Path::new(&base);
286    let mut copied = 0u64;
287    let mut bytes = 0u64;
288    for seg in segments {
289        let cam_dir = base.join(&seg.camera_id);
290        tokio::fs::create_dir_all(&cam_dir)
291            .await
292            .map_err(|e| anyhow::anyhow!("creating {}: {e}", cam_dir.display()))?;
293        let target = cam_dir.join(file_name_of(&seg.path));
294        match tokio::fs::copy(&seg.path, &target).await {
295            Ok(n) => {
296                copied += 1;
297                bytes += n;
298            }
299            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
300                tracing::warn!(job = job_id, path = %seg.path, "backup: source segment vanished; skipping");
301            }
302            Err(e) => anyhow::bail!("copying {}: {e}", seg.path),
303        }
304        update_progress(state, job_id, copied, bytes).await;
305    }
306    Ok((copied, bytes))
307}
308
309/// Remote destination (sftp/ftp/s3) via rclone. Degrades to a clear error when rclone is missing.
310async fn copy_rclone(
311    state: &AppState,
312    job_id: &str,
313    dest: &BackupDestination,
314    segments: &[Segment],
315) -> anyhow::Result<(u64, u64)> {
316    let bin = &state.cfg.rclone_bin;
317    if !binary_available(bin).await {
318        anyhow::bail!(
319            "rclone binary `{bin}` is not available; install rclone or set HELDAR_RCLONE_BIN \
320             (remote sftp/ftp/s3 backup requires it; local/NAS destinations do not)"
321        );
322    }
323    let (remote, base, secrets) = build_remote(bin, &dest.kind, &dest.config.0).await?;
324    let mut copied = 0u64;
325    let mut bytes = 0u64;
326    for seg in segments {
327        let rel = join_path(&base, &[&seg.camera_id, &file_name_of(&seg.path)]);
328        let target = format!("{remote}{rel}");
329        let out = Command::new(bin)
330            .arg("copyto")
331            .arg(&seg.path)
332            .arg(&target)
333            .arg("--no-traverse")
334            .stdin(Stdio::null())
335            .stdout(Stdio::null())
336            .stderr(Stdio::piped())
337            .kill_on_drop(true)
338            .output()
339            .await
340            .map_err(|e| anyhow::anyhow!("spawning rclone: {e}"))?;
341        if out.status.success() {
342            copied += 1;
343            bytes += seg.size_bytes.max(0) as u64;
344        } else {
345            let err = scrub(&String::from_utf8_lossy(&out.stderr), &secrets);
346            anyhow::bail!(
347                "rclone copy failed for {}: {}",
348                file_name_of(&seg.path),
349                err.trim()
350            );
351        }
352        update_progress(state, job_id, copied, bytes).await;
353    }
354    Ok((copied, bytes))
355}
356
357async fn update_progress(state: &AppState, job_id: &str, copied: u64, bytes: u64) {
358    let _ = sqlx::query("UPDATE backup_jobs SET files_copied = ?, bytes_copied = ? WHERE id = ?")
359        .bind(copied as i64)
360        .bind(bytes as i64)
361        .bind(job_id)
362        .execute(&state.pool)
363        .await;
364}
365
366// ---- On-demand archive export ----
367
368/// Build a `.zip` of the selected segments and record it as an `on_demand_archive` job. Enforces the
369/// archive size cap on the source footprint; runs inline (bounded by the job timeout) so the returned
370/// job already carries `output_url`.
371pub async fn create_archive(
372    state: &AppState,
373    camera_ids: Vec<String>,
374    from: Option<DateTime<Utc>>,
375    to: Option<DateTime<Utc>>,
376    incident_lock_only: bool,
377    trim: bool,
378) -> AppResult<BackupJob> {
379    if trim && (from.is_none() || to.is_none()) {
380        return Err(AppError::BadRequest(
381            "`trim` requires both `from` and `to`".into(),
382        ));
383    }
384    let segments = resolve_segments(&state.pool, &camera_ids, from, to, incident_lock_only).await?;
385    if segments.is_empty() {
386        return Err(AppError::NotFound(
387            "no recorded footage matches the requested archive selection".into(),
388        ));
389    }
390    let total_bytes: i64 = segments.iter().map(|s| s.size_bytes.max(0)).sum();
391    if total_bytes as u64 > state.cfg.archive_max_bytes {
392        return Err(AppError::BadRequest(format!(
393            "archive selection is {total_bytes} bytes; exceeds the limit of {} bytes (HELDAR_ARCHIVE_MAX_BYTES)",
394            state.cfg.archive_max_bytes
395        )));
396    }
397
398    tokio::fs::create_dir_all(&state.cfg.archive_dir)
399        .await
400        .map_err(|e| AppError::Other(e.into()))?;
401
402    let job_id = format!("bkj_{}", Uuid::new_v4().simple());
403    let now = Utc::now();
404    let files_total = segments.len() as i64;
405    sqlx::query(
406        "INSERT INTO backup_jobs
407           (id, policy_id, destination_id, kind, camera_ids, from_time, to_time,
408            incident_lock_only, status, files_total, started_at, created_at)
409         VALUES (?, NULL, NULL, 'on_demand_archive', ?, ?, ?, ?, 'running', ?, ?, ?)",
410    )
411    .bind(&job_id)
412    .bind(SqlxJson(json_from_strs(&camera_ids)))
413    .bind(from)
414    .bind(to)
415    .bind(incident_lock_only)
416    .bind(files_total)
417    .bind(now)
418    .bind(now)
419    .execute(&state.pool)
420    .await?;
421
422    // Read-lock the sources for the duration of the zip/trim (released on every outcome).
423    let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
424    repo::set_segments_locked(&state.pool, &seg_ids, true).await;
425    let timeout = Duration::from_secs(state.cfg.backup_job_timeout_s.max(30));
426    let outcome = tokio::time::timeout(
427        timeout,
428        build_archive_zip(state, &job_id, &segments, from, to, trim),
429    )
430    .await;
431    repo::set_segments_locked(&state.pool, &seg_ids, false).await;
432
433    let out_path = state.cfg.archive_dir.join(format!("{job_id}.zip"));
434    match outcome {
435        Err(_) => {
436            let _ = tokio::fs::remove_file(&out_path).await;
437            set_job_error(state, &job_id, "archive export timed out").await;
438            return Err(AppError::Other(anyhow::anyhow!("archive export timed out")));
439        }
440        Ok(Err(e)) => {
441            let _ = tokio::fs::remove_file(&out_path).await;
442            set_job_error(state, &job_id, &e.to_string()).await;
443            return Err(AppError::Other(e));
444        }
445        Ok(Ok(zip_bytes)) => {
446            let url = format!("/media/archives/{job_id}.zip");
447            sqlx::query(
448                "UPDATE backup_jobs SET status = 'completed', files_copied = ?, bytes_copied = ?, output_path = ?, output_url = ?, finished_at = ? WHERE id = ?",
449            )
450            .bind(files_total)
451            .bind(zip_bytes as i64)
452            .bind(out_path.to_string_lossy().to_string())
453            .bind(&url)
454            .bind(Utc::now())
455            .bind(&job_id)
456            .execute(&state.pool)
457            .await?;
458        }
459    }
460
461    let job = sqlx::query_as::<_, BackupJob>("SELECT * FROM backup_jobs WHERE id = ?")
462        .bind(&job_id)
463        .fetch_one(&state.pool)
464        .await?;
465    Ok(job)
466}
467
468/// Stage the selected segments under a temp dir (symlinks, or ffmpeg-trimmed copies) then zip them.
469/// Returns the produced zip's size in bytes. The staging dir is always removed.
470async fn build_archive_zip(
471    state: &AppState,
472    job_id: &str,
473    segments: &[Segment],
474    from: Option<DateTime<Utc>>,
475    to: Option<DateTime<Utc>>,
476    trim: bool,
477) -> anyhow::Result<u64> {
478    let staging = state.cfg.archive_dir.join(format!("{job_id}.stage"));
479    let out_path = state.cfg.archive_dir.join(format!("{job_id}.zip"));
480    let _ = tokio::fs::remove_dir_all(&staging).await;
481    let _ = tokio::fs::remove_file(&out_path).await;
482
483    let inner = async {
484        tokio::fs::create_dir_all(&staging).await?;
485        for seg in segments {
486            let cam_dir = staging.join(&seg.camera_id);
487            tokio::fs::create_dir_all(&cam_dir).await?;
488            let link = cam_dir.join(file_name_of(&seg.path));
489            if trim {
490                // from/to are guaranteed Some when trim is set (validated by the caller).
491                trim_segment(state, seg, from.unwrap(), to.unwrap(), &link).await?;
492            } else {
493                match tokio::fs::symlink(&seg.path, &link).await {
494                    Ok(()) => {}
495                    Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
496                    Err(e) => return Err(anyhow::anyhow!("staging {}: {e}", seg.path)),
497                }
498            }
499        }
500        // zip recursively from the staging dir; the output lives in the parent (archive_dir), so the
501        // archive never tries to include itself. zip follows symlinks by default (stores content).
502        let out = Command::new(ZIP_BIN)
503            .current_dir(&staging)
504            .arg("-r")
505            .arg("-q")
506            .arg(&out_path)
507            .arg(".")
508            .stdin(Stdio::null())
509            .stdout(Stdio::null())
510            .stderr(Stdio::piped())
511            .kill_on_drop(true)
512            .output()
513            .await
514            .map_err(|e| anyhow::anyhow!("spawning zip ({ZIP_BIN}): {e}"))?;
515        if !out.status.success() {
516            anyhow::bail!(
517                "zip failed: {}",
518                String::from_utf8_lossy(&out.stderr).trim()
519            );
520        }
521        let size = tokio::fs::metadata(&out_path)
522            .await
523            .map(|m| m.len())
524            .unwrap_or(0);
525        Ok::<u64, anyhow::Error>(size)
526    }
527    .await;
528
529    let _ = tokio::fs::remove_dir_all(&staging).await;
530    inner
531}
532
533/// Re-mux the [from, to] overlap of a segment into `out` (`-c copy`, keyframe-aligned like clip export).
534async fn trim_segment(
535    state: &AppState,
536    seg: &Segment,
537    from: DateTime<Utc>,
538    to: DateTime<Utc>,
539    out: &Path,
540) -> anyhow::Result<()> {
541    let win_start = from.max(seg.start_time);
542    let win_end = to.min(seg.end_time);
543    let ss = ((win_start - seg.start_time).num_milliseconds() as f64 / 1000.0).max(0.0);
544    let dur = (win_end - win_start).num_milliseconds() as f64 / 1000.0;
545    if dur <= 0.0 {
546        // No meaningful overlap (resolve_segments already filters to overlapping rows, so this is a
547        // rare edge); fall back to staging the whole segment.
548        let _ = tokio::fs::symlink(&seg.path, out).await;
549        return Ok(());
550    }
551    let out_status = Command::new(&state.cfg.ffmpeg_bin)
552        .kill_on_drop(true)
553        .args(["-hide_banner", "-loglevel", "error"])
554        .args(["-ss", &format!("{ss:.3}")])
555        .arg("-i")
556        .arg(&seg.path)
557        .args(["-t", &format!("{dur:.3}")])
558        .args([
559            "-c",
560            "copy",
561            "-avoid_negative_ts",
562            "make_zero",
563            "-movflags",
564            "+faststart",
565        ])
566        .arg(out)
567        .stdin(Stdio::null())
568        .stdout(Stdio::null())
569        .stderr(Stdio::piped())
570        .output()
571        .await
572        .map_err(|e| anyhow::anyhow!("spawning ffmpeg: {e}"))?;
573    if !out_status.status.success() {
574        anyhow::bail!(
575            "ffmpeg trim failed for {}: {}",
576            file_name_of(&seg.path),
577            String::from_utf8_lossy(&out_status.stderr).trim()
578        );
579    }
580    Ok(())
581}
582
583// ---- Destination connectivity test ----
584
585/// Probe a destination: writability for `local`, a short rclone connectivity check for remotes.
586pub async fn test_destination(state: &AppState, dest: &BackupDestination) -> BackupTestResult {
587    let start = std::time::Instant::now();
588    let res = match dest.kind.as_str() {
589        "local" => test_local(&dest.config.0).await,
590        "sftp" | "ftp" | "s3" => test_rclone(state, dest).await,
591        other => Err(anyhow::anyhow!("unknown destination kind `{other}`")),
592    };
593    let latency_ms = start.elapsed().as_millis() as i64;
594    match res {
595        Ok(()) => BackupTestResult {
596            ok: true,
597            error: None,
598            latency_ms,
599        },
600        Err(e) => BackupTestResult {
601            ok: false,
602            error: Some(e.to_string()),
603            latency_ms,
604        },
605    }
606}
607
608async fn test_local(config: &Value) -> anyhow::Result<()> {
609    let base = cfg_str(config, "path");
610    if base.is_empty() {
611        anyhow::bail!("local destination requires `path`");
612    }
613    tokio::fs::create_dir_all(&base)
614        .await
615        .map_err(|e| anyhow::anyhow!("cannot create {base}: {e}"))?;
616    let probe = Path::new(&base).join(".heldar_backup_probe");
617    tokio::fs::write(&probe, b"ok")
618        .await
619        .map_err(|e| anyhow::anyhow!("path not writable: {e}"))?;
620    let _ = tokio::fs::remove_file(&probe).await;
621    Ok(())
622}
623
624async fn test_rclone(state: &AppState, dest: &BackupDestination) -> anyhow::Result<()> {
625    let bin = &state.cfg.rclone_bin;
626    if !binary_available(bin).await {
627        anyhow::bail!(
628            "rclone binary `{bin}` is not available; install rclone or set HELDAR_RCLONE_BIN \
629             (remote sftp/ftp/s3 backup requires it)"
630        );
631    }
632    let (remote, base, secrets) = build_remote(bin, &dest.kind, &dest.config.0).await?;
633    let target = format!("{remote}{base}");
634    let out = tokio::time::timeout(
635        Duration::from_secs(30),
636        Command::new(bin)
637            .arg("lsd")
638            .arg(&target)
639            .args(["--max-depth", "1"])
640            .stdin(Stdio::null())
641            .stdout(Stdio::null())
642            .stderr(Stdio::piped())
643            .kill_on_drop(true)
644            .output(),
645    )
646    .await
647    .map_err(|_| anyhow::anyhow!("rclone connectivity test timed out"))?
648    .map_err(|e| anyhow::anyhow!("spawning rclone: {e}"))?;
649    if !out.status.success() {
650        anyhow::bail!(
651            "rclone could not reach destination: {}",
652            scrub(&String::from_utf8_lossy(&out.stderr), &secrets).trim()
653        );
654    }
655    Ok(())
656}
657
658// ---- shared helpers ----
659
660/// Fetch the segments a job/archive should ship: optionally bounded by camera ids + a [from, to)
661/// overlap window, optionally restricted to evidence-locked footage.
662async fn resolve_segments(
663    pool: &SqlitePool,
664    camera_ids: &[String],
665    from: Option<DateTime<Utc>>,
666    to: Option<DateTime<Utc>>,
667    incident_lock_only: bool,
668) -> sqlx::Result<Vec<Segment>> {
669    let mut sql = String::from("SELECT * FROM segments WHERE 1 = 1");
670    if !camera_ids.is_empty() {
671        let placeholders = vec!["?"; camera_ids.len()].join(",");
672        sql.push_str(&format!(" AND camera_id IN ({placeholders})"));
673    }
674    sql.push_str(" AND (? IS NULL OR start_time < ?) AND (? IS NULL OR end_time > ?)");
675    if incident_lock_only {
676        sql.push_str(" AND evidence_locked = 1");
677    }
678    sql.push_str(" ORDER BY camera_id ASC, start_time ASC");
679
680    let mut q = sqlx::query_as::<_, Segment>(&sql);
681    for id in camera_ids {
682        q = q.bind(id);
683    }
684    q = q.bind(to).bind(to).bind(from).bind(from);
685    q.fetch_all(pool).await
686}
687
688/// Whether an external binary is runnable (so missing rclone degrades to a clear error, not a panic).
689async fn binary_available(bin: &str) -> bool {
690    Command::new(bin)
691        .arg("version")
692        .stdin(Stdio::null())
693        .stdout(Stdio::null())
694        .stderr(Stdio::null())
695        .kill_on_drop(true)
696        .status()
697        .await
698        .map(|s| s.success())
699        .unwrap_or(false)
700}
701
702/// Build an rclone on-the-fly connection-string remote (no persisted config) for a destination kind.
703/// Returns (remote_prefix_ending_in_colon, base_path, secrets_to_scrub_from_logs).
704async fn build_remote(
705    bin: &str,
706    kind: &str,
707    config: &Value,
708) -> anyhow::Result<(String, String, Vec<String>)> {
709    let mut secrets: Vec<String> = Vec::new();
710    match kind {
711        "sftp" | "ftp" => {
712            let host = cfg_str(config, "host");
713            if host.is_empty() {
714                anyhow::bail!("{kind} destination requires `host`");
715            }
716            let user = cfg_str(config, "user");
717            let pass = cfg_str(config, "pass");
718            let port = config
719                .get("port")
720                .and_then(|p| p.as_i64())
721                .map(|p| p.to_string())
722                .unwrap_or_default();
723            let mut parts = vec![format!(":{kind}"), format!("host={host}")];
724            if !port.is_empty() {
725                parts.push(format!("port={port}"));
726            }
727            if !user.is_empty() {
728                parts.push(format!("user={user}"));
729            }
730            if !pass.is_empty() {
731                let obscured = rclone_obscure(bin, &pass).await?;
732                secrets.push(obscured.clone());
733                secrets.push(pass.clone());
734                parts.push(format!("pass={obscured}"));
735            }
736            Ok((
737                format!("{}:", parts.join(",")),
738                cfg_str(config, "path"),
739                secrets,
740            ))
741        }
742        "s3" => {
743            let bucket = cfg_str(config, "bucket");
744            if bucket.is_empty() {
745                anyhow::bail!("s3 destination requires `bucket`");
746            }
747            let access_key = cfg_str(config, "access_key");
748            let secret_key = cfg_str(config, "secret_key");
749            let endpoint = cfg_str(config, "endpoint");
750            let region = cfg_str(config, "region");
751            let mut parts = vec![":s3".to_string(), "provider=Other".to_string()];
752            if !access_key.is_empty() {
753                parts.push(format!("access_key_id={access_key}"));
754            }
755            if !secret_key.is_empty() {
756                secrets.push(secret_key.clone());
757                parts.push(format!("secret_access_key={secret_key}"));
758            }
759            if !endpoint.is_empty() {
760                parts.push(format!("endpoint={endpoint}"));
761            }
762            if !region.is_empty() {
763                parts.push(format!("region={region}"));
764            }
765            let base = join_path("", &[&bucket, &cfg_str(config, "prefix")]);
766            Ok((format!("{}:", parts.join(",")), base, secrets))
767        }
768        other => anyhow::bail!("kind `{other}` does not use rclone"),
769    }
770}
771
772/// Obscure a plaintext password into rclone's at-rest form (only invoked when rclone is present).
773async fn rclone_obscure(bin: &str, pass: &str) -> anyhow::Result<String> {
774    let out = Command::new(bin)
775        .arg("obscure")
776        .arg(pass)
777        .stdin(Stdio::null())
778        .stdout(Stdio::piped())
779        .stderr(Stdio::piped())
780        .kill_on_drop(true)
781        .output()
782        .await
783        .map_err(|e| anyhow::anyhow!("spawning rclone obscure: {e}"))?;
784    if !out.status.success() {
785        anyhow::bail!(
786            "rclone obscure failed: {}",
787            String::from_utf8_lossy(&out.stderr).trim()
788        );
789    }
790    Ok(String::from_utf8_lossy(&out.stdout).trim().to_string())
791}
792
/// Join a base path with extra segments using `/`, preserving a leading slash on `base` (absolute
/// remote paths) but never producing a double slash.
///
/// Trailing slashes on `base` and surrounding slashes on each part are dropped, and empty parts are
/// skipped. A `base` made up only of slashes is kept as the root `/`, so `join_path("/", &["a"])`
/// yields `/a` instead of silently turning into the relative path `a`.
fn join_path(base: &str, parts: &[&str]) -> String {
    let trimmed = base.trim_end_matches('/');
    let mut out = if trimmed.is_empty() && base.starts_with('/') {
        // `base` was the root itself; trimming must not erase it.
        String::from("/")
    } else {
        trimmed.to_string()
    };
    for p in parts {
        let p = p.trim_matches('/');
        if p.is_empty() {
            continue;
        }
        if !out.is_empty() && !out.ends_with('/') {
            out.push('/');
        }
        out.push_str(p);
    }
    out
}
809
/// Mask every occurrence of each non-empty entry of `secrets` in `s` with `***`. Secrets are
/// applied in slice order; empty entries are skipped.
fn scrub(s: &str, secrets: &[String]) -> String {
    secrets
        .iter()
        .filter(|secret| !secret.is_empty())
        .fold(s.to_string(), |masked, secret| {
            masked.replace(secret.as_str(), "***")
        })
}
820
821fn cfg_str(config: &Value, key: &str) -> String {
822    config
823        .get(key)
824        .and_then(|v| v.as_str())
825        .unwrap_or("")
826        .trim()
827        .to_string()
828}
829
/// Return the final component of `path` as an owned string, falling back to `segment.mp4` when the
/// path has no file name or the name is not valid UTF-8.
fn file_name_of(path: &str) -> String {
    match Path::new(path).file_name().and_then(|name| name.to_str()) {
        Some(name) => name.to_owned(),
        None => String::from("segment.mp4"),
    }
}
837
838fn json_to_string_vec(v: &Value) -> Vec<String> {
839    v.as_array()
840        .map(|a| {
841            a.iter()
842                .filter_map(|x| x.as_str().map(String::from))
843                .collect()
844        })
845        .unwrap_or_default()
846}
847
848fn json_from_strs(v: &[String]) -> Value {
849    Value::Array(v.iter().map(|s| Value::String(s.clone())).collect())
850}
851
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `join_path` keeps an absolute base absolute, drops redundant slashes, and skips empty parts.
    #[test]
    fn join_path_preserves_leading_slash() {
        let cases: [(&str, &[&str], &str); 4] = [
            ("/srv/backups", &["cam1", "f.mp4"], "/srv/backups/cam1/f.mp4"),
            ("backups/", &["cam1", "f.mp4"], "backups/cam1/f.mp4"),
            ("", &["cam1", "f.mp4"], "cam1/f.mp4"),
            ("bucket", &["", "p"], "bucket/p"),
        ];
        for (base, parts, expected) in cases {
            assert_eq!(join_path(base, parts), expected);
        }
    }

    /// Every occurrence of a secret is masked; empty secrets are ignored.
    #[test]
    fn scrub_masks_secrets() {
        let secrets = vec![String::from("hunter2")];
        let masked = scrub("auth failed for pass=hunter2 token=hunter2", &secrets);
        assert_eq!(masked, "auth failed for pass=*** token=***");

        let blank = vec![String::new()];
        assert_eq!(scrub("nothing", &blank), "nothing");
    }

    /// Non-string array items are dropped, non-arrays give an empty list, and the inverse helper
    /// produces a plain JSON string array.
    #[test]
    fn json_string_vec_roundtrip() {
        let mixed = json!(["a", "b", 3, "c"]);
        assert_eq!(json_to_string_vec(&mixed), vec!["a", "b", "c"]);

        let not_an_array = json!("nope");
        assert_eq!(json_to_string_vec(&not_an_array), Vec::<String>::new());

        let names = [String::from("x"), String::from("y")];
        assert_eq!(json_from_strs(&names), json!(["x", "y"]));
    }

    /// String values are trimmed; missing keys and non-string values read as empty.
    #[test]
    fn cfg_str_reads_and_trims() {
        let config = json!({ "host": "  example.com ", "port": 22 });
        assert_eq!(cfg_str(&config, "host"), "example.com");
        assert_eq!(cfg_str(&config, "missing"), "");
        assert_eq!(cfg_str(&config, "port"), "");
    }

    /// The basename is extracted, with a fixed fallback for paths that have none.
    #[test]
    fn file_name_of_extracts_basename() {
        let full = "/data/recordings/cam1/20260613_120000.mp4";
        assert_eq!(file_name_of(full), "20260613_120000.mp4");
        assert_eq!(file_name_of(""), "segment.mp4");
    }
}