1use axum::extract::{Path, Query, State};
10use axum::http::StatusCode;
11use axum::routing::{get, post};
12use axum::{Json, Router};
13use serde::Deserialize;
14use serde_json::json;
15use sqlx::types::Json as SqlxJson;
16use uuid::Uuid;
17
18use crate::auth::{self, Principal};
19use crate::error::{AppError, AppResult};
20use crate::models::{
21 ArchiveExportRequest, BackupDestination, BackupDestinationCreate, BackupDestinationUpdate,
22 BackupDestinationView, BackupJob, BackupPolicy, BackupPolicyCreate, BackupPolicyUpdate,
23 BackupTestResult, BACKUP_SECRET_KEYS,
24};
25use crate::services::backup;
26use crate::state::AppState;
27use crate::util;
28use chrono::{DateTime, Utc};
29
30pub fn router() -> Router<AppState> {
31 Router::new()
32 .route(
33 "/api/v1/backup/destinations",
34 get(list_destinations).post(create_destination),
35 )
36 .route(
37 "/api/v1/backup/destinations/{id}",
38 axum::routing::patch(update_destination).delete(delete_destination),
39 )
40 .route(
41 "/api/v1/backup/destinations/{id}/test",
42 post(test_destination),
43 )
44 .route(
45 "/api/v1/backup/policies",
46 get(list_policies).post(create_policy),
47 )
48 .route(
49 "/api/v1/backup/policies/{id}",
50 axum::routing::patch(update_policy).delete(delete_policy),
51 )
52 .route("/api/v1/backup/policies/{id}/trigger", post(trigger_policy))
53 .route("/api/v1/backup/jobs", get(list_jobs))
54 .route("/api/v1/backup/jobs/{id}", get(get_job).delete(delete_job))
55 .route("/api/v1/archive/export", post(archive_export))
56 .route("/api/v1/archive/exports", get(list_archive_exports))
57}
58
/// Destination kinds accepted by the destination create/update handlers.
const VALID_KINDS: &[&str] = &["local", "sftp", "ftp", "s3"];

