Skip to main content

heldar_kernel/routes/
ai.rs

1//! Stage 2 AI surface: AI task CRUD, the worker contract (discover tasks, pull the latest sampled
2//! frame, post detections/events back), sampler status, and a detections query.
3
4use axum::body::Body;
5use axum::extract::{DefaultBodyLimit, Path, Query, State};
6use axum::http::{header, StatusCode};
7use axum::response::Response;
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12use serde_json::{json, Value};
13use sqlx::types::Json as SqlxJson;
14use uuid::Uuid;
15
16use crate::auth::{self, Principal};
17use crate::error::{AppError, AppResult};
18use crate::models::{AiIngest, AiTask, AiTaskCreate, AiTaskUpdate, Detection};
19use crate::routes::cameras::load_camera;
20use crate::services::sampler::SamplerInfo;
21use crate::state::AppState;
22
23pub fn router() -> Router<AppState> {
24    Router::new()
25        .route(
26            "/api/v1/cameras/{id}/ai-tasks",
27            get(list_camera_tasks).post(create_task),
28        )
29        .route(
30            "/api/v1/ai-tasks/{task_id}",
31            axum::routing::patch(update_task).delete(delete_task),
32        )
33        .route("/api/v1/ai/tasks", get(list_all_tasks))
34        .route("/api/v1/ai/samplers", get(sampler_status))
35        // Bound the ingest body BEFORE deserialization so a hostile/buggy worker can't force a huge
36        // allocation (the MAX_INGEST_DETECTIONS count check only runs after the body is fully parsed).
37        // Generous headroom for MAX_INGEST_DETECTIONS rich detections; well under any real batch.
38        .route(
39            "/api/v1/ai/events",
40            post(ingest).layer(DefaultBodyLimit::max(INGEST_BODY_LIMIT_BYTES)),
41        )
42        .route("/api/v1/cameras/{id}/frame", get(latest_frame))
43        .route("/api/v1/cameras/{id}/detections", get(list_detections))
44}
45
46fn validate_profile(p: &str) -> AppResult<()> {
47    if matches!(p, "sub" | "main") {
48        Ok(())
49    } else {
50        Err(AppError::BadRequest(
51            "`stream_profile` must be 'sub' or 'main'".into(),
52        ))
53    }
54}
55
56async fn list_camera_tasks(
57    State(st): State<AppState>,
58    Path(id): Path<String>,
59    principal: Principal,
60) -> AppResult<Json<Vec<AiTask>>> {
61    principal.require(principal.can_view(), "view AI tasks")?;
62    let _ = load_camera(&st.pool, &id).await?;
63    let tasks = sqlx::query_as::<_, AiTask>(
64        "SELECT * FROM ai_tasks WHERE camera_id = ? ORDER BY created_at ASC",
65    )
66    .bind(&id)
67    .fetch_all(&st.pool)
68    .await?;
69    Ok(Json(tasks))
70}
71
72async fn create_task(
73    State(st): State<AppState>,
74    Path(id): Path<String>,
75    principal: Principal,
76    Json(body): Json<AiTaskCreate>,
77) -> AppResult<(StatusCode, Json<AiTask>)> {
78    principal.require(principal.can_manage_registry(), "create AI tasks")?;
79    let _ = load_camera(&st.pool, &id).await?;
80    if body.task_type.trim().is_empty() {
81        return Err(AppError::BadRequest("`task_type` is required".into()));
82    }
83    let profile = body.stream_profile.unwrap_or_else(|| "sub".into());
84    validate_profile(&profile)?;
85    let fps = body.fps.unwrap_or(st.cfg.default_ai_fps).clamp(0.1, 30.0);
86    let width = body
87        .width
88        .unwrap_or(st.cfg.default_ai_width)
89        .clamp(160, 3840);
90    let enabled = body.enabled.unwrap_or(true);
91    let config = SqlxJson(body.config.unwrap_or_else(|| json!({})));
92
93    // Idempotency: a camera has at most one task of a given type per stream profile. If one already
94    // exists, return it instead of silently creating a duplicate — stacked-up identical detection
95    // tasks (e.g. a provisioning script re-POSTing on every restart) waste inference. Change an
96    // existing task via PATCH, not by re-creating it.
97    if let Some(existing) = sqlx::query_as::<_, AiTask>(
98        "SELECT * FROM ai_tasks WHERE camera_id = ? AND task_type = ? AND stream_profile = ?",
99    )
100    .bind(&id)
101    .bind(&body.task_type)
102    .bind(&profile)
103    .fetch_optional(&st.pool)
104    .await?
105    {
106        return Ok((StatusCode::OK, Json(existing)));
107    }
108
109    let now = Utc::now();
110    let task_id = format!("ai_{}", Uuid::new_v4().simple());
111
112    sqlx::query(
113        "INSERT INTO ai_tasks
114           (id, camera_id, task_type, enabled, stream_profile, fps, width, config, created_at, updated_at)
115         VALUES (?,?,?,?,?,?,?,?,?,?)",
116    )
117    .bind(&task_id)
118    .bind(&id)
119    .bind(&body.task_type)
120    .bind(enabled)
121    .bind(&profile)
122    .bind(fps)
123    .bind(width)
124    .bind(config)
125    .bind(now)
126    .bind(now)
127    .execute(&st.pool)
128    .await?;
129
130    st.sampler.reconcile().await;
131    let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
132        .bind(&task_id)
133        .fetch_one(&st.pool)
134        .await?;
135    auth::audit(
136        &st.pool,
137        &principal,
138        "create_ai_task",
139        "ai_task",
140        &task_id,
141        json!({ "camera_id": &id, "task_type": &task.task_type }),
142    )
143    .await;
144    Ok((StatusCode::CREATED, Json(task)))
145}
146
147async fn update_task(
148    State(st): State<AppState>,
149    Path(task_id): Path<String>,
150    principal: Principal,
151    Json(body): Json<AiTaskUpdate>,
152) -> AppResult<Json<AiTask>> {
153    principal.require(principal.can_manage_registry(), "update AI tasks")?;
154    let cur = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
155        .bind(&task_id)
156        .fetch_optional(&st.pool)
157        .await?
158        .ok_or_else(|| AppError::NotFound(format!("ai task {task_id} not found")))?;
159
160    let task_type = body.task_type.unwrap_or(cur.task_type);
161    let profile = body.stream_profile.unwrap_or(cur.stream_profile);
162    validate_profile(&profile)?;
163    let fps = body.fps.map(|v| v.clamp(0.1, 30.0)).unwrap_or(cur.fps);
164    let width = body.width.map(|v| v.clamp(160, 3840)).unwrap_or(cur.width);
165    let enabled = body.enabled.unwrap_or(cur.enabled);
166    let config = SqlxJson(body.config.unwrap_or(cur.config.0));
167
168    sqlx::query(
169        "UPDATE ai_tasks SET task_type=?, stream_profile=?, fps=?, width=?, enabled=?, config=?, updated_at=?
170         WHERE id=?",
171    )
172    .bind(&task_type)
173    .bind(&profile)
174    .bind(fps)
175    .bind(width)
176    .bind(enabled)
177    .bind(config)
178    .bind(Utc::now())
179    .bind(&task_id)
180    .execute(&st.pool)
181    .await?;
182
183    st.sampler.reconcile().await;
184    let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
185        .bind(&task_id)
186        .fetch_one(&st.pool)
187        .await?;
188    auth::audit(
189        &st.pool,
190        &principal,
191        "update_ai_task",
192        "ai_task",
193        &task_id,
194        json!({}),
195    )
196    .await;
197    Ok(Json(task))
198}
199
200async fn delete_task(
201    State(st): State<AppState>,
202    Path(task_id): Path<String>,
203    principal: Principal,
204) -> AppResult<StatusCode> {
205    principal.require(principal.can_manage_registry(), "delete AI tasks")?;
206    let res = sqlx::query("DELETE FROM ai_tasks WHERE id = ?")
207        .bind(&task_id)
208        .execute(&st.pool)
209        .await?;
210    if res.rows_affected() == 0 {
211        return Err(AppError::NotFound(format!("ai task {task_id} not found")));
212    }
213    st.sampler.reconcile().await;
214    auth::audit(
215        &st.pool,
216        &principal,
217        "delete_ai_task",
218        "ai_task",
219        &task_id,
220        json!({}),
221    )
222    .await;
223    Ok(StatusCode::NO_CONTENT)
224}
225
/// One entry of the worker discovery response built by `list_all_tasks`: the fields of an
/// `AiTask` row plus the relative URL of the frame endpoint to pull.
#[derive(Debug, Serialize)]
struct WorkerTask {
    /// Task id, copied from the `AiTask` row.
    id: String,
    /// Id of the camera the task belongs to.
    camera_id: String,
    /// Task type, copied from the `AiTask` row.
    task_type: String,
    /// Stream profile of the task; also used as the `profile` query parameter in `frame_url`.
    stream_profile: String,
    /// `fps` value of the task row.
    fps: f64,
    /// `width` value of the task row.
    width: i64,
    /// The task's JSON `config`, unwrapped from its sqlx `Json` wrapper.
    config: Value,
    /// Relative URL of the form `/api/v1/cameras/{camera_id}/frame?profile={stream_profile}`.
    frame_url: String,
}
237
238/// Worker discovery: every enabled AI task on an enabled camera, with the frame URL to pull.
239async fn list_all_tasks(
240    State(st): State<AppState>,
241    principal: crate::auth::Principal,
242) -> AppResult<Json<Vec<WorkerTask>>> {
243    // Authentication floor: when auth is enabled this rejects anonymous callers (the worker sends an
244    // integration API key). When auth is disabled the principal is the synthetic system admin.
245    principal.require(principal.can_view(), "discover AI tasks")?;
246    let tasks = sqlx::query_as::<_, AiTask>(
247        "SELECT t.* FROM ai_tasks t JOIN cameras c ON c.id = t.camera_id
248         WHERE t.enabled = 1 AND c.enabled = 1
249         ORDER BY t.camera_id ASC",
250    )
251    .fetch_all(&st.pool)
252    .await?;
253    let out = tasks
254        .into_iter()
255        .map(|t| WorkerTask {
256            frame_url: format!(
257                "/api/v1/cameras/{}/frame?profile={}",
258                t.camera_id, t.stream_profile
259            ),
260            id: t.id,
261            camera_id: t.camera_id,
262            task_type: t.task_type,
263            stream_profile: t.stream_profile,
264            fps: t.fps,
265            width: t.width,
266            config: t.config.0,
267        })
268        .collect();
269    Ok(Json(out))
270}
271
272async fn sampler_status(
273    State(st): State<AppState>,
274    principal: Principal,
275) -> AppResult<Json<Vec<SamplerInfo>>> {
276    principal.require(principal.can_view(), "view sampler status")?;
277    Ok(Json(st.sampler.statuses().await))
278}
279
/// Query string accepted by the latest-frame endpoint (`latest_frame`).
#[derive(Debug, Deserialize)]
struct FrameQuery {
    /// Stream profile to serve. `latest_frame` falls back to `"sub"` when absent and rejects
    /// any value other than `"sub"` / `"main"`.
    profile: Option<String>,
}
284
285/// Serve the latest sampled frame for a camera + stream profile (the AI worker's input).
286async fn latest_frame(
287    State(st): State<AppState>,
288    principal: crate::auth::Principal,
289    Path(id): Path<String>,
290    Query(q): Query<FrameQuery>,
291) -> AppResult<Response> {
292    // Authentication floor (a frame can contain faces/plates). Note: when auth is enabled the SPA's
293    // <img> tags cannot send a bearer header — token-in-query / cookie for the media plane is handled
294    // in the auth-split work; the worker authenticates via X-API-Key.
295    principal.require(principal.can_view(), "read camera frames")?;
296    // Defense in depth: the id becomes a path segment, so reject any separators/traversal.
297    if id.contains('/') || id.contains('\\') || id.contains("..") {
298        return Err(AppError::BadRequest("invalid camera id".into()));
299    }
300    let profile = q.profile.unwrap_or_else(|| "sub".into());
301    validate_profile(&profile)?;
302    let path = st.sampler.frame_path(&id, &profile);
303    let bytes = tokio::fs::read(&path).await.map_err(|_| {
304        AppError::NotFound("no sampled frame yet (is an AI task enabled for this camera?)".into())
305    })?;
306    let captured = tokio::fs::metadata(&path)
307        .await
308        .ok()
309        .and_then(|m| m.modified().ok())
310        .and_then(|t| {
311            t.duration_since(std::time::UNIX_EPOCH)
312                .ok()
313                .map(|d| chrono::DateTime::<Utc>::from_timestamp_millis(d.as_millis() as i64))
314        })
315        .flatten();
316    let age_ms = captured
317        .map(|c| (Utc::now() - c).num_milliseconds().max(0))
318        .unwrap_or(0);
319
320    Response::builder()
321        .header(header::CONTENT_TYPE, "image/jpeg")
322        .header(header::CACHE_CONTROL, "no-store")
323        .header("x-frame-age-ms", age_ms.to_string())
324        .header(
325            "x-frame-captured-at",
326            captured.map(|c| c.to_rfc3339()).unwrap_or_default(),
327        )
328        .body(Body::from(bytes))
329        .map_err(|e| AppError::Other(anyhow::anyhow!("building response: {e}")))
330}
331
/// Query string accepted by the detections endpoint (`list_detections`). Every filter is
/// optional.
#[derive(Debug, Deserialize)]
struct DetectionQuery {
    /// Inclusive lower bound on the detection timestamp; parsed by `parse_opt_ts`.
    from: Option<String>,
    /// Inclusive upper bound on the detection timestamp; parsed by `parse_opt_ts`.
    to: Option<String>,
    /// Exact-match filter on the detection label.
    label: Option<String>,
    /// Maximum rows to return; `list_detections` defaults it to 200 and clamps it to 1..=5000.
    limit: Option<i64>,
}
339
340async fn list_detections(
341    State(st): State<AppState>,
342    principal: crate::auth::Principal,
343    Path(id): Path<String>,
344    Query(q): Query<DetectionQuery>,
345) -> AppResult<Json<Vec<Detection>>> {
346    principal.require(principal.can_view(), "read detections")?;
347    let _ = load_camera(&st.pool, &id).await?;
348    let limit = q.limit.unwrap_or(200).clamp(1, 5000);
349    let from = parse_opt_ts(&q.from, "from")?;
350    let to = parse_opt_ts(&q.to, "to")?;
351    let rows = sqlx::query_as::<_, Detection>(
352        "SELECT * FROM detections
353         WHERE camera_id = ?
354           AND (? IS NULL OR timestamp >= ?)
355           AND (? IS NULL OR timestamp <= ?)
356           AND (? IS NULL OR label = ?)
357         ORDER BY timestamp DESC LIMIT ?",
358    )
359    .bind(&id)
360    .bind(from)
361    .bind(from)
362    .bind(to)
363    .bind(to)
364    .bind(&q.label)
365    .bind(&q.label)
366    .bind(limit)
367    .fetch_all(&st.pool)
368    .await?;
369    Ok(Json(rows))
370}
371
/// Max detections accepted in a single ingest request (DoS / write-amplification bound).
const MAX_INGEST_DETECTIONS: usize = 1000;

