Skip to main content

heldar_kernel/routes/
backup.rs

1//! Backup subsystem API: destinations, scheduled policies, the job ledger, and on-demand archive
2//! export.
3//!
4//! Destinations + policies are managed by manager+; their listings (with destination credentials
5//! MASKED) and the job/export ledger are readable by any authenticated principal. The actual
6//! transfers run in the background backup service ([`crate::services::backup`]). All mutations are
7//! written to the immutable audit log.
8
9use axum::extract::{Path, Query, State};
10use axum::http::StatusCode;
11use axum::routing::{get, post};
12use axum::{Json, Router};
13use serde::Deserialize;
14use serde_json::json;
15use sqlx::types::Json as SqlxJson;
16use uuid::Uuid;
17
18use crate::auth::{self, Principal};
19use crate::error::{AppError, AppResult};
20use crate::models::{
21    ArchiveExportRequest, BackupDestination, BackupDestinationCreate, BackupDestinationUpdate,
22    BackupDestinationView, BackupJob, BackupPolicy, BackupPolicyCreate, BackupPolicyUpdate,
23    BackupTestResult, BACKUP_SECRET_KEYS,
24};
25use crate::services::backup;
26use crate::state::AppState;
27use crate::util;
28use chrono::{DateTime, Utc};
29
30pub fn router() -> Router<AppState> {
31    Router::new()
32        .route(
33            "/api/v1/backup/destinations",
34            get(list_destinations).post(create_destination),
35        )
36        .route(
37            "/api/v1/backup/destinations/{id}",
38            axum::routing::patch(update_destination).delete(delete_destination),
39        )
40        .route(
41            "/api/v1/backup/destinations/{id}/test",
42            post(test_destination),
43        )
44        .route(
45            "/api/v1/backup/policies",
46            get(list_policies).post(create_policy),
47        )
48        .route(
49            "/api/v1/backup/policies/{id}",
50            axum::routing::patch(update_policy).delete(delete_policy),
51        )
52        .route("/api/v1/backup/policies/{id}/trigger", post(trigger_policy))
53        .route("/api/v1/backup/jobs", get(list_jobs))
54        .route("/api/v1/backup/jobs/{id}", get(get_job).delete(delete_job))
55        .route("/api/v1/archive/export", post(archive_export))
56        .route("/api/v1/archive/exports", get(list_archive_exports))
57}
58
59const VALID_KINDS: &[&str] = &["local", "sftp", "ftp", "s3"];
60
61fn valid_kind(kind: &str) -> bool {
62    VALID_KINDS.contains(&kind)
63}
64
65async fn load_destination(pool: &sqlx::SqlitePool, id: &str) -> AppResult<BackupDestination> {
66    sqlx::query_as::<_, BackupDestination>("SELECT * FROM backup_destinations WHERE id = ?")
67        .bind(id)
68        .fetch_optional(pool)
69        .await?
70        .ok_or_else(|| AppError::NotFound(format!("backup destination {id} not found")))
71}
72
73async fn load_policy(pool: &sqlx::SqlitePool, id: &str) -> AppResult<BackupPolicy> {
74    sqlx::query_as::<_, BackupPolicy>("SELECT * FROM backup_policies WHERE id = ?")
75        .bind(id)
76        .fetch_optional(pool)
77        .await?
78        .ok_or_else(|| AppError::NotFound(format!("backup policy {id} not found")))
79}
80
81async fn load_job(pool: &sqlx::SqlitePool, id: &str) -> AppResult<BackupJob> {
82    sqlx::query_as::<_, BackupJob>("SELECT * FROM backup_jobs WHERE id = ?")
83        .bind(id)
84        .fetch_optional(pool)
85        .await?
86        .ok_or_else(|| AppError::NotFound(format!("backup job {id} not found")))
87}
88
89/// Merge a new config over the existing one: any secret value the client sent back as the `***`
90/// placeholder is replaced with the stored secret (so editing other fields never wipes credentials).
91fn merge_secret_config(old: &serde_json::Value, mut new: serde_json::Value) -> serde_json::Value {
92    if let (Some(old_obj), Some(new_obj)) = (old.as_object(), new.as_object_mut()) {
93        for key in BACKUP_SECRET_KEYS {
94            if new_obj.get(*key).and_then(|v| v.as_str()) == Some("***") {
95                match old_obj.get(*key) {
96                    Some(prev) => {
97                        new_obj.insert((*key).to_string(), prev.clone());
98                    }
99                    None => {
100                        new_obj.remove(*key);
101                    }
102                }
103            }
104        }
105    }
106    new
107}
108
109// ---- Destinations ----
110
111async fn list_destinations(
112    State(st): State<AppState>,
113    principal: Principal,
114) -> AppResult<Json<Vec<BackupDestinationView>>> {
115    principal.require(principal.can_view(), "view backup destinations")?;
116    let rows = sqlx::query_as::<_, BackupDestination>(
117        "SELECT * FROM backup_destinations ORDER BY created_at ASC",
118    )
119    .fetch_all(&st.pool)
120    .await?;
121    Ok(Json(
122        rows.into_iter().map(BackupDestinationView::from).collect(),
123    ))
124}
125
126async fn create_destination(
127    State(st): State<AppState>,
128    principal: Principal,
129    Json(body): Json<BackupDestinationCreate>,
130) -> AppResult<(StatusCode, Json<BackupDestinationView>)> {
131    principal.require(
132        principal.can_manage_registry(),
133        "create backup destinations",
134    )?;
135    let name = body.name.trim();
136    if name.is_empty() {
137        return Err(AppError::BadRequest("`name` is required".into()));
138    }
139    if !valid_kind(&body.kind) {
140        return Err(AppError::BadRequest(
141            "`kind` must be local|sftp|ftp|s3".into(),
142        ));
143    }
144    let config = body.config.unwrap_or_else(|| json!({}));
145    if !config.is_object() {
146        return Err(AppError::BadRequest(
147            "`config` must be a JSON object".into(),
148        ));
149    }
150    let enabled = body.enabled.unwrap_or(true);
151    let id = format!("bkd_{}", Uuid::new_v4().simple());
152    let now = Utc::now();
153    sqlx::query(
154        "INSERT INTO backup_destinations (id, name, kind, config, enabled, created_at, updated_at)
155         VALUES (?, ?, ?, ?, ?, ?, ?)",
156    )
157    .bind(&id)
158    .bind(name)
159    .bind(&body.kind)
160    .bind(SqlxJson(config))
161    .bind(enabled)
162    .bind(now)
163    .bind(now)
164    .execute(&st.pool)
165    .await?;
166    auth::audit(
167        &st.pool,
168        &principal,
169        "create_backup_destination",
170        "backup_destination",
171        &id,
172        json!({ "kind": &body.kind, "name": name }),
173    )
174    .await;
175    let dest = load_destination(&st.pool, &id).await?;
176    Ok((StatusCode::CREATED, Json(BackupDestinationView::from(dest))))
177}
178
179async fn update_destination(
180    State(st): State<AppState>,
181    Path(id): Path<String>,
182    principal: Principal,
183    Json(body): Json<BackupDestinationUpdate>,
184) -> AppResult<Json<BackupDestinationView>> {
185    principal.require(
186        principal.can_manage_registry(),
187        "update backup destinations",
188    )?;
189    let cur = load_destination(&st.pool, &id).await?;
190
191    let name = body
192        .name
193        .map(|s| s.trim().to_string())
194        .filter(|s| !s.is_empty())
195        .unwrap_or_else(|| cur.name.clone());
196    let kind = body.kind.unwrap_or_else(|| cur.kind.clone());
197    if !valid_kind(&kind) {
198        return Err(AppError::BadRequest(
199            "`kind` must be local|sftp|ftp|s3".into(),
200        ));
201    }
202    let config = match body.config {
203        Some(new) => {
204            if !new.is_object() {
205                return Err(AppError::BadRequest(
206                    "`config` must be a JSON object".into(),
207                ));
208            }
209            merge_secret_config(&cur.config.0, new)
210        }
211        None => cur.config.0.clone(),
212    };
213    let enabled = body.enabled.unwrap_or(cur.enabled);
214
215    sqlx::query(
216        "UPDATE backup_destinations SET name = ?, kind = ?, config = ?, enabled = ?, updated_at = ? WHERE id = ?",
217    )
218    .bind(&name)
219    .bind(&kind)
220    .bind(SqlxJson(config))
221    .bind(enabled)
222    .bind(Utc::now())
223    .bind(&id)
224    .execute(&st.pool)
225    .await?;
226    auth::audit(
227        &st.pool,
228        &principal,
229        "update_backup_destination",
230        "backup_destination",
231        &id,
232        json!({ "kind": &kind, "enabled": enabled }),
233    )
234    .await;
235    let dest = load_destination(&st.pool, &id).await?;
236    Ok(Json(BackupDestinationView::from(dest)))
237}
238
239async fn delete_destination(
240    State(st): State<AppState>,
241    Path(id): Path<String>,
242    principal: Principal,
243) -> AppResult<StatusCode> {
244    principal.require(
245        principal.can_manage_registry(),
246        "delete backup destinations",
247    )?;
248    let res = sqlx::query("DELETE FROM backup_destinations WHERE id = ?")
249        .bind(&id)
250        .execute(&st.pool)
251        .await?;
252    if res.rows_affected() == 0 {
253        return Err(AppError::NotFound(format!(
254            "backup destination {id} not found"
255        )));
256    }
257    auth::audit(
258        &st.pool,
259        &principal,
260        "delete_backup_destination",
261        "backup_destination",
262        &id,
263        json!({}),
264    )
265    .await;
266    Ok(StatusCode::NO_CONTENT)
267}
268
269async fn test_destination(
270    State(st): State<AppState>,
271    Path(id): Path<String>,
272    principal: Principal,
273) -> AppResult<Json<BackupTestResult>> {
274    principal.require(principal.can_manage_registry(), "test backup destinations")?;
275    let dest = load_destination(&st.pool, &id).await?;
276    let result = backup::test_destination(&st, &dest).await;
277    auth::audit(
278        &st.pool,
279        &principal,
280        "test_backup_destination",
281        "backup_destination",
282        &id,
283        json!({ "ok": result.ok }),
284    )
285    .await;
286    Ok(Json(result))
287}
288
289// ---- Policies ----
290
291async fn list_policies(
292    State(st): State<AppState>,
293    principal: Principal,
294) -> AppResult<Json<Vec<BackupPolicy>>> {
295    principal.require(principal.can_view(), "view backup policies")?;
296    let rows =
297        sqlx::query_as::<_, BackupPolicy>("SELECT * FROM backup_policies ORDER BY created_at ASC")
298            .fetch_all(&st.pool)
299            .await?;
300    Ok(Json(rows))
301}
302
303async fn create_policy(
304    State(st): State<AppState>,
305    principal: Principal,
306    Json(body): Json<BackupPolicyCreate>,
307) -> AppResult<(StatusCode, Json<BackupPolicy>)> {
308    principal.require(principal.can_manage_registry(), "create backup policies")?;
309    let name = body.name.trim();
310    if name.is_empty() {
311        return Err(AppError::BadRequest("`name` is required".into()));
312    }
313    // The destination must exist (FK would also reject, but a clean 404 is friendlier).
314    let _ = load_destination(&st.pool, &body.destination_id).await?;
315    let camera_ids = body.camera_ids.unwrap_or_else(|| json!([]));
316    if !camera_ids.is_array() {
317        return Err(AppError::BadRequest(
318            "`camera_ids` must be a JSON array of camera ids".into(),
319        ));
320    }
321    let incident_lock_only = body.incident_lock_only.unwrap_or(false);
322    let schedule_interval_s = body.schedule_interval_s.unwrap_or(86_400).max(60);
323    let lookback_hours = body.lookback_hours.unwrap_or(0).max(0);
324    let enabled = body.enabled.unwrap_or(true);
325    let id = format!("bkp_{}", Uuid::new_v4().simple());
326    let now = Utc::now();
327    sqlx::query(
328        "INSERT INTO backup_policies
329           (id, name, destination_id, camera_ids, incident_lock_only, schedule_interval_s,
330            lookback_hours, last_run_at, last_job_id, enabled, created_at, updated_at)
331         VALUES (?, ?, ?, ?, ?, ?, ?, NULL, NULL, ?, ?, ?)",
332    )
333    .bind(&id)
334    .bind(name)
335    .bind(&body.destination_id)
336    .bind(SqlxJson(camera_ids))
337    .bind(incident_lock_only)
338    .bind(schedule_interval_s)
339    .bind(lookback_hours)
340    .bind(enabled)
341    .bind(now)
342    .bind(now)
343    .execute(&st.pool)
344    .await?;
345    auth::audit(
346        &st.pool,
347        &principal,
348        "create_backup_policy",
349        "backup_policy",
350        &id,
351        json!({ "destination_id": &body.destination_id, "name": name }),
352    )
353    .await;
354    let policy = load_policy(&st.pool, &id).await?;
355    Ok((StatusCode::CREATED, Json(policy)))
356}
357
358async fn update_policy(
359    State(st): State<AppState>,
360    Path(id): Path<String>,
361    principal: Principal,
362    Json(body): Json<BackupPolicyUpdate>,
363) -> AppResult<Json<BackupPolicy>> {
364    principal.require(principal.can_manage_registry(), "update backup policies")?;
365    let cur = load_policy(&st.pool, &id).await?;
366
367    let name = body
368        .name
369        .map(|s| s.trim().to_string())
370        .filter(|s| !s.is_empty())
371        .unwrap_or_else(|| cur.name.clone());
372    let destination_id = body
373        .destination_id
374        .unwrap_or_else(|| cur.destination_id.clone());
375    let _ = load_destination(&st.pool, &destination_id).await?;
376    let camera_ids = match body.camera_ids {
377        Some(v) => {
378            if !v.is_array() {
379                return Err(AppError::BadRequest(
380                    "`camera_ids` must be a JSON array of camera ids".into(),
381                ));
382            }
383            v
384        }
385        None => cur.camera_ids.0.clone(),
386    };
387    let incident_lock_only = body.incident_lock_only.unwrap_or(cur.incident_lock_only);
388    let schedule_interval_s = body
389        .schedule_interval_s
390        .map(|v| v.max(60))
391        .unwrap_or(cur.schedule_interval_s);
392    let lookback_hours = body
393        .lookback_hours
394        .map(|v| v.max(0))
395        .unwrap_or(cur.lookback_hours);
396    let enabled = body.enabled.unwrap_or(cur.enabled);
397
398    sqlx::query(
399        "UPDATE backup_policies SET name = ?, destination_id = ?, camera_ids = ?,
400            incident_lock_only = ?, schedule_interval_s = ?, lookback_hours = ?, enabled = ?, updated_at = ?
401         WHERE id = ?",
402    )
403    .bind(&name)
404    .bind(&destination_id)
405    .bind(SqlxJson(camera_ids))
406    .bind(incident_lock_only)
407    .bind(schedule_interval_s)
408    .bind(lookback_hours)
409    .bind(enabled)
410    .bind(Utc::now())
411    .bind(&id)
412    .execute(&st.pool)
413    .await?;
414    auth::audit(
415        &st.pool,
416        &principal,
417        "update_backup_policy",
418        "backup_policy",
419        &id,
420        json!({ "enabled": enabled }),
421    )
422    .await;
423    let policy = load_policy(&st.pool, &id).await?;
424    Ok(Json(policy))
425}
426
427async fn delete_policy(
428    State(st): State<AppState>,
429    Path(id): Path<String>,
430    principal: Principal,
431) -> AppResult<StatusCode> {
432    principal.require(principal.can_manage_registry(), "delete backup policies")?;
433    let res = sqlx::query("DELETE FROM backup_policies WHERE id = ?")
434        .bind(&id)
435        .execute(&st.pool)
436        .await?;
437    if res.rows_affected() == 0 {
438        return Err(AppError::NotFound(format!("backup policy {id} not found")));
439    }
440    auth::audit(
441        &st.pool,
442        &principal,
443        "delete_backup_policy",
444        "backup_policy",
445        &id,
446        json!({}),
447    )
448    .await;
449    Ok(StatusCode::NO_CONTENT)
450}
451
452async fn trigger_policy(
453    State(st): State<AppState>,
454    Path(id): Path<String>,
455    principal: Principal,
456) -> AppResult<(StatusCode, Json<BackupJob>)> {
457    principal.require(principal.can_manage_registry(), "trigger backup policies")?;
458    let policy = load_policy(&st.pool, &id).await?;
459    let job_id = backup::trigger_policy(&st, &policy)
460        .await
461        .map_err(AppError::Other)?;
462    auth::audit(
463        &st.pool,
464        &principal,
465        "trigger_backup_policy",
466        "backup_policy",
467        &id,
468        json!({ "job_id": &job_id }),
469    )
470    .await;
471    let job = load_job(&st.pool, &job_id).await?;
472    Ok((StatusCode::ACCEPTED, Json(job)))
473}
474
475// ---- Jobs ----
476
477#[derive(Debug, Deserialize)]
478struct JobQuery {
479    policy_id: Option<String>,
480    status: Option<String>,
481    limit: Option<i64>,
482}
483
484async fn list_jobs(
485    State(st): State<AppState>,
486    principal: Principal,
487    Query(q): Query<JobQuery>,
488) -> AppResult<Json<Vec<BackupJob>>> {
489    principal.require(principal.can_view(), "view backup jobs")?;
490    let limit = q.limit.unwrap_or(100).clamp(1, 2000);
491    let rows = sqlx::query_as::<_, BackupJob>(
492        "SELECT * FROM backup_jobs
493         WHERE (? IS NULL OR policy_id = ?)
494           AND (? IS NULL OR status = ?)
495         ORDER BY created_at DESC LIMIT ?",
496    )
497    .bind(&q.policy_id)
498    .bind(&q.policy_id)
499    .bind(&q.status)
500    .bind(&q.status)
501    .bind(limit)
502    .fetch_all(&st.pool)
503    .await?;
504    Ok(Json(rows))
505}
506
507async fn get_job(
508    State(st): State<AppState>,
509    Path(id): Path<String>,
510    principal: Principal,
511) -> AppResult<Json<BackupJob>> {
512    principal.require(principal.can_view(), "view backup jobs")?;
513    let job = load_job(&st.pool, &id).await?;
514    Ok(Json(job))
515}
516
517async fn delete_job(
518    State(st): State<AppState>,
519    Path(id): Path<String>,
520    principal: Principal,
521) -> AppResult<StatusCode> {
522    principal.require(principal.can_manage_registry(), "delete backup jobs")?;
523    let job = load_job(&st.pool, &id).await?;
524    // Remove the produced archive artifact, if any, before dropping the row.
525    if let Some(path) = &job.output_path {
526        let _ = tokio::fs::remove_file(path).await;
527    }
528    sqlx::query("DELETE FROM backup_jobs WHERE id = ?")
529        .bind(&id)
530        .execute(&st.pool)
531        .await?;
532    auth::audit(
533        &st.pool,
534        &principal,
535        "delete_backup_job",
536        "backup_job",
537        &id,
538        json!({}),
539    )
540    .await;
541    Ok(StatusCode::NO_CONTENT)
542}
543
544// ---- On-demand archive export ----
545
546fn parse_opt_ts(s: &Option<String>, field: &str) -> AppResult<Option<DateTime<Utc>>> {
547    match s {
548        Some(v) => util::parse_rfc3339(v)
549            .map(Some)
550            .ok_or_else(|| AppError::BadRequest(format!("invalid `{field}` timestamp"))),
551        None => Ok(None),
552    }
553}
554
555async fn archive_export(
556    State(st): State<AppState>,
557    principal: Principal,
558    Json(body): Json<ArchiveExportRequest>,
559) -> AppResult<(StatusCode, Json<BackupJob>)> {
560    principal.require(principal.can_manage_registry(), "export archives")?;
561    let from = parse_opt_ts(&body.from, "from")?;
562    let to = parse_opt_ts(&body.to, "to")?;
563    if let (Some(f), Some(t)) = (from, to) {
564        if f > t {
565            return Err(AppError::BadRequest("`from` must be <= `to`".into()));
566        }
567    }
568    let camera_ids = body.camera_ids;
569    let incident_lock_only = body.incident_lock_only.unwrap_or(false);
570    let trim = body.trim.unwrap_or(false);
571    let job =
572        backup::create_archive(&st, camera_ids.clone(), from, to, incident_lock_only, trim).await?;
573    auth::audit(
574        &st.pool,
575        &principal,
576        "create_archive_export",
577        "backup_job",
578        &job.id,
579        json!({ "camera_ids": camera_ids, "incident_lock_only": incident_lock_only, "trim": trim }),
580    )
581    .await;
582    Ok((StatusCode::CREATED, Json(job)))
583}
584
585#[derive(Debug, Deserialize)]
586struct LimitQuery {
587    limit: Option<i64>,
588}
589
590async fn list_archive_exports(
591    State(st): State<AppState>,
592    principal: Principal,
593    Query(q): Query<LimitQuery>,
594) -> AppResult<Json<Vec<BackupJob>>> {
595    principal.require(principal.can_view(), "view archive exports")?;
596    let limit = q.limit.unwrap_or(100).clamp(1, 2000);
597    let rows = sqlx::query_as::<_, BackupJob>(
598        "SELECT * FROM backup_jobs WHERE kind = 'on_demand_archive' ORDER BY created_at DESC LIMIT ?",
599    )
600    .bind(limit)
601    .fetch_all(&st.pool)
602    .await?;
603    Ok(Json(rows))
604}
605
606#[cfg(test)]
607mod tests {
608    use super::*;
609
610    #[test]
611    fn valid_kind_accepts_known_and_rejects_others() {
612        assert!(valid_kind("local"));
613        assert!(valid_kind("sftp"));
614        assert!(valid_kind("ftp"));
615        assert!(valid_kind("s3"));
616        // Unknown, empty, and wrong-case are all rejected (case-sensitive match).
617        assert!(!valid_kind("gcs"));
618        assert!(!valid_kind(""));
619        assert!(!valid_kind("Local"));
620        assert!(!valid_kind("S3"));
621    }
622
623    #[test]
624    fn valid_kinds_list_is_exact() {
625        assert_eq!(VALID_KINDS, &["local", "sftp", "ftp", "s3"]);
626    }
627
628    #[test]
629    fn merge_secret_config_restores_placeholder_from_old() {
630        let old = json!({ "password": "hunter2", "host": "old.example" });
631        let new = json!({ "password": "***", "host": "new.example" });
632        let merged = merge_secret_config(&old, new);
633        // The *** placeholder is replaced with the stored secret...
634        assert_eq!(merged["password"], json!("hunter2"));
635        // ...while non-secret fields take the newly submitted value.
636        assert_eq!(merged["host"], json!("new.example"));
637    }
638
639    #[test]
640    fn merge_secret_config_drops_placeholder_when_no_stored_secret() {
641        let old = json!({ "host": "h" });
642        let new = json!({ "secret": "***", "host": "h2" });
643        let merged = merge_secret_config(&old, new);
644        // No previously-stored `secret`, so the placeholder key is removed entirely.
645        assert!(merged.as_object().unwrap().get("secret").is_none());
646        assert_eq!(merged["host"], json!("h2"));
647    }
648
649    #[test]
650    fn merge_secret_config_keeps_new_non_placeholder_secret() {
651        let old = json!({ "password": "old" });
652        let new = json!({ "password": "brandnew" });
653        let merged = merge_secret_config(&old, new);
654        // A real new value (not the placeholder) is preserved, overwriting the old secret.
655        assert_eq!(merged["password"], json!("brandnew"));
656    }
657
658    #[test]
659    fn merge_secret_config_handles_all_secret_keys() {
660        let old = json!({ "pass": "a", "password": "b", "secret_key": "c", "secret": "d" });
661        let new = json!({ "pass": "***", "password": "***", "secret_key": "***", "secret": "***" });
662        let merged = merge_secret_config(&old, new);
663        assert_eq!(merged["pass"], json!("a"));
664        assert_eq!(merged["password"], json!("b"));
665        assert_eq!(merged["secret_key"], json!("c"));
666        assert_eq!(merged["secret"], json!("d"));
667    }
668
669    #[test]
670    fn merge_secret_config_passthrough_for_non_objects() {
671        // When either side is not a JSON object, the new value is returned unchanged.
672        let old = json!("not-an-object");
673        let new = json!([1, 2, 3]);
674        let merged = merge_secret_config(&old, new.clone());
675        assert_eq!(merged, new);
676    }
677
678    #[test]
679    fn parse_opt_ts_none_valid_and_invalid() {
680        // None input yields Ok(None).
681        assert!(parse_opt_ts(&None, "from").unwrap().is_none());
682
683        // A valid RFC3339 timestamp (trailing Z accepted) parses to Some.
684        let ok = parse_opt_ts(&Some("2024-01-02T03:04:05Z".to_string()), "from").unwrap();
685        assert!(ok.is_some());
686
687        // An invalid timestamp surfaces a BadRequest mentioning the field name.
688        let err = parse_opt_ts(&Some("not-a-timestamp".to_string()), "to").unwrap_err();
689        match err {
690            AppError::BadRequest(msg) => {
691                assert!(msg.contains("to"));
692                assert!(msg.contains("invalid"));
693            }
694            other => panic!("expected BadRequest, got {other:?}"),
695        }
696    }
697}