1use axum::body::Body;
5use axum::extract::{DefaultBodyLimit, Path, Query, State};
6use axum::http::{header, StatusCode};
7use axum::response::Response;
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12use serde_json::{json, Value};
13use sqlx::types::Json as SqlxJson;
14use uuid::Uuid;
15
16use crate::auth::{self, Principal};
17use crate::error::{AppError, AppResult};
18use crate::models::{AiIngest, AiTask, AiTaskCreate, AiTaskUpdate, Detection};
19use crate::routes::cameras::load_camera;
20use crate::services::sampler::SamplerInfo;
21use crate::state::AppState;
22
23pub fn router() -> Router<AppState> {
24 Router::new()
25 .route(
26 "/api/v1/cameras/{id}/ai-tasks",
27 get(list_camera_tasks).post(create_task),
28 )
29 .route(
30 "/api/v1/ai-tasks/{task_id}",
31 axum::routing::patch(update_task).delete(delete_task),
32 )
33 .route("/api/v1/ai/tasks", get(list_all_tasks))
34 .route("/api/v1/ai/samplers", get(sampler_status))
35 .route(
39 "/api/v1/ai/events",
40 post(ingest).layer(DefaultBodyLimit::max(INGEST_BODY_LIMIT_BYTES)),
41 )
42 .route("/api/v1/cameras/{id}/frame", get(latest_frame))
43 .route("/api/v1/cameras/{id}/detections", get(list_detections))
44}
45
46fn validate_profile(p: &str) -> AppResult<()> {
47 if matches!(p, "sub" | "main") {
48 Ok(())
49 } else {
50 Err(AppError::BadRequest(
51 "`stream_profile` must be 'sub' or 'main'".into(),
52 ))
53 }
54}
55
56async fn list_camera_tasks(
57 State(st): State<AppState>,
58 Path(id): Path<String>,
59 principal: Principal,
60) -> AppResult<Json<Vec<AiTask>>> {
61 principal.require(principal.can_view(), "view AI tasks")?;
62 let _ = load_camera(&st.pool, &id).await?;
63 let tasks = sqlx::query_as::<_, AiTask>(
64 "SELECT * FROM ai_tasks WHERE camera_id = ? ORDER BY created_at ASC",
65 )
66 .bind(&id)
67 .fetch_all(&st.pool)
68 .await?;
69 Ok(Json(tasks))
70}
71
72async fn create_task(
73 State(st): State<AppState>,
74 Path(id): Path<String>,
75 principal: Principal,
76 Json(body): Json<AiTaskCreate>,
77) -> AppResult<(StatusCode, Json<AiTask>)> {
78 principal.require(principal.can_manage_registry(), "create AI tasks")?;
79 let _ = load_camera(&st.pool, &id).await?;
80 if body.task_type.trim().is_empty() {
81 return Err(AppError::BadRequest("`task_type` is required".into()));
82 }
83 let profile = body.stream_profile.unwrap_or_else(|| "sub".into());
84 validate_profile(&profile)?;
85 let fps = body.fps.unwrap_or(st.cfg.default_ai_fps).clamp(0.1, 30.0);
86 let width = body
87 .width
88 .unwrap_or(st.cfg.default_ai_width)
89 .clamp(160, 3840);
90 let enabled = body.enabled.unwrap_or(true);
91 let config = SqlxJson(body.config.unwrap_or_else(|| json!({})));
92 let now = Utc::now();
93 let task_id = format!("ai_{}", Uuid::new_v4().simple());
94
95 sqlx::query(
96 "INSERT INTO ai_tasks
97 (id, camera_id, task_type, enabled, stream_profile, fps, width, config, created_at, updated_at)
98 VALUES (?,?,?,?,?,?,?,?,?,?)",
99 )
100 .bind(&task_id)
101 .bind(&id)
102 .bind(&body.task_type)
103 .bind(enabled)
104 .bind(&profile)
105 .bind(fps)
106 .bind(width)
107 .bind(config)
108 .bind(now)
109 .bind(now)
110 .execute(&st.pool)
111 .await?;
112
113 st.sampler.reconcile().await;
114 let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
115 .bind(&task_id)
116 .fetch_one(&st.pool)
117 .await?;
118 auth::audit(
119 &st.pool,
120 &principal,
121 "create_ai_task",
122 "ai_task",
123 &task_id,
124 json!({ "camera_id": &id, "task_type": &task.task_type }),
125 )
126 .await;
127 Ok((StatusCode::CREATED, Json(task)))
128}
129
130async fn update_task(
131 State(st): State<AppState>,
132 Path(task_id): Path<String>,
133 principal: Principal,
134 Json(body): Json<AiTaskUpdate>,
135) -> AppResult<Json<AiTask>> {
136 principal.require(principal.can_manage_registry(), "update AI tasks")?;
137 let cur = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
138 .bind(&task_id)
139 .fetch_optional(&st.pool)
140 .await?
141 .ok_or_else(|| AppError::NotFound(format!("ai task {task_id} not found")))?;
142
143 let task_type = body.task_type.unwrap_or(cur.task_type);
144 let profile = body.stream_profile.unwrap_or(cur.stream_profile);
145 validate_profile(&profile)?;
146 let fps = body.fps.map(|v| v.clamp(0.1, 30.0)).unwrap_or(cur.fps);
147 let width = body.width.map(|v| v.clamp(160, 3840)).unwrap_or(cur.width);
148 let enabled = body.enabled.unwrap_or(cur.enabled);
149 let config = SqlxJson(body.config.unwrap_or(cur.config.0));
150
151 sqlx::query(
152 "UPDATE ai_tasks SET task_type=?, stream_profile=?, fps=?, width=?, enabled=?, config=?, updated_at=?
153 WHERE id=?",
154 )
155 .bind(&task_type)
156 .bind(&profile)
157 .bind(fps)
158 .bind(width)
159 .bind(enabled)
160 .bind(config)
161 .bind(Utc::now())
162 .bind(&task_id)
163 .execute(&st.pool)
164 .await?;
165
166 st.sampler.reconcile().await;
167 let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
168 .bind(&task_id)
169 .fetch_one(&st.pool)
170 .await?;
171 auth::audit(
172 &st.pool,
173 &principal,
174 "update_ai_task",
175 "ai_task",
176 &task_id,
177 json!({}),
178 )
179 .await;
180 Ok(Json(task))
181}
182
183async fn delete_task(
184 State(st): State<AppState>,
185 Path(task_id): Path<String>,
186 principal: Principal,
187) -> AppResult<StatusCode> {
188 principal.require(principal.can_manage_registry(), "delete AI tasks")?;
189 let res = sqlx::query("DELETE FROM ai_tasks WHERE id = ?")
190 .bind(&task_id)
191 .execute(&st.pool)
192 .await?;
193 if res.rows_affected() == 0 {
194 return Err(AppError::NotFound(format!("ai task {task_id} not found")));
195 }
196 st.sampler.reconcile().await;
197 auth::audit(
198 &st.pool,
199 &principal,
200 "delete_ai_task",
201 "ai_task",
202 &task_id,
203 json!({}),
204 )
205 .await;
206 Ok(StatusCode::NO_CONTENT)
207}
208
209#[derive(Debug, Serialize)]
210struct WorkerTask {
211 id: String,
212 camera_id: String,
213 task_type: String,
214 stream_profile: String,
215 fps: f64,
216 width: i64,
217 config: Value,
218 frame_url: String,
219}
220
221async fn list_all_tasks(
223 State(st): State<AppState>,
224 principal: crate::auth::Principal,
225) -> AppResult<Json<Vec<WorkerTask>>> {
226 principal.require(principal.can_view(), "discover AI tasks")?;
229 let tasks = sqlx::query_as::<_, AiTask>(
230 "SELECT t.* FROM ai_tasks t JOIN cameras c ON c.id = t.camera_id
231 WHERE t.enabled = 1 AND c.enabled = 1
232 ORDER BY t.camera_id ASC",
233 )
234 .fetch_all(&st.pool)
235 .await?;
236 let out = tasks
237 .into_iter()
238 .map(|t| WorkerTask {
239 frame_url: format!(
240 "/api/v1/cameras/{}/frame?profile={}",
241 t.camera_id, t.stream_profile
242 ),
243 id: t.id,
244 camera_id: t.camera_id,
245 task_type: t.task_type,
246 stream_profile: t.stream_profile,
247 fps: t.fps,
248 width: t.width,
249 config: t.config.0,
250 })
251 .collect();
252 Ok(Json(out))
253}
254
255async fn sampler_status(
256 State(st): State<AppState>,
257 principal: Principal,
258) -> AppResult<Json<Vec<SamplerInfo>>> {
259 principal.require(principal.can_view(), "view sampler status")?;
260 Ok(Json(st.sampler.statuses().await))
261}
262
263#[derive(Debug, Deserialize)]
264struct FrameQuery {
265 profile: Option<String>,
266}
267
268async fn latest_frame(
270 State(st): State<AppState>,
271 principal: crate::auth::Principal,
272 Path(id): Path<String>,
273 Query(q): Query<FrameQuery>,
274) -> AppResult<Response> {
275 principal.require(principal.can_view(), "read camera frames")?;
279 if id.contains('/') || id.contains('\\') || id.contains("..") {
281 return Err(AppError::BadRequest("invalid camera id".into()));
282 }
283 let profile = q.profile.unwrap_or_else(|| "sub".into());
284 validate_profile(&profile)?;
285 let path = st.sampler.frame_path(&id, &profile);
286 let bytes = tokio::fs::read(&path).await.map_err(|_| {
287 AppError::NotFound("no sampled frame yet (is an AI task enabled for this camera?)".into())
288 })?;
289 let captured = tokio::fs::metadata(&path)
290 .await
291 .ok()
292 .and_then(|m| m.modified().ok())
293 .and_then(|t| {
294 t.duration_since(std::time::UNIX_EPOCH)
295 .ok()
296 .map(|d| chrono::DateTime::<Utc>::from_timestamp_millis(d.as_millis() as i64))
297 })
298 .flatten();
299 let age_ms = captured
300 .map(|c| (Utc::now() - c).num_milliseconds().max(0))
301 .unwrap_or(0);
302
303 Response::builder()
304 .header(header::CONTENT_TYPE, "image/jpeg")
305 .header(header::CACHE_CONTROL, "no-store")
306 .header("x-frame-age-ms", age_ms.to_string())
307 .header(
308 "x-frame-captured-at",
309 captured.map(|c| c.to_rfc3339()).unwrap_or_default(),
310 )
311 .body(Body::from(bytes))
312 .map_err(|e| AppError::Other(anyhow::anyhow!("building response: {e}")))
313}
314
315#[derive(Debug, Deserialize)]
316struct DetectionQuery {
317 from: Option<String>,
318 to: Option<String>,
319 label: Option<String>,
320 limit: Option<i64>,
321}
322
323async fn list_detections(
324 State(st): State<AppState>,
325 principal: crate::auth::Principal,
326 Path(id): Path<String>,
327 Query(q): Query<DetectionQuery>,
328) -> AppResult<Json<Vec<Detection>>> {
329 principal.require(principal.can_view(), "read detections")?;
330 let _ = load_camera(&st.pool, &id).await?;
331 let limit = q.limit.unwrap_or(200).clamp(1, 5000);
332 let from = parse_opt_ts(&q.from, "from")?;
333 let to = parse_opt_ts(&q.to, "to")?;
334 let rows = sqlx::query_as::<_, Detection>(
335 "SELECT * FROM detections
336 WHERE camera_id = ?
337 AND (? IS NULL OR timestamp >= ?)
338 AND (? IS NULL OR timestamp <= ?)
339 AND (? IS NULL OR label = ?)
340 ORDER BY timestamp DESC LIMIT ?",
341 )
342 .bind(&id)
343 .bind(from)
344 .bind(from)
345 .bind(to)
346 .bind(to)
347 .bind(&q.label)
348 .bind(&q.label)
349 .bind(limit)
350 .fetch_all(&st.pool)
351 .await?;
352 Ok(Json(rows))
353}
354
/// Upper bound on the number of detections accepted in a single ingest request.
const MAX_INGEST_DETECTIONS: usize = 1000;