/// Hard cap on the ingest request body, enforced by the framework BEFORE deserialization (defense
/// in depth vs the post-parse count guard). 8 MiB comfortably fits MAX_INGEST_DETECTIONS detections
/// with bounding boxes + attributes, while refusing a body crafted to exhaust memory.
const INGEST_BODY_LIMIT_BYTES: usize = 8 * 1024 * 1024;

/// Columns bound per detection row in the batched INSERT in [`ingest`].
const DETECTION_INSERT_COLS: usize = 11;
/// Conservative bound-variable ceiling per statement (SQLite's SQLITE_MAX_VARIABLE_NUMBER). 999 is
/// the default of SQLite builds older than 3.32.0; newer builds default to 32766, so staying under
/// 999 is safe on either. The batched insert is chunked so a single statement never exceeds it,
/// even at [`MAX_INGEST_DETECTIONS`].
const SQLITE_MAX_BIND_VARS: usize = 999;
/// Detection rows per INSERT statement (90), keeping bound variables under [`SQLITE_MAX_BIND_VARS`].
const DETECTION_INSERT_CHUNK: usize = SQLITE_MAX_BIND_VARS / DETECTION_INSERT_COLS;

// Compile-time guards on the chunk arithmetic. A zero chunk size would make `slice::chunks`
// panic at request time, so a future change to the column count must fail the build instead.
const _: () = assert!(
    DETECTION_INSERT_CHUNK > 0,
    "DETECTION_INSERT_COLS exceeds SQLITE_MAX_BIND_VARS: not even one row fits in a statement"
);
const _: () = assert!(
    DETECTION_INSERT_CHUNK * DETECTION_INSERT_COLS <= SQLITE_MAX_BIND_VARS,
    "a full chunk must not exceed the bound-variable ceiling"
);
387
388/// Ingest detections (and an optional event) posted by an AI worker. Detections are written in a
389/// single transaction so a batch is all-or-nothing.
390async fn ingest(
391    State(st): State<AppState>,
392    principal: crate::auth::Principal,
393    Json(body): Json<AiIngest>,
394) -> AppResult<Json<Value>> {
395    principal.require(principal.can_ingest(), "ingest perception events")?;
396    let cam = load_camera(&st.pool, &body.camera_id).await?;
397    if body.task_type.trim().is_empty() {
398        return Err(AppError::BadRequest("`task_type` is required".into()));
399    }
400    if body.detections.len() > MAX_INGEST_DETECTIONS {
401        return Err(AppError::BadRequest(format!(
402            "too many detections in one request ({}); max {MAX_INGEST_DETECTIONS}",
403            body.detections.len()
404        )));
405    }
406    let ts = parse_opt_ts(&body.timestamp, "timestamp")?.unwrap_or_else(Utc::now);
407
408    let mut inserted = 0u64;
409    let mut tx = st.pool.begin().await?;
410    // Idempotency + atomic capture: record the batch in the outbox FIRST, in the same transaction.
411    // A duplicate (camera_id, frame_id) — i.e. an at-least-once redelivery — conflicts and inserts 0
412    // rows; we then skip both the detection writes and the consumer fan-out, so a replayed batch can
413    // never double-count ANPR votes or corrupt zone state. With no frame_id every batch is accepted.
414    let outbox_res = sqlx::query(
415        "INSERT INTO outbox (topic, camera_id, site_id, frame_id, task_type, detection_count, created_at)
416         VALUES ('detections', ?, ?, ?, ?, ?, ?)
417         ON CONFLICT DO NOTHING",
418    )
419    .bind(&body.camera_id)
420    .bind(&cam.site_id)
421    .bind(&body.frame_id)
422    .bind(&body.task_type)
423    .bind(body.detections.len() as i64)
424    .bind(Utc::now())
425    .execute(&mut *tx)
426    .await?;
427    if outbox_res.rows_affected() == 0 {
428        // Duplicate frame already ingested — no-op (idempotent).
429        tx.commit().await?;
430        return Ok(Json(json!({ "detections_ingested": 0, "duplicate": true })));
431    }
432    // Batched multi-row insert: one INSERT per chunk instead of one statement per detection. Same
433    // columns, values, and semantics as the prior per-row loop, still inside the transaction. We
434    // chunk so a single statement's bound-variable count stays under SQLite's limit even at
435    // MAX_INGEST_DETECTIONS.
436    for chunk in body.detections.chunks(DETECTION_INSERT_CHUNK) {
437        let tuples = vec!["(?,?,?,?,?,?,?,?,?,?,?)"; chunk.len()].join(",");
438        let sql = format!(
439            "INSERT INTO detections
440               (id, camera_id, task_type, timestamp, label, confidence, bbox, track_id, attributes, frame_id, created_at)
441             VALUES {tuples}"
442        );
443        let mut q = sqlx::query(&sql);
444        for d in chunk {
445            let bbox = d.bbox.clone().map(SqlxJson);
446            let attrs = SqlxJson(d.attributes.clone().unwrap_or_else(|| json!({})));
447            q = q
448                .bind(format!("det_{}", Uuid::new_v4().simple()))
449                .bind(&body.camera_id)
450                .bind(&body.task_type)
451                .bind(ts)
452                .bind(&d.label)
453                .bind(d.confidence)
454                .bind(bbox)
455                .bind(&d.track_id)
456                .bind(attrs)
457                .bind(&body.frame_id)
458                .bind(Utc::now());
459        }
460        inserted += q.execute(&mut *tx).await?.rows_affected();
461    }
462    tx.commit().await?;
463
464    // Fan the committed batch out to registered perception consumers (zones, ANPR/entry, future
465    // apps). The kernel does not know or branch on which apps exist — each consumer self-selects by
466    // task_type. Engines that need trustworthy timing use server time, not the worker timestamp.
467    //
468    // Durability: fan-out happens after commit, so a crash here would otherwise drop the consumer
469    // notification. `fan_out` claims each (consumer, frame) at-most-once; on success we mark the
470    // outbox batch fanned, and the `fanout` drainer replays any batch left un-fanned by a crash.
471    let batch = crate::services::consumer::DetectionBatch {
472        camera_id: &body.camera_id,
473        site_id: cam.site_id.as_deref(),
474        task_type: &body.task_type,
475        detections: &body.detections,
476        timestamp: ts,
477    };
478    let fanned = crate::services::consumer::fan_out(
479        &st.pool,
480        &st.consumers,
481        &batch,
482        body.frame_id.as_deref(),
483    )
484    .await;
485    if fanned {
486        if let Some(fid) = body.frame_id.as_deref() {
487            let _ = sqlx::query(
488                "UPDATE outbox SET fanned_out_at = ? \
489                 WHERE topic = 'detections' AND camera_id = ? AND frame_id = ? AND fanned_out_at IS NULL",
490            )
491            .bind(Utc::now())
492            .bind(&body.camera_id)
493            .bind(fid)
494            .execute(&st.pool)
495            .await;
496        }
497    }
498
499    if let Some(ev) = &body.event {
500        let severity = ev.severity.clone().unwrap_or_else(|| "info".into());
501        let payload = ev.payload.clone().unwrap_or_else(|| json!({}));
502        crate::repo::log_event(
503            &st.pool,
504            Some(&body.camera_id),
505            &ev.event_type,
506            &severity,
507            payload,
508        )
509        .await?;
510    }
511
512    Ok(Json(json!({ "detections_ingested": inserted })))
513}
514
515fn parse_opt_ts(s: &Option<String>, field: &str) -> AppResult<Option<chrono::DateTime<Utc>>> {
516    match s {
517        Some(v) => crate::util::parse_rfc3339(v)
518            .map(Some)
519            .ok_or_else(|| AppError::BadRequest(format!("invalid `{field}` timestamp"))),
520        None => Ok(None),
521    }
522}
523
#[cfg(test)]
mod tests {
    use super::*;

