Skip to main content

heldar_entry/
anpr.rs

//! ANPR entry engine (Stage 4): consolidates per-frame plate reads from the AI worker into one
//! authoritative entry/exit event per vehicle track via temporal voting, resolves the plate against
//! the registered-vehicle / visitor-pass / watchlist registry, classifies authorization
//! (matched / exception / unmatched / blocked), and writes a canonical entry event with
//! an evidence frame.
//!
//! Like the zone engine, all timing is driven by SERVER time and state is keyed per (camera, track)
//! in memory. A track commits once its WINNING plate (plausible-preferred, majority vote) has
//! accumulated `min_votes` reads, or on TTL prune if a vehicle passed too quickly to reach the
//! threshold but did produce at least one plate read. Plate is the PRIMARY identity
//! anchor; vehicle attributes (type/color/make/model) are SECONDARY — an attribute mismatch against
//! a registered plate raises an *exception for guard review*, never an automatic rejection
//! (by policy: no hard access decision on make/model without local benchmarking).
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use chrono::{DateTime, Duration, Utc};
19use serde_json::{json, Value};
20use sqlx::SqlitePool;
21use tokio::sync::Mutex;
22use uuid::Uuid;
23
24use heldar_kernel::config::Config;
25use heldar_kernel::models::DetectionIngest;
26use heldar_kernel::repo;
27
/// Server-time TTL, in seconds: a track's voting state is pruned once it has gone unseen for
/// longer than this.
const STATE_TTL_SECS: i64 = 30;
30
/// Reduce a raw plate string to its lookup key.
///
/// Every character that is not an ASCII letter or digit is dropped, and the remaining letters
/// are uppercased. An input with no ASCII alphanumerics yields an empty string.
pub fn normalize_plate(s: &str) -> String {
    let mut key = String::with_capacity(s.len());
    for ch in s.chars() {
        if ch.is_ascii_alphanumeric() {
            key.push(ch.to_ascii_uppercase());
        }
    }
    key
}
38
/// Loose plausibility gate for an already-normalized plate: 3 to 10 bytes long and containing
/// at least one ASCII letter AND at least one ASCII digit (Malaysian plates mix both).
///
/// This flags likely-unreadable OCR output rather than hard-rejecting it — guards review the
/// rest.
pub fn is_plausible_plate(norm: &str) -> bool {
    if norm.len() < 3 || norm.len() > 10 {
        return false;
    }
    let (mut letters, mut digits) = (false, false);
    for b in norm.bytes() {
        letters |= b.is_ascii_alphabetic();
        digits |= b.is_ascii_digit();
    }
    letters && digits
}
50
/// Running tally for one normalized plate candidate within a track.
#[derive(Default, Clone)]
struct PlateVote {
    /// Number of reads that normalized to this plate.
    count: u32,
    /// Sum of the per-read plate confidences accumulated alongside `count`.
    conf_sum: f64,
}
56
/// Best observation seen so far for each secondary vehicle attribute, stored as
/// `(value, confidence)`. `None` means the attribute has not been observed yet.
#[derive(Default, Clone)]
struct ObservedAttrs {
    vehicle_type: Option<(String, f64)>,
    color: Option<(String, f64)>,
    make: Option<(String, f64)>,
    model: Option<(String, f64)>,
}

