Skip to main content

heldar_kernel/services/
sampler.rs

1//! AI frame sampler (Stage 2): for each (camera, stream_profile) that has an enabled AI task, decode
2//! that stream at a budgeted frame rate and write the latest frame to `frames/<cam>/latest_<profile>.jpg`
3//! (atomic rename, so readers never see a torn JPEG). AI workers pull frames on their own cadence.
4//! A global FPS budget is shared across samplers, and the number of concurrent decoders is capped, so
5//! adding AI cameras degrades gracefully instead of overloading the host (backpressure). AI workers
6//! never touch RTSP directly — they consume sampled frames + post detections back.
7
8use std::collections::HashMap;
9use std::process::Stdio;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use serde::Serialize;
14use serde_json::json;
15use sqlx::SqlitePool;
16use tokio::io::AsyncReadExt;
17use tokio::process::Command;
18use tokio::sync::{watch, Mutex};
19use tokio::task::JoinHandle;
20
21use crate::camera_url;
22use crate::config::Config;
23use crate::models::Camera;
24use crate::repo;
25
/// Maximum number of trailing ffmpeg stderr bytes retained for the "ffmpeg exited" log/event.
const STDERR_TAIL_CAP: usize = 8192;
/// Lowest fps ever assigned to a running sampler; the budget divided by this value also bounds
/// how many samplers may run at once.
const MIN_FPS: f64 = 0.5;
28
/// Build the stable map key for a (camera, profile) sampler: `<camera_id>:<profile>`.
fn sampler_key(camera_id: &str, profile: &str) -> String {
    [camera_id, profile].join(":")
}
/// File name of the latest sampled frame for a stream profile: `latest_<profile>.jpg`.
fn frame_filename(profile: &str) -> String {
    ["latest_", profile, ".jpg"].concat()
}
36
/// Bookkeeping for one running sampler: how to ask it to stop and how to wait for it.
struct SamplerTask {
    /// Stop signal; sending `true` asks the supervising task to exit.
    stop: watch::Sender<bool>,
    /// Join handle of the spawned supervising task (awaited or aborted on stop).
    handle: JoinHandle<()>,
}
41
/// Serializable status snapshot of one (camera, stream profile) sampler.
#[derive(Debug, Clone, Serialize)]
pub struct SamplerInfo {
    /// Id of the camera being sampled.
    pub camera_id: String,
    /// Stream profile being sampled for that camera.
    pub stream_profile: String,
    /// Current state label. Values written in this file: "connecting", "sampling", "offline",
    /// "error", "stopped" and "budget_exhausted".
    pub state: String,
    /// Effective sampling rate assigned to this sampler (0.0 when it is not run for lack of budget).
    pub fps: f64,
}
49
/// Owns and supervises the per-(camera,profile) frame samplers.
pub struct SamplerManager {
    /// Database handle used to read cameras / AI tasks and to log events.
    pool: SqlitePool,
    /// Shared application configuration (AI enable flag, fps budget, ffmpeg binary, frame dirs).
    cfg: Arc<Config>,
    /// Running samplers, keyed by `sampler_key(camera_id, profile)`.
    tasks: Mutex<HashMap<String, SamplerTask>>,
    /// Latest status per sampler, keyed the same way as `tasks`.
    info: Mutex<HashMap<String, SamplerInfo>>,
    /// Serializes `rebalance` and `shutdown` so they never overlap.
    rebalance_lock: Mutex<()>,
}
58
59impl SamplerManager {
60    pub fn new(pool: SqlitePool, cfg: Arc<Config>) -> Arc<Self> {
61        Arc::new(Self {
62            pool,
63            cfg,
64            tasks: Mutex::new(HashMap::new()),
65            info: Mutex::new(HashMap::new()),
66            rebalance_lock: Mutex::new(()),
67        })
68    }
69
    /// Start the samplers by running a full rebalance (identical to `reconcile`).
    pub async fn start_all(self: &Arc<Self>) {
        self.rebalance().await;
    }
73
    /// React to AI-task / camera changes: recompute the budget and (re)start samplers.
    /// Delegates to `rebalance`, which stops every running sampler before restarting the set.
    pub async fn reconcile(self: &Arc<Self>) {
        self.rebalance().await;
    }
78
79    /// Per-(camera,profile) sampler status (state + effective fps).
80    pub async fn statuses(&self) -> Vec<SamplerInfo> {
81        let mut v: Vec<SamplerInfo> = self.info.lock().await.values().cloned().collect();
82        v.sort_by(|a, b| {
83            (a.camera_id.as_str(), a.stream_profile.as_str())
84                .cmp(&(b.camera_id.as_str(), b.stream_profile.as_str()))
85        });
86        v
87    }
88
89    /// Stop, recompute the active set + per-camera fps budget, and restart all samplers. Serialized
90    /// by `rebalance_lock` so concurrent AI-task edits cannot race into overlapping ffmpegs.
91    async fn rebalance(self: &Arc<Self>) {
92        let _guard = self.rebalance_lock.lock().await;
93
94        let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
95        for id in ids {
96            self.stop(&id).await;
97        }
98        self.info.lock().await.clear();
99
100        if !self.cfg.ai_enabled {
101            return;
102        }
103
104        // Each (camera, stream_profile) with at least one enabled task, with its max fps + width.
105        let rows: Vec<(String, String, f64, i64)> = sqlx::query_as(
106            "SELECT c.id, t.stream_profile, MAX(t.fps) AS fps, MAX(t.width) AS width
107             FROM cameras c JOIN ai_tasks t ON t.camera_id = c.id
108             WHERE c.enabled = 1 AND t.enabled = 1
109             GROUP BY c.id, t.stream_profile
110             ORDER BY c.id, t.stream_profile",
111        )
112        .fetch_all(&self.pool)
113        .await
114        .unwrap_or_default();
115
116        if rows.is_empty() {
117            return;
118        }
119
120        let budget = self.cfg.ai_max_total_fps.max(1.0);
121        // Cap concurrent decoders so total fps cannot exceed the budget even at the MIN_FPS floor.
122        let max_samplers = (budget / MIN_FPS).floor().max(1.0) as usize;
123        let run = rows.len().min(max_samplers);
124        let per_camera_cap = budget / run as f64;
125        if rows.len() > run {
126            tracing::warn!(
127                requested = rows.len(),
128                running = run,
129                "sampler: AI fps budget exhausted; some cameras will not be sampled"
130            );
131        }
132        tracing::info!(
133            samplers = run,
134            budget,
135            per_camera_cap,
136            "sampler: rebalancing AI frame budget"
137        );
138
139        for (i, (cam, profile, max_fps, width)) in rows.into_iter().enumerate() {
140            if i < run {
141                let effective = max_fps.min(per_camera_cap).max(MIN_FPS);
142                self.spawn(cam, profile, effective, width).await;
143            } else {
144                self.set_info(&cam, &profile, "budget_exhausted", 0.0).await;
145            }
146        }
147    }
148
149    async fn stop(self: &Arc<Self>, key: &str) {
150        let task = { self.tasks.lock().await.remove(key) };
151        if let Some(task) = task {
152            let _ = task.stop.send(true);
153            let mut handle = task.handle;
154            if tokio::time::timeout(Duration::from_secs(8), &mut handle)
155                .await
156                .is_err()
157            {
158                tracing::warn!(key, "sampler: task did not stop within 8s; aborting");
159                handle.abort();
160                let _ = handle.await;
161            }
162        }
163    }
164
165    pub async fn shutdown(self: &Arc<Self>) {
166        // Hold the rebalance lock so an in-flight reconcile cannot re-spawn after we stop.
167        let _guard = self.rebalance_lock.lock().await;
168        let ids: Vec<String> = { self.tasks.lock().await.keys().cloned().collect() };
169        for id in ids {
170            self.stop(&id).await;
171        }
172    }
173
174    async fn spawn(self: &Arc<Self>, camera_id: String, profile: String, fps: f64, width: i64) {
175        let key = sampler_key(&camera_id, &profile);
176        let (tx, rx) = watch::channel(false);
177        let mut tasks = self.tasks.lock().await;
178        let me = self.clone();
179        let handle = tokio::spawn(async move {
180            me.supervise(camera_id, profile, fps, width, rx).await;
181        });
182        if let Some(old) = tasks.insert(key, SamplerTask { stop: tx, handle }) {
183            let _ = old.stop.send(true);
184            old.handle.abort();
185        }
186    }
187
188    async fn set_info(&self, camera_id: &str, profile: &str, state: &str, fps: f64) {
189        self.info.lock().await.insert(
190            sampler_key(camera_id, profile),
191            SamplerInfo {
192                camera_id: camera_id.to_string(),
193                stream_profile: profile.to_string(),
194                state: state.to_string(),
195                fps,
196            },
197        );
198    }
199
    /// Remove this sampler's own task + info entry (on a self-initiated exit).
    /// Removing the task entry drops its stop sender and join handle without awaiting the task.
    async fn cleanup_self(&self, key: &str) {
        self.tasks.lock().await.remove(key);
        self.info.lock().await.remove(key);
    }
205
    /// Supervision loop for one (camera, profile) sampler: keeps an ffmpeg process running that
    /// writes the latest frame to disk, restarting it with backoff until told to stop.
    ///
    /// Each iteration:
    /// 1. Exits with state "stopped" if the stop flag is already set.
    /// 2. Reloads the camera row; if it is missing or disabled, removes its own entries and exits.
    /// 3. Resolves the stream URL for `profile`, falling back to the record URL.
    /// 4. Spawns ffmpeg with an `fps=<fps>,scale=<width>:-2` filter writing to the latest-frame file.
    /// 5. Waits for either ffmpeg to exit (log, mark "offline", back off, retry) or a stop signal
    ///    (kill ffmpeg, mark "stopped", return).
    ///
    /// `fps` and `width` are fixed for the lifetime of this task; `stop` is the receiving end of
    /// the stop signal held in the matching `SamplerTask`.
    async fn supervise(
        self: Arc<Self>,
        camera_id: String,
        profile: String,
        fps: f64,
        width: i64,
        mut stop: watch::Receiver<bool>,
    ) {
        let key = sampler_key(&camera_id, &profile);
        // Seconds to wait before the next ffmpeg restart; adjusted after every exit.
        let mut backoff: u64 = 1;
        loop {
            if *stop.borrow() {
                self.set_info(&camera_id, &profile, "stopped", fps).await;
                return;
            }
            // Re-read the camera on every attempt so edits/disables are picked up on restart.
            let cam = match sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
                .bind(&camera_id)
                .fetch_optional(&self.pool)
                .await
            {
                Ok(Some(c)) if c.enabled => c,
                Ok(_) => {
                    // Camera deleted or disabled: clean up our own slot and exit.
                    self.cleanup_self(&key).await;
                    return;
                }
                Err(e) => {
                    tracing::error!(%camera_id, error = %e, "sampler: failed to load camera");
                    // Retry the lookup after 10s unless a stop arrives first.
                    if sleep_or_stop(&mut stop, 10).await {
                        return;
                    }
                    continue;
                }
            };

            // Prefer the URL for the requested profile; otherwise fall back to the record URL.
            let Some(url) =
                camera_url::stream_url(&cam, &profile).or_else(|| camera_url::record_url(&cam))
            else {
                // No usable URL: report "error" and re-check after 30s.
                self.set_info(&camera_id, &profile, "error", fps).await;
                if sleep_or_stop(&mut stop, 30).await {
                    return;
                }
                continue;
            };

            let dir = self.cfg.camera_frames_dir(&camera_id);
            // A failure here is only logged; the ffmpeg spawn below is still attempted.
            if let Err(e) = tokio::fs::create_dir_all(&dir).await {
                tracing::error!(%camera_id, error = %e, "sampler: cannot create frames dir");
            }
            let latest = dir.join(frame_filename(&profile));
            // Video filter: resample to the budgeted fps and scale to the requested width.
            let vf = format!("fps={fps},scale={width}:-2");
            self.set_info(&camera_id, &profile, "connecting", fps).await;
            tracing::info!(%camera_id, %profile, fps, width, url = %camera_url::mask_url(&url), "sampler: starting");

            // `kill_on_drop(true)`: the child is killed if this task is aborted or returns early.
            // NOTE(review): `-timeout 15000000` is presumably microseconds (15s); its meaning for
            // RTSP inputs depends on the ffmpeg version — confirm against the deployed binary.
            let mut child = match Command::new(&self.cfg.ffmpeg_bin)
                .kill_on_drop(true)
                .args(["-nostdin", "-hide_banner", "-loglevel", "warning"])
                .args(["-rtsp_transport", "tcp"])
                .args(["-timeout", "15000000"])
                .args(["-i", &url])
                .args(["-an", "-vf", &vf, "-q:v", "5"])
                // atomic_writing makes ffmpeg write to a temp file and rename, so a worker reading
                // the frame never sees a half-written JPEG.
                .args(["-f", "image2", "-update", "1", "-atomic_writing", "1", "-y"])
                .arg(&latest)
                .stdin(Stdio::null())
                .stdout(Stdio::null())
                .stderr(Stdio::piped())
                .spawn()
            {
                Ok(c) => c,
                Err(e) => {
                    tracing::error!(%camera_id, "sampler: spawn ffmpeg failed: {e}");
                    self.set_info(&camera_id, &profile, "error", fps).await;
                    if sleep_or_stop(&mut stop, 15).await {
                        return;
                    }
                    continue;
                }
            };
            // State flips to "sampling" as soon as the process is spawned, not on the first frame.
            self.set_info(&camera_id, &profile, "sampling", fps).await;
            let started = Instant::now();

            // Drain stderr in the background, keeping only the last STDERR_TAIL_CAP bytes so the
            // buffer stays bounded however long ffmpeg runs.
            let stderr = child.stderr.take();
            let stderr_task = tokio::spawn(async move {
                let mut tail: Vec<u8> = Vec::new();
                if let Some(mut s) = stderr {
                    let mut chunk = [0u8; 4096];
                    loop {
                        match s.read(&mut chunk).await {
                            // EOF or read error: stop collecting.
                            Ok(0) | Err(_) => break,
                            Ok(n) => {
                                tail.extend_from_slice(&chunk[..n]);
                                if tail.len() > STDERR_TAIL_CAP {
                                    // Drop the oldest bytes so only the most recent output remains.
                                    let excess = tail.len() - STDERR_TAIL_CAP;
                                    tail.drain(0..excess);
                                }
                            }
                        }
                    }
                }
                tail
            });

            tokio::select! {
                status = child.wait() => {
                    // ffmpeg exited on its own: collect the stderr tail and mask URLs before logging.
                    let tail = String::from_utf8_lossy(&stderr_task.await.unwrap_or_default()).trim().to_string();
                    let masked = camera_url::mask_url(&tail);
                    tracing::warn!(%camera_id, %profile, status = ?status.ok().and_then(|s| s.code()), tail = %masked, "sampler: ffmpeg exited");
                    self.set_info(&camera_id, &profile, "offline", fps).await;
                    // Best-effort event log; a failure to record the event is ignored.
                    let _ = repo::log_event(&self.pool, Some(&camera_id), "sampler_offline", "warning",
                        json!({ "profile": profile, "detail": masked })).await;
                    // Reset backoff after a healthy run (>30s); otherwise grow it (exponential up to
                    // 30s) so a persistently-failing camera doesn't hot-loop ffmpeg restarts. Mirrors
                    // the recorder so a camera that flaps then recovers retries promptly.
                    backoff = if started.elapsed().as_secs() > 30 { 1 } else { (backoff * 2).min(30) };
                    if sleep_or_stop(&mut stop, backoff).await {
                        return;
                    }
                }
                // Any completion of `changed()` is treated as a stop request.
                _ = stop.changed() => {
                    let _ = child.kill().await;
                    self.set_info(&camera_id, &profile, "stopped", fps).await;
                    return;
                }
            }
        }
    }
334}
335
336async fn sleep_or_stop(stop: &mut watch::Receiver<bool>, secs: u64) -> bool {
337    if *stop.borrow() {
338        return true;
339    }
340    tokio::select! {
341        _ = tokio::time::sleep(Duration::from_secs(secs)) => *stop.borrow(),
342        _ = stop.changed() => *stop.borrow(),
343    }
344}
345
346impl SamplerManager {
347    /// Filesystem path of the latest sampled frame for a (camera, profile).
348    pub fn frame_path(&self, camera_id: &str, profile: &str) -> std::path::PathBuf {
349        self.cfg
350            .camera_frames_dir(camera_id)
351            .join(frame_filename(profile))
352    }
353}