1use axum::body::Body;
5use axum::extract::{DefaultBodyLimit, Path, Query, State};
6use axum::http::{header, StatusCode};
7use axum::response::Response;
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12use serde_json::{json, Value};
13use sqlx::types::Json as SqlxJson;
14use uuid::Uuid;
15
16use crate::auth::{self, Principal};
17use crate::error::{AppError, AppResult};
18use crate::models::{AiIngest, AiTask, AiTaskCreate, AiTaskUpdate, Detection};
19use crate::routes::cameras::load_camera;
20use crate::services::sampler::SamplerInfo;
21use crate::state::AppState;
22
23pub fn router() -> Router<AppState> {
24 Router::new()
25 .route(
26 "/api/v1/cameras/{id}/ai-tasks",
27 get(list_camera_tasks).post(create_task),
28 )
29 .route(
30 "/api/v1/ai-tasks/{task_id}",
31 axum::routing::patch(update_task).delete(delete_task),
32 )
33 .route("/api/v1/ai/tasks", get(list_all_tasks))
34 .route("/api/v1/ai/samplers", get(sampler_status))
35 .route(
39 "/api/v1/ai/events",
40 post(ingest).layer(DefaultBodyLimit::max(INGEST_BODY_LIMIT_BYTES)),
41 )
42 .route("/api/v1/cameras/{id}/frame", get(latest_frame))
43 .route("/api/v1/cameras/{id}/detections", get(list_detections))
44}
45
46fn validate_profile(p: &str) -> AppResult<()> {
47 if matches!(p, "sub" | "main") {
48 Ok(())
49 } else {
50 Err(AppError::BadRequest(
51 "`stream_profile` must be 'sub' or 'main'".into(),
52 ))
53 }
54}
55
56async fn list_camera_tasks(
57 State(st): State<AppState>,
58 Path(id): Path<String>,
59 principal: Principal,
60) -> AppResult<Json<Vec<AiTask>>> {
61 principal.require(principal.can_view(), "view AI tasks")?;
62 let _ = load_camera(&st.pool, &id).await?;
63 let tasks = sqlx::query_as::<_, AiTask>(
64 "SELECT * FROM ai_tasks WHERE camera_id = ? ORDER BY created_at ASC",
65 )
66 .bind(&id)
67 .fetch_all(&st.pool)
68 .await?;
69 Ok(Json(tasks))
70}
71
72async fn create_task(
73 State(st): State<AppState>,
74 Path(id): Path<String>,
75 principal: Principal,
76 Json(body): Json<AiTaskCreate>,
77) -> AppResult<(StatusCode, Json<AiTask>)> {
78 principal.require(principal.can_manage_registry(), "create AI tasks")?;
79 let _ = load_camera(&st.pool, &id).await?;
80 if body.task_type.trim().is_empty() {
81 return Err(AppError::BadRequest("`task_type` is required".into()));
82 }
83 let profile = body.stream_profile.unwrap_or_else(|| "sub".into());
84 validate_profile(&profile)?;
85 let fps = body.fps.unwrap_or(st.cfg.default_ai_fps).clamp(0.1, 30.0);
86 let width = body
87 .width
88 .unwrap_or(st.cfg.default_ai_width)
89 .clamp(160, 3840);
90 let enabled = body.enabled.unwrap_or(true);
91 let config = SqlxJson(body.config.unwrap_or_else(|| json!({})));
92
93 if let Some(existing) = sqlx::query_as::<_, AiTask>(
98 "SELECT * FROM ai_tasks WHERE camera_id = ? AND task_type = ? AND stream_profile = ?",
99 )
100 .bind(&id)
101 .bind(&body.task_type)
102 .bind(&profile)
103 .fetch_optional(&st.pool)
104 .await?
105 {
106 return Ok((StatusCode::OK, Json(existing)));
107 }
108
109 let now = Utc::now();
110 let task_id = format!("ai_{}", Uuid::new_v4().simple());
111
112 sqlx::query(
113 "INSERT INTO ai_tasks
114 (id, camera_id, task_type, enabled, stream_profile, fps, width, config, created_at, updated_at)
115 VALUES (?,?,?,?,?,?,?,?,?,?)",
116 )
117 .bind(&task_id)
118 .bind(&id)
119 .bind(&body.task_type)
120 .bind(enabled)
121 .bind(&profile)
122 .bind(fps)
123 .bind(width)
124 .bind(config)
125 .bind(now)
126 .bind(now)
127 .execute(&st.pool)
128 .await?;
129
130 st.sampler.reconcile().await;
131 let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
132 .bind(&task_id)
133 .fetch_one(&st.pool)
134 .await?;
135 auth::audit(
136 &st.pool,
137 &principal,
138 "create_ai_task",
139 "ai_task",
140 &task_id,
141 json!({ "camera_id": &id, "task_type": &task.task_type }),
142 )
143 .await;
144 Ok((StatusCode::CREATED, Json(task)))
145}
146
147async fn update_task(
148 State(st): State<AppState>,
149 Path(task_id): Path<String>,
150 principal: Principal,
151 Json(body): Json<AiTaskUpdate>,
152) -> AppResult<Json<AiTask>> {
153 principal.require(principal.can_manage_registry(), "update AI tasks")?;
154 let cur = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
155 .bind(&task_id)
156 .fetch_optional(&st.pool)
157 .await?
158 .ok_or_else(|| AppError::NotFound(format!("ai task {task_id} not found")))?;
159
160 let task_type = body.task_type.unwrap_or(cur.task_type);
161 let profile = body.stream_profile.unwrap_or(cur.stream_profile);
162 validate_profile(&profile)?;
163 let fps = body.fps.map(|v| v.clamp(0.1, 30.0)).unwrap_or(cur.fps);
164 let width = body.width.map(|v| v.clamp(160, 3840)).unwrap_or(cur.width);
165 let enabled = body.enabled.unwrap_or(cur.enabled);
166 let config = SqlxJson(body.config.unwrap_or(cur.config.0));
167
168 sqlx::query(
169 "UPDATE ai_tasks SET task_type=?, stream_profile=?, fps=?, width=?, enabled=?, config=?, updated_at=?
170 WHERE id=?",
171 )
172 .bind(&task_type)
173 .bind(&profile)
174 .bind(fps)
175 .bind(width)
176 .bind(enabled)
177 .bind(config)
178 .bind(Utc::now())
179 .bind(&task_id)
180 .execute(&st.pool)
181 .await?;
182
183 st.sampler.reconcile().await;
184 let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
185 .bind(&task_id)
186 .fetch_one(&st.pool)
187 .await?;
188 auth::audit(
189 &st.pool,
190 &principal,
191 "update_ai_task",
192 "ai_task",
193 &task_id,
194 json!({}),
195 )
196 .await;
197 Ok(Json(task))
198}
199
200async fn delete_task(
201 State(st): State<AppState>,
202 Path(task_id): Path<String>,
203 principal: Principal,
204) -> AppResult<StatusCode> {
205 principal.require(principal.can_manage_registry(), "delete AI tasks")?;
206 let res = sqlx::query("DELETE FROM ai_tasks WHERE id = ?")
207 .bind(&task_id)
208 .execute(&st.pool)
209 .await?;
210 if res.rows_affected() == 0 {
211 return Err(AppError::NotFound(format!("ai task {task_id} not found")));
212 }
213 st.sampler.reconcile().await;
214 auth::audit(
215 &st.pool,
216 &principal,
217 "delete_ai_task",
218 "ai_task",
219 &task_id,
220 json!({}),
221 )
222 .await;
223 Ok(StatusCode::NO_CONTENT)
224}
225
226#[derive(Debug, Serialize)]
227struct WorkerTask {
228 id: String,
229 camera_id: String,
230 task_type: String,
231 stream_profile: String,
232 fps: f64,
233 width: i64,
234 config: Value,
235 frame_url: String,
236}
237
238async fn list_all_tasks(
240 State(st): State<AppState>,
241 principal: crate::auth::Principal,
242) -> AppResult<Json<Vec<WorkerTask>>> {
243 principal.require(principal.can_view(), "discover AI tasks")?;
246 let tasks = sqlx::query_as::<_, AiTask>(
247 "SELECT t.* FROM ai_tasks t JOIN cameras c ON c.id = t.camera_id
248 WHERE t.enabled = 1 AND c.enabled = 1
249 ORDER BY t.camera_id ASC",
250 )
251 .fetch_all(&st.pool)
252 .await?;
253 let out = tasks
254 .into_iter()
255 .map(|t| WorkerTask {
256 frame_url: format!(
257 "/api/v1/cameras/{}/frame?profile={}",
258 t.camera_id, t.stream_profile
259 ),
260 id: t.id,
261 camera_id: t.camera_id,
262 task_type: t.task_type,
263 stream_profile: t.stream_profile,
264 fps: t.fps,
265 width: t.width,
266 config: t.config.0,
267 })
268 .collect();
269 Ok(Json(out))
270}
271
272async fn sampler_status(
273 State(st): State<AppState>,
274 principal: Principal,
275) -> AppResult<Json<Vec<SamplerInfo>>> {
276 principal.require(principal.can_view(), "view sampler status")?;
277 Ok(Json(st.sampler.statuses().await))
278}
279
280#[derive(Debug, Deserialize)]
281struct FrameQuery {
282 profile: Option<String>,
283}
284
285async fn latest_frame(
287 State(st): State<AppState>,
288 principal: crate::auth::Principal,
289 Path(id): Path<String>,
290 Query(q): Query<FrameQuery>,
291) -> AppResult<Response> {
292 principal.require(principal.can_view(), "read camera frames")?;
296 if id.contains('/') || id.contains('\\') || id.contains("..") {
298 return Err(AppError::BadRequest("invalid camera id".into()));
299 }
300 let profile = q.profile.unwrap_or_else(|| "sub".into());
301 validate_profile(&profile)?;
302 let path = st.sampler.frame_path(&id, &profile);
303 let bytes = tokio::fs::read(&path).await.map_err(|_| {
304 AppError::NotFound("no sampled frame yet (is an AI task enabled for this camera?)".into())
305 })?;
306 let captured = tokio::fs::metadata(&path)
307 .await
308 .ok()
309 .and_then(|m| m.modified().ok())
310 .and_then(|t| {
311 t.duration_since(std::time::UNIX_EPOCH)
312 .ok()
313 .map(|d| chrono::DateTime::<Utc>::from_timestamp_millis(d.as_millis() as i64))
314 })
315 .flatten();
316 let age_ms = captured
317 .map(|c| (Utc::now() - c).num_milliseconds().max(0))
318 .unwrap_or(0);
319
320 Response::builder()
321 .header(header::CONTENT_TYPE, "image/jpeg")
322 .header(header::CACHE_CONTROL, "no-store")
323 .header("x-frame-age-ms", age_ms.to_string())
324 .header(
325 "x-frame-captured-at",
326 captured.map(|c| c.to_rfc3339()).unwrap_or_default(),
327 )
328 .body(Body::from(bytes))
329 .map_err(|e| AppError::Other(anyhow::anyhow!("building response: {e}")))
330}
331
332#[derive(Debug, Deserialize)]
333struct DetectionQuery {
334 from: Option<String>,
335 to: Option<String>,
336 label: Option<String>,
337 limit: Option<i64>,
338}
339
340async fn list_detections(
341 State(st): State<AppState>,
342 principal: crate::auth::Principal,
343 Path(id): Path<String>,
344 Query(q): Query<DetectionQuery>,
345) -> AppResult<Json<Vec<Detection>>> {
346 principal.require(principal.can_view(), "read detections")?;
347 let _ = load_camera(&st.pool, &id).await?;
348 let limit = q.limit.unwrap_or(200).clamp(1, 5000);
349 let from = parse_opt_ts(&q.from, "from")?;
350 let to = parse_opt_ts(&q.to, "to")?;
351 let rows = sqlx::query_as::<_, Detection>(
352 "SELECT * FROM detections
353 WHERE camera_id = ?
354 AND (? IS NULL OR timestamp >= ?)
355 AND (? IS NULL OR timestamp <= ?)
356 AND (? IS NULL OR label = ?)
357 ORDER BY timestamp DESC LIMIT ?",
358 )
359 .bind(&id)
360 .bind(from)
361 .bind(from)
362 .bind(to)
363 .bind(to)
364 .bind(&q.label)
365 .bind(&q.label)
366 .bind(limit)
367 .fetch_all(&st.pool)
368 .await?;
369 Ok(Json(rows))
370}
371
/// Largest number of detections `ingest` accepts in a single request.
const MAX_INGEST_DETECTIONS: usize = 1_000;

