// heldar_kernel/routes/ai.rs

//! Stage 2 AI routes: AI task CRUD, the worker contract (discover tasks, pull the latest sampled
//! frame, post detections/events back), sampler status, and a per-camera detections query.
3
4use axum::body::Body;
5use axum::extract::{Path, Query, State};
6use axum::http::{header, StatusCode};
7use axum::response::Response;
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12use serde_json::{json, Value};
13use sqlx::types::Json as SqlxJson;
14use uuid::Uuid;
15
16use crate::auth::{self, Principal};
17use crate::error::{AppError, AppResult};
18use crate::models::{AiIngest, AiTask, AiTaskCreate, AiTaskUpdate, Detection};
19use crate::routes::cameras::load_camera;
20use crate::services::sampler::SamplerInfo;
21use crate::state::AppState;
22
23pub fn router() -> Router<AppState> {
24    Router::new()
25        .route(
26            "/api/v1/cameras/{id}/ai-tasks",
27            get(list_camera_tasks).post(create_task),
28        )
29        .route(
30            "/api/v1/ai-tasks/{task_id}",
31            axum::routing::patch(update_task).delete(delete_task),
32        )
33        .route("/api/v1/ai/tasks", get(list_all_tasks))
34        .route("/api/v1/ai/samplers", get(sampler_status))
35        .route("/api/v1/ai/events", post(ingest))
36        .route("/api/v1/cameras/{id}/frame", get(latest_frame))
37        .route("/api/v1/cameras/{id}/detections", get(list_detections))
38}
39
40fn validate_profile(p: &str) -> AppResult<()> {
41    if matches!(p, "sub" | "main") {
42        Ok(())
43    } else {
44        Err(AppError::BadRequest(
45            "`stream_profile` must be 'sub' or 'main'".into(),
46        ))
47    }
48}
49
50async fn list_camera_tasks(
51    State(st): State<AppState>,
52    Path(id): Path<String>,
53    principal: Principal,
54) -> AppResult<Json<Vec<AiTask>>> {
55    principal.require(principal.can_view(), "view AI tasks")?;
56    let _ = load_camera(&st.pool, &id).await?;
57    let tasks = sqlx::query_as::<_, AiTask>(
58        "SELECT * FROM ai_tasks WHERE camera_id = ? ORDER BY created_at ASC",
59    )
60    .bind(&id)
61    .fetch_all(&st.pool)
62    .await?;
63    Ok(Json(tasks))
64}
65
66async fn create_task(
67    State(st): State<AppState>,
68    Path(id): Path<String>,
69    principal: Principal,
70    Json(body): Json<AiTaskCreate>,
71) -> AppResult<(StatusCode, Json<AiTask>)> {
72    principal.require(principal.can_manage_registry(), "create AI tasks")?;
73    let _ = load_camera(&st.pool, &id).await?;
74    if body.task_type.trim().is_empty() {
75        return Err(AppError::BadRequest("`task_type` is required".into()));
76    }
77    let profile = body.stream_profile.unwrap_or_else(|| "sub".into());
78    validate_profile(&profile)?;
79    let fps = body.fps.unwrap_or(st.cfg.default_ai_fps).clamp(0.1, 30.0);
80    let width = body
81        .width
82        .unwrap_or(st.cfg.default_ai_width)
83        .clamp(160, 3840);
84    let enabled = body.enabled.unwrap_or(true);
85    let config = SqlxJson(body.config.unwrap_or_else(|| json!({})));
86    let now = Utc::now();
87    let task_id = format!("ai_{}", Uuid::new_v4().simple());
88
89    sqlx::query(
90        "INSERT INTO ai_tasks
91           (id, camera_id, task_type, enabled, stream_profile, fps, width, config, created_at, updated_at)
92         VALUES (?,?,?,?,?,?,?,?,?,?)",
93    )
94    .bind(&task_id)
95    .bind(&id)
96    .bind(&body.task_type)
97    .bind(enabled)
98    .bind(&profile)
99    .bind(fps)
100    .bind(width)
101    .bind(config)
102    .bind(now)
103    .bind(now)
104    .execute(&st.pool)
105    .await?;
106
107    st.sampler.reconcile().await;
108    let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
109        .bind(&task_id)
110        .fetch_one(&st.pool)
111        .await?;
112    auth::audit(
113        &st.pool,
114        &principal,
115        "create_ai_task",
116        "ai_task",
117        &task_id,
118        json!({ "camera_id": &id, "task_type": &task.task_type }),
119    )
120    .await;
121    Ok((StatusCode::CREATED, Json(task)))
122}
123
124async fn update_task(
125    State(st): State<AppState>,
126    Path(task_id): Path<String>,
127    principal: Principal,
128    Json(body): Json<AiTaskUpdate>,
129) -> AppResult<Json<AiTask>> {
130    principal.require(principal.can_manage_registry(), "update AI tasks")?;
131    let cur = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
132        .bind(&task_id)
133        .fetch_optional(&st.pool)
134        .await?
135        .ok_or_else(|| AppError::NotFound(format!("ai task {task_id} not found")))?;
136
137    let task_type = body.task_type.unwrap_or(cur.task_type);
138    let profile = body.stream_profile.unwrap_or(cur.stream_profile);
139    validate_profile(&profile)?;
140    let fps = body.fps.map(|v| v.clamp(0.1, 30.0)).unwrap_or(cur.fps);
141    let width = body.width.map(|v| v.clamp(160, 3840)).unwrap_or(cur.width);
142    let enabled = body.enabled.unwrap_or(cur.enabled);
143    let config = SqlxJson(body.config.unwrap_or(cur.config.0));
144
145    sqlx::query(
146        "UPDATE ai_tasks SET task_type=?, stream_profile=?, fps=?, width=?, enabled=?, config=?, updated_at=?
147         WHERE id=?",
148    )
149    .bind(&task_type)
150    .bind(&profile)
151    .bind(fps)
152    .bind(width)
153    .bind(enabled)
154    .bind(config)
155    .bind(Utc::now())
156    .bind(&task_id)
157    .execute(&st.pool)
158    .await?;
159
160    st.sampler.reconcile().await;
161    let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
162        .bind(&task_id)
163        .fetch_one(&st.pool)
164        .await?;
165    auth::audit(
166        &st.pool,
167        &principal,
168        "update_ai_task",
169        "ai_task",
170        &task_id,
171        json!({}),
172    )
173    .await;
174    Ok(Json(task))
175}
176
177async fn delete_task(
178    State(st): State<AppState>,
179    Path(task_id): Path<String>,
180    principal: Principal,
181) -> AppResult<StatusCode> {
182    principal.require(principal.can_manage_registry(), "delete AI tasks")?;
183    let res = sqlx::query("DELETE FROM ai_tasks WHERE id = ?")
184        .bind(&task_id)
185        .execute(&st.pool)
186        .await?;
187    if res.rows_affected() == 0 {
188        return Err(AppError::NotFound(format!("ai task {task_id} not found")));
189    }
190    st.sampler.reconcile().await;
191    auth::audit(
192        &st.pool,
193        &principal,
194        "delete_ai_task",
195        "ai_task",
196        &task_id,
197        json!({}),
198    )
199    .await;
200    Ok(StatusCode::NO_CONTENT)
201}
202
203#[derive(Debug, Serialize)]
204struct WorkerTask {
205    id: String,
206    camera_id: String,
207    task_type: String,
208    stream_profile: String,
209    fps: f64,
210    width: i64,
211    config: Value,
212    frame_url: String,
213}
214
215/// Worker discovery: every enabled AI task on an enabled camera, with the frame URL to pull.
216async fn list_all_tasks(
217    State(st): State<AppState>,
218    principal: crate::auth::Principal,
219) -> AppResult<Json<Vec<WorkerTask>>> {
220    // Authentication floor: when auth is enabled this rejects anonymous callers (the worker sends an
221    // integration API key). When auth is disabled the principal is the synthetic system admin.
222    principal.require(principal.can_view(), "discover AI tasks")?;
223    let tasks = sqlx::query_as::<_, AiTask>(
224        "SELECT t.* FROM ai_tasks t JOIN cameras c ON c.id = t.camera_id
225         WHERE t.enabled = 1 AND c.enabled = 1
226         ORDER BY t.camera_id ASC",
227    )
228    .fetch_all(&st.pool)
229    .await?;
230    let out = tasks
231        .into_iter()
232        .map(|t| WorkerTask {
233            frame_url: format!(
234                "/api/v1/cameras/{}/frame?profile={}",
235                t.camera_id, t.stream_profile
236            ),
237            id: t.id,
238            camera_id: t.camera_id,
239            task_type: t.task_type,
240            stream_profile: t.stream_profile,
241            fps: t.fps,
242            width: t.width,
243            config: t.config.0,
244        })
245        .collect();
246    Ok(Json(out))
247}
248
249async fn sampler_status(
250    State(st): State<AppState>,
251    principal: Principal,
252) -> AppResult<Json<Vec<SamplerInfo>>> {
253    principal.require(principal.can_view(), "view sampler status")?;
254    Ok(Json(st.sampler.statuses().await))
255}
256
257#[derive(Debug, Deserialize)]
258struct FrameQuery {
259    profile: Option<String>,
260}
261
262/// Serve the latest sampled frame for a camera + stream profile (the AI worker's input).
263async fn latest_frame(
264    State(st): State<AppState>,
265    principal: crate::auth::Principal,
266    Path(id): Path<String>,
267    Query(q): Query<FrameQuery>,
268) -> AppResult<Response> {
269    // Authentication floor (a frame can contain faces/plates). Note: when auth is enabled the SPA's
270    // <img> tags cannot send a bearer header — token-in-query / cookie for the media plane is handled
271    // in the auth-split work; the worker authenticates via X-API-Key.
272    principal.require(principal.can_view(), "read camera frames")?;
273    // Defense in depth: the id becomes a path segment, so reject any separators/traversal.
274    if id.contains('/') || id.contains('\\') || id.contains("..") {
275        return Err(AppError::BadRequest("invalid camera id".into()));
276    }
277    let profile = q.profile.unwrap_or_else(|| "sub".into());
278    validate_profile(&profile)?;
279    let path = st.sampler.frame_path(&id, &profile);
280    let bytes = tokio::fs::read(&path).await.map_err(|_| {
281        AppError::NotFound("no sampled frame yet (is an AI task enabled for this camera?)".into())
282    })?;
283    let captured = tokio::fs::metadata(&path)
284        .await
285        .ok()
286        .and_then(|m| m.modified().ok())
287        .and_then(|t| {
288            t.duration_since(std::time::UNIX_EPOCH)
289                .ok()
290                .map(|d| chrono::DateTime::<Utc>::from_timestamp_millis(d.as_millis() as i64))
291        })
292        .flatten();
293    let age_ms = captured
294        .map(|c| (Utc::now() - c).num_milliseconds().max(0))
295        .unwrap_or(0);
296
297    Response::builder()
298        .header(header::CONTENT_TYPE, "image/jpeg")
299        .header(header::CACHE_CONTROL, "no-store")
300        .header("x-frame-age-ms", age_ms.to_string())
301        .header(
302            "x-frame-captured-at",
303            captured.map(|c| c.to_rfc3339()).unwrap_or_default(),
304        )
305        .body(Body::from(bytes))
306        .map_err(|e| AppError::Other(anyhow::anyhow!("building response: {e}")))
307}
308
309#[derive(Debug, Deserialize)]
310struct DetectionQuery {
311    from: Option<String>,
312    to: Option<String>,
313    label: Option<String>,
314    limit: Option<i64>,
315}
316
317async fn list_detections(
318    State(st): State<AppState>,
319    principal: crate::auth::Principal,
320    Path(id): Path<String>,
321    Query(q): Query<DetectionQuery>,
322) -> AppResult<Json<Vec<Detection>>> {
323    principal.require(principal.can_view(), "read detections")?;
324    let _ = load_camera(&st.pool, &id).await?;
325    let limit = q.limit.unwrap_or(200).clamp(1, 5000);
326    let from = parse_opt_ts(&q.from, "from")?;
327    let to = parse_opt_ts(&q.to, "to")?;
328    let rows = sqlx::query_as::<_, Detection>(
329        "SELECT * FROM detections
330         WHERE camera_id = ?
331           AND (? IS NULL OR timestamp >= ?)
332           AND (? IS NULL OR timestamp <= ?)
333           AND (? IS NULL OR label = ?)
334         ORDER BY timestamp DESC LIMIT ?",
335    )
336    .bind(&id)
337    .bind(from)
338    .bind(from)
339    .bind(to)
340    .bind(to)
341    .bind(&q.label)
342    .bind(&q.label)
343    .bind(limit)
344    .fetch_all(&st.pool)
345    .await?;
346    Ok(Json(rows))
347}
348
/// Upper bound on detections in a single ingest request. Larger batches are rejected before
/// any write, which bounds per-request write amplification.
const MAX_INGEST_DETECTIONS: usize = 1_000;
351
/// Ingest detections (and an optional event) posted by an AI worker. Detections are written in a
/// single transaction so a batch is all-or-nothing.
///
/// Flow:
/// 1. Permission check, then camera lookup, then request validation.
/// 2. One transaction that first records the batch in `outbox` (the idempotency gate) and then
///    inserts every detection.
/// 3. Fan-out to interested perception consumers.
/// 4. Optional event log.
///
/// Responds with `{"detections_ingested": n}`. When the outbox insert is a no-op (a redelivered
/// batch), it responds with `{"detections_ingested": 0, "duplicate": true}` instead.
///
/// # Errors
/// - missing ingest permission;
/// - unknown camera;
/// - blank `task_type`;
/// - more than `MAX_INGEST_DETECTIONS` detections;
/// - an unparsable `timestamp`;
/// - a database error, including one from `log_event`.
///
/// NOTE(review): the event is written through `st.pool` after the transaction has committed and
/// the consumers have run. If `log_event` fails, the request errors even though the batch is
/// stored. A retry with the same `frame_id` then takes the duplicate early-return below, so the
/// event is never recorded. Consider best-effort logging here, or writing the event inside the
/// transaction.
async fn ingest(
    State(st): State<AppState>,
    principal: crate::auth::Principal,
    Json(body): Json<AiIngest>,
) -> AppResult<Json<Value>> {
    principal.require(principal.can_ingest(), "ingest perception events")?;
    // The camera row also supplies `site_id` for the outbox row and the consumer batch.
    let cam = load_camera(&st.pool, &body.camera_id).await?;
    if body.task_type.trim().is_empty() {
        return Err(AppError::BadRequest("`task_type` is required".into()));
    }
    if body.detections.len() > MAX_INGEST_DETECTIONS {
        return Err(AppError::BadRequest(format!(
            "too many detections in one request ({}); max {MAX_INGEST_DETECTIONS}",
            body.detections.len()
        )));
    }
    // Worker-supplied detection time when present, otherwise server time. It is stored on every
    // detection row and passed to consumers; the `created_at` columns use server time.
    let ts = parse_opt_ts(&body.timestamp, "timestamp")?.unwrap_or_else(Utc::now);

    let mut inserted = 0u64;
    let mut tx = st.pool.begin().await?;
    // Idempotency + atomic capture: record the batch in the outbox FIRST, in the same transaction.
    // A duplicate (camera_id, frame_id) — i.e. an at-least-once redelivery — conflicts and inserts 0
    // rows; we then skip both the detection writes and the consumer fan-out, so a replayed batch can
    // never double-count ANPR votes or corrupt zone state. With no frame_id every batch is accepted.
    // NOTE(review): `frame_id` is bound as-is. Under the conflict rule above, an empty-string
    // frame_id would dedupe against every earlier empty-string batch for the same camera, so
    // confirm that workers never send `""`.
    let outbox_res = sqlx::query(
        "INSERT INTO outbox (topic, camera_id, site_id, frame_id, task_type, detection_count, created_at)
         VALUES ('detections', ?, ?, ?, ?, ?, ?)
         ON CONFLICT DO NOTHING",
    )
    .bind(&body.camera_id)
    .bind(&cam.site_id)
    .bind(&body.frame_id)
    .bind(&body.task_type)
    .bind(body.detections.len() as i64)
    .bind(Utc::now())
    .execute(&mut *tx)
    .await?;
    if outbox_res.rows_affected() == 0 {
        // Duplicate frame already ingested — no-op (idempotent). The event block further down
        // is skipped as well.
        tx.commit().await?;
        return Ok(Json(json!({ "detections_ingested": 0, "duplicate": true })));
    }
    // One INSERT per detection, each with a fresh `det_` id; all rows share the batch `ts`.
    for d in &body.detections {
        let bbox = d.bbox.clone().map(SqlxJson);
        let attrs = SqlxJson(d.attributes.clone().unwrap_or_else(|| json!({})));
        sqlx::query(
            "INSERT INTO detections
               (id, camera_id, task_type, timestamp, label, confidence, bbox, track_id, attributes, frame_id, created_at)
             VALUES (?,?,?,?,?,?,?,?,?,?,?)",
        )
        .bind(format!("det_{}", Uuid::new_v4().simple()))
        .bind(&body.camera_id)
        .bind(&body.task_type)
        .bind(ts)
        .bind(&d.label)
        .bind(d.confidence)
        .bind(bbox)
        .bind(&d.track_id)
        .bind(attrs)
        .bind(&body.frame_id)
        .bind(Utc::now())
        .execute(&mut *tx)
        .await?;
        inserted += 1;
    }
    tx.commit().await?;

    // Fan the committed batch out to registered perception consumers (zones, ANPR/entry, future
    // apps). The kernel does not know or branch on which apps exist — each consumer self-selects by
    // task_type. Engines that need trustworthy timing use server time, not the worker timestamp.
    // Consumers are awaited one after another, in `st.consumers` iteration order.
    let batch = crate::services::consumer::DetectionBatch {
        camera_id: &body.camera_id,
        site_id: cam.site_id.as_deref(),
        task_type: &body.task_type,
        detections: &body.detections,
        timestamp: ts,
    };
    for consumer in st.consumers.iter() {
        if consumer.interested_in(&body.task_type) {
            tracing::trace!(consumer = consumer.name(), task_type = %body.task_type, "ingest fan-out");
            consumer.consume(&batch).await;
        }
    }

    // Optional event: severity defaults to "info", payload to `{}`.
    if let Some(ev) = &body.event {
        let severity = ev.severity.clone().unwrap_or_else(|| "info".into());
        let payload = ev.payload.clone().unwrap_or_else(|| json!({}));
        crate::repo::log_event(
            &st.pool,
            Some(&body.camera_id),
            &ev.event_type,
            &severity,
            payload,
        )
        .await?;
    }

    Ok(Json(json!({ "detections_ingested": inserted })))
}
453
454fn parse_opt_ts(s: &Option<String>, field: &str) -> AppResult<Option<chrono::DateTime<Utc>>> {
455    match s {
456        Some(v) => crate::util::parse_rfc3339(v)
457            .map(Some)
458            .ok_or_else(|| AppError::BadRequest(format!("invalid `{field}` timestamp"))),
459        None => Ok(None),
460    }
461}