Skip to main content

heldar_kernel/services/
playback_session.rs

1//! Segment-spanning HLS playback sessions: interactive review of recorded footage over a time range,
2//! with native seek. A session concatenates the segments overlapping the requested window, trims to
3//! the exact offsets, and remuxes (`-c copy`, no re-encode) into a self-contained HLS VOD playlist
4//! under `playback_dir/{session_id}/`. HLS players seek freely within a VOD playlist.
5//!
6//! The source segments are read-locked (`repo::set_segments_locked`) for the session lifetime so the
7//! retention sweeper cannot prune footage that is being reviewed; the lock is released when the
8//! session is deleted or expires. Sessions are tracked entirely on the filesystem (a `session.json`
9//! per dir) so the background cleanup sweeper needs no shared in-memory state — and a crash leaves no
10//! dangling locks (startup [`crate::db::clear_segment_read_locks`] clears every transient read-lock).
11
12use std::path::Path as StdPath;
13use std::process::Stdio;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use tokio::process::Command;
19use uuid::Uuid;
20
21use crate::error::{AppError, AppResult};
22use crate::models::Segment;
23use crate::state::AppState;
24
/// Upper bound on one remux run (five minutes). The build is awaited under this timeout, and the
/// ffmpeg child is spawned with `kill_on_drop`, so a stuck job neither blocks the request forever
/// nor leaves an ffmpeg process behind once the timed-out future is dropped.
const BUILD_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Period, in seconds, between two passes of the expired-session cleanup sweeper.
const CLEANUP_INTERVAL_S: u64 = 60;

