Skip to main content

heldar_kernel/services/
zones.rs

1//! Zone engine (Stage 3): evaluates tracked detections against per-camera polygon zones and raises
2//! enter / exit / dwell events (with an evidence frame). State is keyed per (camera, zone, track),
3//! held in memory, and driven by SERVER time (never the worker-supplied timestamp), so a skewed
4//! worker clock cannot corrupt or evict state. A small confirm-frame debounce suppresses boundary
5//! jitter, and a track still inside when its state expires gets a synthesized exit. Fed
6//! synchronously from detection ingest.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use chrono::{DateTime, Duration, Utc};
12use serde_json::{json, Value};
13use sqlx::SqlitePool;
14use tokio::sync::Mutex;
15use uuid::Uuid;
16
17use crate::config::Config;
18use crate::models::{DetectionIngest, Zone};
19use crate::repo;
20use crate::services::recorder::RecorderManager;
21
/// Retention window, in seconds of server time, for a track's zone state: an entry whose
/// `last_seen` is older than this is pruned (two minutes).
const STATE_TTL_SECS: i64 = 2 * 60;
/// Debounce default: how many consecutive observations must agree before a membership
/// transition is committed. A zone may override it through `config.confirm_frames`.
const DEFAULT_CONFIRM_FRAMES: u32 = 2;
27
28#[derive(Debug, Clone)]
29struct TrackZoneState {
30    track: String,
31    zone_id: String,
32    zone_name: String,
33    severity: String,
34    inside: bool,
35    entered_at: DateTime<Utc>,
36    dwell_emitted: bool,
37    last_seen: DateTime<Utc>,
38    candidate: Option<bool>,
39    candidate_count: u32,
40}
41
/// One zone event queued for persistence and logging. Every field is already resolved, so an exit
/// synthesized while pruning can be emitted without looking the zone up again.
struct ZoneEvt {
    /// Camera the event belongs to.
    camera_id: String,
    /// Identifier of the zone that was entered / exited / dwelt in.
    zone_id: String,
    /// Zone name, stored with the event and included in the log payload.
    zone_name: String,
    /// Severity passed to the event log.
    severity: String,
    /// Track identifier that produced the event.
    track: String,
    /// Event kind: `"enter"`, `"exit"` or `"dwell"`.
    event_type: &'static str,
    /// Detection label at event time (empty for exits synthesized at prune time).
    label: String,
    /// Seconds spent inside the zone; only set for dwell events.
    dwell: Option<f64>,
}
53
54pub struct ZoneEngine {
55    pool: SqlitePool,
56    cfg: Arc<Config>,
57    /// Recorder handle: a committed zone event triggers event-mode recording (no-op for cameras not
58    /// in `event` / `scheduled_event` mode — [`RecorderManager::trigger`] guards on the mode).
59    recorder: Arc<RecorderManager>,
60    state: Mutex<HashMap<String, TrackZoneState>>,
61}
62
/// Even-odd (ray casting) point-in-polygon test.
///
/// Returns `false` for anything with fewer than three vertices. Each edge runs from the previous
/// vertex to the current one (the first edge closes the polygon from the last vertex); the point
/// is inside when a ray cast from it crosses an odd number of edges.
fn point_in_polygon(p: [f64; 2], poly: &[[f64; 2]]) -> bool {
    if poly.len() < 3 {
        return false;
    }
    let [px, py] = p;
    // Pair every vertex with its predecessor: last, first, second, ...
    let predecessors = poly.iter().cycle().skip(poly.len() - 1);
    let crossings = poly
        .iter()
        .zip(predecessors)
        .filter(|&(&[xi, yi], &[xj, yj])| {
            // The straddle check runs first, so `yj - yi` is never zero when the division happens.
            (yi > py) != (yj > py) && px < (xj - xi) * (py - yi) / (yj - yi) + xi
        })
        .count();
    crossings % 2 == 1
}
81
82fn parse_polygon(v: &Value) -> Vec<[f64; 2]> {
83    v.as_array()
84        .map(|arr| {
85            arr.iter()
86                .filter_map(|pt| {
87                    let a = pt.as_array()?;
88                    let x = a.first()?.as_f64()?;
89                    let y = a.get(1)?.as_f64()?;
90                    (x.is_finite() && y.is_finite()).then_some([x, y])
91                })
92                .collect()
93        })
94        .unwrap_or_default()
95}
96
97fn parse_labels(v: &Value) -> Vec<String> {
98    v.as_array()
99        .map(|arr| {
100            arr.iter()
101                .filter_map(|l| l.as_str().map(|s| s.to_string()))
102                .collect()
103        })
104        .unwrap_or_default()
105}
106
107fn confirm_frames(zone: &Zone) -> u32 {
108    (zone
109        .config
110        .0
111        .get("confirm_frames")
112        .and_then(|v| v.as_u64())
113        .unwrap_or(DEFAULT_CONFIRM_FRAMES as u64))
114    .clamp(1, 10) as u32
115}
116
117/// Ground point of a detection bbox `[x, y, w, h]` (normalized): bottom-center.
118fn bbox_ground_point(v: &Value) -> Option<[f64; 2]> {
119    let a = v.as_array()?;
120    if a.len() < 4 {
121        return None;
122    }
123    let x = a[0].as_f64()?;
124    let y = a[1].as_f64()?;
125    let w = a[2].as_f64()?;
126    let h = a[3].as_f64()?;
127    if !(x.is_finite() && y.is_finite() && w.is_finite() && h.is_finite()) {
128        return None;
129    }
130    Some([x + w / 2.0, y + h])
131}
132
133#[async_trait::async_trait]
134impl crate::services::consumer::DetectionConsumer for ZoneEngine {
135    fn name(&self) -> &'static str {
136        "zones"
137    }
138    /// The zone engine evaluates any tracked detection, regardless of task type.
139    fn interested_in(&self, _task_type: &str) -> bool {
140        true
141    }
142    async fn consume(&self, batch: &crate::services::consumer::DetectionBatch<'_>) {
143        self.process(batch.camera_id, batch.detections).await;
144    }
145}
146
147impl ZoneEngine {
148    pub fn new(pool: SqlitePool, cfg: Arc<Config>, recorder: Arc<RecorderManager>) -> Arc<Self> {
149        Arc::new(Self {
150            pool,
151            cfg,
152            recorder,
153            state: Mutex::new(HashMap::new()),
154        })
155    }
156
157    /// Evaluate (tracked) detections for a camera against its zones, raising events. Membership is
158    /// driven by server time; the worker-supplied timestamp is not trusted for state/timing.
159    pub async fn process(&self, camera_id: &str, detections: &[DetectionIngest]) {
160        // Dedup tracked detections by track_id (keep the highest-confidence one per track).
161        let mut by_track: HashMap<&str, &DetectionIngest> = HashMap::new();
162        for d in detections {
163            if let (Some(t), Some(_)) = (d.track_id.as_deref(), d.bbox.as_ref()) {
164                let better = by_track
165                    .get(t)
166                    .map(|p: &&DetectionIngest| {
167                        d.confidence.unwrap_or(0.0) > p.confidence.unwrap_or(0.0)
168                    })
169                    .unwrap_or(true);
170                if better {
171                    by_track.insert(t, d);
172                }
173            }
174        }
175        if by_track.is_empty() {
176            return;
177        }
178        let zones = match sqlx::query_as::<_, Zone>(
179            "SELECT * FROM zones WHERE camera_id = ? AND enabled = 1",
180        )
181        .bind(camera_id)
182        .fetch_all(&self.pool)
183        .await
184        {
185            Ok(z) if !z.is_empty() => z,
186            _ => return,
187        };
188        let parsed: Vec<(Vec<[f64; 2]>, Vec<String>, u32)> = zones
189            .iter()
190            .map(|z| {
191                (
192                    parse_polygon(&z.polygon.0),
193                    parse_labels(&z.labels.0),
194                    confirm_frames(z),
195                )
196            })
197            .collect();
198
199        let now = Utc::now();
200        let mut emits: Vec<ZoneEvt> = Vec::new();
201        {
202            let mut state = self.state.lock().await;
203            for (track, d) in &by_track {
204                let Some(point) = d.bbox.as_ref().and_then(bbox_ground_point) else {
205                    continue;
206                };
207                let label = d.label.as_deref().unwrap_or("");
208                for (idx, zone) in zones.iter().enumerate() {
209                    let (poly, labels, confirm) = &parsed[idx];
210                    if !labels.is_empty() && !labels.iter().any(|l| l == label) {
211                        continue;
212                    }
213                    let raw_inside = point_in_polygon(point, poly);
214                    let key = format!("{camera_id}|{}|{track}", zone.id);
215                    let entry = state.entry(key).or_insert_with(|| TrackZoneState {
216                        track: track.to_string(),
217                        zone_id: zone.id.clone(),
218                        zone_name: zone.name.clone(),
219                        severity: zone.severity.clone(),
220                        inside: false,
221                        entered_at: now,
222                        dwell_emitted: false,
223                        last_seen: now,
224                        candidate: None,
225                        candidate_count: 0,
226                    });
227                    entry.last_seen = now;
228
229                    // Debounce: require `confirm` consecutive observations to flip membership.
230                    if raw_inside == entry.inside {
231                        entry.candidate = None;
232                        entry.candidate_count = 0;
233                    } else {
234                        if entry.candidate == Some(raw_inside) {
235                            entry.candidate_count += 1;
236                        } else {
237                            entry.candidate = Some(raw_inside);
238                            entry.candidate_count = 1;
239                        }
240                        if entry.candidate_count >= *confirm {
241                            entry.inside = raw_inside;
242                            entry.candidate = None;
243                            entry.candidate_count = 0;
244                            if raw_inside {
245                                entry.entered_at = now;
246                                entry.dwell_emitted = false;
247                                emits.push(make_evt(camera_id, zone, track, "enter", label, None));
248                            } else {
249                                emits.push(make_evt(camera_id, zone, track, "exit", label, None));
250                            }
251                        }
252                    }
253
254                    if entry.inside && zone.dwell_seconds > 0.0 && !entry.dwell_emitted {
255                        let dwell = (now - entry.entered_at).num_milliseconds() as f64 / 1000.0;
256                        if dwell >= zone.dwell_seconds {
257                            entry.dwell_emitted = true;
258                            emits.push(make_evt(
259                                camera_id,
260                                zone,
261                                track,
262                                "dwell",
263                                label,
264                                Some(dwell),
265                            ));
266                        }
267                    }
268                }
269            }
270
271            // Prune stale state (server time); synthesize an exit for any track still inside.
272            let cutoff = now - Duration::seconds(STATE_TTL_SECS);
273            let mut survivors: HashMap<String, TrackZoneState> = HashMap::new();
274            for (k, s) in state.drain() {
275                if s.last_seen >= cutoff {
276                    survivors.insert(k, s);
277                } else if s.inside {
278                    emits.push(ZoneEvt {
279                        camera_id: camera_id.to_string(),
280                        zone_id: s.zone_id.clone(),
281                        zone_name: s.zone_name.clone(),
282                        severity: s.severity.clone(),
283                        track: s.track.clone(),
284                        event_type: "exit",
285                        label: String::new(),
286                        dwell: None,
287                    });
288                }
289            }
290            *state = survivors;
291        }
292
293        for e in &emits {
294            self.emit(e, now).await;
295        }
296    }
297
298    async fn emit(&self, evt: &ZoneEvt, now: DateTime<Utc>) {
299        let id = format!("zev_{}", Uuid::new_v4().simple());
300        let evidence = if evt.event_type == "enter" {
301            self.copy_evidence(&evt.camera_id, &id).await
302        } else {
303            None
304        };
305
306        let _ = sqlx::query(
307            "INSERT INTO zone_events
308               (id, camera_id, zone_id, zone_name, track_id, event_type, label, timestamp,
309                dwell_seconds, evidence_path, created_at)
310             VALUES (?,?,?,?,?,?,?,?,?,?,?)",
311        )
312        .bind(&id)
313        .bind(&evt.camera_id)
314        .bind(&evt.zone_id)
315        .bind(&evt.zone_name)
316        .bind(&evt.track)
317        .bind(evt.event_type)
318        .bind(&evt.label)
319        .bind(now)
320        .bind(evt.dwell)
321        .bind(&evidence)
322        .bind(now)
323        .execute(&self.pool)
324        .await;
325
326        let _ = repo::log_event(
327            &self.pool,
328            Some(&evt.camera_id),
329            &format!("zone_{}", evt.event_type),
330            &evt.severity,
331            json!({
332                "zone_id": evt.zone_id,
333                "zone": evt.zone_name,
334                "track_id": evt.track,
335                "label": evt.label,
336                "dwell_seconds": evt.dwell,
337                "evidence": evidence,
338            }),
339        )
340        .await;
341
342        tracing::info!(camera_id = %evt.camera_id, zone = %evt.zone_name, track = %evt.track, event = evt.event_type, "zone event");
343
344        // Event-triggered recording: extend the camera's trigger window. A no-op unless the camera's
345        // record_mode is `event` / `scheduled_event` (the recorder guards on the mode).
346        let _ = self.recorder.trigger(&evt.camera_id, "zone_event").await;
347    }
348
349    /// Copy the latest sampled sub-stream frame as evidence; returns its served URL.
350    async fn copy_evidence(&self, camera_id: &str, id: &str) -> Option<String> {
351        let src = self.cfg.camera_frames_dir(camera_id).join("latest_sub.jpg");
352        let filename = format!("zoneevt_{id}.jpg");
353        let dst = self.cfg.snapshots_dir.join(&filename);
354        if tokio::fs::copy(&src, &dst).await.is_ok() {
355            Some(format!("/media/snapshots/{filename}"))
356        } else {
357            None
358        }
359    }
360}
361
362fn make_evt(
363    camera_id: &str,
364    zone: &Zone,
365    track: &str,
366    event_type: &'static str,
367    label: &str,
368    dwell: Option<f64>,
369) -> ZoneEvt {
370    ZoneEvt {
371        camera_id: camera_id.to_string(),
372        zone_id: zone.id.clone(),
373        zone_name: zone.name.clone(),
374        severity: zone.severity.clone(),
375        track: track.to_string(),
376        event_type,
377        label: label.to_string(),
378        dwell,
379    }
380}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// Unit square: the centre is inside; points beyond it on either axis are outside.
    #[test]
    fn point_in_polygon_basic() {
        let unit_square = [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]];
        let cases = [
            ([0.5, 0.5], true),
            ([1.5, 0.5], false),
            ([0.5, 1.5], false),
        ];
        for (point, expected) in cases {
            assert_eq!(
                point_in_polygon(point, &unit_square),
                expected,
                "point {point:?}"
            );
        }
    }

    /// The ground point is the bottom-centre of the box; short or non-numeric boxes are rejected.
    #[test]
    fn bbox_ground_point_is_bottom_center() {
        let ground = bbox_ground_point(&json!([0.2, 0.1, 0.4, 0.6]));
        assert_eq!(ground, Some([0.4, 0.7]));

        for malformed in [json!([1, 2, 3]), json!(["x", 0, 0, 0])] {
            assert_eq!(bbox_ground_point(&malformed), None);
        }
    }

    /// Entries that are not numeric pairs are dropped; valid vertices keep their order.
    #[test]
    fn parse_polygon_skips_non_finite_and_bad_points() {
        let raw = json!([[0.0, 0.0], [1.0, 0.5], ["a", 1]]);
        let expected = vec![[0.0, 0.0], [1.0, 0.5]];
        assert_eq!(parse_polygon(&raw), expected);
    }

    /// String entries are returned as owned labels in their original order.
    #[test]
    fn parse_labels_strings() {
        let labels = parse_labels(&json!(["person", "car"]));
        assert_eq!(labels, vec!["person", "car"]);
    }
}