/// Returns `true` when `kind` is exactly (case-sensitively) one of [`VALID_KINDS`].
fn valid_kind(kind: &str) -> bool {
    VALID_KINDS.iter().any(|known| *known == kind)
}
64
65async fn load_destination(pool: &sqlx::SqlitePool, id: &str) -> AppResult<BackupDestination> {
66 sqlx::query_as::<_, BackupDestination>("SELECT * FROM backup_destinations WHERE id = ?")
67 .bind(id)
68 .fetch_optional(pool)
69 .await?
70 .ok_or_else(|| AppError::NotFound(format!("backup destination {id} not found")))
71}
72
73async fn load_policy(pool: &sqlx::SqlitePool, id: &str) -> AppResult<BackupPolicy> {
74 sqlx::query_as::<_, BackupPolicy>("SELECT * FROM backup_policies WHERE id = ?")
75 .bind(id)
76 .fetch_optional(pool)
77 .await?
78 .ok_or_else(|| AppError::NotFound(format!("backup policy {id} not found")))
79}
80
81async fn load_job(pool: &sqlx::SqlitePool, id: &str) -> AppResult<BackupJob> {
82 sqlx::query_as::<_, BackupJob>("SELECT * FROM backup_jobs WHERE id = ?")
83 .bind(id)
84 .fetch_optional(pool)
85 .await?
86 .ok_or_else(|| AppError::NotFound(format!("backup job {id} not found")))
87}
88
89fn merge_secret_config(old: &serde_json::Value, mut new: serde_json::Value) -> serde_json::Value {
92 if let (Some(old_obj), Some(new_obj)) = (old.as_object(), new.as_object_mut()) {
93 for key in BACKUP_SECRET_KEYS {
94 if new_obj.get(*key).and_then(|v| v.as_str()) == Some("***") {
95 match old_obj.get(*key) {
96 Some(prev) => {
97 new_obj.insert((*key).to_string(), prev.clone());
98 }
99 None => {
100 new_obj.remove(*key);
101 }
102 }
103 }
104 }
105 }
106 new
107}
108
109async fn list_destinations(
112 State(st): State<AppState>,
113 principal: Principal,
114) -> AppResult<Json<Vec<BackupDestinationView>>> {
115 principal.require(principal.can_view(), "view backup destinations")?;
116 let rows = sqlx::query_as::<_, BackupDestination>(
117 "SELECT * FROM backup_destinations ORDER BY created_at ASC",
118 )
119 .fetch_all(&st.pool)
120 .await?;
121 Ok(Json(
122 rows.into_iter().map(BackupDestinationView::from).collect(),
123 ))
124}
125
126async fn create_destination(
127 State(st): State<AppState>,
128 principal: Principal,
129 Json(body): Json<BackupDestinationCreate>,
130) -> AppResult<(StatusCode, Json<BackupDestinationView>)> {
131 principal.require(
132 principal.can_manage_registry(),
133 "create backup destinations",
134 )?;
135 let name = body.name.trim();
136 if name.is_empty() {
137 return Err(AppError::BadRequest("`name` is required".into()));
138 }
139 if !valid_kind(&body.kind) {
140 return Err(AppError::BadRequest(
141 "`kind` must be local|sftp|ftp|s3".into(),
142 ));
143 }
144 let config = body.config.unwrap_or_else(|| json!({}));
145 if !config.is_object() {
146 return Err(AppError::BadRequest(
147 "`config` must be a JSON object".into(),
148 ));
149 }
150 let enabled = body.enabled.unwrap_or(true);
151 let id = format!("bkd_{}", Uuid::new_v4().simple());
152 let now = Utc::now();
153 sqlx::query(
154 "INSERT INTO backup_destinations (id, name, kind, config, enabled, created_at, updated_at)
155 VALUES (?, ?, ?, ?, ?, ?, ?)",
156 )
157 .bind(&id)
158 .bind(name)
159 .bind(&body.kind)
160 .bind(SqlxJson(config))
161 .bind(enabled)
162 .bind(now)
163 .bind(now)
164 .execute(&st.pool)
165 .await?;
166 auth::audit(
167 &st.pool,
168 &principal,
169 "create_backup_destination",
170 "backup_destination",
171 &id,
172 json!({ "kind": &body.kind, "name": name }),
173 )
174 .await;
175 let dest = load_destination(&st.pool, &id).await?;
176 Ok((StatusCode::CREATED, Json(BackupDestinationView::from(dest))))
177}
178
179async fn update_destination(
180 State(st): State<AppState>,
181 Path(id): Path<String>,
182 principal: Principal,
183 Json(body): Json<BackupDestinationUpdate>,
184) -> AppResult<Json<BackupDestinationView>> {
185 principal.require(
186 principal.can_manage_registry(),
187 "update backup destinations",
188 )?;
189 let cur = load_destination(&st.pool, &id).await?;
190
191 let name = body
192 .name
193 .map(|s| s.trim().to_string())
194 .filter(|s| !s.is_empty())
195 .unwrap_or_else(|| cur.name.clone());
196 let kind = body.kind.unwrap_or_else(|| cur.kind.clone());
197 if !valid_kind(&kind) {
198 return Err(AppError::BadRequest(
199 "`kind` must be local|sftp|ftp|s3".into(),
200 ));
201 }
202 let config = match body.config {
203 Some(new) => {
204 if !new.is_object() {
205 return Err(AppError::BadRequest(
206 "`config` must be a JSON object".into(),
207 ));
208 }
209 merge_secret_config(&cur.config.0, new)
210 }
211 None => cur.config.0.clone(),
212 };
213 let enabled = body.enabled.unwrap_or(cur.enabled);
214
215 sqlx::query(
216 "UPDATE backup_destinations SET name = ?, kind = ?, config = ?, enabled = ?, updated_at = ? WHERE id = ?",
217 )
218 .bind(&name)
219 .bind(&kind)
220 .bind(SqlxJson(config))
221 .bind(enabled)
222 .bind(Utc::now())
223 .bind(&id)
224 .execute(&st.pool)
225 .await?;
226 auth::audit(
227 &st.pool,
228 &principal,
229 "update_backup_destination",
230 "backup_destination",
231 &id,
232 json!({ "kind": &kind, "enabled": enabled }),
233 )
234 .await;
235 let dest = load_destination(&st.pool, &id).await?;
236 Ok(Json(BackupDestinationView::from(dest)))
237}
238
239async fn delete_destination(
240 State(st): State<AppState>,
241 Path(id): Path<String>,
242 principal: Principal,
243) -> AppResult<StatusCode> {
244 principal.require(
245 principal.can_manage_registry(),
246 "delete backup destinations",
247 )?;
248 let res = sqlx::query("DELETE FROM backup_destinations WHERE id = ?")
249 .bind(&id)
250 .execute(&st.pool)
251 .await?;
252 if res.rows_affected() == 0 {
253 return Err(AppError::NotFound(format!(
254 "backup destination {id} not found"
255 )));
256 }
257 auth::audit(
258 &st.pool,
259 &principal,
260 "delete_backup_destination",
261 "backup_destination",
262 &id,
263 json!({}),
264 )
265 .await;
266 Ok(StatusCode::NO_CONTENT)
267}
268
269async fn test_destination(
270 State(st): State<AppState>,
271 Path(id): Path<String>,
272 principal: Principal,
273) -> AppResult<Json<BackupTestResult>> {
274 principal.require(principal.can_manage_registry(), "test backup destinations")?;
275 let dest = load_destination(&st.pool, &id).await?;
276 let result = backup::test_destination(&st, &dest).await;
277 auth::audit(
278 &st.pool,
279 &principal,
280 "test_backup_destination",
281 "backup_destination",
282 &id,
283 json!({ "ok": result.ok }),
284 )
285 .await;
286 Ok(Json(result))
287}
288
289async fn list_policies(
292 State(st): State<AppState>,
293 principal: Principal,
294) -> AppResult<Json<Vec<BackupPolicy>>> {
295 principal.require(principal.can_view(), "view backup policies")?;
296 let rows =
297 sqlx::query_as::<_, BackupPolicy>("SELECT * FROM backup_policies ORDER BY created_at ASC")
298 .fetch_all(&st.pool)
299 .await?;
300 Ok(Json(rows))
301}
302
303async fn create_policy(
304 State(st): State<AppState>,
305 principal: Principal,
306 Json(body): Json<BackupPolicyCreate>,
307) -> AppResult<(StatusCode, Json<BackupPolicy>)> {
308 principal.require(principal.can_manage_registry(), "create backup policies")?;
309 let name = body.name.trim();
310 if name.is_empty() {
311 return Err(AppError::BadRequest("`name` is required".into()));
312 }
313 let _ = load_destination(&st.pool, &body.destination_id).await?;
315 let camera_ids = body.camera_ids.unwrap_or_else(|| json!([]));
316 if !camera_ids.is_array() {
317 return Err(AppError::BadRequest(
318 "`camera_ids` must be a JSON array of camera ids".into(),
319 ));
320 }
321 let incident_lock_only = body.incident_lock_only.unwrap_or(false);
322 let schedule_interval_s = body.schedule_interval_s.unwrap_or(86_400).max(60);
323 let lookback_hours = body.lookback_hours.unwrap_or(0).max(0);
324 let enabled = body.enabled.unwrap_or(true);
325 let id = format!("bkp_{}", Uuid::new_v4().simple());
326 let now = Utc::now();
327 sqlx::query(
328 "INSERT INTO backup_policies
329 (id, name, destination_id, camera_ids, incident_lock_only, schedule_interval_s,
330 lookback_hours, last_run_at, last_job_id, enabled, created_at, updated_at)
331 VALUES (?, ?, ?, ?, ?, ?, ?, NULL, NULL, ?, ?, ?)",
332 )
333 .bind(&id)
334 .bind(name)
335 .bind(&body.destination_id)
336 .bind(SqlxJson(camera_ids))
337 .bind(incident_lock_only)
338 .bind(schedule_interval_s)
339 .bind(lookback_hours)
340 .bind(enabled)
341 .bind(now)
342 .bind(now)
343 .execute(&st.pool)
344 .await?;
345 auth::audit(
346 &st.pool,
347 &principal,
348 "create_backup_policy",
349 "backup_policy",
350 &id,
351 json!({ "destination_id": &body.destination_id, "name": name }),
352 )
353 .await;
354 let policy = load_policy(&st.pool, &id).await?;
355 Ok((StatusCode::CREATED, Json(policy)))
356}
357
358async fn update_policy(
359 State(st): State<AppState>,
360 Path(id): Path<String>,
361 principal: Principal,
362 Json(body): Json<BackupPolicyUpdate>,
363) -> AppResult<Json<BackupPolicy>> {
364 principal.require(principal.can_manage_registry(), "update backup policies")?;
365 let cur = load_policy(&st.pool, &id).await?;
366
367 let name = body
368 .name
369 .map(|s| s.trim().to_string())
370 .filter(|s| !s.is_empty())
371 .unwrap_or_else(|| cur.name.clone());
372 let destination_id = body
373 .destination_id
374 .unwrap_or_else(|| cur.destination_id.clone());
375 let _ = load_destination(&st.pool, &destination_id).await?;
376 let camera_ids = match body.camera_ids {
377 Some(v) => {
378 if !v.is_array() {
379 return Err(AppError::BadRequest(
380 "`camera_ids` must be a JSON array of camera ids".into(),
381 ));
382 }
383 v
384 }
385 None => cur.camera_ids.0.clone(),
386 };
387 let incident_lock_only = body.incident_lock_only.unwrap_or(cur.incident_lock_only);
388 let schedule_interval_s = body
389 .schedule_interval_s
390 .map(|v| v.max(60))
391 .unwrap_or(cur.schedule_interval_s);
392 let lookback_hours = body
393 .lookback_hours
394 .map(|v| v.max(0))
395 .unwrap_or(cur.lookback_hours);
396 let enabled = body.enabled.unwrap_or(cur.enabled);
397
398 sqlx::query(
399 "UPDATE backup_policies SET name = ?, destination_id = ?, camera_ids = ?,
400 incident_lock_only = ?, schedule_interval_s = ?, lookback_hours = ?, enabled = ?, updated_at = ?
401 WHERE id = ?",
402 )
403 .bind(&name)
404 .bind(&destination_id)
405 .bind(SqlxJson(camera_ids))
406 .bind(incident_lock_only)
407 .bind(schedule_interval_s)
408 .bind(lookback_hours)
409 .bind(enabled)
410 .bind(Utc::now())
411 .bind(&id)
412 .execute(&st.pool)
413 .await?;
414 auth::audit(
415 &st.pool,
416 &principal,
417 "update_backup_policy",
418 "backup_policy",
419 &id,
420 json!({ "enabled": enabled }),
421 )
422 .await;
423 let policy = load_policy(&st.pool, &id).await?;
424 Ok(Json(policy))
425}
426
427async fn delete_policy(
428 State(st): State<AppState>,
429 Path(id): Path<String>,
430 principal: Principal,
431) -> AppResult<StatusCode> {
432 principal.require(principal.can_manage_registry(), "delete backup policies")?;
433 let res = sqlx::query("DELETE FROM backup_policies WHERE id = ?")
434 .bind(&id)
435 .execute(&st.pool)
436 .await?;
437 if res.rows_affected() == 0 {
438 return Err(AppError::NotFound(format!("backup policy {id} not found")));
439 }
440 auth::audit(
441 &st.pool,
442 &principal,
443 "delete_backup_policy",
444 "backup_policy",
445 &id,
446 json!({}),
447 )
448 .await;
449 Ok(StatusCode::NO_CONTENT)
450}
451
452async fn trigger_policy(
453 State(st): State<AppState>,
454 Path(id): Path<String>,
455 principal: Principal,
456) -> AppResult<(StatusCode, Json<BackupJob>)> {
457 principal.require(principal.can_manage_registry(), "trigger backup policies")?;
458 let policy = load_policy(&st.pool, &id).await?;
459 let job_id = backup::trigger_policy(&st, &policy)
460 .await
461 .map_err(AppError::Other)?;
462 auth::audit(
463 &st.pool,
464 &principal,
465 "trigger_backup_policy",
466 "backup_policy",
467 &id,
468 json!({ "job_id": &job_id }),
469 )
470 .await;
471 let job = load_job(&st.pool, &job_id).await?;
472 Ok((StatusCode::ACCEPTED, Json(job)))
473}
474
/// Query-string parameters accepted by `list_jobs`; every field is optional.
#[derive(Debug, Deserialize)]
struct JobQuery {
    /// When present, only jobs whose `policy_id` equals this value are returned.
    policy_id: Option<String>,
    /// When present, only jobs whose `status` equals this value are returned.
    status: Option<String>,
    /// Maximum number of rows; `list_jobs` defaults this to 100 and clamps it to 1..=2000.
    limit: Option<i64>,
}
483
484async fn list_jobs(
485 State(st): State<AppState>,
486 principal: Principal,
487 Query(q): Query<JobQuery>,
488) -> AppResult<Json<Vec<BackupJob>>> {
489 principal.require(principal.can_view(), "view backup jobs")?;
490 let limit = q.limit.unwrap_or(100).clamp(1, 2000);
491 let rows = sqlx::query_as::<_, BackupJob>(
492 "SELECT * FROM backup_jobs
493 WHERE (? IS NULL OR policy_id = ?)
494 AND (? IS NULL OR status = ?)
495 ORDER BY created_at DESC LIMIT ?",
496 )
497 .bind(&q.policy_id)
498 .bind(&q.policy_id)
499 .bind(&q.status)
500 .bind(&q.status)
501 .bind(limit)
502 .fetch_all(&st.pool)
503 .await?;
504 Ok(Json(rows))
505}
506
507async fn get_job(
508 State(st): State<AppState>,
509 Path(id): Path<String>,
510 principal: Principal,
511) -> AppResult<Json<BackupJob>> {
512 principal.require(principal.can_view(), "view backup jobs")?;
513 let job = load_job(&st.pool, &id).await?;
514 Ok(Json(job))
515}
516
517async fn delete_job(
518 State(st): State<AppState>,
519 Path(id): Path<String>,
520 principal: Principal,
521) -> AppResult<StatusCode> {
522 principal.require(principal.can_manage_registry(), "delete backup jobs")?;
523 let job = load_job(&st.pool, &id).await?;
524 if let Some(path) = &job.output_path {
526 let _ = tokio::fs::remove_file(path).await;
527 }
528 sqlx::query("DELETE FROM backup_jobs WHERE id = ?")
529 .bind(&id)
530 .execute(&st.pool)
531 .await?;
532 auth::audit(
533 &st.pool,
534 &principal,
535 "delete_backup_job",
536 "backup_job",
537 &id,
538 json!({}),
539 )
540 .await;
541 Ok(StatusCode::NO_CONTENT)
542}
543
544fn parse_opt_ts(s: &Option<String>, field: &str) -> AppResult<Option<DateTime<Utc>>> {
547 match s {
548 Some(v) => util::parse_rfc3339(v)
549 .map(Some)
550 .ok_or_else(|| AppError::BadRequest(format!("invalid `{field}` timestamp"))),
551 None => Ok(None),
552 }
553}
554
555async fn archive_export(
556 State(st): State<AppState>,
557 principal: Principal,
558 Json(body): Json<ArchiveExportRequest>,
559) -> AppResult<(StatusCode, Json<BackupJob>)> {
560 principal.require(principal.can_manage_registry(), "export archives")?;
561 let from = parse_opt_ts(&body.from, "from")?;
562 let to = parse_opt_ts(&body.to, "to")?;
563 if let (Some(f), Some(t)) = (from, to) {
564 if f > t {
565 return Err(AppError::BadRequest("`from` must be <= `to`".into()));
566 }
567 }
568 let camera_ids = body.camera_ids;
569 let incident_lock_only = body.incident_lock_only.unwrap_or(false);
570 let trim = body.trim.unwrap_or(false);
571 let job =
572 backup::create_archive(&st, camera_ids.clone(), from, to, incident_lock_only, trim).await?;
573 auth::audit(
574 &st.pool,
575 &principal,
576 "create_archive_export",
577 "backup_job",
578 &job.id,
579 json!({ "camera_ids": camera_ids, "incident_lock_only": incident_lock_only, "trim": trim }),
580 )
581 .await;
582 Ok((StatusCode::CREATED, Json(job)))
583}
584
/// Query-string parameters accepted by `list_archive_exports`.
#[derive(Debug, Deserialize)]
struct LimitQuery {
    /// Maximum number of rows; `list_archive_exports` defaults this to 100 and clamps it to 1..=2000.
    limit: Option<i64>,
}
589
590async fn list_archive_exports(
591 State(st): State<AppState>,
592 principal: Principal,
593 Query(q): Query<LimitQuery>,
594) -> AppResult<Json<Vec<BackupJob>>> {
595 principal.require(principal.can_view(), "view archive exports")?;
596 let limit = q.limit.unwrap_or(100).clamp(1, 2000);
597 let rows = sqlx::query_as::<_, BackupJob>(
598 "SELECT * FROM backup_jobs WHERE kind = 'on_demand_archive' ORDER BY created_at DESC LIMIT ?",
599 )
600 .bind(limit)
601 .fetch_all(&st.pool)
602 .await?;
603 Ok(Json(rows))
604}
605
/// Unit tests for the pure helpers in this module (`valid_kind`,
/// `merge_secret_config`, `parse_opt_ts`); the HTTP handlers are not
/// exercised here.
#[cfg(test)]
mod tests {
    use super::*;

