Skip to main content

heldar_kernel/services/
retention.rs

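//! Retention sweeper: deletes recordings past each camera's age policy, enforces per-camera
//! storage quotas and a global size cap by pruning the oldest deletable segments, and prunes
//! further when free disk space drops below a configured floor. Segments under a durable evidence
//! hold (`evidence_locked = 1`) are never deleted, and a segment with a transient export read-lock
//! (`locked = 1`) is skipped while the export is in flight; both are excluded from every prune.
//! The same sweep also ages out bookkeeping rows (detections, outbox, zone events, audit log,
//! sessions, event log, webhook deliveries, resolved recording gaps), old snapshots, and archive
//! exports.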
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use chrono::{DateTime, Utc};
10use serde_json::json;
11use sqlx::SqlitePool;
12
13use crate::config::Config;
14use crate::repo;
15use crate::services::storage;
16
17/// Delete a segment's file and report whether its DB row should now be removed. The row is removed
18/// only when the file is actually gone — deleted just now, or already absent (`NotFound`). If the
19/// delete fails for any other reason (permissions, I/O error), we keep the DB row so the file is not
20/// orphaned-yet-forgotten: the next sweep retries it, and the size/disk accounting stays truthful.
21async fn unlink_segment(path: &str) -> bool {
22    match tokio::fs::remove_file(path).await {
23        Ok(()) => true,
24        Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
25        Err(e) => {
26            tracing::error!(path, error = %e, "retention: failed to delete segment file; keeping DB row to retry next sweep");
27            false
28        }
29    }
30}
31
32/// Remove one segment row IF it is still unlocked, then best-effort delete its file. Returns
33/// whether the row was removed.
34///
35/// The conditional `DELETE ... WHERE locked = 0 AND evidence_locked = 0` is a TOCTOU guard: SQLite
36/// serializes it against the incident/export lock `UPDATE`s, so an evidence-hold or export
37/// read-lock that commits AFTER this segment was selected for pruning wins the race — `rows_affected`
38/// is 0 and the file is never touched. Only when the row is actually removed do we unlink the file.
39/// A rare unlink failure then orphans the file (the `path` column is UNIQUE, so an orphan sweep can
40/// reclaim it) — strictly preferable to ever deleting protected evidence.
41async fn delete_segment_if_unlocked(
42    pool: &SqlitePool,
43    seg_id: &str,
44    path: &str,
45) -> anyhow::Result<bool> {
46    let removed =
47        sqlx::query("DELETE FROM segments WHERE id = ? AND locked = 0 AND evidence_locked = 0")
48            .bind(seg_id)
49            .execute(pool)
50            .await?
51            .rows_affected();
52    if removed == 1 {
53        unlink_segment(path).await;
54        Ok(true)
55    } else {
56        Ok(false)
57    }
58}
59
60/// Delete a snapshot's file and report whether its DB row should now be removed. Mirrors
61/// [`unlink_segment`]: the row is removed only when the file is actually gone (deleted just now or
62/// already absent); on any other delete error we keep the row so the next sweep retries.
63async fn unlink_snapshot(path: &str) -> bool {
64    match tokio::fs::remove_file(path).await {
65        Ok(()) => true,
66        Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
67        Err(e) => {
68            tracing::error!(path, error = %e, "retention: failed to delete snapshot file; keeping DB row to retry next sweep");
69            false
70        }
71    }
72}
73
74pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
75    let mut tick = tokio::time::interval(Duration::from_secs(cfg.retention_interval_s.max(30)));
76    loop {
77        tick.tick().await;
78        if let Err(e) = sweep(&pool, &cfg).await {
79            tracing::error!(error = %e, "retention: sweep failed");
80        }
81    }
82}
83
84async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
85    // 1) Age-based retention, per-camera.
86    let mut age_deleted: u64 = 0;
87    let cams: Vec<(String, i64)> = sqlx::query_as("SELECT id, retention_hours FROM cameras")
88        .fetch_all(pool)
89        .await?;
90    for (id, hours) in cams {
91        let cutoff = Utc::now() - chrono::Duration::hours(hours.max(1));
92        let rows: Vec<(String, String)> = sqlx::query_as(
93            "SELECT id, path FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 AND end_time < ?",
94        )
95        .bind(&id)
96        .bind(cutoff)
97        .fetch_all(pool)
98        .await?;
99        for (seg_id, path) in rows {
100            if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
101                age_deleted += 1;
102            }
103        }
104    }
105    if age_deleted > 0 {
106        let _ = repo::log_event(
107            pool,
108            None,
109            "retention_delete",
110            "info",
111            json!({ "deleted": age_deleted, "reason": "age" }),
112        )
113        .await;
114        tracing::info!(deleted = age_deleted, "retention: age-based cleanup");
115    }
116
117    // 2) Per-camera storage quota. Mirrors the global size cap (step 3) but scoped to one camera:
118    //    keep each capped camera's deletable footprint within its quota by pruning its oldest
119    //    unlocked segments. Evidence-locked footage (`evidence_locked = 1`) is protected and counts
120    //    against the quota; if it alone meets or exceeds the quota, we warn and delete nothing rather
121    //    than wiping the camera's other footage. Only cameras with `storage_quota_bytes IS NOT NULL`
122    //    are capped here; the rest are governed solely by the global cap + disk floor below.
123    let mut quota_deleted: u64 = 0;
124    let quota_cams: Vec<(String, i64)> = sqlx::query_as(
125        "SELECT id, storage_quota_bytes FROM cameras WHERE storage_quota_bytes IS NOT NULL",
126    )
127    .fetch_all(pool)
128    .await?;
129    for (cam_id, quota) in quota_cams {
130        if quota <= 0 {
131            continue;
132        }
133        let protected_bytes: i64 = sqlx::query_scalar(
134            "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND evidence_locked = 1",
135        )
136        .bind(&cam_id)
137        .fetch_one(pool)
138        .await?;
139        let budget = quota - protected_bytes;
140        if budget <= 0 {
141            if protected_bytes > quota {
142                tracing::warn!(
143                    camera_id = %cam_id,
144                    protected_bytes,
145                    quota,
146                    "retention: evidence-locked footage exceeds the camera quota; not deleting other footage"
147                );
148                let _ = repo::log_event(
149                    pool,
150                    Some(&cam_id),
151                    "disk_pressure",
152                    "warning",
153                    json!({ "reason": "camera_quota", "camera_id": &cam_id, "protected_bytes": protected_bytes, "quota_bytes": quota }),
154                )
155                .await;
156            }
157            continue;
158        }
159        loop {
160            let deletable_total: i64 = sqlx::query_scalar(
161                "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0",
162            )
163            .bind(&cam_id)
164            .fetch_one(pool)
165            .await?;
166            if deletable_total <= budget {
167                break;
168            }
169            let batch: Vec<(String, String, i64)> = sqlx::query_as(
170                "SELECT id, path, size_bytes FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
171            )
172            .bind(&cam_id)
173            .fetch_all(pool)
174            .await?;
175            if batch.is_empty() {
176                break;
177            }
178            let mut remaining = deletable_total;
179            let mut progressed = 0u64;
180            for (seg_id, path, size) in batch {
181                // Stop the instant the budget is met — never over-prune within a batch. The oldest
182                // segments are deleted first; once enough have gone to bring the deletable footprint
183                // to-or-under budget, the rest are within quota and must be kept (footage is
184                // unrecoverable on a DVR).
185                if remaining <= budget {
186                    break;
187                }
188                if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
189                    remaining -= size;
190                    quota_deleted += 1;
191                    progressed += 1;
192                }
193            }
194            if progressed == 0 {
195                tracing::error!(camera_id = %cam_id, "retention: camera-quota prune made no progress (segment file deletes failing); stopping this camera");
196                break;
197            }
198        }
199    }
200    if quota_deleted > 0 {
201        let _ = repo::log_event(
202            pool,
203            None,
204            "disk_pressure",
205            "warning",
206            json!({ "deleted": quota_deleted, "reason": "camera_quota" }),
207        )
208        .await;
209        tracing::warn!(
210            deleted = quota_deleted,
211            "retention: per-camera quota cleanup"
212        );
213    }
214
215    // 3) Global size cap: prune the oldest DELETABLE segments until the deletable footprint fits the
216    //    budget. The budget is the cap minus the evidence-locked bytes we cannot delete — counting
217    //    those in the comparison would otherwise make us delete every deletable segment. We measure
218    //    the protected footprint by `evidence_locked = 1` (the DURABLE hold), not the transient
219    //    `locked` read-lock: an in-flight export must not inflate the protected total and starve the
220    //    cap. Deletable = `locked = 0 AND evidence_locked = 0` (skip both the read-lock and the hold).
221    let max = cfg.max_recordings_bytes as i64;
222    let protected_bytes: i64 = sqlx::query_scalar(
223        "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE evidence_locked = 1",
224    )
225    .fetch_one(pool)
226    .await?;
227    let budget = max - protected_bytes;
228    let mut size_deleted: u64 = 0;
229
230    if budget <= 0 {
231        // Evidence-locked footage alone meets or exceeds the cap; deleting other footage cannot
232        // help. Warn instead of wiping everything.
233        let unlocked: i64 = sqlx::query_scalar(
234            "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
235        )
236        .fetch_one(pool)
237        .await?;
238        if protected_bytes > max {
239            tracing::warn!(
240                protected_bytes,
241                max,
242                "retention: evidence-locked footage exceeds the size cap; not deleting other footage"
243            );
244            let _ = repo::log_event(
245                pool,
246                None,
247                "disk_pressure",
248                "warning",
249                json!({ "reason": "locked_exceeds_cap", "protected_bytes": protected_bytes, "unlocked_bytes": unlocked, "max_bytes": max }),
250            )
251            .await;
252        }
253    } else {
254        loop {
255            let unlocked_total: i64 = sqlx::query_scalar(
256                "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
257            )
258            .fetch_one(pool)
259            .await?;
260            if unlocked_total <= budget {
261                break;
262            }
263            let batch: Vec<(String, String, i64)> = sqlx::query_as(
264                "SELECT id, path, size_bytes FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
265            )
266            .fetch_all(pool)
267            .await?;
268            if batch.is_empty() {
269                break;
270            }
271            let mut remaining = unlocked_total;
272            let mut progressed = 0u64;
273            for (seg_id, path, size) in batch {
274                // Stop the instant the global cap is satisfied — never over-prune within a batch.
275                if remaining <= budget {
276                    break;
277                }
278                if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
279                    remaining -= size;
280                    size_deleted += 1;
281                    progressed += 1;
282                }
283            }
284            if progressed == 0 {
285                // Every file in the batch failed to delete; we'd re-select the same rows forever.
286                tracing::error!("retention: size-cap prune made no progress (segment file deletes failing); stopping this sweep");
287                break;
288            }
289        }
290    }
291
292    if size_deleted > 0 {
293        let _ = repo::log_event(
294            pool,
295            None,
296            "disk_pressure",
297            "warning",
298            json!({ "deleted": size_deleted, "reason": "size_cap", "max_bytes": max }),
299        )
300        .await;
301        tracing::warn!(deleted = size_deleted, "retention: size-cap cleanup");
302    }
303
304    // 4) Disk-free floor: if the recordings filesystem drops below the free-space floor, prune the
305    //    oldest unlocked segments until back above it. Self-limiting: it stops if a delete batch
306    //    does not actually recover free space (disk filled by non-recording data), and refuses to
307    //    run if the floor exceeds the whole disk — so it never destroys the footprint for nothing.
308    let floor = cfg.min_free_disk_bytes;
309    let mut disk_deleted: u64 = 0;
310    match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
311        None => {
312            tracing::warn!(
313                "retention: could not read disk stats; free-floor check skipped this sweep"
314            );
315            let _ = repo::log_event(
316                pool,
317                None,
318                "disk_pressure",
319                "warning",
320                json!({ "reason": "disk_stats_unavailable" }),
321            )
322            .await;
323        }
324        Some(d) if floor >= d.total_bytes => {
325            if d.free_bytes < floor {
326                tracing::warn!(
327                    floor,
328                    total = d.total_bytes,
329                    "retention: free-disk floor exceeds total disk size; refusing to prune (misconfigured?)"
330                );
331                let _ = repo::log_event(
332                    pool,
333                    None,
334                    "disk_pressure",
335                    "critical",
336                    json!({ "reason": "floor_unsatisfiable", "min_free_bytes": floor, "total_bytes": d.total_bytes }),
337                )
338                .await;
339            }
340        }
341        Some(mut prev) => {
342            let mut guard = 0;
343            let mut futile = false;
344            while prev.free_bytes < floor && guard < 200 {
345                guard += 1;
346                let before = prev.free_bytes;
347                let batch: Vec<(String, String)> = sqlx::query_as(
348                    "SELECT id, path FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
349                )
350                .fetch_all(pool)
351                .await?;
352                if batch.is_empty() {
353                    tracing::warn!(
354                        free_bytes = before,
355                        floor,
356                        "retention: below disk-free floor but no deletable segments remain to prune"
357                    );
358                    break;
359                }
360                for (seg_id, path) in batch {
361                    if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
362                        disk_deleted += 1;
363                    }
364                }
365                match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
366                    Some(d) if d.free_bytes > before => prev = d,
367                    Some(_) => {
368                        futile = true;
369                        break;
370                    }
371                    None => break,
372                }
373            }
374            if futile {
375                tracing::error!(
376                    free_bytes = prev.free_bytes,
377                    floor,
378                    "retention: pruning recordings is not recovering free space (disk filled by non-recording data?); stopping"
379                );
380                let _ = repo::log_event(
381                    pool,
382                    None,
383                    "disk_pressure",
384                    "critical",
385                    json!({ "reason": "prune_not_recovering_space", "min_free_bytes": floor, "deleted": disk_deleted }),
386                )
387                .await;
388            }
389        }
390    }
391    if disk_deleted > 0 {
392        let _ = repo::log_event(
393            pool,
394            None,
395            "disk_pressure",
396            "critical",
397            json!({ "deleted": disk_deleted, "reason": "free_floor", "min_free_bytes": floor }),
398        )
399        .await;
400        tracing::warn!(deleted = disk_deleted, "retention: disk-free-floor cleanup");
401    }
402
403    // 5) Prune old AI detections (the table grows unbounded otherwise).
404    let det_cutoff = Utc::now() - chrono::Duration::hours(cfg.detection_retention_hours.max(1));
405    let pruned = sqlx::query("DELETE FROM detections WHERE created_at < ?")
406        .bind(det_cutoff)
407        .execute(pool)
408        .await?
409        .rows_affected();
410    if pruned > 0 {
411        tracing::info!(deleted = pruned, "retention: pruned old detections");
412    }
413    // Prune the transactional outbox on the same TTL (until an edge→cloud relay acks + prunes by seq).
414    let ob_pruned = sqlx::query("DELETE FROM outbox WHERE created_at < ?")
415        .bind(det_cutoff)
416        .execute(pool)
417        .await?
418        .rows_affected();
419    if ob_pruned > 0 {
420        tracing::info!(deleted = ob_pruned, "retention: pruned old outbox rows");
421    }
422
423    // 6) Prune old zone events and delete their evidence frames (same TTL as detections).
424    let old_zone_events: Vec<(String, Option<String>)> =
425        sqlx::query_as("SELECT id, evidence_path FROM zone_events WHERE created_at < ?")
426            .bind(det_cutoff)
427            .fetch_all(pool)
428            .await?;
429    if !old_zone_events.is_empty() {
430        for (_id, evidence) in &old_zone_events {
431            if let Some(name) = evidence.as_deref().and_then(|u| u.rsplit('/').next()) {
432                let _ = tokio::fs::remove_file(cfg.snapshots_dir.join(name)).await;
433            }
434        }
435        let zpruned = sqlx::query("DELETE FROM zone_events WHERE created_at < ?")
436            .bind(det_cutoff)
437            .execute(pool)
438            .await?
439            .rows_affected();
440        tracing::info!(
441            deleted = zpruned,
442            "retention: pruned old zone events + evidence"
443        );
444    }
445
446    // 7) Prune kernel auth bookkeeping: stale audit log + expired sessions. (Domain entry events +
447    //    their evidence frames are pruned by the entry app's own retention loop, not the kernel.)
448    let audit_cutoff = Utc::now() - chrono::Duration::days(cfg.audit_retention_days.max(1));
449    let apruned = sqlx::query("DELETE FROM audit_log WHERE created_at < ?")
450        .bind(audit_cutoff)
451        .execute(pool)
452        .await?
453        .rows_affected();
454    if apruned > 0 {
455        tracing::info!(deleted = apruned, "retention: pruned old audit log entries");
456    }
457    let spruned = sqlx::query("DELETE FROM sessions WHERE expires_at < ?")
458        .bind(Utc::now())
459        .execute(pool)
460        .await?
461        .rows_affected();
462    if spruned > 0 {
463        tracing::debug!(deleted = spruned, "retention: pruned expired sessions");
464    }
465
466    // 8) Prune the generic event log (camera-status events, disk-pressure warnings, and the entry
467    //    mirrors written by the ANPR engine). It is otherwise unbounded. The alert notifier advances
468    //    a durable cursor over recent rows, so deleting rows older than the (long) entry TTL — which
469    //    are far past delivery — is safe.
470    let evpruned = sqlx::query("DELETE FROM events WHERE created_at < ?")
471        .bind(audit_cutoff)
472        .execute(pool)
473        .await?
474        .rows_affected();
475    if evpruned > 0 {
476        tracing::info!(deleted = evpruned, "retention: pruned old event-log rows");
477    }
478
479    // 8b) Prune the webhook delivery ledger (one row per delivery attempt, per subscription, per event)
480    //     past the audit horizon. The delivery cursor lives on the subscription, not these rows, so
481    //     deleting old attempt records is safe — they are an at-rest audit trail, not delivery state.
482    let wdpruned = sqlx::query("DELETE FROM webhook_deliveries WHERE created_at < ?")
483        .bind(audit_cutoff)
484        .execute(pool)
485        .await?
486        .rows_affected();
487    if wdpruned > 0 {
488        tracing::info!(
489            deleted = wdpruned,
490            "retention: pruned old webhook-delivery rows"
491        );
492    }
493
494    // 8c) Prune RESOLVED recording-gap rows (filled/failed) past the audit horizon. Pending gaps are
495    //     left for the ANR re-fill engine to act on (they age out of its query via anr_max_gap_hours).
496    let gpruned = sqlx::query(
497        "DELETE FROM recording_gaps WHERE fill_state IN ('filled','failed') AND created_at < ?",
498    )
499    .bind(audit_cutoff)
500    .execute(pool)
501    .await?
502    .rows_affected();
503    if gpruned > 0 {
504        tracing::info!(
505            deleted = gpruned,
506            "retention: pruned resolved recording-gap rows"
507        );
508    }
509
510    // 9) Prune scheduled snapshots past their retention window. The cutoff is `taken_at` (capture
511    //    time, not the row's `created_at`). Delete the file first; only drop the DB row when the
512    //    file is gone (mirrors the segment unlink pattern). Skipped entirely when hours = 0.
513    if cfg.snapshot_retention_hours > 0 {
514        let snap_cutoff = Utc::now() - chrono::Duration::hours(cfg.snapshot_retention_hours);
515        let rows: Vec<(String, String)> =
516            sqlx::query_as("SELECT id, path FROM snapshots WHERE taken_at < ?")
517                .bind(snap_cutoff)
518                .fetch_all(pool)
519                .await?;
520        let mut snap_deleted: u64 = 0;
521        for (snap_id, path) in rows {
522            if unlink_snapshot(&path).await {
523                sqlx::query("DELETE FROM snapshots WHERE id = ?")
524                    .bind(&snap_id)
525                    .execute(pool)
526                    .await?;
527                snap_deleted += 1;
528            }
529        }
530        if snap_deleted > 0 {
531            tracing::info!(deleted = snap_deleted, "retention: pruned old snapshots");
532        }
533    }
534
535    // 10) Prune on-demand archive exports + finished backup-job rows past the archive retention
536    //     window. Delete the .zip files by mtime, then drop any backup_jobs that have finished before
537    //     the cutoff (both policy runs and archive exports). Skipped entirely when hours = 0.
538    if cfg.archive_retention_hours > 0 {
539        let cutoff = Utc::now() - chrono::Duration::hours(cfg.archive_retention_hours);
540        if let Ok(mut entries) = tokio::fs::read_dir(&cfg.archive_dir).await {
541            let mut removed: u64 = 0;
542            while let Ok(Some(ent)) = entries.next_entry().await {
543                let path = ent.path();
544                if path.extension().and_then(|e| e.to_str()) != Some("zip") {
545                    continue;
546                }
547                let stale = ent
548                    .metadata()
549                    .await
550                    .ok()
551                    .and_then(|m| m.modified().ok())
552                    .map(|t| DateTime::<Utc>::from(t) < cutoff)
553                    .unwrap_or(false);
554                if stale && tokio::fs::remove_file(&path).await.is_ok() {
555                    removed += 1;
556                }
557            }
558            if removed > 0 {
559                tracing::info!(deleted = removed, "retention: pruned old archive exports");
560            }
561        }
562        let jpruned = sqlx::query(
563            "DELETE FROM backup_jobs WHERE finished_at IS NOT NULL AND finished_at < ?",
564        )
565        .bind(cutoff)
566        .execute(pool)
567        .await?
568        .rows_affected();
569        if jpruned > 0 {
570            tracing::info!(
571                deleted = jpruned,
572                "retention: pruned old finished backup jobs"
573            );
574        }
575    }
576    Ok(())
577}
578
579#[cfg(test)]
580mod tests {
581    use super::*;
582
583    // ----- helpers -------------------------------------------------------
584
585    fn unique_path(prefix: &str) -> std::path::PathBuf {
586        use std::sync::atomic::{AtomicU64, Ordering};
587        static N: AtomicU64 = AtomicU64::new(0);
588        let n = N.fetch_add(1, Ordering::Relaxed);
589        std::env::temp_dir().join(format!("{prefix}-{}-{n}", std::process::id()))
590    }
591
592    async fn mem_pool() -> SqlitePool {
593        let pool = sqlx::sqlite::SqlitePoolOptions::new()
594            .max_connections(1)
595            .connect("sqlite::memory:")
596            .await
597            .unwrap();
598        crate::db::run_migrations(&pool).await.unwrap();
599        pool
600    }
601
602    /// A Config wired so that ONLY age-retention (step 1) and per-camera quota (step 2) can act:
603    /// the global size cap is effectively infinite, the disk-free floor is 0 (step 4 never deletes,
604    /// regardless of whether statvfs succeeds), and snapshot/archive prunes are disabled.
605    fn test_cfg() -> Config {
606        let mut cfg = Config::from_env();
607        cfg.max_recordings_bytes = u64::MAX / 4;
608        cfg.min_free_disk_bytes = 0;
609        cfg.recordings_dir = std::env::temp_dir();
610        cfg.snapshot_retention_hours = 0;
611        cfg.archive_retention_hours = 0;
612        cfg.detection_retention_hours = 168;
613        cfg.audit_retention_days = 365;
614        cfg
615    }
616
617    async fn insert_camera(pool: &SqlitePool, id: &str, retention_hours: i64, quota: Option<i64>) {
618        let now = Utc::now();
619        sqlx::query(
620            "INSERT INTO cameras (id, name, retention_hours, storage_quota_bytes, created_at, updated_at)
621             VALUES (?, ?, ?, ?, ?, ?)",
622        )
623        .bind(id)
624        .bind(id)
625        .bind(retention_hours)
626        .bind(quota)
627        .bind(now)
628        .bind(now)
629        .execute(pool)
630        .await
631        .unwrap();
632    }
633
634    async fn insert_segment(
635        pool: &SqlitePool,
636        id: &str,
637        camera_id: &str,
638        end: DateTime<Utc>,
639        size_bytes: i64,
640        locked: i64,
641        evidence_locked: i64,
642    ) {
643        sqlx::query(
644            "INSERT INTO segments
645                (id, camera_id, path, start_time, end_time, duration_s, size_bytes, locked, evidence_locked, created_at)
646             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
647        )
648        .bind(id)
649        .bind(camera_id)
650        // points at a file that does not exist -> unlink_segment hits the NotFound->true branch.
651        .bind(format!("/nonexistent/heldar-test/{id}.mp4"))
652        .bind(end)
653        .bind(end)
654        .bind(60.0_f64)
655        .bind(size_bytes)
656        .bind(locked)
657        .bind(evidence_locked)
658        .bind(end)
659        .execute(pool)
660        .await
661        .unwrap();
662    }
663
664    async fn seg_exists(pool: &SqlitePool, id: &str) -> bool {
665        let c: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM segments WHERE id = ?")
666            .bind(id)
667            .fetch_one(pool)
668            .await
669            .unwrap();
670        c == 1
671    }
672
673    async fn seg_count(pool: &SqlitePool) -> i64 {
674        sqlx::query_scalar("SELECT COUNT(*) FROM segments")
675            .fetch_one(pool)
676            .await
677            .unwrap()
678    }
679
680    async fn event_type_count(pool: &SqlitePool, event_type: &str) -> i64 {
681        sqlx::query_scalar("SELECT COUNT(*) FROM events WHERE event_type = ?")
682            .bind(event_type)
683            .fetch_one(pool)
684            .await
685            .unwrap()
686    }
687
688    async fn camera_quota_event_count(pool: &SqlitePool) -> i64 {
689        sqlx::query_scalar(
690            "SELECT COUNT(*) FROM events WHERE event_type = 'disk_pressure' AND payload LIKE '%camera_quota%'",
691        )
692        .fetch_one(pool)
693        .await
694        .unwrap()
695    }
696
697    // ----- unlink helpers ------------------------------------------------
698
699    #[tokio::test]
700    async fn unlink_segment_reports_removable_for_missing_path() {
701        // Already-absent file: the DB row should be removed (returns true).
702        assert!(unlink_segment("/nonexistent/heldar/definitely-not-here.mp4").await);
703    }
704
705    #[tokio::test]
706    async fn unlink_segment_deletes_existing_file() {
707        let p = unique_path("heldar-seg");
708        tokio::fs::write(&p, b"x").await.unwrap();
709        assert!(p.exists());
710        assert!(unlink_segment(p.to_str().unwrap()).await);
711        assert!(!p.exists());
712    }
713
714    #[tokio::test]
715    async fn unlink_segment_keeps_row_for_directory() {
716        // remove_file on a directory fails with a non-NotFound error -> keep the row (false).
717        let d = unique_path("heldar-dir");
718        tokio::fs::create_dir(&d).await.unwrap();
719        assert!(!unlink_segment(d.to_str().unwrap()).await);
720        assert!(d.exists());
721        let _ = tokio::fs::remove_dir(&d).await;
722    }
723
724    #[tokio::test]
725    async fn unlink_snapshot_handles_missing_and_existing() {
726        // Mirrors unlink_segment: missing -> true; existing -> deleted + true.
727        assert!(unlink_snapshot("/nonexistent/heldar/none.jpg").await);
728        let p = unique_path("heldar-snap");
729        tokio::fs::write(&p, b"x").await.unwrap();
730        assert!(unlink_snapshot(p.to_str().unwrap()).await);
731        assert!(!p.exists());
732    }
733
734    // ----- sweep: age retention -----------------------------------------
735
736    #[tokio::test]
737    async fn sweep_age_retention_deletes_only_old_unlocked() {
738        let pool = mem_pool().await;
739        let cfg = test_cfg();
740        let now = Utc::now();
741
742        insert_camera(&pool, "cam_age", 24, None).await;
743        // Recent unlocked segment: kept (newer than the 24h cutoff).
744        insert_segment(
745            &pool,
746            "seg_recent",
747            "cam_age",
748            now - chrono::Duration::hours(1),
749            100,
750            0,
751            0,
752        )
753        .await;
754        // Old unlocked segment: deleted by age policy.
755        insert_segment(
756            &pool,
757            "seg_old",
758            "cam_age",
759            now - chrono::Duration::hours(48),
760            100,
761            0,
762            0,
763        )
764        .await;
765        // Old but read-locked (transient export lock): excluded from age prune -> kept.
766        insert_segment(
767            &pool,
768            "seg_old_locked",
769            "cam_age",
770            now - chrono::Duration::hours(48),
771            100,
772            1,
773            0,
774        )
775        .await;
776        // Old but evidence-locked (durable hold): excluded from age prune -> kept.
777        insert_segment(
778            &pool,
779            "seg_old_ev",
780            "cam_age",
781            now - chrono::Duration::hours(48),
782            100,
783            0,
784            1,
785        )
786        .await;
787
788        sweep(&pool, &cfg).await.unwrap();
789
790        assert!(seg_exists(&pool, "seg_recent").await);
791        assert!(
792            !seg_exists(&pool, "seg_old").await,
793            "old unlocked segment should be pruned by age"
794        );
795        assert!(
796            seg_exists(&pool, "seg_old_locked").await,
797            "read-locked segment must survive age prune"
798        );
799        assert!(
800            seg_exists(&pool, "seg_old_ev").await,
801            "evidence-locked segment must survive age prune"
802        );
803        assert_eq!(seg_count(&pool).await, 3);
804        // age_deleted > 0 logs exactly one retention_delete event for the sweep.
805        assert_eq!(event_type_count(&pool, "retention_delete").await, 1);
806    }
807
808    // ----- sweep: per-camera quota --------------------------------------
809
810    #[tokio::test]
811    async fn sweep_camera_quota_prunes_only_to_budget_keeps_evidence() {
812        let pool = mem_pool().await;
813        let cfg = test_cfg();
814        let now = Utc::now();
815
816        // Huge retention so age policy never fires; only the quota acts here.
817        insert_camera(&pool, "cam_q", 100_000, Some(1000)).await;
818        // Protected (evidence-locked) footage counts against the quota but is never deleted.
819        insert_segment(
820            &pool,
821            "sL",
822            "cam_q",
823            now - chrono::Duration::hours(5),
824            600,
825            0,
826            1,
827        )
828        .await;
829        // Three deletable segments (total 1200) over the budget (quota 1000 - protected 600 = 400).
830        insert_segment(
831            &pool,
832            "s1",
833            "cam_q",
834            now - chrono::Duration::hours(3),
835            400,
836            0,
837            0,
838        )
839        .await;
840        insert_segment(
841            &pool,
842            "s2",
843            "cam_q",
844            now - chrono::Duration::hours(2),
845            400,
846            0,
847            0,
848        )
849        .await;
850        insert_segment(
851            &pool,
852            "s3",
853            "cam_q",
854            now - chrono::Duration::hours(1),
855            400,
856            0,
857            0,
858        )
859        .await;
860
861        sweep(&pool, &cfg).await.unwrap();
862
863        // Correctness invariant: prune ONLY enough oldest segments to reach budget (400), then stop.
864        // Deleting s1+s2 brings the deletable footprint to exactly 400 == budget, so s3 is within
865        // quota and MUST be kept; pruning it would needlessly destroy recoverable footage.
866        assert!(
867            seg_exists(&pool, "sL").await,
868            "evidence-locked footage must survive the quota prune"
869        );
870        assert!(
871            !seg_exists(&pool, "s1").await,
872            "oldest over-budget segment is pruned"
873        );
874        assert!(
875            !seg_exists(&pool, "s2").await,
876            "second-oldest pruned to reach budget"
877        );
878        assert!(
879            seg_exists(&pool, "s3").await,
880            "s3 is within quota once s1+s2 are gone and must NOT be over-deleted"
881        );
882        assert_eq!(
883            seg_count(&pool).await,
884            2,
885            "only s1,s2 pruned to reach budget; sL+s3 remain"
886        );
887        assert!(
888            camera_quota_event_count(&pool).await >= 1,
889            "a camera_quota disk_pressure event should be logged"
890        );
891    }
892
893    #[tokio::test]
894    async fn delete_segment_if_unlocked_spares_locked_rows() {
895        // The TOCTOU guard: the conditional DELETE must refuse a row that became evidence-locked
896        // (or read-locked) since it was selected for pruning, and remove an unlocked one. This is
897        // the atomic primitive that makes pruning safe against a hold committing mid-sweep.
898        let pool = mem_pool().await;
899        let now = Utc::now();
900        insert_camera(&pool, "cam_t", 100_000, None).await;
901        insert_segment(&pool, "held", "cam_t", now, 100, 0, 1).await; // evidence_locked = 1
902        insert_segment(&pool, "rlok", "cam_t", now, 100, 1, 0).await; // locked = 1 (export read-lock)
903        insert_segment(&pool, "free", "cam_t", now, 100, 0, 0).await; // deletable
904
905        assert!(
906            !delete_segment_if_unlocked(&pool, "held", "/nonexistent/held.mp4")
907                .await
908                .unwrap(),
909            "evidence-locked row must not be removable"
910        );
911        assert!(
912            !delete_segment_if_unlocked(&pool, "rlok", "/nonexistent/rlok.mp4")
913                .await
914                .unwrap(),
915            "read-locked row must not be removable"
916        );
917        assert!(seg_exists(&pool, "held").await);
918        assert!(seg_exists(&pool, "rlok").await);
919
920        assert!(
921            delete_segment_if_unlocked(&pool, "free", "/nonexistent/free.mp4")
922                .await
923                .unwrap(),
924            "unlocked row is removed"
925        );
926        assert!(!seg_exists(&pool, "free").await);
927    }
928
929    #[tokio::test]
930    async fn sweep_camera_quota_protected_exceeds_deletes_nothing() {
931        let pool = mem_pool().await;
932        let cfg = test_cfg();
933        let now = Utc::now();
934
935        // Protected footage alone (500) exceeds the quota (100): deleting other footage cannot help,
936        // so nothing is pruned and a warning is logged instead.
937        insert_camera(&pool, "cam_over", 100_000, Some(100)).await;
938        insert_segment(
939            &pool,
940            "ovL",
941            "cam_over",
942            now - chrono::Duration::hours(5),
943            500,
944            0,
945            1,
946        )
947        .await;
948        insert_segment(
949            &pool,
950            "ov1",
951            "cam_over",
952            now - chrono::Duration::hours(1),
953            50,
954            0,
955            0,
956        )
957        .await;
958
959        sweep(&pool, &cfg).await.unwrap();
960
961        assert!(seg_exists(&pool, "ovL").await);
962        assert!(
963            seg_exists(&pool, "ov1").await,
964            "other footage must not be wiped when protected footage exceeds the quota"
965        );
966        assert_eq!(seg_count(&pool).await, 2);
967        assert!(
968            camera_quota_event_count(&pool).await >= 1,
969            "a camera_quota warning should be logged"
970        );
971    }
972
973    // ----- sweep: detection pruning -------------------------------------
974
975    #[tokio::test]
976    async fn sweep_prunes_old_detections() {
977        let pool = mem_pool().await;
978        let cfg = test_cfg(); // detection_retention_hours = 168
979
980        insert_camera(&pool, "cam_d", 24, None).await;
981        let now = Utc::now();
982        // Older than the 168h TTL -> pruned.
983        sqlx::query(
984            "INSERT INTO detections (id, camera_id, task_type, timestamp, created_at) VALUES (?, ?, ?, ?, ?)",
985        )
986        .bind("det_old")
987        .bind("cam_d")
988        .bind("object")
989        .bind(now - chrono::Duration::hours(200))
990        .bind(now - chrono::Duration::hours(200))
991        .execute(&pool)
992        .await
993        .unwrap();
994        // Recent -> kept.
995        sqlx::query(
996            "INSERT INTO detections (id, camera_id, task_type, timestamp, created_at) VALUES (?, ?, ?, ?, ?)",
997        )
998        .bind("det_new")
999        .bind("cam_d")
1000        .bind("object")
1001        .bind(now - chrono::Duration::hours(1))
1002        .bind(now - chrono::Duration::hours(1))
1003        .execute(&pool)
1004        .await
1005        .unwrap();
1006
1007        sweep(&pool, &cfg).await.unwrap();
1008
1009        let remaining: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM detections")
1010            .fetch_one(&pool)
1011            .await
1012            .unwrap();
1013        assert_eq!(remaining, 1);
1014        let kept: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM detections WHERE id = ?")
1015            .bind("det_new")
1016            .fetch_one(&pool)
1017            .await
1018            .unwrap();
1019        assert_eq!(kept, 1, "the recent detection must be retained");
1020    }
1021}