1use std::collections::HashMap;
6use std::process::Stdio;
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Duration;
10
11use chrono::Utc;
12use serde_json::json;
13use sqlx::SqlitePool;
14use tokio::io::AsyncReadExt;
15use tokio::process::Command;
16use tokio::sync::{watch, Mutex};
17use tokio::task::JoinHandle;
18
19use crate::camera_url;
20use crate::config::Config;
21use crate::models::Camera;
22use crate::repo;
23
24const STDERR_TAIL_CAP: usize = 8192;
26
27struct CameraTask {
28 stop: watch::Sender<bool>,
29 handle: JoinHandle<()>,
30 generation: u64,
32}
33
34pub struct RecorderManager {
36 pool: SqlitePool,
37 cfg: Arc<Config>,
38 tasks: Mutex<HashMap<String, CameraTask>>,
39 next_generation: AtomicU64,
40}
41
42impl RecorderManager {
43 pub fn new(pool: SqlitePool, cfg: Arc<Config>) -> Arc<Self> {
44 Arc::new(Self {
45 pool,
46 cfg,
47 tasks: Mutex::new(HashMap::new()),
48 next_generation: AtomicU64::new(1),
49 })
50 }
51
52 pub async fn start_all(self: &Arc<Self>) -> anyhow::Result<()> {
54 if !self.cfg.recorder_enabled {
55 tracing::warn!("recorder globally disabled (HELDAR_RECORDER_ENABLED=false)");
56 return Ok(());
57 }
58 let cams: Vec<Camera> = sqlx::query_as::<_, Camera>(
59 "SELECT * FROM cameras WHERE enabled = 1 AND record_enabled = 1",
60 )
61 .fetch_all(&self.pool)
62 .await?;
63 tracing::info!(count = cams.len(), "recorder: starting cameras");
64 for cam in cams {
65 self.spawn(cam.id).await;
66 }
67 Ok(())
68 }
69
70 pub async fn reconcile(self: &Arc<Self>, camera_id: &str) {
72 self.stop(camera_id).await;
73 if !self.cfg.recorder_enabled {
74 return;
75 }
76 let cam = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
77 .bind(camera_id)
78 .fetch_optional(&self.pool)
79 .await
80 .ok()
81 .flatten();
82 match cam {
83 Some(cam) if cam.should_record() => self.spawn(camera_id.to_string()).await,
84 Some(_) => {
85 let _ = repo::set_state(&self.pool, camera_id, "disabled", None).await;
86 }
87 None => {}
88 }
89 }
90
91 pub async fn stop(self: &Arc<Self>, camera_id: &str) {
94 let task = { self.tasks.lock().await.remove(camera_id) };
95 if let Some(task) = task {
96 let _ = task.stop.send(true);
97 let mut handle = task.handle;
98 if tokio::time::timeout(Duration::from_secs(8), &mut handle)
99 .await
100 .is_err()
101 {
102 tracing::warn!(%camera_id, "recorder: task did not stop within 8s; aborting");
105 handle.abort();
106 let _ = handle.await;
107 }
108 }
109 }
110
111 pub async fn shutdown(self: &Arc<Self>) {
113 let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
114 tracing::info!(count = ids.len(), "recorder: shutting down");
115 for id in ids {
116 self.stop(&id).await;
117 }
118 }
119
120 pub async fn active_ids(&self) -> Vec<String> {
122 self.tasks.lock().await.keys().cloned().collect()
123 }
124
125 async fn spawn(self: &Arc<Self>, camera_id: String) {
126 let (tx, rx) = watch::channel(false);
127 let generation = self.next_generation.fetch_add(1, Ordering::Relaxed);
128
129 let mut tasks = self.tasks.lock().await;
132 let me = self.clone();
133 let id_for_task = camera_id.clone();
134 let handle = tokio::spawn(async move {
135 me.supervise(id_for_task, generation, rx).await;
136 });
137 if let Some(old) = tasks.insert(
138 camera_id,
139 CameraTask {
140 stop: tx,
141 handle,
142 generation,
143 },
144 ) {
145 let _ = old.stop.send(true);
147 old.handle.abort();
148 }
149 }
150
151 async fn supervise(
152 self: Arc<Self>,
153 camera_id: String,
154 generation: u64,
155 stop: watch::Receiver<bool>,
156 ) {
157 self.run_supervise(camera_id.clone(), stop).await;
158 let mut tasks = self.tasks.lock().await;
161 if tasks.get(&camera_id).map(|t| t.generation) == Some(generation) {
162 tasks.remove(&camera_id);
163 tracing::debug!(%camera_id, "recorder: task removed itself from map on exit");
164 }
165 }
166
167 async fn run_supervise(&self, camera_id: String, mut stop: watch::Receiver<bool>) {
168 let mut backoff: u64 = 1;
169 loop {
170 if *stop.borrow() {
171 return;
172 }
173
174 let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
175 .bind(&camera_id)
176 .fetch_optional(&self.pool)
177 .await
178 {
179 Ok(Some(c)) => c,
180 Ok(None) => return, Err(e) => {
182 tracing::error!(%camera_id, error = %e, "recorder: failed to load camera");
183 if sleep_or_stop(&mut stop, 10).await {
184 return;
185 }
186 continue;
187 }
188 };
189 if !cam.should_record() {
190 let _ = repo::set_state(&self.pool, &camera_id, "disabled", None).await;
191 return;
192 }
193
194 let Some(url) = camera_url::record_url(&cam) else {
195 let msg = "no RTSP URL: set address+credentials or an explicit stream URL";
196 let _ = repo::set_state(&self.pool, &camera_id, "error", Some(msg)).await;
197 let _ = repo::log_event(
198 &self.pool,
199 Some(&camera_id),
200 "recorder_error",
201 "warning",
202 json!({ "reason": msg }),
203 )
204 .await;
205 if sleep_or_stop(&mut stop, 30).await {
206 return;
207 }
208 continue;
209 };
210
211 let dir = self.cfg.camera_recordings_dir(&camera_id);
212 if let Err(e) = tokio::fs::create_dir_all(&dir).await {
213 tracing::error!(%camera_id, error = %e, "recorder: cannot create recordings dir");
214 }
215 let seg = cam.segment_seconds.max(2);
216 let pattern = dir.join("%Y%m%d_%H%M%S.mp4");
217 let masked = camera_url::mask_url(&url);
218
219 let _ = repo::set_state(&self.pool, &camera_id, "connecting", None).await;
220 tracing::info!(%camera_id, url = %masked, segment_s = seg, "recorder: starting ffmpeg");
221
222 let mut child = match Command::new(&self.cfg.ffmpeg_bin)
223 .kill_on_drop(true)
224 .env("TZ", "UTC")
225 .args(["-nostdin", "-hide_banner", "-loglevel", "warning"])
226 .args(["-rtsp_transport", "tcp"])
227 .args(["-timeout", "15000000"]) .args(["-i", &url])
229 .args(["-c", "copy", "-an"]) .args(["-f", "segment"])
231 .args(["-segment_time", &seg.to_string()])
232 .args(["-segment_format", "mp4"])
233 .args([
234 "-segment_format_options",
235 "movflags=+frag_keyframe+empty_moov+default_base_moof",
236 ])
237 .args(["-reset_timestamps", "1"])
238 .args(["-strftime", "1"])
239 .arg(&pattern)
240 .stdin(Stdio::null())
241 .stdout(Stdio::null())
242 .stderr(Stdio::piped())
243 .spawn()
244 {
245 Ok(c) => c,
246 Err(e) => {
247 let msg = format!("spawn ffmpeg failed: {e}");
248 tracing::error!(%camera_id, "{msg}");
249 let _ = repo::set_state(&self.pool, &camera_id, "error", Some(&msg)).await;
250 if sleep_or_stop(&mut stop, 15).await {
251 return;
252 }
253 continue;
254 }
255 };
256
257 let pid = child.id().map(|p| p as i64);
258 let _ = repo::set_running(&self.pool, &camera_id, "recording", pid).await;
259
260 let stderr = child.stderr.take();
263 let stderr_task = tokio::spawn(async move {
264 let mut tail: Vec<u8> = Vec::new();
265 if let Some(mut s) = stderr {
266 let mut chunk = [0u8; 4096];
267 loop {
268 match s.read(&mut chunk).await {
269 Ok(0) | Err(_) => break,
270 Ok(n) => {
271 tail.extend_from_slice(&chunk[..n]);
272 if tail.len() > STDERR_TAIL_CAP {
273 let excess = tail.len() - STDERR_TAIL_CAP;
274 tail.drain(0..excess);
275 }
276 }
277 }
278 }
279 }
280 tail
281 });
282
283 let started = Utc::now();
284 tokio::select! {
285 status = child.wait() => {
286 let raw = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default())
287 .trim().to_string();
288 let err_tail = camera_url::mask_url(&raw);
290 let ran = (Utc::now() - started).num_seconds();
291 match status {
292 Ok(s) if s.success() =>
293 tracing::warn!(%camera_id, ran_s = ran, "ffmpeg exited (stream ended)"),
294 Ok(s) =>
295 tracing::warn!(%camera_id, ran_s = ran, code = ?s.code(), tail = %err_tail, "ffmpeg exited with error"),
296 Err(e) =>
297 tracing::error!(%camera_id, error = %e, "ffmpeg wait failed"),
298 }
299 let _ = repo::bump_reconnect(&self.pool, &camera_id, &err_tail).await;
300 let _ = repo::log_event(&self.pool, Some(&camera_id), "camera_offline", "warning",
301 json!({ "ran_seconds": ran, "detail": err_tail })).await;
302 backoff = if ran > 30 { 1 } else { (backoff * 2).min(30) };
304 if sleep_or_stop(&mut stop, backoff).await {
305 return;
306 }
307 }
308 _ = stop.changed() => {
309 tracing::info!(%camera_id, "recorder: stop requested");
310 let _ = child.kill().await;
311 let _ = repo::set_state(&self.pool, &camera_id, "offline", None).await;
312 return;
313 }
314 }
315 }
316 }
317}
318
319async fn sleep_or_stop(stop: &mut watch::Receiver<bool>, secs: u64) -> bool {
321 if *stop.borrow() {
322 return true;
323 }
324 tokio::select! {
325 _ = tokio::time::sleep(Duration::from_secs(secs)) => *stop.borrow(),
326 _ = stop.changed() => *stop.borrow(),
327 }
328}