    /// The four known kinds are accepted; unknown, empty and differently-cased
    /// names are rejected.
    #[test]
    fn valid_kind_accepts_known_and_rejects_others() {
        assert!(valid_kind("local"));
        assert!(valid_kind("sftp"));
        assert!(valid_kind("ftp"));
        assert!(valid_kind("s3"));
        assert!(!valid_kind("gcs"));
        // Matching is exact: empty and mixed-case inputs must fail.
        assert!(!valid_kind(""));
        assert!(!valid_kind("Local"));
        assert!(!valid_kind("S3"));
    }

    /// Pins the exact contents and order of `VALID_KINDS`.
    #[test]
    fn valid_kinds_list_is_exact() {
        assert_eq!(VALID_KINDS, &["local", "sftp", "ftp", "s3"]);
    }

    /// A `"***"` placeholder is replaced by the previously stored secret.
    #[test]
    fn merge_secret_config_restores_placeholder_from_old() {
        let old = json!({ "password": "hunter2", "host": "old.example" });
        let new = json!({ "password": "***", "host": "new.example" });
        let merged = merge_secret_config(&old, new);
        // The secret comes from `old` ...
        assert_eq!(merged["password"], json!("hunter2"));
        // ... while non-secret fields take the new value.
        assert_eq!(merged["host"], json!("new.example"));
    }

