1use std::sync::Arc;
15use std::time::Duration;
16
17use chrono::Utc;
18use sqlx::SqlitePool;
19
20use crate::models::{Detection, DetectionIngest};
21use crate::services::consumer::{fan_out, DetectionBatch, DetectionConsumer};
22
23const REPLAY_GRACE_SECS: i64 = 10;
26const DRAIN_INTERVAL_SECS: u64 = 15;
27const DRAIN_BATCH: i64 = 200;
28
29type UnfannedBatch = (i64, String, Option<String>, String, Option<String>);
31
32pub async fn run(pool: SqlitePool, consumers: Arc<Vec<Arc<dyn DetectionConsumer>>>) {
33 let mut tick = tokio::time::interval(Duration::from_secs(DRAIN_INTERVAL_SECS));
34 loop {
35 tick.tick().await;
36 if let Err(e) = drain(&pool, &consumers).await {
37 tracing::warn!(error = %e, "fanout drainer: drain failed");
38 }
39 }
40}
41
42async fn drain(pool: &SqlitePool, consumers: &[Arc<dyn DetectionConsumer>]) -> anyhow::Result<()> {
43 let cutoff = Utc::now() - chrono::Duration::seconds(REPLAY_GRACE_SECS);
44 let rows: Vec<UnfannedBatch> = sqlx::query_as(
46 "SELECT seq, camera_id, site_id, frame_id, task_type FROM outbox
47 WHERE fanned_out_at IS NULL AND topic = 'detections'
48 AND camera_id IS NOT NULL AND frame_id IS NOT NULL
49 AND created_at < ?
50 ORDER BY seq ASC
51 LIMIT ?",
52 )
53 .bind(cutoff)
54 .bind(DRAIN_BATCH)
55 .fetch_all(pool)
56 .await?;
57 if rows.is_empty() {
58 return Ok(());
59 }
60 tracing::info!(
61 count = rows.len(),
62 "fanout drainer: replaying detection batches whose fan-out did not complete"
63 );
64 for (seq, camera_id, site_id, frame_id, task_type) in rows {
65 let task_type = task_type.unwrap_or_default();
66 let dets: Vec<Detection> = sqlx::query_as(
67 "SELECT * FROM detections WHERE camera_id = ? AND frame_id = ? ORDER BY id ASC",
68 )
69 .bind(&camera_id)
70 .bind(&frame_id)
71 .fetch_all(pool)
72 .await?;
73 let ts = dets.first().map(|d| d.timestamp).unwrap_or_else(Utc::now);
76 let ingest: Vec<DetectionIngest> = dets
77 .into_iter()
78 .map(|d| DetectionIngest {
79 label: d.label,
80 confidence: d.confidence,
81 bbox: d.bbox.map(|j| j.0),
82 track_id: d.track_id,
83 attributes: Some(d.attributes.0),
84 })
85 .collect();
86 let batch = DetectionBatch {
87 camera_id: &camera_id,
88 site_id: site_id.as_deref(),
89 task_type: &task_type,
90 detections: &ingest,
91 timestamp: ts,
92 };
93 let complete = fan_out(pool, consumers, &batch, Some(&frame_id)).await;
94 if complete {
95 let _ = sqlx::query("UPDATE outbox SET fanned_out_at = ? WHERE seq = ?")
96 .bind(Utc::now())
97 .bind(seq)
98 .execute(pool)
99 .await;
100 }
101 }
102 Ok(())
103}
104
105#[cfg(test)]
106mod tests {
107 use super::*;
108 use std::sync::atomic::{AtomicUsize, Ordering};
109
110 struct Counter {
111 hits: Arc<AtomicUsize>,
112 }
113 #[async_trait::async_trait]
114 impl DetectionConsumer for Counter {
115 fn name(&self) -> &'static str {
116 "test-counter"
117 }
118 fn interested_in(&self, _task_type: &str) -> bool {
119 true
120 }
121 async fn consume(&self, _batch: &DetectionBatch<'_>) {
122 self.hits.fetch_add(1, Ordering::SeqCst);
123 }
124 }
125
126 async fn mem_pool() -> SqlitePool {
127 let pool = sqlx::sqlite::SqlitePoolOptions::new()
128 .max_connections(1)
129 .connect("sqlite::memory:")
130 .await
131 .unwrap();
132 crate::db::run_migrations(&pool).await.unwrap();
133 pool
134 }
135
136 async fn seed_unfanned_batch(pool: &SqlitePool, camera: &str, frame: &str) {
137 let old = Utc::now() - chrono::Duration::seconds(60); sqlx::query(
140 "INSERT INTO cameras (id, name, retention_hours, storage_quota_bytes, created_at, updated_at)
141 VALUES (?, ?, 168, NULL, ?, ?)",
142 )
143 .bind(camera).bind(camera).bind(old).bind(old).execute(pool).await.unwrap();
144 sqlx::query(
145 "INSERT INTO outbox (topic, camera_id, site_id, frame_id, task_type, detection_count, created_at)
146 VALUES ('detections', ?, NULL, ?, 'object_detection', 1, ?)",
147 )
148 .bind(camera).bind(frame).bind(old).execute(pool).await.unwrap();
149 sqlx::query(
150 "INSERT INTO detections (id, camera_id, task_type, timestamp, label, confidence, bbox, track_id, attributes, frame_id, created_at)
151 VALUES (?, ?, 'object_detection', ?, 'car', 0.9, NULL, NULL, '{}', ?, ?)",
152 )
153 .bind(format!("det_{frame}")).bind(camera).bind(old).bind(frame).bind(old)
154 .execute(pool).await.unwrap();
155 }
156
157 #[tokio::test]
158 async fn drain_replays_unfanned_batch_exactly_once() {
159 let pool = mem_pool().await;
160 seed_unfanned_batch(&pool, "cam1", "frameA").await;
161 let hits = Arc::new(AtomicUsize::new(0));
162 let consumers: Vec<Arc<dyn DetectionConsumer>> =
163 vec![Arc::new(Counter { hits: hits.clone() })];
164
165 drain(&pool, &consumers).await.unwrap();
167 assert_eq!(
168 hits.load(Ordering::SeqCst),
169 1,
170 "batch should be fanned once"
171 );
172
173 drain(&pool, &consumers).await.unwrap();
175 assert_eq!(
176 hits.load(Ordering::SeqCst),
177 1,
178 "no replay once marked fanned"
179 );
180
181 sqlx::query("UPDATE outbox SET fanned_out_at = NULL")
184 .execute(&pool)
185 .await
186 .unwrap();
187 drain(&pool, &consumers).await.unwrap();
188 assert_eq!(
189 hits.load(Ordering::SeqCst),
190 1,
191 "consumer_fanout dedup must prevent re-driving the same (consumer, frame)"
192 );
193 }
194}