1use std::collections::HashMap;
9use std::process::Stdio;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use serde::Serialize;
14use serde_json::json;
15use sqlx::SqlitePool;
16use tokio::io::AsyncReadExt;
17use tokio::process::Command;
18use tokio::sync::{watch, Mutex};
19use tokio::task::JoinHandle;
20
21use crate::camera_url;
22use crate::config::Config;
23use crate::models::Camera;
24use crate::repo;
25
/// Maximum number of trailing bytes of ffmpeg stderr output kept for diagnostics.
const STDERR_TAIL_CAP: usize = 8 * 1024;
/// Lowest frame rate (frames per second) a sampler is ever started with.
const MIN_FPS: f64 = 0.5;
28
/// Builds the map key for one sampler: the camera id and the stream profile
/// joined by a single colon.
fn sampler_key(camera_id: &str, profile: &str) -> String {
    [camera_id, profile].join(":")
}
/// Returns the file name under which the most recent frame of `profile` is
/// stored: `latest_<profile>.jpg`.
fn frame_filename(profile: &str) -> String {
    ["latest_", profile, ".jpg"].concat()
}
36
37struct SamplerTask {
38 stop: watch::Sender<bool>,
39 handle: JoinHandle<()>,
40}
41
42#[derive(Debug, Clone, Serialize)]
43pub struct SamplerInfo {
44 pub camera_id: String,
45 pub stream_profile: String,
46 pub state: String,
47 pub fps: f64,
48}
49
50pub struct SamplerManager {
52 pool: SqlitePool,
53 cfg: Arc<Config>,
54 tasks: Mutex<HashMap<String, SamplerTask>>,
55 info: Mutex<HashMap<String, SamplerInfo>>,
56 rebalance_lock: Mutex<()>,
57}
58
59impl SamplerManager {
60 pub fn new(pool: SqlitePool, cfg: Arc<Config>) -> Arc<Self> {
61 Arc::new(Self {
62 pool,
63 cfg,
64 tasks: Mutex::new(HashMap::new()),
65 info: Mutex::new(HashMap::new()),
66 rebalance_lock: Mutex::new(()),
67 })
68 }
69
70 pub async fn start_all(self: &Arc<Self>) {
71 self.rebalance().await;
72 }
73
74 pub async fn reconcile(self: &Arc<Self>) {
76 self.rebalance().await;
77 }
78
79 pub async fn statuses(&self) -> Vec<SamplerInfo> {
81 let mut v: Vec<SamplerInfo> = self.info.lock().await.values().cloned().collect();
82 v.sort_by(|a, b| {
83 (a.camera_id.as_str(), a.stream_profile.as_str())
84 .cmp(&(b.camera_id.as_str(), b.stream_profile.as_str()))
85 });
86 v
87 }
88
89 async fn rebalance(self: &Arc<Self>) {
92 let _guard = self.rebalance_lock.lock().await;
93
94 let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
95 for id in ids {
96 self.stop(&id).await;
97 }
98 self.info.lock().await.clear();
99
100 if !self.cfg.ai_enabled {
101 return;
102 }
103
104 let rows: Vec<(String, String, f64, i64)> = sqlx::query_as(
106 "SELECT c.id, t.stream_profile, MAX(t.fps) AS fps, MAX(t.width) AS width
107 FROM cameras c JOIN ai_tasks t ON t.camera_id = c.id
108 WHERE c.enabled = 1 AND t.enabled = 1
109 GROUP BY c.id, t.stream_profile
110 ORDER BY c.id, t.stream_profile",
111 )
112 .fetch_all(&self.pool)
113 .await
114 .unwrap_or_default();
115
116 if rows.is_empty() {
117 return;
118 }
119
120 let budget = self.cfg.ai_max_total_fps.max(1.0);
121 let max_samplers = (budget / MIN_FPS).floor().max(1.0) as usize;
123 let run = rows.len().min(max_samplers);
124 let per_camera_cap = budget / run as f64;
125 if rows.len() > run {
126 tracing::warn!(
127 requested = rows.len(),
128 running = run,
129 "sampler: AI fps budget exhausted; some cameras will not be sampled"
130 );
131 }
132 tracing::info!(
133 samplers = run,
134 budget,
135 per_camera_cap,
136 "sampler: rebalancing AI frame budget"
137 );
138
139 for (i, (cam, profile, max_fps, width)) in rows.into_iter().enumerate() {
140 if i < run {
141 let effective = max_fps.min(per_camera_cap).max(MIN_FPS);
142 self.spawn(cam, profile, effective, width).await;
143 } else {
144 self.set_info(&cam, &profile, "budget_exhausted", 0.0).await;
145 }
146 }
147 }
148
149 async fn stop(self: &Arc<Self>, key: &str) {
150 let task = { self.tasks.lock().await.remove(key) };
151 if let Some(task) = task {
152 let _ = task.stop.send(true);
153 let mut handle = task.handle;
154 if tokio::time::timeout(Duration::from_secs(8), &mut handle)
155 .await
156 .is_err()
157 {
158 tracing::warn!(key, "sampler: task did not stop within 8s; aborting");
159 handle.abort();
160 let _ = handle.await;
161 }
162 }
163 }
164
165 pub async fn shutdown(self: &Arc<Self>) {
166 let _guard = self.rebalance_lock.lock().await;
168 let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
169 for id in ids {
170 self.stop(&id).await;
171 }
172 }
173
174 async fn spawn(self: &Arc<Self>, camera_id: String, profile: String, fps: f64, width: i64) {
175 let key = sampler_key(&camera_id, &profile);
176 let (tx, rx) = watch::channel(false);
177 let mut tasks = self.tasks.lock().await;
178 let me = self.clone();
179 let handle = tokio::spawn(async move {
180 me.supervise(camera_id, profile, fps, width, rx).await;
181 });
182 if let Some(old) = tasks.insert(key, SamplerTask { stop: tx, handle }) {
183 let _ = old.stop.send(true);
184 old.handle.abort();
185 }
186 }
187
188 async fn set_info(&self, camera_id: &str, profile: &str, state: &str, fps: f64) {
189 self.info.lock().await.insert(
190 sampler_key(camera_id, profile),
191 SamplerInfo {
192 camera_id: camera_id.to_string(),
193 stream_profile: profile.to_string(),
194 state: state.to_string(),
195 fps,
196 },
197 );
198 }
199
200 async fn cleanup_self(&self, key: &str) {
202 self.tasks.lock().await.remove(key);
203 self.info.lock().await.remove(key);
204 }
205
206 async fn supervise(
207 self: Arc<Self>,
208 camera_id: String,
209 profile: String,
210 fps: f64,
211 width: i64,
212 mut stop: watch::Receiver<bool>,
213 ) {
214 let key = sampler_key(&camera_id, &profile);
215 let mut backoff: u64 = 1;
216 loop {
217 if *stop.borrow() {
218 self.set_info(&camera_id, &profile, "stopped", fps).await;
219 return;
220 }
221 let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
222 .bind(&camera_id)
223 .fetch_optional(&self.pool)
224 .await
225 {
226 Ok(Some(c)) if c.enabled => c,
227 Ok(_) => {
228 self.cleanup_self(&key).await;
230 return;
231 }
232 Err(e) => {
233 tracing::error!(%camera_id, error = %e, "sampler: failed to load camera");
234 if sleep_or_stop(&mut stop, 10).await {
235 return;
236 }
237 continue;
238 }
239 };
240
241 let Some(url) =
242 camera_url::stream_url(&cam, &profile).or_else(|| camera_url::record_url(&cam))
243 else {
244 self.set_info(&camera_id, &profile, "error", fps).await;
245 if sleep_or_stop(&mut stop, 30).await {
246 return;
247 }
248 continue;
249 };
250
251 let dir = self.cfg.camera_frames_dir(&camera_id);
252 if let Err(e) = tokio::fs::create_dir_all(&dir).await {
253 tracing::error!(%camera_id, error = %e, "sampler: cannot create frames dir");
254 }
255 let latest = dir.join(frame_filename(&profile));
256 let vf = format!("fps={fps},scale={width}:-2");
257 self.set_info(&camera_id, &profile, "connecting", fps).await;
258 tracing::info!(%camera_id, %profile, fps, width, url = %camera_url::mask_url(&url), "sampler: starting");
259
260 let mut child = match Command::new(&self.cfg.ffmpeg_bin)
261 .kill_on_drop(true)
262 .args(["-nostdin", "-hide_banner", "-loglevel", "warning"])
263 .args(["-rtsp_transport", "tcp"])
264 .args(["-timeout", "15000000"])
265 .args(["-i", &url])
266 .args(["-an", "-vf", &vf, "-q:v", "5"])
267 .args(["-f", "image2", "-update", "1", "-atomic_writing", "1", "-y"])
270 .arg(&latest)
271 .stdin(Stdio::null())
272 .stdout(Stdio::null())
273 .stderr(Stdio::piped())
274 .spawn()
275 {
276 Ok(c) => c,
277 Err(e) => {
278 tracing::error!(%camera_id, "sampler: spawn ffmpeg failed: {e}");
279 self.set_info(&camera_id, &profile, "error", fps).await;
280 if sleep_or_stop(&mut stop, 15).await {
281 return;
282 }
283 continue;
284 }
285 };
286 self.set_info(&camera_id, &profile, "sampling", fps).await;
287 let started = Instant::now();
288
289 let stderr = child.stderr.take();
290 let stderr_task = tokio::spawn(async move {
291 let mut tail: Vec<u8> = Vec::new();
292 if let Some(mut s) = stderr {
293 let mut chunk = [0u8; 4096];
294 loop {
295 match s.read(&mut chunk).await {
296 Ok(0) | Err(_) => break,
297 Ok(n) => {
298 tail.extend_from_slice(&chunk[..n]);
299 if tail.len() > STDERR_TAIL_CAP {
300 let excess = tail.len() - STDERR_TAIL_CAP;
301 tail.drain(0..excess);
302 }
303 }
304 }
305 }
306 }
307 tail
308 });
309
310 tokio::select! {
311 status = child.wait() => {
312 let tail = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default()).trim().to_string();
313 let masked = camera_url::mask_url(&tail);
314 tracing::warn!(%camera_id, %profile, status = ?status.ok().and_then(|s| s.code()), tail = %masked, "sampler: ffmpeg exited");
315 self.set_info(&camera_id, &profile, "offline", fps).await;
316 let _ = repo::log_event(&self.pool, Some(&camera_id), "sampler_offline", "warning",
317 json!({ "profile": profile, "detail": masked })).await;
318 backoff = if started.elapsed().as_secs() > 30 { 1 } else { (backoff * 2).min(30) };
322 if sleep_or_stop(&mut stop, backoff).await {
323 return;
324 }
325 }
326 _ = stop.changed() => {
327 let _ = child.kill().await;
328 self.set_info(&camera_id, &profile, "stopped", fps).await;
329 return;
330 }
331 }
332 }
333 }
334}
335
336async fn sleep_or_stop(stop: &mut watch::Receiver<bool>, secs: u64) -> bool {
337 if *stop.borrow() {
338 return true;
339 }
340 tokio::select! {
341 _ = tokio::time::sleep(Duration::from_secs(secs)) => *stop.borrow(),
342 _ = stop.changed() => *stop.borrow(),
343 }
344}
345
346impl SamplerManager {
347 pub fn frame_path(&self, camera_id: &str, profile: &str) -> std::path::PathBuf {
349 self.cfg
350 .camera_frames_dir(camera_id)
351 .join(frame_filename(profile))
352 }
353}