/// Request-body size limit applied to the ingest route (8 MiB).
const INGEST_BODY_LIMIT_BYTES: usize = 8 << 20;

/// Number of bound columns per row in the `detections` insert.
const DETECTION_INSERT_COLS: usize = 11;

/// Upper bound on bind variables in one statement.
// NOTE(review): presumably mirrors SQLite's historical default for
// SQLITE_MAX_VARIABLE_NUMBER — confirm against the deployed build.
const SQLITE_MAX_BIND_VARS: usize = 999;

/// Rows per multi-row insert such that one statement stays within
/// `SQLITE_MAX_BIND_VARS` bind variables.
const DETECTION_INSERT_CHUNK: usize = SQLITE_MAX_BIND_VARS / DETECTION_INSERT_COLS;
387
388async fn ingest(
391 State(st): State<AppState>,
392 principal: crate::auth::Principal,
393 Json(body): Json<AiIngest>,
394) -> AppResult<Json<Value>> {
395 principal.require(principal.can_ingest(), "ingest perception events")?;
396 let cam = load_camera(&st.pool, &body.camera_id).await?;
397 if body.task_type.trim().is_empty() {
398 return Err(AppError::BadRequest("`task_type` is required".into()));
399 }
400 if body.detections.len() > MAX_INGEST_DETECTIONS {
401 return Err(AppError::BadRequest(format!(
402 "too many detections in one request ({}); max {MAX_INGEST_DETECTIONS}",
403 body.detections.len()
404 )));
405 }
406 let ts = parse_opt_ts(&body.timestamp, "timestamp")?.unwrap_or_else(Utc::now);
407
408 let mut inserted = 0u64;
409 let mut tx = st.pool.begin().await?;
410 let outbox_res = sqlx::query(
415 "INSERT INTO outbox (topic, camera_id, site_id, frame_id, task_type, detection_count, created_at)
416 VALUES ('detections', ?, ?, ?, ?, ?, ?)
417 ON CONFLICT DO NOTHING",
418 )
419 .bind(&body.camera_id)
420 .bind(&cam.site_id)
421 .bind(&body.frame_id)
422 .bind(&body.task_type)
423 .bind(body.detections.len() as i64)
424 .bind(Utc::now())
425 .execute(&mut *tx)
426 .await?;
427 if outbox_res.rows_affected() == 0 {
428 tx.commit().await?;
430 return Ok(Json(json!({ "detections_ingested": 0, "duplicate": true })));
431 }
432 for chunk in body.detections.chunks(DETECTION_INSERT_CHUNK) {
437 let tuples = vec!["(?,?,?,?,?,?,?,?,?,?,?)"; chunk.len()].join(",");
438 let sql = format!(
439 "INSERT INTO detections
440 (id, camera_id, task_type, timestamp, label, confidence, bbox, track_id, attributes, frame_id, created_at)
441 VALUES {tuples}"
442 );
443 let mut q = sqlx::query(&sql);
444 for d in chunk {
445 let bbox = d.bbox.clone().map(SqlxJson);
446 let attrs = SqlxJson(d.attributes.clone().unwrap_or_else(|| json!({})));
447 q = q
448 .bind(format!("det_{}", Uuid::new_v4().simple()))
449 .bind(&body.camera_id)
450 .bind(&body.task_type)
451 .bind(ts)
452 .bind(&d.label)
453 .bind(d.confidence)
454 .bind(bbox)
455 .bind(&d.track_id)
456 .bind(attrs)
457 .bind(&body.frame_id)
458 .bind(Utc::now());
459 }
460 inserted += q.execute(&mut *tx).await?.rows_affected();
461 }
462 tx.commit().await?;
463
464 let batch = crate::services::consumer::DetectionBatch {
472 camera_id: &body.camera_id,
473 site_id: cam.site_id.as_deref(),
474 task_type: &body.task_type,
475 detections: &body.detections,
476 timestamp: ts,
477 };
478 let fanned = crate::services::consumer::fan_out(
479 &st.pool,
480 &st.consumers,
481 &batch,
482 body.frame_id.as_deref(),
483 )
484 .await;
485 if fanned {
486 if let Some(fid) = body.frame_id.as_deref() {
487 let _ = sqlx::query(
488 "UPDATE outbox SET fanned_out_at = ? \
489 WHERE topic = 'detections' AND camera_id = ? AND frame_id = ? AND fanned_out_at IS NULL",
490 )
491 .bind(Utc::now())
492 .bind(&body.camera_id)
493 .bind(fid)
494 .execute(&st.pool)
495 .await;
496 }
497 }
498
499 if let Some(ev) = &body.event {
500 let severity = ev.severity.clone().unwrap_or_else(|| "info".into());
501 let payload = ev.payload.clone().unwrap_or_else(|| json!({}));
502 crate::repo::log_event(
503 &st.pool,
504 Some(&body.camera_id),
505 &ev.event_type,
506 &severity,
507 payload,
508 )
509 .await?;
510 }
511
512 Ok(Json(json!({ "detections_ingested": inserted })))
513}
514
515fn parse_opt_ts(s: &Option<String>, field: &str) -> AppResult<Option<chrono::DateTime<Utc>>> {
516 match s {
517 Some(v) => crate::util::parse_rfc3339(v)
518 .map(Some)
519 .ok_or_else(|| AppError::BadRequest(format!("invalid `{field}` timestamp"))),
520 None => Ok(None),
521 }
522}
523
#[cfg(test)]
mod tests {
    use super::*;

