1use std::collections::HashMap;
16use std::sync::Arc;
17
18use chrono::{DateTime, Duration, Utc};
19use serde_json::{json, Value};
20use sqlx::SqlitePool;
21use tokio::sync::Mutex;
22use uuid::Uuid;
23
24use heldar_kernel::config::Config;
25use heldar_kernel::models::DetectionIngest;
26use heldar_kernel::repo;
27
28const STATE_TTL_SECS: i64 = 30;
30
31pub fn normalize_plate(s: &str) -> String {
33 s.chars()
34 .filter(|c| c.is_ascii_alphanumeric())
35 .map(|c| c.to_ascii_uppercase())
36 .collect()
37}
38
39pub fn is_plausible_plate(norm: &str) -> bool {
42 let len = norm.len();
43 if !(3..=10).contains(&len) {
44 return false;
45 }
46 let has_alpha = norm.bytes().any(|b| b.is_ascii_alphabetic());
47 let has_digit = norm.bytes().any(|b| b.is_ascii_digit());
48 has_alpha && has_digit
49}
50
51#[derive(Default, Clone)]
52struct PlateVote {
53 count: u32,
54 conf_sum: f64,
55}
56
57#[derive(Default, Clone)]
58struct ObservedAttrs {
59 vehicle_type: Option<(String, f64)>,
60 color: Option<(String, f64)>,
61 make: Option<(String, f64)>,
62 model: Option<(String, f64)>,
63}
64
65impl ObservedAttrs {
66 fn observe(slot: &mut Option<(String, f64)>, val: Option<&str>, conf: f64) {
68 if let Some(v) = val.filter(|s| !s.trim().is_empty()) {
69 let better = slot.as_ref().map(|(_, c)| conf > *c).unwrap_or(true);
70 if better {
71 *slot = Some((v.trim().to_string(), conf));
72 }
73 }
74 }
75}
76
77struct TrackVoteState {
78 camera_id: String,
79 site_id: Option<String>,
80 track: Option<String>,
81 direction: String,
82 votes: HashMap<String, PlateVote>,
83 raw_by_norm: HashMap<String, String>,
84 attrs: ObservedAttrs,
85 last_seen: DateTime<Utc>,
86 committed: bool,
87 model_versions: Value,
88 uid: u64,
91}
92
93struct CommitJob {
95 key: String,
97 uid: u64,
100 camera_id: String,
101 site_id: Option<String>,
102 track: Option<String>,
103 direction: String,
104 plate_norm: String,
105 plate_raw: String,
106 plate_conf: f64,
107 vehicle_type: Option<String>,
108 color: Option<String>,
109 make: Option<String>,
110 model: Option<String>,
111 model_versions: Value,
112}
113
114pub struct AnprEngine {
115 pool: SqlitePool,
116 cfg: Arc<Config>,
118 ecfg: Arc<crate::config::EntryConfig>,
120 state: Mutex<HashMap<String, TrackVoteState>>,
121 next_uid: std::sync::atomic::AtomicU64,
124}
125
126fn attr_str<'a>(attrs: &'a Value, key: &str) -> Option<&'a str> {
127 attrs.get(key).and_then(|v| v.as_str())
128}
129
130#[async_trait::async_trait]
131impl heldar_kernel::services::consumer::DetectionConsumer for AnprEngine {
132 fn name(&self) -> &'static str {
133 "anpr"
134 }
135 fn interested_in(&self, task_type: &str) -> bool {
137 task_type.eq_ignore_ascii_case("anpr")
138 }
139 async fn consume(&self, batch: &heldar_kernel::services::consumer::DetectionBatch<'_>) {
140 self.process(batch.camera_id, batch.site_id, batch.detections)
141 .await;
142 }
143}
144
145impl AnprEngine {
146 pub fn new(
147 pool: SqlitePool,
148 cfg: Arc<Config>,
149 ecfg: Arc<crate::config::EntryConfig>,
150 ) -> Arc<Self> {
151 Arc::new(Self {
152 pool,
153 cfg,
154 ecfg,
155 state: Mutex::new(HashMap::new()),
156 next_uid: std::sync::atomic::AtomicU64::new(1),
157 })
158 }
159
160 pub async fn process(
164 &self,
165 camera_id: &str,
166 site_id: Option<&str>,
167 detections: &[DetectionIngest],
168 ) {
169 let now = Utc::now();
170 let min_votes = self.ecfg.anpr_min_votes;
171 let mut jobs: Vec<CommitJob> = Vec::new();
172 {
173 let mut state = self.state.lock().await;
174 for d in detections {
175 let attrs = match d.attributes.as_ref() {
176 Some(a) if a.is_object() => a,
177 _ => continue,
178 };
179 let plate_raw = attr_str(attrs, "plate").map(|s| s.to_string());
180 let plate_norm = plate_raw
181 .as_deref()
182 .map(normalize_plate)
183 .unwrap_or_default();
184 let plate_conf = attrs
185 .get("plate_confidence")
186 .and_then(|v| v.as_f64())
187 .filter(|c| c.is_finite())
188 .unwrap_or(0.0);
189
190 let track = d.track_id.clone();
193 let sub = track
194 .clone()
195 .unwrap_or_else(|| format!("plate:{plate_norm}"));
196 if track.is_none() && plate_norm.is_empty() {
197 continue; }
199 let key = format!("{camera_id}|{sub}");
200 let entry = state.entry(key).or_insert_with(|| TrackVoteState {
201 camera_id: camera_id.to_string(),
202 site_id: site_id.map(|s| s.to_string()),
203 track: track.clone(),
204 direction: "unknown".into(),
205 votes: HashMap::new(),
206 raw_by_norm: HashMap::new(),
207 attrs: ObservedAttrs::default(),
208 last_seen: now,
209 committed: false,
210 model_versions: json!({}),
211 uid: self
212 .next_uid
213 .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
214 });
215 entry.last_seen = now;
216
217 if !plate_norm.is_empty() {
218 let v = entry.votes.entry(plate_norm.clone()).or_default();
219 v.count += 1;
220 v.conf_sum += plate_conf.max(0.0);
221 if let Some(raw) = &plate_raw {
222 entry.raw_by_norm.insert(plate_norm.clone(), raw.clone());
223 }
224 }
225 let vc = d.confidence.unwrap_or(plate_conf).max(0.0);
226 ObservedAttrs::observe(
227 &mut entry.attrs.vehicle_type,
228 attr_str(attrs, "vehicle_type").or(d.label.as_deref()),
229 vc,
230 );
231 ObservedAttrs::observe(&mut entry.attrs.color, attr_str(attrs, "color"), vc);
232 ObservedAttrs::observe(&mut entry.attrs.make, attr_str(attrs, "make"), vc);
233 ObservedAttrs::observe(&mut entry.attrs.model, attr_str(attrs, "model"), vc);
234 if let Some(dir) = attr_str(attrs, "direction") {
235 if matches!(dir, "inbound" | "outbound") {
236 entry.direction = dir.to_string();
237 }
238 }
239 if let Some(mv) = attrs.get("model_versions") {
240 if mv.is_object() {
241 entry.model_versions = mv.clone();
242 }
243 }
244 }
245
246 for (key, st) in state.iter_mut() {
250 if st.committed {
251 continue;
252 }
253 if let Some((_, count, _)) = winning_plate(&st.votes) {
254 if count >= min_votes {
255 if let Some(job) = build_job(key, st) {
256 jobs.push(job);
257 }
258 st.committed = true;
259 }
260 }
261 }
262
263 let cutoff = now - Duration::seconds(STATE_TTL_SECS);
268 let mut survivors: HashMap<String, TrackVoteState> = HashMap::new();
269 for (k, st) in state.drain() {
270 if st.last_seen >= cutoff {
271 survivors.insert(k, st);
272 } else if !st.committed && winning_plate(&st.votes).is_some() {
273 if let Some(job) = build_job(&k, &st) {
274 jobs.push(job);
275 }
276 }
277 }
278 *state = survivors;
279 }
280
281 for job in jobs {
282 let (key, uid) = (job.key.clone(), job.uid);
283 if !self.commit(job, now).await {
288 let mut state = self.state.lock().await;
289 if let Some(s) = state.get_mut(&key) {
290 if s.uid == uid {
291 s.committed = false;
292 }
293 }
294 }
295 }
296 }
297
298 async fn commit(&self, job: CommitJob, now: DateTime<Utc>) -> bool {
300 let resolution = self.resolve(&job).await;
301 let id = format!("evt_{}", Uuid::new_v4().simple());
302 let evidence_path = self.copy_evidence(&job.camera_id, &id).await;
303
304 let event_type = if job.direction == "outbound" {
305 "vehicle_exit"
306 } else {
307 "vehicle_entry"
308 };
309 let make_model = match (&job.make, &job.model) {
310 (Some(mk), Some(md)) => Some(format!("{mk} {md}")),
311 (Some(mk), None) => Some(mk.clone()),
312 (None, Some(md)) => Some(md.clone()),
313 (None, None) => None,
314 };
315 let plate_out = (!job.plate_norm.is_empty()).then(|| job.plate_raw.clone());
316 let subject = json!({
317 "type": "vehicle",
318 "plate": plate_out,
319 "plate_confidence": job.plate_conf,
320 "plate_valid": is_plausible_plate(&job.plate_norm),
321 "vehicle_type": job.vehicle_type,
322 "color": job.color,
323 "make_model": make_model,
324 });
325 let evidence = json!({ "snapshot_path": evidence_path });
326 let workflow = json!({ "status": resolution.workflow_status });
327 let audit = json!({ "created_by": "system", "model_versions": job.model_versions });
328 let plate_db = (!job.plate_norm.is_empty()).then(|| job.plate_norm.clone());
329
330 let res = sqlx::query(
331 "INSERT INTO entry_events
332 (id, site_id, camera_id, event_type, timestamp, direction, plate, plate_confidence,
333 subject, authorization, auth_status, evidence, workflow_status, workflow, audit,
334 track_id, created_at)
335 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
336 )
337 .bind(&id)
338 .bind(&job.site_id)
339 .bind(&job.camera_id)
340 .bind(event_type)
341 .bind(now)
342 .bind(&job.direction)
343 .bind(&plate_db)
344 .bind(job.plate_conf)
345 .bind(sqlx::types::Json(&subject))
346 .bind(sqlx::types::Json(&resolution.authorization))
347 .bind(&resolution.auth_status)
348 .bind(sqlx::types::Json(&evidence))
349 .bind(&resolution.workflow_status)
350 .bind(sqlx::types::Json(&workflow))
351 .bind(sqlx::types::Json(&audit))
352 .bind(&job.track)
353 .bind(now)
354 .execute(&self.pool)
355 .await;
356 if let Err(e) = res {
357 tracing::error!(error = %e, "anpr: failed to insert entry event");
358 return false;
359 }
360
361 let _ = repo::log_event(
363 &self.pool,
364 Some(&job.camera_id),
365 &format!("entry_{}", resolution.auth_status),
366 &resolution.severity,
367 json!({
368 "entry_event_id": id,
369 "plate": plate_db,
370 "auth_status": resolution.auth_status,
371 "source": resolution.source,
372 "event_type": event_type,
373 "evidence": evidence_path,
374 }),
375 )
376 .await;
377
378 tracing::info!(
379 camera_id = %job.camera_id,
380 plate = %job.plate_norm,
381 auth = %resolution.auth_status,
382 source = %resolution.source,
383 "entry event"
384 );
385 true
386 }
387
388 async fn resolve(&self, job: &CommitJob) -> Resolution {
392 let plate = &job.plate_norm;
393 let now = Utc::now();
394
395 if plate.is_empty() || !is_plausible_plate(plate) {
397 return Resolution::unmatched(json!({
398 "status": "unmatched",
399 "source": "none",
400 "note": if plate.is_empty() { "no_plate_read" } else { "plate_unreadable" },
401 }));
402 }
403
404 match sqlx::query_as::<_, (Option<String>, String)>(
408 "SELECT reason, severity FROM watchlist WHERE plate_norm = ? AND active = 1 AND kind = 'block'",
409 )
410 .bind(plate)
411 .fetch_optional(&self.pool)
412 .await
413 {
414 Ok(Some((reason, severity))) => {
415 return Resolution {
416 auth_status: "blocked".into(),
417 workflow_status: "pending".into(),
418 severity: if severity.is_empty() {
419 "critical".into()
420 } else {
421 severity
422 },
423 source: "watchlist".into(),
424 authorization: json!({
425 "status": "blocked", "source": "watchlist", "kind": "block", "reason": reason,
426 }),
427 };
428 }
429 Ok(None) => {}
430 Err(e) => {
431 tracing::error!(error = %e, plate = %plate, "anpr: block-watchlist lookup failed; failing closed to exception");
432 return Resolution {
433 auth_status: "exception".into(),
434 workflow_status: "pending".into(),
435 severity: "warning".into(),
436 source: "system".into(),
437 authorization: json!({
438 "status": "exception", "source": "system", "note": "watchlist_lookup_failed",
439 }),
440 };
441 }
442 }
443
444 let vehicle = sqlx::query_as::<_, RegVehicle>(
448 "SELECT id, vehicle_type, color, valid_from, valid_until
449 FROM vehicles WHERE plate_norm = ? AND active = 1",
450 )
451 .bind(plate)
452 .fetch_optional(&self.pool)
453 .await
454 .ok()
455 .flatten();
456
457 if let Some(v) = vehicle {
458 let in_window = v.valid_from.map(|t| t <= now).unwrap_or(true)
459 && v.valid_until.map(|t| t >= now).unwrap_or(true);
460 if !in_window {
461 return Resolution {
462 auth_status: "exception".into(),
463 workflow_status: "pending".into(),
464 severity: "warning".into(),
465 source: "registered_vehicle".into(),
466 authorization: json!({
467 "status": "exception", "source": "registered_vehicle",
468 "vehicle_id": v.id, "note": "outside_validity_window",
469 }),
470 };
471 }
472 let mut mismatches: Vec<String> = Vec::new();
474 check_mismatch(
475 &mut mismatches,
476 "color",
477 v.color.as_deref(),
478 job.color.as_deref(),
479 );
480 check_mismatch(
481 &mut mismatches,
482 "vehicle_type",
483 v.vehicle_type.as_deref(),
484 job.vehicle_type.as_deref(),
485 );
486 if mismatches.is_empty() {
487 let alert = self.has_alert(plate).await;
490 let status = if alert { "exception" } else { "matched" };
491 return Resolution {
492 auth_status: status.into(),
493 workflow_status: if alert {
494 "pending".into()
495 } else {
496 "auto".into()
497 },
498 severity: if alert {
499 "warning".into()
500 } else {
501 "info".into()
502 },
503 source: "registered_vehicle".into(),
504 authorization: json!({
505 "status": status, "source": "registered_vehicle",
506 "vehicle_id": v.id, "alert": alert,
507 }),
508 };
509 } else {
510 return Resolution {
511 auth_status: "exception".into(),
512 workflow_status: "pending".into(),
513 severity: "warning".into(),
514 source: "registered_vehicle".into(),
515 authorization: json!({
516 "status": "exception", "source": "registered_vehicle",
517 "vehicle_id": v.id, "mismatches": mismatches,
518 }),
519 };
520 }
521 }
522
523 let pass = sqlx::query_as::<_, (String, String)>(
527 "SELECT id, status FROM visitor_passes
528 WHERE plate_norm = ? AND status IN ('active','checked_in')
529 AND valid_from <= ? AND valid_until >= ?
530 ORDER BY valid_until DESC LIMIT 1",
531 )
532 .bind(plate)
533 .bind(now)
534 .bind(now)
535 .fetch_optional(&self.pool)
536 .await
537 .ok()
538 .flatten();
539 if let Some((pass_id, status)) = pass {
540 if status == "active" && job.direction != "outbound" {
542 let _ = sqlx::query(
543 "UPDATE visitor_passes SET status='checked_in', checked_in_at=?, updated_at=? WHERE id=?",
544 )
545 .bind(now)
546 .bind(now)
547 .bind(&pass_id)
548 .execute(&self.pool)
549 .await;
550 }
551 let alert = self.has_alert(plate).await;
552 let st = if alert { "exception" } else { "matched" };
553 return Resolution {
554 auth_status: st.into(),
555 workflow_status: if alert {
556 "pending".into()
557 } else {
558 "auto".into()
559 },
560 severity: if alert {
561 "warning".into()
562 } else {
563 "info".into()
564 },
565 source: "visitor_pass".into(),
566 authorization: json!({
567 "status": st, "source": "visitor_pass", "pass_id": pass_id, "alert": alert,
568 }),
569 };
570 }
571 if let Ok(Some((pass_id,))) = sqlx::query_as::<_, (String,)>(
573 "SELECT id FROM visitor_passes WHERE plate_norm = ? AND status IN ('active','checked_in')
574 ORDER BY valid_until DESC LIMIT 1",
575 )
576 .bind(plate)
577 .fetch_optional(&self.pool)
578 .await
579 {
580 return Resolution {
581 auth_status: "exception".into(),
582 workflow_status: "pending".into(),
583 severity: "warning".into(),
584 source: "visitor_pass".into(),
585 authorization: json!({
586 "status": "exception", "source": "visitor_pass",
587 "pass_id": pass_id, "note": "pass_outside_validity_window",
588 }),
589 };
590 }
591
592 if let Ok(Some((reason,))) = sqlx::query_as::<_, (Option<String>,)>(
594 "SELECT reason FROM watchlist WHERE plate_norm = ? AND active = 1 AND kind = 'vip'",
595 )
596 .bind(plate)
597 .fetch_optional(&self.pool)
598 .await
599 {
600 return Resolution {
601 auth_status: "matched".into(),
602 workflow_status: "auto".into(),
603 severity: "info".into(),
604 source: "watchlist".into(),
605 authorization: json!({
606 "status": "matched", "source": "watchlist", "kind": "vip", "reason": reason,
607 }),
608 };
609 }
610
611 if self.has_alert(plate).await {
613 return Resolution {
614 auth_status: "exception".into(),
615 workflow_status: "pending".into(),
616 severity: "warning".into(),
617 source: "watchlist".into(),
618 authorization: json!({ "status": "exception", "source": "watchlist", "kind": "alert" }),
619 };
620 }
621 Resolution::unmatched(json!({ "status": "unmatched", "source": "none" }))
622 }
623
624 async fn has_alert(&self, plate: &str) -> bool {
626 sqlx::query_scalar::<_, i64>(
627 "SELECT COUNT(*) FROM watchlist WHERE plate_norm = ? AND active = 1 AND kind = 'alert'",
628 )
629 .bind(plate)
630 .fetch_one(&self.pool)
631 .await
632 .unwrap_or(0)
633 > 0
634 }
635
636 async fn copy_evidence(&self, camera_id: &str, id: &str) -> Option<String> {
638 let dir = self.cfg.camera_frames_dir(camera_id);
639 let filename = format!("entryevt_{id}.jpg");
640 let dst = self.cfg.snapshots_dir.join(&filename);
641 for profile in ["main", "sub"] {
642 let src = dir.join(format!("latest_{profile}.jpg"));
643 if tokio::fs::copy(&src, &dst).await.is_ok() {
644 return Some(format!("/media/snapshots/{filename}"));
645 }
646 }
647 None
648 }
649}
650
651#[derive(sqlx::FromRow)]
652struct RegVehicle {
653 id: String,
654 vehicle_type: Option<String>,
655 color: Option<String>,
656 valid_from: Option<DateTime<Utc>>,
657 valid_until: Option<DateTime<Utc>>,
658}
659
660struct Resolution {
661 auth_status: String,
662 workflow_status: String,
663 severity: String,
664 source: String,
665 authorization: Value,
666}
667
668impl Resolution {
669 fn unmatched(authorization: Value) -> Self {
670 Resolution {
671 auth_status: "unmatched".into(),
672 workflow_status: "pending".into(),
673 severity: "warning".into(),
674 source: "none".into(),
675 authorization,
676 }
677 }
678}
679
680fn check_mismatch(
682 out: &mut Vec<String>,
683 field: &str,
684 registered: Option<&str>,
685 detected: Option<&str>,
686) {
687 if let (Some(r), Some(d)) = (registered, detected) {
688 if !r.trim().is_empty() && !d.trim().is_empty() && !r.trim().eq_ignore_ascii_case(d.trim())
689 {
690 out.push(format!("{field}: registered={r}, detected={d}"));
691 }
692 }
693}
694
695fn winning_plate(votes: &HashMap<String, PlateVote>) -> Option<(String, u32, f64)> {
699 let leader = |plausible_only: bool| {
700 votes
701 .iter()
702 .filter(|(norm, _)| !plausible_only || is_plausible_plate(norm))
703 .max_by(|a, b| {
704 a.1.count.cmp(&b.1.count).then(
705 a.1.conf_sum
706 .partial_cmp(&b.1.conf_sum)
707 .unwrap_or(std::cmp::Ordering::Equal),
708 )
709 })
710 };
711 let (norm, vote) = leader(true).or_else(|| leader(false))?;
712 let avg = if vote.count > 0 {
713 vote.conf_sum / vote.count as f64
714 } else {
715 0.0
716 };
717 Some((norm.clone(), vote.count, avg))
718}
719
720fn build_job(key: &str, st: &TrackVoteState) -> Option<CommitJob> {
721 let (plate_norm, _count, plate_conf) = winning_plate(&st.votes)?;
722 let plate_raw = st
723 .raw_by_norm
724 .get(&plate_norm)
725 .cloned()
726 .unwrap_or_else(|| plate_norm.clone());
727 Some(CommitJob {
728 key: key.to_string(),
729 uid: st.uid,
730 camera_id: st.camera_id.clone(),
731 site_id: st.site_id.clone(),
732 track: st.track.clone(),
733 direction: st.direction.clone(),
734 plate_norm,
735 plate_raw,
736 plate_conf,
737 vehicle_type: st.attrs.vehicle_type.as_ref().map(|(v, _)| v.clone()),
738 color: st.attrs.color.as_ref().map(|(v, _)| v.clone()),
739 make: st.attrs.make.as_ref().map(|(v, _)| v.clone()),
740 model: st.attrs.model.as_ref().map(|(v, _)| v.clone()),
741 model_versions: st.model_versions.clone(),
742 })
743}
744
745#[cfg(test)]
746mod tests {
747 use super::*;
748
749 fn votes(pairs: &[(&str, u32, f64)]) -> HashMap<String, PlateVote> {
750 pairs
751 .iter()
752 .map(|(p, c, s)| {
753 (
754 p.to_string(),
755 PlateVote {
756 count: *c,
757 conf_sum: *s,
758 },
759 )
760 })
761 .collect()
762 }
763
764 #[test]
765 fn normalize_strips_and_uppercases() {
766 assert_eq!(normalize_plate("abc 1234"), "ABC1234");
767 assert_eq!(normalize_plate("W-XY 88.88"), "WXY8888");
768 assert_eq!(normalize_plate(""), "");
769 }
770
771 #[test]
772 fn winning_plate_prefers_plausible_over_higher_voted_implausible() {
773 let v = votes(&[("12345", 5, 4.0), ("ABC1234", 2, 1.8)]);
775 let (norm, count, _) = winning_plate(&v).unwrap();
776 assert_eq!(norm, "ABC1234");
777 assert_eq!(count, 2);
778 }
779
780 #[test]
781 fn winning_plate_picks_top_votes_among_plausible() {
782 let v = votes(&[("ABC1234", 2, 1.8), ("ABD1234", 5, 4.5)]);
783 let (norm, count, _) = winning_plate(&v).unwrap();
784 assert_eq!(norm, "ABD1234");
785 assert_eq!(count, 5);
786 }
787
788 #[test]
789 fn winning_plate_none_when_no_votes() {
790 assert!(winning_plate(&HashMap::new()).is_none());
791 }
792
793 #[test]
794 fn plausibility_requires_alpha_and_digit() {
795 assert!(is_plausible_plate("ABC1234"));
796 assert!(is_plausible_plate("WA12B"));
797 assert!(!is_plausible_plate("1234"));
798 assert!(!is_plausible_plate("ABCDE"));
799 assert!(!is_plausible_plate("A1"));
800 assert!(!is_plausible_plate("ABCDEFGHIJK1"));
801 }
802
803 #[test]
804 fn mismatch_only_when_both_known_and_differ() {
805 let mut m = Vec::new();
806 check_mismatch(&mut m, "color", Some("white"), Some("black"));
807 check_mismatch(&mut m, "color", Some("white"), Some("WHITE"));
808 check_mismatch(&mut m, "vehicle_type", None, Some("suv"));
809 check_mismatch(&mut m, "vehicle_type", Some("car"), None);
810 assert_eq!(m.len(), 1);
811 assert!(m[0].contains("color"));
812 }
813}