    /// A placeholder with no stored counterpart is removed rather than kept.
    #[test]
    fn merge_secret_config_drops_placeholder_when_no_stored_secret() {
        let old = json!({ "host": "h" });
        let new = json!({ "secret": "***", "host": "h2" });
        let merged = merge_secret_config(&old, new);
        // The literal "***" must not survive as the secret's value.
        assert!(merged.as_object().unwrap().get("secret").is_none());
        assert_eq!(merged["host"], json!("h2"));
    }

    /// A real (non-placeholder) secret in the new config overrides the old one.
    #[test]
    fn merge_secret_config_keeps_new_non_placeholder_secret() {
        let old = json!({ "password": "old" });
        let new = json!({ "password": "brandnew" });
        let merged = merge_secret_config(&old, new);
        assert_eq!(merged["password"], json!("brandnew"));
    }

    /// Expects `pass`, `password`, `secret_key` and `secret` all to be treated
    /// as secret keys and restored from the old config.
    #[test]
    fn merge_secret_config_handles_all_secret_keys() {
        let old = json!({ "pass": "a", "password": "b", "secret_key": "c", "secret": "d" });
        let new = json!({ "pass": "***", "password": "***", "secret_key": "***", "secret": "***" });
        let merged = merge_secret_config(&old, new);
        assert_eq!(merged["pass"], json!("a"));
        assert_eq!(merged["password"], json!("b"));
        assert_eq!(merged["secret_key"], json!("c"));
        assert_eq!(merged["secret"], json!("d"));
    }

    /// When the inputs are not JSON objects the new value is returned unchanged.
    #[test]
    fn merge_secret_config_passthrough_for_non_objects() {
        let old = json!("not-an-object");
        // Neither side is an object, so no merging can take place.
        let new = json!([1, 2, 3]);
        let merged = merge_secret_config(&old, new.clone());
        assert_eq!(merged, new);
    }

    /// Covers the three outcomes of `parse_opt_ts`: absent, valid and invalid.
    #[test]
    fn parse_opt_ts_none_valid_and_invalid() {
        // Absent input yields `Ok(None)`.
        assert!(parse_opt_ts(&None, "from").unwrap().is_none());

        // A well-formed RFC 3339 timestamp yields `Ok(Some(_))`.
        let ok = parse_opt_ts(&Some("2024-01-02T03:04:05Z".to_string()), "from").unwrap();
        assert!(ok.is_some());

        // Garbage input yields a `BadRequest` whose message names the field.
        let err = parse_opt_ts(&Some("not-a-timestamp".to_string()), "to").unwrap_err();
        match err {
            AppError::BadRequest(msg) => {
                assert!(msg.contains("to"));
                assert!(msg.contains("invalid"));
            }
            other => panic!("expected BadRequest, got {other:?}"),
        }
    }
}