Skip to main content

heldar_kernel/services/
retention.rs

1//! Retention sweeper: deletes recordings past each camera's age policy, and enforces a global
2//! size cap by pruning the oldest unlocked segments. Locked (evidence) segments are never deleted.
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use chrono::Utc;
8use serde_json::json;
9use sqlx::SqlitePool;
10
11use crate::config::Config;
12use crate::repo;
13use crate::services::storage;
14
15/// Delete a segment's file and report whether its DB row should now be removed. The row is removed
16/// only when the file is actually gone — deleted just now, or already absent (`NotFound`). If the
17/// delete fails for any other reason (permissions, I/O error), we keep the DB row so the file is not
18/// orphaned-yet-forgotten: the next sweep retries it, and the size/disk accounting stays truthful.
19async fn unlink_segment(path: &str) -> bool {
20    match tokio::fs::remove_file(path).await {
21        Ok(()) => true,
22        Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
23        Err(e) => {
24            tracing::error!(path, error = %e, "retention: failed to delete segment file; keeping DB row to retry next sweep");
25            false
26        }
27    }
28}
29
30pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
31    let mut tick = tokio::time::interval(Duration::from_secs(cfg.retention_interval_s.max(30)));
32    loop {
33        tick.tick().await;
34        if let Err(e) = sweep(&pool, &cfg).await {
35            tracing::error!(error = %e, "retention: sweep failed");
36        }
37    }
38}
39
40async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
41    // 1) Age-based retention, per-camera.
42    let mut age_deleted: u64 = 0;
43    let cams: Vec<(String, i64)> = sqlx::query_as("SELECT id, retention_hours FROM cameras")
44        .fetch_all(pool)
45        .await?;
46    for (id, hours) in cams {
47        let cutoff = Utc::now() - chrono::Duration::hours(hours.max(1));
48        let rows: Vec<(String, String)> = sqlx::query_as(
49            "SELECT id, path FROM segments WHERE camera_id = ? AND locked = 0 AND end_time < ?",
50        )
51        .bind(&id)
52        .bind(cutoff)
53        .fetch_all(pool)
54        .await?;
55        for (seg_id, path) in rows {
56            if unlink_segment(&path).await {
57                sqlx::query("DELETE FROM segments WHERE id = ?")
58                    .bind(&seg_id)
59                    .execute(pool)
60                    .await?;
61                age_deleted += 1;
62            }
63        }
64    }
65    if age_deleted > 0 {
66        let _ = repo::log_event(
67            pool,
68            None,
69            "retention_delete",
70            "info",
71            json!({ "deleted": age_deleted, "reason": "age" }),
72        )
73        .await;
74        tracing::info!(deleted = age_deleted, "retention: age-based cleanup");
75    }
76
77    // 2) Global size cap: prune the oldest UNLOCKED segments until the deletable footprint fits the
78    //    budget. The budget is the cap minus the locked (evidence) bytes we cannot delete — counting
79    //    locked bytes in the comparison would otherwise make us delete every unlocked segment.
80    let max = cfg.max_recordings_bytes as i64;
81    let locked_bytes: i64 =
82        sqlx::query_scalar("SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 1")
83            .fetch_one(pool)
84            .await?;
85    let budget = max - locked_bytes;
86    let mut size_deleted: u64 = 0;
87
88    if budget <= 0 {
89        // Locked/evidence footage alone meets or exceeds the cap; deleting unlocked footage cannot
90        // help. Warn instead of wiping everything.
91        let unlocked: i64 = sqlx::query_scalar(
92            "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0",
93        )
94        .fetch_one(pool)
95        .await?;
96        if locked_bytes > max {
97            tracing::warn!(
98                locked_bytes,
99                max,
100                "retention: locked (evidence) footage exceeds the size cap; not deleting unlocked footage"
101            );
102            let _ = repo::log_event(
103                pool,
104                None,
105                "disk_pressure",
106                "warning",
107                json!({ "reason": "locked_exceeds_cap", "locked_bytes": locked_bytes, "unlocked_bytes": unlocked, "max_bytes": max }),
108            )
109            .await;
110        }
111    } else {
112        loop {
113            let unlocked_total: i64 = sqlx::query_scalar(
114                "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0",
115            )
116            .fetch_one(pool)
117            .await?;
118            if unlocked_total <= budget {
119                break;
120            }
121            let batch: Vec<(String, String)> = sqlx::query_as(
122                "SELECT id, path FROM segments WHERE locked = 0 ORDER BY end_time ASC LIMIT 20",
123            )
124            .fetch_all(pool)
125            .await?;
126            if batch.is_empty() {
127                break;
128            }
129            let mut progressed = 0u64;
130            for (seg_id, path) in batch {
131                if unlink_segment(&path).await {
132                    sqlx::query("DELETE FROM segments WHERE id = ?")
133                        .bind(&seg_id)
134                        .execute(pool)
135                        .await?;
136                    size_deleted += 1;
137                    progressed += 1;
138                }
139            }
140            if progressed == 0 {
141                // Every file in the batch failed to delete; we'd re-select the same rows forever.
142                tracing::error!("retention: size-cap prune made no progress (segment file deletes failing); stopping this sweep");
143                break;
144            }
145        }
146    }
147
148    if size_deleted > 0 {
149        let _ = repo::log_event(
150            pool,
151            None,
152            "disk_pressure",
153            "warning",
154            json!({ "deleted": size_deleted, "reason": "size_cap", "max_bytes": max }),
155        )
156        .await;
157        tracing::warn!(deleted = size_deleted, "retention: size-cap cleanup");
158    }
159
160    // 3) Disk-free floor: if the recordings filesystem drops below the free-space floor, prune the
161    //    oldest unlocked segments until back above it. Self-limiting: it stops if a delete batch
162    //    does not actually recover free space (disk filled by non-recording data), and refuses to
163    //    run if the floor exceeds the whole disk — so it never destroys the footprint for nothing.
164    let floor = cfg.min_free_disk_bytes;
165    let mut disk_deleted: u64 = 0;
166    match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
167        None => {
168            tracing::warn!(
169                "retention: could not read disk stats; free-floor check skipped this sweep"
170            );
171            let _ = repo::log_event(
172                pool,
173                None,
174                "disk_pressure",
175                "warning",
176                json!({ "reason": "disk_stats_unavailable" }),
177            )
178            .await;
179        }
180        Some(d) if floor >= d.total_bytes => {
181            if d.free_bytes < floor {
182                tracing::warn!(
183                    floor,
184                    total = d.total_bytes,
185                    "retention: free-disk floor exceeds total disk size; refusing to prune (misconfigured?)"
186                );
187                let _ = repo::log_event(
188                    pool,
189                    None,
190                    "disk_pressure",
191                    "critical",
192                    json!({ "reason": "floor_unsatisfiable", "min_free_bytes": floor, "total_bytes": d.total_bytes }),
193                )
194                .await;
195            }
196        }
197        Some(mut prev) => {
198            let mut guard = 0;
199            let mut futile = false;
200            while prev.free_bytes < floor && guard < 200 {
201                guard += 1;
202                let before = prev.free_bytes;
203                let batch: Vec<(String, String)> = sqlx::query_as(
204                    "SELECT id, path FROM segments WHERE locked = 0 ORDER BY end_time ASC LIMIT 20",
205                )
206                .fetch_all(pool)
207                .await?;
208                if batch.is_empty() {
209                    tracing::warn!(
210                        free_bytes = before,
211                        floor,
212                        "retention: below disk-free floor but no unlocked segments remain to prune"
213                    );
214                    break;
215                }
216                for (seg_id, path) in batch {
217                    if unlink_segment(&path).await {
218                        sqlx::query("DELETE FROM segments WHERE id = ?")
219                            .bind(&seg_id)
220                            .execute(pool)
221                            .await?;
222                        disk_deleted += 1;
223                    }
224                }
225                match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
226                    Some(d) if d.free_bytes > before => prev = d,
227                    Some(_) => {
228                        futile = true;
229                        break;
230                    }
231                    None => break,
232                }
233            }
234            if futile {
235                tracing::error!(
236                    free_bytes = prev.free_bytes,
237                    floor,
238                    "retention: pruning recordings is not recovering free space (disk filled by non-recording data?); stopping"
239                );
240                let _ = repo::log_event(
241                    pool,
242                    None,
243                    "disk_pressure",
244                    "critical",
245                    json!({ "reason": "prune_not_recovering_space", "min_free_bytes": floor, "deleted": disk_deleted }),
246                )
247                .await;
248            }
249        }
250    }
251    if disk_deleted > 0 {
252        let _ = repo::log_event(
253            pool,
254            None,
255            "disk_pressure",
256            "critical",
257            json!({ "deleted": disk_deleted, "reason": "free_floor", "min_free_bytes": floor }),
258        )
259        .await;
260        tracing::warn!(deleted = disk_deleted, "retention: disk-free-floor cleanup");
261    }
262
263    // 4) Prune old AI detections (the table grows unbounded otherwise).
264    let det_cutoff = Utc::now() - chrono::Duration::hours(cfg.detection_retention_hours.max(1));
265    let pruned = sqlx::query("DELETE FROM detections WHERE created_at < ?")
266        .bind(det_cutoff)
267        .execute(pool)
268        .await?
269        .rows_affected();
270    if pruned > 0 {
271        tracing::info!(deleted = pruned, "retention: pruned old detections");
272    }
273    // Prune the transactional outbox on the same TTL (until an edge→cloud relay acks + prunes by seq).
274    let ob_pruned = sqlx::query("DELETE FROM outbox WHERE created_at < ?")
275        .bind(det_cutoff)
276        .execute(pool)
277        .await?
278        .rows_affected();
279    if ob_pruned > 0 {
280        tracing::info!(deleted = ob_pruned, "retention: pruned old outbox rows");
281    }
282
283    // 5) Prune old zone events and delete their evidence frames (same TTL as detections).
284    let old_zone_events: Vec<(String, Option<String>)> =
285        sqlx::query_as("SELECT id, evidence_path FROM zone_events WHERE created_at < ?")
286            .bind(det_cutoff)
287            .fetch_all(pool)
288            .await?;
289    if !old_zone_events.is_empty() {
290        for (_id, evidence) in &old_zone_events {
291            if let Some(name) = evidence.as_deref().and_then(|u| u.rsplit('/').next()) {
292                let _ = tokio::fs::remove_file(cfg.snapshots_dir.join(name)).await;
293            }
294        }
295        let zpruned = sqlx::query("DELETE FROM zone_events WHERE created_at < ?")
296            .bind(det_cutoff)
297            .execute(pool)
298            .await?
299            .rows_affected();
300        tracing::info!(
301            deleted = zpruned,
302            "retention: pruned old zone events + evidence"
303        );
304    }
305
306    // 6) Prune kernel auth bookkeeping: stale audit log + expired sessions. (Domain entry events +
307    //    their evidence frames are pruned by the entry app's own retention loop, not the kernel.)
308    let audit_cutoff = Utc::now() - chrono::Duration::days(cfg.audit_retention_days.max(1));
309    let apruned = sqlx::query("DELETE FROM audit_log WHERE created_at < ?")
310        .bind(audit_cutoff)
311        .execute(pool)
312        .await?
313        .rows_affected();
314    if apruned > 0 {
315        tracing::info!(deleted = apruned, "retention: pruned old audit log entries");
316    }
317    let spruned = sqlx::query("DELETE FROM sessions WHERE expires_at < ?")
318        .bind(Utc::now())
319        .execute(pool)
320        .await?
321        .rows_affected();
322    if spruned > 0 {
323        tracing::debug!(deleted = spruned, "retention: pruned expired sessions");
324    }
325
326    // 7) Prune the generic event log (camera-status events, disk-pressure warnings, and the entry
327    //    mirrors written by the ANPR engine). It is otherwise unbounded. The alert notifier advances
328    //    a durable cursor over recent rows, so deleting rows older than the (long) entry TTL — which
329    //    are far past delivery — is safe.
330    let evpruned = sqlx::query("DELETE FROM events WHERE created_at < ?")
331        .bind(audit_cutoff)
332        .execute(pool)
333        .await?
334        .rows_affected();
335    if evpruned > 0 {
336        tracing::info!(deleted = evpruned, "retention: pruned old event-log rows");
337    }
338    Ok(())
339}