1use std::path::Path as StdPath;
13use std::process::Stdio;
14use std::time::Duration;
15
16use chrono::{DateTime, Utc};
17use serde::{Deserialize, Serialize};
18use tokio::process::Command;
19use uuid::Uuid;
20
21use crate::error::{AppError, AppResult};
22use crate::models::Segment;
23use crate::state::AppState;
24
25const BUILD_TIMEOUT: Duration = Duration::from_secs(300);
29
30const CLEANUP_INTERVAL_S: u64 = 60;
32
33const META_FILE: &str = "session.json";
35
36#[derive(Debug, Serialize)]
39pub struct PlaybackSession {
40 pub id: String,
41 pub camera_id: String,
42 pub playlist_url: String,
44 pub from: DateTime<Utc>,
45 pub to: DateTime<Utc>,
46 pub duration_s: f64,
48 pub segment_count: usize,
49}
50
51#[derive(Debug, Serialize, Deserialize)]
54struct SessionMeta {
55 id: String,
56 camera_id: String,
57 from: DateTime<Utc>,
58 to: DateTime<Utc>,
59 duration_s: f64,
60 segment_count: usize,
61 segment_ids: Vec<String>,
63 created_at: DateTime<Utc>,
64}
65
66fn is_valid_session_id(id: &str) -> bool {
70 id.starts_with("pbs_")
71 && id.len() <= 64
72 && id.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
73}
74
75pub async fn create_session(
79 state: &AppState,
80 camera_id: &str,
81 from: DateTime<Utc>,
82 to: DateTime<Utc>,
83) -> AppResult<PlaybackSession> {
84 if to <= from {
85 return Err(AppError::BadRequest("`to` must be after `from`".into()));
86 }
87 let requested = (to - from).num_milliseconds() as f64 / 1000.0;
88 let max = state.cfg.max_playback_seconds;
89 if requested > max {
90 return Err(AppError::BadRequest(format!(
91 "playback range too long ({requested:.0}s); max {max:.0}s"
92 )));
93 }
94
95 let cam: Option<(i64,)> = sqlx::query_as("SELECT segment_seconds FROM cameras WHERE id = ?")
97 .bind(camera_id)
98 .fetch_optional(&state.pool)
99 .await?;
100 let Some((segment_seconds,)) = cam else {
101 return Err(AppError::NotFound(format!("camera {camera_id} not found")));
102 };
103
104 let segments: Vec<Segment> = sqlx::query_as::<_, Segment>(
106 "SELECT * FROM segments
107 WHERE camera_id = ? AND start_time < ? AND end_time > ?
108 ORDER BY start_time ASC",
109 )
110 .bind(camera_id)
111 .bind(to)
112 .bind(from)
113 .fetch_all(&state.pool)
114 .await?;
115 if segments.is_empty() {
116 return Err(AppError::NotFound(
117 "no recorded footage in the requested range".into(),
118 ));
119 }
120
121 let session_id = format!("pbs_{}", Uuid::new_v4().simple());
122 let session_dir = state.cfg.playback_dir.join(&session_id);
123 tokio::fs::create_dir_all(&session_dir)
124 .await
125 .map_err(|e| AppError::Other(e.into()))?;
126
127 let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
130 crate::repo::set_segments_locked(&state.pool, &seg_ids, true).await;
131
132 let hls_time = segment_seconds.max(2);
133 let build = generate_hls(state, &session_dir, &segments, from, requested, hls_time).await;
134 if let Err(e) = build {
135 crate::repo::set_segments_locked(&state.pool, &seg_ids, false).await;
137 let _ = tokio::fs::remove_dir_all(&session_dir).await;
138 return Err(e);
139 }
140
141 let meta = SessionMeta {
142 id: session_id.clone(),
143 camera_id: camera_id.to_string(),
144 from,
145 to,
146 duration_s: requested,
147 segment_count: segments.len(),
148 segment_ids: seg_ids.clone(),
149 created_at: Utc::now(),
150 };
151 let meta_json = serde_json::to_vec(&meta).map_err(|e| AppError::Other(e.into()))?;
152 if let Err(e) = tokio::fs::write(session_dir.join(META_FILE), meta_json).await {
153 crate::repo::set_segments_locked(&state.pool, &seg_ids, false).await;
155 let _ = tokio::fs::remove_dir_all(&session_dir).await;
156 return Err(AppError::Other(e.into()));
157 }
158
159 tracing::info!(
160 session = %session_id,
161 camera = %camera_id,
162 segments = segments.len(),
163 duration_s = requested,
164 "playback: created session"
165 );
166 Ok(PlaybackSession {
167 playlist_url: format!("/media/playback/{session_id}/index.m3u8"),
168 id: session_id,
169 camera_id: camera_id.to_string(),
170 from,
171 to,
172 duration_s: requested,
173 segment_count: segments.len(),
174 })
175}
176
177async fn generate_hls(
181 state: &AppState,
182 session_dir: &StdPath,
183 segments: &[Segment],
184 from: DateTime<Utc>,
185 requested: f64,
186 hls_time: i64,
187) -> AppResult<()> {
188 let list_path = session_dir.join("concat.txt");
189 let mut list = String::new();
190 for s in segments {
191 let escaped = s.path.replace('\'', "'\\''");
192 list.push_str(&format!("file '{escaped}'\n"));
193 }
194 tokio::fs::write(&list_path, list)
195 .await
196 .map_err(|e| AppError::Other(e.into()))?;
197
198 let first_start = segments[0].start_time;
199 let ss = ((from - first_start).num_milliseconds() as f64 / 1000.0).max(0.0);
200 let playlist_path = session_dir.join("index.m3u8");
201 let seg_pattern = session_dir.join("seg_%05d.ts");
202
203 let mut cmd = Command::new(&state.cfg.ffmpeg_bin);
204 cmd.kill_on_drop(true)
205 .args([
206 "-hide_banner",
207 "-loglevel",
208 "error",
209 "-f",
210 "concat",
211 "-safe",
212 "0",
213 ])
214 .arg("-i")
215 .arg(&list_path)
216 .args(["-ss", &format!("{ss:.3}")])
218 .args(["-t", &format!("{requested:.3}")])
219 .args(["-c", "copy", "-avoid_negative_ts", "make_zero"])
220 .args(["-f", "hls"])
222 .args(["-hls_time", &hls_time.to_string()])
223 .args(["-hls_playlist_type", "vod"])
224 .arg("-hls_segment_filename")
225 .arg(&seg_pattern)
226 .arg(&playlist_path)
227 .stdin(Stdio::null())
228 .stdout(Stdio::null())
229 .stderr(Stdio::piped());
230
231 let result = tokio::time::timeout(BUILD_TIMEOUT, cmd.output()).await;
232 let _ = tokio::fs::remove_file(&list_path).await;
234
235 let out = match result {
236 Err(_) => return Err(AppError::Other(anyhow::anyhow!("playback build timed out"))),
237 Ok(Err(e)) => return Err(AppError::Other(e.into())),
238 Ok(Ok(out)) => out,
239 };
240 if !out.status.success() {
241 return Err(AppError::Other(anyhow::anyhow!(
242 "ffmpeg playback build failed: {}",
243 String::from_utf8_lossy(&out.stderr).trim()
244 )));
245 }
246 Ok(())
247}
248
249pub async fn delete_session(state: &AppState, session_id: &str) -> AppResult<()> {
252 if !is_valid_session_id(session_id) {
253 return Err(AppError::BadRequest("invalid session id".into()));
254 }
255 let session_dir = state.cfg.playback_dir.join(session_id);
256 if !tokio::fs::try_exists(&session_dir).await.unwrap_or(false) {
257 return Err(AppError::NotFound(format!(
258 "playback session {session_id} not found"
259 )));
260 }
261 if let Some(meta) = read_meta(&session_dir).await {
262 crate::repo::set_segments_locked(&state.pool, &meta.segment_ids, false).await;
263 }
264 tokio::fs::remove_dir_all(&session_dir)
265 .await
266 .map_err(|e| AppError::Other(e.into()))?;
267 tracing::info!(session = %session_id, "playback: deleted session");
268 Ok(())
269}
270
271async fn read_meta(session_dir: &StdPath) -> Option<SessionMeta> {
273 let bytes = tokio::fs::read(session_dir.join(META_FILE)).await.ok()?;
274 serde_json::from_slice(&bytes).ok()
275}
276
277pub async fn run(state: AppState) {
280 let mut tick = tokio::time::interval(Duration::from_secs(CLEANUP_INTERVAL_S));
281 loop {
282 tick.tick().await;
283 if let Err(e) = sweep(&state).await {
284 tracing::error!(error = %e, "playback_session_cleanup: tick failed");
285 }
286 }
287}
288
289async fn sweep(state: &AppState) -> anyhow::Result<()> {
293 let ttl = chrono::Duration::minutes(state.cfg.playback_session_ttl_minutes.max(1));
294 let now = Utc::now();
295 let mut entries = match tokio::fs::read_dir(&state.cfg.playback_dir).await {
296 Ok(e) => e,
297 Err(_) => return Ok(()),
299 };
300 while let Some(entry) = entries.next_entry().await? {
301 let path = entry.path();
302 if !entry.file_type().await.map(|t| t.is_dir()).unwrap_or(false) {
303 continue;
304 }
305 let meta = read_meta(&path).await;
306 let expired = match &meta {
307 Some(m) => m.created_at + ttl <= now,
308 None => dir_mtime_before(&path, now - ttl).await,
309 };
310 if !expired {
311 continue;
312 }
313 if let Some(m) = &meta {
314 crate::repo::set_segments_locked(&state.pool, &m.segment_ids, false).await;
315 }
316 match tokio::fs::remove_dir_all(&path).await {
317 Ok(()) => {
318 tracing::debug!(dir = %path.display(), "playback_session_cleanup: removed expired session")
319 }
320 Err(e) => {
321 tracing::warn!(error = %e, dir = %path.display(), "playback_session_cleanup: failed to remove session dir")
322 }
323 }
324 }
325 Ok(())
326}
327
328async fn dir_mtime_before(path: &StdPath, cutoff: DateTime<Utc>) -> bool {
331 match tokio::fs::metadata(path).await.and_then(|m| m.modified()) {
332 Ok(modified) => DateTime::<Utc>::from(modified) < cutoff,
333 Err(_) => false,
334 }
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340
341 #[test]
342 fn session_id_validation_rejects_traversal() {
343 assert!(is_valid_session_id("pbs_0123abcdef"));
344 assert!(!is_valid_session_id("pbs_../../etc"));
345 assert!(!is_valid_session_id("pbs_a/b"));
346 assert!(!is_valid_session_id("../pbs_x"));
347 assert!(!is_valid_session_id("clip_123")); assert!(!is_valid_session_id("pbs_with.dot"));
349 assert!(!is_valid_session_id(&format!("pbs_{}", "a".repeat(80)))); }
351}