1use std::collections::HashMap;
9use std::process::Stdio;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use serde::Serialize;
14use serde_json::json;
15use sqlx::SqlitePool;
16use tokio::io::AsyncReadExt;
17use tokio::process::Command;
18use tokio::sync::{watch, Mutex};
19use tokio::task::JoinHandle;
20
21use crate::camera_url;
22use crate::config::Config;
23use crate::models::Camera;
24use crate::repo;
25
/// Maximum number of trailing ffmpeg stderr bytes kept for the "ffmpeg exited" log/event.
const STDERR_TAIL_CAP: usize = 8 * 1024;
/// Lowest frame rate any running sampler is granted. It also caps how many samplers the
/// AI fps budget can run at once (`budget / MIN_FPS`).
const MIN_FPS: f64 = 0.5_f64;
28
/// Map key identifying one sampler: `"<camera_id>:<profile>"`.
fn sampler_key(camera_id: &str, profile: &str) -> String {
    [camera_id, profile].join(":")
}
/// File name of the JPEG a sampler keeps overwriting: `latest_<profile>.jpg`.
fn frame_filename(profile: &str) -> String {
    ["latest_", profile, ".jpg"].concat()
}
36
/// Handle to one running sampler supervisor task.
struct SamplerTask {
    /// Sending `true` asks the supervisor to kill its ffmpeg process and exit.
    stop: watch::Sender<bool>,
    /// The supervisor task itself; awaited (or aborted after a timeout) when stopping.
    handle: JoinHandle<()>,
}
41
/// Externally visible status of one camera/profile sampler, as returned by
/// `SamplerManager::statuses`.
#[derive(Debug, Clone, Serialize)]
pub struct SamplerInfo {
    /// Camera the sampler reads from.
    pub camera_id: String,
    /// Stream profile being sampled; it selects the stream URL and the output file name.
    pub stream_profile: String,
    /// Lifecycle state. Values set in this module: `connecting`, `sampling`, `offline`,
    /// `error`, `stopped`, `budget_exhausted`.
    pub state: String,
    /// Frame rate allotted to this sampler from the AI fps budget; `0.0` when none was left.
    pub fps: f64,
}
49
/// Owns the per-camera frame samplers (one ffmpeg process per camera/profile with an
/// enabled AI task) and divides the configured AI fps budget among them by camera priority.
pub struct SamplerManager {
    /// Database pool used to read cameras and AI tasks and to log sampler events.
    pool: SqlitePool,
    /// Application configuration (ffmpeg binary, frames directory, AI budget settings).
    cfg: Arc<Config>,
    /// Running supervisor tasks, keyed by `sampler_key(camera_id, profile)`.
    tasks: Mutex<HashMap<String, SamplerTask>>,
    /// Latest reported status per sampler, keyed the same way as `tasks`.
    info: Mutex<HashMap<String, SamplerInfo>>,
    /// Serializes rebalancing and shutdown so their stop/start phases never interleave.
    rebalance_lock: Mutex<()>,
}
58
59impl SamplerManager {
60 pub fn new(pool: SqlitePool, cfg: Arc<Config>) -> Arc<Self> {
61 Arc::new(Self {
62 pool,
63 cfg,
64 tasks: Mutex::new(HashMap::new()),
65 info: Mutex::new(HashMap::new()),
66 rebalance_lock: Mutex::new(()),
67 })
68 }
69
70 pub async fn start_all(self: &Arc<Self>) {
71 self.rebalance().await;
72 }
73
74 pub async fn reconcile(self: &Arc<Self>) {
76 self.rebalance().await;
77 }
78
79 pub async fn statuses(&self) -> Vec<SamplerInfo> {
81 let mut v: Vec<SamplerInfo> = self.info.lock().await.values().cloned().collect();
82 v.sort_by(|a, b| {
83 (a.camera_id.as_str(), a.stream_profile.as_str())
84 .cmp(&(b.camera_id.as_str(), b.stream_profile.as_str()))
85 });
86 v
87 }
88
89 async fn rebalance(self: &Arc<Self>) {
92 let _guard = self.rebalance_lock.lock().await;
93
94 let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
95 for id in ids {
96 self.stop(&id).await;
97 }
98 self.info.lock().await.clear();
99
100 if !self.cfg.ai_enabled {
101 return;
102 }
103
104 let rows: Vec<(String, String, f64, i64, i64)> = sqlx::query_as(
106 "SELECT c.id, t.stream_profile, MAX(t.fps) AS fps, MAX(t.width) AS width, c.priority
107 FROM cameras c JOIN ai_tasks t ON t.camera_id = c.id
108 WHERE c.enabled = 1 AND t.enabled = 1
109 GROUP BY c.id, t.stream_profile
110 ORDER BY c.priority DESC, c.id, t.stream_profile",
111 )
112 .fetch_all(&self.pool)
113 .await
114 .unwrap_or_default();
115
116 if rows.is_empty() {
117 return;
118 }
119
120 let budget = self.cfg.ai_max_total_fps.max(1.0);
121 let max_samplers = (budget / MIN_FPS).floor().max(1.0) as usize;
123 let run = rows.len().min(max_samplers);
124 let want: Vec<f64> = rows.iter().map(|r| r.2).collect();
128 let alloc = allocate_fps(&want, budget, max_samplers);
129 if rows.len() > run {
130 tracing::warn!(
131 requested = rows.len(),
132 running = run,
133 "sampler: AI fps budget exhausted; lowest-priority cameras will not be sampled"
134 );
135 }
136 tracing::info!(
137 samplers = run,
138 budget,
139 "sampler: rebalancing AI frame budget by priority"
140 );
141
142 for (i, (cam, profile, _max_fps, width, _priority)) in rows.into_iter().enumerate() {
143 let fps = alloc[i];
144 if fps > 0.0 {
145 self.spawn(cam, profile, fps, width).await;
146 } else {
147 self.set_info(&cam, &profile, "budget_exhausted", 0.0).await;
148 }
149 }
150 }
151
152 async fn stop(self: &Arc<Self>, key: &str) {
153 let task = { self.tasks.lock().await.remove(key) };
154 if let Some(task) = task {
155 let _ = task.stop.send(true);
156 let mut handle = task.handle;
157 if tokio::time::timeout(Duration::from_secs(8), &mut handle)
158 .await
159 .is_err()
160 {
161 tracing::warn!(key, "sampler: task did not stop within 8s; aborting");
162 handle.abort();
163 let _ = handle.await;
164 }
165 }
166 }
167
168 pub async fn shutdown(self: &Arc<Self>) {
169 let _guard = self.rebalance_lock.lock().await;
171 let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
172 for id in ids {
173 self.stop(&id).await;
174 }
175 }
176
177 async fn spawn(self: &Arc<Self>, camera_id: String, profile: String, fps: f64, width: i64) {
178 let key = sampler_key(&camera_id, &profile);
179 let (tx, rx) = watch::channel(false);
180 let mut tasks = self.tasks.lock().await;
181 let me = self.clone();
182 let handle = tokio::spawn(async move {
183 me.supervise(camera_id, profile, fps, width, rx).await;
184 });
185 if let Some(old) = tasks.insert(key, SamplerTask { stop: tx, handle }) {
186 let _ = old.stop.send(true);
187 old.handle.abort();
188 }
189 }
190
191 async fn set_info(&self, camera_id: &str, profile: &str, state: &str, fps: f64) {
192 self.info.lock().await.insert(
193 sampler_key(camera_id, profile),
194 SamplerInfo {
195 camera_id: camera_id.to_string(),
196 stream_profile: profile.to_string(),
197 state: state.to_string(),
198 fps,
199 },
200 );
201 }
202
203 async fn cleanup_self(&self, key: &str) {
205 self.tasks.lock().await.remove(key);
206 self.info.lock().await.remove(key);
207 }
208
209 async fn supervise(
210 self: Arc<Self>,
211 camera_id: String,
212 profile: String,
213 fps: f64,
214 width: i64,
215 mut stop: watch::Receiver<bool>,
216 ) {
217 let key = sampler_key(&camera_id, &profile);
218 let mut backoff: u64 = 1;
219 loop {
220 if *stop.borrow() {
221 self.set_info(&camera_id, &profile, "stopped", fps).await;
222 return;
223 }
224 let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
225 .bind(&camera_id)
226 .fetch_optional(&self.pool)
227 .await
228 {
229 Ok(Some(c)) if c.enabled => c,
230 Ok(_) => {
231 self.cleanup_self(&key).await;
233 return;
234 }
235 Err(e) => {
236 tracing::error!(%camera_id, error = %e, "sampler: failed to load camera");
237 if sleep_or_stop(&mut stop, 10).await {
238 return;
239 }
240 continue;
241 }
242 };
243
244 let Some(url) =
245 camera_url::stream_url(&cam, &profile).or_else(|| camera_url::record_url(&cam))
246 else {
247 self.set_info(&camera_id, &profile, "error", fps).await;
248 if sleep_or_stop(&mut stop, 30).await {
249 return;
250 }
251 continue;
252 };
253
254 let dir = self.cfg.camera_frames_dir(&camera_id);
255 if let Err(e) = tokio::fs::create_dir_all(&dir).await {
256 tracing::error!(%camera_id, error = %e, "sampler: cannot create frames dir");
257 }
258 let latest = dir.join(frame_filename(&profile));
259 let vf = format!("fps={fps},scale={width}:-2");
260 self.set_info(&camera_id, &profile, "connecting", fps).await;
261 tracing::info!(%camera_id, %profile, fps, width, url = %camera_url::mask_url(&url), "sampler: starting");
262
263 let mut child = match Command::new(&self.cfg.ffmpeg_bin)
264 .kill_on_drop(true)
265 .args(["-nostdin", "-hide_banner", "-loglevel", "warning"])
266 .args(["-rtsp_transport", "tcp"])
267 .args(["-timeout", "15000000"])
268 .args(["-i", &url])
269 .args(["-an", "-vf", &vf, "-q:v", "5"])
270 .args(["-f", "image2", "-update", "1", "-atomic_writing", "1", "-y"])
273 .arg(&latest)
274 .stdin(Stdio::null())
275 .stdout(Stdio::null())
276 .stderr(Stdio::piped())
277 .spawn()
278 {
279 Ok(c) => c,
280 Err(e) => {
281 tracing::error!(%camera_id, "sampler: spawn ffmpeg failed: {e}");
282 self.set_info(&camera_id, &profile, "error", fps).await;
283 if sleep_or_stop(&mut stop, 15).await {
284 return;
285 }
286 continue;
287 }
288 };
289 self.set_info(&camera_id, &profile, "sampling", fps).await;
290 let started = Instant::now();
291
292 let stderr = child.stderr.take();
293 let stderr_task = tokio::spawn(async move {
294 let mut tail: Vec<u8> = Vec::new();
295 if let Some(mut s) = stderr {
296 let mut chunk = [0u8; 4096];
297 loop {
298 match s.read(&mut chunk).await {
299 Ok(0) | Err(_) => break,
300 Ok(n) => {
301 tail.extend_from_slice(&chunk[..n]);
302 if tail.len() > STDERR_TAIL_CAP {
303 let excess = tail.len() - STDERR_TAIL_CAP;
304 tail.drain(0..excess);
305 }
306 }
307 }
308 }
309 }
310 tail
311 });
312
313 tokio::select! {
314 status = child.wait() => {
315 let tail = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default()).trim().to_string();
316 let masked = camera_url::mask_url(&tail);
317 tracing::warn!(%camera_id, %profile, status = ?status.ok().and_then(|s| s.code()), tail = %masked, "sampler: ffmpeg exited");
318 self.set_info(&camera_id, &profile, "offline", fps).await;
319 let _ = repo::log_event(&self.pool, Some(&camera_id), "sampler_offline", "warning",
320 json!({ "profile": profile, "detail": masked })).await;
321 backoff = if started.elapsed().as_secs() > 30 { 1 } else { (backoff * 2).min(30) };
325 if sleep_or_stop(&mut stop, backoff).await {
326 return;
327 }
328 }
329 _ = stop.changed() => {
330 let _ = child.kill().await;
331 self.set_info(&camera_id, &profile, "stopped", fps).await;
332 return;
333 }
334 }
335 }
336 }
337}
338
339async fn sleep_or_stop(stop: &mut watch::Receiver<bool>, secs: u64) -> bool {
340 if *stop.borrow() {
341 return true;
342 }
343 tokio::select! {
344 _ = tokio::time::sleep(Duration::from_secs(secs)) => *stop.borrow(),
345 _ = stop.changed() => *stop.borrow(),
346 }
347}
348
349fn allocate_fps(want: &[f64], budget: f64, max_samplers: usize) -> Vec<f64> {
355 let run = want.len().min(max_samplers);
356 let mut out = vec![0.0; want.len()];
357 let mut remaining = budget;
358 for (i, &w) in want.iter().enumerate().take(run) {
359 let others_after = (run - i - 1) as f64;
360 let reserve = MIN_FPS * others_after;
361 let grant = w.min((remaining - reserve).max(MIN_FPS)).max(MIN_FPS);
362 out[i] = grant;
363 remaining -= grant;
364 }
365 out
366}
367
368impl SamplerManager {
369 pub fn frame_path(&self, camera_id: &str, profile: &str) -> std::path::PathBuf {
371 self.cfg
372 .camera_frames_dir(camera_id)
373 .join(frame_filename(profile))
374 }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// Demand (20 fps) exceeds a 10 fps budget, and all three samplers fit under the cap.
    #[test]
    fn allocate_favors_priority_then_floors_the_rest() {
        let got = allocate_fps(&[10.0, 5.0, 5.0], 10.0, 10);
        assert_eq!(got.len(), 3);

        let top = got[0];
        let top_wins = got[1..].iter().all(|&f| top > f);
        assert!(top_wins, "highest-priority camera gets the most: {got:?}");

        let floored = got[1..].iter().all(|&f| f >= MIN_FPS);
        assert!(floored, "running cameras stay >= MIN_FPS: {got:?}");

        let total: f64 = got.iter().sum();
        assert!(
            (total - 10.0).abs() < 1e-9,
            "the whole budget is allocated when demand exceeds it: {got:?}"
        );
    }

    /// Only two samplers may run, so the third (lowest priority) is shed.
    #[test]
    fn allocate_sheds_lowest_priority_beyond_capacity() {
        let got = allocate_fps(&[5.0, 5.0, 5.0], 10.0, 2);
        assert_eq!(got[2], 0.0, "lowest-priority camera shed to 0: {got:?}");

        let runners_floored = got[..2].iter().all(|&f| f >= MIN_FPS);
        assert!(
            runners_floored,
            "the two running cameras stay >= MIN_FPS: {got:?}"
        );
    }
}