Skip to main content

heldar_kernel/services/
recorder.rs

1//! Recorder supervisor: one FFmpeg process per camera, recording the configured stream
2//! into time-segmented fragmented-MP4 files with `-c copy` (no decode). Supervises the
3//! process, reconnects with backoff on stream loss, and maintains live camera status.
4
5use std::collections::HashMap;
6use std::process::Stdio;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use serde_json::json;
13use sqlx::SqlitePool;
14use tokio::io::AsyncReadExt;
15use tokio::process::Command;
16use tokio::sync::{watch, Mutex};
17use tokio::task::JoinHandle;
18
19use crate::camera_url;
20use crate::config::Config;
21use crate::models::Camera;
22use crate::repo;
23
/// Keep at most this many bytes of an FFmpeg run's stderr (the tail is what matters).
///
/// The stderr drain loop in `run_supervise` appends each chunk it reads and then discards bytes
/// from the front of its buffer whenever the buffer grows past this cap, so only the most recent
/// output of a run is retained.
const STDERR_TAIL_CAP: usize = 8192;
26
/// Bookkeeping for one running per-camera supervisor task, stored in the manager's task map
/// under the camera id.
struct CameraTask {
    /// Stop channel for the task: the manager sends `true` on it to request that the
    /// supervisor exit.
    stop: watch::Sender<bool>,
    /// Join handle of the spawned supervisor task; awaited (and aborted if necessary) when the
    /// task is stopped.
    handle: JoinHandle<()>,
    /// Monotonic id distinguishing this task from any later task for the same camera.
    /// A task compares it against the map entry before removing itself on exit.
    generation: u64,
}
33
/// Owns and supervises the per-camera recorder tasks.
///
/// Constructed via `new`, which returns the manager inside an `Arc`; each spawned supervisor
/// task holds its own clone of that `Arc`.
pub struct RecorderManager {
    /// Database pool used to load camera rows and to write recorder state and events.
    pool: SqlitePool,
    /// Shared configuration (global recorder switch, FFmpeg binary, recordings directory).
    cfg: Arc<Config>,
    /// Running supervisor tasks keyed by camera id.
    tasks: Mutex<HashMap<String, CameraTask>>,
    /// Source of task generation numbers; starts at 1 and is incremented for every spawn.
    next_generation: AtomicU64,
}
41
42impl RecorderManager {
43    pub fn new(pool: SqlitePool, cfg: Arc<Config>) -> Arc<Self> {
44        Arc::new(Self {
45            pool,
46            cfg,
47            tasks: Mutex::new(HashMap::new()),
48            next_generation: AtomicU64::new(1),
49        })
50    }
51
52    /// Start recorders for all cameras that should record.
53    pub async fn start_all(self: &Arc<Self>) -> anyhow::Result<()> {
54        if !self.cfg.recorder_enabled {
55            tracing::warn!("recorder globally disabled (HELDAR_RECORDER_ENABLED=false)");
56            return Ok(());
57        }
58        let cams: Vec<Camera> = sqlx::query_as::<_, Camera>(
59            "SELECT * FROM cameras WHERE enabled = 1 AND record_enabled = 1",
60        )
61        .fetch_all(&self.pool)
62        .await?;
63        tracing::info!(count = cams.len(), "recorder: starting cameras");
64        for cam in cams {
65            self.spawn(cam.id).await;
66        }
67        Ok(())
68    }
69
70    /// Reconcile a single camera's recorder against its current DB state.
71    pub async fn reconcile(self: &Arc<Self>, camera_id: &str) {
72        self.stop(camera_id).await;
73        if !self.cfg.recorder_enabled {
74            return;
75        }
76        let cam = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
77            .bind(camera_id)
78            .fetch_optional(&self.pool)
79            .await
80            .ok()
81            .flatten();
82        match cam {
83            Some(cam) if cam.should_record() => self.spawn(camera_id.to_string()).await,
84            Some(_) => {
85                let _ = repo::set_state(&self.pool, camera_id, "disabled", None).await;
86            }
87            None => {}
88        }
89    }
90
91    /// Stop a camera's recorder task, killing its FFmpeg process. Returns only once the task is
92    /// actually gone (aborting it if it does not stop promptly).
93    pub async fn stop(self: &Arc<Self>, camera_id: &str) {
94        let task = { self.tasks.lock().await.remove(camera_id) };
95        if let Some(task) = task {
96            let _ = task.stop.send(true);
97            let mut handle = task.handle;
98            if tokio::time::timeout(Duration::from_secs(8), &mut handle)
99                .await
100                .is_err()
101            {
102                // The task did not honor the stop signal in time. Abort it: dropping its frame
103                // drops the FFmpeg Child, and kill_on_drop terminates the process.
104                tracing::warn!(%camera_id, "recorder: task did not stop within 8s; aborting");
105                handle.abort();
106                let _ = handle.await;
107            }
108        }
109    }
110
111    /// Stop all recorder tasks (graceful shutdown).
112    pub async fn shutdown(self: &Arc<Self>) {
113        let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
114        tracing::info!(count = ids.len(), "recorder: shutting down");
115        for id in ids {
116            self.stop(&id).await;
117        }
118    }
119
120    /// Camera ids currently being supervised.
121    pub async fn active_ids(&self) -> Vec<String> {
122        self.tasks.lock().await.keys().cloned().collect()
123    }
124
125    async fn spawn(self: &Arc<Self>, camera_id: String) {
126        let (tx, rx) = watch::channel(false);
127        let generation = self.next_generation.fetch_add(1, Ordering::Relaxed);
128
129        // Hold the map lock across spawn+insert so a concurrent stop()/delete can never observe a
130        // gap where the task is running but not yet registered (which would let it slip through).
131        let mut tasks = self.tasks.lock().await;
132        let me = self.clone();
133        let id_for_task = camera_id.clone();
134        let handle = tokio::spawn(async move {
135            me.supervise(id_for_task, generation, rx).await;
136        });
137        if let Some(old) = tasks.insert(
138            camera_id,
139            CameraTask {
140                stop: tx,
141                handle,
142                generation,
143            },
144        ) {
145            // Displaced a previous task: signal AND abort it so two FFmpegs never overlap.
146            let _ = old.stop.send(true);
147            old.handle.abort();
148        }
149    }
150
151    async fn supervise(
152        self: Arc<Self>,
153        camera_id: String,
154        generation: u64,
155        stop: watch::Receiver<bool>,
156    ) {
157        self.run_supervise(camera_id.clone(), stop).await;
158        // Self-exit cleanup: remove our own entry, but only if it is still ours (a concurrent
159        // spawn may have installed a newer task for this camera).
160        let mut tasks = self.tasks.lock().await;
161        if tasks.get(&camera_id).map(|t| t.generation) == Some(generation) {
162            tasks.remove(&camera_id);
163            tracing::debug!(%camera_id, "recorder: task removed itself from map on exit");
164        }
165    }
166
167    async fn run_supervise(&self, camera_id: String, mut stop: watch::Receiver<bool>) {
168        let mut backoff: u64 = 1;
169        loop {
170            if *stop.borrow() {
171                return;
172            }
173
174            let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
175                .bind(&camera_id)
176                .fetch_optional(&self.pool)
177                .await
178            {
179                Ok(Some(c)) => c,
180                Ok(None) => return, // camera deleted
181                Err(e) => {
182                    tracing::error!(%camera_id, error = %e, "recorder: failed to load camera");
183                    if sleep_or_stop(&mut stop, 10).await {
184                        return;
185                    }
186                    continue;
187                }
188            };
189            if !cam.should_record() {
190                let _ = repo::set_state(&self.pool, &camera_id, "disabled", None).await;
191                return;
192            }
193
194            let Some(url) = camera_url::record_url(&cam) else {
195                let msg = "no RTSP URL: set address+credentials or an explicit stream URL";
196                let _ = repo::set_state(&self.pool, &camera_id, "error", Some(msg)).await;
197                let _ = repo::log_event(
198                    &self.pool,
199                    Some(&camera_id),
200                    "recorder_error",
201                    "warning",
202                    json!({ "reason": msg }),
203                )
204                .await;
205                if sleep_or_stop(&mut stop, 30).await {
206                    return;
207                }
208                continue;
209            };
210
211            let dir = self.cfg.camera_recordings_dir(&camera_id);
212            if let Err(e) = tokio::fs::create_dir_all(&dir).await {
213                tracing::error!(%camera_id, error = %e, "recorder: cannot create recordings dir");
214            }
215            let seg = cam.segment_seconds.max(2);
216            let pattern = dir.join("%Y%m%d_%H%M%S.mp4");
217            let masked = camera_url::mask_url(&url);
218
219            let _ = repo::set_state(&self.pool, &camera_id, "connecting", None).await;
220            tracing::info!(%camera_id, url = %masked, segment_s = seg, "recorder: starting ffmpeg");
221
222            let mut child = match Command::new(&self.cfg.ffmpeg_bin)
223                .kill_on_drop(true)
224                .env("TZ", "UTC")
225                .args(["-nostdin", "-hide_banner", "-loglevel", "warning"])
226                .args(["-rtsp_transport", "tcp"])
227                .args(["-timeout", "15000000"]) // 15s RTSP socket I/O timeout -> exit on stall
228                .args(["-i", &url])
229                .args(["-c", "copy", "-an"]) // copy video; drop audio in Stage 0
230                .args(["-f", "segment"])
231                .args(["-segment_time", &seg.to_string()])
232                .args(["-segment_format", "mp4"])
233                .args([
234                    "-segment_format_options",
235                    "movflags=+frag_keyframe+empty_moov+default_base_moof",
236                ])
237                .args(["-reset_timestamps", "1"])
238                .args(["-strftime", "1"])
239                .arg(&pattern)
240                .stdin(Stdio::null())
241                .stdout(Stdio::null())
242                .stderr(Stdio::piped())
243                .spawn()
244            {
245                Ok(c) => c,
246                Err(e) => {
247                    let msg = format!("spawn ffmpeg failed: {e}");
248                    tracing::error!(%camera_id, "{msg}");
249                    let _ = repo::set_state(&self.pool, &camera_id, "error", Some(&msg)).await;
250                    if sleep_or_stop(&mut stop, 15).await {
251                        return;
252                    }
253                    continue;
254                }
255            };
256
257            let pid = child.id().map(|p| p as i64);
258            let _ = repo::set_running(&self.pool, &camera_id, "recording", pid).await;
259
260            // Drain stderr concurrently (so the pipe never blocks ffmpeg), keeping only a bounded
261            // tail so a chatty/long-lived recorder cannot grow this buffer without bound.
262            let stderr = child.stderr.take();
263            let stderr_task = tokio::spawn(async move {
264                let mut tail: Vec<u8> = Vec::new();
265                if let Some(mut s) = stderr {
266                    let mut chunk = [0u8; 4096];
267                    loop {
268                        match s.read(&mut chunk).await {
269                            Ok(0) | Err(_) => break,
270                            Ok(n) => {
271                                tail.extend_from_slice(&chunk[..n]);
272                                if tail.len() > STDERR_TAIL_CAP {
273                                    let excess = tail.len() - STDERR_TAIL_CAP;
274                                    tail.drain(0..excess);
275                                }
276                            }
277                        }
278                    }
279                }
280                tail
281            });
282
283            let started = Utc::now();
284            tokio::select! {
285                status = child.wait() => {
286                    let raw = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default())
287                        .trim().to_string();
288                    // Mask any credentials FFmpeg echoes back in the RTSP URL before persisting/logging.
289                    let err_tail = camera_url::mask_url(&raw);
290                    let ran = (Utc::now() - started).num_seconds();
291                    match status {
292                        Ok(s) if s.success() =>
293                            tracing::warn!(%camera_id, ran_s = ran, "ffmpeg exited (stream ended)"),
294                        Ok(s) =>
295                            tracing::warn!(%camera_id, ran_s = ran, code = ?s.code(), tail = %err_tail, "ffmpeg exited with error"),
296                        Err(e) =>
297                            tracing::error!(%camera_id, error = %e, "ffmpeg wait failed"),
298                    }
299                    let _ = repo::bump_reconnect(&self.pool, &camera_id, &err_tail).await;
300                    let _ = repo::log_event(&self.pool, Some(&camera_id), "camera_offline", "warning",
301                        json!({ "ran_seconds": ran, "detail": err_tail })).await;
302                    // Reset backoff if it ran a healthy while; otherwise exponential up to 30s.
303                    backoff = if ran > 30 { 1 } else { (backoff * 2).min(30) };
304                    if sleep_or_stop(&mut stop, backoff).await {
305                        return;
306                    }
307                }
308                _ = stop.changed() => {
309                    tracing::info!(%camera_id, "recorder: stop requested");
310                    let _ = child.kill().await;
311                    let _ = repo::set_state(&self.pool, &camera_id, "offline", None).await;
312                    return;
313                }
314            }
315        }
316    }
317}
318
319/// Sleep for `secs`, returning `true` if a stop was signaled during the wait.
320async fn sleep_or_stop(stop: &mut watch::Receiver<bool>, secs: u64) -> bool {
321    if *stop.borrow() {
322        return true;
323    }
324    tokio::select! {
325        _ = tokio::time::sleep(Duration::from_secs(secs)) => *stop.borrow(),
326        _ = stop.changed() => *stop.borrow(),
327    }
328}