1use std::sync::Arc;
7use std::time::Duration;
8
9use chrono::{DateTime, Utc};
10use serde_json::json;
11use sqlx::SqlitePool;
12
13use crate::config::Config;
14use crate::repo;
15use crate::services::storage;
16
17async fn unlink_segment(path: &str) -> bool {
22 match tokio::fs::remove_file(path).await {
23 Ok(()) => true,
24 Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
25 Err(e) => {
26 tracing::error!(path, error = %e, "retention: failed to delete segment file; keeping DB row to retry next sweep");
27 false
28 }
29 }
30}
31
32async fn delete_segment_if_unlocked(
42 pool: &SqlitePool,
43 seg_id: &str,
44 path: &str,
45) -> anyhow::Result<bool> {
46 let removed =
47 sqlx::query("DELETE FROM segments WHERE id = ? AND locked = 0 AND evidence_locked = 0")
48 .bind(seg_id)
49 .execute(pool)
50 .await?
51 .rows_affected();
52 if removed == 1 {
53 unlink_segment(path).await;
54 Ok(true)
55 } else {
56 Ok(false)
57 }
58}
59
60async fn unlink_snapshot(path: &str) -> bool {
64 match tokio::fs::remove_file(path).await {
65 Ok(()) => true,
66 Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
67 Err(e) => {
68 tracing::error!(path, error = %e, "retention: failed to delete snapshot file; keeping DB row to retry next sweep");
69 false
70 }
71 }
72}
73
74pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
75 let mut tick = tokio::time::interval(Duration::from_secs(cfg.retention_interval_s.max(30)));
76 loop {
77 tick.tick().await;
78 if let Err(e) = sweep(&pool, &cfg).await {
79 tracing::error!(error = %e, "retention: sweep failed");
80 }
81 }
82}
83
84async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
85 let mut age_deleted: u64 = 0;
87 let cams: Vec<(String, i64)> = sqlx::query_as("SELECT id, retention_hours FROM cameras")
88 .fetch_all(pool)
89 .await?;
90 for (id, hours) in cams {
91 let cutoff = Utc::now() - chrono::Duration::hours(hours.max(1));
92 let rows: Vec<(String, String)> = sqlx::query_as(
93 "SELECT id, path FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 AND end_time < ?",
94 )
95 .bind(&id)
96 .bind(cutoff)
97 .fetch_all(pool)
98 .await?;
99 for (seg_id, path) in rows {
100 if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
101 age_deleted += 1;
102 }
103 }
104 }
105 if age_deleted > 0 {
106 let _ = repo::log_event(
107 pool,
108 None,
109 "retention_delete",
110 "info",
111 json!({ "deleted": age_deleted, "reason": "age" }),
112 )
113 .await;
114 tracing::info!(deleted = age_deleted, "retention: age-based cleanup");
115 }
116
117 let mut quota_deleted: u64 = 0;
124 let quota_cams: Vec<(String, i64)> = sqlx::query_as(
125 "SELECT id, storage_quota_bytes FROM cameras WHERE storage_quota_bytes IS NOT NULL",
126 )
127 .fetch_all(pool)
128 .await?;
129 for (cam_id, quota) in quota_cams {
130 if quota <= 0 {
131 continue;
132 }
133 let protected_bytes: i64 = sqlx::query_scalar(
134 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND evidence_locked = 1",
135 )
136 .bind(&cam_id)
137 .fetch_one(pool)
138 .await?;
139 let budget = quota - protected_bytes;
140 if budget <= 0 {
141 if protected_bytes > quota {
142 tracing::warn!(
143 camera_id = %cam_id,
144 protected_bytes,
145 quota,
146 "retention: evidence-locked footage exceeds the camera quota; not deleting other footage"
147 );
148 let _ = repo::log_event(
149 pool,
150 Some(&cam_id),
151 "disk_pressure",
152 "warning",
153 json!({ "reason": "camera_quota", "camera_id": &cam_id, "protected_bytes": protected_bytes, "quota_bytes": quota }),
154 )
155 .await;
156 }
157 continue;
158 }
159 loop {
160 let deletable_total: i64 = sqlx::query_scalar(
161 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0",
162 )
163 .bind(&cam_id)
164 .fetch_one(pool)
165 .await?;
166 if deletable_total <= budget {
167 break;
168 }
169 let batch: Vec<(String, String, i64)> = sqlx::query_as(
170 "SELECT id, path, size_bytes FROM segments WHERE camera_id = ? AND locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
171 )
172 .bind(&cam_id)
173 .fetch_all(pool)
174 .await?;
175 if batch.is_empty() {
176 break;
177 }
178 let mut remaining = deletable_total;
179 let mut progressed = 0u64;
180 for (seg_id, path, size) in batch {
181 if remaining <= budget {
186 break;
187 }
188 if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
189 remaining -= size;
190 quota_deleted += 1;
191 progressed += 1;
192 }
193 }
194 if progressed == 0 {
195 tracing::error!(camera_id = %cam_id, "retention: camera-quota prune made no progress (segment file deletes failing); stopping this camera");
196 break;
197 }
198 }
199 }
200 if quota_deleted > 0 {
201 let _ = repo::log_event(
202 pool,
203 None,
204 "disk_pressure",
205 "warning",
206 json!({ "deleted": quota_deleted, "reason": "camera_quota" }),
207 )
208 .await;
209 tracing::warn!(
210 deleted = quota_deleted,
211 "retention: per-camera quota cleanup"
212 );
213 }
214
215 let max = cfg.max_recordings_bytes as i64;
222 let protected_bytes: i64 = sqlx::query_scalar(
223 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE evidence_locked = 1",
224 )
225 .fetch_one(pool)
226 .await?;
227 let budget = max - protected_bytes;
228 let mut size_deleted: u64 = 0;
229
230 if budget <= 0 {
231 let unlocked: i64 = sqlx::query_scalar(
234 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
235 )
236 .fetch_one(pool)
237 .await?;
238 if protected_bytes > max {
239 tracing::warn!(
240 protected_bytes,
241 max,
242 "retention: evidence-locked footage exceeds the size cap; not deleting other footage"
243 );
244 let _ = repo::log_event(
245 pool,
246 None,
247 "disk_pressure",
248 "warning",
249 json!({ "reason": "locked_exceeds_cap", "protected_bytes": protected_bytes, "unlocked_bytes": unlocked, "max_bytes": max }),
250 )
251 .await;
252 }
253 } else {
254 loop {
255 let unlocked_total: i64 = sqlx::query_scalar(
256 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0 AND evidence_locked = 0",
257 )
258 .fetch_one(pool)
259 .await?;
260 if unlocked_total <= budget {
261 break;
262 }
263 let batch: Vec<(String, String, i64)> = sqlx::query_as(
264 "SELECT id, path, size_bytes FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
265 )
266 .fetch_all(pool)
267 .await?;
268 if batch.is_empty() {
269 break;
270 }
271 let mut remaining = unlocked_total;
272 let mut progressed = 0u64;
273 for (seg_id, path, size) in batch {
274 if remaining <= budget {
276 break;
277 }
278 if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
279 remaining -= size;
280 size_deleted += 1;
281 progressed += 1;
282 }
283 }
284 if progressed == 0 {
285 tracing::error!("retention: size-cap prune made no progress (segment file deletes failing); stopping this sweep");
287 break;
288 }
289 }
290 }
291
292 if size_deleted > 0 {
293 let _ = repo::log_event(
294 pool,
295 None,
296 "disk_pressure",
297 "warning",
298 json!({ "deleted": size_deleted, "reason": "size_cap", "max_bytes": max }),
299 )
300 .await;
301 tracing::warn!(deleted = size_deleted, "retention: size-cap cleanup");
302 }
303
304 let floor = cfg.min_free_disk_bytes;
309 let mut disk_deleted: u64 = 0;
310 match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
311 None => {
312 tracing::warn!(
313 "retention: could not read disk stats; free-floor check skipped this sweep"
314 );
315 let _ = repo::log_event(
316 pool,
317 None,
318 "disk_pressure",
319 "warning",
320 json!({ "reason": "disk_stats_unavailable" }),
321 )
322 .await;
323 }
324 Some(d) if floor >= d.total_bytes => {
325 if d.free_bytes < floor {
326 tracing::warn!(
327 floor,
328 total = d.total_bytes,
329 "retention: free-disk floor exceeds total disk size; refusing to prune (misconfigured?)"
330 );
331 let _ = repo::log_event(
332 pool,
333 None,
334 "disk_pressure",
335 "critical",
336 json!({ "reason": "floor_unsatisfiable", "min_free_bytes": floor, "total_bytes": d.total_bytes }),
337 )
338 .await;
339 }
340 }
341 Some(mut prev) => {
342 let mut guard = 0;
343 let mut futile = false;
344 while prev.free_bytes < floor && guard < 200 {
345 guard += 1;
346 let before = prev.free_bytes;
347 let batch: Vec<(String, String)> = sqlx::query_as(
348 "SELECT id, path FROM segments WHERE locked = 0 AND evidence_locked = 0 ORDER BY end_time ASC LIMIT 20",
349 )
350 .fetch_all(pool)
351 .await?;
352 if batch.is_empty() {
353 tracing::warn!(
354 free_bytes = before,
355 floor,
356 "retention: below disk-free floor but no deletable segments remain to prune"
357 );
358 break;
359 }
360 for (seg_id, path) in batch {
361 if delete_segment_if_unlocked(pool, &seg_id, &path).await? {
362 disk_deleted += 1;
363 }
364 }
365 match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
366 Some(d) if d.free_bytes > before => prev = d,
367 Some(_) => {
368 futile = true;
369 break;
370 }
371 None => break,
372 }
373 }
374 if futile {
375 tracing::error!(
376 free_bytes = prev.free_bytes,
377 floor,
378 "retention: pruning recordings is not recovering free space (disk filled by non-recording data?); stopping"
379 );
380 let _ = repo::log_event(
381 pool,
382 None,
383 "disk_pressure",
384 "critical",
385 json!({ "reason": "prune_not_recovering_space", "min_free_bytes": floor, "deleted": disk_deleted }),
386 )
387 .await;
388 }
389 }
390 }
391 if disk_deleted > 0 {
392 let _ = repo::log_event(
393 pool,
394 None,
395 "disk_pressure",
396 "critical",
397 json!({ "deleted": disk_deleted, "reason": "free_floor", "min_free_bytes": floor }),
398 )
399 .await;
400 tracing::warn!(deleted = disk_deleted, "retention: disk-free-floor cleanup");
401 }
402
403 let det_cutoff = Utc::now() - chrono::Duration::hours(cfg.detection_retention_hours.max(1));
405 let pruned = sqlx::query("DELETE FROM detections WHERE created_at < ?")
406 .bind(det_cutoff)
407 .execute(pool)
408 .await?
409 .rows_affected();
410 if pruned > 0 {
411 tracing::info!(deleted = pruned, "retention: pruned old detections");
412 }
413 let ob_pruned = sqlx::query("DELETE FROM outbox WHERE created_at < ?")
415 .bind(det_cutoff)
416 .execute(pool)
417 .await?
418 .rows_affected();
419 if ob_pruned > 0 {
420 tracing::info!(deleted = ob_pruned, "retention: pruned old outbox rows");
421 }
422
423 let old_zone_events: Vec<(String, Option<String>)> =
425 sqlx::query_as("SELECT id, evidence_path FROM zone_events WHERE created_at < ?")
426 .bind(det_cutoff)
427 .fetch_all(pool)
428 .await?;
429 if !old_zone_events.is_empty() {
430 for (_id, evidence) in &old_zone_events {
431 if let Some(name) = evidence.as_deref().and_then(|u| u.rsplit('/').next()) {
432 let _ = tokio::fs::remove_file(cfg.snapshots_dir.join(name)).await;
433 }
434 }
435 let zpruned = sqlx::query("DELETE FROM zone_events WHERE created_at < ?")
436 .bind(det_cutoff)
437 .execute(pool)
438 .await?
439 .rows_affected();
440 tracing::info!(
441 deleted = zpruned,
442 "retention: pruned old zone events + evidence"
443 );
444 }
445
446 let audit_cutoff = Utc::now() - chrono::Duration::days(cfg.audit_retention_days.max(1));
449 let apruned = sqlx::query("DELETE FROM audit_log WHERE created_at < ?")
450 .bind(audit_cutoff)
451 .execute(pool)
452 .await?
453 .rows_affected();
454 if apruned > 0 {
455 tracing::info!(deleted = apruned, "retention: pruned old audit log entries");
456 }
457 let spruned = sqlx::query("DELETE FROM sessions WHERE expires_at < ?")
458 .bind(Utc::now())
459 .execute(pool)
460 .await?
461 .rows_affected();
462 if spruned > 0 {
463 tracing::debug!(deleted = spruned, "retention: pruned expired sessions");
464 }
465
466 let evpruned = sqlx::query("DELETE FROM events WHERE created_at < ?")
471 .bind(audit_cutoff)
472 .execute(pool)
473 .await?
474 .rows_affected();
475 if evpruned > 0 {
476 tracing::info!(deleted = evpruned, "retention: pruned old event-log rows");
477 }
478
479 let wdpruned = sqlx::query("DELETE FROM webhook_deliveries WHERE created_at < ?")
483 .bind(audit_cutoff)
484 .execute(pool)
485 .await?
486 .rows_affected();
487 if wdpruned > 0 {
488 tracing::info!(
489 deleted = wdpruned,
490 "retention: pruned old webhook-delivery rows"
491 );
492 }
493
494 let gpruned = sqlx::query(
497 "DELETE FROM recording_gaps WHERE fill_state IN ('filled','failed') AND created_at < ?",
498 )
499 .bind(audit_cutoff)
500 .execute(pool)
501 .await?
502 .rows_affected();
503 if gpruned > 0 {
504 tracing::info!(
505 deleted = gpruned,
506 "retention: pruned resolved recording-gap rows"
507 );
508 }
509
510 if cfg.snapshot_retention_hours > 0 {
514 let snap_cutoff = Utc::now() - chrono::Duration::hours(cfg.snapshot_retention_hours);
515 let rows: Vec<(String, String)> =
516 sqlx::query_as("SELECT id, path FROM snapshots WHERE taken_at < ?")
517 .bind(snap_cutoff)
518 .fetch_all(pool)
519 .await?;
520 let mut snap_deleted: u64 = 0;
521 for (snap_id, path) in rows {
522 if unlink_snapshot(&path).await {
523 sqlx::query("DELETE FROM snapshots WHERE id = ?")
524 .bind(&snap_id)
525 .execute(pool)
526 .await?;
527 snap_deleted += 1;
528 }
529 }
530 if snap_deleted > 0 {
531 tracing::info!(deleted = snap_deleted, "retention: pruned old snapshots");
532 }
533 }
534
535 if cfg.archive_retention_hours > 0 {
539 let cutoff = Utc::now() - chrono::Duration::hours(cfg.archive_retention_hours);
540 if let Ok(mut entries) = tokio::fs::read_dir(&cfg.archive_dir).await {
541 let mut removed: u64 = 0;
542 while let Ok(Some(ent)) = entries.next_entry().await {
543 let path = ent.path();
544 if path.extension().and_then(|e| e.to_str()) != Some("zip") {
545 continue;
546 }
547 let stale = ent
548 .metadata()
549 .await
550 .ok()
551 .and_then(|m| m.modified().ok())
552 .map(|t| DateTime::<Utc>::from(t) < cutoff)
553 .unwrap_or(false);
554 if stale && tokio::fs::remove_file(&path).await.is_ok() {
555 removed += 1;
556 }
557 }
558 if removed > 0 {
559 tracing::info!(deleted = removed, "retention: pruned old archive exports");
560 }
561 }
562 let jpruned = sqlx::query(
563 "DELETE FROM backup_jobs WHERE finished_at IS NOT NULL AND finished_at < ?",
564 )
565 .bind(cutoff)
566 .execute(pool)
567 .await?
568 .rows_affected();
569 if jpruned > 0 {
570 tracing::info!(
571 deleted = jpruned,
572 "retention: pruned old finished backup jobs"
573 );
574 }
575 }
576 Ok(())
577}
578
579#[cfg(test)]
580mod tests {
581 use super::*;
582
583 fn unique_path(prefix: &str) -> std::path::PathBuf {
586 use std::sync::atomic::{AtomicU64, Ordering};
587 static N: AtomicU64 = AtomicU64::new(0);
588 let n = N.fetch_add(1, Ordering::Relaxed);
589 std::env::temp_dir().join(format!("{prefix}-{}-{n}", std::process::id()))
590 }
591
592 async fn mem_pool() -> SqlitePool {
593 let pool = sqlx::sqlite::SqlitePoolOptions::new()
594 .max_connections(1)
595 .connect("sqlite::memory:")
596 .await
597 .unwrap();
598 crate::db::run_migrations(&pool).await.unwrap();
599 pool
600 }
601
602 fn test_cfg() -> Config {
606 let mut cfg = Config::from_env();
607 cfg.max_recordings_bytes = u64::MAX / 4;
608 cfg.min_free_disk_bytes = 0;
609 cfg.recordings_dir = std::env::temp_dir();
610 cfg.snapshot_retention_hours = 0;
611 cfg.archive_retention_hours = 0;
612 cfg.detection_retention_hours = 168;
613 cfg.audit_retention_days = 365;
614 cfg
615 }
616
617 async fn insert_camera(pool: &SqlitePool, id: &str, retention_hours: i64, quota: Option<i64>) {
618 let now = Utc::now();
619 sqlx::query(
620 "INSERT INTO cameras (id, name, retention_hours, storage_quota_bytes, created_at, updated_at)
621 VALUES (?, ?, ?, ?, ?, ?)",
622 )
623 .bind(id)
624 .bind(id)
625 .bind(retention_hours)
626 .bind(quota)
627 .bind(now)
628 .bind(now)
629 .execute(pool)
630 .await
631 .unwrap();
632 }
633
634 async fn insert_segment(
635 pool: &SqlitePool,
636 id: &str,
637 camera_id: &str,
638 end: DateTime<Utc>,
639 size_bytes: i64,
640 locked: i64,
641 evidence_locked: i64,
642 ) {
643 sqlx::query(
644 "INSERT INTO segments
645 (id, camera_id, path, start_time, end_time, duration_s, size_bytes, locked, evidence_locked, created_at)
646 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
647 )
648 .bind(id)
649 .bind(camera_id)
650 .bind(format!("/nonexistent/heldar-test/{id}.mp4"))
652 .bind(end)
653 .bind(end)
654 .bind(60.0_f64)
655 .bind(size_bytes)
656 .bind(locked)
657 .bind(evidence_locked)
658 .bind(end)
659 .execute(pool)
660 .await
661 .unwrap();
662 }
663
664 async fn seg_exists(pool: &SqlitePool, id: &str) -> bool {
665 let c: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM segments WHERE id = ?")
666 .bind(id)
667 .fetch_one(pool)
668 .await
669 .unwrap();
670 c == 1
671 }
672
673 async fn seg_count(pool: &SqlitePool) -> i64 {
674 sqlx::query_scalar("SELECT COUNT(*) FROM segments")
675 .fetch_one(pool)
676 .await
677 .unwrap()
678 }
679
680 async fn event_type_count(pool: &SqlitePool, event_type: &str) -> i64 {
681 sqlx::query_scalar("SELECT COUNT(*) FROM events WHERE event_type = ?")
682 .bind(event_type)
683 .fetch_one(pool)
684 .await
685 .unwrap()
686 }
687
688 async fn camera_quota_event_count(pool: &SqlitePool) -> i64 {
689 sqlx::query_scalar(
690 "SELECT COUNT(*) FROM events WHERE event_type = 'disk_pressure' AND payload LIKE '%camera_quota%'",
691 )
692 .fetch_one(pool)
693 .await
694 .unwrap()
695 }
696
697 #[tokio::test]
700 async fn unlink_segment_reports_removable_for_missing_path() {
701 assert!(unlink_segment("/nonexistent/heldar/definitely-not-here.mp4").await);
703 }
704
705 #[tokio::test]
706 async fn unlink_segment_deletes_existing_file() {
707 let p = unique_path("heldar-seg");
708 tokio::fs::write(&p, b"x").await.unwrap();
709 assert!(p.exists());
710 assert!(unlink_segment(p.to_str().unwrap()).await);
711 assert!(!p.exists());
712 }
713
714 #[tokio::test]
715 async fn unlink_segment_keeps_row_for_directory() {
716 let d = unique_path("heldar-dir");
718 tokio::fs::create_dir(&d).await.unwrap();
719 assert!(!unlink_segment(d.to_str().unwrap()).await);
720 assert!(d.exists());
721 let _ = tokio::fs::remove_dir(&d).await;
722 }
723
724 #[tokio::test]
725 async fn unlink_snapshot_handles_missing_and_existing() {
726 assert!(unlink_snapshot("/nonexistent/heldar/none.jpg").await);
728 let p = unique_path("heldar-snap");
729 tokio::fs::write(&p, b"x").await.unwrap();
730 assert!(unlink_snapshot(p.to_str().unwrap()).await);
731 assert!(!p.exists());
732 }
733
734 #[tokio::test]
737 async fn sweep_age_retention_deletes_only_old_unlocked() {
738 let pool = mem_pool().await;
739 let cfg = test_cfg();
740 let now = Utc::now();
741
742 insert_camera(&pool, "cam_age", 24, None).await;
743 insert_segment(
745 &pool,
746 "seg_recent",
747 "cam_age",
748 now - chrono::Duration::hours(1),
749 100,
750 0,
751 0,
752 )
753 .await;
754 insert_segment(
756 &pool,
757 "seg_old",
758 "cam_age",
759 now - chrono::Duration::hours(48),
760 100,
761 0,
762 0,
763 )
764 .await;
765 insert_segment(
767 &pool,
768 "seg_old_locked",
769 "cam_age",
770 now - chrono::Duration::hours(48),
771 100,
772 1,
773 0,
774 )
775 .await;
776 insert_segment(
778 &pool,
779 "seg_old_ev",
780 "cam_age",
781 now - chrono::Duration::hours(48),
782 100,
783 0,
784 1,
785 )
786 .await;
787
788 sweep(&pool, &cfg).await.unwrap();
789
790 assert!(seg_exists(&pool, "seg_recent").await);
791 assert!(
792 !seg_exists(&pool, "seg_old").await,
793 "old unlocked segment should be pruned by age"
794 );
795 assert!(
796 seg_exists(&pool, "seg_old_locked").await,
797 "read-locked segment must survive age prune"
798 );
799 assert!(
800 seg_exists(&pool, "seg_old_ev").await,
801 "evidence-locked segment must survive age prune"
802 );
803 assert_eq!(seg_count(&pool).await, 3);
804 assert_eq!(event_type_count(&pool, "retention_delete").await, 1);
806 }
807
808 #[tokio::test]
811 async fn sweep_camera_quota_prunes_only_to_budget_keeps_evidence() {
812 let pool = mem_pool().await;
813 let cfg = test_cfg();
814 let now = Utc::now();
815
816 insert_camera(&pool, "cam_q", 100_000, Some(1000)).await;
818 insert_segment(
820 &pool,
821 "sL",
822 "cam_q",
823 now - chrono::Duration::hours(5),
824 600,
825 0,
826 1,
827 )
828 .await;
829 insert_segment(
831 &pool,
832 "s1",
833 "cam_q",
834 now - chrono::Duration::hours(3),
835 400,
836 0,
837 0,
838 )
839 .await;
840 insert_segment(
841 &pool,
842 "s2",
843 "cam_q",
844 now - chrono::Duration::hours(2),
845 400,
846 0,
847 0,
848 )
849 .await;
850 insert_segment(
851 &pool,
852 "s3",
853 "cam_q",
854 now - chrono::Duration::hours(1),
855 400,
856 0,
857 0,
858 )
859 .await;
860
861 sweep(&pool, &cfg).await.unwrap();
862
863 assert!(
867 seg_exists(&pool, "sL").await,
868 "evidence-locked footage must survive the quota prune"
869 );
870 assert!(
871 !seg_exists(&pool, "s1").await,
872 "oldest over-budget segment is pruned"
873 );
874 assert!(
875 !seg_exists(&pool, "s2").await,
876 "second-oldest pruned to reach budget"
877 );
878 assert!(
879 seg_exists(&pool, "s3").await,
880 "s3 is within quota once s1+s2 are gone and must NOT be over-deleted"
881 );
882 assert_eq!(
883 seg_count(&pool).await,
884 2,
885 "only s1,s2 pruned to reach budget; sL+s3 remain"
886 );
887 assert!(
888 camera_quota_event_count(&pool).await >= 1,
889 "a camera_quota disk_pressure event should be logged"
890 );
891 }
892
893 #[tokio::test]
894 async fn delete_segment_if_unlocked_spares_locked_rows() {
895 let pool = mem_pool().await;
899 let now = Utc::now();
900 insert_camera(&pool, "cam_t", 100_000, None).await;
901 insert_segment(&pool, "held", "cam_t", now, 100, 0, 1).await; insert_segment(&pool, "rlok", "cam_t", now, 100, 1, 0).await; insert_segment(&pool, "free", "cam_t", now, 100, 0, 0).await; assert!(
906 !delete_segment_if_unlocked(&pool, "held", "/nonexistent/held.mp4")
907 .await
908 .unwrap(),
909 "evidence-locked row must not be removable"
910 );
911 assert!(
912 !delete_segment_if_unlocked(&pool, "rlok", "/nonexistent/rlok.mp4")
913 .await
914 .unwrap(),
915 "read-locked row must not be removable"
916 );
917 assert!(seg_exists(&pool, "held").await);
918 assert!(seg_exists(&pool, "rlok").await);
919
920 assert!(
921 delete_segment_if_unlocked(&pool, "free", "/nonexistent/free.mp4")
922 .await
923 .unwrap(),
924 "unlocked row is removed"
925 );
926 assert!(!seg_exists(&pool, "free").await);
927 }
928
929 #[tokio::test]
930 async fn sweep_camera_quota_protected_exceeds_deletes_nothing() {
931 let pool = mem_pool().await;
932 let cfg = test_cfg();
933 let now = Utc::now();
934
935 insert_camera(&pool, "cam_over", 100_000, Some(100)).await;
938 insert_segment(
939 &pool,
940 "ovL",
941 "cam_over",
942 now - chrono::Duration::hours(5),
943 500,
944 0,
945 1,
946 )
947 .await;
948 insert_segment(
949 &pool,
950 "ov1",
951 "cam_over",
952 now - chrono::Duration::hours(1),
953 50,
954 0,
955 0,
956 )
957 .await;
958
959 sweep(&pool, &cfg).await.unwrap();
960
961 assert!(seg_exists(&pool, "ovL").await);
962 assert!(
963 seg_exists(&pool, "ov1").await,
964 "other footage must not be wiped when protected footage exceeds the quota"
965 );
966 assert_eq!(seg_count(&pool).await, 2);
967 assert!(
968 camera_quota_event_count(&pool).await >= 1,
969 "a camera_quota warning should be logged"
970 );
971 }
972
973 #[tokio::test]
976 async fn sweep_prunes_old_detections() {
977 let pool = mem_pool().await;
978 let cfg = test_cfg(); insert_camera(&pool, "cam_d", 24, None).await;
981 let now = Utc::now();
982 sqlx::query(
984 "INSERT INTO detections (id, camera_id, task_type, timestamp, created_at) VALUES (?, ?, ?, ?, ?)",
985 )
986 .bind("det_old")
987 .bind("cam_d")
988 .bind("object")
989 .bind(now - chrono::Duration::hours(200))
990 .bind(now - chrono::Duration::hours(200))
991 .execute(&pool)
992 .await
993 .unwrap();
994 sqlx::query(
996 "INSERT INTO detections (id, camera_id, task_type, timestamp, created_at) VALUES (?, ?, ?, ?, ?)",
997 )
998 .bind("det_new")
999 .bind("cam_d")
1000 .bind("object")
1001 .bind(now - chrono::Duration::hours(1))
1002 .bind(now - chrono::Duration::hours(1))
1003 .execute(&pool)
1004 .await
1005 .unwrap();
1006
1007 sweep(&pool, &cfg).await.unwrap();
1008
1009 let remaining: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM detections")
1010 .fetch_one(&pool)
1011 .await
1012 .unwrap();
1013 assert_eq!(remaining, 1);
1014 let kept: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM detections WHERE id = ?")
1015 .bind("det_new")
1016 .fetch_one(&pool)
1017 .await
1018 .unwrap();
1019 assert_eq!(kept, 1, "the recent detection must be retained");
1020 }
1021}