1use std::collections::HashMap;
9use std::sync::Arc;
10
11use chrono::{DateTime, Duration, Utc};
12use serde_json::{json, Value};
13use sqlx::SqlitePool;
14use tokio::sync::Mutex;
15use uuid::Uuid;
16
17use crate::config::Config;
18use crate::models::{DetectionIngest, Zone};
19use crate::repo;
20
/// Seconds a track/zone entry may go unobserved before it is evicted from the
/// in-memory state table.
const STATE_TTL_SECS: i64 = 120;
/// Number of consecutive confirming frames used when a zone's config does not
/// provide a usable `confirm_frames` value.
const DEFAULT_CONFIRM_FRAMES: u32 = 2;
26
27#[derive(Debug, Clone)]
28struct TrackZoneState {
29 track: String,
30 zone_id: String,
31 zone_name: String,
32 severity: String,
33 inside: bool,
34 entered_at: DateTime<Utc>,
35 dwell_emitted: bool,
36 last_seen: DateTime<Utc>,
37 candidate: Option<bool>,
38 candidate_count: u32,
39}
40
/// A zone event queued while the state lock is held and persisted afterwards.
struct ZoneEvt {
    /// Camera the event is attributed to.
    camera_id: String,
    /// Id of the zone the event refers to.
    zone_id: String,
    /// Name of the zone the event refers to.
    zone_name: String,
    /// Severity passed on to the event log.
    severity: String,
    /// Track id of the object that triggered the event.
    track: String,
    /// One of `"enter"`, `"exit"` or `"dwell"`.
    event_type: &'static str,
    /// Detection label; empty when no label is available.
    label: String,
    /// Seconds spent inside the zone; only set for `"dwell"` events.
    dwell: Option<f64>,
}
52
53pub struct ZoneEngine {
54 pool: SqlitePool,
55 cfg: Arc<Config>,
56 state: Mutex<HashMap<String, TrackZoneState>>,
57}
58
/// Even-odd (ray casting) test: is point `p` inside the polygon `poly`?
///
/// Polygons with fewer than three vertices contain nothing.
fn point_in_polygon(p: [f64; 2], poly: &[[f64; 2]]) -> bool {
    if poly.len() < 3 {
        return false;
    }
    let [px, py] = p;
    // Pair every vertex with its predecessor; the first vertex pairs with the
    // last one, which closes the ring.
    let predecessors = poly.iter().cycle().skip(poly.len() - 1);
    let crossings = poly
        .iter()
        .zip(predecessors)
        .filter(|&(&[cx, cy], &[qx, qy])| {
            // The edge straddles the horizontal line through `p` and meets it
            // strictly to the right of `p`.
            (cy > py) != (qy > py) && px < (qx - cx) * (py - cy) / (qy - cy) + cx
        })
        .count();
    // An odd number of crossings means the point is inside.
    crossings % 2 == 1
}
77
78fn parse_polygon(v: &Value) -> Vec<[f64; 2]> {
79 v.as_array()
80 .map(|arr| {
81 arr.iter()
82 .filter_map(|pt| {
83 let a = pt.as_array()?;
84 let x = a.first()?.as_f64()?;
85 let y = a.get(1)?.as_f64()?;
86 (x.is_finite() && y.is_finite()).then_some([x, y])
87 })
88 .collect()
89 })
90 .unwrap_or_default()
91}
92
93fn parse_labels(v: &Value) -> Vec<String> {
94 v.as_array()
95 .map(|arr| {
96 arr.iter()
97 .filter_map(|l| l.as_str().map(|s| s.to_string()))
98 .collect()
99 })
100 .unwrap_or_default()
101}
102
103fn confirm_frames(zone: &Zone) -> u32 {
104 (zone
105 .config
106 .0
107 .get("confirm_frames")
108 .and_then(|v| v.as_u64())
109 .unwrap_or(DEFAULT_CONFIRM_FRAMES as u64))
110 .clamp(1, 10) as u32
111}
112
113fn bbox_ground_point(v: &Value) -> Option<[f64; 2]> {
115 let a = v.as_array()?;
116 if a.len() < 4 {
117 return None;
118 }
119 let x = a[0].as_f64()?;
120 let y = a[1].as_f64()?;
121 let w = a[2].as_f64()?;
122 let h = a[3].as_f64()?;
123 if !(x.is_finite() && y.is_finite() && w.is_finite() && h.is_finite()) {
124 return None;
125 }
126 Some([x + w / 2.0, y + h])
127}
128
129#[async_trait::async_trait]
130impl crate::services::consumer::DetectionConsumer for ZoneEngine {
131 fn name(&self) -> &'static str {
132 "zones"
133 }
134 fn interested_in(&self, _task_type: &str) -> bool {
136 true
137 }
138 async fn consume(&self, batch: &crate::services::consumer::DetectionBatch<'_>) {
139 self.process(batch.camera_id, batch.detections).await;
140 }
141}
142
143impl ZoneEngine {
144 pub fn new(pool: SqlitePool, cfg: Arc<Config>) -> Arc<Self> {
145 Arc::new(Self {
146 pool,
147 cfg,
148 state: Mutex::new(HashMap::new()),
149 })
150 }
151
152 pub async fn process(&self, camera_id: &str, detections: &[DetectionIngest]) {
155 let mut by_track: HashMap<&str, &DetectionIngest> = HashMap::new();
157 for d in detections {
158 if let (Some(t), Some(_)) = (d.track_id.as_deref(), d.bbox.as_ref()) {
159 let better = by_track
160 .get(t)
161 .map(|p: &&DetectionIngest| {
162 d.confidence.unwrap_or(0.0) > p.confidence.unwrap_or(0.0)
163 })
164 .unwrap_or(true);
165 if better {
166 by_track.insert(t, d);
167 }
168 }
169 }
170 if by_track.is_empty() {
171 return;
172 }
173 let zones = match sqlx::query_as::<_, Zone>(
174 "SELECT * FROM zones WHERE camera_id = ? AND enabled = 1",
175 )
176 .bind(camera_id)
177 .fetch_all(&self.pool)
178 .await
179 {
180 Ok(z) if !z.is_empty() => z,
181 _ => return,
182 };
183 let parsed: Vec<(Vec<[f64; 2]>, Vec<String>, u32)> = zones
184 .iter()
185 .map(|z| {
186 (
187 parse_polygon(&z.polygon.0),
188 parse_labels(&z.labels.0),
189 confirm_frames(z),
190 )
191 })
192 .collect();
193
194 let now = Utc::now();
195 let mut emits: Vec<ZoneEvt> = Vec::new();
196 {
197 let mut state = self.state.lock().await;
198 for (track, d) in &by_track {
199 let Some(point) = d.bbox.as_ref().and_then(bbox_ground_point) else {
200 continue;
201 };
202 let label = d.label.as_deref().unwrap_or("");
203 for (idx, zone) in zones.iter().enumerate() {
204 let (poly, labels, confirm) = &parsed[idx];
205 if !labels.is_empty() && !labels.iter().any(|l| l == label) {
206 continue;
207 }
208 let raw_inside = point_in_polygon(point, poly);
209 let key = format!("{camera_id}|{}|{track}", zone.id);
210 let entry = state.entry(key).or_insert_with(|| TrackZoneState {
211 track: track.to_string(),
212 zone_id: zone.id.clone(),
213 zone_name: zone.name.clone(),
214 severity: zone.severity.clone(),
215 inside: false,
216 entered_at: now,
217 dwell_emitted: false,
218 last_seen: now,
219 candidate: None,
220 candidate_count: 0,
221 });
222 entry.last_seen = now;
223
224 if raw_inside == entry.inside {
226 entry.candidate = None;
227 entry.candidate_count = 0;
228 } else {
229 if entry.candidate == Some(raw_inside) {
230 entry.candidate_count += 1;
231 } else {
232 entry.candidate = Some(raw_inside);
233 entry.candidate_count = 1;
234 }
235 if entry.candidate_count >= *confirm {
236 entry.inside = raw_inside;
237 entry.candidate = None;
238 entry.candidate_count = 0;
239 if raw_inside {
240 entry.entered_at = now;
241 entry.dwell_emitted = false;
242 emits.push(make_evt(camera_id, zone, track, "enter", label, None));
243 } else {
244 emits.push(make_evt(camera_id, zone, track, "exit", label, None));
245 }
246 }
247 }
248
249 if entry.inside && zone.dwell_seconds > 0.0 && !entry.dwell_emitted {
250 let dwell = (now - entry.entered_at).num_milliseconds() as f64 / 1000.0;
251 if dwell >= zone.dwell_seconds {
252 entry.dwell_emitted = true;
253 emits.push(make_evt(
254 camera_id,
255 zone,
256 track,
257 "dwell",
258 label,
259 Some(dwell),
260 ));
261 }
262 }
263 }
264 }
265
266 let cutoff = now - Duration::seconds(STATE_TTL_SECS);
268 let mut survivors: HashMap<String, TrackZoneState> = HashMap::new();
269 for (k, s) in state.drain() {
270 if s.last_seen >= cutoff {
271 survivors.insert(k, s);
272 } else if s.inside {
273 emits.push(ZoneEvt {
274 camera_id: camera_id.to_string(),
275 zone_id: s.zone_id.clone(),
276 zone_name: s.zone_name.clone(),
277 severity: s.severity.clone(),
278 track: s.track.clone(),
279 event_type: "exit",
280 label: String::new(),
281 dwell: None,
282 });
283 }
284 }
285 *state = survivors;
286 }
287
288 for e in &emits {
289 self.emit(e, now).await;
290 }
291 }
292
293 async fn emit(&self, evt: &ZoneEvt, now: DateTime<Utc>) {
294 let id = format!("zev_{}", Uuid::new_v4().simple());
295 let evidence = if evt.event_type == "enter" {
296 self.copy_evidence(&evt.camera_id, &id).await
297 } else {
298 None
299 };
300
301 let _ = sqlx::query(
302 "INSERT INTO zone_events
303 (id, camera_id, zone_id, zone_name, track_id, event_type, label, timestamp,
304 dwell_seconds, evidence_path, created_at)
305 VALUES (?,?,?,?,?,?,?,?,?,?,?)",
306 )
307 .bind(&id)
308 .bind(&evt.camera_id)
309 .bind(&evt.zone_id)
310 .bind(&evt.zone_name)
311 .bind(&evt.track)
312 .bind(evt.event_type)
313 .bind(&evt.label)
314 .bind(now)
315 .bind(evt.dwell)
316 .bind(&evidence)
317 .bind(now)
318 .execute(&self.pool)
319 .await;
320
321 let _ = repo::log_event(
322 &self.pool,
323 Some(&evt.camera_id),
324 &format!("zone_{}", evt.event_type),
325 &evt.severity,
326 json!({
327 "zone_id": evt.zone_id,
328 "zone": evt.zone_name,
329 "track_id": evt.track,
330 "label": evt.label,
331 "dwell_seconds": evt.dwell,
332 "evidence": evidence,
333 }),
334 )
335 .await;
336
337 tracing::info!(camera_id = %evt.camera_id, zone = %evt.zone_name, track = %evt.track, event = evt.event_type, "zone event");
338 }
339
340 async fn copy_evidence(&self, camera_id: &str, id: &str) -> Option<String> {
342 let src = self.cfg.camera_frames_dir(camera_id).join("latest_sub.jpg");
343 let filename = format!("zoneevt_{id}.jpg");
344 let dst = self.cfg.snapshots_dir.join(&filename);
345 if tokio::fs::copy(&src, &dst).await.is_ok() {
346 Some(format!("/media/snapshots/{filename}"))
347 } else {
348 None
349 }
350 }
351}
352
353fn make_evt(
354 camera_id: &str,
355 zone: &Zone,
356 track: &str,
357 event_type: &'static str,
358 label: &str,
359 dwell: Option<f64>,
360) -> ZoneEvt {
361 ZoneEvt {
362 camera_id: camera_id.to_string(),
363 zone_id: zone.id.clone(),
364 zone_name: zone.name.clone(),
365 severity: zone.severity.clone(),
366 track: track.to_string(),
367 event_type,
368 label: label.to_string(),
369 dwell,
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Unit square: the centre is inside; points to the right of it and past
    /// its `y = 1` edge are outside.
    #[test]
    fn point_in_polygon_basic() {
        let square = [[0.0, 0.0], [1.0, 0.0], [1.0, 1.0], [0.0, 1.0]];
        let cases = [
            ([0.5, 0.5], true),
            ([1.5, 0.5], false),
            ([0.5, 1.5], false),
        ];
        for (point, expected) in cases {
            assert_eq!(point_in_polygon(point, &square), expected);
        }
    }

    /// The ground point is the horizontal centre at `y + h`; malformed boxes
    /// yield `None`.
    #[test]
    fn bbox_ground_point_is_bottom_center() {
        let ground = bbox_ground_point(&json!([0.2, 0.1, 0.4, 0.6]));
        assert_eq!(ground, Some([0.4, 0.7]));

        for malformed in [json!([1, 2, 3]), json!(["x", 0, 0, 0])] {
            assert_eq!(bbox_ground_point(&malformed), None);
        }
    }

    /// Points with a non-numeric coordinate are dropped; valid ones keep their order.
    #[test]
    fn parse_polygon_skips_non_finite_and_bad_points() {
        let raw = json!([[0.0, 0.0], [1.0, 0.5], ["a", 1]]);
        assert_eq!(parse_polygon(&raw), vec![[0.0, 0.0], [1.0, 0.5]]);
    }

    /// String entries are returned in order as owned strings.
    #[test]
    fn parse_labels_strings() {
        let labels = parse_labels(&json!(["person", "car"]));
        assert_eq!(labels, vec!["person", "car"]);
    }
}