1use std::path::Path;
18use std::process::Stdio;
19use std::sync::{Arc, OnceLock};
20use std::time::Duration;
21
22use chrono::{DateTime, Utc};
23use serde_json::Value;
24use sqlx::types::Json as SqlxJson;
25use sqlx::SqlitePool;
26use tokio::process::Command;
27use tokio::sync::Semaphore;
28use uuid::Uuid;
29
30use crate::config::Config;
31use crate::error::{AppError, AppResult};
32use crate::models::{BackupDestination, BackupJob, BackupPolicy, BackupTestResult, Segment};
33use crate::repo;
34use crate::state::AppState;
35
36const ZIP_BIN: &str = "/usr/bin/zip";
38
39fn job_semaphore(cfg: &Config) -> Arc<Semaphore> {
43 static SEM: OnceLock<Arc<Semaphore>> = OnceLock::new();
44 SEM.get_or_init(|| Arc::new(Semaphore::new(cfg.backup_max_concurrent_jobs.max(1))))
45 .clone()
46}
47
48pub async fn run(state: AppState) {
51 if !state.cfg.backup_enabled {
52 tracing::info!("backup: scheduler disabled (HELDAR_BACKUP_ENABLED=false)");
53 return;
54 }
55 let interval_s = state.cfg.backup_scheduler_interval_s.max(5);
56 tracing::info!(
57 interval_s,
58 max_concurrent = state.cfg.backup_max_concurrent_jobs,
59 "backup: scheduler started"
60 );
61 let mut tick = tokio::time::interval(Duration::from_secs(interval_s));
62 loop {
63 tick.tick().await;
64 if let Err(e) = sweep(&state).await {
65 tracing::error!(error = %e, "backup: scheduler tick failed");
66 }
67 }
68}
69
70async fn sweep(state: &AppState) -> anyhow::Result<()> {
72 let now = Utc::now();
73 let policies: Vec<BackupPolicy> =
74 sqlx::query_as::<_, BackupPolicy>("SELECT * FROM backup_policies WHERE enabled = 1")
75 .fetch_all(&state.pool)
76 .await?;
77 for p in policies {
78 let due = match p.last_run_at {
79 None => true,
80 Some(last) => last + chrono::Duration::seconds(p.schedule_interval_s.max(1)) <= now,
81 };
82 if !due {
83 continue;
84 }
85 match create_policy_job(state, &p).await {
86 Ok(job_id) => spawn_job(state.clone(), job_id),
87 Err(e) => tracing::error!(policy = %p.id, error = %e, "backup: failed to create job"),
88 }
89 }
90 Ok(())
91}
92
93async fn create_policy_job(state: &AppState, p: &BackupPolicy) -> anyhow::Result<String> {
96 let now = Utc::now();
97 let job_id = format!("bkj_{}", Uuid::new_v4().simple());
98 let from_time = if p.lookback_hours > 0 {
99 Some(now - chrono::Duration::hours(p.lookback_hours))
100 } else {
101 None
102 };
103 let to_time = Some(now);
104 sqlx::query(
105 "INSERT INTO backup_jobs
106 (id, policy_id, destination_id, kind, camera_ids, from_time, to_time,
107 incident_lock_only, status, created_at)
108 VALUES (?, ?, ?, 'policy', ?, ?, ?, ?, 'pending', ?)",
109 )
110 .bind(&job_id)
111 .bind(&p.id)
112 .bind(&p.destination_id)
113 .bind(SqlxJson(p.camera_ids.0.clone()))
114 .bind(from_time)
115 .bind(to_time)
116 .bind(p.incident_lock_only)
117 .bind(now)
118 .execute(&state.pool)
119 .await?;
120 sqlx::query(
121 "UPDATE backup_policies SET last_run_at = ?, last_job_id = ?, updated_at = ? WHERE id = ?",
122 )
123 .bind(now)
124 .bind(&job_id)
125 .bind(now)
126 .bind(&p.id)
127 .execute(&state.pool)
128 .await?;
129 Ok(job_id)
130}
131
132pub async fn trigger_policy(state: &AppState, policy: &BackupPolicy) -> anyhow::Result<String> {
134 let job_id = create_policy_job(state, policy).await?;
135 spawn_job(state.clone(), job_id.clone());
136 Ok(job_id)
137}
138
139fn spawn_job(state: AppState, job_id: String) {
141 let sem = job_semaphore(&state.cfg);
142 let timeout = Duration::from_secs(state.cfg.backup_job_timeout_s.max(30));
143 tokio::spawn(async move {
144 let _permit = match sem.acquire_owned().await {
145 Ok(p) => p,
146 Err(_) => return,
147 };
148 execute_job(&state, &job_id, timeout).await;
149 });
150}
151
152async fn execute_job(state: &AppState, job_id: &str, timeout: Duration) {
154 let Some(job) = sqlx::query_as::<_, BackupJob>("SELECT * FROM backup_jobs WHERE id = ?")
155 .bind(job_id)
156 .fetch_optional(&state.pool)
157 .await
158 .ok()
159 .flatten()
160 else {
161 return;
162 };
163
164 let dest = match &job.destination_id {
165 Some(d) => {
166 sqlx::query_as::<_, BackupDestination>("SELECT * FROM backup_destinations WHERE id = ?")
167 .bind(d)
168 .fetch_optional(&state.pool)
169 .await
170 .ok()
171 .flatten()
172 }
173 None => None,
174 };
175 let Some(dest) = dest else {
176 set_job_error(state, job_id, "backup destination not found or removed").await;
177 return;
178 };
179 if !dest.enabled {
180 set_job_error(state, job_id, "backup destination is disabled").await;
181 return;
182 }
183
184 let camera_ids = json_to_string_vec(&job.camera_ids.0);
185 let segments = match resolve_segments(
186 &state.pool,
187 &camera_ids,
188 job.from_time,
189 job.to_time,
190 job.incident_lock_only,
191 )
192 .await
193 {
194 Ok(s) => s,
195 Err(e) => {
196 set_job_error(state, job_id, &format!("resolving segments: {e}")).await;
197 return;
198 }
199 };
200
201 let files_total = segments.len() as i64;
202 let _ = sqlx::query(
203 "UPDATE backup_jobs SET status = 'running', files_total = ?, started_at = ? WHERE id = ?",
204 )
205 .bind(files_total)
206 .bind(Utc::now())
207 .bind(job_id)
208 .execute(&state.pool)
209 .await;
210
211 if segments.is_empty() {
212 let _ = sqlx::query(
213 "UPDATE backup_jobs SET status = 'completed', finished_at = ? WHERE id = ?",
214 )
215 .bind(Utc::now())
216 .bind(job_id)
217 .execute(&state.pool)
218 .await;
219 return;
220 }
221
222 let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
225 let _read_lock = repo::SegReadLock::acquire(&state.pool, seg_ids).await;
226 let outcome =
227 tokio::time::timeout(timeout, copy_segments(state, job_id, &dest, &segments)).await;
228
229 match outcome {
230 Err(_) => set_job_error(state, job_id, "backup job timed out").await,
231 Ok(Err(e)) => set_job_error(state, job_id, &e.to_string()).await,
232 Ok(Ok((copied, bytes))) => {
233 let _ = sqlx::query(
234 "UPDATE backup_jobs SET status = 'completed', files_copied = ?, bytes_copied = ?, finished_at = ? WHERE id = ?",
235 )
236 .bind(copied as i64)
237 .bind(bytes as i64)
238 .bind(Utc::now())
239 .bind(job_id)
240 .execute(&state.pool)
241 .await;
242 tracing::info!(job = job_id, files = copied, bytes, "backup: job completed");
243 }
244 }
245}
246
247async fn set_job_error(state: &AppState, job_id: &str, msg: &str) {
248 tracing::warn!(job = job_id, error = msg, "backup: job failed");
249 let _ = sqlx::query(
250 "UPDATE backup_jobs SET status = 'error', error = ?, finished_at = ? WHERE id = ?",
251 )
252 .bind(msg)
253 .bind(Utc::now())
254 .bind(job_id)
255 .execute(&state.pool)
256 .await;
257}
258
259async fn copy_segments(
261 state: &AppState,
262 job_id: &str,
263 dest: &BackupDestination,
264 segments: &[Segment],
265) -> anyhow::Result<(u64, u64)> {
266 match dest.kind.as_str() {
267 "local" => copy_local(state, job_id, dest, segments).await,
268 "sftp" | "ftp" | "s3" => copy_rclone(state, job_id, dest, segments).await,
269 other => anyhow::bail!("unknown backup destination kind `{other}`"),
270 }
271}
272
273async fn copy_local(
275 state: &AppState,
276 job_id: &str,
277 dest: &BackupDestination,
278 segments: &[Segment],
279) -> anyhow::Result<(u64, u64)> {
280 let base = cfg_str(&dest.config.0, "path");
281 if base.is_empty() {
282 anyhow::bail!("local destination has no `path` configured");
283 }
284 let base = Path::new(&base);
285 let mut copied = 0u64;
286 let mut bytes = 0u64;
287 for seg in segments {
288 let cam_dir = base.join(&seg.camera_id);
289 tokio::fs::create_dir_all(&cam_dir)
290 .await
291 .map_err(|e| anyhow::anyhow!("creating {}: {e}", cam_dir.display()))?;
292 let target = cam_dir.join(file_name_of(&seg.path));
293 match tokio::fs::copy(&seg.path, &target).await {
294 Ok(n) => {
295 copied += 1;
296 bytes += n;
297 }
298 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
299 tracing::warn!(job = job_id, path = %seg.path, "backup: source segment vanished; skipping");
300 }
301 Err(e) => anyhow::bail!("copying {}: {e}", seg.path),
302 }
303 update_progress(state, job_id, copied, bytes).await;
304 }
305 Ok((copied, bytes))
306}
307
308async fn copy_rclone(
310 state: &AppState,
311 job_id: &str,
312 dest: &BackupDestination,
313 segments: &[Segment],
314) -> anyhow::Result<(u64, u64)> {
315 let bin = &state.cfg.rclone_bin;
316 if !binary_available(bin).await {
317 anyhow::bail!(
318 "rclone binary `{bin}` is not available; install rclone or set HELDAR_RCLONE_BIN \
319 (remote sftp/ftp/s3 backup requires it; local/NAS destinations do not)"
320 );
321 }
322 let (remote, base, secrets) = build_remote(bin, &dest.kind, &dest.config.0).await?;
323 let mut copied = 0u64;
324 let mut bytes = 0u64;
325 for seg in segments {
326 let rel = join_path(&base, &[&seg.camera_id, &file_name_of(&seg.path)]);
327 let target = format!("{remote}{rel}");
328 let out = Command::new(bin)
329 .arg("copyto")
330 .arg(&seg.path)
331 .arg(&target)
332 .arg("--no-traverse")
333 .stdin(Stdio::null())
334 .stdout(Stdio::null())
335 .stderr(Stdio::piped())
336 .kill_on_drop(true)
337 .output()
338 .await
339 .map_err(|e| anyhow::anyhow!("spawning rclone: {e}"))?;
340 if out.status.success() {
341 copied += 1;
342 bytes += seg.size_bytes.max(0) as u64;
343 } else {
344 let err = scrub(&String::from_utf8_lossy(&out.stderr), &secrets);
345 anyhow::bail!(
346 "rclone copy failed for {}: {}",
347 file_name_of(&seg.path),
348 err.trim()
349 );
350 }
351 update_progress(state, job_id, copied, bytes).await;
352 }
353 Ok((copied, bytes))
354}
355
356async fn update_progress(state: &AppState, job_id: &str, copied: u64, bytes: u64) {
357 let _ = sqlx::query("UPDATE backup_jobs SET files_copied = ?, bytes_copied = ? WHERE id = ?")
358 .bind(copied as i64)
359 .bind(bytes as i64)
360 .bind(job_id)
361 .execute(&state.pool)
362 .await;
363}
364
365pub async fn create_archive(
371 state: &AppState,
372 camera_ids: Vec<String>,
373 from: Option<DateTime<Utc>>,
374 to: Option<DateTime<Utc>>,
375 incident_lock_only: bool,
376 trim: bool,
377) -> AppResult<BackupJob> {
378 if trim && (from.is_none() || to.is_none()) {
379 return Err(AppError::BadRequest(
380 "`trim` requires both `from` and `to`".into(),
381 ));
382 }
383 let segments = resolve_segments(&state.pool, &camera_ids, from, to, incident_lock_only).await?;
384 if segments.is_empty() {
385 return Err(AppError::NotFound(
386 "no recorded footage matches the requested archive selection".into(),
387 ));
388 }
389 let total_bytes: i64 = segments.iter().map(|s| s.size_bytes.max(0)).sum();
390 if total_bytes as u64 > state.cfg.archive_max_bytes {
391 return Err(AppError::BadRequest(format!(
392 "archive selection is {total_bytes} bytes; exceeds the limit of {} bytes (HELDAR_ARCHIVE_MAX_BYTES)",
393 state.cfg.archive_max_bytes
394 )));
395 }
396
397 tokio::fs::create_dir_all(&state.cfg.archive_dir)
398 .await
399 .map_err(|e| AppError::Other(e.into()))?;
400
401 let job_id = format!("bkj_{}", Uuid::new_v4().simple());
402 let now = Utc::now();
403 let files_total = segments.len() as i64;
404 sqlx::query(
405 "INSERT INTO backup_jobs
406 (id, policy_id, destination_id, kind, camera_ids, from_time, to_time,
407 incident_lock_only, status, files_total, started_at, created_at)
408 VALUES (?, NULL, NULL, 'on_demand_archive', ?, ?, ?, ?, 'running', ?, ?, ?)",
409 )
410 .bind(&job_id)
411 .bind(SqlxJson(json_from_strs(&camera_ids)))
412 .bind(from)
413 .bind(to)
414 .bind(incident_lock_only)
415 .bind(files_total)
416 .bind(now)
417 .bind(now)
418 .execute(&state.pool)
419 .await?;
420
421 let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
423 let _read_lock = repo::SegReadLock::acquire(&state.pool, seg_ids).await;
424 let timeout = Duration::from_secs(state.cfg.backup_job_timeout_s.max(30));
425 let outcome = tokio::time::timeout(
426 timeout,
427 build_archive_zip(state, &job_id, &segments, from, to, trim),
428 )
429 .await;
430
431 let out_path = state.cfg.archive_dir.join(format!("{job_id}.zip"));
432 match outcome {
433 Err(_) => {
434 let _ = tokio::fs::remove_file(&out_path).await;
435 set_job_error(state, &job_id, "archive export timed out").await;
436 return Err(AppError::Other(anyhow::anyhow!("archive export timed out")));
437 }
438 Ok(Err(e)) => {
439 let _ = tokio::fs::remove_file(&out_path).await;
440 set_job_error(state, &job_id, &e.to_string()).await;
441 return Err(AppError::Other(e));
442 }
443 Ok(Ok(zip_bytes)) => {
444 let url = format!("/media/archives/{job_id}.zip");
445 sqlx::query(
446 "UPDATE backup_jobs SET status = 'completed', files_copied = ?, bytes_copied = ?, output_path = ?, output_url = ?, finished_at = ? WHERE id = ?",
447 )
448 .bind(files_total)
449 .bind(zip_bytes as i64)
450 .bind(out_path.to_string_lossy().to_string())
451 .bind(&url)
452 .bind(Utc::now())
453 .bind(&job_id)
454 .execute(&state.pool)
455 .await?;
456 }
457 }
458
459 let job = sqlx::query_as::<_, BackupJob>("SELECT * FROM backup_jobs WHERE id = ?")
460 .bind(&job_id)
461 .fetch_one(&state.pool)
462 .await?;
463 Ok(job)
464}
465
466async fn build_archive_zip(
469 state: &AppState,
470 job_id: &str,
471 segments: &[Segment],
472 from: Option<DateTime<Utc>>,
473 to: Option<DateTime<Utc>>,
474 trim: bool,
475) -> anyhow::Result<u64> {
476 let staging = state.cfg.archive_dir.join(format!("{job_id}.stage"));
477 let out_path = state.cfg.archive_dir.join(format!("{job_id}.zip"));
478 let _ = tokio::fs::remove_dir_all(&staging).await;
479 let _ = tokio::fs::remove_file(&out_path).await;
480
481 let inner = async {
482 tokio::fs::create_dir_all(&staging).await?;
483 for seg in segments {
484 let cam_dir = staging.join(&seg.camera_id);
485 tokio::fs::create_dir_all(&cam_dir).await?;
486 let link = cam_dir.join(file_name_of(&seg.path));
487 if trim {
488 trim_segment(state, seg, from.unwrap(), to.unwrap(), &link).await?;
490 } else {
491 match tokio::fs::symlink(&seg.path, &link).await {
492 Ok(()) => {}
493 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
494 Err(e) => return Err(anyhow::anyhow!("staging {}: {e}", seg.path)),
495 }
496 }
497 }
498 let out = Command::new(ZIP_BIN)
501 .current_dir(&staging)
502 .arg("-r")
503 .arg("-q")
504 .arg(&out_path)
505 .arg(".")
506 .stdin(Stdio::null())
507 .stdout(Stdio::null())
508 .stderr(Stdio::piped())
509 .kill_on_drop(true)
510 .output()
511 .await
512 .map_err(|e| anyhow::anyhow!("spawning zip ({ZIP_BIN}): {e}"))?;
513 if !out.status.success() {
514 anyhow::bail!(
515 "zip failed: {}",
516 String::from_utf8_lossy(&out.stderr).trim()
517 );
518 }
519 let size = tokio::fs::metadata(&out_path)
520 .await
521 .map(|m| m.len())
522 .unwrap_or(0);
523 Ok::<u64, anyhow::Error>(size)
524 }
525 .await;
526
527 let _ = tokio::fs::remove_dir_all(&staging).await;
528 inner
529}
530
531async fn trim_segment(
533 state: &AppState,
534 seg: &Segment,
535 from: DateTime<Utc>,
536 to: DateTime<Utc>,
537 out: &Path,
538) -> anyhow::Result<()> {
539 let win_start = from.max(seg.start_time);
540 let win_end = to.min(seg.end_time);
541 let ss = ((win_start - seg.start_time).num_milliseconds() as f64 / 1000.0).max(0.0);
542 let dur = (win_end - win_start).num_milliseconds() as f64 / 1000.0;
543 if dur <= 0.0 {
544 let _ = tokio::fs::symlink(&seg.path, out).await;
547 return Ok(());
548 }
549 let out_status = Command::new(&state.cfg.ffmpeg_bin)
550 .kill_on_drop(true)
551 .args(["-hide_banner", "-loglevel", "error"])
552 .args(["-ss", &format!("{ss:.3}")])
553 .arg("-i")
554 .arg(&seg.path)
555 .args(["-t", &format!("{dur:.3}")])
556 .args([
557 "-c",
558 "copy",
559 "-avoid_negative_ts",
560 "make_zero",
561 "-movflags",
562 "+faststart",
563 ])
564 .arg(out)
565 .stdin(Stdio::null())
566 .stdout(Stdio::null())
567 .stderr(Stdio::piped())
568 .output()
569 .await
570 .map_err(|e| anyhow::anyhow!("spawning ffmpeg: {e}"))?;
571 if !out_status.status.success() {
572 anyhow::bail!(
573 "ffmpeg trim failed for {}: {}",
574 file_name_of(&seg.path),
575 String::from_utf8_lossy(&out_status.stderr).trim()
576 );
577 }
578 Ok(())
579}
580
581pub async fn test_destination(state: &AppState, dest: &BackupDestination) -> BackupTestResult {
585 let start = std::time::Instant::now();
586 let res = match dest.kind.as_str() {
587 "local" => test_local(&dest.config.0).await,
588 "sftp" | "ftp" | "s3" => test_rclone(state, dest).await,
589 other => Err(anyhow::anyhow!("unknown destination kind `{other}`")),
590 };
591 let latency_ms = start.elapsed().as_millis() as i64;
592 match res {
593 Ok(()) => BackupTestResult {
594 ok: true,
595 error: None,
596 latency_ms,
597 },
598 Err(e) => BackupTestResult {
599 ok: false,
600 error: Some(e.to_string()),
601 latency_ms,
602 },
603 }
604}
605
606async fn test_local(config: &Value) -> anyhow::Result<()> {
607 let base = cfg_str(config, "path");
608 if base.is_empty() {
609 anyhow::bail!("local destination requires `path`");
610 }
611 tokio::fs::create_dir_all(&base)
612 .await
613 .map_err(|e| anyhow::anyhow!("cannot create {base}: {e}"))?;
614 let probe = Path::new(&base).join(".heldar_backup_probe");
615 tokio::fs::write(&probe, b"ok")
616 .await
617 .map_err(|e| anyhow::anyhow!("path not writable: {e}"))?;
618 let _ = tokio::fs::remove_file(&probe).await;
619 Ok(())
620}
621
622async fn test_rclone(state: &AppState, dest: &BackupDestination) -> anyhow::Result<()> {
623 let bin = &state.cfg.rclone_bin;
624 if !binary_available(bin).await {
625 anyhow::bail!(
626 "rclone binary `{bin}` is not available; install rclone or set HELDAR_RCLONE_BIN \
627 (remote sftp/ftp/s3 backup requires it)"
628 );
629 }
630 let (remote, base, secrets) = build_remote(bin, &dest.kind, &dest.config.0).await?;
631 let target = format!("{remote}{base}");
632 let out = tokio::time::timeout(
633 Duration::from_secs(30),
634 Command::new(bin)
635 .arg("lsd")
636 .arg(&target)
637 .args(["--max-depth", "1"])
638 .stdin(Stdio::null())
639 .stdout(Stdio::null())
640 .stderr(Stdio::piped())
641 .kill_on_drop(true)
642 .output(),
643 )
644 .await
645 .map_err(|_| anyhow::anyhow!("rclone connectivity test timed out"))?
646 .map_err(|e| anyhow::anyhow!("spawning rclone: {e}"))?;
647 if !out.status.success() {
648 anyhow::bail!(
649 "rclone could not reach destination: {}",
650 scrub(&String::from_utf8_lossy(&out.stderr), &secrets).trim()
651 );
652 }
653 Ok(())
654}
655
656async fn resolve_segments(
661 pool: &SqlitePool,
662 camera_ids: &[String],
663 from: Option<DateTime<Utc>>,
664 to: Option<DateTime<Utc>>,
665 incident_lock_only: bool,
666) -> sqlx::Result<Vec<Segment>> {
667 let mut sql = String::from("SELECT * FROM segments WHERE 1 = 1");
668 if !camera_ids.is_empty() {
669 let placeholders = vec!["?"; camera_ids.len()].join(",");
670 sql.push_str(&format!(" AND camera_id IN ({placeholders})"));
671 }
672 sql.push_str(" AND (? IS NULL OR start_time < ?) AND (? IS NULL OR end_time > ?)");
673 if incident_lock_only {
674 sql.push_str(" AND evidence_locked = 1");
675 }
676 sql.push_str(" ORDER BY camera_id ASC, start_time ASC");
677
678 let mut q = sqlx::query_as::<_, Segment>(&sql);
679 for id in camera_ids {
680 q = q.bind(id);
681 }
682 q = q.bind(to).bind(to).bind(from).bind(from);
683 q.fetch_all(pool).await
684}
685
686async fn binary_available(bin: &str) -> bool {
688 Command::new(bin)
689 .arg("version")
690 .stdin(Stdio::null())
691 .stdout(Stdio::null())
692 .stderr(Stdio::null())
693 .kill_on_drop(true)
694 .status()
695 .await
696 .map(|s| s.success())
697 .unwrap_or(false)
698}
699
700async fn build_remote(
703 bin: &str,
704 kind: &str,
705 config: &Value,
706) -> anyhow::Result<(String, String, Vec<String>)> {
707 let mut secrets: Vec<String> = Vec::new();
708 match kind {
709 "sftp" | "ftp" => {
710 let host = cfg_str(config, "host");
711 if host.is_empty() {
712 anyhow::bail!("{kind} destination requires `host`");
713 }
714 let user = cfg_str(config, "user");
715 let pass = cfg_str(config, "pass");
716 let port = config
717 .get("port")
718 .and_then(|p| p.as_i64())
719 .map(|p| p.to_string())
720 .unwrap_or_default();
721 let mut parts = vec![format!(":{kind}"), format!("host={host}")];
722 if !port.is_empty() {
723 parts.push(format!("port={port}"));
724 }
725 if !user.is_empty() {
726 parts.push(format!("user={user}"));
727 }
728 if !pass.is_empty() {
729 let obscured = rclone_obscure(bin, &pass).await?;
730 secrets.push(obscured.clone());
731 secrets.push(pass.clone());
732 parts.push(format!("pass={obscured}"));
733 }
734 Ok((
735 format!("{}:", parts.join(",")),
736 cfg_str(config, "path"),
737 secrets,
738 ))
739 }
740 "s3" => {
741 let bucket = cfg_str(config, "bucket");
742 if bucket.is_empty() {
743 anyhow::bail!("s3 destination requires `bucket`");
744 }
745 let access_key = cfg_str(config, "access_key");
746 let secret_key = cfg_str(config, "secret_key");
747 let endpoint = cfg_str(config, "endpoint");
748 let region = cfg_str(config, "region");
749 let mut parts = vec![":s3".to_string(), "provider=Other".to_string()];
750 if !access_key.is_empty() {
751 parts.push(format!("access_key_id={access_key}"));
752 }
753 if !secret_key.is_empty() {
754 secrets.push(secret_key.clone());
755 parts.push(format!("secret_access_key={secret_key}"));
756 }
757 if !endpoint.is_empty() {
758 parts.push(format!("endpoint={endpoint}"));
759 }
760 if !region.is_empty() {
761 parts.push(format!("region={region}"));
762 }
763 let base = join_path("", &[&bucket, &cfg_str(config, "prefix")]);
764 Ok((format!("{}:", parts.join(",")), base, secrets))
765 }
766 other => anyhow::bail!("kind `{other}` does not use rclone"),
767 }
768}
769
770async fn rclone_obscure(bin: &str, pass: &str) -> anyhow::Result<String> {
772 let out = Command::new(bin)
773 .arg("obscure")
774 .arg(pass)
775 .stdin(Stdio::null())
776 .stdout(Stdio::piped())
777 .stderr(Stdio::piped())
778 .kill_on_drop(true)
779 .output()
780 .await
781 .map_err(|e| anyhow::anyhow!("spawning rclone obscure: {e}"))?;
782 if !out.status.success() {
783 anyhow::bail!(
784 "rclone obscure failed: {}",
785 String::from_utf8_lossy(&out.stderr).trim()
786 );
787 }
788 Ok(String::from_utf8_lossy(&out.stdout).trim().to_string())
789}
790
791fn join_path(base: &str, parts: &[&str]) -> String {
794 let mut out = base.trim_end_matches('/').to_string();
795 for p in parts {
796 let p = p.trim_matches('/');
797 if p.is_empty() {
798 continue;
799 }
800 if !out.is_empty() {
801 out.push('/');
802 }
803 out.push_str(p);
804 }
805 out
806}
807
808fn scrub(s: &str, secrets: &[String]) -> String {
810 let mut out = s.to_string();
811 for sec in secrets {
812 if !sec.is_empty() {
813 out = out.replace(sec.as_str(), "***");
814 }
815 }
816 out
817}
818
819fn cfg_str(config: &Value, key: &str) -> String {
820 config
821 .get(key)
822 .and_then(|v| v.as_str())
823 .unwrap_or("")
824 .trim()
825 .to_string()
826}
827
828fn file_name_of(path: &str) -> String {
829 Path::new(path)
830 .file_name()
831 .and_then(|s| s.to_str())
832 .unwrap_or("segment.mp4")
833 .to_string()
834}
835
836fn json_to_string_vec(v: &Value) -> Vec<String> {
837 v.as_array()
838 .map(|a| {
839 a.iter()
840 .filter_map(|x| x.as_str().map(String::from))
841 .collect()
842 })
843 .unwrap_or_default()
844}
845
846fn json_from_strs(v: &[String]) -> Value {
847 Value::Array(v.iter().map(|s| Value::String(s.clone())).collect())
848}
849
850#[cfg(test)]
851mod tests {
852 use super::*;
853 use serde_json::json;
854
855 #[test]
856 fn join_path_preserves_leading_slash() {
857 assert_eq!(
858 join_path("/srv/backups", &["cam1", "f.mp4"]),
859 "/srv/backups/cam1/f.mp4"
860 );
861 assert_eq!(
862 join_path("backups/", &["cam1", "f.mp4"]),
863 "backups/cam1/f.mp4"
864 );
865 assert_eq!(join_path("", &["cam1", "f.mp4"]), "cam1/f.mp4");
866 assert_eq!(join_path("bucket", &["", "p"]), "bucket/p");
867 }
868
869 #[test]
870 fn scrub_masks_secrets() {
871 let s = "auth failed for pass=hunter2 token=hunter2";
872 assert_eq!(
873 scrub(s, &["hunter2".into()]),
874 "auth failed for pass=*** token=***"
875 );
876 assert_eq!(scrub("nothing", &["".into()]), "nothing");
877 }
878
879 #[test]
880 fn json_string_vec_roundtrip() {
881 let v = json!(["a", "b", 3, "c"]);
882 assert_eq!(json_to_string_vec(&v), vec!["a", "b", "c"]);
883 assert_eq!(json_to_string_vec(&json!("nope")), Vec::<String>::new());
884 assert_eq!(json_from_strs(&["x".into(), "y".into()]), json!(["x", "y"]));
885 }
886
887 #[test]
888 fn cfg_str_reads_and_trims() {
889 let c = json!({ "host": " example.com ", "port": 22 });
890 assert_eq!(cfg_str(&c, "host"), "example.com");
891 assert_eq!(cfg_str(&c, "missing"), "");
892 assert_eq!(cfg_str(&c, "port"), "");
894 }
895
896 #[test]
897 fn file_name_of_extracts_basename() {
898 assert_eq!(
899 file_name_of("/data/recordings/cam1/20260613_120000.mp4"),
900 "20260613_120000.mp4"
901 );
902 assert_eq!(file_name_of(""), "segment.mp4");
903 }
904}