1use std::sync::Arc;
7use std::time::Duration;
8
9use chrono::{DateTime, Utc};
10use serde_json::json;
11use sqlx::SqlitePool;
12
13use crate::config::Config;
14use crate::repo;
15use crate::services::storage;
16
17async fn unlink_segment(path: &str) -> bool {
22 match tokio::fs::remove_file(path).await {
23 Ok(()) => true,
24 Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
25 Err(e) => {
26 tracing::error!(path, error = %e, "retention: failed to delete segment file; keeping DB row to retry next sweep");
27 false
28 }
29 }
30}
31
32async fn unlink_snapshot(path: &str) -> bool {
36 match tokio::fs::remove_file(path).await {
37 Ok(()) => true,
38 Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
39 Err(e) => {
40 tracing::error!(path, error = %e, "retention: failed to delete snapshot file; keeping DB row to retry next sweep");
41 false
42 }
43 }
44}
45
46pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
47 let mut tick = tokio::time::interval(Duration::from_secs(cfg.retention_interval_s.max(30)));
48 loop {
49 tick.tick().await;
50 if let Err(e) = sweep(&pool, &cfg).await {
51 tracing::error!(error = %e, "retention: sweep failed");
52 }
53 }
54}
55
56async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
57 let mut age_deleted: u64 = 0;
59 let cams: Vec<(String, i64)> = sqlx::query_as("SELECT id, retention_hours FROM cameras")
60 .fetch_all(pool)
61 .await?;
62 for (id, hours) in cams {
63 let cutoff = Utc::now() - chrono::Duration::hours(hours.max(1));
64 let rows: Vec<(String, String)> = sqlx::query_as(
65 "SELECT id, path FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 AND end_time < ?",
66 )
67 .bind(&id)
68 .bind(cutoff)
69 .fetch_all(pool)
70 .await?;
71 for (seg_id, path) in rows {
72 if unlink_segment(&path).await {
73 sqlx::query("DELETE FROM segments WHERE id = ?")
74 .bind(&seg_id)
75 .execute(pool)
76 .await?;
77 age_deleted += 1;
78 }
79 }
80 }
81 if age_deleted > 0 {
82 let _ = repo::log_event(
83 pool,
84 None,
85 "retention_delete",
86 "info",
87 json!({ "deleted": age_deleted, "reason": "age" }),
88 )
89 .await;
90 tracing::info!(deleted = age_deleted, "retention: age-based cleanup");
91 }
92
93 let mut quota_deleted: u64 = 0;
100 let quota_cams: Vec<(String, i64)> = sqlx::query_as(
101 "SELECT id, storage_quota_bytes FROM cameras WHERE storage_quota_bytes IS NOT NULL",
102 )
103 .fetch_all(pool)
104 .await?;
105 for (cam_id, quota) in quota_cams {
106 if quota <= 0 {
107 continue;
108 }
109 let protected_bytes: i64 = sqlx::query_scalar(
110 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND evidence_locked = 1",
111 )
112 .bind(&cam_id)
113 .fetch_one(pool)
114 .await?;
115 let budget = quota - protected_bytes;
116 if budget <= 0 {
117 if protected_bytes > quota {
118 tracing::warn!(
119 camera_id = %cam_id,
120 protected_bytes,
121 quota,
122 "retention: evidence-locked footage exceeds the camera quota; not deleting other footage"
123 );
124 let _ = repo::log_event(
125 pool,
126 Some(&cam_id),
127 "disk_pressure",
128 "warning",
129 json!({ "reason": "camera_quota", "camera_id": &cam_id, "protected_bytes": protected_bytes, "quota_bytes": quota }),
130 )
131 .await;
132 }
133 continue;
134 }
135 loop {
136 let deletable_total: i64 = sqlx::query_scalar(
137 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0",
138 )
139 .bind(&cam_id)
140 .fetch_one(pool)
141 .await?;
142 if deletable_total <= budget {
143 break;
144 }
145 let batch: Vec<(String, String)> = sqlx::query_as(
146 "SELECT id, path FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
147 )
148 .bind(&cam_id)
149 .fetch_all(pool)
150 .await?;
151 if batch.is_empty() {
152 break;
153 }
154 let mut progressed = 0u64;
155 for (seg_id, path) in batch {
156 if unlink_segment(&path).await {
157 sqlx::query("DELETE FROM segments WHERE id = ?")
158 .bind(&seg_id)
159 .execute(pool)
160 .await?;
161 quota_deleted += 1;
162 progressed += 1;
163 }
164 }
165 if progressed == 0 {
166 tracing::error!(camera_id = %cam_id, "retention: camera-quota prune made no progress (segment file deletes failing); stopping this camera");
167 break;
168 }
169 }
170 }
171 if quota_deleted > 0 {
172 let _ = repo::log_event(
173 pool,
174 None,
175 "disk_pressure",
176 "warning",
177 json!({ "deleted": quota_deleted, "reason": "camera_quota" }),
178 )
179 .await;
180 tracing::warn!(
181 deleted = quota_deleted,
182 "retention: per-camera quota cleanup"
183 );
184 }
185
186 let max = cfg.max_recordings_bytes as i64;
193 let protected_bytes: i64 = sqlx::query_scalar(
194 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE evidence_locked = 1",
195 )
196 .fetch_one(pool)
197 .await?;
198 let budget = max - protected_bytes;
199 let mut size_deleted: u64 = 0;
200
201 if budget <= 0 {
202 let unlocked: i64 = sqlx::query_scalar(
205 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
206 )
207 .fetch_one(pool)
208 .await?;
209 if protected_bytes > max {
210 tracing::warn!(
211 protected_bytes,
212 max,
213 "retention: evidence-locked footage exceeds the size cap; not deleting other footage"
214 );
215 let _ = repo::log_event(
216 pool,
217 None,
218 "disk_pressure",
219 "warning",
220 json!({ "reason": "locked_exceeds_cap", "protected_bytes": protected_bytes, "unlocked_bytes": unlocked, "max_bytes": max }),
221 )
222 .await;
223 }
224 } else {
225 loop {
226 let unlocked_total: i64 = sqlx::query_scalar(
227 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
228 )
229 .fetch_one(pool)
230 .await?;
231 if unlocked_total <= budget {
232 break;
233 }
234 let batch: Vec<(String, String)> = sqlx::query_as(
235 "SELECT id, path FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
236 )
237 .fetch_all(pool)
238 .await?;
239 if batch.is_empty() {
240 break;
241 }
242 let mut progressed = 0u64;
243 for (seg_id, path) in batch {
244 if unlink_segment(&path).await {
245 sqlx::query("DELETE FROM segments WHERE id = ?")
246 .bind(&seg_id)
247 .execute(pool)
248 .await?;
249 size_deleted += 1;
250 progressed += 1;
251 }
252 }
253 if progressed == 0 {
254 tracing::error!("retention: size-cap prune made no progress (segment file deletes failing); stopping this sweep");
256 break;
257 }
258 }
259 }
260
261 if size_deleted > 0 {
262 let _ = repo::log_event(
263 pool,
264 None,
265 "disk_pressure",
266 "warning",
267 json!({ "deleted": size_deleted, "reason": "size_cap", "max_bytes": max }),
268 )
269 .await;
270 tracing::warn!(deleted = size_deleted, "retention: size-cap cleanup");
271 }
272
273 let floor = cfg.min_free_disk_bytes;
278 let mut disk_deleted: u64 = 0;
279 match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
280 None => {
281 tracing::warn!(
282 "retention: could not read disk stats; free-floor check skipped this sweep"
283 );
284 let _ = repo::log_event(
285 pool,
286 None,
287 "disk_pressure",
288 "warning",
289 json!({ "reason": "disk_stats_unavailable" }),
290 )
291 .await;
292 }
293 Some(d) if floor >= d.total_bytes => {
294 if d.free_bytes < floor {
295 tracing::warn!(
296 floor,
297 total = d.total_bytes,
298 "retention: free-disk floor exceeds total disk size; refusing to prune (misconfigured?)"
299 );
300 let _ = repo::log_event(
301 pool,
302 None,
303 "disk_pressure",
304 "critical",
305 json!({ "reason": "floor_unsatisfiable", "min_free_bytes": floor, "total_bytes": d.total_bytes }),
306 )
307 .await;
308 }
309 }
310 Some(mut prev) => {
311 let mut guard = 0;
312 let mut futile = false;
313 while prev.free_bytes < floor && guard < 200 {
314 guard += 1;
315 let before = prev.free_bytes;
316 let batch: Vec<(String, String)> = sqlx::query_as(
317 "SELECT id, path FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
318 )
319 .fetch_all(pool)
320 .await?;
321 if batch.is_empty() {
322 tracing::warn!(
323 free_bytes = before,
324 floor,
325 "retention: below disk-free floor but no deletable segments remain to prune"
326 );
327 break;
328 }
329 for (seg_id, path) in batch {
330 if unlink_segment(&path).await {
331 sqlx::query("DELETE FROM segments WHERE id = ?")
332 .bind(&seg_id)
333 .execute(pool)
334 .await?;
335 disk_deleted += 1;
336 }
337 }
338 match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
339 Some(d) if d.free_bytes > before => prev = d,
340 Some(_) => {
341 futile = true;
342 break;
343 }
344 None => break,
345 }
346 }
347 if futile {
348 tracing::error!(
349 free_bytes = prev.free_bytes,
350 floor,
351 "retention: pruning recordings is not recovering free space (disk filled by non-recording data?); stopping"
352 );
353 let _ = repo::log_event(
354 pool,
355 None,
356 "disk_pressure",
357 "critical",
358 json!({ "reason": "prune_not_recovering_space", "min_free_bytes": floor, "deleted": disk_deleted }),
359 )
360 .await;
361 }
362 }
363 }
364 if disk_deleted > 0 {
365 let _ = repo::log_event(
366 pool,
367 None,
368 "disk_pressure",
369 "critical",
370 json!({ "deleted": disk_deleted, "reason": "free_floor", "min_free_bytes": floor }),
371 )
372 .await;
373 tracing::warn!(deleted = disk_deleted, "retention: disk-free-floor cleanup");
374 }
375
376 let det_cutoff = Utc::now() - chrono::Duration::hours(cfg.detection_retention_hours.max(1));
378 let pruned = sqlx::query("DELETE FROM detections WHERE created_at < ?")
379 .bind(det_cutoff)
380 .execute(pool)
381 .await?
382 .rows_affected();
383 if pruned > 0 {
384 tracing::info!(deleted = pruned, "retention: pruned old detections");
385 }
386 let ob_pruned = sqlx::query("DELETE FROM outbox WHERE created_at < ?")
388 .bind(det_cutoff)
389 .execute(pool)
390 .await?
391 .rows_affected();
392 if ob_pruned > 0 {
393 tracing::info!(deleted = ob_pruned, "retention: pruned old outbox rows");
394 }
395
396 let old_zone_events: Vec<(String, Option<String>)> =
398 sqlx::query_as("SELECT id, evidence_path FROM zone_events WHERE created_at < ?")
399 .bind(det_cutoff)
400 .fetch_all(pool)
401 .await?;
402 if !old_zone_events.is_empty() {
403 for (_id, evidence) in &old_zone_events {
404 if let Some(name) = evidence.as_deref().and_then(|u| u.rsplit('/').next()) {
405 let _ = tokio::fs::remove_file(cfg.snapshots_dir.join(name)).await;
406 }
407 }
408 let zpruned = sqlx::query("DELETE FROM zone_events WHERE created_at < ?")
409 .bind(det_cutoff)
410 .execute(pool)
411 .await?
412 .rows_affected();
413 tracing::info!(
414 deleted = zpruned,
415 "retention: pruned old zone events + evidence"
416 );
417 }
418
419 let audit_cutoff = Utc::now() - chrono::Duration::days(cfg.audit_retention_days.max(1));
422 let apruned = sqlx::query("DELETE FROM audit_log WHERE created_at < ?")
423 .bind(audit_cutoff)
424 .execute(pool)
425 .await?
426 .rows_affected();
427 if apruned > 0 {
428 tracing::info!(deleted = apruned, "retention: pruned old audit log entries");
429 }
430 let spruned = sqlx::query("DELETE FROM sessions WHERE expires_at < ?")
431 .bind(Utc::now())
432 .execute(pool)
433 .await?
434 .rows_affected();
435 if spruned > 0 {
436 tracing::debug!(deleted = spruned, "retention: pruned expired sessions");
437 }
438
439 let evpruned = sqlx::query("DELETE FROM events WHERE created_at < ?")
444 .bind(audit_cutoff)
445 .execute(pool)
446 .await?
447 .rows_affected();
448 if evpruned > 0 {
449 tracing::info!(deleted = evpruned, "retention: pruned old event-log rows");
450 }
451
452 if cfg.snapshot_retention_hours > 0 {
456 let snap_cutoff = Utc::now() - chrono::Duration::hours(cfg.snapshot_retention_hours);
457 let rows: Vec<(String, String)> =
458 sqlx::query_as("SELECT id, path FROM snapshots WHERE taken_at < ?")
459 .bind(snap_cutoff)
460 .fetch_all(pool)
461 .await?;
462 let mut snap_deleted: u64 = 0;
463 for (snap_id, path) in rows {
464 if unlink_snapshot(&path).await {
465 sqlx::query("DELETE FROM snapshots WHERE id = ?")
466 .bind(&snap_id)
467 .execute(pool)
468 .await?;
469 snap_deleted += 1;
470 }
471 }
472 if snap_deleted > 0 {
473 tracing::info!(deleted = snap_deleted, "retention: pruned old snapshots");
474 }
475 }
476
477 if cfg.archive_retention_hours > 0 {
481 let cutoff = Utc::now() - chrono::Duration::hours(cfg.archive_retention_hours);
482 if let Ok(mut entries) = tokio::fs::read_dir(&cfg.archive_dir).await {
483 let mut removed: u64 = 0;
484 while let Ok(Some(ent)) = entries.next_entry().await {
485 let path = ent.path();
486 if path.extension().and_then(|e| e.to_str()) != Some("zip") {
487 continue;
488 }
489 let stale = ent
490 .metadata()
491 .await
492 .ok()
493 .and_then(|m| m.modified().ok())
494 .map(|t| DateTime::<Utc>::from(t) < cutoff)
495 .unwrap_or(false);
496 if stale && tokio::fs::remove_file(&path).await.is_ok() {
497 removed += 1;
498 }
499 }
500 if removed > 0 {
501 tracing::info!(deleted = removed, "retention: pruned old archive exports");
502 }
503 }
504 let jpruned = sqlx::query(
505 "DELETE FROM backup_jobs WHERE finished_at IS NOT NULL AND finished_at < ?",
506 )
507 .bind(cutoff)
508 .execute(pool)
509 .await?
510 .rows_affected();
511 if jpruned > 0 {
512 tracing::info!(
513 deleted = jpruned,
514 "retention: pruned old finished backup jobs"
515 );
516 }
517 }
518 Ok(())
519}