use std::path::Path as StdPath;
use std::process::Stdio;
use std::time::Duration;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use uuid::Uuid;
use crate::error::{AppError, AppResult};
use crate::models::Segment;
use crate::state::AppState;
const BUILD_TIMEOUT: Duration = Duration::from_secs(300);
const CLEANUP_INTERVAL_S: u64 = 60;
const META_FILE: &str = "session.json";
#[derive(Debug, Serialize)]
pub struct PlaybackSession {
pub id: String,
pub camera_id: String,
pub playlist_url: String,
pub from: DateTime<Utc>,
pub to: DateTime<Utc>,
pub duration_s: f64,
pub segment_count: usize,
}
#[derive(Debug, Serialize, Deserialize)]
struct SessionMeta {
id: String,
camera_id: String,
from: DateTime<Utc>,
to: DateTime<Utc>,
duration_s: f64,
segment_count: usize,
segment_ids: Vec<String>,
created_at: DateTime<Utc>,
}
fn is_valid_session_id(id: &str) -> bool {
id.starts_with("pbs_")
&& id.len() <= 64
&& id.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
}
pub async fn create_session(
state: &AppState,
camera_id: &str,
from: DateTime<Utc>,
to: DateTime<Utc>,
) -> AppResult<PlaybackSession> {
if to <= from {
return Err(AppError::BadRequest("`to` must be after `from`".into()));
}
let requested = (to - from).num_milliseconds() as f64 / 1000.0;
let max = state.cfg.max_playback_seconds;
if requested > max {
return Err(AppError::BadRequest(format!(
"playback range too long ({requested:.0}s); max {max:.0}s"
)));
}
let cam: Option<(i64,)> = sqlx::query_as("SELECT segment_seconds FROM cameras WHERE id = ?")
.bind(camera_id)
.fetch_optional(&state.pool)
.await?;
let Some((segment_seconds,)) = cam else {
return Err(AppError::NotFound(format!("camera {camera_id} not found")));
};
let segments: Vec<Segment> = sqlx::query_as::<_, Segment>(
"SELECT * FROM segments
WHERE camera_id = ? AND start_time < ? AND end_time > ?
ORDER BY start_time ASC",
)
.bind(camera_id)
.bind(to)
.bind(from)
.fetch_all(&state.pool)
.await?;
if segments.is_empty() {
return Err(AppError::NotFound(
"no recorded footage in the requested range".into(),
));
}
let session_id = format!("pbs_{}", Uuid::new_v4().simple());
let session_dir = state.cfg.playback_dir.join(&session_id);
tokio::fs::create_dir_all(&session_dir)
.await
.map_err(|e| AppError::Other(e.into()))?;
let seg_ids: Vec<String> = segments.iter().map(|s| s.id.clone()).collect();
crate::repo::set_segments_locked(&state.pool, &seg_ids, true).await;
let hls_time = segment_seconds.max(2);
let build = generate_hls(state, &session_dir, &segments, from, requested, hls_time).await;
if let Err(e) = build {
crate::repo::set_segments_locked(&state.pool, &seg_ids, false).await;
let _ = tokio::fs::remove_dir_all(&session_dir).await;
return Err(e);
}
let meta = SessionMeta {
id: session_id.clone(),
camera_id: camera_id.to_string(),
from,
to,
duration_s: requested,
segment_count: segments.len(),
segment_ids: seg_ids.clone(),
created_at: Utc::now(),
};
let meta_json = serde_json::to_vec(&meta).map_err(|e| AppError::Other(e.into()))?;
if let Err(e) = tokio::fs::write(session_dir.join(META_FILE), meta_json).await {
crate::repo::set_segments_locked(&state.pool, &seg_ids, false).await;
let _ = tokio::fs::remove_dir_all(&session_dir).await;
return Err(AppError::Other(e.into()));
}
tracing::info!(
session = %session_id,
camera = %camera_id,
segments = segments.len(),
duration_s = requested,
"playback: created session"
);
Ok(PlaybackSession {
playlist_url: format!("/media/playback/{session_id}/index.m3u8"),
id: session_id,
camera_id: camera_id.to_string(),
from,
to,
duration_s: requested,
segment_count: segments.len(),
})
}
async fn generate_hls(
state: &AppState,
session_dir: &StdPath,
segments: &[Segment],
from: DateTime<Utc>,
requested: f64,
hls_time: i64,
) -> AppResult<()> {
let list_path = session_dir.join("concat.txt");
let mut list = String::new();
for s in segments {
let escaped = s.path.replace('\'', "'\\''");
list.push_str(&format!("file '{escaped}'\n"));
}
tokio::fs::write(&list_path, list)
.await
.map_err(|e| AppError::Other(e.into()))?;
let first_start = segments[0].start_time;
let ss = ((from - first_start).num_milliseconds() as f64 / 1000.0).max(0.0);
let playlist_path = session_dir.join("index.m3u8");
let seg_pattern = session_dir.join("seg_%05d.ts");
let mut cmd = Command::new(&state.cfg.ffmpeg_bin);
cmd.kill_on_drop(true)
.args([
"-hide_banner",
"-loglevel",
"error",
"-f",
"concat",
"-safe",
"0",
])
.arg("-i")
.arg(&list_path)
.args(["-ss", &format!("{ss:.3}")])
.args(["-t", &format!("{requested:.3}")])
.args(["-c", "copy", "-avoid_negative_ts", "make_zero"])
.args(["-f", "hls"])
.args(["-hls_time", &hls_time.to_string()])
.args(["-hls_playlist_type", "vod"])
.arg("-hls_segment_filename")
.arg(&seg_pattern)
.arg(&playlist_path)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::piped());
let result = tokio::time::timeout(BUILD_TIMEOUT, cmd.output()).await;
let _ = tokio::fs::remove_file(&list_path).await;
let out = match result {
Err(_) => return Err(AppError::Other(anyhow::anyhow!("playback build timed out"))),
Ok(Err(e)) => return Err(AppError::Other(e.into())),
Ok(Ok(out)) => out,
};
if !out.status.success() {
return Err(AppError::Other(anyhow::anyhow!(
"ffmpeg playback build failed: {}",
String::from_utf8_lossy(&out.stderr).trim()
)));
}
Ok(())
}
pub async fn delete_session(state: &AppState, session_id: &str) -> AppResult<()> {
if !is_valid_session_id(session_id) {
return Err(AppError::BadRequest("invalid session id".into()));
}
let session_dir = state.cfg.playback_dir.join(session_id);
if !tokio::fs::try_exists(&session_dir).await.unwrap_or(false) {
return Err(AppError::NotFound(format!(
"playback session {session_id} not found"
)));
}
if let Some(meta) = read_meta(&session_dir).await {
crate::repo::set_segments_locked(&state.pool, &meta.segment_ids, false).await;
}
tokio::fs::remove_dir_all(&session_dir)
.await
.map_err(|e| AppError::Other(e.into()))?;
tracing::info!(session = %session_id, "playback: deleted session");
Ok(())
}
async fn read_meta(session_dir: &StdPath) -> Option<SessionMeta> {
let bytes = tokio::fs::read(session_dir.join(META_FILE)).await.ok()?;
serde_json::from_slice(&bytes).ok()
}
pub async fn run(state: AppState) {
let mut tick = tokio::time::interval(Duration::from_secs(CLEANUP_INTERVAL_S));
loop {
tick.tick().await;
if let Err(e) = sweep(&state).await {
tracing::error!(error = %e, "playback_session_cleanup: tick failed");
}
}
}
async fn sweep(state: &AppState) -> anyhow::Result<()> {
let ttl = chrono::Duration::minutes(state.cfg.playback_session_ttl_minutes.max(1));
let now = Utc::now();
let mut entries = match tokio::fs::read_dir(&state.cfg.playback_dir).await {
Ok(e) => e,
Err(_) => return Ok(()),
};
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if !entry.file_type().await.map(|t| t.is_dir()).unwrap_or(false) {
continue;
}
let meta = read_meta(&path).await;
let expired = match &meta {
Some(m) => m.created_at + ttl <= now,
None => dir_mtime_before(&path, now - ttl).await,
};
if !expired {
continue;
}
if let Some(m) = &meta {
crate::repo::set_segments_locked(&state.pool, &m.segment_ids, false).await;
}
match tokio::fs::remove_dir_all(&path).await {
Ok(()) => {
tracing::debug!(dir = %path.display(), "playback_session_cleanup: removed expired session")
}
Err(e) => {
tracing::warn!(error = %e, dir = %path.display(), "playback_session_cleanup: failed to remove session dir")
}
}
}
Ok(())
}
async fn dir_mtime_before(path: &StdPath, cutoff: DateTime<Utc>) -> bool {
match tokio::fs::metadata(path).await.and_then(|m| m.modified()) {
Ok(modified) => DateTime::<Utc>::from(modified) < cutoff,
Err(_) => false,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn session_id_validation_rejects_traversal() {
assert!(is_valid_session_id("pbs_0123abcdef"));
assert!(!is_valid_session_id("pbs_../../etc"));
assert!(!is_valid_session_id("pbs_a/b"));
assert!(!is_valid_session_id("../pbs_x"));
assert!(!is_valid_session_id("clip_123")); assert!(!is_valid_session_id("pbs_with.dot"));
assert!(!is_valid_session_id(&format!("pbs_{}", "a".repeat(80)))); }
}