/// Name of the metadata sidecar file stored inside every session directory.
const META_FILE: &str = "session.json";
35
36/// A live playback session over a recorded time range. Returned to clients (Serialize); the durable
37/// metadata sidecar carries the extra bookkeeping fields the cleanup sweeper needs.
38#[derive(Debug, Serialize)]
39pub struct PlaybackSession {
40    pub id: String,
41    pub camera_id: String,
42    /// HLS VOD playlist served under `/media/playback/{session_id}/index.m3u8` (play with hls.js).
43    pub playlist_url: String,
44    pub from: DateTime<Utc>,
45    pub to: DateTime<Utc>,
46    /// Length of the requested window in seconds (the playlist may be shorter where footage has gaps).
47    pub duration_s: f64,
48    pub segment_count: usize,
49}
50
51/// On-disk session record (`playback_dir/{session_id}/session.json`). Carries the source segment ids
52/// so delete/cleanup can release exactly this session's read-locks, plus `created_at` for TTL expiry.
53#[derive(Debug, Serialize, Deserialize)]
54struct SessionMeta {
55    id: String,
56    camera_id: String,
57    from: DateTime<Utc>,
58    to: DateTime<Utc>,
59    duration_s: f64,
60    segment_count: usize,
61    /// Source segment ids read-locked for this session's lifetime.
62    segment_ids: Vec<String>,
63    created_at: DateTime<Utc>,
64}
65
/// Strictly validate a client-supplied session id before it is joined onto `playback_dir`.
///
/// Ids are generated by the server as `pbs_<hex>`. Because the id ends up in a path that is
/// handed to `remove_dir_all`, anything that could escape the playback tree must be refused: the
/// id has to start with `pbs_`, be at most 64 bytes long, and consist solely of ASCII letters,
/// ASCII digits and underscores (so no `/`, `.` or other separator can appear).
fn is_valid_session_id(id: &str) -> bool {
    const PREFIX: &str = "pbs_";
    const MAX_LEN: usize = 64;

    if id.len() > MAX_LEN || !id.starts_with(PREFIX) {
        return false;
    }
    // Every permitted character is single-byte ASCII, and every byte of a multi-byte UTF-8
    // character is >= 0x80, so checking bytes accepts exactly the same ids as checking chars.
    id.bytes().all(|b| b == b'_' || b.is_ascii_alphanumeric())
}
74
75/// Create a playback session for `camera_id` over `[from, to)`: read-lock the overlapping segments and
76/// generate a trimmed HLS VOD playlist from them. Rejects (400) a range longer than
77/// `HELDAR_MAX_PLAYBACK_SECONDS`, an empty/reversed range, or a range with no recorded footage (404).
78pub async fn create_session(
79    state: &AppState,
80    camera_id: &str,
81    from: DateTime<Utc>,
82    to: DateTime<Utc>,
83) -> AppResult<PlaybackSession> {
84    if to <= from {
85        return Err(AppError::BadRequest("`to` must be after `from`".into()));
86    }
87    let requested = (to - from).num_milliseconds() as f64 / 1000.0;
88    let max = state.cfg.max_playback_seconds;
89    if requested > max {
90        return Err(AppError::BadRequest(format!(
91            "playback range too long ({requested:.0}s); max {max:.0}s"
92        )));
93    }
94
95    // Camera must exist; its segment length sets the target HLS segment duration.
96    let cam: Option<(i64,)> = sqlx::query_as("SELECT segment_seconds FROM cameras WHERE id = ?")
97        .bind(camera_id)
98        .fetch_optional(&state.pool)
99        .await?;
100    let Some((segment_seconds,)) = cam else {
101        return Err(AppError::NotFound(format!("camera {camera_id} not found")));
102    };
103
104    // Same overlap query the clip exporter uses: any segment intersecting the window.
105    let segments: Vec<Segment> = sqlx::query_as::<_, Segment>(
106        "SELECT * FROM segments
107         WHERE camera_id = ? AND start_time < ? AND end_time > ?
108         ORDER BY start_time ASC",
109    )
110    .bind(camera_id)
111    .bind(to)
112    .bind(from)
113    .fetch_all(&state.pool)
114    .await?;
115    if segments.is_empty() {
116        return Err(AppError::NotFound(
117            "no recorded footage in the requested range".into(),
118        ));
119    }
120
121    let session_id = format!("pbs_{}", Uuid::new_v4().simple());
122    let session_dir = state.cfg.playback_dir.join(&session_id);
123    tokio::fs::create_dir_all(&session_dir)
124        .await
125        .map_err(|e| AppError::Other(e.into()))?;
126
127    // Read-lock the source segments so the retention sweeper can't delete them out from under the
128    // session (the HLS dir is a self-contained copy, but the product holds footage under review).
129    let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
130    crate::repo::set_segments_locked(&state.pool, &seg_ids, true).await;
131
132    let hls_time = segment_seconds.max(2);
133    let build = generate_hls(state, &session_dir, &segments, from, requested, hls_time).await;
134    if let Err(e) = build {
135        // Generation failed: release the locks and remove the half-built dir, then surface the error.
136        crate::repo::set_segments_locked(&state.pool, &seg_ids, false).await;
137        let _ = tokio::fs::remove_dir_all(&session_dir).await;
138        return Err(e);
139    }
140
141    let meta = SessionMeta {
142        id: session_id.clone(),
143        camera_id: camera_id.to_string(),
144        from,
145        to,
146        duration_s: requested,
147        segment_count: segments.len(),
148        segment_ids: seg_ids.clone(),
149        created_at: Utc::now(),
150    };
151    let meta_json = serde_json::to_vec(&meta).map_err(|e| AppError::Other(e.into()))?;
152    if let Err(e) = tokio::fs::write(session_dir.join(META_FILE), meta_json).await {
153        // Without the sidecar the cleanup sweeper can't release the locks; fail clean instead.
154        crate::repo::set_segments_locked(&state.pool, &seg_ids, false).await;
155        let _ = tokio::fs::remove_dir_all(&session_dir).await;
156        return Err(AppError::Other(e.into()));
157    }
158
159    tracing::info!(
160        session = %session_id,
161        camera = %camera_id,
162        segments = segments.len(),
163        duration_s = requested,
164        "playback: created session"
165    );
166    Ok(PlaybackSession {
167        playlist_url: format!("/media/playback/{session_id}/index.m3u8"),
168        id: session_id,
169        camera_id: camera_id.to_string(),
170        from,
171        to,
172        duration_s: requested,
173        segment_count: segments.len(),
174    })
175}
176
177/// Concatenate `segments`, trim to the exact `[from, from+requested)` window, and remux (`-c copy`)
178/// into an HLS VOD playlist (`index.m3u8` + `seg_*.ts`) inside `session_dir`. The temp concat list
179/// (which holds absolute recording paths) is removed on every outcome so it is never served.
180async fn generate_hls(
181    state: &AppState,
182    session_dir: &StdPath,
183    segments: &[Segment],
184    from: DateTime<Utc>,
185    requested: f64,
186    hls_time: i64,
187) -> AppResult<()> {
188    let list_path = session_dir.join("concat.txt");
189    let mut list = String::new();
190    for s in segments {
191        let escaped = s.path.replace('\'', "'\\''");
192        list.push_str(&format!("file '{escaped}'\n"));
193    }
194    tokio::fs::write(&list_path, list)
195        .await
196        .map_err(|e| AppError::Other(e.into()))?;
197
198    let first_start = segments[0].start_time;
199    let ss = ((from - first_start).num_milliseconds() as f64 / 1000.0).max(0.0);
200    let playlist_path = session_dir.join("index.m3u8");
201    let seg_pattern = session_dir.join("seg_%05d.ts");
202
203    let mut cmd = Command::new(&state.cfg.ffmpeg_bin);
204    cmd.kill_on_drop(true)
205        .args([
206            "-hide_banner",
207            "-loglevel",
208            "error",
209            "-f",
210            "concat",
211            "-safe",
212            "0",
213        ])
214        .arg("-i")
215        .arg(&list_path)
216        // Trim the concatenated input to the exact window (keyframe-aligned, like the clip exporter).
217        .args(["-ss", &format!("{ss:.3}")])
218        .args(["-t", &format!("{requested:.3}")])
219        .args(["-c", "copy", "-avoid_negative_ts", "make_zero"])
220        // HLS VOD: a complete, seekable playlist (vod forces an unbounded list_size = all segments).
221        .args(["-f", "hls"])
222        .args(["-hls_time", &hls_time.to_string()])
223        .args(["-hls_playlist_type", "vod"])
224        .arg("-hls_segment_filename")
225        .arg(&seg_pattern)
226        .arg(&playlist_path)
227        .stdin(Stdio::null())
228        .stdout(Stdio::null())
229        .stderr(Stdio::piped());
230
231    let result = tokio::time::timeout(BUILD_TIMEOUT, cmd.output()).await;
232    // Always drop the temp concat list (it leaks absolute recording paths), on every outcome.
233    let _ = tokio::fs::remove_file(&list_path).await;
234
235    let out = match result {
236        Err(_) => return Err(AppError::Other(anyhow::anyhow!("playback build timed out"))),
237        Ok(Err(e)) => return Err(AppError::Other(e.into())),
238        Ok(Ok(out)) => out,
239    };
240    if !out.status.success() {
241        return Err(AppError::Other(anyhow::anyhow!(
242            "ffmpeg playback build failed: {}",
243            String::from_utf8_lossy(&out.stderr).trim()
244        )));
245    }
246    Ok(())
247}
248
249/// Delete a playback session: release its segment read-locks and remove its HLS dir. Idempotent in
250/// the locks (best-effort); 404 if the session does not exist.
251pub async fn delete_session(state: &AppState, session_id: &str) -> AppResult<()> {
252    if !is_valid_session_id(session_id) {
253        return Err(AppError::BadRequest("invalid session id".into()));
254    }
255    let session_dir = state.cfg.playback_dir.join(session_id);
256    if !tokio::fs::try_exists(&session_dir).await.unwrap_or(false) {
257        return Err(AppError::NotFound(format!(
258            "playback session {session_id} not found"
259        )));
260    }
261    if let Some(meta) = read_meta(&session_dir).await {
262        crate::repo::set_segments_locked(&state.pool, &meta.segment_ids, false).await;
263    }
264    tokio::fs::remove_dir_all(&session_dir)
265        .await
266        .map_err(|e| AppError::Other(e.into()))?;
267    tracing::info!(session = %session_id, "playback: deleted session");
268    Ok(())
269}
270
271/// Read+parse a session's metadata sidecar, if present and well-formed.
272async fn read_meta(session_dir: &StdPath) -> Option<SessionMeta> {
273    let bytes = tokio::fs::read(session_dir.join(META_FILE)).await.ok()?;
274    serde_json::from_slice(&bytes).ok()
275}
276
277/// Background sweeper: remove session dirs older than the TTL and release their read-locks. Spawned
278/// (supervised) from `main`.
279pub async fn run(state: AppState) {
280    let mut tick = tokio::time::interval(Duration::from_secs(CLEANUP_INTERVAL_S));
281    loop {
282        tick.tick().await;
283        if let Err(e) = sweep(&state).await {
284            tracing::error!(error = %e, "playback_session_cleanup: tick failed");
285        }
286    }
287}
288
289/// Remove every expired session once. Expiry is `created_at + TTL <= now` from the metadata sidecar;
290/// a dir with no/corrupt metadata falls back to its directory mtime (and releases no locks — startup
291/// already clears stale read-locks).
292async fn sweep(state: &AppState) -> anyhow::Result<()> {
293    let ttl = chrono::Duration::minutes(state.cfg.playback_session_ttl_minutes.max(1));
294    let now = Utc::now();
295    let mut entries = match tokio::fs::read_dir(&state.cfg.playback_dir).await {
296        Ok(e) => e,
297        // The playback dir may not exist yet (no session ever created); nothing to do.
298        Err(_) => return Ok(()),
299    };
300    while let Some(entry) = entries.next_entry().await? {
301        let path = entry.path();
302        if !entry.file_type().await.map(|t| t.is_dir()).unwrap_or(false) {
303            continue;
304        }
305        let meta = read_meta(&path).await;
306        let expired = match &meta {
307            Some(m) => m.created_at + ttl <= now,
308            None => dir_mtime_before(&path, now - ttl).await,
309        };
310        if !expired {
311            continue;
312        }
313        if let Some(m) = &meta {
314            crate::repo::set_segments_locked(&state.pool, &m.segment_ids, false).await;
315        }
316        match tokio::fs::remove_dir_all(&path).await {
317            Ok(()) => {
318                tracing::debug!(dir = %path.display(), "playback_session_cleanup: removed expired session")
319            }
320            Err(e) => {
321                tracing::warn!(error = %e, dir = %path.display(), "playback_session_cleanup: failed to remove session dir")
322            }
323        }
324    }
325    Ok(())
326}
327
328/// Whether a directory's last-modified time is before `cutoff` (fallback expiry when the metadata
329/// sidecar is missing/unreadable). Conservative: returns false when the time can't be read.
330async fn dir_mtime_before(path: &StdPath, cutoff: DateTime<Utc>) -> bool {
331    match tokio::fs::metadata(path).await.and_then(|m| m.modified()) {
332        Ok(modified) => DateTime::<Utc>::from(modified) < cutoff,
333        Err(_) => false,
334    }
335}
336
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed id is accepted; traversal attempts, wrong prefixes, dots and over-long ids
    /// are all refused.
    #[test]
    fn session_id_validation_rejects_traversal() {
        assert!(is_valid_session_id("pbs_0123abcdef"));

        let too_long = format!("pbs_{}", "a".repeat(80));
        let rejected = [
            "pbs_../../etc",
            "pbs_a/b",
            "../pbs_x",
            "clip_123", // wrong prefix
            "pbs_with.dot",
            too_long.as_str(), // too long
        ];
        for id in rejected {
            assert!(!is_valid_session_id(id));
        }
    }
}
351}