use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use sqlx::SqlitePool;
use tokio::io::AsyncReadExt;
use tokio::sync::{watch, Mutex};
use tokio::task::JoinHandle;
use crate::camera_url;
use crate::config::Config;
use crate::models::Camera;
use crate::services::recorder::build_record_command;
/// Maximum number of trailing stderr bytes kept from the recorder process;
/// older bytes are discarded so only the most recent output is logged on failure.
const STDERR_TAIL_CAP: usize = 8192;
/// Bookkeeping for one running per-camera supervisor task.
struct MirrorTask {
// Stop flag: sending `true` asks the supervisor to shut down.
stop: watch::Sender<bool>,
// Join handle of the spawned supervisor task, used to await or abort it.
handle: JoinHandle<()>,
// Unique tag assigned at spawn time; lets a finishing supervisor tell
// whether the map entry is still its own before removing it.
generation: u64,
}
/// Owns the per-camera mirror recording tasks and starts, stops and restarts
/// them as camera configuration changes.
pub struct MirrorRecorderManager {
// Database pool used to load camera rows.
pool: SqlitePool,
// Shared application configuration (`recorder_enabled` gates all mirroring).
cfg: Arc<Config>,
// Root directory; each camera records into `mirror_root/<camera_id>`.
mirror_root: PathBuf,
// Running supervisor tasks keyed by camera id.
tasks: Mutex<HashMap<String, MirrorTask>>,
// Source of unique generation tags for spawned tasks.
next_generation: AtomicU64,
}
impl MirrorRecorderManager {
pub fn new(pool: SqlitePool, cfg: Arc<Config>, mirror_root: PathBuf) -> Arc<Self> {
Arc::new(Self {
pool,
cfg,
mirror_root,
tasks: Mutex::new(HashMap::new()),
next_generation: AtomicU64::new(1),
})
}
/// Returns the directory for `camera_id`: the mirror root with the camera id
/// appended as a path component.
fn camera_dir(&self, camera_id: &str) -> PathBuf {
    let mut dir = self.mirror_root.clone();
    dir.push(camera_id);
    dir
}
/// A camera is mirrored only when it should record at all and has its
/// `mirror_enabled` flag set.
fn should_mirror(cam: &Camera) -> bool {
    if !cam.should_record() {
        return false;
    }
    cam.mirror_enabled
}
/// Starts a mirror supervisor for every camera whose row has `enabled`,
/// `record_enabled` and `mirror_enabled` all set.
///
/// Does nothing when `cfg.recorder_enabled` is false.
///
/// # Errors
/// Returns the database error if the camera query fails.
pub async fn start_all(self: &Arc<Self>) -> anyhow::Result<()> {
    if self.cfg.recorder_enabled {
        let query = sqlx::query_as::<_, Camera>(
            "SELECT * FROM cameras WHERE enabled = 1 AND record_enabled = 1 AND mirror_enabled = 1",
        );
        let cams: Vec<Camera> = query.fetch_all(&self.pool).await?;
        tracing::info!(count = cams.len(), root = %self.mirror_root.display(), "mirror: starting cameras");
        for id in cams.into_iter().map(|cam| cam.id) {
            self.spawn(id).await;
        }
    }
    Ok(())
}
pub async fn reconcile(self: &Arc<Self>, camera_id: &str) {
self.stop(camera_id).await;
if !self.cfg.recorder_enabled {
return;
}
let cam = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
.bind(camera_id)
.fetch_optional(&self.pool)
.await
.ok()
.flatten();
if let Some(cam) = cam {
if Self::should_mirror(&cam) {
self.spawn(camera_id.to_string()).await;
}
}
}
/// Stops the mirror task for `camera_id`, if one is registered.
///
/// The task is removed from the registry, signalled to stop, and awaited for
/// up to 8 seconds. If it has not finished by then it is aborted and awaited
/// again.
// NOTE(review): aborting drops the supervisor mid-flight; whether the recorder
// child process is killed in that case depends on how the command is built
// (not visible here) — confirm.
pub async fn stop(self: &Arc<Self>, camera_id: &str) {
    // The lock guard is a temporary and is released at the end of this statement.
    let removed = self.tasks.lock().await.remove(camera_id);
    let Some(entry) = removed else {
        return;
    };
    let _ = entry.stop.send(true);
    let mut join = entry.handle;
    let grace = Duration::from_secs(8);
    let timed_out = tokio::time::timeout(grace, &mut join).await.is_err();
    if timed_out {
        tracing::warn!(%camera_id, "mirror: task did not stop within 8s; aborting");
        join.abort();
        let _ = join.await;
    }
}
pub async fn shutdown(self: &Arc<Self>) {
let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
tracing::info!(count = ids.len(), "mirror: shutting down");
for id in ids {
self.stop(&id).await;
}
}
async fn spawn(self: &Arc<Self>, camera_id: String) {
let (tx, rx) = watch::channel(false);
let generation = self.next_generation.fetch_add(1, Ordering::Relaxed);
let mut tasks = self.tasks.lock().await;
let me = self.clone();
let id_for_task = camera_id.clone();
let handle = tokio::spawn(async move {
me.supervise(id_for_task, generation, rx).await;
});
if let Some(old) = tasks.insert(
camera_id,
MirrorTask {
stop: tx,
handle,
generation,
},
) {
let _ = old.stop.send(true);
old.handle.abort();
}
}
/// Runs the mirror loop for one camera and, once it ends, removes this
/// task's registry entry.
///
/// The entry is removed only if its generation still matches `generation`,
/// so an entry belonging to a newer task for the same camera is left alone.
async fn supervise(
    self: Arc<Self>,
    camera_id: String,
    generation: u64,
    stop: watch::Receiver<bool>,
) {
    let id_for_loop = camera_id.clone();
    self.run_mirror(id_for_loop, stop).await;

    let mut registry = self.tasks.lock().await;
    let still_ours = matches!(
        registry.get(&camera_id),
        Some(entry) if entry.generation == generation
    );
    if still_ours {
        registry.remove(&camera_id);
    }
}
/// Supervision loop for one camera: launches the recorder process (logged as
/// ffmpeg) and relaunches it after it exits.
///
/// The loop ends when a stop is signalled, the camera row no longer exists,
/// or the camera no longer qualifies for mirroring. The camera row is re-read
/// on every iteration. Recoverable failures (database error, missing URL,
/// directory creation failure, spawn failure) wait a fixed delay and retry.
/// After a process exit the delay doubles up to 30 seconds and resets to
/// 1 second once a run has lasted more than 30 seconds.
async fn run_mirror(&self, camera_id: String, mut stop: watch::Receiver<bool>) {
    let mut backoff: u64 = 1;
    loop {
        if *stop.borrow() {
            return;
        }
        let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
            .bind(&camera_id)
            .fetch_optional(&self.pool)
            .await
        {
            Ok(Some(c)) => c,
            // The camera row is gone: nothing left to mirror.
            Ok(None) => return,
            Err(e) => {
                tracing::error!(%camera_id, error = %e, "mirror: failed to load camera");
                if sleep_or_stop(&mut stop, 10).await {
                    return;
                }
                continue;
            }
        };
        if !Self::should_mirror(&cam) {
            return;
        }
        let Some(url) = camera_url::record_url(&cam) else {
            tracing::warn!(%camera_id, "mirror: no RTSP URL; retrying");
            if sleep_or_stop(&mut stop, 30).await {
                return;
            }
            continue;
        };
        let dir = self.camera_dir(&camera_id);
        if let Err(e) = tokio::fs::create_dir_all(&dir).await {
            tracing::error!(%camera_id, error = %e, "mirror: cannot create mirror dir");
            // The recorder is handed this directory, so launching it without
            // the directory is pointless; back off and retry instead.
            if sleep_or_stop(&mut stop, 15).await {
                return;
            }
            continue;
        }
        // Never log the raw URL; only its masked form.
        let masked = camera_url::mask_url(&url);
        tracing::info!(%camera_id, url = %masked, dir = %dir.display(), "mirror: starting ffmpeg");
        let mut child = match build_record_command(&self.cfg, &cam, &url, &dir).spawn() {
            Ok(c) => c,
            Err(e) => {
                tracing::error!(%camera_id, "mirror: spawn ffmpeg failed: {e}");
                if sleep_or_stop(&mut stop, 15).await {
                    return;
                }
                continue;
            }
        };
        // Drain stderr in a separate task, keeping only the last
        // STDERR_TAIL_CAP bytes for the error log.
        let stderr = child.stderr.take();
        let stderr_task = tokio::spawn(async move {
            let mut tail: Vec<u8> = Vec::new();
            if let Some(mut s) = stderr {
                let mut chunk = [0u8; 4096];
                loop {
                    match s.read(&mut chunk).await {
                        Ok(0) | Err(_) => break,
                        Ok(n) => {
                            tail.extend_from_slice(&chunk[..n]);
                            if tail.len() > STDERR_TAIL_CAP {
                                let excess = tail.len() - STDERR_TAIL_CAP;
                                tail.drain(0..excess);
                            }
                        }
                    }
                }
            }
            tail
        });
        let started = Utc::now();
        tokio::select! {
            status = child.wait() => {
                let raw = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default())
                    .trim()
                    .to_string();
                // Mask the stderr tail before it is logged.
                let err_tail = camera_url::mask_url(&raw);
                let ran = (Utc::now() - started).num_seconds();
                match status {
                    Ok(s) if s.success() =>
                        tracing::warn!(%camera_id, ran_s = ran, "mirror: ffmpeg exited (stream ended)"),
                    Ok(s) =>
                        tracing::warn!(%camera_id, ran_s = ran, code = ?s.code(), tail = %err_tail, "mirror: ffmpeg exited with error"),
                    Err(e) =>
                        tracing::error!(%camera_id, error = %e, "mirror: ffmpeg wait failed"),
                }
                // A run longer than 30s resets the delay; short runs double it, capped at 30s.
                backoff = if ran > 30 { 1 } else { (backoff * 2).min(30) };
                if sleep_or_stop(&mut stop, backoff).await {
                    return;
                }
            }
            // Fires on a stop signal and also when the sender has been dropped.
            _ = stop.changed() => {
                tracing::info!(%camera_id, "mirror: stop requested");
                let _ = child.kill().await;
                return;
            }
        }
    }
}
}
/// Sleeps for `secs` seconds unless a stop is signalled first.
///
/// Returns `true` when the caller should stop: the flag is already set, it
/// becomes set while waiting, or the sending side of the channel has been
/// dropped. Returns `false` after an uninterrupted sleep with the flag unset.
async fn sleep_or_stop(stop: &mut watch::Receiver<bool>, secs: u64) -> bool {
    if *stop.borrow() {
        return true;
    }
    tokio::select! {
        _ = tokio::time::sleep(Duration::from_secs(secs)) => *stop.borrow(),
        // `changed()` errors only when the sender is gone. Treat that as a
        // stop: otherwise it resolves immediately on every call and the
        // caller's retry loop would spin without sleeping.
        changed = stop.changed() => changed.is_err() || *stop.borrow(),
    }
}