Skip to main content

heldar_kernel/services/
retention.rs

1//! Retention sweeper: deletes recordings past each camera's age policy, and enforces a global
2//! size cap by pruning the oldest deletable segments. Segments under a durable evidence hold
3//! (`evidence_locked = 1`) are never deleted, and a segment with a transient export read-lock
4//! (`locked = 1`) is skipped while the export is in flight. Both are excluded from every prune.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use chrono::{DateTime, Utc};
10use serde_json::json;
11use sqlx::SqlitePool;
12
13use crate::config::Config;
14use crate::repo;
15use crate::services::storage;
16
17/// Delete a segment's file and report whether its DB row should now be removed. The row is removed
18/// only when the file is actually gone — deleted just now, or already absent (`NotFound`). If the
19/// delete fails for any other reason (permissions, I/O error), we keep the DB row so the file is not
20/// orphaned-yet-forgotten: the next sweep retries it, and the size/disk accounting stays truthful.
21async fn unlink_segment(path: &str) -> bool {
22    match tokio::fs::remove_file(path).await {
23        Ok(()) => true,
24        Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
25        Err(e) => {
26            tracing::error!(path, error = %e, "retention: failed to delete segment file; keeping DB row to retry next sweep");
27            false
28        }
29    }
30}
31
32/// Delete a snapshot's file and report whether its DB row should now be removed. Mirrors
33/// [`unlink_segment`]: the row is removed only when the file is actually gone (deleted just now or
34/// already absent); on any other delete error we keep the row so the next sweep retries.
35async fn unlink_snapshot(path: &str) -> bool {
36    match tokio::fs::remove_file(path).await {
37        Ok(()) => true,
38        Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
39        Err(e) => {
40            tracing::error!(path, error = %e, "retention: failed to delete snapshot file; keeping DB row to retry next sweep");
41            false
42        }
43    }
44}
45
46pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
47    let mut tick = tokio::time::interval(Duration::from_secs(cfg.retention_interval_s.max(30)));
48    loop {
49        tick.tick().await;
50        if let Err(e) = sweep(&pool, &cfg).await {
51            tracing::error!(error = %e, "retention: sweep failed");
52        }
53    }
54}
55
56async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
57    // 1) Age-based retention, per-camera.
58    let mut age_deleted: u64 = 0;
59    let cams: Vec<(String, i64)> = sqlx::query_as("SELECT id, retention_hours FROM cameras")
60        .fetch_all(pool)
61        .await?;
62    for (id, hours) in cams {
63        let cutoff = Utc::now() - chrono::Duration::hours(hours.max(1));
64        let rows: Vec<(String, String)> = sqlx::query_as(
65            "SELECT id, path FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 AND end_time < ?",
66        )
67        .bind(&id)
68        .bind(cutoff)
69        .fetch_all(pool)
70        .await?;
71        for (seg_id, path) in rows {
72            if unlink_segment(&path).await {
73                sqlx::query("DELETE FROM segments WHERE id = ?")
74                    .bind(&seg_id)
75                    .execute(pool)
76                    .await?;
77                age_deleted += 1;
78            }
79        }
80    }
81    if age_deleted > 0 {
82        let _ = repo::log_event(
83            pool,
84            None,
85            "retention_delete",
86            "info",
87            json!({ "deleted": age_deleted, "reason": "age" }),
88        )
89        .await;
90        tracing::info!(deleted = age_deleted, "retention: age-based cleanup");
91    }
92
93    // 2) Per-camera storage quota. Mirrors the global size cap (step 3) but scoped to one camera:
94    //    keep each capped camera's deletable footprint within its quota by pruning its oldest
95    //    unlocked segments. Evidence-locked footage (`evidence_locked = 1`) is protected and counts
96    //    against the quota; if it alone meets or exceeds the quota, we warn and delete nothing rather
97    //    than wiping the camera's other footage. Only cameras with `storage_quota_bytes IS NOT NULL`
98    //    are capped here; the rest are governed solely by the global cap + disk floor below.
99    let mut quota_deleted: u64 = 0;
100    let quota_cams: Vec<(String, i64)> = sqlx::query_as(
101        "SELECT id, storage_quota_bytes FROM cameras WHERE storage_quota_bytes IS NOT NULL",
102    )
103    .fetch_all(pool)
104    .await?;
105    for (cam_id, quota) in quota_cams {
106        if quota <= 0 {
107            continue;
108        }
109        let protected_bytes: i64 = sqlx::query_scalar(
110            "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND evidence_locked = 1",
111        )
112        .bind(&cam_id)
113        .fetch_one(pool)
114        .await?;
115        let budget = quota - protected_bytes;
116        if budget <= 0 {
117            if protected_bytes > quota {
118                tracing::warn!(
119                    camera_id = %cam_id,
120                    protected_bytes,
121                    quota,
122                    "retention: evidence-locked footage exceeds the camera quota; not deleting other footage"
123                );
124                let _ = repo::log_event(
125                    pool,
126                    Some(&cam_id),
127                    "disk_pressure",
128                    "warning",
129                    json!({ "reason": "camera_quota", "camera_id": &cam_id, "protected_bytes": protected_bytes, "quota_bytes": quota }),
130                )
131                .await;
132            }
133            continue;
134        }
135        loop {
136            let deletable_total: i64 = sqlx::query_scalar(
137                "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0",
138            )
139            .bind(&cam_id)
140            .fetch_one(pool)
141            .await?;
142            if deletable_total <= budget {
143                break;
144            }
145            let batch: Vec<(String, String)> = sqlx::query_as(
146                "SELECT id, path FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
147            )
148            .bind(&cam_id)
149            .fetch_all(pool)
150            .await?;
151            if batch.is_empty() {
152                break;
153            }
154            let mut progressed = 0u64;
155            for (seg_id, path) in batch {
156                if unlink_segment(&path).await {
157                    sqlx::query("DELETE FROM segments WHERE id = ?")
158                        .bind(&seg_id)
159                        .execute(pool)
160                        .await?;
161                    quota_deleted += 1;
162                    progressed += 1;
163                }
164            }
165            if progressed == 0 {
166                tracing::error!(camera_id = %cam_id, "retention: camera-quota prune made no progress (segment file deletes failing); stopping this camera");
167                break;
168            }
169        }
170    }
171    if quota_deleted > 0 {
172        let _ = repo::log_event(
173            pool,
174            None,
175            "disk_pressure",
176            "warning",
177            json!({ "deleted": quota_deleted, "reason": "camera_quota" }),
178        )
179        .await;
180        tracing::warn!(
181            deleted = quota_deleted,
182            "retention: per-camera quota cleanup"
183        );
184    }
185
186    // 3) Global size cap: prune the oldest DELETABLE segments until the deletable footprint fits the
187    //    budget. The budget is the cap minus the evidence-locked bytes we cannot delete — counting
188    //    those in the comparison would otherwise make us delete every deletable segment. We measure
189    //    the protected footprint by `evidence_locked = 1` (the DURABLE hold), not the transient
190    //    `locked` read-lock: an in-flight export must not inflate the protected total and starve the
191    //    cap. Deletable = `locked = 0 AND evidence_locked = 0` (skip both the read-lock and the hold).
192    let max = cfg.max_recordings_bytes as i64;
193    let protected_bytes: i64 = sqlx::query_scalar(
194        "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE evidence_locked = 1",
195    )
196    .fetch_one(pool)
197    .await?;
198    let budget = max - protected_bytes;
199    let mut size_deleted: u64 = 0;
200
201    if budget <= 0 {
202        // Evidence-locked footage alone meets or exceeds the cap; deleting other footage cannot
203        // help. Warn instead of wiping everything.
204        let unlocked: i64 = sqlx::query_scalar(
205            "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
206        )
207        .fetch_one(pool)
208        .await?;
209        if protected_bytes > max {
210            tracing::warn!(
211                protected_bytes,
212                max,
213                "retention: evidence-locked footage exceeds the size cap; not deleting other footage"
214            );
215            let _ = repo::log_event(
216                pool,
217                None,
218                "disk_pressure",
219                "warning",
220                json!({ "reason": "locked_exceeds_cap", "protected_bytes": protected_bytes, "unlocked_bytes": unlocked, "max_bytes": max }),
221            )
222            .await;
223        }
224    } else {
225        loop {
226            let unlocked_total: i64 = sqlx::query_scalar(
227                "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
228            )
229            .fetch_one(pool)
230            .await?;
231            if unlocked_total <= budget {
232                break;
233            }
234            let batch: Vec<(String, String)> = sqlx::query_as(
235                "SELECT id, path FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
236            )
237            .fetch_all(pool)
238            .await?;
239            if batch.is_empty() {
240                break;
241            }
242            let mut progressed = 0u64;
243            for (seg_id, path) in batch {
244                if unlink_segment(&path).await {
245                    sqlx::query("DELETE FROM segments WHERE id = ?")
246                        .bind(&seg_id)
247                        .execute(pool)
248                        .await?;
249                    size_deleted += 1;
250                    progressed += 1;
251                }
252            }
253            if progressed == 0 {
254                // Every file in the batch failed to delete; we'd re-select the same rows forever.
255                tracing::error!("retention: size-cap prune made no progress (segment file deletes failing); stopping this sweep");
256                break;
257            }
258        }
259    }
260
261    if size_deleted > 0 {
262        let _ = repo::log_event(
263            pool,
264            None,
265            "disk_pressure",
266            "warning",
267            json!({ "deleted": size_deleted, "reason": "size_cap", "max_bytes": max }),
268        )
269        .await;
270        tracing::warn!(deleted = size_deleted, "retention: size-cap cleanup");
271    }
272
273    // 4) Disk-free floor: if the recordings filesystem drops below the free-space floor, prune the
274    //    oldest unlocked segments until back above it. Self-limiting: it stops if a delete batch
275    //    does not actually recover free space (disk filled by non-recording data), and refuses to
276    //    run if the floor exceeds the whole disk — so it never destroys the footprint for nothing.
277    let floor = cfg.min_free_disk_bytes;
278    let mut disk_deleted: u64 = 0;
279    match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
280        None => {
281            tracing::warn!(
282                "retention: could not read disk stats; free-floor check skipped this sweep"
283            );
284            let _ = repo::log_event(
285                pool,
286                None,
287                "disk_pressure",
288                "warning",
289                json!({ "reason": "disk_stats_unavailable" }),
290            )
291            .await;
292        }
293        Some(d) if floor >= d.total_bytes => {
294            if d.free_bytes < floor {
295                tracing::warn!(
296                    floor,
297                    total = d.total_bytes,
298                    "retention: free-disk floor exceeds total disk size; refusing to prune (misconfigured?)"
299                );
300                let _ = repo::log_event(
301                    pool,
302                    None,
303                    "disk_pressure",
304                    "critical",
305                    json!({ "reason": "floor_unsatisfiable", "min_free_bytes": floor, "total_bytes": d.total_bytes }),
306                )
307                .await;
308            }
309        }
310        Some(mut prev) => {
311            let mut guard = 0;
312            let mut futile = false;
313            while prev.free_bytes < floor && guard < 200 {
314                guard += 1;
315                let before = prev.free_bytes;
316                let batch: Vec<(String, String)> = sqlx::query_as(
317                    "SELECT id, path FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
318                )
319                .fetch_all(pool)
320                .await?;
321                if batch.is_empty() {
322                    tracing::warn!(
323                        free_bytes = before,
324                        floor,
325                        "retention: below disk-free floor but no deletable segments remain to prune"
326                    );
327                    break;
328                }
329                for (seg_id, path) in batch {
330                    if unlink_segment(&path).await {
331                        sqlx::query("DELETE FROM segments WHERE id = ?")
332                            .bind(&seg_id)
333                            .execute(pool)
334                            .await?;
335                        disk_deleted += 1;
336                    }
337                }
338                match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
339                    Some(d) if d.free_bytes > before => prev = d,
340                    Some(_) => {
341                        futile = true;
342                        break;
343                    }
344                    None => break,
345                }
346            }
347            if futile {
348                tracing::error!(
349                    free_bytes = prev.free_bytes,
350                    floor,
351                    "retention: pruning recordings is not recovering free space (disk filled by non-recording data?); stopping"
352                );
353                let _ = repo::log_event(
354                    pool,
355                    None,
356                    "disk_pressure",
357                    "critical",
358                    json!({ "reason": "prune_not_recovering_space", "min_free_bytes": floor, "deleted": disk_deleted }),
359                )
360                .await;
361            }
362        }
363    }
364    if disk_deleted > 0 {
365        let _ = repo::log_event(
366            pool,
367            None,
368            "disk_pressure",
369            "critical",
370            json!({ "deleted": disk_deleted, "reason": "free_floor", "min_free_bytes": floor }),
371        )
372        .await;
373        tracing::warn!(deleted = disk_deleted, "retention: disk-free-floor cleanup");
374    }
375
376    // 5) Prune old AI detections (the table grows unbounded otherwise).
377    let det_cutoff = Utc::now() - chrono::Duration::hours(cfg.detection_retention_hours.max(1));
378    let pruned = sqlx::query("DELETE FROM detections WHERE created_at < ?")
379        .bind(det_cutoff)
380        .execute(pool)
381        .await?
382        .rows_affected();
383    if pruned > 0 {
384        tracing::info!(deleted = pruned, "retention: pruned old detections");
385    }
386    // Prune the transactional outbox on the same TTL (until an edge→cloud relay acks + prunes by seq).
387    let ob_pruned = sqlx::query("DELETE FROM outbox WHERE created_at < ?")
388        .bind(det_cutoff)
389        .execute(pool)
390        .await?
391        .rows_affected();
392    if ob_pruned > 0 {
393        tracing::info!(deleted = ob_pruned, "retention: pruned old outbox rows");
394    }
395
396    // 6) Prune old zone events and delete their evidence frames (same TTL as detections).
397    let old_zone_events: Vec<(String, Option<String>)> =
398        sqlx::query_as("SELECT id, evidence_path FROM zone_events WHERE created_at < ?")
399            .bind(det_cutoff)
400            .fetch_all(pool)
401            .await?;
402    if !old_zone_events.is_empty() {
403        for (_id, evidence) in &old_zone_events {
404            if let Some(name) = evidence.as_deref().and_then(|u| u.rsplit('/').next()) {
405                let _ = tokio::fs::remove_file(cfg.snapshots_dir.join(name)).await;
406            }
407        }
408        let zpruned = sqlx::query("DELETE FROM zone_events WHERE created_at < ?")
409            .bind(det_cutoff)
410            .execute(pool)
411            .await?
412            .rows_affected();
413        tracing::info!(
414            deleted = zpruned,
415            "retention: pruned old zone events + evidence"
416        );
417    }
418
419    // 7) Prune kernel auth bookkeeping: stale audit log + expired sessions. (Domain entry events +
420    //    their evidence frames are pruned by the entry app's own retention loop, not the kernel.)
421    let audit_cutoff = Utc::now() - chrono::Duration::days(cfg.audit_retention_days.max(1));
422    let apruned = sqlx::query("DELETE FROM audit_log WHERE created_at < ?")
423        .bind(audit_cutoff)
424        .execute(pool)
425        .await?
426        .rows_affected();
427    if apruned > 0 {
428        tracing::info!(deleted = apruned, "retention: pruned old audit log entries");
429    }
430    let spruned = sqlx::query("DELETE FROM sessions WHERE expires_at < ?")
431        .bind(Utc::now())
432        .execute(pool)
433        .await?
434        .rows_affected();
435    if spruned > 0 {
436        tracing::debug!(deleted = spruned, "retention: pruned expired sessions");
437    }
438
439    // 8) Prune the generic event log (camera-status events, disk-pressure warnings, and the entry
440    //    mirrors written by the ANPR engine). It is otherwise unbounded. The alert notifier advances
441    //    a durable cursor over recent rows, so deleting rows older than the (long) entry TTL — which
442    //    are far past delivery — is safe.
443    let evpruned = sqlx::query("DELETE FROM events WHERE created_at < ?")
444        .bind(audit_cutoff)
445        .execute(pool)
446        .await?
447        .rows_affected();
448    if evpruned > 0 {
449        tracing::info!(deleted = evpruned, "retention: pruned old event-log rows");
450    }
451
452    // 9) Prune scheduled snapshots past their retention window. The cutoff is `taken_at` (capture
453    //    time, not the row's `created_at`). Delete the file first; only drop the DB row when the
454    //    file is gone (mirrors the segment unlink pattern). Skipped entirely when hours = 0.
455    if cfg.snapshot_retention_hours > 0 {
456        let snap_cutoff = Utc::now() - chrono::Duration::hours(cfg.snapshot_retention_hours);
457        let rows: Vec<(String, String)> =
458            sqlx::query_as("SELECT id, path FROM snapshots WHERE taken_at < ?")
459                .bind(snap_cutoff)
460                .fetch_all(pool)
461                .await?;
462        let mut snap_deleted: u64 = 0;
463        for (snap_id, path) in rows {
464            if unlink_snapshot(&path).await {
465                sqlx::query("DELETE FROM snapshots WHERE id = ?")
466                    .bind(&snap_id)
467                    .execute(pool)
468                    .await?;
469                snap_deleted += 1;
470            }
471        }
472        if snap_deleted > 0 {
473            tracing::info!(deleted = snap_deleted, "retention: pruned old snapshots");
474        }
475    }
476
477    // 10) Prune on-demand archive exports + finished backup-job rows past the archive retention
478    //     window. Delete the .zip files by mtime, then drop any backup_jobs that have finished before
479    //     the cutoff (both policy runs and archive exports). Skipped entirely when hours = 0.
480    if cfg.archive_retention_hours > 0 {
481        let cutoff = Utc::now() - chrono::Duration::hours(cfg.archive_retention_hours);
482        if let Ok(mut entries) = tokio::fs::read_dir(&cfg.archive_dir).await {
483            let mut removed: u64 = 0;
484            while let Ok(Some(ent)) = entries.next_entry().await {
485                let path = ent.path();
486                if path.extension().and_then(|e| e.to_str()) != Some("zip") {
487                    continue;
488                }
489                let stale = ent
490                    .metadata()
491                    .await
492                    .ok()
493                    .and_then(|m| m.modified().ok())
494                    .map(|t| DateTime::<Utc>::from(t) < cutoff)
495                    .unwrap_or(false);
496                if stale && tokio::fs::remove_file(&path).await.is_ok() {
497                    removed += 1;
498                }
499            }
500            if removed > 0 {
501                tracing::info!(deleted = removed, "retention: pruned old archive exports");
502            }
503        }
504        let jpruned = sqlx::query(
505            "DELETE FROM backup_jobs WHERE finished_at IS NOT NULL AND finished_at < ?",
506        )
507        .bind(cutoff)
508        .execute(pool)
509        .await?
510        .rows_affected();
511        if jpruned > 0 {
512            tracing::info!(
513                deleted = jpruned,
514                "retention: pruned old finished backup jobs"
515            );
516        }
517    }
518    Ok(())
519}