heldar_kernel/services/
mirror.rs1use std::collections::HashMap;
12use std::path::PathBuf;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use chrono::Utc;
18use sqlx::SqlitePool;
19use tokio::io::AsyncReadExt;
20use tokio::sync::{watch, Mutex};
21use tokio::task::JoinHandle;
22
23use crate::camera_url;
24use crate::config::Config;
25use crate::models::Camera;
26use crate::services::recorder::build_record_command;
27
/// Upper bound, in bytes, on how much of the child process's stderr is retained.
///
/// Only the most recent bytes are kept; once the buffer grows past this size
/// the oldest bytes are dropped.
const STDERR_TAIL_CAP: usize = 8 * 1024;
30
31struct MirrorTask {
32 stop: watch::Sender<bool>,
33 handle: JoinHandle<()>,
34 generation: u64,
36}
37
38pub struct MirrorRecorderManager {
40 pool: SqlitePool,
41 cfg: Arc<Config>,
42 mirror_root: PathBuf,
44 tasks: Mutex<HashMap<String, MirrorTask>>,
45 next_generation: AtomicU64,
46}
47
48impl MirrorRecorderManager {
49 pub fn new(pool: SqlitePool, cfg: Arc<Config>, mirror_root: PathBuf) -> Arc<Self> {
50 Arc::new(Self {
51 pool,
52 cfg,
53 mirror_root,
54 tasks: Mutex::new(HashMap::new()),
55 next_generation: AtomicU64::new(1),
56 })
57 }
58
59 fn camera_dir(&self, camera_id: &str) -> PathBuf {
61 self.mirror_root.join(camera_id)
62 }
63
64 fn should_mirror(cam: &Camera) -> bool {
66 cam.should_record() && cam.mirror_enabled
67 }
68
69 pub async fn start_all(self: &Arc<Self>) -> anyhow::Result<()> {
71 if !self.cfg.recorder_enabled {
72 return Ok(());
73 }
74 let cams: Vec<Camera> = sqlx::query_as::<_, Camera>(
75 "SELECT * FROM cameras WHERE enabled = 1 AND record_enabled = 1 AND mirror_enabled = 1",
76 )
77 .fetch_all(&self.pool)
78 .await?;
79 tracing::info!(count = cams.len(), root = %self.mirror_root.display(), "mirror: starting cameras");
80 for cam in cams {
81 self.spawn(cam.id).await;
82 }
83 Ok(())
84 }
85
86 pub async fn reconcile(self: &Arc<Self>, camera_id: &str) {
90 self.stop(camera_id).await;
91 if !self.cfg.recorder_enabled {
92 return;
93 }
94 let cam = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
95 .bind(camera_id)
96 .fetch_optional(&self.pool)
97 .await
98 .ok()
99 .flatten();
100 if let Some(cam) = cam {
101 if Self::should_mirror(&cam) {
102 self.spawn(camera_id.to_string()).await;
103 }
104 }
105 }
106
107 pub async fn stop(self: &Arc<Self>, camera_id: &str) {
109 let task = { self.tasks.lock().await.remove(camera_id) };
110 if let Some(task) = task {
111 let _ = task.stop.send(true);
112 let mut handle = task.handle;
113 if tokio::time::timeout(Duration::from_secs(8), &mut handle)
114 .await
115 .is_err()
116 {
117 tracing::warn!(%camera_id, "mirror: task did not stop within 8s; aborting");
118 handle.abort();
119 let _ = handle.await;
120 }
121 }
122 }
123
124 pub async fn shutdown(self: &Arc<Self>) {
126 let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
127 tracing::info!(count = ids.len(), "mirror: shutting down");
128 for id in ids {
129 self.stop(&id).await;
130 }
131 }
132
133 async fn spawn(self: &Arc<Self>, camera_id: String) {
134 let (tx, rx) = watch::channel(false);
135 let generation = self.next_generation.fetch_add(1, Ordering::Relaxed);
136 let mut tasks = self.tasks.lock().await;
139 let me = self.clone();
140 let id_for_task = camera_id.clone();
141 let handle = tokio::spawn(async move {
142 me.supervise(id_for_task, generation, rx).await;
143 });
144 if let Some(old) = tasks.insert(
145 camera_id,
146 MirrorTask {
147 stop: tx,
148 handle,
149 generation,
150 },
151 ) {
152 let _ = old.stop.send(true);
153 old.handle.abort();
154 }
155 }
156
157 async fn supervise(
158 self: Arc<Self>,
159 camera_id: String,
160 generation: u64,
161 stop: watch::Receiver<bool>,
162 ) {
163 self.run_mirror(camera_id.clone(), stop).await;
164 let mut tasks = self.tasks.lock().await;
166 if tasks.get(&camera_id).map(|t| t.generation) == Some(generation) {
167 tasks.remove(&camera_id);
168 }
169 }
170
171 async fn run_mirror(&self, camera_id: String, mut stop: watch::Receiver<bool>) {
174 let mut backoff: u64 = 1;
175 loop {
176 if *stop.borrow() {
177 return;
178 }
179 let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
180 .bind(&camera_id)
181 .fetch_optional(&self.pool)
182 .await
183 {
184 Ok(Some(c)) => c,
185 Ok(None) => return, Err(e) => {
187 tracing::error!(%camera_id, error = %e, "mirror: failed to load camera");
188 if sleep_or_stop(&mut stop, 10).await {
189 return;
190 }
191 continue;
192 }
193 };
194 if !Self::should_mirror(&cam) {
195 return;
197 }
198
199 let Some(url) = camera_url::record_url(&cam) else {
200 tracing::warn!(%camera_id, "mirror: no RTSP URL; retrying");
201 if sleep_or_stop(&mut stop, 30).await {
202 return;
203 }
204 continue;
205 };
206
207 let dir = self.camera_dir(&camera_id);
208 if let Err(e) = tokio::fs::create_dir_all(&dir).await {
209 tracing::error!(%camera_id, error = %e, "mirror: cannot create mirror dir");
210 }
211 let masked = camera_url::mask_url(&url);
212 tracing::info!(%camera_id, url = %masked, dir = %dir.display(), "mirror: starting ffmpeg");
213
214 let mut child = match build_record_command(&self.cfg, &cam, &url, &dir).spawn() {
215 Ok(c) => c,
216 Err(e) => {
217 tracing::error!(%camera_id, "mirror: spawn ffmpeg failed: {e}");
218 if sleep_or_stop(&mut stop, 15).await {
219 return;
220 }
221 continue;
222 }
223 };
224
225 let stderr = child.stderr.take();
227 let stderr_task = tokio::spawn(async move {
228 let mut tail: Vec<u8> = Vec::new();
229 if let Some(mut s) = stderr {
230 let mut chunk = [0u8; 4096];
231 loop {
232 match s.read(&mut chunk).await {
233 Ok(0) | Err(_) => break,
234 Ok(n) => {
235 tail.extend_from_slice(&chunk[..n]);
236 if tail.len() > STDERR_TAIL_CAP {
237 let excess = tail.len() - STDERR_TAIL_CAP;
238 tail.drain(0..excess);
239 }
240 }
241 }
242 }
243 }
244 tail
245 });
246
247 let started = Utc::now();
248 tokio::select! {
249 status = child.wait() => {
250 let raw = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default())
251 .trim().to_string();
252 let err_tail = camera_url::mask_url(&raw);
253 let ran = (Utc::now() - started).num_seconds();
254 match status {
255 Ok(s) if s.success() =>
256 tracing::warn!(%camera_id, ran_s = ran, "mirror: ffmpeg exited (stream ended)"),
257 Ok(s) =>
258 tracing::warn!(%camera_id, ran_s = ran, code = ?s.code(), tail = %err_tail, "mirror: ffmpeg exited with error"),
259 Err(e) =>
260 tracing::error!(%camera_id, error = %e, "mirror: ffmpeg wait failed"),
261 }
262 backoff = if ran > 30 { 1 } else { (backoff * 2).min(30) };
263 if sleep_or_stop(&mut stop, backoff).await {
264 return;
265 }
266 }
267 _ = stop.changed() => {
268 tracing::info!(%camera_id, "mirror: stop requested");
269 let _ = child.kill().await;
270 return;
271 }
272 }
273 }
274 }
275}
276
277async fn sleep_or_stop(stop: &mut watch::Receiver<bool>, secs: u64) -> bool {
279 if *stop.borrow() {
280 return true;
281 }
282 tokio::select! {
283 _ = tokio::time::sleep(Duration::from_secs(secs)) => *stop.borrow(),
284 _ = stop.changed() => *stop.borrow(),
285 }
286}