Skip to main content

heldar_kernel/services/
snapshot.rs

1//! Snapshot extraction: a single JPEG frame either from recorded footage at a timestamp,
2//! or live from the camera stream right now.
3
4use std::process::Stdio;
5use std::time::Duration;
6
7use chrono::{DateTime, Utc};
8use tokio::process::Command;
9
10use crate::camera_url;
11use crate::error::{AppError, AppResult};
12use crate::models::{Camera, Segment};
13use crate::state::AppState;
14
15/// Extract one frame from recorded footage at `at`.
16pub async fn snapshot_at(
17    state: &AppState,
18    camera_id: &str,
19    at: DateTime<Utc>,
20) -> AppResult<Vec<u8>> {
21    let seg: Option<Segment> = sqlx::query_as::<_, Segment>(
22        "SELECT * FROM segments
23         WHERE camera_id = ? AND start_time <= ? AND end_time >= ?
24         ORDER BY start_time DESC LIMIT 1",
25    )
26    .bind(camera_id)
27    .bind(at)
28    .bind(at)
29    .fetch_optional(&state.pool)
30    .await?;
31    let seg = seg.ok_or_else(|| AppError::NotFound("no footage at that timestamp".into()))?;
32
33    // Read-lock the source segment so retention can't delete it out from under ffmpeg (TOCTOU).
34    let seg_ids = vec![seg.id.clone()];
35    crate::repo::set_segments_locked(&state.pool, &seg_ids, true).await;
36
37    let outcome: AppResult<Vec<u8>> = async {
38        let offset = ((at - seg.start_time).num_milliseconds() as f64 / 1000.0).max(0.0);
39        let mut cmd = Command::new(&state.cfg.ffmpeg_bin);
40        cmd.kill_on_drop(true)
41            .args(["-hide_banner", "-loglevel", "error"])
42            .args(["-ss", &format!("{offset:.3}")])
43            .arg("-i")
44            .arg(&seg.path)
45            .args([
46                "-frames:v",
47                "1",
48                "-q:v",
49                "3",
50                "-f",
51                "image2",
52                "-c:v",
53                "mjpeg",
54                "pipe:1",
55            ])
56            .stdin(Stdio::null())
57            .stdout(Stdio::piped())
58            .stderr(Stdio::piped());
59
60        let out = tokio::time::timeout(Duration::from_secs(20), cmd.output())
61            .await
62            .map_err(|_| AppError::Other(anyhow::anyhow!("snapshot timed out")))?
63            .map_err(|e| AppError::Other(e.into()))?;
64
65        if !out.status.success() || out.stdout.is_empty() {
66            return Err(AppError::Other(anyhow::anyhow!(
67                "snapshot failed: {}",
68                String::from_utf8_lossy(&out.stderr).trim()
69            )));
70        }
71        Ok(out.stdout)
72    }
73    .await;
74
75    crate::repo::set_segments_locked(&state.pool, &seg_ids, false).await;
76    outcome
77}
78
79/// Grab one frame live from the camera stream (sub-stream preferred).
80pub async fn snapshot_live(state: &AppState, camera_id: &str) -> AppResult<Vec<u8>> {
81    snapshot_live_raw(state, camera_id).await
82}
83
84/// Reusable inner that captures one live frame and returns the raw JPEG bytes. Backs the public
85/// [`snapshot_live`] handler path and is reused by the snapshot scheduler. Identical behaviour;
86/// kept as a distinct entry point so background callers don't depend on the route-facing wrapper.
87pub async fn snapshot_live_raw(state: &AppState, camera_id: &str) -> AppResult<Vec<u8>> {
88    let cam: Option<Camera> = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
89        .bind(camera_id)
90        .fetch_optional(&state.pool)
91        .await?;
92    let cam = cam.ok_or_else(|| AppError::NotFound(format!("camera {camera_id} not found")))?;
93    let url = camera_url::stream_url(&cam, "sub")
94        .or_else(|| camera_url::record_url(&cam))
95        .ok_or_else(|| AppError::BadRequest("camera has no stream URL".into()))?;
96
97    let mut cmd = Command::new(&state.cfg.ffmpeg_bin);
98    cmd.kill_on_drop(true)
99        .args([
100            "-hide_banner",
101            "-loglevel",
102            "error",
103            "-rtsp_transport",
104            "tcp",
105            "-timeout",
106            "10000000",
107        ])
108        .arg("-i")
109        .arg(&url)
110        .args([
111            "-frames:v",
112            "1",
113            "-q:v",
114            "3",
115            "-f",
116            "image2",
117            "-c:v",
118            "mjpeg",
119            "pipe:1",
120        ])
121        .stdin(Stdio::null())
122        .stdout(Stdio::piped())
123        .stderr(Stdio::piped());
124
125    let out = tokio::time::timeout(Duration::from_secs(20), cmd.output())
126        .await
127        .map_err(|_| AppError::Other(anyhow::anyhow!("live snapshot timed out")))?
128        .map_err(|e| AppError::Other(e.into()))?;
129
130    if !out.status.success() || out.stdout.is_empty() {
131        // Mask credentials that ffmpeg echoes back in the RTSP URL on failure.
132        let stderr = String::from_utf8_lossy(&out.stderr);
133        return Err(AppError::Other(anyhow::anyhow!(
134            "live snapshot failed: {}",
135            camera_url::mask_url(stderr.trim())
136        )));
137    }
138    Ok(out.stdout)
139}