Skip to main content

heldar_kernel/services/
sampler.rs

1//! AI frame sampler (Stage 2): for each (camera, stream_profile) that has an enabled AI task, decode
2//! that stream at a budgeted frame rate and write the latest frame to `frames/<cam>/latest_<profile>.jpg`
3//! (atomic rename, so readers never see a torn JPEG). AI workers pull frames on their own cadence.
4//! A global FPS budget is shared across samplers, and the number of concurrent decoders is capped, so
5//! adding AI cameras degrades gracefully instead of overloading the host (backpressure). AI workers
6//! never touch RTSP directly — they consume sampled frames + post detections back.
7
8use std::collections::HashMap;
9use std::process::Stdio;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use serde::Serialize;
14use serde_json::json;
15use sqlx::SqlitePool;
16use tokio::io::AsyncReadExt;
17use tokio::process::Command;
18use tokio::sync::{watch, Mutex};
19use tokio::task::JoinHandle;
20
21use crate::camera_url;
22use crate::config::Config;
23use crate::models::Camera;
24use crate::repo;
25
/// Maximum number of trailing ffmpeg stderr bytes kept for the "ffmpeg exited" log/event.
26const STDERR_TAIL_CAP: usize = 8192;
/// Lowest fps granted to a running sampler; also used to derive the cap on concurrent samplers.
27const MIN_FPS: f64 = 0.5;
28
/// Build the map key identifying one sampler: `<camera_id>:<profile>`.
///
/// The same key is used for both the task map and the status map.
fn sampler_key(camera_id: &str, profile: &str) -> String {
    [camera_id, profile].join(":")
}
/// File name (not the full path) of the latest sampled frame for `profile`:
/// `latest_<profile>.jpg`.
fn frame_filename(profile: &str) -> String {
    ["latest_", profile, ".jpg"].concat()
}
36
/// Bookkeeping for one running sampler task, stored in `SamplerManager::tasks`.
37struct SamplerTask {
    /// Stop signal; the manager sends `true` to ask the task to exit.
38    stop: watch::Sender<bool>,
    /// Join handle of the spawned supervision task, awaited (or aborted) when stopping.
39    handle: JoinHandle<()>,
40}
41
/// Serializable status snapshot of one (camera, stream profile) sampler, as returned by
/// `SamplerManager::statuses`.
42#[derive(Debug, Clone, Serialize)]
43pub struct SamplerInfo {
    /// Id of the camera being sampled.
44    pub camera_id: String,
    /// Stream profile of that camera being sampled.
45    pub stream_profile: String,
    /// Current state; this file sets "connecting", "sampling", "offline", "error", "stopped"
    /// and "budget_exhausted".
46    pub state: String,
    /// Frame rate granted to this sampler (0.0 when it was shed for lack of budget).
47    pub fps: f64,
48}
49
50/// Owns and supervises the per-(camera,profile) frame samplers.
51pub struct SamplerManager {
    /// Database pool used to read cameras / AI tasks and to log events.
52    pool: SqlitePool,
    /// Shared configuration (AI enable flag, fps budget, ffmpeg binary, frames directory).
53    cfg: Arc<Config>,
    /// Running sampler tasks, keyed by `sampler_key(camera_id, profile)`.
54    tasks: Mutex<HashMap<String, SamplerTask>>,
    /// Latest reported status per sampler, keyed the same way as `tasks`.
55    info: Mutex<HashMap<String, SamplerInfo>>,
    /// Held for the whole of `rebalance` and `shutdown` so they never interleave.
56    rebalance_lock: Mutex<()>,
57}
58
59impl SamplerManager {
60    pub fn new(pool: SqlitePool, cfg: Arc<Config>) -> Arc<Self> {
61        Arc::new(Self {
62            pool,
63            cfg,
64            tasks: Mutex::new(HashMap::new()),
65            info: Mutex::new(HashMap::new()),
66            rebalance_lock: Mutex::new(()),
67        })
68    }
69
70    pub async fn start_all(self: &Arc<Self>) {
71        self.rebalance().await;
72    }
73
74    /// React to AI-task / camera changes: recompute the budget and (re)start samplers.
75    pub async fn reconcile(self: &Arc<Self>) {
76        self.rebalance().await;
77    }
78
79    /// Per-(camera,profile) sampler status (state + effective fps).
80    pub async fn statuses(&self) -> Vec<SamplerInfo> {
81        let mut v: Vec<SamplerInfo> = self.info.lock().await.values().cloned().collect();
82        v.sort_by(|a, b| {
83            (a.camera_id.as_str(), a.stream_profile.as_str())
84                .cmp(&(b.camera_id.as_str(), b.stream_profile.as_str()))
85        });
86        v
87    }
88
89    /// Stop, recompute the active set + per-camera fps budget, and restart all samplers. Serialized
90    /// by `rebalance_lock` so concurrent AI-task edits cannot race into overlapping ffmpegs.
91    async fn rebalance(self: &Arc<Self>) {
92        let _guard = self.rebalance_lock.lock().await;
93
94        let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
95        for id in ids {
96            self.stop(&id).await;
97        }
98        self.info.lock().await.clear();
99
100        if !self.cfg.ai_enabled {
101            return;
102        }
103
104        // Each (camera, stream_profile) with at least one enabled task, with its max fps + width.
105        let rows: Vec<(String, String, f64, i64, i64)> = sqlx::query_as(
106            "SELECT c.id, t.stream_profile, MAX(t.fps) AS fps, MAX(t.width) AS width, c.priority
107             FROM cameras c JOIN ai_tasks t ON t.camera_id = c.id
108             WHERE c.enabled = 1 AND t.enabled = 1
109             GROUP BY c.id, t.stream_profile
110             ORDER BY c.priority DESC, c.id, t.stream_profile",
111        )
112        .fetch_all(&self.pool)
113        .await
114        .unwrap_or_default();
115
116        if rows.is_empty() {
117            return;
118        }
119
120        let budget = self.cfg.ai_max_total_fps.max(1.0);
121        // Cap concurrent decoders so total fps cannot exceed the budget even at the MIN_FPS floor.
122        let max_samplers = (budget / MIN_FPS).floor().max(1.0) as usize;
123        let run = rows.len().min(max_samplers);
124        // Priority-aware allocation: rows are ordered priority DESC, so high-priority cameras (e.g. an
125        // ANPR gate lane) get their requested fps first and the lowest-priority cameras are floored to
126        // MIN_FPS or shed to 0 — instead of degrading every camera equally / blinding arbitrary ones.
127        let want: Vec<f64> = rows.iter().map(|r| r.2).collect();
128        let alloc = allocate_fps(&want, budget, max_samplers);
129        if rows.len() > run {
130            tracing::warn!(
131                requested = rows.len(),
132                running = run,
133                "sampler: AI fps budget exhausted; lowest-priority cameras will not be sampled"
134            );
135        }
136        tracing::info!(
137            samplers = run,
138            budget,
139            "sampler: rebalancing AI frame budget by priority"
140        );
141
142        for (i, (cam, profile, _max_fps, width, _priority)) in rows.into_iter().enumerate() {
143            let fps = alloc[i];
144            if fps > 0.0 {
145                self.spawn(cam, profile, fps, width).await;
146            } else {
147                self.set_info(&cam, &profile, "budget_exhausted", 0.0).await;
148            }
149        }
150    }
151
152    async fn stop(self: &Arc<Self>, key: &str) {
153        let task = { self.tasks.lock().await.remove(key) };
154        if let Some(task) = task {
155            let _ = task.stop.send(true);
156            let mut handle = task.handle;
157            if tokio::time::timeout(Duration::from_secs(8), &mut handle)
158                .await
159                .is_err()
160            {
161                tracing::warn!(key, "sampler: task did not stop within 8s; aborting");
162                handle.abort();
163                let _ = handle.await;
164            }
165        }
166    }
167
168    pub async fn shutdown(self: &Arc<Self>) {
169        // Hold the rebalance lock so an in-flight reconcile cannot re-spawn after we stop.
170        let _guard = self.rebalance_lock.lock().await;
171        let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
172        for id in ids {
173            self.stop(&id).await;
174        }
175    }
176
177    async fn spawn(self: &Arc<Self>, camera_id: String, profile: String, fps: f64, width: i64) {
178        let key = sampler_key(&camera_id, &profile);
179        let (tx, rx) = watch::channel(false);
180        let mut tasks = self.tasks.lock().await;
181        let me = self.clone();
182        let handle = tokio::spawn(async move {
183            me.supervise(camera_id, profile, fps, width, rx).await;
184        });
185        if let Some(old) = tasks.insert(key, SamplerTask { stop: tx, handle }) {
186            let _ = old.stop.send(true);
187            old.handle.abort();
188        }
189    }
190
191    async fn set_info(&self, camera_id: &str, profile: &str, state: &str, fps: f64) {
192        self.info.lock().await.insert(
193            sampler_key(camera_id, profile),
194            SamplerInfo {
195                camera_id: camera_id.to_string(),
196                stream_profile: profile.to_string(),
197                state: state.to_string(),
198                fps,
199            },
200        );
201    }
202
203    /// Remove this sampler's own task + info entry (on a self-initiated exit).
204    async fn cleanup_self(&self, key: &str) {
205        self.tasks.lock().await.remove(key);
206        self.info.lock().await.remove(key);
207    }
208
209    async fn supervise(
210        self: Arc<Self>,
211        camera_id: String,
212        profile: String,
213        fps: f64,
214        width: i64,
215        mut stop: watch::Receiver<bool>,
216    ) {
217        let key = sampler_key(&camera_id, &profile);
218        let mut backoff: u64 = 1;
219        loop {
220            if *stop.borrow() {
221                self.set_info(&camera_id, &profile, "stopped", fps).await;
222                return;
223            }
224            let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
225                .bind(&camera_id)
226                .fetch_optional(&self.pool)
227                .await
228            {
229                Ok(Some(c)) if c.enabled => c,
230                Ok(_) => {
231                    // Camera deleted or disabled: clean up our own slot and exit.
232                    self.cleanup_self(&key).await;
233                    return;
234                }
235                Err(e) => {
236                    tracing::error!(%camera_id, error = %e, "sampler: failed to load camera");
237                    if sleep_or_stop(&mut stop, 10).await {
238                        return;
239                    }
240                    continue;
241                }
242            };
243
244            let Some(url) =
245                camera_url::stream_url(&cam, &profile).or_else(|| camera_url::record_url(&cam))
246            else {
247                self.set_info(&camera_id, &profile, "error", fps).await;
248                if sleep_or_stop(&mut stop, 30).await {
249                    return;
250                }
251                continue;
252            };
253
254            let dir = self.cfg.camera_frames_dir(&camera_id);
255            if let Err(e) = tokio::fs::create_dir_all(&dir).await {
256                tracing::error!(%camera_id, error = %e, "sampler: cannot create frames dir");
257            }
258            let latest = dir.join(frame_filename(&profile));
259            let vf = format!("fps={fps},scale={width}:-2");
260            self.set_info(&camera_id, &profile, "connecting", fps).await;
261            tracing::info!(%camera_id, %profile, fps, width, url = %camera_url::mask_url(&url), "sampler: starting");
262
263            let mut child = match Command::new(&self.cfg.ffmpeg_bin)
264                .kill_on_drop(true)
265                .args(["-nostdin", "-hide_banner", "-loglevel", "warning"])
266                .args(["-rtsp_transport", "tcp"])
267                .args(["-timeout", "15000000"])
268                .args(["-i", &url])
269                .args(["-an", "-vf", &vf, "-q:v", "5"])
270                // atomic_writing makes ffmpeg write to a temp file and rename, so a worker reading
271                // the frame never sees a half-written JPEG.
272                .args(["-f", "image2", "-update", "1", "-atomic_writing", "1", "-y"])
273                .arg(&latest)
274                .stdin(Stdio::null())
275                .stdout(Stdio::null())
276                .stderr(Stdio::piped())
277                .spawn()
278            {
279                Ok(c) => c,
280                Err(e) => {
281                    tracing::error!(%camera_id, "sampler: spawn ffmpeg failed: {e}");
282                    self.set_info(&camera_id, &profile, "error", fps).await;
283                    if sleep_or_stop(&mut stop, 15).await {
284                        return;
285                    }
286                    continue;
287                }
288            };
289            self.set_info(&camera_id, &profile, "sampling", fps).await;
290            let started = Instant::now();
291
292            let stderr = child.stderr.take();
293            let stderr_task = tokio::spawn(async move {
294                let mut tail: Vec<u8> = Vec::new();
295                if let Some(mut s) = stderr {
296                    let mut chunk = [0u8; 4096];
297                    loop {
298                        match s.read(&mut chunk).await {
299                            Ok(0) | Err(_) => break,
300                            Ok(n) => {
301                                tail.extend_from_slice(&chunk[..n]);
302                                if tail.len() > STDERR_TAIL_CAP {
303                                    let excess = tail.len() - STDERR_TAIL_CAP;
304                                    tail.drain(0..excess);
305                                }
306                            }
307                        }
308                    }
309                }
310                tail
311            });
312
313            tokio::select! {
314                status = child.wait() => {
315                    let tail = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default()).trim().to_string();
316                    let masked = camera_url::mask_url(&tail);
317                    tracing::warn!(%camera_id, %profile, status = ?status.ok().and_then(|s| s.code()), tail = %masked, "sampler: ffmpeg exited");
318                    self.set_info(&camera_id, &profile, "offline", fps).await;
319                    let _ = repo::log_event(&self.pool, Some(&camera_id), "sampler_offline", "warning",
320                        json!({ "profile": profile, "detail": masked })).await;
321                    // Reset backoff after a healthy run (>30s); otherwise grow it (exponential up to
322                    // 30s) so a persistently-failing camera doesn't hot-loop ffmpeg restarts. Mirrors
323                    // the recorder so a camera that flaps then recovers retries promptly.
324                    backoff = if started.elapsed().as_secs() > 30 { 1 } else { (backoff * 2).min(30) };
325                    if sleep_or_stop(&mut stop, backoff).await {
326                        return;
327                    }
328                }
329                _ = stop.changed() => {
330                    let _ = child.kill().await;
331                    self.set_info(&camera_id, &profile, "stopped", fps).await;
332                    return;
333                }
334            }
335        }
336    }
337}
338
339async fn sleep_or_stop(stop: &mut watch::Receiver<bool>, secs: u64) -> bool {
340    if *stop.borrow() {
341        return true;
342    }
343    tokio::select! {
344        _ = tokio::time::sleep(Duration::from_secs(secs)) => *stop.borrow(),
345        _ = stop.changed() => *stop.borrow(),
346    }
347}
348
349/// Allocate the global AI fps `budget` across `want` (each camera's requested max fps), which MUST be
350/// ordered priority-high-first. Returns granted fps per camera (0.0 = shed / budget-exhausted). Greedy
351/// by priority: each running camera gets its requested fps while reserving `MIN_FPS` for the remaining
352/// running cameras — so high-priority cameras keep full fidelity, the lowest-priority are floored to
353/// `MIN_FPS`, and any beyond `max_samplers` are shed to 0.
354fn allocate_fps(want: &[f64], budget: f64, max_samplers: usize) -> Vec<f64> {
355    let run = want.len().min(max_samplers);
356    let mut out = vec![0.0; want.len()];
357    let mut remaining = budget;
358    for (i, &w) in want.iter().enumerate().take(run) {
359        let others_after = (run - i - 1) as f64;
360        let reserve = MIN_FPS * others_after;
361        let grant = w.min((remaining - reserve).max(MIN_FPS)).max(MIN_FPS);
362        out[i] = grant;
363        remaining -= grant;
364    }
365    out
366}
367
368impl SamplerManager {
369    /// Filesystem path of the latest sampled frame for a (camera, profile).
370    pub fn frame_path(&self, camera_id: &str, profile: &str) -> std::path::PathBuf {
371        self.cfg
372            .camera_frames_dir(camera_id)
373            .join(frame_filename(profile))
374    }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// Priority-ordered requests [10, 5, 5] against a budget of 10: the first camera gets the
    /// largest share, the others stay at or above the floor, and the whole budget is used.
    #[test]
    fn allocate_favors_priority_then_floors_the_rest() {
        let got = allocate_fps(&[10.0, 5.0, 5.0], 10.0, 10);
        assert_eq!(got.len(), 3);

        let (top, rest) = (got[0], &got[1..]);
        assert!(
            rest.iter().all(|&g| top > g),
            "highest-priority camera gets the most: {got:?}"
        );
        assert!(
            rest.iter().all(|&g| g >= MIN_FPS),
            "running cameras stay >= MIN_FPS: {got:?}"
        );

        let total: f64 = got.iter().sum();
        assert!(
            (total - 10.0).abs() < 1e-9,
            "the whole budget is allocated when demand exceeds it: {got:?}"
        );
    }

    /// Room for only 2 of 3 cameras: the last (lowest-priority) one is shed to 0.
    #[test]
    fn allocate_sheds_lowest_priority_beyond_capacity() {
        let got = allocate_fps(&[5.0, 5.0, 5.0], 10.0, 2);
        assert_eq!(got[2], 0.0, "lowest-priority camera shed to 0: {got:?}");
        assert!(
            got[..2].iter().all(|&g| g >= MIN_FPS),
            "the two running cameras stay >= MIN_FPS: {got:?}"
        );
    }
}