Skip to main content

heldar_kernel/services/
fanout.rs

//! Durable perception fan-out drainer.
//!
//! Detection ingest fans a committed batch out to consumers (zones / ANPR / movement) AFTER the
//! transaction commits — so a process crash between commit and fan-out would drop the consumer
//! notification entirely. This loop closes that gap: it periodically replays any `outbox` batch whose
//! `fanned_out_at` is still NULL (a batch that was committed but never fully fanned out), rebuilding
//! the batch from the persisted detections and driving the consumers.
//!
//! Replay is safe because [`fan_out`](crate::services::consumer::fan_out) claims each
//! `(consumer, camera_id, frame_id)` at-most-once via `consumer_fanout`: a consumer that already
//! processed a frame is skipped, so a replay never double-drives it. Only batches with a `frame_id`
//! (the worker's idempotency key) are replayable; batches without one are inline-only.

14use std::sync::Arc;
15use std::time::Duration;
16
17use chrono::Utc;
18use sqlx::SqlitePool;
19
20use crate::models::{Detection, DetectionIngest};
21use crate::services::consumer::{fan_out, DetectionBatch, DetectionConsumer};
22
/// Minimum age, in seconds, an un-fanned outbox batch must reach before the drainer replays it.
///
/// Younger batches are most likely still being fanned out inline by ingest; waiting this long keeps
/// the drainer from racing that in-flight fan-out.
const REPLAY_GRACE_SECS: i64 = 10;

/// Seconds between two drainer passes.
const DRAIN_INTERVAL_SECS: u64 = 15;

/// Upper bound on outbox batches selected (and replayed) in a single pass.
const DRAIN_BATCH: i64 = 200;

/// One un-fanned `outbox` row selected for replay, in the drain query's column order.
///
/// `camera_id` and `frame_id` are plain `String`s because the drain query only selects rows where
/// both are non-NULL.
type UnfannedBatch = (
    i64,            // seq
    String,         // camera_id
    Option<String>, // site_id
    String,         // frame_id
    Option<String>, // task_type
);

