Skip to main content

heldar_kernel/services/
clip.rs

1//! Evidence clip export: concatenates the segments overlapping a time range and trims to
2//! the requested window with `-c copy` (no re-encode). Keyframe-aligned (Stage 0 precision).
3
4use std::process::Stdio;
5use std::time::Duration;
6
7use chrono::{DateTime, Utc};
8use serde::Serialize;
9use tokio::process::Command;
10use uuid::Uuid;
11
12use crate::error::{AppError, AppResult};
13use crate::models::Segment;
14use crate::state::AppState;
15
/// Longest window, in seconds, that a single clip export may cover (3600 s = one hour).
/// `export_clip` rejects any request whose `to - from` exceeds this.
const MAX_CLIP_SECONDS: f64 = 3600.0;
17
/// Outcome of a successful clip export, serialized back to the caller.
#[derive(Debug, Serialize)]
pub struct ClipResult {
    /// Identifier of the exported clip (`clip_` followed by a freshly generated UUID).
    pub id: String,
    /// Camera the footage was taken from.
    pub camera_id: String,
    /// Name of the output file inside the clips directory (`<id>.mp4`).
    pub filename: String,
    /// URL path of the exported file (`/media/clips/<filename>`).
    pub url: String,
    /// Start of the requested window.
    pub from: DateTime<Utc>,
    /// End of the requested window.
    pub to: DateTime<Utc>,
    /// Length of the requested window (`to - from`) in seconds.
    pub requested_seconds: f64,
    /// Seconds of the requested window for which footage actually exists. Equals `requested_seconds`
    /// for a fully-covered clip; less when the window spans recording gaps.
    pub covered_seconds: f64,
    /// Recording gaps WITHIN the requested window. The concat output bridges these (the footage does
    /// not exist), so they are reported here rather than silently presented as continuous video.
    pub gaps: Vec<ClipGap>,
    /// Size of the exported file in bytes; 0 when the file's metadata could not be read.
    pub size_bytes: u64,
    /// Number of recorded segments that overlapped the requested window and were concatenated.
    pub segment_count: usize,
}
36
/// A span within a requested clip window for which no recorded footage exists.
#[derive(Debug, Serialize)]
pub struct ClipGap {
    /// Instant at which footage stops (or the window start, for a leading gap).
    pub from: DateTime<Utc>,
    /// Instant at which footage resumes (or the window end, for a trailing gap).
    pub to: DateTime<Utc>,
}
43
/// Tolerance, in milliseconds, for seams between adjacent segments: an uncovered span of at most
/// this length is treated as continuous footage; only a strictly longer span is reported as a gap.
const GAP_TOLERANCE_MS: i64 = 1000;
47
48/// Compute covered seconds + the recording gaps within `[from, to]` from the (start-ordered)
49/// overlapping segments. A gap is a span longer than [`GAP_TOLERANCE_MS`] with no footage.
50fn coverage_and_gaps(
51    segments: &[Segment],
52    from: DateTime<Utc>,
53    to: DateTime<Utc>,
54) -> (f64, Vec<ClipGap>) {
55    let mut gaps = Vec::new();
56    let mut cursor = from;
57    for s in segments {
58        let cs = s.start_time.max(from);
59        let ce = s.end_time.min(to);
60        if ce <= cs {
61            continue;
62        }
63        if (cs - cursor).num_milliseconds() > GAP_TOLERANCE_MS {
64            gaps.push(ClipGap {
65                from: cursor,
66                to: cs,
67            });
68        }
69        if ce > cursor {
70            cursor = ce;
71        }
72    }
73    if (to - cursor).num_milliseconds() > GAP_TOLERANCE_MS {
74        gaps.push(ClipGap { from: cursor, to });
75    }
76    let requested = (to - from).num_milliseconds() as f64 / 1000.0;
77    let gap_secs: f64 = gaps
78        .iter()
79        .map(|g| (g.to - g.from).num_milliseconds() as f64 / 1000.0)
80        .sum();
81    ((requested - gap_secs).max(0.0), gaps)
82}
83
84pub async fn export_clip(
85    state: &AppState,
86    camera_id: &str,
87    from: DateTime<Utc>,
88    to: DateTime<Utc>,
89) -> AppResult<ClipResult> {
90    if to <= from {
91        return Err(AppError::BadRequest("`to` must be after `from`".into()));
92    }
93    let requested = (to - from).num_milliseconds() as f64 / 1000.0;
94    if requested > MAX_CLIP_SECONDS {
95        return Err(AppError::BadRequest(format!(
96            "clip too long ({requested:.0}s); max {MAX_CLIP_SECONDS:.0}s"
97        )));
98    }
99
100    let camera_exists: Option<(String,)> = sqlx::query_as("SELECT id FROM cameras WHERE id = ?")
101        .bind(camera_id)
102        .fetch_optional(&state.pool)
103        .await?;
104    if camera_exists.is_none() {
105        return Err(AppError::NotFound(format!("camera {camera_id} not found")));
106    }
107
108    let segments: Vec<Segment> = sqlx::query_as::<_, Segment>(
109        "SELECT * FROM segments
110         WHERE camera_id = ? AND start_time < ? AND end_time > ?
111         ORDER BY start_time ASC",
112    )
113    .bind(camera_id)
114    .bind(to)
115    .bind(from)
116    .fetch_all(&state.pool)
117    .await?;
118    if segments.is_empty() {
119        return Err(AppError::NotFound(
120            "no recorded footage in the requested range".into(),
121        ));
122    }
123
124    tokio::fs::create_dir_all(&state.cfg.clips_dir)
125        .await
126        .map_err(|e| AppError::Other(e.into()))?;
127
128    let id = format!("clip_{}", Uuid::new_v4().simple());
129    let filename = format!("{id}.mp4");
130    let out_path = state.cfg.clips_dir.join(&filename);
131    let list_path = state.cfg.clips_dir.join(format!("{id}.txt"));
132
133    // Read-lock the source segments so the retention sweeper can't delete them out from under ffmpeg
134    // mid-export (TOCTOU). The RAII guard releases on EVERY outcome — normal return, `?` error,
135    // timeout, AND cancellation/panic (where a manual unlock would be skipped, leaking the lock).
136    let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
137    let _read_lock = crate::repo::SegReadLock::acquire(&state.pool, seg_ids).await;
138
139    let size_outcome: AppResult<u64> = async {
140        let mut list = String::new();
141        for s in &segments {
142            let escaped = s.path.replace('\'', "'\\''");
143            list.push_str(&format!("file '{escaped}'\n"));
144        }
145        tokio::fs::write(&list_path, list)
146            .await
147            .map_err(|e| AppError::Other(e.into()))?;
148
149        let first_start = segments[0].start_time;
150        let ss = ((from - first_start).num_milliseconds() as f64 / 1000.0).max(0.0);
151
152        let mut cmd = Command::new(&state.cfg.ffmpeg_bin);
153        cmd.kill_on_drop(true)
154            .args([
155                "-hide_banner",
156                "-loglevel",
157                "error",
158                "-f",
159                "concat",
160                "-safe",
161                "0",
162            ])
163            .arg("-i")
164            .arg(&list_path)
165            .args(["-ss", &format!("{ss:.3}")])
166            .args(["-t", &format!("{requested:.3}")])
167            .args([
168                "-c",
169                "copy",
170                "-avoid_negative_ts",
171                "make_zero",
172                "-movflags",
173                "+faststart",
174            ])
175            .arg(&out_path)
176            .stdin(Stdio::null())
177            .stdout(Stdio::null())
178            .stderr(Stdio::piped());
179
180        // Remux of even an hour of footage is fast; bound it so a hung/cancelled job can't wedge the
181        // request or orphan ffmpeg (kill_on_drop kills the child when the timed-out future is dropped).
182        let result = tokio::time::timeout(Duration::from_secs(180), cmd.output()).await;
183        // Always remove the temp concat list, on every outcome.
184        let _ = tokio::fs::remove_file(&list_path).await;
185
186        let out = match result {
187            Err(_) => {
188                let _ = tokio::fs::remove_file(&out_path).await;
189                return Err(AppError::Other(anyhow::anyhow!("clip export timed out")));
190            }
191            Ok(Err(e)) => {
192                let _ = tokio::fs::remove_file(&out_path).await;
193                return Err(AppError::Other(e.into()));
194            }
195            Ok(Ok(out)) => out,
196        };
197
198        if !out.status.success() {
199            let _ = tokio::fs::remove_file(&out_path).await;
200            return Err(AppError::Other(anyhow::anyhow!(
201                "ffmpeg clip export failed: {}",
202                String::from_utf8_lossy(&out.stderr).trim()
203            )));
204        }
205
206        Ok(tokio::fs::metadata(&out_path)
207            .await
208            .map(|m| m.len())
209            .unwrap_or(0))
210    }
211    .await;
212
213    // `_read_lock` releases on drop (here or on any early return above). Surface any export error.
214    let size_bytes = size_outcome?;
215
216    // Report coverage honestly: the concat bridges any recording gaps in the window (that footage
217    // does not exist), so disclose them rather than presenting bridged video as continuous.
218    let (covered_seconds, gaps) = coverage_and_gaps(&segments, from, to);
219
220    Ok(ClipResult {
221        id,
222        camera_id: camera_id.to_string(),
223        url: format!("/media/clips/{filename}"),
224        filename,
225        from,
226        to,
227        requested_seconds: requested,
228        covered_seconds,
229        gaps,
230        size_bytes,
231        segment_count: segments.len(),
232    })
233}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed reference instant the coverage tests measure their offsets from.
    fn t0() -> DateTime<Utc> {
        "2026-06-18T00:00:00Z".parse::<DateTime<Utc>>().unwrap()
    }

    /// The reference instant shifted by `secs` seconds.
    fn at(secs: i64) -> DateTime<Utc> {
        t0() + chrono::Duration::seconds(secs)
    }

    /// A placeholder segment covering `[start, end]`; only the time fields matter to these tests.
    fn segment_spanning(start: DateTime<Utc>, end: DateTime<Utc>) -> Segment {
        let duration_s = (end - start).num_milliseconds() as f64 / 1000.0;
        Segment {
            id: "s".into(),
            camera_id: "c".into(),
            path: "/x.mp4".into(),
            start_time: start,
            end_time: end,
            duration_s,
            codec: None,
            width: None,
            height: None,
            size_bytes: 0,
            container: "mp4".into(),
            locked: false,
            evidence_locked: false,
            incident_id: None,
            created_at: start,
        }
    }

    /// Returns the message of a `BadRequest` outcome and panics on any other outcome.
    #[track_caller]
    fn expect_bad_request(outcome: AppResult<ClipResult>) -> String {
        match outcome {
            Err(AppError::BadRequest(msg)) => msg,
            other => panic!("expected BadRequest, got {other:?}"),
        }
    }

    /// Returns the message of a `NotFound` outcome and panics on any other outcome.
    #[track_caller]
    fn expect_not_found(outcome: AppResult<ClipResult>) -> String {
        match outcome {
            Err(AppError::NotFound(msg)) => msg,
            other => panic!("expected NotFound, got {other:?}"),
        }
    }

    #[test]
    fn coverage_and_gaps_detects_interior_and_trailing_gaps() {
        // Window [0,100]; footage [0,30] and [60,90]. Gaps: interior [30,60] + trailing [90,100].
        let footage = [
            segment_spanning(at(0), at(30)),
            segment_spanning(at(60), at(90)),
        ];
        let (covered, gaps) = coverage_and_gaps(&footage, at(0), at(100));
        assert_eq!(gaps.len(), 2, "{gaps:?}");
        let (interior, trailing) = (&gaps[0], &gaps[1]);
        assert_eq!((interior.from, interior.to), (at(30), at(60)));
        assert_eq!((trailing.from, trailing.to), (at(90), at(100)));
        assert!((covered - 60.0).abs() < 0.01, "covered={covered}");
    }

    #[test]
    fn coverage_and_gaps_tolerates_subsecond_seam() {
        // A 0.5s seam (< tolerance) between abutting segments is NOT reported as a gap.
        let resume = t0() + chrono::Duration::milliseconds(50_500);
        let footage = [
            segment_spanning(at(0), at(50)),
            segment_spanning(resume, at(100)),
        ];
        let (covered, gaps) = coverage_and_gaps(&footage, at(0), at(100));
        assert!(
            gaps.is_empty(),
            "sub-second seam must not be a gap: {gaps:?}"
        );
        assert!(covered > 99.0, "covered={covered}");
    }

    /// Builds an `AppState` over a migrated in-memory SQLite database and env-derived config.
    async fn test_state() -> AppState {
        use std::sync::Arc;

        let pool = sqlx::sqlite::SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        crate::db::run_migrations(&pool).await.unwrap();
        let cfg = Arc::new(crate::config::Config::from_env());

        // Build everything that borrows or clones `pool`/`cfg` before they move into the state.
        let recorder = crate::services::recorder::RecorderManager::new(pool.clone(), cfg.clone());
        let sampler = crate::services::sampler::SamplerManager::new(pool.clone(), cfg.clone());
        let catalog = Arc::new(crate::services::registry::CatalogService::new(&cfg));

        AppState {
            recorder,
            sampler,
            mirror: None,
            consumers: Arc::new(Vec::new()),
            modules: Arc::new(Vec::new()),
            catalog,
            http: reqwest::Client::new(),
            started_at: Utc::now(),
            pool,
            cfg,
        }
    }

    /// Inserts a camera row named `Camera <id>` with both timestamps set to the current time.
    async fn insert_camera(pool: &sqlx::SqlitePool, id: &str) {
        let stamp = Utc::now();
        let name = format!("Camera {id}");
        sqlx::query("INSERT INTO cameras (id, name, created_at, updated_at) VALUES (?, ?, ?, ?)")
            .bind(id)
            .bind(name)
            .bind(stamp)
            .bind(stamp)
            .execute(pool)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn rejects_to_before_from() {
        let state = test_state().await;
        let from = Utc::now();
        let to = from - chrono::Duration::seconds(5);
        let msg = expect_bad_request(export_clip(&state, "anycam", from, to).await);
        assert_eq!(msg, "`to` must be after `from`");
    }

    #[tokio::test]
    async fn rejects_to_equal_from() {
        // `to <= from` covers the equality boundary.
        let state = test_state().await;
        let instant = Utc::now();
        let msg = expect_bad_request(export_clip(&state, "anycam", instant, instant).await);
        assert_eq!(msg, "`to` must be after `from`");
    }

    #[tokio::test]
    async fn rejects_clip_exceeding_max_length() {
        // 3601s exceeds the 3600s cap; rejected before any DB lookup, so no camera is needed.
        let state = test_state().await;
        let from = Utc::now();
        let to = from + chrono::Duration::seconds(3601);
        let msg = expect_bad_request(export_clip(&state, "anycam", from, to).await);
        for needle in ["clip too long", "3601s", "3600s"] {
            assert!(msg.contains(needle), "msg was: {msg}");
        }
    }

    #[tokio::test]
    async fn max_length_boundary_passes_length_check() {
        // Exactly MAX_CLIP_SECONDS (3600s) is allowed (the guard uses a strict `>`), so validation
        // falls through to the camera lookup instead of returning the length error.
        let state = test_state().await;
        let from = Utc::now();
        let to = from + chrono::Duration::seconds(3600);
        let outcome = export_clip(&state, "cam_boundary", from, to).await;
        match outcome {
            Err(AppError::NotFound(msg)) => assert_eq!(msg, "camera cam_boundary not found"),
            other => {
                panic!(
                    "expected NotFound (length check should pass at the boundary), got {other:?}"
                )
            }
        }
    }

    #[tokio::test]
    async fn unknown_camera_is_not_found() {
        let state = test_state().await;
        let from = Utc::now();
        let to = from + chrono::Duration::seconds(60);
        let msg = expect_not_found(export_clip(&state, "ghost", from, to).await);
        assert_eq!(msg, "camera ghost not found");
    }

    #[tokio::test]
    async fn existing_camera_without_segments_is_not_found() {
        let state = test_state().await;
        insert_camera(&state.pool, "cam_empty").await;
        let from = Utc::now();
        let to = from + chrono::Duration::seconds(60);
        let msg = expect_not_found(export_clip(&state, "cam_empty", from, to).await);
        assert_eq!(msg, "no recorded footage in the requested range");
    }
}
402}