Skip to main content

heldar_kernel/services/
retention.rs

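//! Retention sweeper. Each sweep deletes recordings past each camera's age policy; enforces
//! per-camera storage quotas, a global size cap, and a free-disk floor by pruning the oldest
//! deletable segments; then prunes aged rows from the bookkeeping tables (detections, outbox,
//! zone events, audit log, sessions, event log, webhook deliveries, resolved recording gaps) plus
//! old snapshots and archive exports.
//!
//! Segments under a durable evidence hold (`evidence_locked = 1`) are never deleted. A segment
//! with a transient export read-lock (`locked = 1`) is skipped while the export is in flight.
//! Both kinds of segment are excluded from every segment prune.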
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use chrono::{DateTime, Utc};
10use serde_json::json;
11use sqlx::SqlitePool;
12
13use crate::config::Config;
14use crate::repo;
15use crate::services::{settings, storage};
16
17/// Delete a segment's file and report whether its DB row should now be removed. The row is removed
18/// only when the file is actually gone — deleted just now, or already absent (`NotFound`). If the
19/// delete fails for any other reason (permissions, I/O error), we keep the DB row so the file is not
20/// orphaned-yet-forgotten: the next sweep retries it, and the size/disk accounting stays truthful.
21async fn unlink_segment(path: &str) -> bool {
22    match tokio::fs::remove_file(path).await {
23        Ok(()) => true,
24        Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
25        Err(e) => {
26            tracing::error!(path, error = %e, "retention: failed to delete segment file; keeping DB row to retry next sweep");
27            false
28        }
29    }
30}
31
32/// Remove one segment row IF it is still unlocked, then best-effort delete its file. Returns
33/// whether the row was removed.
34///
35/// The conditional `DELETE ... WHERE locked = 0 AND evidence_locked = 0` is a TOCTOU guard: SQLite
36/// serializes it against the incident/export lock `UPDATE`s, so an evidence-hold or export
37/// read-lock that commits AFTER this segment was selected for pruning wins the race — `rows_affected`
38/// is 0 and the file is never touched. Only when the row is actually removed do we unlink the file.
39/// A rare unlink failure then orphans the file (the `path` column is UNIQUE, so an orphan sweep can
40/// reclaim it) — strictly preferable to ever deleting protected evidence.
41async fn delete_segment_if_unlocked(
42    pool: &SqlitePool,
43    seg_id: &str,
44    path: &str,
45) -> anyhow::Result<bool> {
46    let removed =
47        sqlx::query("DELETE FROM segments WHERE id = ? AND locked = 0 AND evidence_locked = 0")
48            .bind(seg_id)
49            .execute(pool)
50            .await?
51            .rows_affected();
52    if removed == 1 {
53        unlink_segment(path).await;
54        Ok(true)
55    } else {
56        Ok(false)
57    }
58}
59
60/// Delete a snapshot's file and report whether its DB row should now be removed. Mirrors
61/// [`unlink_segment`]: the row is removed only when the file is actually gone (deleted just now or
62/// already absent); on any other delete error we keep the row so the next sweep retries.
63async fn unlink_snapshot(path: &str) -> bool {
64    match tokio::fs::remove_file(path).await {
65        Ok(()) => true,
66        Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
67        Err(e) => {
68            tracing::error!(path, error = %e, "retention: failed to delete snapshot file; keeping DB row to retry next sweep");
69            false
70        }
71    }
72}
73
74pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
75    let mut tick = tokio::time::interval(Duration::from_secs(cfg.retention_interval_s.max(30)));
76    loop {
77        tick.tick().await;
78        if let Err(e) = sweep(&pool, &cfg).await {
79            tracing::error!(error = %e, "retention: sweep failed");
80        }
81    }
82}
83
84async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
85    // 1) Age-based retention, per-camera.
86    let mut age_deleted: u64 = 0;
87    let cams: Vec<(String, i64)> = sqlx::query_as("SELECT id, retention_hours FROM cameras")
88        .fetch_all(pool)
89        .await?;
90    for (id, hours) in cams {
91        let cutoff = Utc::now() - chrono::Duration::hours(hours.max(1));
92        let rows: Vec<(String, String)> = sqlx::query_as(
93            "SELECT id, path FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 AND end_time < ?",
94        )
95        .bind(&id)
96        .bind(cutoff)
97        .fetch_all(pool)
98        .await?;
99        for (seg_id, path) in rows {
100            if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
101                age_deleted += 1;
102            }
103        }
104    }
105    if age_deleted > 0 {
106        let _ = repo::log_event(
107            pool,
108            None,
109            "retention_delete",
110            "info",
111            json!({ "deleted": age_deleted, "reason": "age" }),
112        )
113        .await;
114        tracing::info!(deleted = age_deleted, "retention: age-based cleanup");
115    }
116
117    // 2) Per-camera storage quota. Mirrors the global size cap (step 3) but scoped to one camera:
118    //    keep each capped camera's deletable footprint within its quota by pruning its oldest
119    //    unlocked segments. Evidence-locked footage (`evidence_locked = 1`) is protected and counts
120    //    against the quota; if it alone meets or exceeds the quota, we warn and delete nothing rather
121    //    than wiping the camera's other footage. Only cameras with `storage_quota_bytes IS NOT NULL`
122    //    are capped here; the rest are governed solely by the global cap + disk floor below.
123    let mut quota_deleted: u64 = 0;
124    let quota_cams: Vec<(String, i64)> = sqlx::query_as(
125        "SELECT id, storage_quota_bytes FROM cameras WHERE storage_quota_bytes IS NOT NULL",
126    )
127    .fetch_all(pool)
128    .await?;
129    for (cam_id, quota) in quota_cams {
130        if quota <= 0 {
131            continue;
132        }
133        let protected_bytes: i64 = sqlx::query_scalar(
134            "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND evidence_locked = 1",
135        )
136        .bind(&cam_id)
137        .fetch_one(pool)
138        .await?;
139        let budget = quota - protected_bytes;
140        if budget <= 0 {
141            if protected_bytes > quota {
142                tracing::warn!(
143                    camera_id = %cam_id,
144                    protected_bytes,
145                    quota,
146                    "retention: evidence-locked footage exceeds the camera quota; not deleting other footage"
147                );
148                let _ = repo::log_event(
149                    pool,
150                    Some(&cam_id),
151                    "disk_pressure",
152                    "warning",
153                    json!({ "reason": "camera_quota", "camera_id": &cam_id, "protected_bytes": protected_bytes, "quota_bytes": quota }),
154                )
155                .await;
156            }
157            continue;
158        }
159        loop {
160            let deletable_total: i64 = sqlx::query_scalar(
161                "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0",
162            )
163            .bind(&cam_id)
164            .fetch_one(pool)
165            .await?;
166            if deletable_total <= budget {
167                break;
168            }
169            let batch: Vec<(String, String, i64)> = sqlx::query_as(
170                "SELECT id, path, size_bytes FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
171            )
172            .bind(&cam_id)
173            .fetch_all(pool)
174            .await?;
175            if batch.is_empty() {
176                break;
177            }
178            let mut remaining = deletable_total;
179            let mut progressed = 0u64;
180            for (seg_id, path, size) in batch {
181                // Stop the instant the budget is met — never over-prune within a batch. The oldest
182                // segments are deleted first; once enough have gone to bring the deletable footprint
183                // to-or-under budget, the rest are within quota and must be kept (footage is
184                // unrecoverable on a DVR).
185                if remaining <= budget {
186                    break;
187                }
188                if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
189                    remaining -= size;
190                    quota_deleted += 1;
191                    progressed += 1;
192                }
193            }
194            if progressed == 0 {
195                tracing::error!(camera_id = %cam_id, "retention: camera-quota prune made no progress (segment file deletes failing); stopping this camera");
196                break;
197            }
198        }
199    }
200    if quota_deleted > 0 {
201        let _ = repo::log_event(
202            pool,
203            None,
204            "disk_pressure",
205            "warning",
206            json!({ "deleted": quota_deleted, "reason": "camera_quota" }),
207        )
208        .await;
209        tracing::warn!(
210            deleted = quota_deleted,
211            "retention: per-camera quota cleanup"
212        );
213    }
214
215    // 3) Global size cap: prune the oldest DELETABLE segments until the deletable footprint fits the
216    //    budget. The budget is the cap minus the evidence-locked bytes we cannot delete — counting
217    //    those in the comparison would otherwise make us delete every deletable segment. We measure
218    //    the protected footprint by `evidence_locked = 1` (the DURABLE hold), not the transient
219    //    `locked` read-lock: an in-flight export must not inflate the protected total and starve the
220    //    cap. Deletable = `locked = 0 AND evidence_locked = 0` (skip both the read-lock and the hold).
221    // Operator-tunable from the dashboard (settings table); a positive override wins, else the env
222    // default (`HELDAR_MAX_RECORDINGS_GB`). Non-positive overrides are ignored so a stray 0 can't
223    // silently disable the cap.
224    let max = settings::get_i64(pool, settings::RECORDING_MAX_BYTES)
225        .await
226        .filter(|&v| v > 0)
227        .unwrap_or(cfg.max_recordings_bytes as i64);
228    let protected_bytes: i64 = sqlx::query_scalar(
229        "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE evidence_locked = 1",
230    )
231    .fetch_one(pool)
232    .await?;
233    let budget = max - protected_bytes;
234    let mut size_deleted: u64 = 0;
235
236    if budget <= 0 {
237        // Evidence-locked footage alone meets or exceeds the cap; deleting other footage cannot
238        // help. Warn instead of wiping everything.
239        let unlocked: i64 = sqlx::query_scalar(
240            "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
241        )
242        .fetch_one(pool)
243        .await?;
244        if protected_bytes > max {
245            tracing::warn!(
246                protected_bytes,
247                max,
248                "retention: evidence-locked footage exceeds the size cap; not deleting other footage"
249            );
250            let _ = repo::log_event(
251                pool,
252                None,
253                "disk_pressure",
254                "warning",
255                json!({ "reason": "locked_exceeds_cap", "protected_bytes": protected_bytes, "unlocked_bytes": unlocked, "max_bytes": max }),
256            )
257            .await;
258        }
259    } else {
260        loop {
261            let unlocked_total: i64 = sqlx::query_scalar(
262                "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
263            )
264            .fetch_one(pool)
265            .await?;
266            if unlocked_total <= budget {
267                break;
268            }
269            let batch: Vec<(String, String, i64)> = sqlx::query_as(
270                "SELECT id, path, size_bytes FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
271            )
272            .fetch_all(pool)
273            .await?;
274            if batch.is_empty() {
275                break;
276            }
277            let mut remaining = unlocked_total;
278            let mut progressed = 0u64;
279            for (seg_id, path, size) in batch {
280                // Stop the instant the global cap is satisfied — never over-prune within a batch.
281                if remaining <= budget {
282                    break;
283                }
284                if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
285                    remaining -= size;
286                    size_deleted += 1;
287                    progressed += 1;
288                }
289            }
290            if progressed == 0 {
291                // Every file in the batch failed to delete; we'd re-select the same rows forever.
292                tracing::error!("retention: size-cap prune made no progress (segment file deletes failing); stopping this sweep");
293                break;
294            }
295        }
296    }
297
298    if size_deleted > 0 {
299        let _ = repo::log_event(
300            pool,
301            None,
302            "disk_pressure",
303            "warning",
304            json!({ "deleted": size_deleted, "reason": "size_cap", "max_bytes": max }),
305        )
306        .await;
307        tracing::warn!(deleted = size_deleted, "retention: size-cap cleanup");
308    }
309
310    // 4) Disk-free floor: if the recordings filesystem drops below the free-space floor, prune the
311    //    oldest unlocked segments until back above it. Self-limiting: it stops if a delete batch
312    //    does not actually recover free space (disk filled by non-recording data), and refuses to
313    //    run if the floor exceeds the whole disk — so it never destroys the footprint for nothing.
314    // Operator-tunable free-disk floor (settings table); env default `HELDAR_MIN_FREE_DISK_GB` otherwise.
315    // 0 is a valid override meaning "no floor".
316    let floor = settings::get_i64(pool, settings::RECORDING_MIN_FREE_BYTES)
317        .await
318        .filter(|&v| v >= 0)
319        .map(|v| v as u64)
320        .unwrap_or(cfg.min_free_disk_bytes);
321    let mut disk_deleted: u64 = 0;
322    match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
323        None => {
324            tracing::warn!(
325                "retention: could not read disk stats; free-floor check skipped this sweep"
326            );
327            let _ = repo::log_event(
328                pool,
329                None,
330                "disk_pressure",
331                "warning",
332                json!({ "reason": "disk_stats_unavailable" }),
333            )
334            .await;
335        }
336        Some(d) if floor >= d.total_bytes => {
337            if d.free_bytes < floor {
338                tracing::warn!(
339                    floor,
340                    total = d.total_bytes,
341                    "retention: free-disk floor exceeds total disk size; refusing to prune (misconfigured?)"
342                );
343                let _ = repo::log_event(
344                    pool,
345                    None,
346                    "disk_pressure",
347                    "critical",
348                    json!({ "reason": "floor_unsatisfiable", "min_free_bytes": floor, "total_bytes": d.total_bytes }),
349                )
350                .await;
351            }
352        }
353        Some(mut prev) => {
354            let mut guard = 0;
355            let mut futile = false;
356            while prev.free_bytes < floor && guard < 200 {
357                guard += 1;
358                let before = prev.free_bytes;
359                let batch: Vec<(String, String)> = sqlx::query_as(
360                    "SELECT id, path FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
361                )
362                .fetch_all(pool)
363                .await?;
364                if batch.is_empty() {
365                    tracing::warn!(
366                        free_bytes = before,
367                        floor,
368                        "retention: below disk-free floor but no deletable segments remain to prune"
369                    );
370                    break;
371                }
372                for (seg_id, path) in batch {
373                    if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
374                        disk_deleted += 1;
375                    }
376                }
377                match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
378                    Some(d) if d.free_bytes > before => prev = d,
379                    Some(_) => {
380                        futile = true;
381                        break;
382                    }
383                    None => break,
384                }
385            }
386            if futile {
387                tracing::error!(
388                    free_bytes = prev.free_bytes,
389                    floor,
390                    "retention: pruning recordings is not recovering free space (disk filled by non-recording data?); stopping"
391                );
392                let _ = repo::log_event(
393                    pool,
394                    None,
395                    "disk_pressure",
396                    "critical",
397                    json!({ "reason": "prune_not_recovering_space", "min_free_bytes": floor, "deleted": disk_deleted }),
398                )
399                .await;
400            }
401        }
402    }
403    if disk_deleted > 0 {
404        let _ = repo::log_event(
405            pool,
406            None,
407            "disk_pressure",
408            "critical",
409            json!({ "deleted": disk_deleted, "reason": "free_floor", "min_free_bytes": floor }),
410        )
411        .await;
412        tracing::warn!(deleted = disk_deleted, "retention: disk-free-floor cleanup");
413    }
414
415    // 5) Prune old AI detections (the table grows unbounded otherwise).
416    let det_cutoff = Utc::now() - chrono::Duration::hours(cfg.detection_retention_hours.max(1));
417    let pruned = sqlx::query("DELETE FROM detections WHERE created_at < ?")
418        .bind(det_cutoff)
419        .execute(pool)
420        .await?
421        .rows_affected();
422    if pruned > 0 {
423        tracing::info!(deleted = pruned, "retention: pruned old detections");
424    }
425    // Prune the transactional outbox on the same TTL (until an edge→cloud relay acks + prunes by seq).
426    let ob_pruned = sqlx::query("DELETE FROM outbox WHERE created_at < ?")
427        .bind(det_cutoff)
428        .execute(pool)
429        .await?
430        .rows_affected();
431    if ob_pruned > 0 {
432        tracing::info!(deleted = ob_pruned, "retention: pruned old outbox rows");
433    }
434
435    // 6) Prune old zone events and delete their evidence frames (same TTL as detections).
436    let old_zone_events: Vec<(String, Option<String>)> =
437        sqlx::query_as("SELECT id, evidence_path FROM zone_events WHERE created_at < ?")
438            .bind(det_cutoff)
439            .fetch_all(pool)
440            .await?;
441    if !old_zone_events.is_empty() {
442        for (_id, evidence) in &old_zone_events {
443            if let Some(name) = evidence.as_deref().and_then(|u| u.rsplit('/').next()) {
444                let _ = tokio::fs::remove_file(cfg.snapshots_dir.join(name)).await;
445            }
446        }
447        let zpruned = sqlx::query("DELETE FROM zone_events WHERE created_at < ?")
448            .bind(det_cutoff)
449            .execute(pool)
450            .await?
451            .rows_affected();
452        tracing::info!(
453            deleted = zpruned,
454            "retention: pruned old zone events + evidence"
455        );
456    }
457
458    // 7) Prune kernel auth bookkeeping: stale audit log + expired sessions. (Domain entry events +
459    //    their evidence frames are pruned by the entry app's own retention loop, not the kernel.)
460    let audit_cutoff = Utc::now() - chrono::Duration::days(cfg.audit_retention_days.max(1));
461    let apruned = sqlx::query("DELETE FROM audit_log WHERE created_at < ?")
462        .bind(audit_cutoff)
463        .execute(pool)
464        .await?
465        .rows_affected();
466    if apruned > 0 {
467        tracing::info!(deleted = apruned, "retention: pruned old audit log entries");
468    }
469    let spruned = sqlx::query("DELETE FROM sessions WHERE expires_at < ?")
470        .bind(Utc::now())
471        .execute(pool)
472        .await?
473        .rows_affected();
474    if spruned > 0 {
475        tracing::debug!(deleted = spruned, "retention: pruned expired sessions");
476    }
477
478    // 8) Prune the generic event log (camera-status events, disk-pressure warnings, and the entry
479    //    mirrors written by the ANPR engine). It is otherwise unbounded. The alert notifier advances
480    //    a durable cursor over recent rows, so deleting rows older than the (long) entry TTL — which
481    //    are far past delivery — is safe.
482    let evpruned = sqlx::query("DELETE FROM events WHERE created_at < ?")
483        .bind(audit_cutoff)
484        .execute(pool)
485        .await?
486        .rows_affected();
487    if evpruned > 0 {
488        tracing::info!(deleted = evpruned, "retention: pruned old event-log rows");
489    }
490
491    // 8b) Prune the webhook delivery ledger (one row per delivery attempt, per subscription, per event)
492    //     past the audit horizon. The delivery cursor lives on the subscription, not these rows, so
493    //     deleting old attempt records is safe — they are an at-rest audit trail, not delivery state.
494    let wdpruned = sqlx::query("DELETE FROM webhook_deliveries WHERE created_at < ?")
495        .bind(audit_cutoff)
496        .execute(pool)
497        .await?
498        .rows_affected();
499    if wdpruned > 0 {
500        tracing::info!(
501            deleted = wdpruned,
502            "retention: pruned old webhook-delivery rows"
503        );
504    }
505
506    // 8c) Prune RESOLVED recording-gap rows (filled/failed) past the audit horizon. Pending gaps are
507    //     left for the ANR re-fill engine to act on (they age out of its query via anr_max_gap_hours).
508    let gpruned = sqlx::query(
509        "DELETE FROM recording_gaps WHERE fill_state IN ('filled','failed') AND created_at < ?",
510    )
511    .bind(audit_cutoff)
512    .execute(pool)
513    .await?
514    .rows_affected();
515    if gpruned > 0 {
516        tracing::info!(
517            deleted = gpruned,
518            "retention: pruned resolved recording-gap rows"
519        );
520    }
521
522    // 9) Prune scheduled snapshots past their retention window. The cutoff is `taken_at` (capture
523    //    time, not the row's `created_at`). Delete the file first; only drop the DB row when the
524    //    file is gone (mirrors the segment unlink pattern). Skipped entirely when hours = 0.
525    if cfg.snapshot_retention_hours > 0 {
526        let snap_cutoff = Utc::now() - chrono::Duration::hours(cfg.snapshot_retention_hours);
527        let rows: Vec<(String, String)> =
528            sqlx::query_as("SELECT id, path FROM snapshots WHERE taken_at < ?")
529                .bind(snap_cutoff)
530                .fetch_all(pool)
531                .await?;
532        let mut snap_deleted: u64 = 0;
533        for (snap_id, path) in rows {
534            if unlink_snapshot(&path).await {
535                sqlx::query("DELETE FROM snapshots WHERE id = ?")
536                    .bind(&snap_id)
537                    .execute(pool)
538                    .await?;
539                snap_deleted += 1;
540            }
541        }
542        if snap_deleted > 0 {
543            tracing::info!(deleted = snap_deleted, "retention: pruned old snapshots");
544        }
545    }
546
547    // 10) Prune on-demand archive exports + finished backup-job rows past the archive retention
548    //     window. Delete the .zip files by mtime, then drop any backup_jobs that have finished before
549    //     the cutoff (both policy runs and archive exports). Skipped entirely when hours = 0.
550    if cfg.archive_retention_hours > 0 {
551        let cutoff = Utc::now() - chrono::Duration::hours(cfg.archive_retention_hours);
552        if let Ok(mut entries) = tokio::fs::read_dir(&cfg.archive_dir).await {
553            let mut removed: u64 = 0;
554            while let Ok(Some(ent)) = entries.next_entry().await {
555                let path = ent.path();
556                if path.extension().and_then(|e| e.to_str()) != Some("zip") {
557                    continue;
558                }
559                let stale = ent
560                    .metadata()
561                    .await
562                    .ok()
563                    .and_then(|m| m.modified().ok())
564                    .map(|t| DateTime::<Utc>::from(t) < cutoff)
565                    .unwrap_or(false);
566                if stale && tokio::fs::remove_file(&path).await.is_ok() {
567                    removed += 1;
568                }
569            }
570            if removed > 0 {
571                tracing::info!(deleted = removed, "retention: pruned old archive exports");
572            }
573        }
574        let jpruned = sqlx::query(
575            "DELETE FROM backup_jobs WHERE finished_at IS NOT NULL AND finished_at < ?",
576        )
577        .bind(cutoff)
578        .execute(pool)
579        .await?
580        .rows_affected();
581        if jpruned > 0 {
582            tracing::info!(
583                deleted = jpruned,
584                "retention: pruned old finished backup jobs"
585            );
586        }
587    }
588    Ok(())
589}
590
591#[cfg(test)]
592mod tests {
593    use super::*;
594
595    // ----- helpers -------------------------------------------------------
596
597    fn unique_path(prefix: &str) -> std::path::PathBuf {
598        use std::sync::atomic::{AtomicU64, Ordering};
599        static N: AtomicU64 = AtomicU64::new(0);
600        let n = N.fetch_add(1, Ordering::Relaxed);
601        std::env::temp_dir().join(format!("{prefix}-{}-{n}", std::process::id()))
602    }
603
604    async fn mem_pool() -> SqlitePool {
605        let pool = sqlx::sqlite::SqlitePoolOptions::new()
606            .max_connections(1)
607            .connect("sqlite::memory:")
608            .await
609            .unwrap();
610        crate::db::run_migrations(&pool).await.unwrap();
611        pool
612    }
613
614    /// A Config wired so that ONLY age-retention (step 1) and per-camera quota (step 2) can act:
615    /// the global size cap is effectively infinite, the disk-free floor is 0 (step 4 never deletes,
616    /// regardless of whether statvfs succeeds), and snapshot/archive prunes are disabled.
617    fn test_cfg() -> Config {
618        let mut cfg = Config::from_env();
619        cfg.max_recordings_bytes = u64::MAX / 4;
620        cfg.min_free_disk_bytes = 0;
621        cfg.recordings_dir = std::env::temp_dir();
622        cfg.snapshot_retention_hours = 0;
623        cfg.archive_retention_hours = 0;
624        cfg.detection_retention_hours = 168;
625        cfg.audit_retention_days = 365;
626        cfg
627    }
628
629    async fn insert_camera(pool: &SqlitePool, id: &str, retention_hours: i64, quota: Option<i64>) {
630        let now = Utc::now();
631        sqlx::query(
632            "INSERT INTO cameras (id, name, retention_hours, storage_quota_bytes, created_at, updated_at)
633             VALUES (?, ?, ?, ?, ?, ?)",
634        )
635        .bind(id)
636        .bind(id)
637        .bind(retention_hours)
638        .bind(quota)
639        .bind(now)
640        .bind(now)
641        .execute(pool)
642        .await
643        .unwrap();
644    }
645
646    async fn insert_segment(
647        pool: &SqlitePool,
648        id: &str,
649        camera_id: &str,
650        end: DateTime<Utc>,
651        size_bytes: i64,
652        locked: i64,
653        evidence_locked: i64,
654    ) {
655        sqlx::query(
656            "INSERT INTO segments
657                (id, camera_id, path, start_time, end_time, duration_s, size_bytes, locked, evidence_locked, created_at)
658             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
659        )
660        .bind(id)
661        .bind(camera_id)
662        // points at a file that does not exist -> unlink_segment hits the NotFound->true branch.
663        .bind(format!("/nonexistent/heldar-test/{id}.mp4"))
664        .bind(end)
665        .bind(end)
666        .bind(60.0_f64)
667        .bind(size_bytes)
668        .bind(locked)
669        .bind(evidence_locked)
670        .bind(end)
671        .execute(pool)
672        .await
673        .unwrap();
674    }
675
676    async fn seg_exists(pool: &SqlitePool, id: &str) -> bool {
677        let c: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM segments WHERE id = ?")
678            .bind(id)
679            .fetch_one(pool)
680            .await
681            .unwrap();
682        c == 1
683    }
684
685    async fn seg_count(pool: &SqlitePool) -> i64 {
686        sqlx::query_scalar("SELECT COUNT(*) FROM segments")
687            .fetch_one(pool)
688            .await
689            .unwrap()
690    }
691
692    async fn event_type_count(pool: &SqlitePool, event_type: &str) -> i64 {
693        sqlx::query_scalar("SELECT COUNT(*) FROM events WHERE event_type = ?")
694            .bind(event_type)
695            .fetch_one(pool)
696            .await
697            .unwrap()
698    }
699
700    async fn camera_quota_event_count(pool: &SqlitePool) -> i64 {
701        sqlx::query_scalar(
702            "SELECT COUNT(*) FROM events WHERE event_type = 'disk_pressure' AND payload LIKE '%camera_quota%'",
703        )
704        .fetch_one(pool)
705        .await
706        .unwrap()
707    }
708
709    // ----- unlink helpers ------------------------------------------------
710
711    #[tokio::test]
712    async fn unlink_segment_reports_removable_for_missing_path() {
713        // Already-absent file: the DB row should be removed (returns true).
714        assert!(unlink_segment("/nonexistent/heldar/definitely-not-here.mp4").await);
715    }
716
717    #[tokio::test]
718    async fn unlink_segment_deletes_existing_file() {
719        let p = unique_path("heldar-seg");
720        tokio::fs::write(&p, b"x").await.unwrap();
721        assert!(p.exists());
722        assert!(unlink_segment(p.to_str().unwrap()).await);
723        assert!(!p.exists());
724    }
725
726    #[tokio::test]
727    async fn unlink_segment_keeps_row_for_directory() {
728        // remove_file on a directory fails with a non-NotFound error -> keep the row (false).
729        let d = unique_path("heldar-dir");
730        tokio::fs::create_dir(&d).await.unwrap();
731        assert!(!unlink_segment(d.to_str().unwrap()).await);
732        assert!(d.exists());
733        let _ = tokio::fs::remove_dir(&d).await;
734    }
735
736    #[tokio::test]
737    async fn unlink_snapshot_handles_missing_and_existing() {
738        // Mirrors unlink_segment: missing -> true; existing -> deleted + true.
739        assert!(unlink_snapshot("/nonexistent/heldar/none.jpg").await);
740        let p = unique_path("heldar-snap");
741        tokio::fs::write(&p, b"x").await.unwrap();
742        assert!(unlink_snapshot(p.to_str().unwrap()).await);
743        assert!(!p.exists());
744    }
745
746    // ----- sweep: age retention -----------------------------------------
747
748    #[tokio::test]
749    async fn sweep_age_retention_deletes_only_old_unlocked() {
750        let pool = mem_pool().await;
751        let cfg = test_cfg();
752        let now = Utc::now();
753
754        insert_camera(&pool, "cam_age", 24, None).await;
755        // Recent unlocked segment: kept (newer than the 24h cutoff).
756        insert_segment(
757            &pool,
758            "seg_recent",
759            "cam_age",
760            now - chrono::Duration::hours(1),
761            100,
762            0,
763            0,
764        )
765        .await;
766        // Old unlocked segment: deleted by age policy.
767        insert_segment(
768            &pool,
769            "seg_old",
770            "cam_age",
771            now - chrono::Duration::hours(48),
772            100,
773            0,
774            0,
775        )
776        .await;
777        // Old but read-locked (transient export lock): excluded from age prune -> kept.
778        insert_segment(
779            &pool,
780            "seg_old_locked",
781            "cam_age",
782            now - chrono::Duration::hours(48),
783            100,
784            1,
785            0,
786        )
787        .await;
788        // Old but evidence-locked (durable hold): excluded from age prune -> kept.
789        insert_segment(
790            &pool,
791            "seg_old_ev",
792            "cam_age",
793            now - chrono::Duration::hours(48),
794            100,
795            0,
796            1,
797        )
798        .await;
799
800        sweep(&pool, &cfg).await.unwrap();
801
802        assert!(seg_exists(&pool, "seg_recent").await);
803        assert!(
804            !seg_exists(&pool, "seg_old").await,
805            "old unlocked segment should be pruned by age"
806        );
807        assert!(
808            seg_exists(&pool, "seg_old_locked").await,
809            "read-locked segment must survive age prune"
810        );
811        assert!(
812            seg_exists(&pool, "seg_old_ev").await,
813            "evidence-locked segment must survive age prune"
814        );
815        assert_eq!(seg_count(&pool).await, 3);
816        // age_deleted > 0 logs exactly one retention_delete event for the sweep.
817        assert_eq!(event_type_count(&pool, "retention_delete").await, 1);
818    }
819
820    // ----- sweep: per-camera quota --------------------------------------
821
822    #[tokio::test]
823    async fn sweep_camera_quota_prunes_only_to_budget_keeps_evidence() {
824        let pool = mem_pool().await;
825        let cfg = test_cfg();
826        let now = Utc::now();
827
828        // Huge retention so age policy never fires; only the quota acts here.
829        insert_camera(&pool, "cam_q", 100_000, Some(1000)).await;
830        // Protected (evidence-locked) footage counts against the quota but is never deleted.
831        insert_segment(
832            &pool,
833            "sL",
834            "cam_q",
835            now - chrono::Duration::hours(5),
836            600,
837            0,
838            1,
839        )
840        .await;
841        // Three deletable segments (total 1200) over the budget (quota 1000 - protected 600 = 400).
842        insert_segment(
843            &pool,
844            "s1",
845            "cam_q",
846            now - chrono::Duration::hours(3),
847            400,
848            0,
849            0,
850        )
851        .await;
852        insert_segment(
853            &pool,
854            "s2",
855            "cam_q",
856            now - chrono::Duration::hours(2),
857            400,
858            0,
859            0,
860        )
861        .await;
862        insert_segment(
863            &pool,
864            "s3",
865            "cam_q",
866            now - chrono::Duration::hours(1),
867            400,
868            0,
869            0,
870        )
871        .await;
872
873        sweep(&pool, &cfg).await.unwrap();
874
875        // Correctness invariant: prune ONLY enough oldest segments to reach budget (400), then stop.
876        // Deleting s1+s2 brings the deletable footprint to exactly 400 == budget, so s3 is within
877        // quota and MUST be kept; pruning it would needlessly destroy recoverable footage.
878        assert!(
879            seg_exists(&pool, "sL").await,
880            "evidence-locked footage must survive the quota prune"
881        );
882        assert!(
883            !seg_exists(&pool, "s1").await,
884            "oldest over-budget segment is pruned"
885        );
886        assert!(
887            !seg_exists(&pool, "s2").await,
888            "second-oldest pruned to reach budget"
889        );
890        assert!(
891            seg_exists(&pool, "s3").await,
892            "s3 is within quota once s1+s2 are gone and must NOT be over-deleted"
893        );
894        assert_eq!(
895            seg_count(&pool).await,
896            2,
897            "only s1,s2 pruned to reach budget; sL+s3 remain"
898        );
899        assert!(
900            camera_quota_event_count(&pool).await >= 1,
901            "a camera_quota disk_pressure event should be logged"
902        );
903    }
904
905    #[tokio::test]
906    async fn delete_segment_if_unlocked_spares_locked_rows() {
907        // The TOCTOU guard: the conditional DELETE must refuse a row that became evidence-locked
908        // (or read-locked) since it was selected for pruning, and remove an unlocked one. This is
909        // the atomic primitive that makes pruning safe against a hold committing mid-sweep.
910        let pool = mem_pool().await;
911        let now = Utc::now();
912        insert_camera(&pool, "cam_t", 100_000, None).await;
913        insert_segment(&pool, "held", "cam_t", now, 100, 0, 1).await; // evidence_locked = 1
914        insert_segment(&pool, "rlok", "cam_t", now, 100, 1, 0).await; // locked = 1 (export read-lock)
915        insert_segment(&pool, "free", "cam_t", now, 100, 0, 0).await; // deletable
916
917        assert!(
918            !delete_segment_if_unlocked(&pool, "held", "/nonexistent/held.mp4")
919                .await
920                .unwrap(),
921            "evidence-locked row must not be removable"
922        );
923        assert!(
924            !delete_segment_if_unlocked(&pool, "rlok", "/nonexistent/rlok.mp4")
925                .await
926                .unwrap(),
927            "read-locked row must not be removable"
928        );
929        assert!(seg_exists(&pool, "held").await);
930        assert!(seg_exists(&pool, "rlok").await);
931
932        assert!(
933            delete_segment_if_unlocked(&pool, "free", "/nonexistent/free.mp4")
934                .await
935                .unwrap(),
936            "unlocked row is removed"
937        );
938        assert!(!seg_exists(&pool, "free").await);
939    }
940
941    #[tokio::test]
942    async fn sweep_camera_quota_protected_exceeds_deletes_nothing() {
943        let pool = mem_pool().await;
944        let cfg = test_cfg();
945        let now = Utc::now();
946
947        // Protected footage alone (500) exceeds the quota (100): deleting other footage cannot help,
948        // so nothing is pruned and a warning is logged instead.
949        insert_camera(&pool, "cam_over", 100_000, Some(100)).await;
950        insert_segment(
951            &pool,
952            "ovL",
953            "cam_over",
954            now - chrono::Duration::hours(5),
955            500,
956            0,
957            1,
958        )
959        .await;
960        insert_segment(
961            &pool,
962            "ov1",
963            "cam_over",
964            now - chrono::Duration::hours(1),
965            50,
966            0,
967            0,
968        )
969        .await;
970
971        sweep(&pool, &cfg).await.unwrap();
972
973        assert!(seg_exists(&pool, "ovL").await);
974        assert!(
975            seg_exists(&pool, "ov1").await,
976            "other footage must not be wiped when protected footage exceeds the quota"
977        );
978        assert_eq!(seg_count(&pool).await, 2);
979        assert!(
980            camera_quota_event_count(&pool).await >= 1,
981            "a camera_quota warning should be logged"
982        );
983    }
984
985    // ----- sweep: detection pruning -------------------------------------
986
987    #[tokio::test]
988    async fn sweep_prunes_old_detections() {
989        let pool = mem_pool().await;
990        let cfg = test_cfg(); // detection_retention_hours = 168
991
992        insert_camera(&pool, "cam_d", 24, None).await;
993        let now = Utc::now();
994        // Older than the 168h TTL -> pruned.
995        sqlx::query(
996            "INSERT INTO detections (id, camera_id, task_type, timestamp, created_at) VALUES (?, ?, ?, ?, ?)",
997        )
998        .bind("det_old")
999        .bind("cam_d")
1000        .bind("object")
1001        .bind(now - chrono::Duration::hours(200))
1002        .bind(now - chrono::Duration::hours(200))
1003        .execute(&pool)
1004        .await
1005        .unwrap();
1006        // Recent -> kept.
1007        sqlx::query(
1008            "INSERT INTO detections (id, camera_id, task_type, timestamp, created_at) VALUES (?, ?, ?, ?, ?)",
1009        )
1010        .bind("det_new")
1011        .bind("cam_d")
1012        .bind("object")
1013        .bind(now - chrono::Duration::hours(1))
1014        .bind(now - chrono::Duration::hours(1))
1015        .execute(&pool)
1016        .await
1017        .unwrap();
1018
1019        sweep(&pool, &cfg).await.unwrap();
1020
1021        let remaining: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM detections")
1022            .fetch_one(&pool)
1023            .await
1024            .unwrap();
1025        assert_eq!(remaining, 1);
1026        let kept: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM detections WHERE id = ?")
1027            .bind("det_new")
1028            .fetch_one(&pool)
1029            .await
1030            .unwrap();
1031        assert_eq!(kept, 1, "the recent detection must be retained");
1032    }
1033}