    /// Both supported profile names pass validation.
    #[test]
    fn validate_profile_accepts_sub_and_main() {
        for good in ["sub", "main"] {
            assert!(validate_profile(good).is_ok());
        }
    }

    /// Anything other than the exact strings is rejected with a message that
    /// names the offending field.
    #[test]
    fn validate_profile_rejects_other_values() {
        let rejected = ["", "Sub", "MAIN", " sub", "sub ", "substream", "foo"];
        for bad in rejected {
            match validate_profile(bad) {
                Err(AppError::BadRequest(m)) => {
                    assert!(
                        m.contains("stream_profile"),
                        "unexpected message for {bad:?}: {m}"
                    );
                }
                other => panic!("expected BadRequest for {bad:?}, got {other:?}"),
            }
        }
    }

    /// A missing value is not an error.
    #[test]
    fn parse_opt_ts_none_is_ok_none() {
        let parsed = parse_opt_ts(&None, "from").unwrap();
        assert!(parsed.is_none());
    }

    /// A valid value goes through the shared RFC 3339 parser unchanged.
    #[test]
    fn parse_opt_ts_valid_matches_util() {
        let raw = String::from("2026-06-13T05:02:19Z");
        let expected = crate::util::parse_rfc3339(&raw);
        let parsed = parse_opt_ts(&Some(raw), "from").unwrap();
        assert_eq!(parsed, expected);
        assert_eq!(parsed.unwrap().to_rfc3339(), "2026-06-13T05:02:19+00:00");
    }