    /// The two supported profile tokens pass validation.
    #[test]
    fn validate_profile_accepts_sub_and_main() {
        for accepted in ["sub", "main"] {
            assert!(validate_profile(accepted).is_ok());
        }
    }

    /// Anything but the exact lowercase tokens is a BadRequest naming `stream_profile`
    /// (validation is case- and whitespace-sensitive).
    #[test]
    fn validate_profile_rejects_other_values() {
        let rejected = ["", "Sub", "MAIN", " sub", "sub ", "substream", "foo"];
        for bad in rejected {
            match validate_profile(bad) {
                Err(AppError::BadRequest(m)) => assert!(
                    m.contains("stream_profile"),
                    "unexpected message for {bad:?}: {m}"
                ),
                other => panic!("expected BadRequest for {bad:?}, got {other:?}"),
            }
        }
    }

    /// An absent timestamp is not an error.
    #[test]
    fn parse_opt_ts_none_is_ok_none() {
        let parsed = parse_opt_ts(&None, "from").unwrap();
        assert!(parsed.is_none());
    }

    /// A valid timestamp parses to the same value the underlying util helper produces.
    #[test]
    fn parse_opt_ts_valid_matches_util() {
        let raw = String::from("2026-06-13T05:02:19Z");
        let expected = crate::util::parse_rfc3339(&raw);
        let parsed = parse_opt_ts(&Some(raw.clone()), "from").unwrap();
        // parse_opt_ts is a thin wrapper over crate::util::parse_rfc3339, so anchor to it.
        assert_eq!(parsed, expected);
        assert_eq!(parsed.unwrap().to_rfc3339(), "2026-06-13T05:02:19+00:00");
    }

