1use std::collections::HashMap;
9use std::sync::Arc;
10
11use chrono::{DateTime, Duration, Utc};
12use serde_json::{json, Value};
13use sqlx::SqlitePool;
14use tokio::sync::Mutex;
15use uuid::Uuid;
16
17use crate::config::Config;
18use crate::models::{DetectionIngest, Zone};
19use crate::repo;
20use crate::services::recorder::RecorderManager;
21
/// Seconds a track/zone state entry may go without being observed before
/// `ZoneEngine::process` drops it (emitting an `exit` if it was still inside).
const STATE_TTL_SECS: i64 = 120;
/// Number of consecutive contradicting observations required to flip a track's
/// inside/outside state when the zone config has no `confirm_frames` value.
const DEFAULT_CONFIRM_FRAMES: u32 = 2;
27
/// Debounced inside/outside state for one track with respect to one zone.
///
/// One entry is kept per `camera|zone|track` key in `ZoneEngine::state`.
#[derive(Debug, Clone)]
struct TrackZoneState {
    /// Track identifier, copied from the detection that created the entry.
    track: String,
    /// Zone id, name and severity captured when the entry was created; used to
    /// build the synthetic `exit` event when the entry expires.
    zone_id: String,
    zone_name: String,
    severity: String,
    /// Confirmed state: `true` once an `enter` has been emitted and no `exit` since.
    inside: bool,
    /// Time of entry creation, reset on each confirmed `enter`; dwell is measured from it.
    entered_at: DateTime<Utc>,
    /// Set once the `dwell` event for the current stay has been emitted.
    dwell_emitted: bool,
    /// Last time this track/zone pair was evaluated; drives TTL pruning.
    last_seen: DateTime<Utc>,
    /// Raw observation that currently contradicts `inside`, if any.
    candidate: Option<bool>,
    /// How many consecutive evaluations have agreed with `candidate`.
    candidate_count: u32,
}
41
/// A zone transition queued for persistence by `ZoneEngine::emit`.
///
/// Derives `Debug` and `Clone` like the sibling `TrackZoneState`, so queued
/// events can be inspected in logs and tests.
#[derive(Debug, Clone)]
struct ZoneEvt {
    /// Camera the event is attributed to.
    camera_id: String,
    /// Identifier of the zone that was entered, exited or dwelt in.
    zone_id: String,
    /// Zone name, stored alongside the id in the event row and the log payload.
    zone_name: String,
    /// Severity passed through to the event log.
    severity: String,
    /// Track identifier of the object that triggered the event.
    track: String,
    /// One of `"enter"`, `"exit"` or `"dwell"`.
    event_type: &'static str,
    /// Detection label at the time of the event (empty for TTL-expiry exits).
    label: String,
    /// Seconds spent inside the zone; only set for `"dwell"` events.
    dwell: Option<f64>,
}
53
/// Detection consumer that turns tracked detections into zone
/// `enter` / `exit` / `dwell` events.
pub struct ZoneEngine {
    /// Database pool used to load zones and persist zone events.
    pool: SqlitePool,
    /// Application configuration; consulted for frame and snapshot directories.
    cfg: Arc<Config>,
    /// Recorder that is triggered after every emitted zone event.
    recorder: Arc<RecorderManager>,
    /// Per track/zone state, keyed by `"{camera_id}|{zone_id}|{track}"`.
    state: Mutex<HashMap<String, TrackZoneState>>,
}
62
/// Even-odd (ray casting) point-in-polygon test.
///
/// `p` is `[x, y]` and `poly` is a list of `[x, y]` vertices; the polygon is
/// implicitly closed (last vertex connects back to the first). Polygons with
/// fewer than three vertices never contain anything.
fn point_in_polygon(p: [f64; 2], poly: &[[f64; 2]]) -> bool {
    if poly.len() < 3 {
        return false;
    }
    let [px, py] = p;

    // Pair each vertex with its predecessor; the first vertex pairs with the last.
    let predecessors = poly.iter().cycle().skip(poly.len() - 1);
    let crossings = poly
        .iter()
        .zip(predecessors)
        .filter(|&(&[cx, cy], &[qx, qy])| {
            // Only edges that straddle the horizontal line through `p` can be
            // crossed; this also guarantees `qy != cy` for the division below.
            let straddles = (cy > py) != (qy > py);
            straddles && px < (qx - cx) * (py - cy) / (qy - cy) + cx
        })
        .count();

    // An odd number of crossings means the point is inside.
    crossings % 2 == 1
}
81
82fn parse_polygon(v: &Value) -> Vec<[f64; 2]> {
83 v.as_array()
84 .map(|arr| {
85 arr.iter()
86 .filter_map(|pt| {
87 let a = pt.as_array()?;
88 let x = a.first()?.as_f64()?;
89 let y = a.get(1)?.as_f64()?;
90 (x.is_finite() && y.is_finite()).then_some([x, y])
91 })
92 .collect()
93 })
94 .unwrap_or_default()
95}
96
97fn parse_labels(v: &Value) -> Vec<String> {
98 v.as_array()
99 .map(|arr| {
100 arr.iter()
101 .filter_map(|l| l.as_str().map(|s| s.to_string()))
102 .collect()
103 })
104 .unwrap_or_default()
105}
106
107fn confirm_frames(zone: &Zone) -> u32 {
108 (zone
109 .config
110 .0
111 .get("confirm_frames")
112 .and_then(|v| v.as_u64())
113 .unwrap_or(DEFAULT_CONFIRM_FRAMES as u64))
114 .clamp(1, 10) as u32
115}
116
117fn bbox_ground_point(v: &Value) -> Option<[f64; 2]> {
119 let a = v.as_array()?;
120 if a.len() < 4 {
121 return None;
122 }
123 let x = a[0].as_f64()?;
124 let y = a[1].as_f64()?;
125 let w = a[2].as_f64()?;
126 let h = a[3].as_f64()?;
127 if !(x.is_finite() && y.is_finite() && w.is_finite() && h.is_finite()) {
128 return None;
129 }
130 Some([x + w / 2.0, y + h])
131}
132
/// Plugs the zone engine into the detection consumer pipeline.
#[async_trait::async_trait]
impl crate::services::consumer::DetectionConsumer for ZoneEngine {
    /// Consumer name reported to the pipeline.
    fn name(&self) -> &'static str {
        "zones"
    }
    /// Accepts every task type; the argument is ignored.
    fn interested_in(&self, _task_type: &str) -> bool {
        true
    }
    /// Forwards the batch's camera id and detections to [`ZoneEngine::process`].
    async fn consume(&self, batch: &crate::services::consumer::DetectionBatch<'_>) {
        self.process(batch.camera_id, batch.detections).await;
    }
}
146
147impl ZoneEngine {
148 pub fn new(pool: SqlitePool, cfg: Arc<Config>, recorder: Arc<RecorderManager>) -> Arc<Self> {
149 Arc::new(Self {
150 pool,
151 cfg,
152 recorder,
153 state: Mutex::new(HashMap::new()),
154 })
155 }
156
157 pub async fn process(&self, camera_id: &str, detections: &[DetectionIngest]) {
160 let mut by_track: HashMap<&str, &DetectionIngest> = HashMap::new();
162 for d in detections {
163 if let (Some(t), Some(_)) = (d.track_id.as_deref(), d.bbox.as_ref()) {
164 let better = by_track
165 .get(t)
166 .map(|p: &&DetectionIngest| {
167 d.confidence.unwrap_or(0.0) > p.confidence.unwrap_or(0.0)
168 })
169 .unwrap_or(true);
170 if better {
171 by_track.insert(t, d);
172 }
173 }
174 }
175 if by_track.is_empty() {
176 return;
177 }
178 let zones = match sqlx::query_as::<_, Zone>(
179 "SELECT * FROM zones WHERE camera_id = ? AND enabled = 1",
180 )
181 .bind(camera_id)
182 .fetch_all(&self.pool)
183 .await
184 {
185 Ok(z) if !z.is_empty() => z,
186 _ => return,
187 };
188 let parsed: Vec<(Vec<[f64; 2]>, Vec<String>, u32)> = zones
189 .iter()
190 .map(|z| {
191 (
192 parse_polygon(&z.polygon.0),
193 parse_labels(&z.labels.0),
194 confirm_frames(z),
195 )
196 })
197 .collect();
198
199 let now = Utc::now();
200 let mut emits: Vec<ZoneEvt> = Vec::new();
201 {
202 let mut state = self.state.lock().await;
203 for (track, d) in &by_track {
204 let Some(point) = d.bbox.as_ref().and_then(bbox_ground_point) else {
205 continue;
206 };
207 let label = d.label.as_deref().unwrap_or("");
208 for (idx, zone) in zones.iter().enumerate() {
209 let (poly, labels, confirm) = &parsed[idx];
210 if !labels.is_empty() && !labels.iter().any(|l| l == label) {
211 continue;
212 }
213 let raw_inside = point_in_polygon(point, poly);
214 let key = format!("{camera_id}|{}|{track}", zone.id);
215 let entry = state.entry(key).or_insert_with(|| TrackZoneState {
216 track: track.to_string(),
217 zone_id: zone.id.clone(),
218 zone_name: zone.name.clone(),
219 severity: zone.severity.clone(),
220 inside: false,
221 entered_at: now,
222 dwell_emitted: false,
223 last_seen: now,
224 candidate: None,
225 candidate_count: 0,
226 });
227 entry.last_seen = now;
228
229 if raw_inside == entry.inside {
231 entry.candidate = None;
232 entry.candidate_count = 0;
233 } else {
234 if entry.candidate == Some(raw_inside) {
235 entry.candidate_count += 1;
236 } else {
237 entry.candidate = Some(raw_inside);
238 entry.candidate_count = 1;
239 }
240 if entry.candidate_count >= *confirm {
241 entry.inside = raw_inside;
242 entry.candidate = None;
243 entry.candidate_count = 0;
244 if raw_inside {
245 entry.entered_at = now;
246 entry.dwell_emitted = false;
247 emits.push(make_evt(camera_id, zone, track, "enter", label, None));
248 } else {
249 emits.push(make_evt(camera_id, zone, track, "exit", label, None));
250 }
251 }
252 }
253
254 if entry.inside && zone.dwell_seconds > 0.0 && !entry.dwell_emitted {
255 let dwell = (now - entry.entered_at).num_milliseconds() as f64 / 1000.0;
256 if dwell >= zone.dwell_seconds {
257 entry.dwell_emitted = true;
258 emits.push(make_evt(
259 camera_id,
260 zone,
261 track,
262 "dwell",
263 label,
264 Some(dwell),
265 ));
266 }
267 }
268 }
269 }
270
271 let cutoff = now - Duration::seconds(STATE_TTL_SECS);
273 let mut survivors: HashMap<String, TrackZoneState> = HashMap::new();
274 for (k, s) in state.drain() {
275 if s.last_seen >= cutoff {
276 survivors.insert(k, s);
277 } else if s.inside {
278 emits.push(ZoneEvt {
279 camera_id: camera_id.to_string(),
280 zone_id: s.zone_id.clone(),
281 zone_name: s.zone_name.clone(),
282 severity: s.severity.clone(),
283 track: s.track.clone(),
284 event_type: "exit",
285 label: String::new(),
286 dwell: None,
287 });
288 }
289 }
290 *state = survivors;
291 }
292
293 for e in &emits {
294 self.emit(e, now).await;
295 }
296 }
297
298 async fn emit(&self, evt: &ZoneEvt, now: DateTime<Utc>) {
299 let id = format!("zev_{}", Uuid::new_v4().simple());
300 let evidence = if evt.event_type == "enter" {
301 self.copy_evidence(&evt.camera_id, &id).await
302 } else {
303 None
304 };
305
306 let _ = sqlx::query(
307 "INSERT INTO zone_events
308 (id, camera_id, zone_id, zone_name, track_id, event_type, label, timestamp,
309 dwell_seconds, evidence_path, created_at)
310 VALUES (?,?,?,?,?,?,?,?,?,?,?)",
311 )
312 .bind(&id)
313 .bind(&evt.camera_id)
314 .bind(&evt.zone_id)
315 .bind(&evt.zone_name)
316 .bind(&evt.track)
317 .bind(evt.event_type)
318 .bind(&evt.label)
319 .bind(now)
320 .bind(evt.dwell)
321 .bind(&evidence)
322 .bind(now)
323 .execute(&self.pool)
324 .await;
325
326 let _ = repo::log_event(
327 &self.pool,
328 Some(&evt.camera_id),
329 &format!("zone_{}", evt.event_type),
330 &evt.severity,
331 json!({
332 "zone_id": evt.zone_id,
333 "zone": evt.zone_name,
334 "track_id": evt.track,
335 "label": evt.label,
336 "dwell_seconds": evt.dwell,
337 "evidence": evidence,
338 }),
339 )
340 .await;
341
342 tracing::info!(camera_id = %evt.camera_id, zone = %evt.zone_name, track = %evt.track, event = evt.event_type, "zone event");
343
344 let _ = self.recorder.trigger(&evt.camera_id, "zone_event").await;
347 }
348
349 async fn copy_evidence(&self, camera_id: &str, id: &str) -> Option<String> {
351 let src = self.cfg.camera_frames_dir(camera_id).join("latest_sub.jpg");
352 let filename = format!("zoneevt_{id}.jpg");
353 let dst = self.cfg.snapshots_dir.join(&filename);
354 if tokio::fs::copy(&src, &dst).await.is_ok() {
355 Some(format!("/media/snapshots/{filename}"))
356 } else {
357 None
358 }
359 }
360}
361
362fn make_evt(
363 camera_id: &str,
364 zone: &Zone,
365 track: &str,
366 event_type: &'static str,
367 label: &str,
368 dwell: Option<f64>,
369) -> ZoneEvt {
370 ZoneEvt {
371 camera_id: camera_id.to_string(),
372 zone_id: zone.id.clone(),
373 zone_name: zone.name.clone(),
374 severity: zone.severity.clone(),
375 track: track.to_string(),
376 event_type,
377 label: label.to_string(),
378 dwell,
379 }
380}
381
#[cfg(test)]
mod tests {
    use super::*;

