1use std::sync::Arc;
5use std::time::Duration;
6
7use chrono::Utc;
8use serde_json::json;
9use sqlx::SqlitePool;
10
11use crate::config::Config;
12use crate::repo;
13use crate::services::storage;
14
15async fn unlink_segment(path: &str) -> bool {
20 match tokio::fs::remove_file(path).await {
21 Ok(()) => true,
22 Err(e) if e.kind() == std::io::ErrorKind::NotFound => true,
23 Err(e) => {
24 tracing::error!(path, error = %e, "retention: failed to delete segment file; keeping DB row to retry next sweep");
25 false
26 }
27 }
28}
29
30pub async fn run(pool: SqlitePool, cfg: Arc<Config>) {
31 let mut tick = tokio::time::interval(Duration::from_secs(cfg.retention_interval_s.max(30)));
32 loop {
33 tick.tick().await;
34 if let Err(e) = sweep(&pool, &cfg).await {
35 tracing::error!(error = %e, "retention: sweep failed");
36 }
37 }
38}
39
40async fn sweep(pool: &SqlitePool, cfg: &Config) -> anyhow::Result<()> {
41 let mut age_deleted: u64 = 0;
43 let cams: Vec<(String, i64)> = sqlx::query_as("SELECT id, retention_hours FROM cameras")
44 .fetch_all(pool)
45 .await?;
46 for (id, hours) in cams {
47 let cutoff = Utc::now() - chrono::Duration::hours(hours.max(1));
48 let rows: Vec<(String, String)> = sqlx::query_as(
49 "SELECT id, path FROM segments WHERE camera_id = ? AND locked = 0 AND end_time < ?",
50 )
51 .bind(&id)
52 .bind(cutoff)
53 .fetch_all(pool)
54 .await?;
55 for (seg_id, path) in rows {
56 if unlink_segment(&path).await {
57 sqlx::query("DELETE FROM segments WHERE id = ?")
58 .bind(&seg_id)
59 .execute(pool)
60 .await?;
61 age_deleted += 1;
62 }
63 }
64 }
65 if age_deleted > 0 {
66 let _ = repo::log_event(
67 pool,
68 None,
69 "retention_delete",
70 "info",
71 json!({ "deleted": age_deleted, "reason": "age" }),
72 )
73 .await;
74 tracing::info!(deleted = age_deleted, "retention: age-based cleanup");
75 }
76
77 let max = cfg.max_recordings_bytes as i64;
81 let locked_bytes: i64 =
82 sqlx::query_scalar("SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 1")
83 .fetch_one(pool)
84 .await?;
85 let budget = max - locked_bytes;
86 let mut size_deleted: u64 = 0;
87
88 if budget <= 0 {
89 let unlocked: i64 = sqlx::query_scalar(
92 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0",
93 )
94 .fetch_one(pool)
95 .await?;
96 if locked_bytes > max {
97 tracing::warn!(
98 locked_bytes,
99 max,
100 "retention: locked (evidence) footage exceeds the size cap; not deleting unlocked footage"
101 );
102 let _ = repo::log_event(
103 pool,
104 None,
105 "disk_pressure",
106 "warning",
107 json!({ "reason": "locked_exceeds_cap", "locked_bytes": locked_bytes, "unlocked_bytes": unlocked, "max_bytes": max }),
108 )
109 .await;
110 }
111 } else {
112 loop {
113 let unlocked_total: i64 = sqlx::query_scalar(
114 "SELECT COALESCE(SUM(size_bytes), 0) FROM segments WHERE locked = 0",
115 )
116 .fetch_one(pool)
117 .await?;
118 if unlocked_total <= budget {
119 break;
120 }
121 let batch: Vec<(String, String)> = sqlx::query_as(
122 "SELECT id, path FROM segments WHERE locked = 0 ORDER BY end_time ASC LIMIT 20",
123 )
124 .fetch_all(pool)
125 .await?;
126 if batch.is_empty() {
127 break;
128 }
129 let mut progressed = 0u64;
130 for (seg_id, path) in batch {
131 if unlink_segment(&path).await {
132 sqlx::query("DELETE FROM segments WHERE id = ?")
133 .bind(&seg_id)
134 .execute(pool)
135 .await?;
136 size_deleted += 1;
137 progressed += 1;
138 }
139 }
140 if progressed == 0 {
141 tracing::error!("retention: size-cap prune made no progress (segment file deletes failing); stopping this sweep");
143 break;
144 }
145 }
146 }
147
148 if size_deleted > 0 {
149 let _ = repo::log_event(
150 pool,
151 None,
152 "disk_pressure",
153 "warning",
154 json!({ "deleted": size_deleted, "reason": "size_cap", "max_bytes": max }),
155 )
156 .await;
157 tracing::warn!(deleted = size_deleted, "retention: size-cap cleanup");
158 }
159
160 let floor = cfg.min_free_disk_bytes;
165 let mut disk_deleted: u64 = 0;
166 match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
167 None => {
168 tracing::warn!(
169 "retention: could not read disk stats; free-floor check skipped this sweep"
170 );
171 let _ = repo::log_event(
172 pool,
173 None,
174 "disk_pressure",
175 "warning",
176 json!({ "reason": "disk_stats_unavailable" }),
177 )
178 .await;
179 }
180 Some(d) if floor >= d.total_bytes => {
181 if d.free_bytes < floor {
182 tracing::warn!(
183 floor,
184 total = d.total_bytes,
185 "retention: free-disk floor exceeds total disk size; refusing to prune (misconfigured?)"
186 );
187 let _ = repo::log_event(
188 pool,
189 None,
190 "disk_pressure",
191 "critical",
192 json!({ "reason": "floor_unsatisfiable", "min_free_bytes": floor, "total_bytes": d.total_bytes }),
193 )
194 .await;
195 }
196 }
197 Some(mut prev) => {
198 let mut guard = 0;
199 let mut futile = false;
200 while prev.free_bytes < floor && guard < 200 {
201 guard += 1;
202 let before = prev.free_bytes;
203 let batch: Vec<(String, String)> = sqlx::query_as(
204 "SELECT id, path FROM segments WHERE locked = 0 ORDER BY end_time ASC LIMIT 20",
205 )
206 .fetch_all(pool)
207 .await?;
208 if batch.is_empty() {
209 tracing::warn!(
210 free_bytes = before,
211 floor,
212 "retention: below disk-free floor but no unlocked segments remain to prune"
213 );
214 break;
215 }
216 for (seg_id, path) in batch {
217 if unlink_segment(&path).await {
218 sqlx::query("DELETE FROM segments WHERE id = ?")
219 .bind(&seg_id)
220 .execute(pool)
221 .await?;
222 disk_deleted += 1;
223 }
224 }
225 match storage::disk_stats_async(cfg.recordings_dir.clone()).await {
226 Some(d) if d.free_bytes > before => prev = d,
227 Some(_) => {
228 futile = true;
229 break;
230 }
231 None => break,
232 }
233 }
234 if futile {
235 tracing::error!(
236 free_bytes = prev.free_bytes,
237 floor,
238 "retention: pruning recordings is not recovering free space (disk filled by non-recording data?); stopping"
239 );
240 let _ = repo::log_event(
241 pool,
242 None,
243 "disk_pressure",
244 "critical",
245 json!({ "reason": "prune_not_recovering_space", "min_free_bytes": floor, "deleted": disk_deleted }),
246 )
247 .await;
248 }
249 }
250 }
251 if disk_deleted > 0 {
252 let _ = repo::log_event(
253 pool,
254 None,
255 "disk_pressure",
256 "critical",
257 json!({ "deleted": disk_deleted, "reason": "free_floor", "min_free_bytes": floor }),
258 )
259 .await;
260 tracing::warn!(deleted = disk_deleted, "retention: disk-free-floor cleanup");
261 }
262
263 let det_cutoff = Utc::now() - chrono::Duration::hours(cfg.detection_retention_hours.max(1));
265 let pruned = sqlx::query("DELETE FROM detections WHERE created_at < ?")
266 .bind(det_cutoff)
267 .execute(pool)
268 .await?
269 .rows_affected();
270 if pruned > 0 {
271 tracing::info!(deleted = pruned, "retention: pruned old detections");
272 }
273 let ob_pruned = sqlx::query("DELETE FROM outbox WHERE created_at < ?")
275 .bind(det_cutoff)
276 .execute(pool)
277 .await?
278 .rows_affected();
279 if ob_pruned > 0 {
280 tracing::info!(deleted = ob_pruned, "retention: pruned old outbox rows");
281 }
282
283 let old_zone_events: Vec<(String, Option<String>)> =
285 sqlx::query_as("SELECT id, evidence_path FROM zone_events WHERE created_at < ?")
286 .bind(det_cutoff)
287 .fetch_all(pool)
288 .await?;
289 if !old_zone_events.is_empty() {
290 for (_id, evidence) in &old_zone_events {
291 if let Some(name) = evidence.as_deref().and_then(|u| u.rsplit('/').next()) {
292 let _ = tokio::fs::remove_file(cfg.snapshots_dir.join(name)).await;
293 }
294 }
295 let zpruned = sqlx::query("DELETE FROM zone_events WHERE created_at < ?")
296 .bind(det_cutoff)
297 .execute(pool)
298 .await?
299 .rows_affected();
300 tracing::info!(
301 deleted = zpruned,
302 "retention: pruned old zone events + evidence"
303 );
304 }
305
306 let audit_cutoff = Utc::now() - chrono::Duration::days(cfg.audit_retention_days.max(1));
309 let apruned = sqlx::query("DELETE FROM audit_log WHERE created_at < ?")
310 .bind(audit_cutoff)
311 .execute(pool)
312 .await?
313 .rows_affected();
314 if apruned > 0 {
315 tracing::info!(deleted = apruned, "retention: pruned old audit log entries");
316 }
317 let spruned = sqlx::query("DELETE FROM sessions WHERE expires_at < ?")
318 .bind(Utc::now())
319 .execute(pool)
320 .await?
321 .rows_affected();
322 if spruned > 0 {
323 tracing::debug!(deleted = spruned, "retention: pruned expired sessions");
324 }
325
326 let evpruned = sqlx::query("DELETE FROM events WHERE created_at < ?")
331 .bind(audit_cutoff)
332 .execute(pool)
333 .await?
334 .rows_affected();
335 if evpruned > 0 {
336 tracing::info!(deleted = evpruned, "retention: pruned old event-log rows");
337 }
338 Ok(())
339}