Skip to main content

heldar_kernel/services/
backup.rs

1//! Backup subsystem: scheduled policy jobs, on-demand archive export, and the shared transfer
2//! plumbing.
3//!
4//! The scheduler (spawned from `main` when `HELDAR_BACKUP_ENABLED`) ticks every
5//! `HELDAR_BACKUP_SCHEDULER_INTERVAL_S`, creates a `backup_job` for each due enabled policy, and runs
6//! it under a process-wide concurrency [`Semaphore`] (also shared by manual triggers) with a
7//! per-job timeout. A job resolves its segment files (camera selection + time window, optionally only
8//! evidence-locked footage) and ships them:
9//!   - `local` destinations copy via std fs into `{dest path}/{camera_id}/` (NAS mounts, no rclone).
10//!   - `sftp` / `ftp` / `s3` destinations shell out to rclone (`HELDAR_RCLONE_BIN`). When rclone is not
11//!     installed the job is marked `error` with a clear message — the build/tests never require it.
12//!
13//! On-demand archive export ([`create_archive`]) builds a `.zip` of the selected segments via
14//! `/usr/bin/zip` into `HELDAR_ARCHIVE_DIR/{job_id}.zip` (served at `/media/archives`), enforcing
15//! `HELDAR_ARCHIVE_MAX_BYTES`. It reuses `backup_jobs` with `kind='on_demand_archive'` + `output_url`.
16
17use std::path::Path;
18use std::process::Stdio;
19use std::sync::{Arc, OnceLock};
20use std::time::Duration;
21
22use chrono::{DateTime, Utc};
23use serde_json::Value;
24use sqlx::types::Json as SqlxJson;
25use sqlx::SqlitePool;
26use tokio::process::Command;
27use tokio::sync::Semaphore;
28use uuid::Uuid;
29
30use crate::config::Config;
31use crate::error::{AppError, AppResult};
32use crate::models::{BackupDestination, BackupJob, BackupPolicy, BackupTestResult, Segment};
33use crate::repo;
34use crate::state::AppState;
35
/// Absolute path of the `zip` executable spawned to build on-demand archives.
///
/// Hardcoded rather than configurable (the environment provides /usr/bin/zip + tar).
const ZIP_BIN: &str = "/usr/bin/zip";
38
39/// Process-wide job concurrency gate, sized from config on first use and shared by the scheduler +
40/// manual triggers. A `OnceLock` (not reset on scheduler respawn) keeps the bound stable for the
41/// process lifetime.
42fn job_semaphore(cfg: &Config) -> Arc<Semaphore> {
43    static SEM: OnceLock<Arc<Semaphore>> = OnceLock::new();
44    SEM.get_or_init(|| Arc::new(Semaphore::new(cfg.backup_max_concurrent_jobs.max(1))))
45        .clone()
46}
47
48/// Background scheduler loop. Returns immediately (no respawn churn) when backups are disabled — but
49/// `main` already guards the spawn, mirroring the notifier.
50pub async fn run(state: AppState) {
51    if !state.cfg.backup_enabled {
52        tracing::info!("backup: scheduler disabled (HELDAR_BACKUP_ENABLED=false)");
53        return;
54    }
55    let interval_s = state.cfg.backup_scheduler_interval_s.max(5);
56    tracing::info!(
57        interval_s,
58        max_concurrent = state.cfg.backup_max_concurrent_jobs,
59        "backup: scheduler started"
60    );
61    let mut tick = tokio::time::interval(Duration::from_secs(interval_s));
62    loop {
63        tick.tick().await;
64        if let Err(e) = sweep(&state).await {
65            tracing::error!(error = %e, "backup: scheduler tick failed");
66        }
67    }
68}
69
70/// Create + dispatch a job for every due enabled policy.
71async fn sweep(state: &AppState) -> anyhow::Result<()> {
72    let now = Utc::now();
73    let policies: Vec<BackupPolicy> =
74        sqlx::query_as::<_, BackupPolicy>("SELECT * FROM backup_policies WHERE enabled = 1")
75            .fetch_all(&state.pool)
76            .await?;
77    for p in policies {
78        let due = match p.last_run_at {
79            None => true,
80            Some(last) => last + chrono::Duration::seconds(p.schedule_interval_s.max(1)) <= now,
81        };
82        if !due {
83            continue;
84        }
85        match create_policy_job(state, &p).await {
86            Ok(job_id) => spawn_job(state.clone(), job_id),
87            Err(e) => tracing::error!(policy = %p.id, error = %e, "backup: failed to create job"),
88        }
89    }
90    Ok(())
91}
92
93/// Insert a `policy` job from a policy and claim the policy (`last_run_at`/`last_job_id`) so the next
94/// tick does not re-trigger it. Returns the new job id.
95async fn create_policy_job(state: &AppState, p: &BackupPolicy) -> anyhow::Result<String> {
96    let now = Utc::now();
97    let job_id = format!("bkj_{}", Uuid::new_v4().simple());
98    let from_time = if p.lookback_hours > 0 {
99        Some(now - chrono::Duration::hours(p.lookback_hours))
100    } else {
101        None
102    };
103    let to_time = Some(now);
104    sqlx::query(
105        "INSERT INTO backup_jobs
106           (id, policy_id, destination_id, kind, camera_ids, from_time, to_time,
107            incident_lock_only, status, created_at)
108         VALUES (?, ?, ?, 'policy', ?, ?, ?, ?, 'pending', ?)",
109    )
110    .bind(&job_id)
111    .bind(&p.id)
112    .bind(&p.destination_id)
113    .bind(SqlxJson(p.camera_ids.0.clone()))
114    .bind(from_time)
115    .bind(to_time)
116    .bind(p.incident_lock_only)
117    .bind(now)
118    .execute(&state.pool)
119    .await?;
120    sqlx::query(
121        "UPDATE backup_policies SET last_run_at = ?, last_job_id = ?, updated_at = ? WHERE id = ?",
122    )
123    .bind(now)
124    .bind(&job_id)
125    .bind(now)
126    .bind(&p.id)
127    .execute(&state.pool)
128    .await?;
129    Ok(job_id)
130}
131
132/// Manually trigger a policy: create its job and dispatch it (returns the job id immediately).
133pub async fn trigger_policy(state: &AppState, policy: &BackupPolicy) -> anyhow::Result<String> {
134    let job_id = create_policy_job(state, policy).await?;
135    spawn_job(state.clone(), job_id.clone());
136    Ok(job_id)
137}
138
139/// Spawn a detached task that acquires a concurrency permit then executes the job under the timeout.
140fn spawn_job(state: AppState, job_id: String) {
141    let sem = job_semaphore(&state.cfg);
142    let timeout = Duration::from_secs(state.cfg.backup_job_timeout_s.max(30));
143    tokio::spawn(async move {
144        let _permit = match sem.acquire_owned().await {
145            Ok(p) => p,
146            Err(_) => return,
147        };
148        execute_job(&state, &job_id, timeout).await;
149    });
150}
151
152/// Execute a destination-backed job: resolve its segments, copy them, and record progress + outcome.
153async fn execute_job(state: &AppState, job_id: &str, timeout: Duration) {
154    let Some(job) = sqlx::query_as::<_, BackupJob>("SELECT * FROM backup_jobs WHERE id = ?")
155        .bind(job_id)
156        .fetch_optional(&state.pool)
157        .await
158        .ok()
159        .flatten()
160    else {
161        return;
162    };
163
164    let dest = match &job.destination_id {
165        Some(d) => {
166            sqlx::query_as::<_, BackupDestination>("SELECT * FROM backup_destinations WHERE id = ?")
167                .bind(d)
168                .fetch_optional(&state.pool)
169                .await
170                .ok()
171                .flatten()
172        }
173        None => None,
174    };
175    let Some(dest) = dest else {
176        set_job_error(state, job_id, "backup destination not found or removed").await;
177        return;
178    };
179    if !dest.enabled {
180        set_job_error(state, job_id, "backup destination is disabled").await;
181        return;
182    }
183
184    let camera_ids = json_to_string_vec(&job.camera_ids.0);
185    let segments = match resolve_segments(
186        &state.pool,
187        &camera_ids,
188        job.from_time,
189        job.to_time,
190        job.incident_lock_only,
191    )
192    .await
193    {
194        Ok(s) => s,
195        Err(e) => {
196            set_job_error(state, job_id, &format!("resolving segments: {e}")).await;
197            return;
198        }
199    };
200
201    let files_total = segments.len() as i64;
202    let _ = sqlx::query(
203        "UPDATE backup_jobs SET status = 'running', files_total = ?, started_at = ? WHERE id = ?",
204    )
205    .bind(files_total)
206    .bind(Utc::now())
207    .bind(job_id)
208    .execute(&state.pool)
209    .await;
210
211    if segments.is_empty() {
212        let _ = sqlx::query(
213            "UPDATE backup_jobs SET status = 'completed', finished_at = ? WHERE id = ?",
214        )
215        .bind(Utc::now())
216        .bind(job_id)
217        .execute(&state.pool)
218        .await;
219        return;
220    }
221
222    // Read-lock the source segments so retention cannot prune them mid-transfer; always released
223    // after the (possibly timed-out) transfer future settles.
224    let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
225    let _read_lock = repo::SegReadLock::acquire(&state.pool, seg_ids).await;
226    let outcome =
227        tokio::time::timeout(timeout, copy_segments(state, job_id, &dest, &segments)).await;
228
229    match outcome {
230        Err(_) => set_job_error(state, job_id, "backup job timed out").await,
231        Ok(Err(e)) => set_job_error(state, job_id, &e.to_string()).await,
232        Ok(Ok((copied, bytes))) => {
233            let _ = sqlx::query(
234                "UPDATE backup_jobs SET status = 'completed', files_copied = ?, bytes_copied = ?, finished_at = ? WHERE id = ?",
235            )
236            .bind(copied as i64)
237            .bind(bytes as i64)
238            .bind(Utc::now())
239            .bind(job_id)
240            .execute(&state.pool)
241            .await;
242            tracing::info!(job = job_id, files = copied, bytes, "backup: job completed");
243        }
244    }
245}
246
247async fn set_job_error(state: &AppState, job_id: &str, msg: &str) {
248    tracing::warn!(job = job_id, error = msg, "backup: job failed");
249    let _ = sqlx::query(
250        "UPDATE backup_jobs SET status = 'error', error = ?, finished_at = ? WHERE id = ?",
251    )
252    .bind(msg)
253    .bind(Utc::now())
254    .bind(job_id)
255    .execute(&state.pool)
256    .await;
257}
258
259/// Dispatch the transfer by destination kind. Returns (files_copied, bytes_copied).
260async fn copy_segments(
261    state: &AppState,
262    job_id: &str,
263    dest: &BackupDestination,
264    segments: &[Segment],
265) -> anyhow::Result<(u64, u64)> {
266    match dest.kind.as_str() {
267        "local" => copy_local(state, job_id, dest, segments).await,
268        "sftp" | "ftp" | "s3" => copy_rclone(state, job_id, dest, segments).await,
269        other => anyhow::bail!("unknown backup destination kind `{other}`"),
270    }
271}
272
273/// Local / NAS-mount destination: std fs copy into `{path}/{camera_id}/{file}`.
274async fn copy_local(
275    state: &AppState,
276    job_id: &str,
277    dest: &BackupDestination,
278    segments: &[Segment],
279) -> anyhow::Result<(u64, u64)> {
280    let base = cfg_str(&dest.config.0, "path");
281    if base.is_empty() {
282        anyhow::bail!("local destination has no `path` configured");
283    }
284    let base = Path::new(&base);
285    let mut copied = 0u64;
286    let mut bytes = 0u64;
287    for seg in segments {
288        let cam_dir = base.join(&seg.camera_id);
289        tokio::fs::create_dir_all(&cam_dir)
290            .await
291            .map_err(|e| anyhow::anyhow!("creating {}: {e}", cam_dir.display()))?;
292        let target = cam_dir.join(file_name_of(&seg.path));
293        match tokio::fs::copy(&seg.path, &target).await {
294            Ok(n) => {
295                copied += 1;
296                bytes += n;
297            }
298            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
299                tracing::warn!(job = job_id, path = %seg.path, "backup: source segment vanished; skipping");
300            }
301            Err(e) => anyhow::bail!("copying {}: {e}", seg.path),
302        }
303        update_progress(state, job_id, copied, bytes).await;
304    }
305    Ok((copied, bytes))
306}
307
308/// Remote destination (sftp/ftp/s3) via rclone. Degrades to a clear error when rclone is missing.
309async fn copy_rclone(
310    state: &AppState,
311    job_id: &str,
312    dest: &BackupDestination,
313    segments: &[Segment],
314) -> anyhow::Result<(u64, u64)> {
315    let bin = &state.cfg.rclone_bin;
316    if !binary_available(bin).await {
317        anyhow::bail!(
318            "rclone binary `{bin}` is not available; install rclone or set HELDAR_RCLONE_BIN \
319             (remote sftp/ftp/s3 backup requires it; local/NAS destinations do not)"
320        );
321    }
322    let (remote, base, secrets) = build_remote(bin, &dest.kind, &dest.config.0).await?;
323    let mut copied = 0u64;
324    let mut bytes = 0u64;
325    for seg in segments {
326        let rel = join_path(&base, &[&seg.camera_id, &file_name_of(&seg.path)]);
327        let target = format!("{remote}{rel}");
328        let out = Command::new(bin)
329            .arg("copyto")
330            .arg(&seg.path)
331            .arg(&target)
332            .arg("--no-traverse")
333            .stdin(Stdio::null())
334            .stdout(Stdio::null())
335            .stderr(Stdio::piped())
336            .kill_on_drop(true)
337            .output()
338            .await
339            .map_err(|e| anyhow::anyhow!("spawning rclone: {e}"))?;
340        if out.status.success() {
341            copied += 1;
342            bytes += seg.size_bytes.max(0) as u64;
343        } else {
344            let err = scrub(&String::from_utf8_lossy(&out.stderr), &secrets);
345            anyhow::bail!(
346                "rclone copy failed for {}: {}",
347                file_name_of(&seg.path),
348                err.trim()
349            );
350        }
351        update_progress(state, job_id, copied, bytes).await;
352    }
353    Ok((copied, bytes))
354}
355
356async fn update_progress(state: &AppState, job_id: &str, copied: u64, bytes: u64) {
357    let _ = sqlx::query("UPDATE backup_jobs SET files_copied = ?, bytes_copied = ? WHERE id = ?")
358        .bind(copied as i64)
359        .bind(bytes as i64)
360        .bind(job_id)
361        .execute(&state.pool)
362        .await;
363}
364
365// ---- On-demand archive export ----
366
367/// Build a `.zip` of the selected segments and record it as an `on_demand_archive` job. Enforces the
368/// archive size cap on the source footprint; runs inline (bounded by the job timeout) so the returned
369/// job already carries `output_url`.
370pub async fn create_archive(
371    state: &AppState,
372    camera_ids: Vec<String>,
373    from: Option<DateTime<Utc>>,
374    to: Option<DateTime<Utc>>,
375    incident_lock_only: bool,
376    trim: bool,
377) -> AppResult<BackupJob> {
378    if trim && (from.is_none() || to.is_none()) {
379        return Err(AppError::BadRequest(
380            "`trim` requires both `from` and `to`".into(),
381        ));
382    }
383    let segments = resolve_segments(&state.pool, &camera_ids, from, to, incident_lock_only).await?;
384    if segments.is_empty() {
385        return Err(AppError::NotFound(
386            "no recorded footage matches the requested archive selection".into(),
387        ));
388    }
389    let total_bytes: i64 = segments.iter().map(|s| s.size_bytes.max(0)).sum();
390    if total_bytes as u64 > state.cfg.archive_max_bytes {
391        return Err(AppError::BadRequest(format!(
392            "archive selection is {total_bytes} bytes; exceeds the limit of {} bytes (HELDAR_ARCHIVE_MAX_BYTES)",
393            state.cfg.archive_max_bytes
394        )));
395    }
396
397    tokio::fs::create_dir_all(&state.cfg.archive_dir)
398        .await
399        .map_err(|e| AppError::Other(e.into()))?;
400
401    let job_id = format!("bkj_{}", Uuid::new_v4().simple());
402    let now = Utc::now();
403    let files_total = segments.len() as i64;
404    sqlx::query(
405        "INSERT INTO backup_jobs
406           (id, policy_id, destination_id, kind, camera_ids, from_time, to_time,
407            incident_lock_only, status, files_total, started_at, created_at)
408         VALUES (?, NULL, NULL, 'on_demand_archive', ?, ?, ?, ?, 'running', ?, ?, ?)",
409    )
410    .bind(&job_id)
411    .bind(SqlxJson(json_from_strs(&camera_ids)))
412    .bind(from)
413    .bind(to)
414    .bind(incident_lock_only)
415    .bind(files_total)
416    .bind(now)
417    .bind(now)
418    .execute(&state.pool)
419    .await?;
420
421    // Read-lock the sources for the duration of the zip/trim (released on every outcome).
422    let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
423    let _read_lock = repo::SegReadLock::acquire(&state.pool, seg_ids).await;
424    let timeout = Duration::from_secs(state.cfg.backup_job_timeout_s.max(30));
425    let outcome = tokio::time::timeout(
426        timeout,
427        build_archive_zip(state, &job_id, &segments, from, to, trim),
428    )
429    .await;
430
431    let out_path = state.cfg.archive_dir.join(format!("{job_id}.zip"));
432    match outcome {
433        Err(_) => {
434            let _ = tokio::fs::remove_file(&out_path).await;
435            set_job_error(state, &job_id, "archive export timed out").await;
436            return Err(AppError::Other(anyhow::anyhow!("archive export timed out")));
437        }
438        Ok(Err(e)) => {
439            let _ = tokio::fs::remove_file(&out_path).await;
440            set_job_error(state, &job_id, &e.to_string()).await;
441            return Err(AppError::Other(e));
442        }
443        Ok(Ok(zip_bytes)) => {
444            let url = format!("/media/archives/{job_id}.zip");
445            sqlx::query(
446                "UPDATE backup_jobs SET status = 'completed', files_copied = ?, bytes_copied = ?, output_path = ?, output_url = ?, finished_at = ? WHERE id = ?",
447            )
448            .bind(files_total)
449            .bind(zip_bytes as i64)
450            .bind(out_path.to_string_lossy().to_string())
451            .bind(&url)
452            .bind(Utc::now())
453            .bind(&job_id)
454            .execute(&state.pool)
455            .await?;
456        }
457    }
458
459    let job = sqlx::query_as::<_, BackupJob>("SELECT * FROM backup_jobs WHERE id = ?")
460        .bind(&job_id)
461        .fetch_one(&state.pool)
462        .await?;
463    Ok(job)
464}
465
466/// Stage the selected segments under a temp dir (symlinks, or ffmpeg-trimmed copies) then zip them.
467/// Returns the produced zip's size in bytes. The staging dir is always removed.
468async fn build_archive_zip(
469    state: &AppState,
470    job_id: &str,
471    segments: &[Segment],
472    from: Option<DateTime<Utc>>,
473    to: Option<DateTime<Utc>>,
474    trim: bool,
475) -> anyhow::Result<u64> {
476    let staging = state.cfg.archive_dir.join(format!("{job_id}.stage"));
477    let out_path = state.cfg.archive_dir.join(format!("{job_id}.zip"));
478    let _ = tokio::fs::remove_dir_all(&staging).await;
479    let _ = tokio::fs::remove_file(&out_path).await;
480
481    let inner = async {
482        tokio::fs::create_dir_all(&staging).await?;
483        for seg in segments {
484            let cam_dir = staging.join(&seg.camera_id);
485            tokio::fs::create_dir_all(&cam_dir).await?;
486            let link = cam_dir.join(file_name_of(&seg.path));
487            if trim {
488                // from/to are guaranteed Some when trim is set (validated by the caller).
489                trim_segment(state, seg, from.unwrap(), to.unwrap(), &link).await?;
490            } else {
491                match tokio::fs::symlink(&seg.path, &link).await {
492                    Ok(()) => {}
493                    Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
494                    Err(e) => return Err(anyhow::anyhow!("staging {}: {e}", seg.path)),
495                }
496            }
497        }
498        // zip recursively from the staging dir; the output lives in the parent (archive_dir), so the
499        // archive never tries to include itself. zip follows symlinks by default (stores content).
500        let out = Command::new(ZIP_BIN)
501            .current_dir(&staging)
502            .arg("-r")
503            .arg("-q")
504            .arg(&out_path)
505            .arg(".")
506            .stdin(Stdio::null())
507            .stdout(Stdio::null())
508            .stderr(Stdio::piped())
509            .kill_on_drop(true)
510            .output()
511            .await
512            .map_err(|e| anyhow::anyhow!("spawning zip ({ZIP_BIN}): {e}"))?;
513        if !out.status.success() {
514            anyhow::bail!(
515                "zip failed: {}",
516                String::from_utf8_lossy(&out.stderr).trim()
517            );
518        }
519        let size = tokio::fs::metadata(&out_path)
520            .await
521            .map(|m| m.len())
522            .unwrap_or(0);
523        Ok::<u64, anyhow::Error>(size)
524    }
525    .await;
526
527    let _ = tokio::fs::remove_dir_all(&staging).await;
528    inner
529}
530
531/// Re-mux the [from, to] overlap of a segment into `out` (`-c copy`, keyframe-aligned like clip export).
532async fn trim_segment(
533    state: &AppState,
534    seg: &Segment,
535    from: DateTime<Utc>,
536    to: DateTime<Utc>,
537    out: &Path,
538) -> anyhow::Result<()> {
539    let win_start = from.max(seg.start_time);
540    let win_end = to.min(seg.end_time);
541    let ss = ((win_start - seg.start_time).num_milliseconds() as f64 / 1000.0).max(0.0);
542    let dur = (win_end - win_start).num_milliseconds() as f64 / 1000.0;
543    if dur <= 0.0 {
544        // No meaningful overlap (resolve_segments already filters to overlapping rows, so this is a
545        // rare edge); fall back to staging the whole segment.
546        let _ = tokio::fs::symlink(&seg.path, out).await;
547        return Ok(());
548    }
549    let out_status = Command::new(&state.cfg.ffmpeg_bin)
550        .kill_on_drop(true)
551        .args(["-hide_banner", "-loglevel", "error"])
552        .args(["-ss", &format!("{ss:.3}")])
553        .arg("-i")
554        .arg(&seg.path)
555        .args(["-t", &format!("{dur:.3}")])
556        .args([
557            "-c",
558            "copy",
559            "-avoid_negative_ts",
560            "make_zero",
561            "-movflags",
562            "+faststart",
563        ])
564        .arg(out)
565        .stdin(Stdio::null())
566        .stdout(Stdio::null())
567        .stderr(Stdio::piped())
568        .output()
569        .await
570        .map_err(|e| anyhow::anyhow!("spawning ffmpeg: {e}"))?;
571    if !out_status.status.success() {
572        anyhow::bail!(
573            "ffmpeg trim failed for {}: {}",
574            file_name_of(&seg.path),
575            String::from_utf8_lossy(&out_status.stderr).trim()
576        );
577    }
578    Ok(())
579}
580
581// ---- Destination connectivity test ----
582
583/// Probe a destination: writability for `local`, a short rclone connectivity check for remotes.
584pub async fn test_destination(state: &AppState, dest: &BackupDestination) -> BackupTestResult {
585    let start = std::time::Instant::now();
586    let res = match dest.kind.as_str() {
587        "local" => test_local(&dest.config.0).await,
588        "sftp" | "ftp" | "s3" => test_rclone(state, dest).await,
589        other => Err(anyhow::anyhow!("unknown destination kind `{other}`")),
590    };
591    let latency_ms = start.elapsed().as_millis() as i64;
592    match res {
593        Ok(()) => BackupTestResult {
594            ok: true,
595            error: None,
596            latency_ms,
597        },
598        Err(e) => BackupTestResult {
599            ok: false,
600            error: Some(e.to_string()),
601            latency_ms,
602        },
603    }
604}
605
606async fn test_local(config: &Value) -> anyhow::Result<()> {
607    let base = cfg_str(config, "path");
608    if base.is_empty() {
609        anyhow::bail!("local destination requires `path`");
610    }
611    tokio::fs::create_dir_all(&base)
612        .await
613        .map_err(|e| anyhow::anyhow!("cannot create {base}: {e}"))?;
614    let probe = Path::new(&base).join(".heldar_backup_probe");
615    tokio::fs::write(&probe, b"ok")
616        .await
617        .map_err(|e| anyhow::anyhow!("path not writable: {e}"))?;
618    let _ = tokio::fs::remove_file(&probe).await;
619    Ok(())
620}
621
622async fn test_rclone(state: &AppState, dest: &BackupDestination) -> anyhow::Result<()> {
623    let bin = &state.cfg.rclone_bin;
624    if !binary_available(bin).await {
625        anyhow::bail!(
626            "rclone binary `{bin}` is not available; install rclone or set HELDAR_RCLONE_BIN \
627             (remote sftp/ftp/s3 backup requires it)"
628        );
629    }
630    let (remote, base, secrets) = build_remote(bin, &dest.kind, &dest.config.0).await?;
631    let target = format!("{remote}{base}");
632    let out = tokio::time::timeout(
633        Duration::from_secs(30),
634        Command::new(bin)
635            .arg("lsd")
636            .arg(&target)
637            .args(["--max-depth", "1"])
638            .stdin(Stdio::null())
639            .stdout(Stdio::null())
640            .stderr(Stdio::piped())
641            .kill_on_drop(true)
642            .output(),
643    )
644    .await
645    .map_err(|_| anyhow::anyhow!("rclone connectivity test timed out"))?
646    .map_err(|e| anyhow::anyhow!("spawning rclone: {e}"))?;
647    if !out.status.success() {
648        anyhow::bail!(
649            "rclone could not reach destination: {}",
650            scrub(&String::from_utf8_lossy(&out.stderr), &secrets).trim()
651        );
652    }
653    Ok(())
654}
655
656// ---- shared helpers ----
657
658/// Fetch the segments a job/archive should ship: optionally bounded by camera ids + a [from, to)
659/// overlap window, optionally restricted to evidence-locked footage.
660async fn resolve_segments(
661    pool: &SqlitePool,
662    camera_ids: &[String],
663    from: Option<DateTime<Utc>>,
664    to: Option<DateTime<Utc>>,
665    incident_lock_only: bool,
666) -> sqlx::Result<Vec<Segment>> {
667    let mut sql = String::from("SELECT * FROM segments WHERE 1 = 1");
668    if !camera_ids.is_empty() {
669        let placeholders = vec!["?"; camera_ids.len()].join(",");
670        sql.push_str(&format!(" AND camera_id IN ({placeholders})"));
671    }
672    sql.push_str(" AND (? IS NULL OR start_time < ?) AND (? IS NULL OR end_time > ?)");
673    if incident_lock_only {
674        sql.push_str(" AND evidence_locked = 1");
675    }
676    sql.push_str(" ORDER BY camera_id ASC, start_time ASC");
677
678    let mut q = sqlx::query_as::<_, Segment>(&sql);
679    for id in camera_ids {
680        q = q.bind(id);
681    }
682    q = q.bind(to).bind(to).bind(from).bind(from);
683    q.fetch_all(pool).await
684}
685
686/// Whether an external binary is runnable (so missing rclone degrades to a clear error, not a panic).
687async fn binary_available(bin: &str) -> bool {
688    Command::new(bin)
689        .arg("version")
690        .stdin(Stdio::null())
691        .stdout(Stdio::null())
692        .stderr(Stdio::null())
693        .kill_on_drop(true)
694        .status()
695        .await
696        .map(|s| s.success())
697        .unwrap_or(false)
698}
699
700/// Build an rclone on-the-fly connection-string remote (no persisted config) for a destination kind.
701/// Returns (remote_prefix_ending_in_colon, base_path, secrets_to_scrub_from_logs).
702async fn build_remote(
703    bin: &str,
704    kind: &str,
705    config: &Value,
706) -> anyhow::Result<(String, String, Vec<String>)> {
707    let mut secrets: Vec<String> = Vec::new();
708    match kind {
709        "sftp" | "ftp" => {
710            let host = cfg_str(config, "host");
711            if host.is_empty() {
712                anyhow::bail!("{kind} destination requires `host`");
713            }
714            let user = cfg_str(config, "user");
715            let pass = cfg_str(config, "pass");
716            let port = config
717                .get("port")
718                .and_then(|p| p.as_i64())
719                .map(|p| p.to_string())
720                .unwrap_or_default();
721            let mut parts = vec![format!(":{kind}"), format!("host={host}")];
722            if !port.is_empty() {
723                parts.push(format!("port={port}"));
724            }
725            if !user.is_empty() {
726                parts.push(format!("user={user}"));
727            }
728            if !pass.is_empty() {
729                let obscured = rclone_obscure(bin, &pass).await?;
730                secrets.push(obscured.clone());
731                secrets.push(pass.clone());
732                parts.push(format!("pass={obscured}"));
733            }
734            Ok((
735                format!("{}:", parts.join(",")),
736                cfg_str(config, "path"),
737                secrets,
738            ))
739        }
740        "s3" => {
741            let bucket = cfg_str(config, "bucket");
742            if bucket.is_empty() {
743                anyhow::bail!("s3 destination requires `bucket`");
744            }
745            let access_key = cfg_str(config, "access_key");
746            let secret_key = cfg_str(config, "secret_key");
747            let endpoint = cfg_str(config, "endpoint");
748            let region = cfg_str(config, "region");
749            let mut parts = vec![":s3".to_string(), "provider=Other".to_string()];
750            if !access_key.is_empty() {
751                parts.push(format!("access_key_id={access_key}"));
752            }
753            if !secret_key.is_empty() {
754                secrets.push(secret_key.clone());
755                parts.push(format!("secret_access_key={secret_key}"));
756            }
757            if !endpoint.is_empty() {
758                parts.push(format!("endpoint={endpoint}"));
759            }
760            if !region.is_empty() {
761                parts.push(format!("region={region}"));
762            }
763            let base = join_path("", &[&bucket, &cfg_str(config, "prefix")]);
764            Ok((format!("{}:", parts.join(",")), base, secrets))
765        }
766        other => anyhow::bail!("kind `{other}` does not use rclone"),
767    }
768}
769
770/// Obscure a plaintext password into rclone's at-rest form (only invoked when rclone is present).
771async fn rclone_obscure(bin: &str, pass: &str) -> anyhow::Result<String> {
772    let out = Command::new(bin)
773        .arg("obscure")
774        .arg(pass)
775        .stdin(Stdio::null())
776        .stdout(Stdio::piped())
777        .stderr(Stdio::piped())
778        .kill_on_drop(true)
779        .output()
780        .await
781        .map_err(|e| anyhow::anyhow!("spawning rclone obscure: {e}"))?;
782    if !out.status.success() {
783        anyhow::bail!(
784            "rclone obscure failed: {}",
785            String::from_utf8_lossy(&out.stderr).trim()
786        );
787    }
788    Ok(String::from_utf8_lossy(&out.stdout).trim().to_string())
789}
790
/// Join a base path with extra segments using `/`, preserving a leading slash on `base` (absolute
/// remote paths) but never producing a double slash.
///
/// Trailing slashes on `base` and surrounding slashes on each part are dropped; empty parts are
/// skipped. A `base` made only of slashes (e.g. `"/"`) is kept as the root `/`, so
/// `join_path("/", &["a"])` yields `"/a"` rather than silently becoming the relative path `"a"`.
fn join_path(base: &str, parts: &[&str]) -> String {
    let trimmed = base.trim_end_matches('/');
    let mut out = if trimmed.is_empty() && base.starts_with('/') {
        // Root-only base: trimming would erase the leading slash we promise to preserve.
        String::from("/")
    } else {
        trimmed.to_string()
    };
    for p in parts {
        let p = p.trim_matches('/');
        if p.is_empty() {
            continue;
        }
        // Skip the separator right after a root-only base to avoid producing `//`.
        if !out.is_empty() && !out.ends_with('/') {
            out.push('/');
        }
        out.push_str(p);
    }
    out
}
807
/// Replace any known secret substrings in a log/error string with `***`.
///
/// Empty secrets are ignored. Secrets are masked longest-first: if one secret is a prefix or
/// substring of another, masking the shorter one first would split the longer one and leave its
/// remainder readable in the output.
fn scrub(s: &str, secrets: &[String]) -> String {
    let mut ordered: Vec<&str> = secrets
        .iter()
        .map(String::as_str)
        .filter(|sec| !sec.is_empty())
        .collect();
    ordered.sort_by_key(|sec| std::cmp::Reverse(sec.len()));
    ordered
        .into_iter()
        .fold(s.to_string(), |acc, sec| acc.replace(sec, "***"))
}
818
819fn cfg_str(config: &Value, key: &str) -> String {
820    config
821        .get(key)
822        .and_then(|v| v.as_str())
823        .unwrap_or("")
824        .trim()
825        .to_string()
826}
827
/// Return the final component of `path` as an owned string.
///
/// Falls back to `"segment.mp4"` when the path has no file name (empty, or ending in `..`) or the
/// name is not valid UTF-8.
fn file_name_of(path: &str) -> String {
    let base = Path::new(path).file_name().and_then(|name| name.to_str());
    match base {
        Some(name) => name.to_owned(),
        None => String::from("segment.mp4"),
    }
}
835
836fn json_to_string_vec(v: &Value) -> Vec<String> {
837    v.as_array()
838        .map(|a| {
839            a.iter()
840                .filter_map(|x| x.as_str().map(String::from))
841                .collect()
842        })
843        .unwrap_or_default()
844}
845
846fn json_from_strs(v: &[String]) -> Value {
847    Value::Array(v.iter().map(|s| Value::String(s.clone())).collect())
848}
849
// Unit tests for the pure helpers in this module (no database, filesystem, or external binaries).
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Absolute and relative bases, a trailing slash on the base, an empty base, and an empty part.
    #[test]
    fn join_path_preserves_leading_slash() {
        assert_eq!(
            join_path("/srv/backups", &["cam1", "f.mp4"]),
            "/srv/backups/cam1/f.mp4"
        );
        assert_eq!(
            join_path("backups/", &["cam1", "f.mp4"]),
            "backups/cam1/f.mp4"
        );
        assert_eq!(join_path("", &["cam1", "f.mp4"]), "cam1/f.mp4");
        assert_eq!(join_path("bucket", &["", "p"]), "bucket/p");
    }

    // Every occurrence of a secret is masked; an empty secret is a no-op.
    #[test]
    fn scrub_masks_secrets() {
        let s = "auth failed for pass=hunter2 token=hunter2";
        assert_eq!(
            scrub(s, &["hunter2".into()]),
            "auth failed for pass=*** token=***"
        );
        assert_eq!(scrub("nothing", &["".into()]), "nothing");
    }

    // Non-string array elements are dropped, non-arrays give an empty vec, and the inverse helper
    // produces a JSON array of strings.
    #[test]
    fn json_string_vec_roundtrip() {
        let v = json!(["a", "b", 3, "c"]);
        assert_eq!(json_to_string_vec(&v), vec!["a", "b", "c"]);
        assert_eq!(json_to_string_vec(&json!("nope")), Vec::<String>::new());
        assert_eq!(json_from_strs(&["x".into(), "y".into()]), json!(["x", "y"]));
    }

    // String values are trimmed; missing keys and non-string values read as empty.
    #[test]
    fn cfg_str_reads_and_trims() {
        let c = json!({ "host": "  example.com ", "port": 22 });
        assert_eq!(cfg_str(&c, "host"), "example.com");
        assert_eq!(cfg_str(&c, "missing"), "");
        // non-string fields read as empty
        assert_eq!(cfg_str(&c, "port"), "");
    }

    // The last path component is returned; an empty path falls back to the default name.
    #[test]
    fn file_name_of_extracts_basename() {
        assert_eq!(
            file_name_of("/data/recordings/cam1/20260613_120000.mp4"),
            "20260613_120000.mp4"
        );
        assert_eq!(file_name_of(""), "segment.mp4");
    }
}