/// Request-body size limit applied to the ingest route (8 MiB).
const INGEST_BODY_LIMIT_BYTES: usize = 8 * 1024 * 1024;

/// Number of columns — and therefore bind parameters — per row in the detections insert.
const DETECTION_INSERT_COLS: usize = 11;

/// Bind-parameter budget assumed for one SQL statement.
const SQLITE_MAX_BIND_VARS: usize = 999;

/// Rows per multi-row insert, so one statement stays within the bind-parameter budget.
const DETECTION_INSERT_CHUNK: usize = SQLITE_MAX_BIND_VARS / DETECTION_INSERT_COLS;
370
371async fn ingest(
374 State(st): State<AppState>,
375 principal: crate::auth::Principal,
376 Json(body): Json<AiIngest>,
377) -> AppResult<Json<Value>> {
378 principal.require(principal.can_ingest(), "ingest perception events")?;
379 let cam = load_camera(&st.pool, &body.camera_id).await?;
380 if body.task_type.trim().is_empty() {
381 return Err(AppError::BadRequest("`task_type` is required".into()));
382 }
383 if body.detections.len() > MAX_INGEST_DETECTIONS {
384 return Err(AppError::BadRequest(format!(
385 "too many detections in one request ({}); max {MAX_INGEST_DETECTIONS}",
386 body.detections.len()
387 )));
388 }
389 let ts = parse_opt_ts(&body.timestamp, "timestamp")?.unwrap_or_else(Utc::now);
390
391 let mut inserted = 0u64;
392 let mut tx = st.pool.begin().await?;
393 let outbox_res = sqlx::query(
398 "INSERT INTO outbox (topic, camera_id, site_id, frame_id, task_type, detection_count, created_at)
399 VALUES ('detections', ?, ?, ?, ?, ?, ?)
400 ON CONFLICT DO NOTHING",
401 )
402 .bind(&body.camera_id)
403 .bind(&cam.site_id)
404 .bind(&body.frame_id)
405 .bind(&body.task_type)
406 .bind(body.detections.len() as i64)
407 .bind(Utc::now())
408 .execute(&mut *tx)
409 .await?;
410 if outbox_res.rows_affected() == 0 {
411 tx.commit().await?;
413 return Ok(Json(json!({ "detections_ingested": 0, "duplicate": true })));
414 }
415 for chunk in body.detections.chunks(DETECTION_INSERT_CHUNK) {
420 let tuples = vec!["(?,?,?,?,?,?,?,?,?,?,?)"; chunk.len()].join(",");
421 let sql = format!(
422 "INSERT INTO detections
423 (id, camera_id, task_type, timestamp, label, confidence, bbox, track_id, attributes, frame_id, created_at)
424 VALUES {tuples}"
425 );
426 let mut q = sqlx::query(&sql);
427 for d in chunk {
428 let bbox = d.bbox.clone().map(SqlxJson);
429 let attrs = SqlxJson(d.attributes.clone().unwrap_or_else(|| json!({})));
430 q = q
431 .bind(format!("det_{}", Uuid::new_v4().simple()))
432 .bind(&body.camera_id)
433 .bind(&body.task_type)
434 .bind(ts)
435 .bind(&d.label)
436 .bind(d.confidence)
437 .bind(bbox)
438 .bind(&d.track_id)
439 .bind(attrs)
440 .bind(&body.frame_id)
441 .bind(Utc::now());
442 }
443 inserted += q.execute(&mut *tx).await?.rows_affected();
444 }
445 tx.commit().await?;
446
447 let batch = crate::services::consumer::DetectionBatch {
455 camera_id: &body.camera_id,
456 site_id: cam.site_id.as_deref(),
457 task_type: &body.task_type,
458 detections: &body.detections,
459 timestamp: ts,
460 };
461 let fanned = crate::services::consumer::fan_out(
462 &st.pool,
463 &st.consumers,
464 &batch,
465 body.frame_id.as_deref(),
466 )
467 .await;
468 if fanned {
469 if let Some(fid) = body.frame_id.as_deref() {
470 let _ = sqlx::query(
471 "UPDATE outbox SET fanned_out_at = ? \
472 WHERE topic = 'detections' AND camera_id = ? AND frame_id = ? AND fanned_out_at IS NULL",
473 )
474 .bind(Utc::now())
475 .bind(&body.camera_id)
476 .bind(fid)
477 .execute(&st.pool)
478 .await;
479 }
480 }
481
482 if let Some(ev) = &body.event {
483 let severity = ev.severity.clone().unwrap_or_else(|| "info".into());
484 let payload = ev.payload.clone().unwrap_or_else(|| json!({}));
485 crate::repo::log_event(
486 &st.pool,
487 Some(&body.camera_id),
488 &ev.event_type,
489 &severity,
490 payload,
491 )
492 .await?;
493 }
494
495 Ok(Json(json!({ "detections_ingested": inserted })))
496}
497
498fn parse_opt_ts(s: &Option<String>, field: &str) -> AppResult<Option<chrono::DateTime<Utc>>> {
499 match s {
500 Some(v) => crate::util::parse_rfc3339(v)
501 .map(Some)
502 .ok_or_else(|| AppError::BadRequest(format!("invalid `{field}` timestamp"))),
503 None => Ok(None),
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;

    /// Both supported profiles pass validation.
    #[test]
    fn validate_profile_accepts_sub_and_main() {
        for good in ["sub", "main"] {
            assert!(validate_profile(good).is_ok());
        }
    }

    /// Matching is exact: case variants, padded values and unknown names are rejected
    /// with a message that names the field.
    #[test]
    fn validate_profile_rejects_other_values() {
        let rejected = ["", "Sub", "MAIN", " sub", "sub ", "substream", "foo"];
        for bad in rejected {
            let outcome = validate_profile(bad);
            let Err(AppError::BadRequest(m)) = &outcome else {
                panic!("expected BadRequest for {bad:?}, got {outcome:?}");
            };
            assert!(
                m.contains("stream_profile"),
                "unexpected message for {bad:?}: {m}"
            );
        }
    }

    /// A missing value is not an error.
    #[test]
    fn parse_opt_ts_none_is_ok_none() {
        let absent: Option<String> = None;
        assert!(parse_opt_ts(&absent, "from").unwrap().is_none());
    }

    /// A valid value yields the same result as the shared parser.
    #[test]
    fn parse_opt_ts_valid_matches_util() {
        let raw = String::from("2026-06-13T05:02:19Z");
        let expected = crate::util::parse_rfc3339(&raw);
        let parsed = parse_opt_ts(&Some(raw), "from").unwrap();
        assert_eq!(parsed, expected);
        assert_eq!(parsed.unwrap().to_rfc3339(), "2026-06-13T05:02:19+00:00");
    }

    /// The error message names the field that failed to parse.
    #[test]
    fn parse_opt_ts_invalid_reports_field() {
        let outcome = parse_opt_ts(&Some("not-a-timestamp".to_string()), "to");
        let Err(AppError::BadRequest(m)) = &outcome else {
            panic!("expected BadRequest, got {outcome:?}");
        };
        assert!(m.contains("to"), "message should name the field: {m}");
        assert!(m.contains("timestamp"), "message: {m}");
    }

    /// Pins the per-request detection cap so a change to it is deliberate.
    #[test]
    fn max_ingest_detections_bound_is_stable() {
        assert_eq!(MAX_INGEST_DETECTIONS, 1000);
    }
}