Skip to main content

heldar_kernel/services/
recorder.rs

1//! Recorder supervisor: one FFmpeg process per camera, recording the configured stream
2//! into time-segmented fragmented-MP4 files with `-c copy` (no decode). Supervises the
3//! process, reconnects with backoff on stream loss, and maintains live camera status.
4
5use std::collections::{HashMap, HashSet};
6use std::process::Stdio;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::{DateTime, Datelike, Local, Timelike, Utc};
12use serde_json::{json, Value};
13use sqlx::SqlitePool;
14use tokio::io::AsyncReadExt;
15use tokio::process::Command;
16use tokio::sync::{watch, Mutex};
17use tokio::task::JoinHandle;
18
19use crate::camera_url;
20use crate::config::Config;
21use crate::models::{Camera, RecordSchedule};
22use crate::repo;
23
/// Upper bound, in bytes, on how much of one FFmpeg run's stderr is retained. Older output is
/// discarded first, because the most recent lines are the ones that explain a failure.
const STDERR_TAIL_CAP: usize = 8 * 1024;
26
27struct CameraTask {
28    stop: watch::Sender<bool>,
29    /// Event-trigger channel for `event` / `scheduled_event` cameras: holds the current trigger
30    /// window end (`None` = no active trigger window). [`RecorderManager::trigger`] extends it; the
31    /// event supervisor records while it (or a schedule window) is active. Unused for
32    /// `continuous` / `scheduled` tasks.
33    trigger: watch::Sender<Option<DateTime<Utc>>>,
34    handle: JoinHandle<()>,
35    /// Monotonic id distinguishing this task from any later task for the same camera.
36    generation: u64,
37}
38
/// Returns `true` for the record modes that record in response to triggers (`event` and
/// `scheduled_event`) and `false` for every other mode string.
fn event_capable(mode: &str) -> bool {
    ["event", "scheduled_event"].contains(&mode)
}
43
44/// Owns and supervises the per-camera recorder tasks.
45pub struct RecorderManager {
46    pool: SqlitePool,
47    cfg: Arc<Config>,
48    tasks: Mutex<HashMap<String, CameraTask>>,
49    next_generation: AtomicU64,
50}
51
52impl RecorderManager {
53    pub fn new(pool: SqlitePool, cfg: Arc<Config>) -> Arc<Self> {
54        Arc::new(Self {
55            pool,
56            cfg,
57            tasks: Mutex::new(HashMap::new()),
58            next_generation: AtomicU64::new(1),
59        })
60    }
61
62    /// Start recorders for all cameras that should record.
63    pub async fn start_all(self: &Arc<Self>) -> anyhow::Result<()> {
64        if !self.cfg.recorder_enabled {
65            tracing::warn!("recorder globally disabled (HELDAR_RECORDER_ENABLED=false)");
66            return Ok(());
67        }
68        let cams: Vec<Camera> = sqlx::query_as::<_, Camera>(
69            "SELECT * FROM cameras WHERE enabled = 1 AND record_enabled = 1",
70        )
71        .fetch_all(&self.pool)
72        .await?;
73        tracing::info!(count = cams.len(), "recorder: starting cameras");
74        for cam in cams {
75            // Honor the recording schedule at boot: a `scheduled` camera outside its window is left
76            // idle (the schedule watcher will start it when the window opens). Continuous cameras
77            // always start. Event-capable cameras (`event` / `scheduled_event`) always spawn ARMED:
78            // their supervisor sits idle until a trigger (or, for scheduled_event, a window) makes it
79            // record.
80            if event_capable(&cam.record_mode) || self.eval_schedule(&cam.id).await {
81                self.spawn(cam.id).await;
82            } else {
83                let _ = repo::set_state(&self.pool, &cam.id, "disabled", None).await;
84            }
85        }
86        Ok(())
87    }
88
89    /// Reconcile a single camera's recorder against its current DB state. Starts a recorder when the
90    /// camera should record AND its schedule says it should be recording now; otherwise stops it and
91    /// marks it idle. Always restarts a running recorder (config may have changed) — callers that
92    /// must not churn an actively-recording camera should use [`Self::reconcile_schedules`].
93    pub async fn reconcile(self: &Arc<Self>, camera_id: &str) {
94        self.stop(camera_id).await;
95        if !self.cfg.recorder_enabled {
96            return;
97        }
98        let cam = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
99            .bind(camera_id)
100            .fetch_optional(&self.pool)
101            .await
102            .ok()
103            .flatten();
104        match cam {
105            Some(cam) if cam.should_record() => {
106                if event_capable(&cam.record_mode) || self.eval_schedule(camera_id).await {
107                    // Continuous / in-window scheduled: records immediately. Event-capable: spawns
108                    // ARMED — the event supervisor decides record-vs-idle from triggers + schedule.
109                    self.spawn(camera_id.to_string()).await;
110                } else {
111                    // Enabled but a `scheduled` camera outside its window: intentionally not
112                    // recording right now (the schedule watcher will start it when the window opens).
113                    let _ = repo::set_state(&self.pool, camera_id, "disabled", None).await;
114                }
115            }
116            Some(_) => {
117                let _ = repo::set_state(&self.pool, camera_id, "disabled", None).await;
118            }
119            None => {}
120        }
121    }
122
123    /// Whether `camera_id` should be recording at this instant per its `record_mode` + schedule,
124    /// IGNORING event triggers (those are handled by the event supervisor / [`Self::trigger`]):
125    /// - `continuous` is always on.
126    /// - `scheduled` / `scheduled_event` are on only inside an enabled time-of-day window for today's
127    ///   weekday, evaluated against the SERVER's LOCAL timezone (chrono::Local), with overnight wrap.
128    /// - `event` (and any unknown mode) has no time-based recording, so it is off here; it records
129    ///   only while a trigger window is active.
130    pub async fn eval_schedule(&self, camera_id: &str) -> bool {
131        let mode: Option<String> =
132            sqlx::query_scalar("SELECT record_mode FROM cameras WHERE id = ?")
133                .bind(camera_id)
134                .fetch_optional(&self.pool)
135                .await
136                .ok()
137                .flatten();
138        match mode.as_deref().unwrap_or("continuous") {
139            "continuous" => true,
140            "scheduled" | "scheduled_event" => {
141                let rows = sqlx::query_as::<_, RecordSchedule>(
142                    "SELECT * FROM camera_schedules WHERE camera_id = ? AND enabled = 1",
143                )
144                .bind(camera_id)
145                .fetch_all(&self.pool)
146                .await
147                .unwrap_or_default();
148                let now = Local::now();
149                rows.iter().any(|s| schedule_active_at(s, now))
150            }
151            _ => false,
152        }
153    }
154
155    /// Reconcile only the pure `scheduled` cameras whose recording state must change because their
156    /// window just opened or closed. Called periodically by the schedule watcher. Cameras already in
157    /// the correct state are left untouched, so an actively-recording camera is never restarted
158    /// mid-window. `scheduled_event` is deliberately excluded: those tasks are always ARMED and the
159    /// event supervisor opens/closes their window itself (so the watcher must not churn them).
160    pub async fn reconcile_schedules(self: &Arc<Self>) {
161        if !self.cfg.recorder_enabled {
162            return;
163        }
164        let ids: Vec<String> = sqlx::query_scalar(
165            "SELECT id FROM cameras
166             WHERE enabled = 1 AND record_enabled = 1
167               AND record_mode = 'scheduled'",
168        )
169        .fetch_all(&self.pool)
170        .await
171        .unwrap_or_default();
172        if ids.is_empty() {
173            return;
174        }
175        let active: HashSet<String> = self.active_ids().await.into_iter().collect();
176        for id in ids {
177            let want = self.eval_schedule(&id).await;
178            let running = active.contains(&id);
179            if want != running {
180                self.reconcile(&id).await;
181            }
182        }
183    }
184
185    /// Stop a camera's recorder task, killing its FFmpeg process. Returns only once the task is
186    /// actually gone (aborting it if it does not stop promptly).
187    pub async fn stop(self: &Arc<Self>, camera_id: &str) {
188        let task = { self.tasks.lock().await.remove(camera_id) };
189        if let Some(task) = task {
190            let _ = task.stop.send(true);
191            let mut handle = task.handle;
192            if tokio::time::timeout(Duration::from_secs(8), &mut handle)
193                .await
194                .is_err()
195            {
196                // The task did not honor the stop signal in time. Abort it: dropping its frame
197                // drops the FFmpeg Child, and kill_on_drop terminates the process.
198                tracing::warn!(%camera_id, "recorder: task did not stop within 8s; aborting");
199                handle.abort();
200                let _ = handle.await;
201            }
202        }
203    }
204
205    /// Stop all recorder tasks (graceful shutdown).
206    pub async fn shutdown(self: &Arc<Self>) {
207        let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
208        tracing::info!(count = ids.len(), "recorder: shutting down");
209        for id in ids {
210            self.stop(&id).await;
211        }
212    }
213
214    /// Camera ids currently being supervised.
215    pub async fn active_ids(&self) -> Vec<String> {
216        self.tasks.lock().await.keys().cloned().collect()
217    }
218
219    async fn spawn(self: &Arc<Self>, camera_id: String) {
220        let (tx, rx) = watch::channel(false);
221        // Trigger window channel (event / scheduled_event). Starts with no active window.
222        let (trig_tx, trig_rx) = watch::channel(None::<DateTime<Utc>>);
223        let generation = self.next_generation.fetch_add(1, Ordering::Relaxed);
224
225        // Hold the map lock across spawn+insert so a concurrent stop()/delete can never observe a
226        // gap where the task is running but not yet registered (which would let it slip through).
227        let mut tasks = self.tasks.lock().await;
228        let me = self.clone();
229        let id_for_task = camera_id.clone();
230        let handle = tokio::spawn(async move {
231            me.supervise(id_for_task, generation, rx, trig_rx).await;
232        });
233        if let Some(old) = tasks.insert(
234            camera_id,
235            CameraTask {
236                stop: tx,
237                trigger: trig_tx,
238                handle,
239                generation,
240            },
241        ) {
242            // Displaced a previous task: signal AND abort it so two FFmpegs never overlap.
243            let _ = old.stop.send(true);
244            old.handle.abort();
245        }
246    }
247
248    async fn supervise(
249        self: Arc<Self>,
250        camera_id: String,
251        generation: u64,
252        stop: watch::Receiver<bool>,
253        trigger: watch::Receiver<Option<DateTime<Utc>>>,
254    ) {
255        // Choose the supervisor by record mode at task start. A mode change always goes through
256        // `reconcile()` (stop + respawn), so picking the path here is sufficient; both paths also
257        // self-exit if the camera is later deleted / disabled / its mode no longer matches.
258        let mode: Option<String> =
259            sqlx::query_scalar("SELECT record_mode FROM cameras WHERE id = ?")
260                .bind(&camera_id)
261                .fetch_optional(&self.pool)
262                .await
263                .ok()
264                .flatten();
265        if event_capable(mode.as_deref().unwrap_or("continuous")) {
266            self.run_event_supervise(camera_id.clone(), stop, trigger)
267                .await;
268        } else {
269            self.run_supervise(camera_id.clone(), stop).await;
270        }
271        // Self-exit cleanup: remove our own entry, but only if it is still ours (a concurrent
272        // spawn may have installed a newer task for this camera).
273        let mut tasks = self.tasks.lock().await;
274        if tasks.get(&camera_id).map(|t| t.generation) == Some(generation) {
275            tasks.remove(&camera_id);
276            tracing::debug!(%camera_id, "recorder: task removed itself from map on exit");
277        }
278    }
279
280    async fn run_supervise(&self, camera_id: String, mut stop: watch::Receiver<bool>) {
281        let mut backoff: u64 = 1;
282        loop {
283            if *stop.borrow() {
284                return;
285            }
286
287            let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
288                .bind(&camera_id)
289                .fetch_optional(&self.pool)
290                .await
291            {
292                Ok(Some(c)) => c,
293                Ok(None) => return, // camera deleted
294                Err(e) => {
295                    tracing::error!(%camera_id, error = %e, "recorder: failed to load camera");
296                    if sleep_or_stop(&mut stop, 10).await {
297                        return;
298                    }
299                    continue;
300                }
301            };
302            if !cam.should_record() {
303                let _ = repo::set_state(&self.pool, &camera_id, "disabled", None).await;
304                return;
305            }
306
307            let Some(url) = camera_url::record_url(&cam) else {
308                let msg = "no RTSP URL: set address+credentials or an explicit stream URL";
309                let _ = repo::set_state(&self.pool, &camera_id, "error", Some(msg)).await;
310                let _ = repo::log_event(
311                    &self.pool,
312                    Some(&camera_id),
313                    "recorder_error",
314                    "warning",
315                    json!({ "reason": msg }),
316                )
317                .await;
318                if sleep_or_stop(&mut stop, 30).await {
319                    return;
320                }
321                continue;
322            };
323
324            let dir = self.cfg.camera_recordings_dir(&camera_id);
325            if let Err(e) = tokio::fs::create_dir_all(&dir).await {
326                tracing::error!(%camera_id, error = %e, "recorder: cannot create recordings dir");
327            }
328            let seg = cam.segment_seconds.max(2);
329            let masked = camera_url::mask_url(&url);
330
331            let _ = repo::set_state(&self.pool, &camera_id, "connecting", None).await;
332            tracing::info!(%camera_id, url = %masked, segment_s = seg, "recorder: starting ffmpeg");
333
334            let mut child = match self.build_record_command(&cam, &url, &dir).spawn() {
335                Ok(c) => c,
336                Err(e) => {
337                    let msg = format!("spawn ffmpeg failed: {e}");
338                    tracing::error!(%camera_id, "{msg}");
339                    let _ = repo::set_state(&self.pool, &camera_id, "error", Some(&msg)).await;
340                    if sleep_or_stop(&mut stop, 15).await {
341                        return;
342                    }
343                    continue;
344                }
345            };
346
347            let pid = child.id().map(|p| p as i64);
348            let _ = repo::set_running(&self.pool, &camera_id, "recording", pid).await;
349
350            // Drain stderr concurrently (so the pipe never blocks ffmpeg), keeping only a bounded
351            // tail so a chatty/long-lived recorder cannot grow this buffer without bound.
352            let stderr = child.stderr.take();
353            let stderr_task = tokio::spawn(async move {
354                let mut tail: Vec<u8> = Vec::new();
355                if let Some(mut s) = stderr {
356                    let mut chunk = [0u8; 4096];
357                    loop {
358                        match s.read(&mut chunk).await {
359                            Ok(0) | Err(_) => break,
360                            Ok(n) => {
361                                tail.extend_from_slice(&chunk[..n]);
362                                if tail.len() > STDERR_TAIL_CAP {
363                                    let excess = tail.len() - STDERR_TAIL_CAP;
364                                    tail.drain(0..excess);
365                                }
366                            }
367                        }
368                    }
369                }
370                tail
371            });
372
373            let started = Utc::now();
374            tokio::select! {
375                status = child.wait() => {
376                    let raw = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default())
377                        .trim().to_string();
378                    // Mask any credentials FFmpeg echoes back in the RTSP URL before persisting/logging.
379                    let err_tail = camera_url::mask_url(&raw);
380                    let ran = (Utc::now() - started).num_seconds();
381                    match status {
382                        Ok(s) if s.success() =>
383                            tracing::warn!(%camera_id, ran_s = ran, "ffmpeg exited (stream ended)"),
384                        Ok(s) =>
385                            tracing::warn!(%camera_id, ran_s = ran, code = ?s.code(), tail = %err_tail, "ffmpeg exited with error"),
386                        Err(e) =>
387                            tracing::error!(%camera_id, error = %e, "ffmpeg wait failed"),
388                    }
389                    let _ = repo::bump_reconnect(&self.pool, &camera_id, &err_tail).await;
390                    let _ = repo::log_event(&self.pool, Some(&camera_id), "camera_offline", "warning",
391                        json!({ "ran_seconds": ran, "detail": err_tail })).await;
392                    // Reset backoff if it ran a healthy while; otherwise exponential up to 30s.
393                    backoff = if ran > 30 { 1 } else { (backoff * 2).min(30) };
394                    if sleep_or_stop(&mut stop, backoff).await {
395                        return;
396                    }
397                }
398                _ = stop.changed() => {
399                    tracing::info!(%camera_id, "recorder: stop requested");
400                    let _ = child.kill().await;
401                    let _ = repo::set_state(&self.pool, &camera_id, "offline", None).await;
402                    return;
403                }
404            }
405        }
406    }
407
408    /// Build the segmenting FFmpeg command for a camera's recorded stream. Delegates to the shared
409    /// [`build_record_command`] free fn so the continuous / event supervisors AND the mirror recorder
410    /// all produce byte-identical recordings.
411    fn build_record_command(&self, cam: &Camera, url: &str, dir: &std::path::Path) -> Command {
412        build_record_command(&self.cfg, cam, url, dir)
413    }
414
415    /// Supervise an EVENT-capable camera (`event` / `scheduled_event`). The task is always ARMED: it
416    /// sits idle (status `disabled`) until either a trigger window is active — a [`Self::trigger`] set
417    /// `window_end = now + post_roll_seconds` — or, for `scheduled_event`, a recording window is open.
418    /// While either holds it records continuously (segmenting like the main recorder), reconnecting
419    /// with backoff on stream loss, and stops once the trigger window has elapsed AND no schedule
420    /// window is open.
421    ///
422    /// PRE-ROLL is best-effort: the kernel keeps no always-on ring buffer for idle event cameras, so
423    /// recording begins at the trigger. `pre_roll_seconds` is honored only from recent completed
424    /// segments that already exist on disk (e.g. a `scheduled_event` window already in progress, or a
425    /// still-active prior trigger) — assembled at clip/evidence-export time. Frame-accurate pre-roll
426    /// for an idle camera would require continuous buffering (a future enhancement).
427    async fn run_event_supervise(
428        &self,
429        camera_id: String,
430        mut stop: watch::Receiver<bool>,
431        mut trig: watch::Receiver<Option<DateTime<Utc>>>,
432    ) {
433        // Reasons the inner ffmpeg session ended.
434        enum End {
435            Stop,
436            WindowClosed,
437            Exited(std::io::Result<std::process::ExitStatus>),
438        }
439
440        let mut backoff: u64 = 1;
441        loop {
442            if *stop.borrow() {
443                return;
444            }
445            let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
446                .bind(&camera_id)
447                .fetch_optional(&self.pool)
448                .await
449            {
450                Ok(Some(c)) => c,
451                Ok(None) => return, // camera deleted
452                Err(e) => {
453                    tracing::error!(%camera_id, error = %e, "recorder(event): failed to load camera");
454                    if sleep_or_stop(&mut stop, 10).await {
455                        return;
456                    }
457                    continue;
458                }
459            };
460            if !cam.should_record() {
461                let _ = repo::set_state(&self.pool, &camera_id, "disabled", None).await;
462                return;
463            }
464            if !event_capable(&cam.record_mode) {
465                // Mode changed out from under us; let reconcile() respawn the right supervisor.
466                return;
467            }
468
469            // Should we be recording right now? A trigger window OR (for scheduled_event) a schedule
470            // window. eval_schedule() returns false for pure `event`, so triggers are its only source.
471            let now = Utc::now();
472            let trigger_active = matches!(*trig.borrow(), Some(end) if now <= end);
473            let schedule_active = self.eval_schedule(&camera_id).await;
474            if !(trigger_active || schedule_active) {
475                // Idle / armed: wait for a trigger, a periodic re-check (a scheduled_event window may
476                // open), or a stop. Status mirrors the legacy "event camera not recording" state.
477                let _ = repo::set_state(&self.pool, &camera_id, "disabled", None).await;
478                let idle_tick = self.cfg.schedule_check_interval_s.max(5);
479                tokio::select! {
480                    _ = stop.changed() => return,
481                    _ = trig.changed() => {}
482                    _ = tokio::time::sleep(Duration::from_secs(idle_tick)) => {}
483                }
484                continue;
485            }
486
487            let Some(url) = camera_url::record_url(&cam) else {
488                let msg = "no RTSP URL: set address+credentials or an explicit stream URL";
489                let _ = repo::set_state(&self.pool, &camera_id, "error", Some(msg)).await;
490                let _ = repo::log_event(
491                    &self.pool,
492                    Some(&camera_id),
493                    "recorder_error",
494                    "warning",
495                    json!({ "reason": msg }),
496                )
497                .await;
498                if sleep_or_stop(&mut stop, 30).await {
499                    return;
500                }
501                continue;
502            };
503
504            let dir = self.cfg.camera_recordings_dir(&camera_id);
505            if let Err(e) = tokio::fs::create_dir_all(&dir).await {
506                tracing::error!(%camera_id, error = %e, "recorder(event): cannot create recordings dir");
507            }
508            let masked = camera_url::mask_url(&url);
509            let _ = repo::set_state(&self.pool, &camera_id, "connecting", None).await;
510            tracing::info!(%camera_id, url = %masked, "recorder(event): trigger/window active; starting ffmpeg");
511
512            let mut child = match self.build_record_command(&cam, &url, &dir).spawn() {
513                Ok(c) => c,
514                Err(e) => {
515                    let msg = format!("spawn ffmpeg failed: {e}");
516                    tracing::error!(%camera_id, "{msg}");
517                    let _ = repo::set_state(&self.pool, &camera_id, "error", Some(&msg)).await;
518                    if sleep_or_stop(&mut stop, 15).await {
519                        return;
520                    }
521                    continue;
522                }
523            };
524            let pid = child.id().map(|p| p as i64);
525            let _ = repo::set_running(&self.pool, &camera_id, "recording", pid).await;
526
527            // Drain stderr concurrently, keeping a bounded tail (same as the main recorder).
528            let stderr = child.stderr.take();
529            let stderr_task = tokio::spawn(async move {
530                let mut tail: Vec<u8> = Vec::new();
531                if let Some(mut s) = stderr {
532                    let mut chunk = [0u8; 4096];
533                    loop {
534                        match s.read(&mut chunk).await {
535                            Ok(0) | Err(_) => break,
536                            Ok(n) => {
537                                tail.extend_from_slice(&chunk[..n]);
538                                if tail.len() > STDERR_TAIL_CAP {
539                                    let excess = tail.len() - STDERR_TAIL_CAP;
540                                    tail.drain(0..excess);
541                                }
542                            }
543                        }
544                    }
545                }
546                tail
547            });
548
549            let started = Utc::now();
550            // Inner loop: keep THIS ffmpeg child alive until stop, process exit, or until we should no
551            // longer be recording (trigger window elapsed AND no schedule window open). Reacts
552            // immediately to an extended/new trigger via `trig.changed()`.
553            let end = loop {
554                let now = Utc::now();
555                // Sleep precisely to the trigger window end (so post-roll stops on time); also re-check
556                // at least every schedule tick to notice a scheduled_event window closing.
557                let mut recheck = self.cfg.schedule_check_interval_s.max(5);
558                if let Some(w_end) = *trig.borrow() {
559                    if w_end > now {
560                        let remaining = (w_end - now).num_seconds().max(0) as u64 + 1;
561                        recheck = recheck.min(remaining);
562                    }
563                }
564                let recheck = recheck.max(1);
565                tokio::select! {
566                    status = child.wait() => break End::Exited(status),
567                    _ = stop.changed() => break End::Stop,
568                    _ = trig.changed() => { /* window extended/changed; recompute deadline */ }
569                    _ = tokio::time::sleep(Duration::from_secs(recheck)) => {
570                        let now = Utc::now();
571                        let trig_on = matches!(*trig.borrow(), Some(e) if now <= e);
572                        let sched_on = self.eval_schedule(&camera_id).await;
573                        if !(trig_on || sched_on) {
574                            break End::WindowClosed;
575                        }
576                    }
577                }
578            };
579
580            match end {
581                End::Stop => {
582                    tracing::info!(%camera_id, "recorder(event): stop requested");
583                    let _ = child.kill().await;
584                    let _ = repo::set_state(&self.pool, &camera_id, "offline", None).await;
585                    return;
586                }
587                End::WindowClosed => {
588                    let _ = child.kill().await;
589                    let _ = repo::set_state(&self.pool, &camera_id, "disabled", None).await;
590                    tracing::info!(%camera_id, "recorder(event): trigger window elapsed; stopping ffmpeg");
591                    backoff = 1;
592                    // Back to the top: re-evaluate (will idle until the next trigger/window).
593                }
594                End::Exited(status) => {
595                    let raw = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default())
596                        .trim()
597                        .to_string();
598                    let err_tail = camera_url::mask_url(&raw);
599                    let ran = (Utc::now() - started).num_seconds();
600                    match status {
601                        Ok(s) if s.success() => {
602                            tracing::warn!(%camera_id, ran_s = ran, "ffmpeg exited (stream ended)")
603                        }
604                        Ok(s) => {
605                            tracing::warn!(%camera_id, ran_s = ran, code = ?s.code(), tail = %err_tail, "ffmpeg exited with error")
606                        }
607                        Err(e) => tracing::error!(%camera_id, error = %e, "ffmpeg wait failed"),
608                    }
609                    let _ = repo::bump_reconnect(&self.pool, &camera_id, &err_tail).await;
610                    let _ = repo::log_event(
611                        &self.pool,
612                        Some(&camera_id),
613                        "camera_offline",
614                        "warning",
615                        json!({ "ran_seconds": ran, "detail": err_tail }),
616                    )
617                    .await;
618                    backoff = if ran > 30 { 1 } else { (backoff * 2).min(30) };
619                    if sleep_or_stop(&mut stop, backoff).await {
620                        return;
621                    }
622                    // Back to the top: if still inside the window, re-spawns ffmpeg (reconnect).
623                }
624            }
625        }
626    }
627
628    /// Fire an event recording trigger for a camera: extend its trigger window to
629    /// `now + post_roll_seconds` (repeated triggers keep the later end). No-op (returns `None`) for a
630    /// camera that is not `event` / `scheduled_event`, is not recording-enabled, or has no armed task
631    /// (e.g. the recorder is globally disabled). Returns the resulting window end. Cheap and
632    /// idempotent — safe to call on every zone/breach event.
633    pub async fn trigger(&self, camera_id: &str, reason: &str) -> Option<DateTime<Utc>> {
634        let cam = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
635            .bind(camera_id)
636            .fetch_optional(&self.pool)
637            .await
638            .ok()
639            .flatten()?;
640        if !cam.should_record() || !event_capable(&cam.record_mode) {
641            return None;
642        }
643        let post = cam.post_roll_seconds.clamp(0, 3600);
644        let end = Utc::now() + chrono::Duration::seconds(post);
645
646        let tasks = self.tasks.lock().await;
647        let task = tasks.get(camera_id)?;
648        let mut window_end = end;
649        task.trigger.send_modify(|cur| {
650            // Keep the later of the existing window and this one (a trigger only extends).
651            let next = match *cur {
652                Some(existing) if existing > end => existing,
653                _ => end,
654            };
655            *cur = Some(next);
656            window_end = next;
657        });
658        tracing::info!(%camera_id, %reason, window_end = %window_end, "recorder: event trigger");
659        Some(window_end)
660    }
661}
662
663/// Build the segmenting FFmpeg command for a camera's recorded stream (stream-copy, fragmented-MP4
664/// segments, UTC strftime names). Shared verbatim by the continuous + event supervisors and the
665/// mirror recorder ([`crate::services::mirror`]) so every pipeline writes byte-identical segments.
666/// Video is always `-c copy`; audio is passed through only when the camera opts in. `dir` is the
667/// output directory (the primary recordings dir, or the mirror dir for the mirror recorder).
668pub(crate) fn build_record_command(
669    cfg: &Config,
670    cam: &Camera,
671    url: &str,
672    dir: &std::path::Path,
673) -> Command {
674    let seg = cam.segment_seconds.max(2);
675    let pattern = dir.join("%Y%m%d_%H%M%S.mp4");
676    let audio_args: &[&str] = if cam.record_audio {
677        &["-c:a", "copy"]
678    } else {
679        &["-an"]
680    };
681    let mut cmd = Command::new(&cfg.ffmpeg_bin);
682    cmd.kill_on_drop(true)
683        .env("TZ", "UTC")
684        .args(["-nostdin", "-hide_banner", "-loglevel", "warning"])
685        .args(["-rtsp_transport", "tcp"])
686        .args(["-timeout", "15000000"]) // 15s RTSP socket I/O timeout -> exit on stall
687        .args(["-i", url])
688        .args(["-c", "copy"]) // stream-copy (no decode)
689        .args(audio_args) // audio: pass-through when record_audio, else dropped
690        .args(["-f", "segment"])
691        .args(["-segment_time", &seg.to_string()])
692        .args(["-segment_format", "mp4"])
693        .args([
694            "-segment_format_options",
695            "movflags=+frag_keyframe+empty_moov+default_base_moof",
696        ])
697        .args(["-reset_timestamps", "1"])
698        .args(["-strftime", "1"])
699        .arg(&pattern)
700        .stdin(Stdio::null())
701        .stdout(Stdio::null())
702        .stderr(Stdio::piped());
703    cmd
704}
705
706/// Sleep for `secs`, returning `true` if a stop was signaled during the wait.
707async fn sleep_or_stop(stop: &mut watch::Receiver<bool>, secs: u64) -> bool {
708    if *stop.borrow() {
709        return true;
710    }
711    tokio::select! {
712        _ = tokio::time::sleep(Duration::from_secs(secs)) => *stop.borrow(),
713        _ = stop.changed() => *stop.borrow(),
714    }
715}
716
/// Convert a 24-hour "HH:MM" string into minutes since midnight (`0..=1439`).
///
/// The text is split at the first `:`; each side is trimmed and parsed as an integer, so
/// non-zero-padded values such as `"9:5"` are accepted. Returns `None` when there is no `:`,
/// either side is not an integer, the hour is outside `0..=23`, or the minute is outside `0..=59`.
fn parse_hhmm(s: &str) -> Option<i32> {
    let (hour_text, minute_text) = s.split_once(':')?;
    let hours = hour_text.trim().parse::<i32>().ok()?;
    let minutes = minute_text.trim().parse::<i32>().ok()?;
    if !(0..=23).contains(&hours) || !(0..=59).contains(&minutes) {
        return None;
    }
    Some(hours * 60 + minutes)
}
724
725/// Parse a JSON array of weekday ints into a list, keeping only valid 0..6 (0=Mon..6=Sun) values.
726fn parse_days(v: &Value) -> Vec<i64> {
727    v.as_array()
728        .map(|a| {
729            a.iter()
730                .filter_map(|d| d.as_i64())
731                .filter(|d| (0..7).contains(d))
732                .collect()
733        })
734        .unwrap_or_default()
735}
736
/// Decide whether a recording window covers weekday `wd` (0=Mon..6=Sun) at minute-of-day `minute`.
///
/// - Same-day window (`start <= end`): active on a listed day for `start <= minute < end`
///   (so `start == end` is never active).
/// - Overnight window (`start > end`): the evening part (`minute >= start`) counts on a listed
///   day; the early-morning part (`minute < end`) counts when the PREVIOUS weekday is listed,
///   since it belongs to the window that started the day before.
fn window_active(days: &[i64], start: i32, end: i32, wd: i64, minute: i32) -> bool {
    let today_listed = days.contains(&wd);

    if start <= end {
        return today_listed && (start..end).contains(&minute);
    }

    // Overnight: the two halves are disjoint because `end < start`.
    let yesterday = (wd + 6) % 7;
    if minute >= start {
        today_listed
    } else if minute < end {
        days.contains(&yesterday)
    } else {
        false
    }
}
749
750/// Whether a single schedule row is active at local instant `now`. Malformed times never match.
751fn schedule_active_at(s: &RecordSchedule, now: DateTime<Local>) -> bool {
752    let (Some(start), Some(end)) = (parse_hhmm(&s.time_start), parse_hhmm(&s.time_end)) else {
753        return false;
754    };
755    let days = parse_days(&s.days.0);
756    let wd = now.weekday().num_days_from_monday() as i64; // 0=Mon..6=Sun
757    let minute = now.hour() as i32 * 60 + now.minute() as i32;
758    window_active(&days, start, end, wd, minute)
759}
760
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Only `event` and `scheduled_event` are trigger-driven modes.
    #[test]
    fn event_capable_modes() {
        for mode in ["event", "scheduled_event"] {
            assert!(event_capable(mode), "{} should be event-capable", mode);
        }
        for mode in ["continuous", "scheduled", "nonsense"] {
            assert!(!event_capable(mode), "{} should not be event-capable", mode);
        }
    }

    /// Well-formed times map to minutes since midnight; anything else is rejected.
    #[test]
    fn parse_hhmm_valid_and_invalid() {
        let cases: [(&str, Option<i32>); 7] = [
            ("00:00", Some(0)),
            ("9:30", Some(570)),
            ("23:59", Some(1439)),
            ("24:00", None),
            ("12:60", None),
            ("x:y", None),
            ("1230", None),
        ];
        for (input, expected) in cases {
            assert_eq!(parse_hhmm(input), expected, "input {:?}", input);
        }
    }

    /// Out-of-range weekdays are dropped and non-array input yields an empty list.
    #[test]
    fn parse_days_filters_out_of_range() {
        let cases = [
            (json!([0, 1, 6]), vec![0, 1, 6]),
            (json!([7, -1, 3]), vec![3]),
            (json!("nope"), Vec::<i64>::new()),
        ];
        for (raw, expected) in &cases {
            assert_eq!(parse_days(raw), *expected, "input {}", raw);
        }
    }

    /// Mon..Fri, 09:00..17:00: half-open interval on scheduled days only.
    #[test]
    fn window_same_day() {
        let days = vec![0, 1, 2, 3, 4];
        let (start, end) = (540, 1020);
        let cases = [
            (0, 600, true),   // Mon 10:00 -> in
            (0, 480, false),  // Mon 08:00 -> before
            (0, 1020, false), // end is exclusive
            (5, 600, false),  // Sat -> not scheduled
        ];
        for (wd, minute, expected) in cases {
            assert_eq!(
                window_active(&days, start, end, wd, minute),
                expected,
                "weekday {} minute {}",
                wd,
                minute
            );
        }
    }

    /// Monday 22:00..06:00: the morning half belongs to the day the window started.
    #[test]
    fn window_overnight_wrap() {
        let days = vec![0];
        let (start, end) = (1320, 360);
        let cases = [
            (0, 1380, true), // Mon 23:00 -> evening part
            (1, 120, true),  // Tue 02:00 -> Monday's carryover
            (1, 400, false), // Tue 06:40 -> after end
            (0, 300, false), // Mon 05:00 -> would be Sunday's window
        ];
        for (wd, minute, expected) in cases {
            assert_eq!(
                window_active(&days, start, end, wd, minute),
                expected,
                "weekday {} minute {}",
                wd,
                minute
            );
        }
    }
}