1use std::sync::Arc;
7use std::time::Duration;
8
9use chrono::{DateTime, Utc};
10use serde_json::json;
11use sqlx::SqlitePool;
12
13use crate::config::Config;
14use crate::repo;
15use crate::services::{settings, storage};
16
17async fn unlink_segment(path: &str) -> bool {
22 match tokio::fs::remove_file(path).await {
23 Ok(()) => true,
24 Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
25 Err(e) => {
26 tracing::error!(path, error = %e, "retention: failed to delete segment file; keeping DB row to retry next sweep");
27 false
28 }
29 }
30}
31
32async fn delete_segment_if_unlocked(
42 pool: &SqlitePool,
43 seg_id: &str,
44 path: &str,
45) -> anyhow::Result<bool> {
46 let removed =
47 sqlx::query("DELETE FROM segments WHERE id = ? AND locked = 0 AND evidence_locked = 0")
48 .bind(seg_id)
49 .execute(pool)
50 .await?
51 .rows_affected();
52 if removed == 1 {
53 unlink_segment(path).await;
54 Ok(true)
55 } else {
56 Ok(false)
57 }
58}
59
60async fn unlink_snapshot(path: &str) -> bool {
64 match tokio::fs::remove_file(path).await {
65 Ok(()) => true,
66 Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
67 Err(e) => {
68 tracing::error!(path, error = %e, "retention: failed to delete snapshot file; keeping DB row to retry next sweep");
69 false
70 }
71 }
72}
73
74pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
75 let mut tick = tokio::time::interval(Duration::from_secs(cfg.retention_interval_s.max(30)));
76 loop {
77 tick.tick().await;
78 if let Err(e) = sweep(&pool, &cfg).await {
79 tracing::error!(error = %e, "retention: sweep failed");
80 }
81 }
82}
83
84async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
85 let mut age_deleted: u64 = 0;
87 let cams: Vec<(String, i64)> = sqlx::query_as("SELECT id, retention_hours FROM cameras")
88 .fetch_all(pool)
89 .await?;
90 for (id, hours) in cams {
91 let cutoff = Utc::now() - chrono::Duration::hours(hours.max(1));
92 let rows: Vec<(String, String)> = sqlx::query_as(
93 "SELECT id, path FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 AND end_time < ?",
94 )
95 .bind(&id)
96 .bind(cutoff)
97 .fetch_all(pool)
98 .await?;
99 for (seg_id, path) in rows {
100 if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
101 age_deleted += 1;
102 }
103 }
104 }
105 if age_deleted > 0 {
106 let _ = repo::log_event(
107 pool,
108 None,
109 "retention_delete",
110 "info",
111 json!({ "deleted": age_deleted, "reason": "age" }),
112 )
113 .await;
114 tracing::info!(deleted = age_deleted, "retention: age-based cleanup");
115 }
116
117 let mut quota_deleted: u64 = 0;
124 let quota_cams: Vec<(String, i64)> = sqlx::query_as(
125 "SELECT id, storage_quota_bytes FROM cameras WHERE storage_quota_bytes IS NOT NULL",
126 )
127 .fetch_all(pool)
128 .await?;
129 for (cam_id, quota) in quota_cams {
130 if quota <= 0 {
131 continue;
132 }
133 let protected_bytes: i64 = sqlx::query_scalar(
134 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND evidence_locked = 1",
135 )
136 .bind(&cam_id)
137 .fetch_one(pool)
138 .await?;
139 let budget = quota - protected_bytes;
140 if budget <= 0 {
141 if protected_bytes > quota {
142 tracing::warn!(
143 camera_id = %cam_id,
144 protected_bytes,
145 quota,
146 "retention: evidence-locked footage exceeds the camera quota; not deleting other footage"
147 );
148 let _ = repo::log_event(
149 pool,
150 Some(&cam_id),
151 "disk_pressure",
152 "warning",
153 json!({ "reason": "camera_quota", "camera_id": &cam_id, "protected_bytes": protected_bytes, "quota_bytes": quota }),
154 )
155 .await;
156 }
157 continue;
158 }
159 loop {
160 let deletable_total: i64 = sqlx::query_scalar(
161 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0",
162 )
163 .bind(&cam_id)
164 .fetch_one(pool)
165 .await?;
166 if deletable_total <= budget {
167 break;
168 }
169 let batch: Vec<(String, String, i64)> = sqlx::query_as(
170 "SELECT id, path, size_bytes FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
171 )
172 .bind(&cam_id)
173 .fetch_all(pool)
174 .await?;
175 if batch.is_empty() {
176 break;
177 }
178 let mut remaining = deletable_total;
179 let mut progressed = 0u64;
180 for (seg_id, path, size) in batch {
181 if remaining <= budget {
186 break;
187 }
188 if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
189 remaining -= size;
190 quota_deleted += 1;
191 progressed += 1;
192 }
193 }
194 if progressed == 0 {
195 tracing::error!(camera_id = %cam_id, "retention: camera-quota prune made no progress (segment file deletes failing); stopping this camera");
196 break;
197 }
198 }
199 }
200 if quota_deleted > 0 {
201 let _ = repo::log_event(
202 pool,
203 None,
204 "disk_pressure",
205 "warning",
206 json!({ "deleted": quota_deleted, "reason": "camera_quota" }),
207 )
208 .await;
209 tracing::warn!(
210 deleted = quota_deleted,
211 "retention: per-camera quota cleanup"
212 );
213 }
214
215 let max = settings::get_i64(pool, settings::RECORDING_MAX_BYTES)
225 .await
226 .filter(|&v| v > 0)
227 .unwrap_or(cfg.max_recordings_bytes as i64);
228 let protected_bytes: i64 = sqlx::query_scalar(
229 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE evidence_locked = 1",
230 )
231 .fetch_one(pool)
232 .await?;
233 let budget = max - protected_bytes;
234 let mut size_deleted: u64 = 0;
235
236 if budget <= 0 {
237 let unlocked: i64 = sqlx::query_scalar(
240 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
241 )
242 .fetch_one(pool)
243 .await?;
244 if protected_bytes > max {
245 tracing::warn!(
246 protected_bytes,
247 max,
248 "retention: evidence-locked footage exceeds the size cap; not deleting other footage"
249 );
250 let _ = repo::log_event(
251 pool,
252 None,
253 "disk_pressure",
254 "warning",
255 json!({ "reason": "locked_exceeds_cap", "protected_bytes": protected_bytes, "unlocked_bytes": unlocked, "max_bytes": max }),
256 )
257 .await;
258 }
259 } else {
260 loop {
261 let unlocked_total: i64 = sqlx::query_scalar(
262 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
263 )
264 .fetch_one(pool)
265 .await?;
266 if unlocked_total <= budget {
267 break;
268 }
269 let batch: Vec<(String, String, i64)> = sqlx::query_as(
270 "SELECT id, path, size_bytes FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
271 )
272 .fetch_all(pool)
273 .await?;
274 if batch.is_empty() {
275 break;
276 }
277 let mut remaining = unlocked_total;
278 let mut progressed = 0u64;
279 for (seg_id, path, size) in batch {
280 if remaining <= budget {
282 break;
283 }
284 if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
285 remaining -= size;
286 size_deleted += 1;
287 progressed += 1;
288 }
289 }
290 if progressed == 0 {
291 tracing::error!("retention: size-cap prune made no progress (segment file deletes failing); stopping this sweep");
293 break;
294 }
295 }
296 }
297
298 if size_deleted > 0 {
299 let _ = repo::log_event(
300 pool,
301 None,
302 "disk_pressure",
303 "warning",
304 json!({ "deleted": size_deleted, "reason": "size_cap", "max_bytes": max }),
305 )
306 .await;
307 tracing::warn!(deleted = size_deleted, "retention: size-cap cleanup");
308 }
309
310 let floor = settings::get_i64(pool, settings::RECORDING_MIN_FREE_BYTES)
317 .await
318 .filter(|&v| v >= 0)
319 .map(|v| v as u64)
320 .unwrap_or(cfg.min_free_disk_bytes);
321 let mut disk_deleted: u64 = 0;
322 match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
323 None => {
324 tracing::warn!(
325 "retention: could not read disk stats; free-floor check skipped this sweep"
326 );
327 let _ = repo::log_event(
328 pool,
329 None,
330 "disk_pressure",
331 "warning",
332 json!({ "reason": "disk_stats_unavailable" }),
333 )
334 .await;
335 }
336 Some(d) if floor >= d.total_bytes => {
337 if d.free_bytes < floor {
338 tracing::warn!(
339 floor,
340 total = d.total_bytes,
341 "retention: free-disk floor exceeds total disk size; refusing to prune (misconfigured?)"
342 );
343 let _ = repo::log_event(
344 pool,
345 None,
346 "disk_pressure",
347 "critical",
348 json!({ "reason": "floor_unsatisfiable", "min_free_bytes": floor, "total_bytes": d.total_bytes }),
349 )
350 .await;
351 }
352 }
353 Some(mut prev) => {
354 let mut guard = 0;
355 let mut futile = false;
356 while prev.free_bytes < floor && guard < 200 {
357 guard += 1;
358 let before = prev.free_bytes;
359 let batch: Vec<(String, String)> = sqlx::query_as(
360 "SELECT id, path FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
361 )
362 .fetch_all(pool)
363 .await?;
364 if batch.is_empty() {
365 tracing::warn!(
366 free_bytes = before,
367 floor,
368 "retention: below disk-free floor but no deletable segments remain to prune"
369 );
370 break;
371 }
372 for (seg_id, path) in batch {
373 if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
374 disk_deleted += 1;
375 }
376 }
377 match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
378 Some(d) if d.free_bytes > before => prev = d,
379 Some(_) => {
380 futile = true;
381 break;
382 }
383 None => break,
384 }
385 }
386 if futile {
387 tracing::error!(
388 free_bytes = prev.free_bytes,
389 floor,
390 "retention: pruning recordings is not recovering free space (disk filled by non-recording data?); stopping"
391 );
392 let _ = repo::log_event(
393 pool,
394 None,
395 "disk_pressure",
396 "critical",
397 json!({ "reason": "prune_not_recovering_space", "min_free_bytes": floor, "deleted": disk_deleted }),
398 )
399 .await;
400 }
401 }
402 }
403 if disk_deleted > 0 {
404 let _ = repo::log_event(
405 pool,
406 None,
407 "disk_pressure",
408 "critical",
409 json!({ "deleted": disk_deleted, "reason": "free_floor", "min_free_bytes": floor }),
410 )
411 .await;
412 tracing::warn!(deleted = disk_deleted, "retention: disk-free-floor cleanup");
413 }
414
415 let det_cutoff = Utc::now() - chrono::Duration::hours(cfg.detection_retention_hours.max(1));
417 let pruned = sqlx::query("DELETE FROM detections WHERE created_at < ?")
418 .bind(det_cutoff)
419 .execute(pool)
420 .await?
421 .rows_affected();
422 if pruned > 0 {
423 tracing::info!(deleted = pruned, "retention: pruned old detections");
424 }
425 let ob_pruned = sqlx::query("DELETE FROM outbox WHERE created_at < ?")
427 .bind(det_cutoff)
428 .execute(pool)
429 .await?
430 .rows_affected();
431 if ob_pruned > 0 {
432 tracing::info!(deleted = ob_pruned, "retention: pruned old outbox rows");
433 }
434
435 let old_zone_events: Vec<(String, Option<String>)> =
437 sqlx::query_as("SELECT id, evidence_path FROM zone_events WHERE created_at < ?")
438 .bind(det_cutoff)
439 .fetch_all(pool)
440 .await?;
441 if !old_zone_events.is_empty() {
442 for (_id, evidence) in &old_zone_events {
443 if let Some(name) = evidence.as_deref().and_then(|u| u.rsplit('/').next()) {
444 let _ = tokio::fs::remove_file(cfg.snapshots_dir.join(name)).await;
445 }
446 }
447 let zpruned = sqlx::query("DELETE FROM zone_events WHERE created_at < ?")
448 .bind(det_cutoff)
449 .execute(pool)
450 .await?
451 .rows_affected();
452 tracing::info!(
453 deleted = zpruned,
454 "retention: pruned old zone events + evidence"
455 );
456 }
457
458 let audit_cutoff = Utc::now() - chrono::Duration::days(cfg.audit_retention_days.max(1));
461 let apruned = sqlx::query("DELETE FROM audit_log WHERE created_at < ?")
462 .bind(audit_cutoff)
463 .execute(pool)
464 .await?
465 .rows_affected();
466 if apruned > 0 {
467 tracing::info!(deleted = apruned, "retention: pruned old audit log entries");
468 }
469 let spruned = sqlx::query("DELETE FROM sessions WHERE expires_at < ?")
470 .bind(Utc::now())
471 .execute(pool)
472 .await?
473 .rows_affected();
474 if spruned > 0 {
475 tracing::debug!(deleted = spruned, "retention: pruned expired sessions");
476 }
477
478 let evpruned = sqlx::query("DELETE FROM events WHERE created_at < ?")
483 .bind(audit_cutoff)
484 .execute(pool)
485 .await?
486 .rows_affected();
487 if evpruned > 0 {
488 tracing::info!(deleted = evpruned, "retention: pruned old event-log rows");
489 }
490
491 let wdpruned = sqlx::query("DELETE FROM webhook_deliveries WHERE created_at < ?")
495 .bind(audit_cutoff)
496 .execute(pool)
497 .await?
498 .rows_affected();
499 if wdpruned > 0 {
500 tracing::info!(
501 deleted = wdpruned,
502 "retention: pruned old webhook-delivery rows"
503 );
504 }
505
506 let gpruned = sqlx::query(
509 "DELETE FROM recording_gaps WHERE fill_state IN ('filled','failed') AND created_at < ?",
510 )
511 .bind(audit_cutoff)
512 .execute(pool)
513 .await?
514 .rows_affected();
515 if gpruned > 0 {
516 tracing::info!(
517 deleted = gpruned,
518 "retention: pruned resolved recording-gap rows"
519 );
520 }
521
522 if cfg.snapshot_retention_hours > 0 {
526 let snap_cutoff = Utc::now() - chrono::Duration::hours(cfg.snapshot_retention_hours);
527 let rows: Vec<(String, String)> =
528 sqlx::query_as("SELECT id, path FROM snapshots WHERE taken_at < ?")
529 .bind(snap_cutoff)
530 .fetch_all(pool)
531 .await?;
532 let mut snap_deleted: u64 = 0;
533 for (snap_id, path) in rows {
534 if unlink_snapshot(&path).await {
535 sqlx::query("DELETE FROM snapshots WHERE id = ?")
536 .bind(&snap_id)
537 .execute(pool)
538 .await?;
539 snap_deleted += 1;
540 }
541 }
542 if snap_deleted > 0 {
543 tracing::info!(deleted = snap_deleted, "retention: pruned old snapshots");
544 }
545 }
546
547 if cfg.archive_retention_hours > 0 {
551 let cutoff = Utc::now() - chrono::Duration::hours(cfg.archive_retention_hours);
552 if let Ok(mut entries) = tokio::fs::read_dir(&cfg.archive_dir).await {
553 let mut removed: u64 = 0;
554 while let Ok(Some(ent)) = entries.next_entry().await {
555 let path = ent.path();
556 if path.extension().and_then(|e| e.to_str()) != Some("zip") {
557 continue;
558 }
559 let stale = ent
560 .metadata()
561 .await
562 .ok()
563 .and_then(|m| m.modified().ok())
564 .map(|t| DateTime::<Utc>::from(t) < cutoff)
565 .unwrap_or(false);
566 if stale && tokio::fs::remove_file(&path).await.is_ok() {
567 removed += 1;
568 }
569 }
570 if removed > 0 {
571 tracing::info!(deleted = removed, "retention: pruned old archive exports");
572 }
573 }
574 let jpruned = sqlx::query(
575 "DELETE FROM backup_jobs WHERE finished_at IS NOT NULL AND finished_at < ?",
576 )
577 .bind(cutoff)
578 .execute(pool)
579 .await?
580 .rows_affected();
581 if jpruned > 0 {
582 tracing::info!(
583 deleted = jpruned,
584 "retention: pruned old finished backup jobs"
585 );
586 }
587 }
588 Ok(())
589}
590
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a path in the system temp dir that is unique per process and per call.
    fn unique_path(prefix: &str) -> std::path::PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        std::env::temp_dir().join(format!("{prefix}-{pid}-{seq}"))
    }

    /// Opens a single-connection in-memory SQLite pool and applies the crate's migrations.
    async fn mem_pool() -> SqlitePool {
        let options = sqlx::sqlite::SqlitePoolOptions::new().max_connections(1);
        let pool = options.connect("sqlite::memory:").await.unwrap();
        crate::db::run_migrations(&pool).await.unwrap();
        pool
    }

    /// Environment-derived config with the retention knobs pinned for tests:
    /// a very large size cap, a zero free-disk floor, and snapshot/archive
    /// pruning switched off.
    fn test_cfg() -> Config {
        let mut cfg = Config::from_env();
        cfg.recordings_dir = std::env::temp_dir();
        cfg.max_recordings_bytes = u64::MAX / 4;
        cfg.min_free_disk_bytes = 0;
        cfg.snapshot_retention_hours = 0;
        cfg.archive_retention_hours = 0;
        cfg.detection_retention_hours = 168;
        cfg.audit_retention_days = 365;
        cfg
    }

    /// Inserts a camera row; `id` doubles as the camera name.
    async fn insert_camera(pool: &SqlitePool, id: &str, retention_hours: i64, quota: Option<i64>) {
        let stamp = Utc::now();
        let insert = sqlx::query(
            "INSERT INTO cameras (id, name, retention_hours, storage_quota_bytes, created_at, updated_at)
             VALUES (?, ?, ?, ?, ?, ?)",
        )
        .bind(id)
        .bind(id)
        .bind(retention_hours)
        .bind(quota)
        .bind(stamp)
        .bind(stamp)
        .execute(pool)
        .await;
        insert.unwrap();
    }

    /// Inserts a segment row whose start, end and creation times are all `end`
    /// and whose path points at a file that does not exist.
    async fn insert_segment(
        pool: &SqlitePool,
        id: &str,
        camera_id: &str,
        end: DateTime<Utc>,
        size_bytes: i64,
        locked: i64,
        evidence_locked: i64,
    ) {
        let file_path = format!("/nonexistent/heldar-test/{id}.mp4");
        let insert = sqlx::query(
            "INSERT INTO segments
             (id, camera_id, path, start_time, end_time, duration_s, size_bytes, locked, evidence_locked, created_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(id)
        .bind(camera_id)
        .bind(file_path)
        .bind(end)
        .bind(end)
        .bind(60.0_f64)
        .bind(size_bytes)
        .bind(locked)
        .bind(evidence_locked)
        .bind(end)
        .execute(pool)
        .await;
        insert.unwrap();
    }

    /// True when exactly one segment row with `id` exists.
    async fn seg_exists(pool: &SqlitePool, id: &str) -> bool {
        let matches: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM segments WHERE id = ?")
            .bind(id)
            .fetch_one(pool)
            .await
            .unwrap();
        matches == 1
    }

    /// Total number of segment rows.
    async fn seg_count(pool: &SqlitePool) -> i64 {
        let total = sqlx::query_scalar("SELECT COUNT(*) FROM segments")
            .fetch_one(pool)
            .await;
        total.unwrap()
    }

    /// Number of event-log rows with the given `event_type`.
    async fn event_type_count(pool: &SqlitePool, event_type: &str) -> i64 {
        let total = sqlx::query_scalar("SELECT COUNT(*) FROM events WHERE event_type = ?")
            .bind(event_type)
            .fetch_one(pool)
            .await;
        total.unwrap()
    }

    /// Number of `disk_pressure` events whose payload mentions `camera_quota`.
    async fn camera_quota_event_count(pool: &SqlitePool) -> i64 {
        let total = sqlx::query_scalar(
            "SELECT COUNT(*) FROM events WHERE event_type = 'disk_pressure' AND payload LIKE '%camera_quota%'",
        )
        .fetch_one(pool)
        .await;
        total.unwrap()
    }

    #[tokio::test]
    async fn unlink_segment_reports_removable_for_missing_path() {
        let gone = unlink_segment("/nonexistent/heldar/definitely-not-here.mp4").await;
        assert!(gone);
    }

    #[tokio::test]
    async fn unlink_segment_deletes_existing_file() {
        let file = unique_path("heldar-seg");
        tokio::fs::write(&file, b"x").await.unwrap();
        assert!(file.exists());
        assert!(unlink_segment(file.to_str().unwrap()).await);
        assert!(!file.exists());
    }

    #[tokio::test]
    async fn unlink_segment_keeps_row_for_directory() {
        // `remove_file` on a directory fails with something other than NotFound.
        let dir = unique_path("heldar-dir");
        tokio::fs::create_dir(&dir).await.unwrap();
        assert!(!unlink_segment(dir.to_str().unwrap()).await);
        assert!(dir.exists());
        let _ = tokio::fs::remove_dir(&dir).await;
    }

    #[tokio::test]
    async fn unlink_snapshot_handles_missing_and_existing() {
        assert!(unlink_snapshot("/nonexistent/heldar/none.jpg").await);
        let file = unique_path("heldar-snap");
        tokio::fs::write(&file, b"x").await.unwrap();
        assert!(unlink_snapshot(file.to_str().unwrap()).await);
        assert!(!file.exists());
    }

    #[tokio::test]
    async fn sweep_age_retention_deletes_only_old_unlocked() {
        let pool = mem_pool().await;
        let cfg = test_cfg();
        let now = Utc::now();

        insert_camera(&pool, "cam_age", 24, None).await;
        // (id, age in hours, locked, evidence_locked)
        let fixtures = [
            ("seg_recent", 1, 0, 0),
            ("seg_old", 48, 0, 0),
            ("seg_old_locked", 48, 1, 0),
            ("seg_old_ev", 48, 0, 1),
        ];
        for (id, age_h, locked, evidence) in fixtures {
            let end = now - chrono::Duration::hours(age_h);
            insert_segment(&pool, id, "cam_age", end, 100, locked, evidence).await;
        }

        sweep(&pool, &cfg).await.unwrap();

        assert!(seg_exists(&pool, "seg_recent").await);
        assert!(
            !seg_exists(&pool, "seg_old").await,
            "old unlocked segment should be pruned by age"
        );
        assert!(
            seg_exists(&pool, "seg_old_locked").await,
            "read-locked segment must survive age prune"
        );
        assert!(
            seg_exists(&pool, "seg_old_ev").await,
            "evidence-locked segment must survive age prune"
        );
        assert_eq!(seg_count(&pool).await, 3);
        assert_eq!(event_type_count(&pool, "retention_delete").await, 1);
    }

    #[tokio::test]
    async fn sweep_camera_quota_prunes_only_to_budget_keeps_evidence() {
        let pool = mem_pool().await;
        let cfg = test_cfg();
        let now = Utc::now();

        // Quota 1000 with 600 evidence-locked bytes leaves a 400-byte budget
        // for the three 400-byte unlocked segments.
        insert_camera(&pool, "cam_q", 100_000, Some(1000)).await;
        // (id, age in hours, size in bytes, evidence_locked)
        let fixtures = [
            ("sL", 5, 600, 1),
            ("s1", 3, 400, 0),
            ("s2", 2, 400, 0),
            ("s3", 1, 400, 0),
        ];
        for (id, age_h, size, evidence) in fixtures {
            let end = now - chrono::Duration::hours(age_h);
            insert_segment(&pool, id, "cam_q", end, size, 0, evidence).await;
        }

        sweep(&pool, &cfg).await.unwrap();

        assert!(
            seg_exists(&pool, "sL").await,
            "evidence-locked footage must survive the quota prune"
        );
        assert!(
            !seg_exists(&pool, "s1").await,
            "oldest over-budget segment is pruned"
        );
        assert!(
            !seg_exists(&pool, "s2").await,
            "second-oldest pruned to reach budget"
        );
        assert!(
            seg_exists(&pool, "s3").await,
            "s3 is within quota once s1+s2 are gone and must NOT be over-deleted"
        );
        assert_eq!(
            seg_count(&pool).await,
            2,
            "only s1,s2 pruned to reach budget; sL+s3 remain"
        );
        assert!(
            camera_quota_event_count(&pool).await >= 1,
            "a camera_quota disk_pressure event should be logged"
        );
    }

    #[tokio::test]
    async fn delete_segment_if_unlocked_spares_locked_rows() {
        let pool = mem_pool().await;
        let now = Utc::now();
        insert_camera(&pool, "cam_t", 100_000, None).await;
        // (id, locked, evidence_locked)
        for (id, locked, evidence) in [("held", 0, 1), ("rlok", 1, 0), ("free", 0, 0)] {
            insert_segment(&pool, id, "cam_t", now, 100, locked, evidence).await;
        }

        let held_removed = delete_segment_if_unlocked(&pool, "held", "/nonexistent/held.mp4")
            .await
            .unwrap();
        assert!(!held_removed, "evidence-locked row must not be removable");
        let rlok_removed = delete_segment_if_unlocked(&pool, "rlok", "/nonexistent/rlok.mp4")
            .await
            .unwrap();
        assert!(!rlok_removed, "read-locked row must not be removable");
        assert!(seg_exists(&pool, "held").await);
        assert!(seg_exists(&pool, "rlok").await);

        let free_removed = delete_segment_if_unlocked(&pool, "free", "/nonexistent/free.mp4")
            .await
            .unwrap();
        assert!(free_removed, "unlocked row is removed");
        assert!(!seg_exists(&pool, "free").await);
    }

    #[tokio::test]
    async fn sweep_camera_quota_protected_exceeds_deletes_nothing() {
        let pool = mem_pool().await;
        let cfg = test_cfg();
        let now = Utc::now();

        // 500 evidence-locked bytes already exceed the 100-byte quota.
        insert_camera(&pool, "cam_over", 100_000, Some(100)).await;
        let locked_end = now - chrono::Duration::hours(5);
        insert_segment(&pool, "ovL", "cam_over", locked_end, 500, 0, 1).await;
        let plain_end = now - chrono::Duration::hours(1);
        insert_segment(&pool, "ov1", "cam_over", plain_end, 50, 0, 0).await;

        sweep(&pool, &cfg).await.unwrap();

        assert!(seg_exists(&pool, "ovL").await);
        assert!(
            seg_exists(&pool, "ov1").await,
            "other footage must not be wiped when protected footage exceeds the quota"
        );
        assert_eq!(seg_count(&pool).await, 2);
        assert!(
            camera_quota_event_count(&pool).await >= 1,
            "a camera_quota warning should be logged"
        );
    }

    #[tokio::test]
    async fn sweep_prunes_old_detections() {
        let pool = mem_pool().await;
        let cfg = test_cfg();
        insert_camera(&pool, "cam_d", 24, None).await;
        let now = Utc::now();
        // One detection beyond the 168-hour retention window, one well inside it.
        for (id, age_h) in [("det_old", 200), ("det_new", 1)] {
            let at = now - chrono::Duration::hours(age_h);
            sqlx::query(
                "INSERT INTO detections (id, camera_id, task_type, timestamp, created_at) VALUES (?, ?, ?, ?, ?)",
            )
            .bind(id)
            .bind("cam_d")
            .bind("object")
            .bind(at)
            .bind(at)
            .execute(&pool)
            .await
            .unwrap();
        }

        sweep(&pool, &cfg).await.unwrap();

        let remaining: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM detections")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(remaining, 1);
        let kept: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM detections WHERE id = ?")
            .bind("det_new")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(kept, 1, "the recent detection must be retained");
    }
}
1033}