Skip to main content

heldar_kernel/routes/
ai.rs

1//! Stage 2 AI surface: AI task CRUD, the worker contract (discover tasks, pull the latest sampled
2//! frame, post detections/events back), sampler status, and a detections query.
3
4use axum::body::Body;
5use axum::extract::{DefaultBodyLimit, Path, Query, State};
6use axum::http::{header, StatusCode};
7use axum::response::Response;
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12use serde_json::{json, Value};
13use sqlx::types::Json as SqlxJson;
14use uuid::Uuid;
15
16use crate::auth::{self, Principal};
17use crate::error::{AppError, AppResult};
18use crate::models::{AiIngest, AiTask, AiTaskCreate, AiTaskUpdate, Detection};
19use crate::routes::cameras::load_camera;
20use crate::services::sampler::SamplerInfo;
21use crate::state::AppState;
22
23pub fn router() -> Router<AppState> {
24    Router::new()
25        .route(
26            "/api/v1/cameras/{id}/ai-tasks",
27            get(list_camera_tasks).post(create_task),
28        )
29        .route(
30            "/api/v1/ai-tasks/{task_id}",
31            axum::routing::patch(update_task).delete(delete_task),
32        )
33        .route("/api/v1/ai/tasks", get(list_all_tasks))
34        .route("/api/v1/ai/samplers", get(sampler_status))
35        // Bound the ingest body BEFORE deserialization so a hostile/buggy worker can't force a huge
36        // allocation (the MAX_INGEST_DETECTIONS count check only runs after the body is fully parsed).
37        // Generous headroom for MAX_INGEST_DETECTIONS rich detections; well under any real batch.
38        .route(
39            "/api/v1/ai/events",
40            post(ingest).layer(DefaultBodyLimit::max(INGEST_BODY_LIMIT_BYTES)),
41        )
42        .route("/api/v1/cameras/{id}/frame", get(latest_frame))
43        .route("/api/v1/cameras/{id}/detections", get(list_detections))
44}
45
46fn validate_profile(p: &str) -> AppResult<()> {
47    if matches!(p, "sub" | "main") {
48        Ok(())
49    } else {
50        Err(AppError::BadRequest(
51            "`stream_profile` must be 'sub' or 'main'".into(),
52        ))
53    }
54}
55
56async fn list_camera_tasks(
57    State(st): State<AppState>,
58    Path(id): Path<String>,
59    principal: Principal,
60) -> AppResult<Json<Vec<AiTask>>> {
61    principal.require(principal.can_view(), "view AI tasks")?;
62    let _ = load_camera(&st.pool, &id).await?;
63    let tasks = sqlx::query_as::<_, AiTask>(
64        "SELECT * FROM ai_tasks WHERE camera_id = ? ORDER BY created_at ASC",
65    )
66    .bind(&id)
67    .fetch_all(&st.pool)
68    .await?;
69    Ok(Json(tasks))
70}
71
72async fn create_task(
73    State(st): State<AppState>,
74    Path(id): Path<String>,
75    principal: Principal,
76    Json(body): Json<AiTaskCreate>,
77) -> AppResult<(StatusCode, Json<AiTask>)> {
78    principal.require(principal.can_manage_registry(), "create AI tasks")?;
79    let _ = load_camera(&st.pool, &id).await?;
80    if body.task_type.trim().is_empty() {
81        return Err(AppError::BadRequest("`task_type` is required".into()));
82    }
83    let profile = body.stream_profile.unwrap_or_else(|| "sub".into());
84    validate_profile(&profile)?;
85    let fps = body.fps.unwrap_or(st.cfg.default_ai_fps).clamp(0.1, 30.0);
86    let width = body
87        .width
88        .unwrap_or(st.cfg.default_ai_width)
89        .clamp(160, 3840);
90    let enabled = body.enabled.unwrap_or(true);
91    let config = SqlxJson(body.config.unwrap_or_else(|| json!({})));
92    let now = Utc::now();
93    let task_id = format!("ai_{}", Uuid::new_v4().simple());
94
95    sqlx::query(
96        "INSERT INTO ai_tasks
97           (id, camera_id, task_type, enabled, stream_profile, fps, width, config, created_at, updated_at)
98         VALUES (?,?,?,?,?,?,?,?,?,?)",
99    )
100    .bind(&task_id)
101    .bind(&id)
102    .bind(&body.task_type)
103    .bind(enabled)
104    .bind(&profile)
105    .bind(fps)
106    .bind(width)
107    .bind(config)
108    .bind(now)
109    .bind(now)
110    .execute(&st.pool)
111    .await?;
112
113    st.sampler.reconcile().await;
114    let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
115        .bind(&task_id)
116        .fetch_one(&st.pool)
117        .await?;
118    auth::audit(
119        &st.pool,
120        &principal,
121        "create_ai_task",
122        "ai_task",
123        &task_id,
124        json!({ "camera_id": &id, "task_type": &task.task_type }),
125    )
126    .await;
127    Ok((StatusCode::CREATED, Json(task)))
128}
129
130async fn update_task(
131    State(st): State<AppState>,
132    Path(task_id): Path<String>,
133    principal: Principal,
134    Json(body): Json<AiTaskUpdate>,
135) -> AppResult<Json<AiTask>> {
136    principal.require(principal.can_manage_registry(), "update AI tasks")?;
137    let cur = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
138        .bind(&task_id)
139        .fetch_optional(&st.pool)
140        .await?
141        .ok_or_else(|| AppError::NotFound(format!("ai task {task_id} not found")))?;
142
143    let task_type = body.task_type.unwrap_or(cur.task_type);
144    let profile = body.stream_profile.unwrap_or(cur.stream_profile);
145    validate_profile(&profile)?;
146    let fps = body.fps.map(|v| v.clamp(0.1, 30.0)).unwrap_or(cur.fps);
147    let width = body.width.map(|v| v.clamp(160, 3840)).unwrap_or(cur.width);
148    let enabled = body.enabled.unwrap_or(cur.enabled);
149    let config = SqlxJson(body.config.unwrap_or(cur.config.0));
150
151    sqlx::query(
152        "UPDATE ai_tasks SET task_type=?, stream_profile=?, fps=?, width=?, enabled=?, config=?, updated_at=?
153         WHERE id=?",
154    )
155    .bind(&task_type)
156    .bind(&profile)
157    .bind(fps)
158    .bind(width)
159    .bind(enabled)
160    .bind(config)
161    .bind(Utc::now())
162    .bind(&task_id)
163    .execute(&st.pool)
164    .await?;
165
166    st.sampler.reconcile().await;
167    let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
168        .bind(&task_id)
169        .fetch_one(&st.pool)
170        .await?;
171    auth::audit(
172        &st.pool,
173        &principal,
174        "update_ai_task",
175        "ai_task",
176        &task_id,
177        json!({}),
178    )
179    .await;
180    Ok(Json(task))
181}
182
183async fn delete_task(
184    State(st): State<AppState>,
185    Path(task_id): Path<String>,
186    principal: Principal,
187) -> AppResult<StatusCode> {
188    principal.require(principal.can_manage_registry(), "delete AI tasks")?;
189    let res = sqlx::query("DELETE FROM ai_tasks WHERE id = ?")
190        .bind(&task_id)
191        .execute(&st.pool)
192        .await?;
193    if res.rows_affected() == 0 {
194        return Err(AppError::NotFound(format!("ai task {task_id} not found")));
195    }
196    st.sampler.reconcile().await;
197    auth::audit(
198        &st.pool,
199        &principal,
200        "delete_ai_task",
201        "ai_task",
202        &task_id,
203        json!({}),
204    )
205    .await;
206    Ok(StatusCode::NO_CONTENT)
207}
208
209#[derive(Debug, Serialize)]
210struct WorkerTask {
211    id: String,
212    camera_id: String,
213    task_type: String,
214    stream_profile: String,
215    fps: f64,
216    width: i64,
217    config: Value,
218    frame_url: String,
219}
220
221/// Worker discovery: every enabled AI task on an enabled camera, with the frame URL to pull.
222async fn list_all_tasks(
223    State(st): State<AppState>,
224    principal: crate::auth::Principal,
225) -> AppResult<Json<Vec<WorkerTask>>> {
226    // Authentication floor: when auth is enabled this rejects anonymous callers (the worker sends an
227    // integration API key). When auth is disabled the principal is the synthetic system admin.
228    principal.require(principal.can_view(), "discover AI tasks")?;
229    let tasks = sqlx::query_as::<_, AiTask>(
230        "SELECT t.* FROM ai_tasks t JOIN cameras c ON c.id = t.camera_id
231         WHERE t.enabled = 1 AND c.enabled = 1
232         ORDER BY t.camera_id ASC",
233    )
234    .fetch_all(&st.pool)
235    .await?;
236    let out = tasks
237        .into_iter()
238        .map(|t| WorkerTask {
239            frame_url: format!(
240                "/api/v1/cameras/{}/frame?profile={}",
241                t.camera_id, t.stream_profile
242            ),
243            id: t.id,
244            camera_id: t.camera_id,
245            task_type: t.task_type,
246            stream_profile: t.stream_profile,
247            fps: t.fps,
248            width: t.width,
249            config: t.config.0,
250        })
251        .collect();
252    Ok(Json(out))
253}
254
255async fn sampler_status(
256    State(st): State<AppState>,
257    principal: Principal,
258) -> AppResult<Json<Vec<SamplerInfo>>> {
259    principal.require(principal.can_view(), "view sampler status")?;
260    Ok(Json(st.sampler.statuses().await))
261}
262
263#[derive(Debug, Deserialize)]
264struct FrameQuery {
265    profile: Option<String>,
266}
267
268/// Serve the latest sampled frame for a camera + stream profile (the AI worker's input).
269async fn latest_frame(
270    State(st): State<AppState>,
271    principal: crate::auth::Principal,
272    Path(id): Path<String>,
273    Query(q): Query<FrameQuery>,
274) -> AppResult<Response> {
275    // Authentication floor (a frame can contain faces/plates). Note: when auth is enabled the SPA's
276    // <img> tags cannot send a bearer header — token-in-query / cookie for the media plane is handled
277    // in the auth-split work; the worker authenticates via X-API-Key.
278    principal.require(principal.can_view(), "read camera frames")?;
279    // Defense in depth: the id becomes a path segment, so reject any separators/traversal.
280    if id.contains('/') || id.contains('\\') || id.contains("..") {
281        return Err(AppError::BadRequest("invalid camera id".into()));
282    }
283    let profile = q.profile.unwrap_or_else(|| "sub".into());
284    validate_profile(&profile)?;
285    let path = st.sampler.frame_path(&id, &profile);
286    let bytes = tokio::fs::read(&path).await.map_err(|_| {
287        AppError::NotFound("no sampled frame yet (is an AI task enabled for this camera?)".into())
288    })?;
289    let captured = tokio::fs::metadata(&path)
290        .await
291        .ok()
292        .and_then(|m| m.modified().ok())
293        .and_then(|t| {
294            t.duration_since(std::time::UNIX_EPOCH)
295                .ok()
296                .map(|d| chrono::DateTime::<Utc>::from_timestamp_millis(d.as_millis() as i64))
297        })
298        .flatten();
299    let age_ms = captured
300        .map(|c| (Utc::now() - c).num_milliseconds().max(0))
301        .unwrap_or(0);
302
303    Response::builder()
304        .header(header::CONTENT_TYPE, "image/jpeg")
305        .header(header::CACHE_CONTROL, "no-store")
306        .header("x-frame-age-ms", age_ms.to_string())
307        .header(
308            "x-frame-captured-at",
309            captured.map(|c| c.to_rfc3339()).unwrap_or_default(),
310        )
311        .body(Body::from(bytes))
312        .map_err(|e| AppError::Other(anyhow::anyhow!("building response: {e}")))
313}
314
315#[derive(Debug, Deserialize)]
316struct DetectionQuery {
317    from: Option<String>,
318    to: Option<String>,
319    label: Option<String>,
320    limit: Option<i64>,
321}
322
323async fn list_detections(
324    State(st): State<AppState>,
325    principal: crate::auth::Principal,
326    Path(id): Path<String>,
327    Query(q): Query<DetectionQuery>,
328) -> AppResult<Json<Vec<Detection>>> {
329    principal.require(principal.can_view(), "read detections")?;
330    let _ = load_camera(&st.pool, &id).await?;
331    let limit = q.limit.unwrap_or(200).clamp(1, 5000);
332    let from = parse_opt_ts(&q.from, "from")?;
333    let to = parse_opt_ts(&q.to, "to")?;
334    let rows = sqlx::query_as::<_, Detection>(
335        "SELECT * FROM detections
336         WHERE camera_id = ?
337           AND (? IS NULL OR timestamp >= ?)
338           AND (? IS NULL OR timestamp <= ?)
339           AND (? IS NULL OR label = ?)
340         ORDER BY timestamp DESC LIMIT ?",
341    )
342    .bind(&id)
343    .bind(from)
344    .bind(from)
345    .bind(to)
346    .bind(to)
347    .bind(&q.label)
348    .bind(&q.label)
349    .bind(limit)
350    .fetch_all(&st.pool)
351    .await?;
352    Ok(Json(rows))
353}
354
/// Max detections accepted in a single ingest request (DoS / write-amplification bound).
const MAX_INGEST_DETECTIONS: usize = 1000;

