Skip to main content

heldar_kernel/routes/
cameras.rs

1use axum::extract::{Path, State};
2use axum::http::StatusCode;
3use axum::routing::get;
4use axum::{Json, Router};
5use chrono::Utc;
6use serde_json::{json, Value};
7use sqlx::types::Json as SqlxJson;
8use sqlx::SqlitePool;
9
10use crate::auth::{self, Principal};
11use crate::camera_url;
12use crate::error::{AppError, AppResult};
13use crate::models::{Camera, CameraCreate, CameraUpdate, CameraView};
14use crate::state::AppState;
15use crate::util;
16
17pub fn router() -> Router<AppState> {
18    Router::new()
19        .route("/api/v1/cameras", get(list_cameras).post(create_camera))
20        .route(
21            "/api/v1/cameras/{id}",
22            get(get_camera_handler)
23                .patch(update_camera)
24                .delete(delete_camera),
25        )
26        .route(
27            "/api/v1/cameras/{id}/test",
28            get(test_camera).post(test_camera),
29        )
30}
31
32/// Accepted `record_mode` values. `event` / `scheduled_event` event-triggering is wired in a later
33/// batch; this batch honors `continuous` (always) and the time-of-day window for `scheduled` /
34/// `scheduled_event`.
35fn validate_record_mode(mode: &str) -> AppResult<()> {
36    if matches!(
37        mode,
38        "continuous" | "scheduled" | "event" | "scheduled_event"
39    ) {
40        Ok(())
41    } else {
42        Err(AppError::BadRequest(
43            "`record_mode` must be continuous|scheduled|event|scheduled_event".into(),
44        ))
45    }
46}
47
48pub(crate) async fn load_camera(pool: &SqlitePool, id: &str) -> AppResult<Camera> {
49    sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
50        .bind(id)
51        .fetch_optional(pool)
52        .await?
53        .ok_or_else(|| AppError::NotFound(format!("camera {id} not found")))
54}
55
56async fn list_cameras(
57    State(st): State<AppState>,
58    principal: Principal,
59) -> AppResult<Json<Vec<CameraView>>> {
60    principal.require(principal.can_view(), "list cameras")?;
61    let cams = sqlx::query_as::<_, Camera>("SELECT * FROM cameras ORDER BY id ASC")
62        .fetch_all(&st.pool)
63        .await?;
64    Ok(Json(cams.into_iter().map(CameraView::from).collect()))
65}
66
67async fn get_camera_handler(
68    State(st): State<AppState>,
69    principal: Principal,
70    Path(id): Path<String>,
71) -> AppResult<Json<CameraView>> {
72    principal.require(principal.can_view(), "view a camera")?;
73    Ok(Json(load_camera(&st.pool, &id).await?.into()))
74}
75
76async fn create_camera(
77    State(st): State<AppState>,
78    principal: Principal,
79    Json(body): Json<CameraCreate>,
80) -> AppResult<(StatusCode, Json<CameraView>)> {
81    principal.require(principal.can_manage_registry(), "create cameras")?;
82    let id = body
83        .id
84        .as_deref()
85        .map(util::slugify)
86        .unwrap_or_else(|| util::slugify(&body.name));
87    if body.name.trim().is_empty() {
88        return Err(AppError::BadRequest("`name` is required".into()));
89    }
90
91    let exists: Option<(String,)> = sqlx::query_as("SELECT id FROM cameras WHERE id = ?")
92        .bind(&id)
93        .fetch_optional(&st.pool)
94        .await?;
95    if exists.is_some() {
96        return Err(AppError::Conflict(format!(
97            "camera id `{id}` already exists"
98        )));
99    }
100
101    let record_stream = body.record_stream.unwrap_or_else(|| "main".into());
102    if !matches!(record_stream.as_str(), "main" | "sub") {
103        return Err(AppError::BadRequest(
104            "`record_stream` must be 'main' or 'sub'".into(),
105        ));
106    }
107    for url in [
108        body.main_stream_url.as_deref(),
109        body.sub_stream_url.as_deref(),
110    ]
111    .into_iter()
112    .flatten()
113    {
114        camera_url::validate_stream_url(url).map_err(AppError::BadRequest)?;
115    }
116
117    let now = Utc::now();
118    let caps = SqlxJson(body.capabilities.unwrap_or_else(|| json!({})));
119    let rtsp_port = body.rtsp_port.unwrap_or(554);
120    let record_enabled = body.record_enabled.unwrap_or(true);
121    let enabled = body.enabled.unwrap_or(true);
122    let seg = body
123        .segment_seconds
124        .unwrap_or(st.cfg.default_segment_seconds)
125        .clamp(2, 3600);
126    let retention = body
127        .retention_hours
128        .unwrap_or(st.cfg.default_retention_hours)
129        .max(1);
130    // Fall back to the configured default quota when omitted; a default of 0 means "no quota" and is
131    // stored as NULL (no per-camera cap).
132    let storage_quota_bytes =
133        body.storage_quota_bytes
134            .or_else(|| match st.cfg.default_camera_quota_bytes {
135                0 => None,
136                q => Some(q as i64),
137            });
138    let record_audio = body.record_audio.unwrap_or(st.cfg.default_record_audio);
139    let record_mode = body.record_mode.unwrap_or_else(|| "continuous".into());
140    validate_record_mode(&record_mode)?;
141    let pre_roll_seconds = body
142        .pre_roll_seconds
143        .unwrap_or(st.cfg.default_pre_roll_seconds)
144        .clamp(0, 300);
145    let post_roll_seconds = body
146        .post_roll_seconds
147        .unwrap_or(st.cfg.default_post_roll_seconds)
148        .clamp(0, 3600);
149    let mirror_enabled = body.mirror_enabled.unwrap_or(false);
150    let anr_enabled = body.anr_enabled.unwrap_or(false);
151    let anr_replay_url_template = body
152        .anr_replay_url_template
153        .as_deref()
154        .map(str::trim)
155        .filter(|s| !s.is_empty())
156        .map(str::to_string);
157
158    // Encrypt the camera password at rest when HELDAR_SECRET_KEY is configured (plaintext otherwise).
159    let password = body
160        .password
161        .as_deref()
162        .map(crate::services::secrets::encrypt_for_storage)
163        .transpose()?;
164
165    sqlx::query(
166        "INSERT INTO cameras
167           (id, site_id, name, vendor, model, address, rtsp_port, username, password,
168            main_stream_url, sub_stream_url, record_stream, capabilities, record_enabled,
169            segment_seconds, retention_hours, storage_quota_bytes, record_audio, record_mode,
170            pre_roll_seconds, post_roll_seconds, mirror_enabled, anr_enabled, anr_replay_url_template,
171            enabled, created_at, updated_at)
172         VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
173    )
174    .bind(&id)
175    .bind(&body.site_id)
176    .bind(&body.name)
177    .bind(&body.vendor)
178    .bind(&body.model)
179    .bind(&body.address)
180    .bind(rtsp_port)
181    .bind(&body.username)
182    .bind(&password)
183    .bind(&body.main_stream_url)
184    .bind(&body.sub_stream_url)
185    .bind(&record_stream)
186    .bind(caps)
187    .bind(record_enabled)
188    .bind(seg)
189    .bind(retention)
190    .bind(storage_quota_bytes)
191    .bind(record_audio)
192    .bind(&record_mode)
193    .bind(pre_roll_seconds)
194    .bind(post_roll_seconds)
195    .bind(mirror_enabled)
196    .bind(anr_enabled)
197    .bind(&anr_replay_url_template)
198    .bind(enabled)
199    .bind(now)
200    .bind(now)
201    .execute(&st.pool)
202    .await?;
203
204    sqlx::query(
205        "INSERT INTO camera_status (camera_id, state, updated_at) VALUES (?, 'unknown', ?)
206         ON CONFLICT(camera_id) DO NOTHING",
207    )
208    .bind(&id)
209    .bind(now)
210    .execute(&st.pool)
211    .await?;
212
213    st.recorder.reconcile(&id).await;
214    if let Some(m) = &st.mirror {
215        m.reconcile(&id).await;
216    }
217    let cam = load_camera(&st.pool, &id).await?;
218    auth::audit(
219        &st.pool,
220        &principal,
221        "create_camera",
222        "camera",
223        &id,
224        json!({ "name": &body.name, "vendor": &body.vendor }),
225    )
226    .await;
227    Ok((StatusCode::CREATED, Json(cam.into())))
228}
229
230async fn update_camera(
231    State(st): State<AppState>,
232    Path(id): Path<String>,
233    principal: Principal,
234    Json(body): Json<CameraUpdate>,
235) -> AppResult<Json<CameraView>> {
236    principal.require(principal.can_manage_registry(), "update cameras")?;
237    let cur = load_camera(&st.pool, &id).await?;
238
239    let record_stream = body.record_stream.unwrap_or(cur.record_stream);
240    if !matches!(record_stream.as_str(), "main" | "sub") {
241        return Err(AppError::BadRequest(
242            "`record_stream` must be 'main' or 'sub'".into(),
243        ));
244    }
245
246    let name = body.name.unwrap_or(cur.name);
247    let site_id = body.site_id.or(cur.site_id);
248    let vendor = body.vendor.unwrap_or(cur.vendor);
249    let model = body.model.or(cur.model);
250    let address = body.address.or(cur.address);
251    let rtsp_port = body.rtsp_port.unwrap_or(cur.rtsp_port);
252    let username = body.username.or(cur.username);
253    // A new password is plaintext from the client → encrypt at rest; otherwise keep the stored value
254    // (already in its at-rest form — do not re-encrypt).
255    let password = match body.password {
256        Some(p) => Some(crate::services::secrets::encrypt_for_storage(&p)?),
257        None => cur.password,
258    };
259    let main_stream_url = body.main_stream_url.or(cur.main_stream_url);
260    let sub_stream_url = body.sub_stream_url.or(cur.sub_stream_url);
261    for url in [main_stream_url.as_deref(), sub_stream_url.as_deref()]
262        .into_iter()
263        .flatten()
264    {
265        camera_url::validate_stream_url(url).map_err(AppError::BadRequest)?;
266    }
267    let caps = SqlxJson(body.capabilities.unwrap_or(cur.capabilities.0));
268    let record_enabled = body.record_enabled.unwrap_or(cur.record_enabled);
269    let enabled = body.enabled.unwrap_or(cur.enabled);
270    let priority = body.priority.unwrap_or(cur.priority);
271    let seg = body
272        .segment_seconds
273        .map(|v| v.clamp(2, 3600))
274        .unwrap_or(cur.segment_seconds);
275    let retention = body
276        .retention_hours
277        .map(|v| v.max(1))
278        .unwrap_or(cur.retention_hours);
279    let storage_quota_bytes = body.storage_quota_bytes.or(cur.storage_quota_bytes);
280    let record_audio = body.record_audio.unwrap_or(cur.record_audio);
281    let record_mode = body.record_mode.unwrap_or(cur.record_mode);
282    validate_record_mode(&record_mode)?;
283    let pre_roll_seconds = body
284        .pre_roll_seconds
285        .map(|v| v.clamp(0, 300))
286        .unwrap_or(cur.pre_roll_seconds);
287    let post_roll_seconds = body
288        .post_roll_seconds
289        .map(|v| v.clamp(0, 3600))
290        .unwrap_or(cur.post_roll_seconds);
291    let mirror_enabled = body.mirror_enabled.unwrap_or(cur.mirror_enabled);
292    let anr_enabled = body.anr_enabled.unwrap_or(cur.anr_enabled);
293    let anr_replay_url_template = body
294        .anr_replay_url_template
295        .as_deref()
296        .map(str::trim)
297        .filter(|s| !s.is_empty())
298        .map(str::to_string)
299        .or(cur.anr_replay_url_template);
300
301    sqlx::query(
302        "UPDATE cameras SET
303            name=?, site_id=?, vendor=?, model=?, address=?, rtsp_port=?, username=?, password=?,
304            main_stream_url=?, sub_stream_url=?, record_stream=?, capabilities=?, record_enabled=?,
305            segment_seconds=?, retention_hours=?, storage_quota_bytes=?, record_audio=?, record_mode=?,
306            pre_roll_seconds=?, post_roll_seconds=?, mirror_enabled=?, anr_enabled=?,
307            anr_replay_url_template=?, enabled=?, priority=?, updated_at=?
308         WHERE id=?",
309    )
310    .bind(&name)
311    .bind(&site_id)
312    .bind(&vendor)
313    .bind(&model)
314    .bind(&address)
315    .bind(rtsp_port)
316    .bind(&username)
317    .bind(&password)
318    .bind(&main_stream_url)
319    .bind(&sub_stream_url)
320    .bind(&record_stream)
321    .bind(caps)
322    .bind(record_enabled)
323    .bind(seg)
324    .bind(retention)
325    .bind(storage_quota_bytes)
326    .bind(record_audio)
327    .bind(&record_mode)
328    .bind(pre_roll_seconds)
329    .bind(post_roll_seconds)
330    .bind(mirror_enabled)
331    .bind(anr_enabled)
332    .bind(&anr_replay_url_template)
333    .bind(enabled)
334    .bind(priority)
335    .bind(Utc::now())
336    .bind(&id)
337    .execute(&st.pool)
338    .await?;
339
340    st.recorder.reconcile(&id).await;
341    if let Some(m) = &st.mirror {
342        m.reconcile(&id).await;
343    }
344    // A disable / URL change / enable also affects AI sampling for this camera.
345    st.sampler.reconcile().await;
346    auth::audit(
347        &st.pool,
348        &principal,
349        "update_camera",
350        "camera",
351        &id,
352        json!({}),
353    )
354    .await;
355    Ok(Json(load_camera(&st.pool, &id).await?.into()))
356}
357
358async fn delete_camera(
359    State(st): State<AppState>,
360    Path(id): Path<String>,
361    principal: Principal,
362) -> AppResult<StatusCode> {
363    principal.require(principal.can_manage_registry(), "delete cameras")?;
364    let _ = load_camera(&st.pool, &id).await?; // 404 if missing
365    st.recorder.stop(&id).await;
366    if let Some(m) = &st.mirror {
367        m.stop(&id).await;
368    }
369    // Clean up zone-event evidence files + rows for this camera (zone_events has no FK cascade).
370    let evidence: Vec<(Option<String>,)> =
371        sqlx::query_as("SELECT evidence_path FROM zone_events WHERE camera_id = ?")
372            .bind(&id)
373            .fetch_all(&st.pool)
374            .await
375            .unwrap_or_default();
376    for (ev,) in &evidence {
377        if let Some(name) = ev.as_deref().and_then(|u| u.rsplit('/').next()) {
378            let _ = tokio::fs::remove_file(st.cfg.snapshots_dir.join(name)).await;
379        }
380    }
381    let _ = sqlx::query("DELETE FROM zone_events WHERE camera_id = ?")
382        .bind(&id)
383        .execute(&st.pool)
384        .await;
385    sqlx::query("DELETE FROM cameras WHERE id = ?")
386        .bind(&id)
387        .execute(&st.pool)
388        .await?;
389    // Stop any AI sampler for this camera (its ai_tasks cascade-deleted) and remove its on-disk data.
390    st.sampler.reconcile().await;
391    let _ = tokio::fs::remove_dir_all(st.cfg.camera_recordings_dir(&id)).await;
392    let _ = tokio::fs::remove_dir_all(st.cfg.camera_frames_dir(&id)).await;
393    if let Some(dir) = &st.cfg.mirror_recordings_dir {
394        let _ = tokio::fs::remove_dir_all(dir.join(&id)).await;
395    }
396    auth::audit(
397        &st.pool,
398        &principal,
399        "delete_camera",
400        "camera",
401        &id,
402        json!({}),
403    )
404    .await;
405    Ok(StatusCode::NO_CONTENT)
406}
407
408/// Probe the camera's recording stream to confirm reachability and read its codec/dimensions.
409async fn test_camera(
410    State(st): State<AppState>,
411    principal: Principal,
412    Path(id): Path<String>,
413) -> AppResult<Json<Value>> {
414    principal.require(principal.can_view(), "test camera connectivity")?;
415    let cam = load_camera(&st.pool, &id).await?;
416    let url = camera_url::record_url(&cam)
417        .ok_or_else(|| AppError::BadRequest("camera has no stream URL".into()))?;
418
419    let probe = tokio::time::timeout(
420        std::time::Duration::from_secs(12),
421        util::ffprobe_stream(&st.cfg.ffprobe_bin, &url),
422    )
423    .await;
424
425    let result = match probe {
426        Ok(Ok(info)) => json!({
427            "reachable": true,
428            "codec": info.codec,
429            "width": info.width,
430            "height": info.height,
431            "url": camera_url::mask_url(&url),
432        }),
433        Ok(Err(e)) => json!({
434            "reachable": false,
435            "error": camera_url::mask_url(&e.to_string()),
436            "url": camera_url::mask_url(&url),
437        }),
438        Err(_) => json!({
439            "reachable": false,
440            "error": "probe timed out after 12s",
441            "url": camera_url::mask_url(&url),
442        }),
443    };
444    Ok(Json(result))
445}
446
#[cfg(test)]
mod tests {
    use crate::config::Config;
    use crate::services::recorder::RecorderManager;
    use crate::services::sampler::SamplerManager;
    use crate::state::AppState;
    use axum::body::Body;
    use axum::http::{Request, StatusCode};
    use std::sync::Arc;
    use tower::Service;