32pub async fn run(pool: SqlitePool, consumers: Arc<Vec<Arc<dyn DetectionConsumer>>>) {
33    let mut tick = tokio::time::interval(Duration::from_secs(DRAIN_INTERVAL_SECS));
34    loop {
35        tick.tick().await;
36        if let Err(e) = drain(&pool, &consumers).await {
37            tracing::warn!(error = %e, "fanout drainer: drain failed");
38        }
39    }
40}
41
42async fn drain(pool: &SqlitePool, consumers: &[Arc<dyn DetectionConsumer>]) -> anyhow::Result<()> {
43    let cutoff = Utc::now() - chrono::Duration::seconds(REPLAY_GRACE_SECS);
44    // Committed-but-not-fanned detection batches, replayable (have an idempotency key), past the grace.
45    let rows: Vec<UnfannedBatch> = sqlx::query_as(
46        "SELECT seq, camera_id, site_id, frame_id, task_type FROM outbox
47         WHERE fanned_out_at IS NULL AND topic = 'detections'
48           AND camera_id IS NOT NULL AND frame_id IS NOT NULL
49           AND created_at < ?
50         ORDER BY seq ASC
51         LIMIT ?",
52    )
53    .bind(cutoff)
54    .bind(DRAIN_BATCH)
55    .fetch_all(pool)
56    .await?;
57    if rows.is_empty() {
58        return Ok(());
59    }
60    tracing::info!(
61        count = rows.len(),
62        "fanout drainer: replaying detection batches whose fan-out did not complete"
63    );
64    for (seq, camera_id, site_id, frame_id, task_type) in rows {
65        let task_type = task_type.unwrap_or_default();
66        let dets: Vec<Detection> = sqlx::query_as(
67            "SELECT * FROM detections WHERE camera_id = ? AND frame_id = ? ORDER BY id ASC",
68        )
69        .bind(&camera_id)
70        .bind(&frame_id)
71        .fetch_all(pool)
72        .await?;
73        // Reconstruct the worker-shaped batch from persisted rows. Use the detections' own capture
74        // time (all share the ingest `ts`); fall back to now for a detection-less (event-only) batch.
75        let ts = dets.first().map(|d| d.timestamp).unwrap_or_else(Utc::now);
76        let ingest: Vec<DetectionIngest> = dets
77            .into_iter()
78            .map(|d| DetectionIngest {
79                label: d.label,
80                confidence: d.confidence,
81                bbox: d.bbox.map(|j| j.0),
82                track_id: d.track_id,
83                attributes: Some(d.attributes.0),
84            })
85            .collect();
86        let batch = DetectionBatch {
87            camera_id: &camera_id,
88            site_id: site_id.as_deref(),
89            task_type: &task_type,
90            detections: &ingest,
91            timestamp: ts,
92        };
93        let complete = fan_out(pool, consumers, &batch, Some(&frame_id)).await;
94        if complete {
95            let _ = sqlx::query("UPDATE outbox SET fanned_out_at = ? WHERE seq = ?")
96                .bind(Utc::now())
97                .bind(seq)
98                .execute(pool)
99                .await;
100        }
101    }
102    Ok(())
103}
104
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Consumer that accepts every task type and counts the batches it is driven with.
    struct Counter {
        hits: Arc<AtomicUsize>,
    }

    #[async_trait::async_trait]
    impl DetectionConsumer for Counter {
        fn name(&self) -> &'static str {
            "test-counter"
        }

        fn interested_in(&self, _task_type: &str) -> bool {
            true
        }

        async fn consume(&self, _batch: &DetectionBatch<'_>) {
            self.hits.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// In-memory SQLite pool with the crate's migrations applied.
    ///
    /// Capped at one connection. Presumably this is because SQLite gives every `:memory:`
    /// connection its own private database, so a second connection would not see the migrated
    /// schema.
    async fn mem_pool() -> SqlitePool {
        let options = sqlx::sqlite::SqlitePoolOptions::new().max_connections(1);
        let pool = options.connect("sqlite::memory:").await.unwrap();
        crate::db::run_migrations(&pool).await.unwrap();
        pool
    }

    /// Seed one camera, one un-fanned `detections` outbox row, and one matching detection for
    /// `(camera, frame)`.
    ///
    /// Everything is timestamped 60 s in the past so the batch is already past the replay grace.
    async fn seed_unfanned_batch(pool: &SqlitePool, camera: &str, frame: &str) {
        let past = Utc::now() - chrono::Duration::seconds(60); // past the replay grace

        // detections.camera_id REFERENCES cameras(id) and sqlx enables foreign_keys by default.
        sqlx::query(
            "INSERT INTO cameras (id, name, retention_hours, storage_quota_bytes, created_at, updated_at)
             VALUES (?, ?, 168, NULL, ?, ?)",
        )
        .bind(camera)
        .bind(camera)
        .bind(past)
        .bind(past)
        .execute(pool)
        .await
        .unwrap();

        sqlx::query(
            "INSERT INTO outbox (topic, camera_id, site_id, frame_id, task_type, detection_count, created_at)
             VALUES ('detections', ?, NULL, ?, 'object_detection', 1, ?)",
        )
        .bind(camera)
        .bind(frame)
        .bind(past)
        .execute(pool)
        .await
        .unwrap();

        let detection_id = format!("det_{frame}");
        sqlx::query(
            "INSERT INTO detections (id, camera_id, task_type, timestamp, label, confidence, bbox, track_id, attributes, frame_id, created_at)
             VALUES (?, ?, 'object_detection', ?, 'car', 0.9, NULL, NULL, '{}', ?, ?)",
        )
        .bind(detection_id)
        .bind(camera)
        .bind(past)
        .bind(frame)
        .bind(past)
        .execute(pool)
        .await
        .unwrap();
    }

    #[tokio::test]
    async fn drain_replays_unfanned_batch_exactly_once() {
        let pool = mem_pool().await;
        seed_unfanned_batch(&pool, "cam1", "frameA").await;

        let hits = Arc::new(AtomicUsize::new(0));
        let counter: Arc<dyn DetectionConsumer> = Arc::new(Counter {
            hits: Arc::clone(&hits),
        });
        let consumers = vec![counter];
        let consumed = || hits.load(Ordering::SeqCst);

        // Pass 1: the un-fanned batch is replayed exactly once.
        drain(&pool, &consumers).await.unwrap();
        assert_eq!(consumed(), 1, "batch should be fanned once");

        // Pass 2: the batch is now marked fanned, so the drain selects nothing.
        drain(&pool, &consumers).await.unwrap();
        assert_eq!(consumed(), 1, "no replay once marked fanned");

        // Pass 3: force the batch back to un-fanned (as if the mark write was lost in a crash).
        // The per-consumer dedup in consumer_fanout must still block a second consume of the frame.
        sqlx::query("UPDATE outbox SET fanned_out_at = NULL")
            .execute(&pool)
            .await
            .unwrap();
        drain(&pool, &consumers).await.unwrap();
        assert_eq!(
            consumed(),
            1,
            "consumer_fanout dedup must prevent re-driving the same (consumer, frame)"
        );
    }
}