Skip to main content

heldar_kernel/services/
consumer.rs

//! The kernel's perception-consumer seam.
//!
//! The media + AI-task kernel is domain-agnostic: it samples frames, accepts detections from workers,
//! and persists them. Anything that *interprets* those detections into domain events — the zone engine
//! (spatial rules), the ANPR/access-control engine (plate authorization), and other domain
//! verticals — plugs in here as a [`DetectionConsumer`] rather than being wired
//! into the ingest handler or [`crate::state::AppState`] directly.
//!
//! This inverts the dependency: ingest fans a committed batch out to a registry of consumers; a
//! consumer self-declares which `task_type`s it cares about. New apps are *added* to the registry (and,
//! after the crate split, linked in by the composing binary or fed over the event stream) — the kernel
//! never gains an `if task_type == "..."` branch.
13
14use std::sync::Arc;
15
16use chrono::{DateTime, Utc};
17use sqlx::SqlitePool;
18
19use crate::models::DetectionIngest;
20
21/// One committed batch of detections handed to consumers after it is persisted. Carries the
22/// site/camera/task context so a consumer needs no extra lookups (and so the seam is tenant-aware for
23/// distributed deployments).
24pub struct DetectionBatch<'a> {
25    pub camera_id: &'a str,
26    pub site_id: Option<&'a str>,
27    /// The task type that produced this batch (consumers self-select on it; a vertical may also
28    /// inspect it). Part of the stable seam contract.
29    pub task_type: &'a str,
30    pub detections: &'a [DetectionIngest],
31    /// Worker-supplied capture time (engines that need trustworthy timing use server time instead;
32    /// time-windowing consumers use this).
33    pub timestamp: DateTime<Utc>,
34}
35
36/// A pluggable interpreter of detection batches. Implementors live in their own module/crate (zones is
37/// kernel-open; ANPR/entry is a proprietary app) and are registered into [`crate::state::AppState`].
38#[async_trait::async_trait]
39pub trait DetectionConsumer: Send + Sync {
40    /// Stable name for logs/metrics.
41    fn name(&self) -> &'static str;
42
43    /// Whether this consumer wants batches of the given `task_type`. Return `true` for all types when
44    /// the consumer is task-agnostic (e.g. the zone engine evaluates any tracked detection).
45    fn interested_in(&self, task_type: &str) -> bool;
46
47    /// Process a persisted batch. Must not panic; errors are the consumer's own to log/handle.
48    async fn consume(&self, batch: &DetectionBatch<'_>);
49}
50
51/// Drive every interested consumer for one committed batch, AT MOST ONCE per
52/// `(consumer, camera_id, frame_id)`.
53///
54/// This is the single fan-out path used by both the ingest handler (inline, low-latency) and the
55/// durable drainer ([`crate::services::fanout`], which replays batches whose fan-out didn't complete
56/// because the process crashed between commit and fan-out). The per-consumer claim row in
57/// `consumer_fanout` is what makes replay safe: a consumer that already saw this frame is skipped, so
58/// a replayed batch never double-drives it (no double-counted ANPR votes / zone state).
59///
60/// A batch without a `frame_id` cannot be deduped, so its consumers are driven directly and it is not
61/// eligible for durable replay (the worker opted out of the idempotency key).
62///
63/// Returns `true` when every interested consumer was driven or was already done — i.e. the batch is
64/// fully fanned out and may be marked complete. Returns `false` if a dedup-claim write failed for some
65/// consumer, so the caller leaves the batch un-fanned for the drainer to retry (the consumers that
66/// already succeeded stay claimed and will not re-run).
67pub async fn fan_out(
68    pool: &SqlitePool,
69    consumers: &[Arc<dyn DetectionConsumer>],
70    batch: &DetectionBatch<'_>,
71    frame_id: Option<&str>,
72) -> bool {
73    let mut complete = true;
74    for consumer in consumers {
75        if !consumer.interested_in(batch.task_type) {
76            continue;
77        }
78        if let Some(fid) = frame_id {
79            // Claim (consumer, camera, frame) before driving. ON CONFLICT DO NOTHING => rows_affected
80            // is 1 only for the first claim; a replay sees 0 and skips. SQLite serializes the insert,
81            // so two racing fan-outs of the same frame can't both claim it.
82            match sqlx::query(
83                "INSERT INTO consumer_fanout (consumer, camera_id, frame_id, fanned_at)
84                 VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING",
85            )
86            .bind(consumer.name())
87            .bind(batch.camera_id)
88            .bind(fid)
89            .bind(Utc::now())
90            .execute(pool)
91            .await
92            {
93                Ok(r) if r.rows_affected() == 1 => {} // freshly claimed -> drive below
94                Ok(_) => continue, // already fanned to this consumer for this frame
95                Err(e) => {
96                    // Couldn't claim; skip to avoid double-processing and leave the batch un-fanned so
97                    // the drainer retries (the consumer never ran, so no work is lost).
98                    tracing::warn!(consumer = consumer.name(), error = %e, "fan-out dedup claim failed");
99                    complete = false;
100                    continue;
101                }
102            }
103        }
104        tracing::trace!(consumer = consumer.name(), task_type = %batch.task_type, "fan-out");
105        consumer.consume(batch).await;
106    }
107    complete
108}