1use std::path::Path;
18use std::process::Stdio;
19use std::sync::{Arc, OnceLock};
20use std::time::Duration;
21
22use chrono::{DateTime, Utc};
23use serde_json::Value;
24use sqlx::types::Json as SqlxJson;
25use sqlx::SqlitePool;
26use tokio::process::Command;
27use tokio::sync::Semaphore;
28use uuid::Uuid;
29
30use crate::config::Config;
31use crate::error::{AppError, AppResult};
32use crate::models::{BackupDestination, BackupJob, BackupPolicy, BackupTestResult, Segment};
33use crate::repo;
34use crate::state::AppState;
35
/// Path of the `zip` executable spawned by `build_archive_zip` to pack on-demand archives.
const ZIP_BIN: &str = "/usr/bin/zip";
38
39fn job_semaphore(cfg: &Config) -> Arc<Semaphore> {
43 static SEM: OnceLock<Arc<Semaphore>> = OnceLock::new();
44 SEM.get_or_init(|| Arc::new(Semaphore::new(cfg.backup_max_concurrent_jobs.max(1))))
45 .clone()
46}
47
48pub async fn run(state: AppState) {
51 if !state.cfg.backup_enabled {
52 tracing::info!("backup: scheduler disabled (HELDAR_BACKUP_ENABLED=false)");
53 return;
54 }
55 let interval_s = state.cfg.backup_scheduler_interval_s.max(5);
56 tracing::info!(
57 interval_s,
58 max_concurrent = state.cfg.backup_max_concurrent_jobs,
59 "backup: scheduler started"
60 );
61 let mut tick = tokio::time::interval(Duration::from_secs(interval_s));
62 loop {
63 tick.tick().await;
64 if let Err(e) = sweep(&state).await {
65 tracing::error!(error = %e, "backup: scheduler tick failed");
66 }
67 }
68}
69
70async fn sweep(state: &AppState) -> anyhow::Result<()> {
72 let now = Utc::now();
73 let policies: Vec<BackupPolicy> =
74 sqlx::query_as::<_, BackupPolicy>("SELECT * FROM backup_policies WHERE enabled = 1")
75 .fetch_all(&state.pool)
76 .await?;
77 for p in policies {
78 let due = match p.last_run_at {
79 None => true,
80 Some(last) => last + chrono::Duration::seconds(p.schedule_interval_s.max(1)) <= now,
81 };
82 if !due {
83 continue;
84 }
85 match create_policy_job(state, &p).await {
86 Ok(job_id) => spawn_job(state.clone(), job_id),
87 Err(e) => tracing::error!(policy = %p.id, error = %e, "backup: failed to create job"),
88 }
89 }
90 Ok(())
91}
92
93async fn create_policy_job(state: &AppState, p: &BackupPolicy) -> anyhow::Result<String> {
96 let now = Utc::now();
97 let job_id = format!("bkj_{}", Uuid::new_v4().simple());
98 let from_time = if p.lookback_hours > 0 {
99 Some(now - chrono::Duration::hours(p.lookback_hours))
100 } else {
101 None
102 };
103 let to_time = Some(now);
104 sqlx::query(
105 "INSERT INTO backup_jobs
106 (id, policy_id, destination_id, kind, camera_ids, from_time, to_time,
107 incident_lock_only, status, created_at)
108 VALUES (?, ?, ?, 'policy', ?, ?, ?, ?, 'pending', ?)",
109 )
110 .bind(&job_id)
111 .bind(&p.id)
112 .bind(&p.destination_id)
113 .bind(SqlxJson(p.camera_ids.0.clone()))
114 .bind(from_time)
115 .bind(to_time)
116 .bind(p.incident_lock_only)
117 .bind(now)
118 .execute(&state.pool)
119 .await?;
120 sqlx::query(
121 "UPDATE backup_policies SET last_run_at = ?, last_job_id = ?, updated_at = ? WHERE id = ?",
122 )
123 .bind(now)
124 .bind(&job_id)
125 .bind(now)
126 .bind(&p.id)
127 .execute(&state.pool)
128 .await?;
129 Ok(job_id)
130}
131
132pub async fn trigger_policy(state: &AppState, policy: &BackupPolicy) -> anyhow::Result<String> {
134 let job_id = create_policy_job(state, policy).await?;
135 spawn_job(state.clone(), job_id.clone());
136 Ok(job_id)
137}
138
139fn spawn_job(state: AppState, job_id: String) {
141 let sem = job_semaphore(&state.cfg);
142 let timeout = Duration::from_secs(state.cfg.backup_job_timeout_s.max(30));
143 tokio::spawn(async move {
144 let _permit = match sem.acquire_owned().await {
145 Ok(p) => p,
146 Err(_) => return,
147 };
148 execute_job(&state, &job_id, timeout).await;
149 });
150}
151
152async fn execute_job(state: &AppState, job_id: &str, timeout: Duration) {
154 let Some(job) = sqlx::query_as::<_, BackupJob>("SELECT * FROM backup_jobs WHERE id = ?")
155 .bind(job_id)
156 .fetch_optional(&state.pool)
157 .await
158 .ok()
159 .flatten()
160 else {
161 return;
162 };
163
164 let dest = match &job.destination_id {
165 Some(d) => {
166 sqlx::query_as::<_, BackupDestination>("SELECT * FROM backup_destinations WHERE id = ?")
167 .bind(d)
168 .fetch_optional(&state.pool)
169 .await
170 .ok()
171 .flatten()
172 }
173 None => None,
174 };
175 let Some(dest) = dest else {
176 set_job_error(state, job_id, "backup destination not found or removed").await;
177 return;
178 };
179 if !dest.enabled {
180 set_job_error(state, job_id, "backup destination is disabled").await;
181 return;
182 }
183
184 let camera_ids = json_to_string_vec(&job.camera_ids.0);
185 let segments = match resolve_segments(
186 &state.pool,
187 &camera_ids,
188 job.from_time,
189 job.to_time,
190 job.incident_lock_only,
191 )
192 .await
193 {
194 Ok(s) => s,
195 Err(e) => {
196 set_job_error(state, job_id, &format!("resolving segments: {e}")).await;
197 return;
198 }
199 };
200
201 let files_total = segments.len() as i64;
202 let _ = sqlx::query(
203 "UPDATE backup_jobs SET status = 'running', files_total = ?, started_at = ? WHERE id = ?",
204 )
205 .bind(files_total)
206 .bind(Utc::now())
207 .bind(job_id)
208 .execute(&state.pool)
209 .await;
210
211 if segments.is_empty() {
212 let _ = sqlx::query(
213 "UPDATE backup_jobs SET status = 'completed', finished_at = ? WHERE id = ?",
214 )
215 .bind(Utc::now())
216 .bind(job_id)
217 .execute(&state.pool)
218 .await;
219 return;
220 }
221
222 let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
225 repo::set_segments_locked(&state.pool, &seg_ids, true).await;
226 let outcome =
227 tokio::time::timeout(timeout, copy_segments(state, job_id, &dest, &segments)).await;
228 repo::set_segments_locked(&state.pool, &seg_ids, false).await;
229
230 match outcome {
231 Err(_) => set_job_error(state, job_id, "backup job timed out").await,
232 Ok(Err(e)) => set_job_error(state, job_id, &e.to_string()).await,
233 Ok(Ok((copied, bytes))) => {
234 let _ = sqlx::query(
235 "UPDATE backup_jobs SET status = 'completed', files_copied = ?, bytes_copied = ?, finished_at = ? WHERE id = ?",
236 )
237 .bind(copied as i64)
238 .bind(bytes as i64)
239 .bind(Utc::now())
240 .bind(job_id)
241 .execute(&state.pool)
242 .await;
243 tracing::info!(job = job_id, files = copied, bytes, "backup: job completed");
244 }
245 }
246}
247
248async fn set_job_error(state: &AppState, job_id: &str, msg: &str) {
249 tracing::warn!(job = job_id, error = msg, "backup: job failed");
250 let _ = sqlx::query(
251 "UPDATE backup_jobs SET status = 'error', error = ?, finished_at = ? WHERE id = ?",
252 )
253 .bind(msg)
254 .bind(Utc::now())
255 .bind(job_id)
256 .execute(&state.pool)
257 .await;
258}
259
260async fn copy_segments(
262 state: &AppState,
263 job_id: &str,
264 dest: &BackupDestination,
265 segments: &[Segment],
266) -> anyhow::Result<(u64, u64)> {
267 match dest.kind.as_str() {
268 "local" => copy_local(state, job_id, dest, segments).await,
269 "sftp" | "ftp" | "s3" => copy_rclone(state, job_id, dest, segments).await,
270 other => anyhow::bail!("unknown backup destination kind `{other}`"),
271 }
272}
273
274async fn copy_local(
276 state: &AppState,
277 job_id: &str,
278 dest: &BackupDestination,
279 segments: &[Segment],
280) -> anyhow::Result<(u64, u64)> {
281 let base = cfg_str(&dest.config.0, "path");
282 if base.is_empty() {
283 anyhow::bail!("local destination has no `path` configured");
284 }
285 let base = Path::new(&base);
286 let mut copied = 0u64;
287 let mut bytes = 0u64;
288 for seg in segments {
289 let cam_dir = base.join(&seg.camera_id);
290 tokio::fs::create_dir_all(&cam_dir)
291 .await
292 .map_err(|e| anyhow::anyhow!("creating {}: {e}", cam_dir.display()))?;
293 let target = cam_dir.join(file_name_of(&seg.path));
294 match tokio::fs::copy(&seg.path, &target).await {
295 Ok(n) => {
296 copied += 1;
297 bytes += n;
298 }
299 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
300 tracing::warn!(job = job_id, path = %seg.path, "backup: source segment vanished; skipping");
301 }
302 Err(e) => anyhow::bail!("copying {}: {e}", seg.path),
303 }
304 update_progress(state, job_id, copied, bytes).await;
305 }
306 Ok((copied, bytes))
307}
308
309async fn copy_rclone(
311 state: &AppState,
312 job_id: &str,
313 dest: &BackupDestination,
314 segments: &[Segment],
315) -> anyhow::Result<(u64, u64)> {
316 let bin = &state.cfg.rclone_bin;
317 if !binary_available(bin).await {
318 anyhow::bail!(
319 "rclone binary `{bin}` is not available; install rclone or set HELDAR_RCLONE_BIN \
320 (remote sftp/ftp/s3 backup requires it; local/NAS destinations do not)"
321 );
322 }
323 let (remote, base, secrets) = build_remote(bin, &dest.kind, &dest.config.0).await?;
324 let mut copied = 0u64;
325 let mut bytes = 0u64;
326 for seg in segments {
327 let rel = join_path(&base, &[&seg.camera_id, &file_name_of(&seg.path)]);
328 let target = format!("{remote}{rel}");
329 let out = Command::new(bin)
330 .arg("copyto")
331 .arg(&seg.path)
332 .arg(&target)
333 .arg("--no-traverse")
334 .stdin(Stdio::null())
335 .stdout(Stdio::null())
336 .stderr(Stdio::piped())
337 .kill_on_drop(true)
338 .output()
339 .await
340 .map_err(|e| anyhow::anyhow!("spawning rclone: {e}"))?;
341 if out.status.success() {
342 copied += 1;
343 bytes += seg.size_bytes.max(0) as u64;
344 } else {
345 let err = scrub(&String::from_utf8_lossy(&out.stderr), &secrets);
346 anyhow::bail!(
347 "rclone copy failed for {}: {}",
348 file_name_of(&seg.path),
349 err.trim()
350 );
351 }
352 update_progress(state, job_id, copied, bytes).await;
353 }
354 Ok((copied, bytes))
355}
356
357async fn update_progress(state: &AppState, job_id: &str, copied: u64, bytes: u64) {
358 let _ = sqlx::query("UPDATE backup_jobs SET files_copied = ?, bytes_copied = ? WHERE id = ?")
359 .bind(copied as i64)
360 .bind(bytes as i64)
361 .bind(job_id)
362 .execute(&state.pool)
363 .await;
364}
365
366pub async fn create_archive(
372 state: &AppState,
373 camera_ids: Vec<String>,
374 from: Option<DateTime<Utc>>,
375 to: Option<DateTime<Utc>>,
376 incident_lock_only: bool,
377 trim: bool,
378) -> AppResult<BackupJob> {
379 if trim && (from.is_none() || to.is_none()) {
380 return Err(AppError::BadRequest(
381 "`trim` requires both `from` and `to`".into(),
382 ));
383 }
384 let segments = resolve_segments(&state.pool, &camera_ids, from, to, incident_lock_only).await?;
385 if segments.is_empty() {
386 return Err(AppError::NotFound(
387 "no recorded footage matches the requested archive selection".into(),
388 ));
389 }
390 let total_bytes: i64 = segments.iter().map(|s| s.size_bytes.max(0)).sum();
391 if total_bytes as u64 > state.cfg.archive_max_bytes {
392 return Err(AppError::BadRequest(format!(
393 "archive selection is {total_bytes} bytes; exceeds the limit of {} bytes (HELDAR_ARCHIVE_MAX_BYTES)",
394 state.cfg.archive_max_bytes
395 )));
396 }
397
398 tokio::fs::create_dir_all(&state.cfg.archive_dir)
399 .await
400 .map_err(|e| AppError::Other(e.into()))?;
401
402 let job_id = format!("bkj_{}", Uuid::new_v4().simple());
403 let now = Utc::now();
404 let files_total = segments.len() as i64;
405 sqlx::query(
406 "INSERT INTO backup_jobs
407 (id, policy_id, destination_id, kind, camera_ids, from_time, to_time,
408 incident_lock_only, status, files_total, started_at, created_at)
409 VALUES (?, NULL, NULL, 'on_demand_archive', ?, ?, ?, ?, 'running', ?, ?, ?)",
410 )
411 .bind(&job_id)
412 .bind(SqlxJson(json_from_strs(&camera_ids)))
413 .bind(from)
414 .bind(to)
415 .bind(incident_lock_only)
416 .bind(files_total)
417 .bind(now)
418 .bind(now)
419 .execute(&state.pool)
420 .await?;
421
422 let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
424 repo::set_segments_locked(&state.pool, &seg_ids, true).await;
425 let timeout = Duration::from_secs(state.cfg.backup_job_timeout_s.max(30));
426 let outcome = tokio::time::timeout(
427 timeout,
428 build_archive_zip(state, &job_id, &segments, from, to, trim),
429 )
430 .await;
431 repo::set_segments_locked(&state.pool, &seg_ids, false).await;
432
433 let out_path = state.cfg.archive_dir.join(format!("{job_id}.zip"));
434 match outcome {
435 Err(_) => {
436 let _ = tokio::fs::remove_file(&out_path).await;
437 set_job_error(state, &job_id, "archive export timed out").await;
438 return Err(AppError::Other(anyhow::anyhow!("archive export timed out")));
439 }
440 Ok(Err(e)) => {
441 let _ = tokio::fs::remove_file(&out_path).await;
442 set_job_error(state, &job_id, &e.to_string()).await;
443 return Err(AppError::Other(e));
444 }
445 Ok(Ok(zip_bytes)) => {
446 let url = format!("/media/archives/{job_id}.zip");
447 sqlx::query(
448 "UPDATE backup_jobs SET status = 'completed', files_copied = ?, bytes_copied = ?, output_path = ?, output_url = ?, finished_at = ? WHERE id = ?",
449 )
450 .bind(files_total)
451 .bind(zip_bytes as i64)
452 .bind(out_path.to_string_lossy().to_string())
453 .bind(&url)
454 .bind(Utc::now())
455 .bind(&job_id)
456 .execute(&state.pool)
457 .await?;
458 }
459 }
460
461 let job = sqlx::query_as::<_, BackupJob>("SELECT * FROM backup_jobs WHERE id = ?")
462 .bind(&job_id)
463 .fetch_one(&state.pool)
464 .await?;
465 Ok(job)
466}
467
468async fn build_archive_zip(
471 state: &AppState,
472 job_id: &str,
473 segments: &[Segment],
474 from: Option<DateTime<Utc>>,
475 to: Option<DateTime<Utc>>,
476 trim: bool,
477) -> anyhow::Result<u64> {
478 let staging = state.cfg.archive_dir.join(format!("{job_id}.stage"));
479 let out_path = state.cfg.archive_dir.join(format!("{job_id}.zip"));
480 let _ = tokio::fs::remove_dir_all(&staging).await;
481 let _ = tokio::fs::remove_file(&out_path).await;
482
483 let inner = async {
484 tokio::fs::create_dir_all(&staging).await?;
485 for seg in segments {
486 let cam_dir = staging.join(&seg.camera_id);
487 tokio::fs::create_dir_all(&cam_dir).await?;
488 let link = cam_dir.join(file_name_of(&seg.path));
489 if trim {
490 trim_segment(state, seg, from.unwrap(), to.unwrap(), &link).await?;
492 } else {
493 match tokio::fs::symlink(&seg.path, &link).await {
494 Ok(()) => {}
495 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
496 Err(e) => return Err(anyhow::anyhow!("staging {}: {e}", seg.path)),
497 }
498 }
499 }
500 let out = Command::new(ZIP_BIN)
503 .current_dir(&staging)
504 .arg("-r")
505 .arg("-q")
506 .arg(&out_path)
507 .arg(".")
508 .stdin(Stdio::null())
509 .stdout(Stdio::null())
510 .stderr(Stdio::piped())
511 .kill_on_drop(true)
512 .output()
513 .await
514 .map_err(|e| anyhow::anyhow!("spawning zip ({ZIP_BIN}): {e}"))?;
515 if !out.status.success() {
516 anyhow::bail!(
517 "zip failed: {}",
518 String::from_utf8_lossy(&out.stderr).trim()
519 );
520 }
521 let size = tokio::fs::metadata(&out_path)
522 .await
523 .map(|m| m.len())
524 .unwrap_or(0);
525 Ok::<u64, anyhow::Error>(size)
526 }
527 .await;
528
529 let _ = tokio::fs::remove_dir_all(&staging).await;
530 inner
531}
532
533async fn trim_segment(
535 state: &AppState,
536 seg: &Segment,
537 from: DateTime<Utc>,
538 to: DateTime<Utc>,
539 out: &Path,
540) -> anyhow::Result<()> {
541 let win_start = from.max(seg.start_time);
542 let win_end = to.min(seg.end_time);
543 let ss = ((win_start - seg.start_time).num_milliseconds() as f64 / 1000.0).max(0.0);
544 let dur = (win_end - win_start).num_milliseconds() as f64 / 1000.0;
545 if dur <= 0.0 {
546 let _ = tokio::fs::symlink(&seg.path, out).await;
549 return Ok(());
550 }
551 let out_status = Command::new(&state.cfg.ffmpeg_bin)
552 .kill_on_drop(true)
553 .args(["-hide_banner", "-loglevel", "error"])
554 .args(["-ss", &format!("{ss:.3}")])
555 .arg("-i")
556 .arg(&seg.path)
557 .args(["-t", &format!("{dur:.3}")])
558 .args([
559 "-c",
560 "copy",
561 "-avoid_negative_ts",
562 "make_zero",
563 "-movflags",
564 "+faststart",
565 ])
566 .arg(out)
567 .stdin(Stdio::null())
568 .stdout(Stdio::null())
569 .stderr(Stdio::piped())
570 .output()
571 .await
572 .map_err(|e| anyhow::anyhow!("spawning ffmpeg: {e}"))?;
573 if !out_status.status.success() {
574 anyhow::bail!(
575 "ffmpeg trim failed for {}: {}",
576 file_name_of(&seg.path),
577 String::from_utf8_lossy(&out_status.stderr).trim()
578 );
579 }
580 Ok(())
581}
582
583pub async fn test_destination(state: &AppState, dest: &BackupDestination) -> BackupTestResult {
587 let start = std::time::Instant::now();
588 let res = match dest.kind.as_str() {
589 "local" => test_local(&dest.config.0).await,
590 "sftp" | "ftp" | "s3" => test_rclone(state, dest).await,
591 other => Err(anyhow::anyhow!("unknown destination kind `{other}`")),
592 };
593 let latency_ms = start.elapsed().as_millis() as i64;
594 match res {
595 Ok(()) => BackupTestResult {
596 ok: true,
597 error: None,
598 latency_ms,
599 },
600 Err(e) => BackupTestResult {
601 ok: false,
602 error: Some(e.to_string()),
603 latency_ms,
604 },
605 }
606}
607
608async fn test_local(config: &Value) -> anyhow::Result<()> {
609 let base = cfg_str(config, "path");
610 if base.is_empty() {
611 anyhow::bail!("local destination requires `path`");
612 }
613 tokio::fs::create_dir_all(&base)
614 .await
615 .map_err(|e| anyhow::anyhow!("cannot create {base}: {e}"))?;
616 let probe = Path::new(&base).join(".heldar_backup_probe");
617 tokio::fs::write(&probe, b"ok")
618 .await
619 .map_err(|e| anyhow::anyhow!("path not writable: {e}"))?;
620 let _ = tokio::fs::remove_file(&probe).await;
621 Ok(())
622}
623
624async fn test_rclone(state: &AppState, dest: &BackupDestination) -> anyhow::Result<()> {
625 let bin = &state.cfg.rclone_bin;
626 if !binary_available(bin).await {
627 anyhow::bail!(
628 "rclone binary `{bin}` is not available; install rclone or set HELDAR_RCLONE_BIN \
629 (remote sftp/ftp/s3 backup requires it)"
630 );
631 }
632 let (remote, base, secrets) = build_remote(bin, &dest.kind, &dest.config.0).await?;
633 let target = format!("{remote}{base}");
634 let out = tokio::time::timeout(
635 Duration::from_secs(30),
636 Command::new(bin)
637 .arg("lsd")
638 .arg(&target)
639 .args(["--max-depth", "1"])
640 .stdin(Stdio::null())
641 .stdout(Stdio::null())
642 .stderr(Stdio::piped())
643 .kill_on_drop(true)
644 .output(),
645 )
646 .await
647 .map_err(|_| anyhow::anyhow!("rclone connectivity test timed out"))?
648 .map_err(|e| anyhow::anyhow!("spawning rclone: {e}"))?;
649 if !out.status.success() {
650 anyhow::bail!(
651 "rclone could not reach destination: {}",
652 scrub(&String::from_utf8_lossy(&out.stderr), &secrets).trim()
653 );
654 }
655 Ok(())
656}
657
658async fn resolve_segments(
663 pool: &SqlitePool,
664 camera_ids: &[String],
665 from: Option<DateTime<Utc>>,
666 to: Option<DateTime<Utc>>,
667 incident_lock_only: bool,
668) -> sqlx::Result<Vec<Segment>> {
669 let mut sql = String::from("SELECT * FROM segments WHERE 1 = 1");
670 if !camera_ids.is_empty() {
671 let placeholders = vec!["?"; camera_ids.len()].join(",");
672 sql.push_str(&format!(" AND camera_id IN ({placeholders})"));
673 }
674 sql.push_str(" AND (? IS NULL OR start_time < ?) AND (? IS NULL OR end_time > ?)");
675 if incident_lock_only {
676 sql.push_str(" AND evidence_locked = 1");
677 }
678 sql.push_str(" ORDER BY camera_id ASC, start_time ASC");
679
680 let mut q = sqlx::query_as::<_, Segment>(&sql);
681 for id in camera_ids {
682 q = q.bind(id);
683 }
684 q = q.bind(to).bind(to).bind(from).bind(from);
685 q.fetch_all(pool).await
686}
687
688async fn binary_available(bin: &str) -> bool {
690 Command::new(bin)
691 .arg("version")
692 .stdin(Stdio::null())
693 .stdout(Stdio::null())
694 .stderr(Stdio::null())
695 .kill_on_drop(true)
696 .status()
697 .await
698 .map(|s| s.success())
699 .unwrap_or(false)
700}
701
702async fn build_remote(
705 bin: &str,
706 kind: &str,
707 config: &Value,
708) -> anyhow::Result<(String, String, Vec<String>)> {
709 let mut secrets: Vec<String> = Vec::new();
710 match kind {
711 "sftp" | "ftp" => {
712 let host = cfg_str(config, "host");
713 if host.is_empty() {
714 anyhow::bail!("{kind} destination requires `host`");
715 }
716 let user = cfg_str(config, "user");
717 let pass = cfg_str(config, "pass");
718 let port = config
719 .get("port")
720 .and_then(|p| p.as_i64())
721 .map(|p| p.to_string())
722 .unwrap_or_default();
723 let mut parts = vec![format!(":{kind}"), format!("host={host}")];
724 if !port.is_empty() {
725 parts.push(format!("port={port}"));
726 }
727 if !user.is_empty() {
728 parts.push(format!("user={user}"));
729 }
730 if !pass.is_empty() {
731 let obscured = rclone_obscure(bin, &pass).await?;
732 secrets.push(obscured.clone());
733 secrets.push(pass.clone());
734 parts.push(format!("pass={obscured}"));
735 }
736 Ok((
737 format!("{}:", parts.join(",")),
738 cfg_str(config, "path"),
739 secrets,
740 ))
741 }
742 "s3" => {
743 let bucket = cfg_str(config, "bucket");
744 if bucket.is_empty() {
745 anyhow::bail!("s3 destination requires `bucket`");
746 }
747 let access_key = cfg_str(config, "access_key");
748 let secret_key = cfg_str(config, "secret_key");
749 let endpoint = cfg_str(config, "endpoint");
750 let region = cfg_str(config, "region");
751 let mut parts = vec![":s3".to_string(), "provider=Other".to_string()];
752 if !access_key.is_empty() {
753 parts.push(format!("access_key_id={access_key}"));
754 }
755 if !secret_key.is_empty() {
756 secrets.push(secret_key.clone());
757 parts.push(format!("secret_access_key={secret_key}"));
758 }
759 if !endpoint.is_empty() {
760 parts.push(format!("endpoint={endpoint}"));
761 }
762 if !region.is_empty() {
763 parts.push(format!("region={region}"));
764 }
765 let base = join_path("", &[&bucket, &cfg_str(config, "prefix")]);
766 Ok((format!("{}:", parts.join(",")), base, secrets))
767 }
768 other => anyhow::bail!("kind `{other}` does not use rclone"),
769 }
770}
771
772async fn rclone_obscure(bin: &str, pass: &str) -> anyhow::Result<String> {
774 let out = Command::new(bin)
775 .arg("obscure")
776 .arg(pass)
777 .stdin(Stdio::null())
778 .stdout(Stdio::piped())
779 .stderr(Stdio::piped())
780 .kill_on_drop(true)
781 .output()
782 .await
783 .map_err(|e| anyhow::anyhow!("spawning rclone obscure: {e}"))?;
784 if !out.status.success() {
785 anyhow::bail!(
786 "rclone obscure failed: {}",
787 String::from_utf8_lossy(&out.stderr).trim()
788 );
789 }
790 Ok(String::from_utf8_lossy(&out.stdout).trim().to_string())
791}
792
/// Joins `base` and `parts` with single `/` separators.
///
/// Trailing slashes on `base` and leading/trailing slashes on each part are dropped,
/// and empty parts are skipped. A leading slash on `base` is preserved, including when
/// `base` is just `/`: the root now stays `/` instead of collapsing to an empty
/// string, which used to turn `/` + `cam1` into the relative path `cam1`.
fn join_path(base: &str, parts: &[&str]) -> String {
    let trimmed = base.trim_end_matches('/');
    let mut out = if trimmed.is_empty() && !base.is_empty() {
        // `base` consisted only of slashes: it denotes the root.
        String::from("/")
    } else {
        trimmed.to_string()
    };
    for p in parts {
        let p = p.trim_matches('/');
        if p.is_empty() {
            continue;
        }
        // No separator after the root slash or at the very start.
        if !out.is_empty() && !out.ends_with('/') {
            out.push('/');
        }
        out.push_str(p);
    }
    out
}
809
/// Returns `s` with every occurrence of each non-empty secret replaced by `***`.
///
/// Secrets are masked longest first. Replacing in list order would let a shorter
/// secret that is a prefix of a longer one break the longer match and leave its tail
/// visible (`hunter` before `hunter2` gave `***2`).
fn scrub(s: &str, secrets: &[String]) -> String {
    let mut ordered: Vec<&str> = secrets
        .iter()
        .map(String::as_str)
        .filter(|sec| !sec.is_empty())
        .collect();
    // Stable sort: secrets of equal length keep their original relative order.
    ordered.sort_by_key(|sec| std::cmp::Reverse(sec.len()));

    let mut out = s.to_string();
    for sec in ordered {
        out = out.replace(sec, "***");
    }
    out
}
820
821fn cfg_str(config: &Value, key: &str) -> String {
822 config
823 .get(key)
824 .and_then(|v| v.as_str())
825 .unwrap_or("")
826 .trim()
827 .to_string()
828}
829
/// Returns the final component of `path`.
///
/// Falls back to `segment.mp4` when the path has no final component (empty, or ending
/// in `..`) or when that component is not valid UTF-8.
fn file_name_of(path: &str) -> String {
    let name = Path::new(path).file_name().and_then(|os| os.to_str());
    if let Some(name) = name {
        name.to_owned()
    } else {
        String::from("segment.mp4")
    }
}
837
838fn json_to_string_vec(v: &Value) -> Vec<String> {
839 v.as_array()
840 .map(|a| {
841 a.iter()
842 .filter_map(|x| x.as_str().map(String::from))
843 .collect()
844 })
845 .unwrap_or_default()
846}
847
848fn json_from_strs(v: &[String]) -> Value {
849 Value::Array(v.iter().map(|s| Value::String(s.clone())).collect())
850}
851
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// `join_path` keeps a leading slash on the base and skips empty parts.
    #[test]
    fn join_path_preserves_leading_slash() {
        let cases: [(&str, &[&str], &str); 4] = [
            ("/srv/backups", &["cam1", "f.mp4"], "/srv/backups/cam1/f.mp4"),
            ("backups/", &["cam1", "f.mp4"], "backups/cam1/f.mp4"),
            ("", &["cam1", "f.mp4"], "cam1/f.mp4"),
            ("bucket", &["", "p"], "bucket/p"),
        ];
        for (base, parts, want) in cases {
            assert_eq!(join_path(base, parts), want);
        }
    }

    /// Every occurrence of a secret is masked; empty secrets are ignored.
    #[test]
    fn scrub_masks_secrets() {
        let masked = scrub(
            "auth failed for pass=hunter2 token=hunter2",
            &["hunter2".into()],
        );
        assert_eq!(masked, "auth failed for pass=*** token=***");

        let untouched = scrub("nothing", &["".into()]);
        assert_eq!(untouched, "nothing");
    }

    /// Non-string elements are dropped and non-arrays give an empty vector.
    #[test]
    fn json_string_vec_roundtrip() {
        let mixed = json_to_string_vec(&json!(["a", "b", 3, "c"]));
        assert_eq!(mixed, ["a", "b", "c"]);

        assert!(json_to_string_vec(&json!("nope")).is_empty());

        let built = json_from_strs(&["x".into(), "y".into()]);
        assert_eq!(built, json!(["x", "y"]));
    }

    /// Strings are trimmed; missing keys and non-string values read as empty.
    #[test]
    fn cfg_str_reads_and_trims() {
        let c = json!({ "host": "  example.com ", "port": 22 });
        let cases = [("host", "example.com"), ("missing", ""), ("port", "")];
        for (key, want) in cases {
            assert_eq!(cfg_str(&c, key), want);
        }
    }

    /// The basename is returned, with `segment.mp4` as the fallback for an empty path.
    #[test]
    fn file_name_of_extracts_basename() {
        let cases = [
            (
                "/data/recordings/cam1/20260613_120000.mp4",
                "20260613_120000.mp4",
            ),
            ("", "segment.mp4"),
        ];
        for (path, want) in cases {
            assert_eq!(file_name_of(path), want);
        }
    }
}