impl ObservedAttrs {
    /// Offer a new reading for one attribute slot.
    ///
    /// Missing or whitespace-only values are ignored. Otherwise the trimmed value replaces the
    /// slot when the slot is empty or when `conf` is strictly greater than the stored
    /// confidence, so the highest-confidence observation is the one that is kept.
    fn observe(slot: &mut Option<(String, f64)>, val: Option<&str>, conf: f64) {
        let candidate = match val.map(str::trim) {
            Some(text) if !text.is_empty() => text,
            _ => return,
        };
        let replace = match slot.as_ref() {
            Some((_, best)) => conf > *best,
            None => true,
        };
        if replace {
            *slot = Some((candidate.to_string(), conf));
        }
    }
}
76
77struct TrackVoteState {
78    camera_id: String,
79    site_id: Option<String>,
80    track: Option<String>,
81    direction: String,
82    votes: HashMap<String, PlateVote>,
83    raw_by_norm: HashMap<String, String>,
84    attrs: ObservedAttrs,
85    last_seen: DateTime<Utc>,
86    committed: bool,
87    model_versions: Value,
88    /// Unique per instance (see [`AnprEngine::next_uid`]); distinguishes a track from a successor
89    /// that reused the same map key.
90    uid: u64,
91}
92
93/// A consolidated track ready to resolve + emit (built under the lock, processed after release).
94struct CommitJob {
95    /// State-map key, so a failed insert can clear `committed` and let the track retry.
96    key: String,
97    /// Identity of the track-state this job was built from. A failed insert clears `committed` only
98    /// if the live state entry STILL has this uid (else the key was reused by a different vehicle).
99    uid: u64,
100    camera_id: String,
101    site_id: Option<String>,
102    track: Option<String>,
103    direction: String,
104    plate_norm: String,
105    plate_raw: String,
106    plate_conf: f64,
107    vehicle_type: Option<String>,
108    color: Option<String>,
109    make: Option<String>,
110    model: Option<String>,
111    model_versions: Value,
112}
113
114pub struct AnprEngine {
115    pool: SqlitePool,
116    /// Kernel config — used for media paths (evidence frames).
117    cfg: Arc<Config>,
118    /// Entry-app config — voting threshold.
119    ecfg: Arc<crate::config::EntryConfig>,
120    state: Mutex<HashMap<String, TrackVoteState>>,
121    /// Monotonic id stamped on each track-state instance. Lets a failed commit clear `committed`
122    /// only on the SAME track it committed — never on a successor that reused the (track-id) key.
123    next_uid: std::sync::atomic::AtomicU64,
124}
125
126fn attr_str<'a>(attrs: &'a Value, key: &str) -> Option<&'a str> {
127    attrs.get(key).and_then(|v| v.as_str())
128}
129
130#[async_trait::async_trait]
131impl heldar_kernel::services::consumer::DetectionConsumer for AnprEngine {
132    fn name(&self) -> &'static str {
133        "anpr"
134    }
135    /// Only the ANPR task feeds the entry engine.
136    fn interested_in(&self, task_type: &str) -> bool {
137        task_type.eq_ignore_ascii_case("anpr")
138    }
139    async fn consume(&self, batch: &heldar_kernel::services::consumer::DetectionBatch<'_>) {
140        self.process(batch.camera_id, batch.site_id, batch.detections)
141            .await;
142    }
143}
144
145impl AnprEngine {
146    pub fn new(
147        pool: SqlitePool,
148        cfg: Arc<Config>,
149        ecfg: Arc<crate::config::EntryConfig>,
150    ) -> Arc<Self> {
151        Arc::new(Self {
152            pool,
153            cfg,
154            ecfg,
155            state: Mutex::new(HashMap::new()),
156            next_uid: std::sync::atomic::AtomicU64::new(1),
157        })
158    }
159
160    /// Feed a batch of ANPR detections for a camera. Each detection carries vehicle + plate fields
161    /// in `attributes` (plate, plate_confidence, vehicle_type, color, make, model, direction,
162    /// model_versions). Commits tracks that reach the vote threshold and prunes/commits stale ones.
163    pub async fn process(
164        &self,
165        camera_id: &str,
166        site_id: Option<&str>,
167        detections: &[DetectionIngest],
168    ) {
169        let now = Utc::now();
170        let min_votes = self.ecfg.anpr_min_votes;
171        let mut jobs: Vec<CommitJob> = Vec::new();
172        {
173            let mut state = self.state.lock().await;
174            for d in detections {
175                let attrs = match d.attributes.as_ref() {
176                    Some(a) if a.is_object() => a,
177                    _ => continue,
178                };
179                let plate_raw = attr_str(attrs, "plate").map(|s| s.to_string());
180                let plate_norm = plate_raw
181                    .as_deref()
182                    .map(normalize_plate)
183                    .unwrap_or_default();
184                let plate_conf = attrs
185                    .get("plate_confidence")
186                    .and_then(|v| v.as_f64())
187                    .filter(|c| c.is_finite())
188                    .unwrap_or(0.0);
189
190                // Key per (camera, track). Without a track id, fall back to the plate so repeated
191                // reads of the same plate still consolidate (and dedupe) within the TTL window.
192                let track = d.track_id.clone();
193                let sub = track
194                    .clone()
195                    .unwrap_or_else(|| format!("plate:{plate_norm}"));
196                if track.is_none() && plate_norm.is_empty() {
197                    continue; // nothing to key on
198                }
199                let key = format!("{camera_id}|{sub}");
200                let entry = state.entry(key).or_insert_with(|| TrackVoteState {
201                    camera_id: camera_id.to_string(),
202                    site_id: site_id.map(|s| s.to_string()),
203                    track: track.clone(),
204                    direction: "unknown".into(),
205                    votes: HashMap::new(),
206                    raw_by_norm: HashMap::new(),
207                    attrs: ObservedAttrs::default(),
208                    last_seen: now,
209                    committed: false,
210                    model_versions: json!({}),
211                    uid: self
212                        .next_uid
213                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed),
214                });
215                entry.last_seen = now;
216
217                if !plate_norm.is_empty() {
218                    let v = entry.votes.entry(plate_norm.clone()).or_default();
219                    v.count += 1;
220                    v.conf_sum += plate_conf.max(0.0);
221                    if let Some(raw) = &plate_raw {
222                        entry.raw_by_norm.insert(plate_norm.clone(), raw.clone());
223                    }
224                }
225                let vc = d.confidence.unwrap_or(plate_conf).max(0.0);
226                ObservedAttrs::observe(
227                    &mut entry.attrs.vehicle_type,
228                    attr_str(attrs, "vehicle_type").or(d.label.as_deref()),
229                    vc,
230                );
231                ObservedAttrs::observe(&mut entry.attrs.color, attr_str(attrs, "color"), vc);
232                ObservedAttrs::observe(&mut entry.attrs.make, attr_str(attrs, "make"), vc);
233                ObservedAttrs::observe(&mut entry.attrs.model, attr_str(attrs, "model"), vc);
234                if let Some(dir) = attr_str(attrs, "direction") {
235                    if matches!(dir, "inbound" | "outbound") {
236                        entry.direction = dir.to_string();
237                    }
238                }
239                if let Some(mv) = attrs.get("model_versions") {
240                    if mv.is_object() {
241                        entry.model_versions = mv.clone();
242                    }
243                }
244            }
245
246            // Commit tracks whose WINNING plate has reached the vote threshold (temporal voting on
247            // the plate itself — not the raw detection count, which would let a single noisy read or
248            // a plateless track trip the gate).
249            for (key, st) in state.iter_mut() {
250                if st.committed {
251                    continue;
252                }
253                if let Some((_, count, _)) = winning_plate(&st.votes) {
254                    if count >= min_votes {
255                        if let Some(job) = build_job(key, st) {
256                            jobs.push(job);
257                        }
258                        st.committed = true;
259                    }
260                }
261            }
262
263            // Prune stale tracks; commit-on-prune for vehicles that passed too quickly to reach the
264            // threshold but DID produce at least one plate read. Tracks that never yielded any plate
265            // (pure vehicle detections) are dropped silently so the entry log is not flooded with
266            // "unmatched" events for every transient background vehicle.
267            let cutoff = now - Duration::seconds(STATE_TTL_SECS);
268            let mut survivors: HashMap<String, TrackVoteState> = HashMap::new();
269            for (k, st) in state.drain() {
270                if st.last_seen >= cutoff {
271                    survivors.insert(k, st);
272                } else if !st.committed && winning_plate(&st.votes).is_some() {
273                    if let Some(job) = build_job(&k, &st) {
274                        jobs.push(job);
275                    }
276                }
277            }
278            *state = survivors;
279        }
280
281        for job in jobs {
282            let (key, uid) = (job.key.clone(), job.uid);
283            // If the insert fails, clear `committed` so a still-live track retries next batch
284            // instead of silently dropping the event — but ONLY if the live state entry is still the
285            // same track (matching uid). A concurrent batch may have pruned it and a reused track-id
286            // key now points to a different vehicle; clearing that one would duplicate its event.
287            if !self.commit(job, now).await {
288                let mut state = self.state.lock().await;
289                if let Some(s) = state.get_mut(&key) {
290                    if s.uid == uid {
291                        s.committed = false;
292                    }
293                }
294            }
295        }
296    }
297
298    /// Returns true on a successful entry-event insert.
299    async fn commit(&self, job: CommitJob, now: DateTime<Utc>) -> bool {
300        let resolution = self.resolve(&job).await;
301        let id = format!("evt_{}", Uuid::new_v4().simple());
302        let evidence_path = self.copy_evidence(&job.camera_id, &id).await;
303
304        let event_type = if job.direction == "outbound" {
305            "vehicle_exit"
306        } else {
307            "vehicle_entry"
308        };
309        let make_model = match (&job.make, &job.model) {
310            (Some(mk), Some(md)) => Some(format!("{mk} {md}")),
311            (Some(mk), None) => Some(mk.clone()),
312            (None, Some(md)) => Some(md.clone()),
313            (None, None) => None,
314        };
315        let plate_out = (!job.plate_norm.is_empty()).then(|| job.plate_raw.clone());
316        let subject = json!({
317            "type": "vehicle",
318            "plate": plate_out,
319            "plate_confidence": job.plate_conf,
320            "plate_valid": is_plausible_plate(&job.plate_norm),
321            "vehicle_type": job.vehicle_type,
322            "color": job.color,
323            "make_model": make_model,
324        });
325        let evidence = json!({ "snapshot_path": evidence_path });
326        let workflow = json!({ "status": resolution.workflow_status });
327        let audit = json!({ "created_by": "system", "model_versions": job.model_versions });
328        let plate_db = (!job.plate_norm.is_empty()).then(|| job.plate_norm.clone());
329
330        let res = sqlx::query(
331            "INSERT INTO entry_events
332               (id, site_id, camera_id, event_type, timestamp, direction, plate, plate_confidence,
333                subject, authorization, auth_status, evidence, workflow_status, workflow, audit,
334                track_id, created_at)
335             VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
336        )
337        .bind(&id)
338        .bind(&job.site_id)
339        .bind(&job.camera_id)
340        .bind(event_type)
341        .bind(now)
342        .bind(&job.direction)
343        .bind(&plate_db)
344        .bind(job.plate_conf)
345        .bind(sqlx::types::Json(&subject))
346        .bind(sqlx::types::Json(&resolution.authorization))
347        .bind(&resolution.auth_status)
348        .bind(sqlx::types::Json(&evidence))
349        .bind(&resolution.workflow_status)
350        .bind(sqlx::types::Json(&workflow))
351        .bind(sqlx::types::Json(&audit))
352        .bind(&job.track)
353        .bind(now)
354        .execute(&self.pool)
355        .await;
356        if let Err(e) = res {
357            tracing::error!(error = %e, "anpr: failed to insert entry event");
358            return false;
359        }
360
361        // Mirror into the generic event log so the alert notifier + metrics see exceptions/blocks.
362        let _ = repo::log_event(
363            &self.pool,
364            Some(&job.camera_id),
365            &format!("entry_{}", resolution.auth_status),
366            &resolution.severity,
367            json!({
368                "entry_event_id": id,
369                "plate": plate_db,
370                "auth_status": resolution.auth_status,
371                "source": resolution.source,
372                "event_type": event_type,
373                "evidence": evidence_path,
374            }),
375        )
376        .await;
377
378        tracing::info!(
379            camera_id = %job.camera_id,
380            plate = %job.plate_norm,
381            auth = %resolution.auth_status,
382            source = %resolution.source,
383            "entry event"
384        );
385        true
386    }
387
388    /// Identity resolver: classify a consolidated plate against the registry. Precedence:
389    /// active block-watchlist (security) → registered vehicle (attribute check) → active visitor
390    /// pass → alert/vip watchlist → unmatched. Mutates a matched pass to checked_in on entry.
391    async fn resolve(&self, job: &CommitJob) -> Resolution {
392        let plate = &job.plate_norm;
393        let now = Utc::now();
394
395        // Unreadable plate: nothing to look up — emit for guard review.
396        if plate.is_empty() || !is_plausible_plate(plate) {
397            return Resolution::unmatched(json!({
398                "status": "unmatched",
399                "source": "none",
400                "note": if plate.is_empty() { "no_plate_read" } else { "plate_unreadable" },
401            }));
402        }
403
404        // 1) Block watchlist wins outright. This is the only security-critical lookup, so it must
405        //    FAIL CLOSED: a DB error here must not silently fall through to an "allow" branch — flag
406        //    the event for guard review instead.
407        match sqlx::query_as::<_, (Option<String>, String)>(
408            "SELECT reason, severity FROM watchlist WHERE plate_norm = ? AND active = 1 AND kind = 'block'",
409        )
410        .bind(plate)
411        .fetch_optional(&self.pool)
412        .await
413        {
414            Ok(Some((reason, severity))) => {
415                return Resolution {
416                    auth_status: "blocked".into(),
417                    workflow_status: "pending".into(),
418                    severity: if severity.is_empty() {
419                        "critical".into()
420                    } else {
421                        severity
422                    },
423                    source: "watchlist".into(),
424                    authorization: json!({
425                        "status": "blocked", "source": "watchlist", "kind": "block", "reason": reason,
426                    }),
427                };
428            }
429            Ok(None) => {}
430            Err(e) => {
431                tracing::error!(error = %e, plate = %plate, "anpr: block-watchlist lookup failed; failing closed to exception");
432                return Resolution {
433                    auth_status: "exception".into(),
434                    workflow_status: "pending".into(),
435                    severity: "warning".into(),
436                    source: "system".into(),
437                    authorization: json!({
438                        "status": "exception", "source": "system", "note": "watchlist_lookup_failed",
439                    }),
440                };
441            }
442        }
443
444        // 2) Registered vehicle (within validity window, if set). We deliberately only compare
445        //    color + vehicle_type for mismatch — make/model is assistive metadata only and not
446        //    reliable enough to drive an exception.
447        let vehicle = sqlx::query_as::<_, RegVehicle>(
448            "SELECT id, vehicle_type, color, valid_from, valid_until
449               FROM vehicles WHERE plate_norm = ? AND active = 1",
450        )
451        .bind(plate)
452        .fetch_optional(&self.pool)
453        .await
454        .ok()
455        .flatten();
456
457        if let Some(v) = vehicle {
458            let in_window = v.valid_from.map(|t| t <= now).unwrap_or(true)
459                && v.valid_until.map(|t| t >= now).unwrap_or(true);
460            if !in_window {
461                return Resolution {
462                    auth_status: "exception".into(),
463                    workflow_status: "pending".into(),
464                    severity: "warning".into(),
465                    source: "registered_vehicle".into(),
466                    authorization: json!({
467                        "status": "exception", "source": "registered_vehicle",
468                        "vehicle_id": v.id, "note": "outside_validity_window",
469                    }),
470                };
471            }
472            // Secondary verification: attribute mismatch → exception (never an auto-reject).
473            let mut mismatches: Vec<String> = Vec::new();
474            check_mismatch(
475                &mut mismatches,
476                "color",
477                v.color.as_deref(),
478                job.color.as_deref(),
479            );
480            check_mismatch(
481                &mut mismatches,
482                "vehicle_type",
483                v.vehicle_type.as_deref(),
484                job.vehicle_type.as_deref(),
485            );
486            if mismatches.is_empty() {
487                // An alert listing downgrades a clean match to a review exception; keep the
488                // denormalized column and the embedded authorization JSON in lock-step.
489                let alert = self.has_alert(plate).await;
490                let status = if alert { "exception" } else { "matched" };
491                return Resolution {
492                    auth_status: status.into(),
493                    workflow_status: if alert {
494                        "pending".into()
495                    } else {
496                        "auto".into()
497                    },
498                    severity: if alert {
499                        "warning".into()
500                    } else {
501                        "info".into()
502                    },
503                    source: "registered_vehicle".into(),
504                    authorization: json!({
505                        "status": status, "source": "registered_vehicle",
506                        "vehicle_id": v.id, "alert": alert,
507                    }),
508                };
509            } else {
510                return Resolution {
511                    auth_status: "exception".into(),
512                    workflow_status: "pending".into(),
513                    severity: "warning".into(),
514                    source: "registered_vehicle".into(),
515                    authorization: json!({
516                        "status": "exception", "source": "registered_vehicle",
517                        "vehicle_id": v.id, "mismatches": mismatches,
518                    }),
519                };
520            }
521        }
522
523        // 3) Active visitor pass that is CURRENTLY within its validity window. The window is filtered
524        //    in SQL (a plate may have several active passes — e.g. a future-dated one with a later
525        //    valid_until — and we must not let that mask a presently-valid pass).
526        let pass = sqlx::query_as::<_, (String, String)>(
527            "SELECT id, status FROM visitor_passes
528              WHERE plate_norm = ? AND status IN ('active','checked_in')
529                AND valid_from <= ? AND valid_until >= ?
530              ORDER BY valid_until DESC LIMIT 1",
531        )
532        .bind(plate)
533        .bind(now)
534        .bind(now)
535        .fetch_optional(&self.pool)
536        .await
537        .ok()
538        .flatten();
539        if let Some((pass_id, status)) = pass {
540            // Auto check-in on an inbound match.
541            if status == "active" && job.direction != "outbound" {
542                let _ = sqlx::query(
543                    "UPDATE visitor_passes SET status='checked_in', checked_in_at=?, updated_at=? WHERE id=?",
544                )
545                .bind(now)
546                .bind(now)
547                .bind(&pass_id)
548                .execute(&self.pool)
549                .await;
550            }
551            let alert = self.has_alert(plate).await;
552            let st = if alert { "exception" } else { "matched" };
553            return Resolution {
554                auth_status: st.into(),
555                workflow_status: if alert {
556                    "pending".into()
557                } else {
558                    "auto".into()
559                },
560                severity: if alert {
561                    "warning".into()
562                } else {
563                    "info".into()
564                },
565                source: "visitor_pass".into(),
566                authorization: json!({
567                    "status": st, "source": "visitor_pass", "pass_id": pass_id, "alert": alert,
568                }),
569            };
570        }
571        // 3b) A pass exists for this plate but is outside its validity window → exception for review.
572        if let Ok(Some((pass_id,))) = sqlx::query_as::<_, (String,)>(
573            "SELECT id FROM visitor_passes WHERE plate_norm = ? AND status IN ('active','checked_in')
574              ORDER BY valid_until DESC LIMIT 1",
575        )
576        .bind(plate)
577        .fetch_optional(&self.pool)
578        .await
579        {
580            return Resolution {
581                auth_status: "exception".into(),
582                workflow_status: "pending".into(),
583                severity: "warning".into(),
584                source: "visitor_pass".into(),
585                authorization: json!({
586                    "status": "exception", "source": "visitor_pass",
587                    "pass_id": pass_id, "note": "pass_outside_validity_window",
588                }),
589            };
590        }
591
592        // 4) VIP watchlist (informational allow) — only reached when not registered/passed.
593        if let Ok(Some((reason,))) = sqlx::query_as::<_, (Option<String>,)>(
594            "SELECT reason FROM watchlist WHERE plate_norm = ? AND active = 1 AND kind = 'vip'",
595        )
596        .bind(plate)
597        .fetch_optional(&self.pool)
598        .await
599        {
600            return Resolution {
601                auth_status: "matched".into(),
602                workflow_status: "auto".into(),
603                severity: "info".into(),
604                source: "watchlist".into(),
605                authorization: json!({
606                    "status": "matched", "source": "watchlist", "kind": "vip", "reason": reason,
607                }),
608            };
609        }
610
611        // 5) Unknown plate. If alert-listed, escalate to exception; else simply unmatched.
612        if self.has_alert(plate).await {
613            return Resolution {
614                auth_status: "exception".into(),
615                workflow_status: "pending".into(),
616                severity: "warning".into(),
617                source: "watchlist".into(),
618                authorization: json!({ "status": "exception", "source": "watchlist", "kind": "alert" }),
619            };
620        }
621        Resolution::unmatched(json!({ "status": "unmatched", "source": "none" }))
622    }
623
624    /// True if the plate is on an active alert watchlist (flag-for-review without blocking).
625    async fn has_alert(&self, plate: &str) -> bool {
626        sqlx::query_scalar::<_, i64>(
627            "SELECT COUNT(*) FROM watchlist WHERE plate_norm = ? AND active = 1 AND kind = 'alert'",
628        )
629        .bind(plate)
630        .fetch_one(&self.pool)
631        .await
632        .unwrap_or(0)
633            > 0
634    }
635
636    /// Copy the latest sampled frame (prefer main stream) as evidence; return its served URL.
637    async fn copy_evidence(&self, camera_id: &str, id: &str) -> Option<String> {
638        let dir = self.cfg.camera_frames_dir(camera_id);
639        let filename = format!("entryevt_{id}.jpg");
640        let dst = self.cfg.snapshots_dir.join(&filename);
641        for profile in ["main", "sub"] {
642            let src = dir.join(format!("latest_{profile}.jpg"));
643            if tokio::fs::copy(&src, &dst).await.is_ok() {
644                return Some(format!("/media/snapshots/{filename}"));
645            }
646        }
647        None
648    }
649}
650
651#[derive(sqlx::FromRow)]
652struct RegVehicle {
653    id: String,
654    vehicle_type: Option<String>,
655    color: Option<String>,
656    valid_from: Option<DateTime<Utc>>,
657    valid_until: Option<DateTime<Utc>>,
658}
659
660struct Resolution {
661    auth_status: String,
662    workflow_status: String,
663    severity: String,
664    source: String,
665    authorization: Value,
666}
667
668impl Resolution {
669    fn unmatched(authorization: Value) -> Self {
670        Resolution {
671            auth_status: "unmatched".into(),
672            workflow_status: "pending".into(),
673            severity: "warning".into(),
674            source: "none".into(),
675            authorization,
676        }
677    }
678}
679
/// Append a mismatch description to `out` when the registered and detected values are both
/// present, both non-blank, and differ after trimming (ASCII case-insensitive comparison).
///
/// The message has the form `"{field}: registered={r}, detected={d}"` and carries the values
/// exactly as passed in (untrimmed).
fn check_mismatch(
    out: &mut Vec<String>,
    field: &str,
    registered: Option<&str>,
    detected: Option<&str>,
) {
    let (r, d) = match (registered, detected) {
        (Some(r), Some(d)) => (r, d),
        _ => return,
    };
    let (r_key, d_key) = (r.trim(), d.trim());
    if r_key.is_empty() || d_key.is_empty() {
        return;
    }
    if !r_key.eq_ignore_ascii_case(d_key) {
        out.push(format!("{field}: registered={r}, detected={d}"));
    }
}
694
695/// Pick the winning plate for a track: most votes, tie-broken by summed confidence. Plausible plates
696/// are preferred over implausible ones (so a noisy digits-only reading can't mask a real plate); the
697/// overall vote leader is used only when no candidate is plausible. Returns (plate_norm, votes, avg_conf).
698fn winning_plate(votes: &HashMap<String, PlateVote>) -> Option<(String, u32, f64)> {
699    let leader = |plausible_only: bool| {
700        votes
701            .iter()
702            .filter(|(norm, _)| !plausible_only || is_plausible_plate(norm))
703            .max_by(|a, b| {
704                a.1.count.cmp(&b.1.count).then(
705                    a.1.conf_sum
706                        .partial_cmp(&b.1.conf_sum)
707                        .unwrap_or(std::cmp::Ordering::Equal),
708                )
709            })
710    };
711    let (norm, vote) = leader(true).or_else(|| leader(false))?;
712    let avg = if vote.count > 0 {
713        vote.conf_sum / vote.count as f64
714    } else {
715        0.0
716    };
717    Some((norm.clone(), vote.count, avg))
718}
719
720fn build_job(key: &str, st: &TrackVoteState) -> Option<CommitJob> {
721    let (plate_norm, _count, plate_conf) = winning_plate(&st.votes)?;
722    let plate_raw = st
723        .raw_by_norm
724        .get(&plate_norm)
725        .cloned()
726        .unwrap_or_else(|| plate_norm.clone());
727    Some(CommitJob {
728        key: key.to_string(),
729        uid: st.uid,
730        camera_id: st.camera_id.clone(),
731        site_id: st.site_id.clone(),
732        track: st.track.clone(),
733        direction: st.direction.clone(),
734        plate_norm,
735        plate_raw,
736        plate_conf,
737        vehicle_type: st.attrs.vehicle_type.as_ref().map(|(v, _)| v.clone()),
738        color: st.attrs.color.as_ref().map(|(v, _)| v.clone()),
739        make: st.attrs.make.as_ref().map(|(v, _)| v.clone()),
740        model: st.attrs.model.as_ref().map(|(v, _)| v.clone()),
741        model_versions: st.model_versions.clone(),
742    })
743}
744
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a vote map from `(plate, count, conf_sum)` triples.
    fn votes(pairs: &[(&str, u32, f64)]) -> HashMap<String, PlateVote> {
        let mut map = HashMap::with_capacity(pairs.len());
        for &(plate, count, conf_sum) in pairs {
            map.insert(plate.to_string(), PlateVote { count, conf_sum });
        }
        map
    }

    #[test]
    fn normalize_strips_and_uppercases() {
        let cases = [("abc 1234", "ABC1234"), ("W-XY 88.88", "WXY8888"), ("", "")];
        for (raw, expected) in cases {
            assert_eq!(normalize_plate(raw), expected);
        }
    }

    #[test]
    fn winning_plate_prefers_plausible_over_higher_voted_implausible() {
        // "12345" has more votes but is implausible; the plausible "ABC1234" must win.
        let tally = votes(&[("12345", 5, 4.0), ("ABC1234", 2, 1.8)]);
        let (norm, count, _) = winning_plate(&tally).unwrap();
        assert_eq!(norm, "ABC1234");
        assert_eq!(count, 2);
    }

    #[test]
    fn winning_plate_picks_top_votes_among_plausible() {
        let tally = votes(&[("ABC1234", 2, 1.8), ("ABD1234", 5, 4.5)]);
        let (norm, count, _) = winning_plate(&tally).unwrap();
        assert_eq!(norm, "ABD1234");
        assert_eq!(count, 5);
    }

    #[test]
    fn winning_plate_none_when_no_votes() {
        let empty: HashMap<String, PlateVote> = HashMap::new();
        assert!(winning_plate(&empty).is_none());
    }

    #[test]
    fn plausibility_requires_alpha_and_digit() {
        for ok in ["ABC1234", "WA12B"] {
            assert!(is_plausible_plate(ok));
        }
        for bad in ["1234", "ABCDE", "A1", "ABCDEFGHIJK1"] {
            assert!(!is_plausible_plate(bad));
        }
    }

    #[test]
    fn mismatch_only_when_both_known_and_differ() {
        let mut found = Vec::new();
        check_mismatch(&mut found, "color", Some("white"), Some("black"));
        check_mismatch(&mut found, "color", Some("white"), Some("WHITE"));
        check_mismatch(&mut found, "vehicle_type", None, Some("suv"));
        check_mismatch(&mut found, "vehicle_type", Some("car"), None);
        assert_eq!(found.len(), 1);
        assert!(found[0].contains("color"));
    }
}