1use axum::extract::{Path, State};
2use axum::http::StatusCode;
3use axum::routing::get;
4use axum::{Json, Router};
5use chrono::Utc;
6use serde_json::{json, Value};
7use sqlx::types::Json as SqlxJson;
8use sqlx::SqlitePool;
9
10use crate::auth::{self, Principal};
11use crate::camera_url;
12use crate::error::{AppError, AppResult};
13use crate::models::{Camera, CameraCreate, CameraUpdate, CameraView};
14use crate::state::AppState;
15use crate::util;
16
17pub fn router() -> Router<AppState> {
18 Router::new()
19 .route("/api/v1/cameras", get(list_cameras).post(create_camera))
20 .route(
21 "/api/v1/cameras/{id}",
22 get(get_camera_handler)
23 .patch(update_camera)
24 .delete(delete_camera),
25 )
26 .route(
27 "/api/v1/cameras/{id}/test",
28 get(test_camera).post(test_camera),
29 )
30}
31
32fn validate_record_mode(mode: &str) -> AppResult<()> {
36 if matches!(
37 mode,
38 "continuous" | "scheduled" | "event" | "scheduled_event"
39 ) {
40 Ok(())
41 } else {
42 Err(AppError::BadRequest(
43 "`record_mode` must be continuous|scheduled|event|scheduled_event".into(),
44 ))
45 }
46}
47
48pub(crate) async fn load_camera(pool: &SqlitePool, id: &str) -> AppResult<Camera> {
49 sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
50 .bind(id)
51 .fetch_optional(pool)
52 .await?
53 .ok_or_else(|| AppError::NotFound(format!("camera {id} not found")))
54}
55
56async fn list_cameras(
57 State(st): State<AppState>,
58 principal: Principal,
59) -> AppResult<Json<Vec<CameraView>>> {
60 principal.require(principal.can_view(), "list cameras")?;
61 let cams = sqlx::query_as::<_, Camera>("SELECT * FROM cameras ORDER BY id ASC")
62 .fetch_all(&st.pool)
63 .await?;
64 Ok(Json(cams.into_iter().map(CameraView::from).collect()))
65}
66
67async fn get_camera_handler(
68 State(st): State<AppState>,
69 principal: Principal,
70 Path(id): Path<String>,
71) -> AppResult<Json<CameraView>> {
72 principal.require(principal.can_view(), "view a camera")?;
73 Ok(Json(load_camera(&st.pool, &id).await?.into()))
74}
75
76async fn create_camera(
77 State(st): State<AppState>,
78 principal: Principal,
79 Json(body): Json<CameraCreate>,
80) -> AppResult<(StatusCode, Json<CameraView>)> {
81 principal.require(principal.can_manage_registry(), "create cameras")?;
82 let id = body
83 .id
84 .as_deref()
85 .map(util::slugify)
86 .unwrap_or_else(|| util::slugify(&body.name));
87 if body.name.trim().is_empty() {
88 return Err(AppError::BadRequest("`name` is required".into()));
89 }
90
91 let exists: Option<(String,)> = sqlx::query_as("SELECT id FROM cameras WHERE id = ?")
92 .bind(&id)
93 .fetch_optional(&st.pool)
94 .await?;
95 if exists.is_some() {
96 return Err(AppError::Conflict(format!(
97 "camera id `{id}` already exists"
98 )));
99 }
100
101 let record_stream = body.record_stream.unwrap_or_else(|| "main".into());
102 if !matches!(record_stream.as_str(), "main" | "sub") {
103 return Err(AppError::BadRequest(
104 "`record_stream` must be 'main' or 'sub'".into(),
105 ));
106 }
107 for url in [
108 body.main_stream_url.as_deref(),
109 body.sub_stream_url.as_deref(),
110 ]
111 .into_iter()
112 .flatten()
113 {
114 camera_url::validate_stream_url(url).map_err(AppError::BadRequest)?;
115 }
116
117 let now = Utc::now();
118 let caps = SqlxJson(body.capabilities.unwrap_or_else(|| json!({})));
119 let rtsp_port = body.rtsp_port.unwrap_or(554);
120 let record_enabled = body.record_enabled.unwrap_or(true);
121 let enabled = body.enabled.unwrap_or(true);
122 let seg = body
123 .segment_seconds
124 .unwrap_or(st.cfg.default_segment_seconds)
125 .clamp(2, 3600);
126 let retention = body
127 .retention_hours
128 .unwrap_or(st.cfg.default_retention_hours)
129 .max(1);
130 let storage_quota_bytes =
133 body.storage_quota_bytes
134 .or_else(|| match st.cfg.default_camera_quota_bytes {
135 0 => None,
136 q => Some(q as i64),
137 });
138 let record_audio = body.record_audio.unwrap_or(st.cfg.default_record_audio);
139 let record_mode = body.record_mode.unwrap_or_else(|| "continuous".into());
140 validate_record_mode(&record_mode)?;
141 let pre_roll_seconds = body
142 .pre_roll_seconds
143 .unwrap_or(st.cfg.default_pre_roll_seconds)
144 .clamp(0, 300);
145 let post_roll_seconds = body
146 .post_roll_seconds
147 .unwrap_or(st.cfg.default_post_roll_seconds)
148 .clamp(0, 3600);
149 let mirror_enabled = body.mirror_enabled.unwrap_or(false);
150 let anr_enabled = body.anr_enabled.unwrap_or(false);
151 let anr_replay_url_template = body
152 .anr_replay_url_template
153 .as_deref()
154 .map(str::trim)
155 .filter(|s| !s.is_empty())
156 .map(str::to_string);
157
158 let password = body
160 .password
161 .as_deref()
162 .map(crate::services::secrets::encrypt_for_storage)
163 .transpose()?;
164
165 sqlx::query(
166 "INSERT INTO cameras
167 (id, site_id, name, vendor, model, address, rtsp_port, username, password,
168 main_stream_url, sub_stream_url, record_stream, capabilities, record_enabled,
169 segment_seconds, retention_hours, storage_quota_bytes, record_audio, record_mode,
170 pre_roll_seconds, post_roll_seconds, mirror_enabled, anr_enabled, anr_replay_url_template,
171 enabled, created_at, updated_at)
172 VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
173 )
174 .bind(&id)
175 .bind(&body.site_id)
176 .bind(&body.name)
177 .bind(&body.vendor)
178 .bind(&body.model)
179 .bind(&body.address)
180 .bind(rtsp_port)
181 .bind(&body.username)
182 .bind(&password)
183 .bind(&body.main_stream_url)
184 .bind(&body.sub_stream_url)
185 .bind(&record_stream)
186 .bind(caps)
187 .bind(record_enabled)
188 .bind(seg)
189 .bind(retention)
190 .bind(storage_quota_bytes)
191 .bind(record_audio)
192 .bind(&record_mode)
193 .bind(pre_roll_seconds)
194 .bind(post_roll_seconds)
195 .bind(mirror_enabled)
196 .bind(anr_enabled)
197 .bind(&anr_replay_url_template)
198 .bind(enabled)
199 .bind(now)
200 .bind(now)
201 .execute(&st.pool)
202 .await?;
203
204 sqlx::query(
205 "INSERT INTO camera_status (camera_id, state, updated_at) VALUES (?, 'unknown', ?)
206 ON CONFLICT(camera_id) DO NOTHING",
207 )
208 .bind(&id)
209 .bind(now)
210 .execute(&st.pool)
211 .await?;
212
213 st.recorder.reconcile(&id).await;
214 if let Some(m) = &st.mirror {
215 m.reconcile(&id).await;
216 }
217 let cam = load_camera(&st.pool, &id).await?;
218 auth::audit(
219 &st.pool,
220 &principal,
221 "create_camera",
222 "camera",
223 &id,
224 json!({ "name": &body.name, "vendor": &body.vendor }),
225 )
226 .await;
227 Ok((StatusCode::CREATED, Json(cam.into())))
228}
229
230async fn update_camera(
231 State(st): State<AppState>,
232 Path(id): Path<String>,
233 principal: Principal,
234 Json(body): Json<CameraUpdate>,
235) -> AppResult<Json<CameraView>> {
236 principal.require(principal.can_manage_registry(), "update cameras")?;
237 let cur = load_camera(&st.pool, &id).await?;
238
239 let record_stream = body.record_stream.unwrap_or(cur.record_stream);
240 if !matches!(record_stream.as_str(), "main" | "sub") {
241 return Err(AppError::BadRequest(
242 "`record_stream` must be 'main' or 'sub'".into(),
243 ));
244 }
245
246 let name = body.name.unwrap_or(cur.name);
247 let site_id = body.site_id.or(cur.site_id);
248 let vendor = body.vendor.unwrap_or(cur.vendor);
249 let model = body.model.or(cur.model);
250 let address = body.address.or(cur.address);
251 let rtsp_port = body.rtsp_port.unwrap_or(cur.rtsp_port);
252 let username = body.username.or(cur.username);
253 let password = match body.password {
256 Some(p) => Some(crate::services::secrets::encrypt_for_storage(&p)?),
257 None => cur.password,
258 };
259 let main_stream_url = body.main_stream_url.or(cur.main_stream_url);
260 let sub_stream_url = body.sub_stream_url.or(cur.sub_stream_url);
261 for url in [main_stream_url.as_deref(), sub_stream_url.as_deref()]
262 .into_iter()
263 .flatten()
264 {
265 camera_url::validate_stream_url(url).map_err(AppError::BadRequest)?;
266 }
267 let caps = SqlxJson(body.capabilities.unwrap_or(cur.capabilities.0));
268 let record_enabled = body.record_enabled.unwrap_or(cur.record_enabled);
269 let enabled = body.enabled.unwrap_or(cur.enabled);
270 let priority = body.priority.unwrap_or(cur.priority);
271 let seg = body
272 .segment_seconds
273 .map(|v| v.clamp(2, 3600))
274 .unwrap_or(cur.segment_seconds);
275 let retention = body
276 .retention_hours
277 .map(|v| v.max(1))
278 .unwrap_or(cur.retention_hours);
279 let storage_quota_bytes = body.storage_quota_bytes.or(cur.storage_quota_bytes);
280 let record_audio = body.record_audio.unwrap_or(cur.record_audio);
281 let record_mode = body.record_mode.unwrap_or(cur.record_mode);
282 validate_record_mode(&record_mode)?;
283 let pre_roll_seconds = body
284 .pre_roll_seconds
285 .map(|v| v.clamp(0, 300))
286 .unwrap_or(cur.pre_roll_seconds);
287 let post_roll_seconds = body
288 .post_roll_seconds
289 .map(|v| v.clamp(0, 3600))
290 .unwrap_or(cur.post_roll_seconds);
291 let mirror_enabled = body.mirror_enabled.unwrap_or(cur.mirror_enabled);
292 let anr_enabled = body.anr_enabled.unwrap_or(cur.anr_enabled);
293 let anr_replay_url_template = body
294 .anr_replay_url_template
295 .as_deref()
296 .map(str::trim)
297 .filter(|s| !s.is_empty())
298 .map(str::to_string)
299 .or(cur.anr_replay_url_template);
300
301 sqlx::query(
302 "UPDATE cameras SET
303 name=?, site_id=?, vendor=?, model=?, address=?, rtsp_port=?, username=?, password=?,
304 main_stream_url=?, sub_stream_url=?, record_stream=?, capabilities=?, record_enabled=?,
305 segment_seconds=?, retention_hours=?, storage_quota_bytes=?, record_audio=?, record_mode=?,
306 pre_roll_seconds=?, post_roll_seconds=?, mirror_enabled=?, anr_enabled=?,
307 anr_replay_url_template=?, enabled=?, priority=?, updated_at=?
308 WHERE id=?",
309 )
310 .bind(&name)
311 .bind(&site_id)
312 .bind(&vendor)
313 .bind(&model)
314 .bind(&address)
315 .bind(rtsp_port)
316 .bind(&username)
317 .bind(&password)
318 .bind(&main_stream_url)
319 .bind(&sub_stream_url)
320 .bind(&record_stream)
321 .bind(caps)
322 .bind(record_enabled)
323 .bind(seg)
324 .bind(retention)
325 .bind(storage_quota_bytes)
326 .bind(record_audio)
327 .bind(&record_mode)
328 .bind(pre_roll_seconds)
329 .bind(post_roll_seconds)
330 .bind(mirror_enabled)
331 .bind(anr_enabled)
332 .bind(&anr_replay_url_template)
333 .bind(enabled)
334 .bind(priority)
335 .bind(Utc::now())
336 .bind(&id)
337 .execute(&st.pool)
338 .await?;
339
340 st.recorder.reconcile(&id).await;
341 if let Some(m) = &st.mirror {
342 m.reconcile(&id).await;
343 }
344 st.sampler.reconcile().await;
346 auth::audit(
347 &st.pool,
348 &principal,
349 "update_camera",
350 "camera",
351 &id,
352 json!({}),
353 )
354 .await;
355 Ok(Json(load_camera(&st.pool, &id).await?.into()))
356}
357
358async fn delete_camera(
359 State(st): State<AppState>,
360 Path(id): Path<String>,
361 principal: Principal,
362) -> AppResult<StatusCode> {
363 principal.require(principal.can_manage_registry(), "delete cameras")?;
364 let _ = load_camera(&st.pool, &id).await?; st.recorder.stop(&id).await;
366 if let Some(m) = &st.mirror {
367 m.stop(&id).await;
368 }
369 let evidence: Vec<(Option<String>,)> =
371 sqlx::query_as("SELECT evidence_path FROM zone_events WHERE camera_id = ?")
372 .bind(&id)
373 .fetch_all(&st.pool)
374 .await
375 .unwrap_or_default();
376 for (ev,) in &evidence {
377 if let Some(name) = ev.as_deref().and_then(|u| u.rsplit('/').next()) {
378 let _ = tokio::fs::remove_file(st.cfg.snapshots_dir.join(name)).await;
379 }
380 }
381 let _ = sqlx::query("DELETE FROM zone_events WHERE camera_id = ?")
382 .bind(&id)
383 .execute(&st.pool)
384 .await;
385 sqlx::query("DELETE FROM cameras WHERE id = ?")
386 .bind(&id)
387 .execute(&st.pool)
388 .await?;
389 st.sampler.reconcile().await;
391 let _ = tokio::fs::remove_dir_all(st.cfg.camera_recordings_dir(&id)).await;
392 let _ = tokio::fs::remove_dir_all(st.cfg.camera_frames_dir(&id)).await;
393 if let Some(dir) = &st.cfg.mirror_recordings_dir {
394 let _ = tokio::fs::remove_dir_all(dir.join(&id)).await;
395 }
396 auth::audit(
397 &st.pool,
398 &principal,
399 "delete_camera",
400 "camera",
401 &id,
402 json!({}),
403 )
404 .await;
405 Ok(StatusCode::NO_CONTENT)
406}
407
408async fn test_camera(
410 State(st): State<AppState>,
411 principal: Principal,
412 Path(id): Path<String>,
413) -> AppResult<Json<Value>> {
414 principal.require(principal.can_view(), "test camera connectivity")?;
415 let cam = load_camera(&st.pool, &id).await?;
416 let url = camera_url::record_url(&cam)
417 .ok_or_else(|| AppError::BadRequest("camera has no stream URL".into()))?;
418
419 let probe = tokio::time::timeout(
420 std::time::Duration::from_secs(12),
421 util::ffprobe_stream(&st.cfg.ffprobe_bin, &url),
422 )
423 .await;
424
425 let result = match probe {
426 Ok(Ok(info)) => json!({
427 "reachable": true,
428 "codec": info.codec,
429 "width": info.width,
430 "height": info.height,
431 "url": camera_url::mask_url(&url),
432 }),
433 Ok(Err(e)) => json!({
434 "reachable": false,
435 "error": camera_url::mask_url(&e.to_string()),
436 "url": camera_url::mask_url(&url),
437 }),
438 Err(_) => json!({
439 "reachable": false,
440 "error": "probe timed out after 12s",
441 "url": camera_url::mask_url(&url),
442 }),
443 };
444 Ok(Json(result))
445}
446
#[cfg(test)]
mod tests {
    use crate::config::Config;
    use crate::services::recorder::RecorderManager;
    use crate::services::sampler::SamplerManager;
    use crate::state::AppState;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use std::sync::Arc;
    use tower::Service;