    /// An invalid value yields a bad-request error naming the field.
    #[test]
    fn parse_opt_ts_invalid_reports_field() {
        let garbage = Some("not-a-timestamp".to_string());
        match parse_opt_ts(&garbage, "to") {
            Err(AppError::BadRequest(m)) => {
                assert!(m.contains("to"), "message should name the field: {m}");
                assert!(m.contains("timestamp"), "message: {m}");
            }
            other => panic!("expected BadRequest, got {other:?}"),
        }
    }

    /// Pins the batch-size bound so a change to it is a deliberate one.
    #[test]
    fn max_ingest_detections_bound_is_stable() {
        assert_eq!(MAX_INGEST_DETECTIONS, 1000);
    }

    /// Builds an `AppState` on a migrated in-memory SQLite database with no
    /// consumers, modules or mirror.
    async fn test_state() -> AppState {
        // A single connection keeps every query on the same in-memory database.
        let pool = sqlx::sqlite::SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        crate::db::run_migrations(&pool).await.unwrap();
        let cfg = std::sync::Arc::new(crate::config::Config::from_env());

        let recorder = crate::services::recorder::RecorderManager::new(pool.clone(), cfg.clone());
        let sampler = crate::services::sampler::SamplerManager::new(pool.clone(), cfg.clone());
        let catalog = std::sync::Arc::new(crate::services::registry::CatalogService::new(&cfg));

        AppState {
            recorder,
            sampler,
            mirror: None,
            consumers: std::sync::Arc::new(Vec::new()),
            modules: std::sync::Arc::new(Vec::new()),
            catalog,
            http: reqwest::Client::new(),
            started_at: chrono::Utc::now(),
            pool,
            cfg,
        }
    }

