Skip to main content

heldar_kernel/services/
mirror.rs

//! Dual / mirror recording: a SECOND, supervised ffmpeg pipeline per `mirror_enabled` camera that
//! writes equivalent segments to `HELDAR_MIRROR_RECORDINGS_DIR/{camera_id}/` (a redundant DVR
//! copy on a separate volume). It reuses the recorder's [`build_record_command`] with only the output
//! dir swapped, so the mirror files match the primaries in naming and encoding (same names, same
//! codecs).
//!
//! The manager is created (and held as an `Option` on [`AppState`]) only when the mirror dir is
//! configured. It is a SHADOW of the primary recorder: it never writes camera_status (the primary
//! owns that), and it mirrors continuously only while a camera `should_record()` AND has
//! `mirror_enabled`. The mirror dir is NOT indexed (it is a cold redundant copy); restoring or
//! indexing it is an operational step.

11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use chrono::Utc;
18use sqlx::SqlitePool;
19use tokio::io::AsyncReadExt;
20use tokio::sync::{watch, Mutex};
21use tokio::task::JoinHandle;
22
23use crate::camera_url;
24use crate::config::Config;
25use crate::models::Camera;
26use crate::services::recorder::build_record_command;
27
/// Upper bound, in bytes, on the retained tail of a mirror ffmpeg run's stderr (older output is
/// discarded first). This is the same cap the primary recorder uses.
const STDERR_TAIL_CAP: usize = 8 * 1024;