    /// Builds an [`AppState`] on a freshly migrated in-memory SQLite database,
    /// with `auth_enabled` overriding the value read by `Config::from_env`.
    async fn test_state(auth_enabled: bool) -> AppState {
        // Presumably capped at one connection so every query sees the same
        // in-memory database — TODO confirm.
        let pool = sqlx::sqlite::SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        crate::db::run_migrations(&pool).await.unwrap();

        let cfg = Arc::new({
            let mut base = Config::from_env();
            base.auth_enabled = auth_enabled;
            base
        });
        let recorder = RecorderManager::new(pool.clone(), cfg.clone());
        let sampler = SamplerManager::new(pool.clone(), cfg.clone());
        let catalog = Arc::new(crate::services::registry::CatalogService::new(&cfg));

        AppState {
            recorder,
            sampler,
            mirror: None,
            consumers: Arc::new(Vec::new()),
            modules: Arc::new(Vec::new()),
            catalog,
            http: reqwest::Client::new(),
            started_at: chrono::Utc::now(),
            pool,
            cfg,
        }
    }

    /// Sends `GET /api/v1/cameras` without any credentials and returns the
    /// response status.
    async fn unauthenticated_list_status(auth_enabled: bool) -> StatusCode {
        let state = test_state(auth_enabled).await;
        let mut service = super::router().with_state(state);
        let request = Request::builder()
            .method("GET")
            .uri("/api/v1/cameras")
            .body(Body::empty())
            .unwrap();
        let response = service.call(request).await.unwrap();
        response.status()
    }

    /// With auth enabled, an anonymous list request must get `401 Unauthorized`.
    #[tokio::test]
    async fn legacy_route_rejects_unauthenticated_when_auth_enabled() {
        let status = unauthenticated_list_status(true).await;
        assert_eq!(status, StatusCode::UNAUTHORIZED);
    }

    /// With auth disabled, the same anonymous request must get `200 OK`.
    #[tokio::test]
    async fn legacy_route_open_when_auth_disabled() {
        let status = unauthenticated_list_status(false).await;
        assert_eq!(status, StatusCode::OK);
    }
}