1use axum::body::Body;
5use axum::extract::{Path, Query, State};
6use axum::http::{header, StatusCode};
7use axum::response::Response;
8use axum::routing::{get, post};
9use axum::{Json, Router};
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12use serde_json::{json, Value};
13use sqlx::types::Json as SqlxJson;
14use uuid::Uuid;
15
16use crate::auth::{self, Principal};
17use crate::error::{AppError, AppResult};
18use crate::models::{AiIngest, AiTask, AiTaskCreate, AiTaskUpdate, Detection};
19use crate::routes::cameras::load_camera;
20use crate::services::sampler::SamplerInfo;
21use crate::state::AppState;
22
23pub fn router() -> Router<AppState> {
24 Router::new()
25 .route(
26 "/api/v1/cameras/{id}/ai-tasks",
27 get(list_camera_tasks).post(create_task),
28 )
29 .route(
30 "/api/v1/ai-tasks/{task_id}",
31 axum::routing::patch(update_task).delete(delete_task),
32 )
33 .route("/api/v1/ai/tasks", get(list_all_tasks))
34 .route("/api/v1/ai/samplers", get(sampler_status))
35 .route("/api/v1/ai/events", post(ingest))
36 .route("/api/v1/cameras/{id}/frame", get(latest_frame))
37 .route("/api/v1/cameras/{id}/detections", get(list_detections))
38}
39
40fn validate_profile(p: &str) -> AppResult<()> {
41 if matches!(p, "sub" | "main") {
42 Ok(())
43 } else {
44 Err(AppError::BadRequest(
45 "`stream_profile` must be 'sub' or 'main'".into(),
46 ))
47 }
48}
49
50async fn list_camera_tasks(
51 State(st): State<AppState>,
52 Path(id): Path<String>,
53 principal: Principal,
54) -> AppResult<Json<Vec<AiTask>>> {
55 principal.require(principal.can_view(), "view AI tasks")?;
56 let _ = load_camera(&st.pool, &id).await?;
57 let tasks = sqlx::query_as::<_, AiTask>(
58 "SELECT * FROM ai_tasks WHERE camera_id = ? ORDER BY created_at ASC",
59 )
60 .bind(&id)
61 .fetch_all(&st.pool)
62 .await?;
63 Ok(Json(tasks))
64}
65
66async fn create_task(
67 State(st): State<AppState>,
68 Path(id): Path<String>,
69 principal: Principal,
70 Json(body): Json<AiTaskCreate>,
71) -> AppResult<(StatusCode, Json<AiTask>)> {
72 principal.require(principal.can_manage_registry(), "create AI tasks")?;
73 let _ = load_camera(&st.pool, &id).await?;
74 if body.task_type.trim().is_empty() {
75 return Err(AppError::BadRequest("`task_type` is required".into()));
76 }
77 let profile = body.stream_profile.unwrap_or_else(|| "sub".into());
78 validate_profile(&profile)?;
79 let fps = body.fps.unwrap_or(st.cfg.default_ai_fps).clamp(0.1, 30.0);
80 let width = body
81 .width
82 .unwrap_or(st.cfg.default_ai_width)
83 .clamp(160, 3840);
84 let enabled = body.enabled.unwrap_or(true);
85 let config = SqlxJson(body.config.unwrap_or_else(|| json!({})));
86 let now = Utc::now();
87 let task_id = format!("ai_{}", Uuid::new_v4().simple());
88
89 sqlx::query(
90 "INSERT INTO ai_tasks
91 (id, camera_id, task_type, enabled, stream_profile, fps, width, config, created_at, updated_at)
92 VALUES (?,?,?,?,?,?,?,?,?,?)",
93 )
94 .bind(&task_id)
95 .bind(&id)
96 .bind(&body.task_type)
97 .bind(enabled)
98 .bind(&profile)
99 .bind(fps)
100 .bind(width)
101 .bind(config)
102 .bind(now)
103 .bind(now)
104 .execute(&st.pool)
105 .await?;
106
107 st.sampler.reconcile().await;
108 let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
109 .bind(&task_id)
110 .fetch_one(&st.pool)
111 .await?;
112 auth::audit(
113 &st.pool,
114 &principal,
115 "create_ai_task",
116 "ai_task",
117 &task_id,
118 json!({ "camera_id": &id, "task_type": &task.task_type }),
119 )
120 .await;
121 Ok((StatusCode::CREATED, Json(task)))
122}
123
124async fn update_task(
125 State(st): State<AppState>,
126 Path(task_id): Path<String>,
127 principal: Principal,
128 Json(body): Json<AiTaskUpdate>,
129) -> AppResult<Json<AiTask>> {
130 principal.require(principal.can_manage_registry(), "update AI tasks")?;
131 let cur = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
132 .bind(&task_id)
133 .fetch_optional(&st.pool)
134 .await?
135 .ok_or_else(|| AppError::NotFound(format!("ai task {task_id} not found")))?;
136
137 let task_type = body.task_type.unwrap_or(cur.task_type);
138 let profile = body.stream_profile.unwrap_or(cur.stream_profile);
139 validate_profile(&profile)?;
140 let fps = body.fps.map(|v| v.clamp(0.1, 30.0)).unwrap_or(cur.fps);
141 let width = body.width.map(|v| v.clamp(160, 3840)).unwrap_or(cur.width);
142 let enabled = body.enabled.unwrap_or(cur.enabled);
143 let config = SqlxJson(body.config.unwrap_or(cur.config.0));
144
145 sqlx::query(
146 "UPDATE ai_tasks SET task_type=?, stream_profile=?, fps=?, width=?, enabled=?, config=?, updated_at=?
147 WHERE id=?",
148 )
149 .bind(&task_type)
150 .bind(&profile)
151 .bind(fps)
152 .bind(width)
153 .bind(enabled)
154 .bind(config)
155 .bind(Utc::now())
156 .bind(&task_id)
157 .execute(&st.pool)
158 .await?;
159
160 st.sampler.reconcile().await;
161 let task = sqlx::query_as::<_, AiTask>("SELECT * FROM ai_tasks WHERE id = ?")
162 .bind(&task_id)
163 .fetch_one(&st.pool)
164 .await?;
165 auth::audit(
166 &st.pool,
167 &principal,
168 "update_ai_task",
169 "ai_task",
170 &task_id,
171 json!({}),
172 )
173 .await;
174 Ok(Json(task))
175}
176
177async fn delete_task(
178 State(st): State<AppState>,
179 Path(task_id): Path<String>,
180 principal: Principal,
181) -> AppResult<StatusCode> {
182 principal.require(principal.can_manage_registry(), "delete AI tasks")?;
183 let res = sqlx::query("DELETE FROM ai_tasks WHERE id = ?")
184 .bind(&task_id)
185 .execute(&st.pool)
186 .await?;
187 if res.rows_affected() == 0 {
188 return Err(AppError::NotFound(format!("ai task {task_id} not found")));
189 }
190 st.sampler.reconcile().await;
191 auth::audit(
192 &st.pool,
193 &principal,
194 "delete_ai_task",
195 "ai_task",
196 &task_id,
197 json!({}),
198 )
199 .await;
200 Ok(StatusCode::NO_CONTENT)
201}
202
203#[derive(Debug, Serialize)]
204struct WorkerTask {
205 id: String,
206 camera_id: String,
207 task_type: String,
208 stream_profile: String,
209 fps: f64,
210 width: i64,
211 config: Value,
212 frame_url: String,
213}
214
215async fn list_all_tasks(
217 State(st): State<AppState>,
218 principal: crate::auth::Principal,
219) -> AppResult<Json<Vec<WorkerTask>>> {
220 principal.require(principal.can_view(), "discover AI tasks")?;
223 let tasks = sqlx::query_as::<_, AiTask>(
224 "SELECT t.* FROM ai_tasks t JOIN cameras c ON c.id = t.camera_id
225 WHERE t.enabled = 1 AND c.enabled = 1
226 ORDER BY t.camera_id ASC",
227 )
228 .fetch_all(&st.pool)
229 .await?;
230 let out = tasks
231 .into_iter()
232 .map(|t| WorkerTask {
233 frame_url: format!(
234 "/api/v1/cameras/{}/frame?profile={}",
235 t.camera_id, t.stream_profile
236 ),
237 id: t.id,
238 camera_id: t.camera_id,
239 task_type: t.task_type,
240 stream_profile: t.stream_profile,
241 fps: t.fps,
242 width: t.width,
243 config: t.config.0,
244 })
245 .collect();
246 Ok(Json(out))
247}
248
249async fn sampler_status(
250 State(st): State<AppState>,
251 principal: Principal,
252) -> AppResult<Json<Vec<SamplerInfo>>> {
253 principal.require(principal.can_view(), "view sampler status")?;
254 Ok(Json(st.sampler.statuses().await))
255}
256
257#[derive(Debug, Deserialize)]
258struct FrameQuery {
259 profile: Option<String>,
260}
261
262async fn latest_frame(
264 State(st): State<AppState>,
265 principal: crate::auth::Principal,
266 Path(id): Path<String>,
267 Query(q): Query<FrameQuery>,
268) -> AppResult<Response> {
269 principal.require(principal.can_view(), "read camera frames")?;
273 if id.contains('/') || id.contains('\\') || id.contains("..") {
275 return Err(AppError::BadRequest("invalid camera id".into()));
276 }
277 let profile = q.profile.unwrap_or_else(|| "sub".into());
278 validate_profile(&profile)?;
279 let path = st.sampler.frame_path(&id, &profile);
280 let bytes = tokio::fs::read(&path).await.map_err(|_| {
281 AppError::NotFound("no sampled frame yet (is an AI task enabled for this camera?)".into())
282 })?;
283 let captured = tokio::fs::metadata(&path)
284 .await
285 .ok()
286 .and_then(|m| m.modified().ok())
287 .and_then(|t| {
288 t.duration_since(std::time::UNIX_EPOCH)
289 .ok()
290 .map(|d| chrono::DateTime::<Utc>::from_timestamp_millis(d.as_millis() as i64))
291 })
292 .flatten();
293 let age_ms = captured
294 .map(|c| (Utc::now() - c).num_milliseconds().max(0))
295 .unwrap_or(0);
296
297 Response::builder()
298 .header(header::CONTENT_TYPE, "image/jpeg")
299 .header(header::CACHE_CONTROL, "no-store")
300 .header("x-frame-age-ms", age_ms.to_string())
301 .header(
302 "x-frame-captured-at",
303 captured.map(|c| c.to_rfc3339()).unwrap_or_default(),
304 )
305 .body(Body::from(bytes))
306 .map_err(|e| AppError::Other(anyhow::anyhow!("building response: {e}")))
307}
308
309#[derive(Debug, Deserialize)]
310struct DetectionQuery {
311 from: Option<String>,
312 to: Option<String>,
313 label: Option<String>,
314 limit: Option<i64>,
315}
316
317async fn list_detections(
318 State(st): State<AppState>,
319 principal: crate::auth::Principal,
320 Path(id): Path<String>,
321 Query(q): Query<DetectionQuery>,
322) -> AppResult<Json<Vec<Detection>>> {
323 principal.require(principal.can_view(), "read detections")?;
324 let _ = load_camera(&st.pool, &id).await?;
325 let limit = q.limit.unwrap_or(200).clamp(1, 5000);
326 let from = parse_opt_ts(&q.from, "from")?;
327 let to = parse_opt_ts(&q.to, "to")?;
328 let rows = sqlx::query_as::<_, Detection>(
329 "SELECT * FROM detections
330 WHERE camera_id = ?
331 AND (? IS NULL OR timestamp >= ?)
332 AND (? IS NULL OR timestamp <= ?)
333 AND (? IS NULL OR label = ?)
334 ORDER BY timestamp DESC LIMIT ?",
335 )
336 .bind(&id)
337 .bind(from)
338 .bind(from)
339 .bind(to)
340 .bind(to)
341 .bind(&q.label)
342 .bind(&q.label)
343 .bind(limit)
344 .fetch_all(&st.pool)
345 .await?;
346 Ok(Json(rows))
347}
348
/// Upper bound on the number of detections accepted in a single ingest request.
const MAX_INGEST_DETECTIONS: usize = 1_000;
351
352async fn ingest(
355 State(st): State<AppState>,
356 principal: crate::auth::Principal,
357 Json(body): Json<AiIngest>,
358) -> AppResult<Json<Value>> {
359 principal.require(principal.can_ingest(), "ingest perception events")?;
360 let cam = load_camera(&st.pool, &body.camera_id).await?;
361 if body.task_type.trim().is_empty() {
362 return Err(AppError::BadRequest("`task_type` is required".into()));
363 }
364 if body.detections.len() > MAX_INGEST_DETECTIONS {
365 return Err(AppError::BadRequest(format!(
366 "too many detections in one request ({}); max {MAX_INGEST_DETECTIONS}",
367 body.detections.len()
368 )));
369 }
370 let ts = parse_opt_ts(&body.timestamp, "timestamp")?.unwrap_or_else(Utc::now);
371
372 let mut inserted = 0u64;
373 let mut tx = st.pool.begin().await?;
374 let outbox_res = sqlx::query(
379 "INSERT INTO outbox (topic, camera_id, site_id, frame_id, task_type, detection_count, created_at)
380 VALUES ('detections', ?, ?, ?, ?, ?, ?)
381 ON CONFLICT DO NOTHING",
382 )
383 .bind(&body.camera_id)
384 .bind(&cam.site_id)
385 .bind(&body.frame_id)
386 .bind(&body.task_type)
387 .bind(body.detections.len() as i64)
388 .bind(Utc::now())
389 .execute(&mut *tx)
390 .await?;
391 if outbox_res.rows_affected() == 0 {
392 tx.commit().await?;
394 return Ok(Json(json!({ "detections_ingested": 0, "duplicate": true })));
395 }
396 for d in &body.detections {
397 let bbox = d.bbox.clone().map(SqlxJson);
398 let attrs = SqlxJson(d.attributes.clone().unwrap_or_else(|| json!({})));
399 sqlx::query(
400 "INSERT INTO detections
401 (id, camera_id, task_type, timestamp, label, confidence, bbox, track_id, attributes, frame_id, created_at)
402 VALUES (?,?,?,?,?,?,?,?,?,?,?)",
403 )
404 .bind(format!("det_{}", Uuid::new_v4().simple()))
405 .bind(&body.camera_id)
406 .bind(&body.task_type)
407 .bind(ts)
408 .bind(&d.label)
409 .bind(d.confidence)
410 .bind(bbox)
411 .bind(&d.track_id)
412 .bind(attrs)
413 .bind(&body.frame_id)
414 .bind(Utc::now())
415 .execute(&mut *tx)
416 .await?;
417 inserted += 1;
418 }
419 tx.commit().await?;
420
421 let batch = crate::services::consumer::DetectionBatch {
425 camera_id: &body.camera_id,
426 site_id: cam.site_id.as_deref(),
427 task_type: &body.task_type,
428 detections: &body.detections,
429 timestamp: ts,
430 };
431 for consumer in st.consumers.iter() {
432 if consumer.interested_in(&body.task_type) {
433 tracing::trace!(consumer = consumer.name(), task_type = %body.task_type, "ingest fan-out");
434 consumer.consume(&batch).await;
435 }
436 }
437
438 if let Some(ev) = &body.event {
439 let severity = ev.severity.clone().unwrap_or_else(|| "info".into());
440 let payload = ev.payload.clone().unwrap_or_else(|| json!({}));
441 crate::repo::log_event(
442 &st.pool,
443 Some(&body.camera_id),
444 &ev.event_type,
445 &severity,
446 payload,
447 )
448 .await?;
449 }
450
451 Ok(Json(json!({ "detections_ingested": inserted })))
452}
453
454fn parse_opt_ts(s: &Option<String>, field: &str) -> AppResult<Option<chrono::DateTime<Utc>>> {
455 match s {
456 Some(v) => crate::util::parse_rfc3339(v)
457 .map(Some)
458 .ok_or_else(|| AppError::BadRequest(format!("invalid `{field}` timestamp"))),
459 None => Ok(None),
460 }
461}