31struct MirrorTask {
32    stop: watch::Sender<bool>,
33    handle: JoinHandle<()>,
34    /// Monotonic id distinguishing this task from any later task for the same camera.
35    generation: u64,
36}
37
38/// Owns and supervises the per-camera mirror recorder tasks.
39pub struct MirrorRecorderManager {
40    pool: SqlitePool,
41    cfg: Arc<Config>,
42    /// Mirror recordings root (HELDAR_MIRROR_RECORDINGS_DIR); each camera mirrors into a subdir.
43    mirror_root: PathBuf,
44    tasks: Mutex<HashMap<String, MirrorTask>>,
45    next_generation: AtomicU64,
46}
47
48impl MirrorRecorderManager {
49    pub fn new(pool: SqlitePool, cfg: Arc<Config>, mirror_root: PathBuf) -> Arc<Self> {
50        Arc::new(Self {
51            pool,
52            cfg,
53            mirror_root,
54            tasks: Mutex::new(HashMap::new()),
55            next_generation: AtomicU64::new(1),
56        })
57    }
58
59    /// Per-camera mirror output directory under the mirror root.
60    fn camera_dir(&self, camera_id: &str) -> PathBuf {
61        self.mirror_root.join(camera_id)
62    }
63
64    /// Whether a camera should have a mirror pipeline: recording-enabled AND opted into mirroring.
65    fn should_mirror(cam: &Camera) -> bool {
66        cam.should_record() && cam.mirror_enabled
67    }
68
69    /// Start mirror recorders for every camera that should mirror.
70    pub async fn start_all(self: &Arc<Self>) -> anyhow::Result<()> {
71        if !self.cfg.recorder_enabled {
72            return Ok(());
73        }
74        let cams: Vec<Camera> = sqlx::query_as::<_, Camera>(
75            "SELECT * FROM cameras WHERE enabled = 1 AND record_enabled = 1 AND mirror_enabled = 1",
76        )
77        .fetch_all(&self.pool)
78        .await?;
79        tracing::info!(count = cams.len(), root = %self.mirror_root.display(), "mirror: starting cameras");
80        for cam in cams {
81            self.spawn(cam.id).await;
82        }
83        Ok(())
84    }
85
86    /// Reconcile a single camera's mirror recorder against its current DB state (stop, then restart
87    /// when it should mirror). Mirroring is continuous and independent of the recording schedule, so
88    /// this only depends on `should_record()` + `mirror_enabled`.
89    pub async fn reconcile(self: &Arc<Self>, camera_id: &str) {
90        self.stop(camera_id).await;
91        if !self.cfg.recorder_enabled {
92            return;
93        }
94        let cam = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
95            .bind(camera_id)
96            .fetch_optional(&self.pool)
97            .await
98            .ok()
99            .flatten();
100        if let Some(cam) = cam {
101            if Self::should_mirror(&cam) {
102                self.spawn(camera_id.to_string()).await;
103            }
104        }
105    }
106
107    /// Stop a camera's mirror task, killing its ffmpeg process (aborting if it does not stop promptly).
108    pub async fn stop(self: &Arc<Self>, camera_id: &str) {
109        let task = { self.tasks.lock().await.remove(camera_id) };
110        if let Some(task) = task {
111            let _ = task.stop.send(true);
112            let mut handle = task.handle;
113            if tokio::time::timeout(Duration::from_secs(8), &mut handle)
114                .await
115                .is_err()
116            {
117                tracing::warn!(%camera_id, "mirror: task did not stop within 8s; aborting");
118                handle.abort();
119                let _ = handle.await;
120            }
121        }
122    }
123
124    /// Stop all mirror tasks (graceful shutdown).
125    pub async fn shutdown(self: &Arc<Self>) {
126        let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
127        tracing::info!(count = ids.len(), "mirror: shutting down");
128        for id in ids {
129            self.stop(&id).await;
130        }
131    }
132
133    async fn spawn(self: &Arc<Self>, camera_id: String) {
134        let (tx, rx) = watch::channel(false);
135        let generation = self.next_generation.fetch_add(1, Ordering::Relaxed);
136        // Hold the map lock across spawn+insert so a concurrent stop()/delete can never observe a gap
137        // where the task is running but not yet registered.
138        let mut tasks = self.tasks.lock().await;
139        let me = self.clone();
140        let id_for_task = camera_id.clone();
141        let handle = tokio::spawn(async move {
142            me.supervise(id_for_task, generation, rx).await;
143        });
144        if let Some(old) = tasks.insert(
145            camera_id,
146            MirrorTask {
147                stop: tx,
148                handle,
149                generation,
150            },
151        ) {
152            let _ = old.stop.send(true);
153            old.handle.abort();
154        }
155    }
156
157    async fn supervise(
158        self: Arc<Self>,
159        camera_id: String,
160        generation: u64,
161        stop: watch::Receiver<bool>,
162    ) {
163        self.run_mirror(camera_id.clone(), stop).await;
164        // Self-exit cleanup: remove our own entry only if it is still ours.
165        let mut tasks = self.tasks.lock().await;
166        if tasks.get(&camera_id).map(|t| t.generation) == Some(generation) {
167            tasks.remove(&camera_id);
168        }
169    }
170
171    /// Mirror loop: keep a continuous ffmpeg writing identical segments to the mirror dir, reconnecting
172    /// with backoff on stream loss. Self-exits when the camera is deleted / disabled / un-mirrored.
173    async fn run_mirror(&self, camera_id: String, mut stop: watch::Receiver<bool>) {
174        let mut backoff: u64 = 1;
175        loop {
176            if *stop.borrow() {
177                return;
178            }
179            let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
180                .bind(&camera_id)
181                .fetch_optional(&self.pool)
182                .await
183            {
184                Ok(Some(c)) => c,
185                Ok(None) => return, // camera deleted
186                Err(e) => {
187                    tracing::error!(%camera_id, error = %e, "mirror: failed to load camera");
188                    if sleep_or_stop(&mut stop, 10).await {
189                        return;
190                    }
191                    continue;
192                }
193            };
194            if !Self::should_mirror(&cam) {
195                // Disabled / un-mirrored out from under us; reconcile() will respawn if needed.
196                return;
197            }
198
199            let Some(url) = camera_url::record_url(&cam) else {
200                tracing::warn!(%camera_id, "mirror: no RTSP URL; retrying");
201                if sleep_or_stop(&mut stop, 30).await {
202                    return;
203                }
204                continue;
205            };
206
207            let dir = self.camera_dir(&camera_id);
208            if let Err(e) = tokio::fs::create_dir_all(&dir).await {
209                tracing::error!(%camera_id, error = %e, "mirror: cannot create mirror dir");
210            }
211            let masked = camera_url::mask_url(&url);
212            tracing::info!(%camera_id, url = %masked, dir = %dir.display(), "mirror: starting ffmpeg");
213
214            let mut child = match build_record_command(&self.cfg, &cam, &url, &dir).spawn() {
215                Ok(c) => c,
216                Err(e) => {
217                    tracing::error!(%camera_id, "mirror: spawn ffmpeg failed: {e}");
218                    if sleep_or_stop(&mut stop, 15).await {
219                        return;
220                    }
221                    continue;
222                }
223            };
224
225            // Drain stderr concurrently with a bounded tail (so the pipe never blocks ffmpeg).
226            let stderr = child.stderr.take();
227            let stderr_task = tokio::spawn(async move {
228                let mut tail: Vec<u8> = Vec::new();
229                if let Some(mut s) = stderr {
230                    let mut chunk = [0u8; 4096];
231                    loop {
232                        match s.read(&mut chunk).await {
233                            Ok(0) | Err(_) => break,
234                            Ok(n) => {
235                                tail.extend_from_slice(&chunk[..n]);
236                                if tail.len() > STDERR_TAIL_CAP {
237                                    let excess = tail.len() - STDERR_TAIL_CAP;
238                                    tail.drain(0..excess);
239                                }
240                            }
241                        }
242                    }
243                }
244                tail
245            });
246
247            let started = Utc::now();
248            tokio::select! {
249                status = child.wait() => {
250                    let raw = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default())
251                        .trim().to_string();
252                    let err_tail = camera_url::mask_url(&raw);
253                    let ran = (Utc::now() - started).num_seconds();
254                    match status {
255                        Ok(s) if s.success() =>
256                            tracing::warn!(%camera_id, ran_s = ran, "mirror: ffmpeg exited (stream ended)"),
257                        Ok(s) =>
258                            tracing::warn!(%camera_id, ran_s = ran, code = ?s.code(), tail = %err_tail, "mirror: ffmpeg exited with error"),
259                        Err(e) =>
260                            tracing::error!(%camera_id, error = %e, "mirror: ffmpeg wait failed"),
261                    }
262                    backoff = if ran > 30 { 1 } else { (backoff * 2).min(30) };
263                    if sleep_or_stop(&mut stop, backoff).await {
264                        return;
265                    }
266                }
267                _ = stop.changed() => {
268                    tracing::info!(%camera_id, "mirror: stop requested");
269                    let _ = child.kill().await;
270                    return;
271                }
272            }
273        }
274    }
275}
276
277/// Sleep for `secs`, returning `true` if a stop was signaled during the wait.
278async fn sleep_or_stop(stop: &mut watch::Receiver<bool>, secs: u64) -> bool {
279    if *stop.borrow() {
280        return true;
281    }
282    tokio::select! {
283        _ = tokio::time::sleep(Duration::from_secs(secs)) => *stop.borrow(),
284        _ = stop.changed() => *stop.borrow(),
285    }
286}