    /// Creating the same (camera, task type, profile) twice returns the first
    /// task; a different profile creates a second task.
    #[tokio::test]
    async fn create_ai_task_is_idempotent_per_slot() {
        let st = test_state().await;

        let seeded_at = Utc::now();
        sqlx::query(
            "INSERT INTO cameras (id, name, enabled, created_at, updated_at) VALUES (?,?,?,?,?)",
        )
        .bind("cam_x")
        .bind("cam_x")
        .bind(1)
        .bind(seeded_at)
        .bind(seeded_at)
        .execute(&st.pool)
        .await
        .unwrap();

        let request = |profile: &str| AiTaskCreate {
            task_type: "detection".into(),
            stream_profile: Some(profile.into()),
            fps: Some(2.0),
            width: Some(640),
            config: None,
            enabled: Some(true),
        };
        let submit = |payload: AiTaskCreate| {
            create_task(
                State(st.clone()),
                Path("cam_x".into()),
                Principal::system_admin(),
                Json(payload),
            )
        };

        let (first_status, Json(first)) = submit(request("sub")).await.unwrap();
        let (second_status, Json(second)) = submit(request("sub")).await.unwrap();
        assert_eq!(first_status, StatusCode::CREATED);
        assert_eq!(
            second_status,
            StatusCode::OK,
            "re-create returns the existing task"
        );
        assert_eq!(first.id, second.id, "no duplicate task created");

        let total: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM ai_tasks")
            .fetch_one(&st.pool)
            .await
            .unwrap();
        assert_eq!(total, 1, "still exactly one task");

        let (third_status, _) = submit(request("main")).await.unwrap();
        assert_eq!(
            third_status,
            StatusCode::CREATED,
            "a different profile is a separate task"
        );
    }
}