heldar_kernel/services/consumer.rs
1//! The kernel's perception-consumer seam.
2//!
3//! The media + AI-task kernel is domain-agnostic: it samples frames, accepts detections from workers,
4//! and persists them. Anything that *interprets* those detections into domain events — the zone engine
5//! (spatial rules), the ANPR/access-control engine (plate authorization), and other domain
//! verticals — plugs in here as a [`DetectionConsumer`] rather than being wired
7//! into the ingest handler or [`crate::state::AppState`] directly.
8//!
9//! This inverts the dependency: ingest fans a committed batch out to a registry of consumers; a
10//! consumer self-declares which `task_type`s it cares about. New apps are *added* to the registry (and,
11//! after the crate split, linked in by the composing binary or fed over the event stream) — the kernel
12//! never gains an `if task_type == "..."` branch.
13
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use sqlx::SqlitePool;
18
19use crate::models::DetectionIngest;
20
21/// One committed batch of detections handed to consumers after it is persisted. Carries the
22/// site/camera/task context so a consumer needs no extra lookups (and so the seam is tenant-aware for
23/// distributed deployments).
24pub struct DetectionBatch<'a> {
25 pub camera_id: &'a str,
26 pub site_id: Option<&'a str>,
27 /// The task type that produced this batch (consumers self-select on it; a vertical may also
28 /// inspect it). Part of the stable seam contract.
29 pub task_type: &'a str,
30 pub detections: &'a [DetectionIngest],
31 /// Worker-supplied capture time (engines that need trustworthy timing use server time instead;
32 /// time-windowing consumers use this).
33 pub timestamp: DateTime<Utc>,
34}
35
36/// A pluggable interpreter of detection batches. Implementors live in their own module/crate (zones is
37/// kernel-open; ANPR/entry is a proprietary app) and are registered into [`crate::state::AppState`].
38#[async_trait::async_trait]
39pub trait DetectionConsumer: Send + Sync {
40 /// Stable name for logs/metrics.
41 fn name(&self) -> &'static str;
42
43 /// Whether this consumer wants batches of the given `task_type`. Return `true` for all types when
44 /// the consumer is task-agnostic (e.g. the zone engine evaluates any tracked detection).
45 fn interested_in(&self, task_type: &str) -> bool;
46
47 /// Process a persisted batch. Must not panic; errors are the consumer's own to log/handle.
48 async fn consume(&self, batch: &DetectionBatch<'_>);
49}
50
51/// Drive every interested consumer for one committed batch, AT MOST ONCE per
52/// `(consumer, camera_id, frame_id)`.
53///
54/// This is the single fan-out path used by both the ingest handler (inline, low-latency) and the
55/// durable drainer ([`crate::services::fanout`], which replays batches whose fan-out didn't complete
56/// because the process crashed between commit and fan-out). The per-consumer claim row in
57/// `consumer_fanout` is what makes replay safe: a consumer that already saw this frame is skipped, so
58/// a replayed batch never double-drives it (no double-counted ANPR votes / zone state).
59///
60/// A batch without a `frame_id` cannot be deduped, so its consumers are driven directly and it is not
61/// eligible for durable replay (the worker opted out of the idempotency key).
62///
63/// Returns `true` when every interested consumer was driven or was already done — i.e. the batch is
64/// fully fanned out and may be marked complete. Returns `false` if a dedup-claim write failed for some
65/// consumer, so the caller leaves the batch un-fanned for the drainer to retry (the consumers that
66/// already succeeded stay claimed and will not re-run).
67pub async fn fan_out(
68 pool: &SqlitePool,
69 consumers: &[Arc<dyn DetectionConsumer>],
70 batch: &DetectionBatch<'_>,
71 frame_id: Option<&str>,
72) -> bool {
73 let mut complete = true;
74 for consumer in consumers {
75 if !consumer.interested_in(batch.task_type) {
76 continue;
77 }
78 if let Some(fid) = frame_id {
79 // Claim (consumer, camera, frame) before driving. ON CONFLICT DO NOTHING => rows_affected
80 // is 1 only for the first claim; a replay sees 0 and skips. SQLite serializes the insert,
81 // so two racing fan-outs of the same frame can't both claim it.
82 match sqlx::query(
83 "INSERT INTO consumer_fanout (consumer, camera_id, frame_id, fanned_at)
84 VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING",
85 )
86 .bind(consumer.name())
87 .bind(batch.camera_id)
88 .bind(fid)
89 .bind(Utc::now())
90 .execute(pool)
91 .await
92 {
93 Ok(r) if r.rows_affected() == 1 => {} // freshly claimed -> drive below
94 Ok(_) => continue, // already fanned to this consumer for this frame
95 Err(e) => {
96 // Couldn't claim; skip to avoid double-processing and leave the batch un-fanned so
97 // the drainer retries (the consumer never ran, so no work is lost).
98 tracing::warn!(consumer = consumer.name(), error = %e, "fan-out dedup claim failed");
99 complete = false;
100 continue;
101 }
102 }
103 }
104 tracing::trace!(consumer = consumer.name(), task_type = %batch.task_type, "fan-out");
105 consumer.consume(batch).await;
106 }
107 complete
108}