Skip to main content

heldar_kernel/services/
zones.rs

1//! Zone engine (Stage 3): evaluates tracked detections against per-camera polygon zones and raises
2//! enter / exit / dwell events (with an evidence frame). State is keyed per (camera, zone, track),
3//! held in memory, and driven by SERVER time (never the worker-supplied timestamp), so a skewed
4//! worker clock cannot corrupt or evict state. A small confirm-frame debounce suppresses boundary
5//! jitter, and a track still inside when its state expires gets a synthesized exit. Fed
6//! synchronously from detection ingest.
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use chrono::{DateTime, Duration, Utc};
12use serde_json::{json, Value};
13use sqlx::SqlitePool;
14use tokio::sync::Mutex;
15use uuid::Uuid;
16
17use crate::config::Config;
18use crate::models::{DetectionIngest, Zone};
19use crate::repo;
20
21/// How long a track's zone state is retained (server time) without being seen before it is pruned.
22const STATE_TTL_SECS: i64 = 120;
23/// Default consecutive-observation confirmation before a membership transition (debounce); a zone
24/// can override via config.confirm_frames.
25const DEFAULT_CONFIRM_FRAMES: u32 = 2;
26
27#[derive(Debug, Clone)]
28struct TrackZoneState {
29    track: String,
30    zone_id: String,
31    zone_name: String,
32    severity: String,
33    inside: bool,
34    entered_at: DateTime<Utc>,
35    dwell_emitted: bool,
36    last_seen: DateTime<Utc>,
37    candidate: Option<bool>,
38    candidate_count: u32,
39}
40
41/// A zone event to persist + log (resolved fields, so prune-time exits need no Zone lookup).
42struct ZoneEvt {
43    camera_id: String,
44    zone_id: String,
45    zone_name: String,
46    severity: String,
47    track: String,
48    event_type: &'static str,
49    label: String,
50    dwell: Option<f64>,
51}
52
53pub struct ZoneEngine {
54    pool: SqlitePool,
55    cfg: Arc<Config>,
56    state: Mutex<HashMap<String, TrackZoneState>>,
57}
58
59fn point_in_polygon(p: [f64; 2], poly: &[[f64; 2]]) -> bool {
60    let n = poly.len();
61    if n < 3 {
62        return false;
63    }
64    let (x, y) = (p[0], p[1]);
65    let mut inside = false;
66    let mut j = n - 1;
67    for i in 0..n {
68        let (xi, yi) = (poly[i][0], poly[i][1]);
69        let (xj, yj) = (poly[j][0], poly[j][1]);
70        if ((yi > y) != (yj > y)) && (x < (xj - xi) * (y - yi) / (yj - yi) + xi) {
71            inside = !inside;
72        }
73        j = i;
74    }
75    inside
76}
77
78fn parse_polygon(v: &Value) -> Vec<[f64; 2]> {
79    v.as_array()
80        .map(|arr| {
81            arr.iter()
82                .filter_map(|pt| {
83                    let a = pt.as_array()?;
84                    let x = a.first()?.as_f64()?;
85                    let y = a.get(1)?.as_f64()?;
86                    (x.is_finite() && y.is_finite()).then_some([x, y])
87                })
88                .collect()
89        })
90        .unwrap_or_default()
91}
92
93fn parse_labels(v: &Value) -> Vec<String> {
94    v.as_array()
95        .map(|arr| {
96            arr.iter()
97                .filter_map(|l| l.as_str().map(|s| s.to_string()))
98                .collect()
99        })
100        .unwrap_or_default()
101}
102
103fn confirm_frames(zone: &Zone) -> u32 {
104    (zone
105        .config
106        .0
107        .get("confirm_frames")
108        .and_then(|v| v.as_u64())
109        .unwrap_or(DEFAULT_CONFIRM_FRAMES as u64))
110    .clamp(1, 10) as u32
111}
112
113/// Ground point of a detection bbox `[x, y, w, h]` (normalized): bottom-center.
114fn bbox_ground_point(v: &Value) -> Option<[f64; 2]> {
115    let a = v.as_array()?;
116    if a.len() < 4 {
117        return None;
118    }
119    let x = a[0].as_f64()?;
120    let y = a[1].as_f64()?;
121    let w = a[2].as_f64()?;
122    let h = a[3].as_f64()?;
123    if !(x.is_finite() && y.is_finite() && w.is_finite() && h.is_finite()) {
124        return None;
125    }
126    Some([x + w / 2.0, y + h])
127}
128
129#[async_trait::async_trait]
130impl crate::services::consumer::DetectionConsumer for ZoneEngine {
131    fn name(&self) -> &'static str {
132        "zones"
133    }
134    /// The zone engine evaluates any tracked detection, regardless of task type.
135    fn interested_in(&self, _task_type: &str) -> bool {
136        true
137    }
138    async fn consume(&self, batch: &crate::services::consumer::DetectionBatch<'_>) {
139        self.process(batch.camera_id, batch.detections).await;
140    }
141}
142
143impl ZoneEngine {
144    pub fn new(pool: SqlitePool, cfg: Arc<Config>) -> Arc<Self> {
145        Arc::new(Self {
146            pool,
147            cfg,
148            state: Mutex::new(HashMap::new()),
149        })
150    }
151
152    /// Evaluate (tracked) detections for a camera against its zones, raising events. Membership is
153    /// driven by server time; the worker-supplied timestamp is not trusted for state/timing.
154    pub async fn process(&self, camera_id: &str, detections: &[DetectionIngest]) {
155        // Dedup tracked detections by track_id (keep the highest-confidence one per track).
156        let mut by_track: HashMap<&str, &DetectionIngest> = HashMap::new();
157        for d in detections {
158            if let (Some(t), Some(_)) = (d.track_id.as_deref(), d.bbox.as_ref()) {
159                let better = by_track
160                    .get(t)
161                    .map(|p: &&DetectionIngest| {
162                        d.confidence.unwrap_or(0.0) > p.confidence.unwrap_or(0.0)
163                    })
164                    .unwrap_or(true);
165                if better {
166                    by_track.insert(t, d);
167                }
168            }
169        }
170        if by_track.is_empty() {
171            return;
172        }
173        let zones = match sqlx::query_as::<_, Zone>(
174            "SELECT * FROM zones WHERE camera_id = ? AND enabled = 1",
175        )
176        .bind(camera_id)
177        .fetch_all(&self.pool)
178        .await
179        {
180            Ok(z) if !z.is_empty() => z,
181            _ => return,
182        };
183        let parsed: Vec<(Vec<[f64; 2]>, Vec<String>, u32)> = zones
184            .iter()
185            .map(|z| {
186                (
187                    parse_polygon(&z.polygon.0),
188                    parse_labels(&z.labels.0),
189                    confirm_frames(z),
190                )
191            })
192            .collect();
193
194        let now = Utc::now();
195        let mut emits: Vec<ZoneEvt> = Vec::new();
196        {
197            let mut state = self.state.lock().await;
198            for (track, d) in &by_track {
199                let Some(point) = d.bbox.as_ref().and_then(bbox_ground_point) else {
200                    continue;
201                };
202                let label = d.label.as_deref().unwrap_or("");
203                for (idx, zone) in zones.iter().enumerate() {
204                    let (poly, labels, confirm) = &parsed[idx];
205                    if !labels.is_empty() && !labels.iter().any(|l| l == label) {
206                        continue;
207                    }
208                    let raw_inside = point_in_polygon(point, poly);
209                    let key = format!("{camera_id}|{}|{track}", zone.id);
210                    let entry = state.entry(key).or_insert_with(|| TrackZoneState {
211                        track: track.to_string(),
212                        zone_id: zone.id.clone(),
213                        zone_name: zone.name.clone(),
214                        severity: zone.severity.clone(),
215                        inside: false,
216                        entered_at: now,
217                        dwell_emitted: false,
218                        last_seen: now,
219                        candidate: None,
220                        candidate_count: 0,
221                    });
222                    entry.last_seen = now;
223
224                    // Debounce: require `confirm` consecutive observations to flip membership.
225                    if raw_inside == entry.inside {
226                        entry.candidate = None;
227                        entry.candidate_count = 0;
228                    } else {
229                        if entry.candidate == Some(raw_inside) {
230                            entry.candidate_count += 1;
231                        } else {
232                            entry.candidate = Some(raw_inside);
233                            entry.candidate_count = 1;
234                        }
235                        if entry.candidate_count >= *confirm {
236                            entry.inside = raw_inside;
237                            entry.candidate = None;
238                            entry.candidate_count = 0;
239                            if raw_inside {
240                                entry.entered_at = now;
241                                entry.dwell_emitted = false;
242                                emits.push(make_evt(camera_id, zone, track, "enter", label, None));
243                            } else {
244                                emits.push(make_evt(camera_id, zone, track, "exit", label, None));
245                            }
246                        }
247                    }
248
249                    if entry.inside && zone.dwell_seconds > 0.0 && !entry.dwell_emitted {
250                        let dwell = (now - entry.entered_at).num_milliseconds() as f64 / 1000.0;
251                        if dwell >= zone.dwell_seconds {
252                            entry.dwell_emitted = true;
253                            emits.push(make_evt(
254                                camera_id,
255                                zone,
256                                track,
257                                "dwell",
258                                label,
259                                Some(dwell),
260                            ));
261                        }
262                    }
263                }
264            }
265
266            // Prune stale state (server time); synthesize an exit for any track still inside.
267            let cutoff = now - Duration::seconds(STATE_TTL_SECS);
268            let mut survivors: HashMap<String, TrackZoneState> = HashMap::new();
269            for (k, s) in state.drain() {
270                if s.last_seen >= cutoff {
271                    survivors.insert(k, s);
272                } else if s.inside {
273                    emits.push(ZoneEvt {
274                        camera_id: camera_id.to_string(),
275                        zone_id: s.zone_id.clone(),
276                        zone_name: s.zone_name.clone(),
277                        severity: s.severity.clone(),
278                        track: s.track.clone(),
279                        event_type: "exit",
280                        label: String::new(),
281                        dwell: None,
282                    });
283                }
284            }
285            *state = survivors;
286        }
287
288        for e in &emits {
289            self.emit(e, now).await;
290        }
291    }
292
293    async fn emit(&self, evt: &ZoneEvt, now: DateTime<Utc>) {
294        let id = format!("zev_{}", Uuid::new_v4().simple());
295        let evidence = if evt.event_type == "enter" {
296            self.copy_evidence(&evt.camera_id, &id).await
297        } else {
298            None
299        };
300
301        let _ = sqlx::query(
302            "INSERT INTO zone_events
303               (id, camera_id, zone_id, zone_name, track_id, event_type, label, timestamp,
304                dwell_seconds, evidence_path, created_at)
305             VALUES (?,?,?,?,?,?,?,?,?,?,?)",
306        )
307        .bind(&id)
308        .bind(&evt.camera_id)
309        .bind(&evt.zone_id)
310        .bind(&evt.zone_name)
311        .bind(&evt.track)
312        .bind(evt.event_type)
313        .bind(&evt.label)
314        .bind(now)
315        .bind(evt.dwell)
316        .bind(&evidence)
317        .bind(now)
318        .execute(&self.pool)
319        .await;
320
321        let _ = repo::log_event(
322            &self.pool,
323            Some(&evt.camera_id),
324            &format!("zone_{}", evt.event_type),
325            &evt.severity,
326            json!({
327                "zone_id": evt.zone_id,
328                "zone": evt.zone_name,
329                "track_id": evt.track,
330                "label": evt.label,
331                "dwell_seconds": evt.dwell,
332                "evidence": evidence,
333            }),
334        )
335        .await;
336
337        tracing::info!(camera_id = %evt.camera_id, zone = %evt.zone_name, track = %evt.track, event = evt.event_type, "zone event");
338    }
339
340    /// Copy the latest sampled sub-stream frame as evidence; returns its served URL.
341    async fn copy_evidence(&self, camera_id: &str, id: &str) -> Option<String> {
342        let src = self.cfg.camera_frames_dir(camera_id).join("latest_sub.jpg");
343        let filename = format!("zoneevt_{id}.jpg");
344        let dst = self.cfg.snapshots_dir.join(&filename);
345        if tokio::fs::copy(&src, &dst).await.is_ok() {
346            Some(format!("/media/snapshots/{filename}"))
347        } else {
348            None
349        }
350    }
351}
352
353fn make_evt(
354    camera_id: &str,
355    zone: &Zone,
356    track: &str,
357    event_type: &'static str,
358    label: &str,
359    dwell: Option<f64>,
360) -> ZoneEvt {
361    ZoneEvt {
362        camera_id: camera_id.to_string(),
363        zone_id: zone.id.clone(),
364        zone_name: zone.name.clone(),
365        severity: zone.severity.clone(),
366        track: track.to_string(),
367        event_type,
368        label: label.to_string(),
369        dwell,
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use super::*;
376
377    #[test]
378    fn point_in_polygon_basic() {
379        let sq = [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]];
380        assert!(point_in_polygon([0.5, 0.5], &sq));
381        assert!(!point_in_polygon([1.5, 0.5], &sq));
382        assert!(!point_in_polygon([0.5, 1.5], &sq));
383    }
384
385    #[test]
386    fn bbox_ground_point_is_bottom_center() {
387        assert_eq!(
388            bbox_ground_point(&json!([0.2, 0.1, 0.4, 0.6])),
389            Some([0.4, 0.7])
390        );
391        assert_eq!(bbox_ground_point(&json!([1, 2, 3])), None);
392        assert_eq!(bbox_ground_point(&json!(["x", 0, 0, 0])), None);
393    }
394
395    #[test]
396    fn parse_polygon_skips_non_finite_and_bad_points() {
397        assert_eq!(
398            parse_polygon(&json!([[0.0, 0.0], [1.0, 0.5], ["a", 1]])),
399            vec![[0.0, 0.0], [1.0, 0.5]]
400        );
401    }
402
403    #[test]
404    fn parse_labels_strings() {
405        assert_eq!(
406            parse_labels(&json!(["person", "car"])),
407            vec!["person", "car"]
408        );
409    }
410}