    /// An invalid timestamp is a BadRequest whose message names the offending field.
    #[test]
    fn parse_opt_ts_invalid_reports_field() {
        let input = Some("not-a-timestamp".to_string());
        match parse_opt_ts(&input, "to") {
            Err(AppError::BadRequest(m)) => {
                assert!(m.contains("to"), "message should name the field: {m}");
                assert!(m.contains("timestamp"), "message: {m}");
            }
            other => panic!("expected BadRequest, got {other:?}"),
        }
    }

    /// Pins the ingest bound so a change to it is a deliberate, visible edit.
    #[test]
    fn max_ingest_detections_bound_is_stable() {
        assert_eq!(MAX_INGEST_DETECTIONS, 1000);
    }

    /// Application state backed by a migrated in-memory SQLite database with a single pooled
    /// connection, no mirror, and no consumers or modules.
    async fn test_state() -> AppState {
        let pool = sqlx::sqlite::SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        crate::db::run_migrations(&pool).await.unwrap();
        let cfg = std::sync::Arc::new(crate::config::Config::from_env());

        let recorder = crate::services::recorder::RecorderManager::new(pool.clone(), cfg.clone());
        let sampler = crate::services::sampler::SamplerManager::new(pool.clone(), cfg.clone());
        let catalog = std::sync::Arc::new(crate::services::registry::CatalogService::new(&cfg));
        AppState {
            recorder,
            sampler,
            mirror: None,
            consumers: std::sync::Arc::new(Vec::new()),
            modules: std::sync::Arc::new(Vec::new()),
            catalog,
            http: reqwest::Client::new(),
            started_at: chrono::Utc::now(),
            pool,
            cfg,
        }
    }