    /// A point inside the unit square is contained; points to the right of it
    /// and above it are not.
    #[test]
    fn point_in_polygon_basic() {
        let unit_square = [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]];
        let cases = [
            ([0.5, 0.5], true),
            ([1.5, 0.5], false),
            ([0.5, 1.5], false),
        ];
        for (point, expected) in cases {
            assert_eq!(point_in_polygon(point, &unit_square), expected);
        }
    }

    /// The ground point is the bottom-centre of the box; short or non-numeric
    /// boxes are rejected.
    #[test]
    fn bbox_ground_point_is_bottom_center() {
        let well_formed = json!([0.2, 0.1, 0.4, 0.6]);
        assert_eq!(bbox_ground_point(&well_formed), Some([0.4, 0.7]));

        for malformed in [json!([1, 2, 3]), json!(["x", 0, 0, 0])] {
            assert_eq!(bbox_ground_point(&malformed), None);
        }
    }

    /// Entries whose coordinates are not numbers are dropped from the polygon.
    #[test]
    fn parse_polygon_skips_non_finite_and_bad_points() {
        let raw = json!([[0.0, 0.0], [1.0, 0.5], ["a", 1]]);
        let expected = vec![[0.0, 0.0], [1.0, 0.5]];
        assert_eq!(parse_polygon(&raw), expected);
    }

    /// String elements are returned in order as owned labels.
    #[test]
    fn parse_labels_strings() {
        let labels = parse_labels(&json!(["person", "car"]));
        assert_eq!(labels, vec!["person", "car"]);
    }
}