    /// Assemble a minimal `AppState` over an in-memory SQLite database with `auth_enabled` set as
    /// requested. The pool is capped at one connection so the migrated schema persists.
    async fn test_state(auth_enabled: bool) -> AppState {
        let pool = sqlx::sqlite::SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        crate::db::run_migrations(&pool).await.unwrap();

        let cfg = {
            let mut base = Config::from_env();
            base.auth_enabled = auth_enabled;
            Arc::new(base)
        };
        let recorder = RecorderManager::new(pool.clone(), cfg.clone());
        let sampler = SamplerManager::new(pool.clone(), cfg.clone());
        let catalog = Arc::new(crate::services::registry::CatalogService::new(&cfg));

        AppState {
            recorder,
            sampler,
            mirror: None,
            consumers: Arc::new(Vec::new()),
            modules: Arc::new(Vec::new()),
            catalog,
            http: reqwest::Client::new(),
            started_at: chrono::Utc::now(),
            pool,
            cfg,
        }
    }

    /// Issue `GET /api/v1/cameras` with no credentials through the real router and return the
    /// response status.
    async fn unauthenticated_list_status(auth_enabled: bool) -> StatusCode {
        let state = test_state(auth_enabled).await;
        let mut app = super::router().with_state(state);
        let request = Request::get("/api/v1/cameras")
            .body(Body::empty())
            .unwrap();
        let response = app.call(request).await.unwrap();
        response.status()
    }

    /// Auth ENABLED: an unauthenticated request to a representative legacy route must be rejected
    /// by the Principal extractor with 401 — the auth gap this batch closes.
    #[tokio::test]
    async fn legacy_route_rejects_unauthenticated_when_auth_enabled() {
        let status = unauthenticated_list_status(true).await;
        assert_eq!(status, StatusCode::UNAUTHORIZED);
    }

    /// Auth DISABLED: the Principal is the permissive system admin, so the `require()` guard is a
    /// behavioral no-op and the legacy route stays open (200).
    #[tokio::test]
    async fn legacy_route_open_when_auth_disabled() {
        let status = unauthenticated_list_status(false).await;
        assert_eq!(status, StatusCode::OK);
    }
}