/// Hard cap on the ingest request body, enforced by the framework BEFORE deserialization (defense
/// in depth vs the post-parse count guard). 8 MiB comfortably fits MAX_INGEST_DETECTIONS detections
/// with bounding boxes + attributes, while refusing a body crafted to exhaust memory.
const INGEST_BODY_LIMIT_BYTES: usize = 8 * 1024 * 1024;

/// Columns bound per detection row in the batched INSERT in [`ingest`].
const DETECTION_INSERT_COLS: usize = 11;
/// Per-statement bound-variable budget. 999 is SQLite's default `SQLITE_MAX_VARIABLE_NUMBER` for
/// versions before 3.32.0 (newer builds default to 32766), so staying under it is safe on any
/// build that keeps at least the historical default. The batched insert is chunked so a single
/// statement never exceeds it, even at [`MAX_INGEST_DETECTIONS`].
const SQLITE_MAX_BIND_VARS: usize = 999;
/// Detection rows per INSERT statement (999 / 11 = 90), keeping bound variables under
/// [`SQLITE_MAX_BIND_VARS`].
const DETECTION_INSERT_CHUNK: usize = SQLITE_MAX_BIND_VARS / DETECTION_INSERT_COLS;

// Compile-time guard: `slice::chunks` panics on a chunk size of 0, and a chunk must never need
// more bound variables than the budget. Changing the column count or the budget to values that
// break either property now fails the build instead of failing at request time.
const _: () = assert!(
    DETECTION_INSERT_CHUNK > 0
        && DETECTION_INSERT_CHUNK * DETECTION_INSERT_COLS <= SQLITE_MAX_BIND_VARS,
    "DETECTION_INSERT_CHUNK must be non-zero and fit within SQLITE_MAX_BIND_VARS"
);
370
371/// Ingest detections (and an optional event) posted by an AI worker. Detections are written in a
372/// single transaction so a batch is all-or-nothing.
373async fn ingest(
374    State(st): State<AppState>,
375    principal: crate::auth::Principal,
376    Json(body): Json<AiIngest>,
377) -> AppResult<Json<Value>> {
378    principal.require(principal.can_ingest(), "ingest perception events")?;
379    let cam = load_camera(&st.pool, &body.camera_id).await?;
380    if body.task_type.trim().is_empty() {
381        return Err(AppError::BadRequest("`task_type` is required".into()));
382    }
383    if body.detections.len() > MAX_INGEST_DETECTIONS {
384        return Err(AppError::BadRequest(format!(
385            "too many detections in one request ({}); max {MAX_INGEST_DETECTIONS}",
386            body.detections.len()
387        )));
388    }
389    let ts = parse_opt_ts(&body.timestamp, "timestamp")?.unwrap_or_else(Utc::now);
390
391    let mut inserted = 0u64;
392    let mut tx = st.pool.begin().await?;
393    // Idempotency + atomic capture: record the batch in the outbox FIRST, in the same transaction.
394    // A duplicate (camera_id, frame_id) — i.e. an at-least-once redelivery — conflicts and inserts 0
395    // rows; we then skip both the detection writes and the consumer fan-out, so a replayed batch can
396    // never double-count ANPR votes or corrupt zone state. With no frame_id every batch is accepted.
397    let outbox_res = sqlx::query(
398        "INSERT INTO outbox (topic, camera_id, site_id, frame_id, task_type, detection_count, created_at)
399         VALUES ('detections', ?, ?, ?, ?, ?, ?)
400         ON CONFLICT DO NOTHING",
401    )
402    .bind(&body.camera_id)
403    .bind(&cam.site_id)
404    .bind(&body.frame_id)
405    .bind(&body.task_type)
406    .bind(body.detections.len() as i64)
407    .bind(Utc::now())
408    .execute(&mut *tx)
409    .await?;
410    if outbox_res.rows_affected() == 0 {
411        // Duplicate frame already ingested — no-op (idempotent).
412        tx.commit().await?;
413        return Ok(Json(json!({ "detections_ingested": 0, "duplicate": true })));
414    }
415    // Batched multi-row insert: one INSERT per chunk instead of one statement per detection. Same
416    // columns, values, and semantics as the prior per-row loop, still inside the transaction. We
417    // chunk so a single statement's bound-variable count stays under SQLite's limit even at
418    // MAX_INGEST_DETECTIONS.
419    for chunk in body.detections.chunks(DETECTION_INSERT_CHUNK) {
420        let tuples = vec!["(?,?,?,?,?,?,?,?,?,?,?)"; chunk.len()].join(",");
421        let sql = format!(
422            "INSERT INTO detections
423               (id, camera_id, task_type, timestamp, label, confidence, bbox, track_id, attributes, frame_id, created_at)
424             VALUES {tuples}"
425        );
426        let mut q = sqlx::query(&sql);
427        for d in chunk {
428            let bbox = d.bbox.clone().map(SqlxJson);
429            let attrs = SqlxJson(d.attributes.clone().unwrap_or_else(|| json!({})));
430            q = q
431                .bind(format!("det_{}", Uuid::new_v4().simple()))
432                .bind(&body.camera_id)
433                .bind(&body.task_type)
434                .bind(ts)
435                .bind(&d.label)
436                .bind(d.confidence)
437                .bind(bbox)
438                .bind(&d.track_id)
439                .bind(attrs)
440                .bind(&body.frame_id)
441                .bind(Utc::now());
442        }
443        inserted += q.execute(&mut *tx).await?.rows_affected();
444    }
445    tx.commit().await?;
446
447    // Fan the committed batch out to registered perception consumers (zones, ANPR/entry, future
448    // apps). The kernel does not know or branch on which apps exist — each consumer self-selects by
449    // task_type. Engines that need trustworthy timing use server time, not the worker timestamp.
450    //
451    // Durability: fan-out happens after commit, so a crash here would otherwise drop the consumer
452    // notification. `fan_out` claims each (consumer, frame) at-most-once; on success we mark the
453    // outbox batch fanned, and the `fanout` drainer replays any batch left un-fanned by a crash.
454    let batch = crate::services::consumer::DetectionBatch {
455        camera_id: &body.camera_id,
456        site_id: cam.site_id.as_deref(),
457        task_type: &body.task_type,
458        detections: &body.detections,
459        timestamp: ts,
460    };
461    let fanned = crate::services::consumer::fan_out(
462        &st.pool,
463        &st.consumers,
464        &batch,
465        body.frame_id.as_deref(),
466    )
467    .await;
468    if fanned {
469        if let Some(fid) = body.frame_id.as_deref() {
470            let _ = sqlx::query(
471                "UPDATE outbox SET fanned_out_at = ? \
472                 WHERE topic = 'detections' AND camera_id = ? AND frame_id = ? AND fanned_out_at IS NULL",
473            )
474            .bind(Utc::now())
475            .bind(&body.camera_id)
476            .bind(fid)
477            .execute(&st.pool)
478            .await;
479        }
480    }
481
482    if let Some(ev) = &body.event {
483        let severity = ev.severity.clone().unwrap_or_else(|| "info".into());
484        let payload = ev.payload.clone().unwrap_or_else(|| json!({}));
485        crate::repo::log_event(
486            &st.pool,
487            Some(&body.camera_id),
488            &ev.event_type,
489            &severity,
490            payload,
491        )
492        .await?;
493    }
494
495    Ok(Json(json!({ "detections_ingested": inserted })))
496}
497
498fn parse_opt_ts(s: &Option<String>, field: &str) -> AppResult<Option<chrono::DateTime<Utc>>> {
499    match s {
500        Some(v) => crate::util::parse_rfc3339(v)
501            .map(Some)
502            .ok_or_else(|| AppError::BadRequest(format!("invalid `{field}` timestamp"))),
503        None => Ok(None),
504    }
505}
506
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn validate_profile_accepts_sub_and_main() {
        for good in ["sub", "main"] {
            assert!(validate_profile(good).is_ok());
        }
    }

    #[test]
    fn validate_profile_rejects_other_values() {
        // Matching is exact: wrong case, padding, or any other token must be refused.
        let rejected = ["", "Sub", "MAIN", " sub", "sub ", "substream", "foo"];
        for bad in rejected {
            let m = match validate_profile(bad) {
                Err(AppError::BadRequest(m)) => m,
                other => panic!("expected BadRequest for {bad:?}, got {other:?}"),
            };
            assert!(
                m.contains("stream_profile"),
                "unexpected message for {bad:?}: {m}"
            );
        }
    }

    #[test]
    fn parse_opt_ts_none_is_ok_none() {
        let absent: Option<String> = None;
        assert!(parse_opt_ts(&absent, "from").unwrap().is_none());
    }

    #[test]
    fn parse_opt_ts_valid_matches_util() {
        let raw = String::from("2026-06-13T05:02:19Z");
        let input = Some(raw.clone());
        let parsed = parse_opt_ts(&input, "from").unwrap();
        // The wrapper must agree with the underlying crate::util::parse_rfc3339 helper.
        assert_eq!(parsed, crate::util::parse_rfc3339(&raw));
        assert_eq!(parsed.unwrap().to_rfc3339(), "2026-06-13T05:02:19+00:00");
    }

    #[test]
    fn parse_opt_ts_invalid_reports_field() {
        let input = Some("not-a-timestamp".to_string());
        let m = match parse_opt_ts(&input, "to") {
            Err(AppError::BadRequest(m)) => m,
            other => panic!("expected BadRequest, got {other:?}"),
        };
        assert!(m.contains("to"), "message should name the field: {m}");
        assert!(m.contains("timestamp"), "message: {m}");
    }

    #[test]
    fn max_ingest_detections_bound_is_stable() {
        assert_eq!(MAX_INGEST_DETECTIONS, 1000);
    }
}
563}