    /// Re-creating the same task type on the same camera+profile returns the existing task
    /// (200), never a duplicate — while a different stream profile is a distinct task.
    #[tokio::test]
    async fn create_ai_task_is_idempotent_per_slot() {
        let st = test_state().await;

        // Seed the camera the tasks will hang off.
        let seeded_at = Utc::now();
        sqlx::query(
            "INSERT INTO cameras (id, name, enabled, created_at, updated_at) VALUES (?,?,?,?,?)",
        )
        .bind("cam_x")
        .bind("cam_x")
        .bind(1)
        .bind(seeded_at)
        .bind(seeded_at)
        .execute(&st.pool)
        .await
        .unwrap();

        let request = || AiTaskCreate {
            task_type: "detection".into(),
            stream_profile: Some("sub".into()),
            fps: Some(2.0),
            width: Some(640),
            config: None,
            enabled: Some(true),
        };
        let submit = |payload| {
            create_task(
                State(st.clone()),
                Path("cam_x".into()),
                Principal::system_admin(),
                Json(payload),
            )
        };

        let (first_status, Json(first)) = submit(request()).await.unwrap();
        let (second_status, Json(second)) = submit(request()).await.unwrap();
        assert_eq!(first_status, StatusCode::CREATED);
        assert_eq!(
            second_status,
            StatusCode::OK,
            "re-create returns the existing task"
        );
        assert_eq!(first.id, second.id, "no duplicate task created");

        let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM ai_tasks")
            .fetch_one(&st.pool)
            .await
            .unwrap();
        assert_eq!(total, 1, "still exactly one task");

        // A different stream profile is a distinct slot, hence a new task.
        let on_main = AiTaskCreate {
            stream_profile: Some("main".into()),
            ..request()
        };
        let (third_status, _) = submit(on_main).await.unwrap();
        assert_eq!(
            third_status,
            StatusCode::CREATED,
            "a different profile is a separate task"
        );
    }
}