Skip to main content

heldar_kernel/routes/
camera_config.rs

//! Camera configuration (HikVision ISAPI) API: device identity, video-encoding, clock/NTP, ONVIF +
//! ISAPI integration, on-screen-display overlays, reboot, and a cross-camera bulk apply.
//!
//! Reads (GET) are open to any authenticated principal (`can_view`); every mutation (PUT/POST) is
//! gated by manager+ (`can_manage_registry`) and written to the immutable audit log. The handlers
//! own persistence/audit; the device protocol lives behind the vendor-agnostic
//! [`CameraConfigProvider`] built per-camera by [`camera_config::for_camera`]. `GET .../device_info`
//! and `POST .../onvif/ensure_user` also refresh the `camera_isapi` cache row.
//!
//! The reboot endpoint is DISRUPTIVE: it refuses to act unless the body carries `confirm: true`. The
//! bulk endpoint walks its target cameras SERIALLY, bounding each camera's action with a timeout and
//! collecting a per-camera result so one unreachable device never aborts the run.
13
14use std::time::Duration;
15
16use axum::extract::{Path, State};
17use axum::routing::{get, post};
18use axum::{Json, Router};
19use chrono::Utc;
20use serde_json::{json, Value};
21
22use crate::auth::{self, Principal};
23use crate::error::{AppError, AppResult};
24use crate::routes::cameras::load_camera;
25use crate::services::camera_config::types::{
26    BulkAction, BulkCameraResult, BulkConfigRequest, BulkConfigResponse, DeviceInfo,
27    EnsureOnvifUserRequest, NtpConfig, OnvifSettings, OnvifUserType, OsdConfig, RebootRequest,
28    TimeConfig, VideoConfig, VideoConfigPatch,
29};
30use crate::services::camera_config::{self, CameraConfigProvider};
31use crate::state::AppState;
32
/// HikVision main streaming channel; the bulk `SetVideo` action defaults to it when no channel is
/// given.
const MAIN_CHANNEL: i64 = 101;
/// Least-privilege ONVIF role provisioned by default (media + PTZ for Profile S; not device config).
/// Used whenever a request does not name a user type, and always by the bulk `EnableOnvif` action.
const DEFAULT_ONVIF_USER_TYPE: OnvifUserType = OnvifUserType::Operator;
/// A bulk per-camera action runs several ISAPI requests (e.g. EnableOnvif = GET+PUT Integrate then
/// GET+POST users); bound the whole action at this multiple of the single-request timeout so one
/// stuck camera cannot stall the serial loop. The resulting budget is floored at 2000 ms by
/// `bulk_config`.
const BULK_REQUEST_FACTOR: u64 = 6;
41
42pub fn router() -> Router<AppState> {
43    Router::new()
44        // Literal sibling of the `/{id}/config/*` param routes; axum/matchit resolves the static
45        // `config/bulk` ahead of capturing `id = "config"`, so it reaches the bulk handler.
46        .route("/api/v1/cameras/config/bulk", post(bulk_config))
47        .route(
48            "/api/v1/cameras/{id}/config/device_info",
49            get(get_device_info),
50        )
51        .route("/api/v1/cameras/{id}/config/video", get(get_video_list))
52        .route(
53            "/api/v1/cameras/{id}/config/video/{channel}",
54            get(get_video).put(put_video),
55        )
56        .route(
57            "/api/v1/cameras/{id}/config/time",
58            get(get_time).put(put_time),
59        )
60        .route(
61            "/api/v1/cameras/{id}/config/time/ntp",
62            get(get_ntp).put(put_ntp),
63        )
64        .route("/api/v1/cameras/{id}/config/time/sync_now", post(sync_now))
65        .route(
66            "/api/v1/cameras/{id}/config/onvif",
67            get(get_onvif_settings).put(put_onvif_settings),
68        )
69        .route(
70            "/api/v1/cameras/{id}/config/onvif/ensure_user",
71            post(ensure_onvif_user),
72        )
73        .route("/api/v1/cameras/{id}/config/osd", get(get_osd).put(put_osd))
74        .route("/api/v1/cameras/{id}/config/reboot", post(reboot))
75}
76
77/// Build a camera-config provider for `id`, 404ing when the camera is unknown and 400ing when it is
78/// not configurable (no address/credentials, or an unsupported vendor).
79async fn provider_for(st: &AppState, id: &str) -> AppResult<Box<dyn CameraConfigProvider>> {
80    let cam = load_camera(&st.pool, id).await?;
81    camera_config::for_camera(&cam, &st.http, st.cfg.isapi_request_timeout_ms)
82}
83
84// ========================= Device identity =========================
85
86async fn get_device_info(
87    State(st): State<AppState>,
88    Path(id): Path<String>,
89    principal: Principal,
90) -> AppResult<Json<DeviceInfo>> {
91    principal.require(principal.can_view(), "view camera configuration")?;
92    let provider = provider_for(&st, &id).await?;
93    let info = provider.get_device_info().await?;
94    // Refresh the per-camera ISAPI cache (identity columns only; integration/clock state preserved).
95    sqlx::query(
96        "INSERT INTO camera_isapi (camera_id, device_name, model, firmware_version, serial_number, fetched_at)
97         VALUES (?, ?, ?, ?, ?, ?)
98         ON CONFLICT(camera_id) DO UPDATE SET
99            device_name = excluded.device_name,
100            model = excluded.model,
101            firmware_version = excluded.firmware_version,
102            serial_number = excluded.serial_number,
103            fetched_at = excluded.fetched_at",
104    )
105    .bind(&id)
106    .bind(&info.device_name)
107    .bind(&info.model)
108    .bind(&info.firmware_version)
109    .bind(&info.serial_number)
110    .bind(Utc::now())
111    .execute(&st.pool)
112    .await?;
113    Ok(Json(info))
114}
115
116// ========================= Video encoding =========================
117
118async fn get_video_list(
119    State(st): State<AppState>,
120    Path(id): Path<String>,
121    principal: Principal,
122) -> AppResult<Json<Vec<VideoConfig>>> {
123    principal.require(principal.can_view(), "view camera video configuration")?;
124    let provider = provider_for(&st, &id).await?;
125    Ok(Json(provider.list_video_configs().await?))
126}
127
128async fn get_video(
129    State(st): State<AppState>,
130    Path((id, channel)): Path<(String, u32)>,
131    principal: Principal,
132) -> AppResult<Json<VideoConfig>> {
133    principal.require(principal.can_view(), "view camera video configuration")?;
134    let provider = provider_for(&st, &id).await?;
135    Ok(Json(provider.get_video_config(channel).await?))
136}
137
138async fn put_video(
139    State(st): State<AppState>,
140    Path((id, channel)): Path<(String, u32)>,
141    principal: Principal,
142    Json(patch): Json<VideoConfigPatch>,
143) -> AppResult<Json<VideoConfig>> {
144    principal.require(principal.can_manage_registry(), "configure camera video")?;
145    let provider = provider_for(&st, &id).await?;
146    let merged = merge_video(provider.get_video_config(channel).await?, &patch);
147    provider.put_video_config(channel, &merged).await?;
148    let updated = provider.get_video_config(channel).await?;
149    auth::audit(
150        &st.pool,
151        &principal,
152        "camera_config_put_video",
153        "camera",
154        &id,
155        json!({
156            "channel": channel,
157            "codec": updated.codec,
158            "width": updated.width,
159            "height": updated.height,
160            "fps": updated.fps,
161        }),
162    )
163    .await;
164    Ok(Json(updated))
165}
166
167// ========================= Clock / NTP =========================
168
169async fn get_time(
170    State(st): State<AppState>,
171    Path(id): Path<String>,
172    principal: Principal,
173) -> AppResult<Json<TimeConfig>> {
174    principal.require(principal.can_view(), "view camera clock")?;
175    let provider = provider_for(&st, &id).await?;
176    Ok(Json(provider.get_time_config().await?))
177}
178
179async fn put_time(
180    State(st): State<AppState>,
181    Path(id): Path<String>,
182    principal: Principal,
183    Json(cfg): Json<TimeConfig>,
184) -> AppResult<Json<TimeConfig>> {
185    principal.require(principal.can_manage_registry(), "configure camera clock")?;
186    let provider = provider_for(&st, &id).await?;
187    provider.put_time_config(&cfg).await?;
188    let updated = provider.get_time_config().await?;
189    auth::audit(
190        &st.pool,
191        &principal,
192        "camera_config_put_time",
193        "camera",
194        &id,
195        json!({ "time_mode": updated.time_mode, "time_zone": updated.time_zone }),
196    )
197    .await;
198    Ok(Json(updated))
199}
200
201async fn get_ntp(
202    State(st): State<AppState>,
203    Path(id): Path<String>,
204    principal: Principal,
205) -> AppResult<Json<NtpConfig>> {
206    principal.require(principal.can_view(), "view camera NTP server")?;
207    let provider = provider_for(&st, &id).await?;
208    Ok(Json(provider.get_ntp_config().await?))
209}
210
211async fn put_ntp(
212    State(st): State<AppState>,
213    Path(id): Path<String>,
214    principal: Principal,
215    Json(cfg): Json<NtpConfig>,
216) -> AppResult<Json<NtpConfig>> {
217    principal.require(
218        principal.can_manage_registry(),
219        "configure camera NTP server",
220    )?;
221    let provider = provider_for(&st, &id).await?;
222    provider.put_ntp_config(&cfg).await?;
223    let updated = provider.get_ntp_config().await?;
224    auth::audit(
225        &st.pool,
226        &principal,
227        "camera_config_put_ntp",
228        "camera",
229        &id,
230        json!({ "host_name": updated.host_name, "addressing_format": updated.addressing_format }),
231    )
232    .await;
233    Ok(Json(updated))
234}
235
236async fn sync_now(
237    State(st): State<AppState>,
238    Path(id): Path<String>,
239    principal: Principal,
240) -> AppResult<Json<TimeConfig>> {
241    principal.require(principal.can_manage_registry(), "sync camera clock")?;
242    let provider = provider_for(&st, &id).await?;
243    let updated = provider.sync_time_now().await?;
244    auth::audit(
245        &st.pool,
246        &principal,
247        "camera_config_sync_time",
248        "camera",
249        &id,
250        json!({ "time_mode": updated.time_mode }),
251    )
252    .await;
253    Ok(Json(updated))
254}
255
256// ========================= ONVIF / ISAPI integration =========================
257
258async fn get_onvif_settings(
259    State(st): State<AppState>,
260    Path(id): Path<String>,
261    principal: Principal,
262) -> AppResult<Json<OnvifSettings>> {
263    principal.require(principal.can_view(), "view camera ONVIF settings")?;
264    let provider = provider_for(&st, &id).await?;
265    Ok(Json(provider.get_onvif_settings().await?))
266}
267
268async fn put_onvif_settings(
269    State(st): State<AppState>,
270    Path(id): Path<String>,
271    principal: Principal,
272    Json(cfg): Json<OnvifSettings>,
273) -> AppResult<Json<OnvifSettings>> {
274    principal.require(
275        principal.can_manage_registry(),
276        "configure camera ONVIF settings",
277    )?;
278    let provider = provider_for(&st, &id).await?;
279    provider.put_onvif_settings(&cfg).await?;
280    let updated = provider.get_onvif_settings().await?;
281    auth::audit(
282        &st.pool,
283        &principal,
284        "camera_config_put_onvif",
285        "camera",
286        &id,
287        json!({ "onvif_enabled": updated.onvif_enabled, "isapi_enabled": updated.isapi_enabled }),
288    )
289    .await;
290    Ok(Json(updated))
291}
292
293async fn ensure_onvif_user(
294    State(st): State<AppState>,
295    Path(id): Path<String>,
296    principal: Principal,
297    Json(body): Json<EnsureOnvifUserRequest>,
298) -> AppResult<Json<Value>> {
299    principal.require(
300        principal.can_manage_registry(),
301        "provision a camera ONVIF user",
302    )?;
303    let provider = provider_for(&st, &id).await?;
304    // The provider treats a duplicate create as success and does not report created-vs-existed, so
305    // the cache flag is the kernel's record of whether it has already provisioned this user.
306    let already: bool = sqlx::query_scalar::<_, i64>(
307        "SELECT onvif_user_created FROM camera_isapi WHERE camera_id = ?",
308    )
309    .bind(&id)
310    .fetch_optional(&st.pool)
311    .await?
312    .map(|v| v != 0)
313    .unwrap_or(false);
314    let user_type = body.user_type.unwrap_or(DEFAULT_ONVIF_USER_TYPE);
315    provider
316        .ensure_onvif_user(&body.username, &body.password, user_type)
317        .await?;
318    sqlx::query(
319        "INSERT INTO camera_isapi (camera_id, onvif_user_created, fetched_at)
320         VALUES (?, 1, ?)
321         ON CONFLICT(camera_id) DO UPDATE SET onvif_user_created = 1, fetched_at = excluded.fetched_at",
322    )
323    .bind(&id)
324    .bind(Utc::now())
325    .execute(&st.pool)
326    .await?;
327    let created = !already;
328    auth::audit(
329        &st.pool,
330        &principal,
331        "camera_config_ensure_onvif_user",
332        "camera",
333        &id,
334        json!({ "username": body.username, "created": created }),
335    )
336    .await;
337    Ok(Json(json!({ "ok": true, "created": created })))
338}
339
340// ========================= On-screen-display overlays =========================
341
342async fn get_osd(
343    State(st): State<AppState>,
344    Path(id): Path<String>,
345    principal: Principal,
346) -> AppResult<Json<OsdConfig>> {
347    principal.require(principal.can_view(), "view camera OSD overlays")?;
348    let provider = provider_for(&st, &id).await?;
349    Ok(Json(provider.get_osd_config().await?))
350}
351
352async fn put_osd(
353    State(st): State<AppState>,
354    Path(id): Path<String>,
355    principal: Principal,
356    Json(cfg): Json<OsdConfig>,
357) -> AppResult<Json<OsdConfig>> {
358    principal.require(
359        principal.can_manage_registry(),
360        "configure camera OSD overlays",
361    )?;
362    let provider = provider_for(&st, &id).await?;
363    provider.put_osd_config(&cfg).await?;
364    let updated = provider.get_osd_config().await?;
365    auth::audit(
366        &st.pool,
367        &principal,
368        "camera_config_put_osd",
369        "camera",
370        &id,
371        json!({
372            "datetime_enabled": updated.datetime_enabled,
373            "channel_name_enabled": updated.channel_name_enabled,
374        }),
375    )
376    .await;
377    Ok(Json(updated))
378}
379
380// ========================= Reboot (DISRUPTIVE) =========================
381
382async fn reboot(
383    State(st): State<AppState>,
384    Path(id): Path<String>,
385    principal: Principal,
386    Json(body): Json<RebootRequest>,
387) -> AppResult<Json<Value>> {
388    principal.require(principal.can_manage_registry(), "reboot a camera")?;
389    let cam = load_camera(&st.pool, &id).await?; // 404 if missing
390    if !body.confirm {
391        tracing::warn!(camera_id = %id, "camera reboot rejected: `confirm` was not true");
392        return Err(AppError::BadRequest(
393            "rebooting a camera is disruptive; resend with `confirm: true`".into(),
394        ));
395    }
396    let provider = camera_config::for_camera(&cam, &st.http, st.cfg.isapi_request_timeout_ms)?;
397    provider.reboot().await?;
398    auth::audit(
399        &st.pool,
400        &principal,
401        "camera_config_reboot",
402        "camera",
403        &id,
404        json!({ "confirm": true }),
405    )
406    .await;
407    Ok(Json(json!({ "ok": true, "rebooting": true })))
408}
409
410// ========================= Bulk apply =========================
411
412async fn bulk_config(
413    State(st): State<AppState>,
414    principal: Principal,
415    Json(body): Json<BulkConfigRequest>,
416) -> AppResult<Json<BulkConfigResponse>> {
417    principal.require(
418        principal.can_manage_registry(),
419        "run a bulk camera configuration",
420    )?;
421
422    // Resolve the target set: an explicit list, or every enabled camera.
423    let ids: Vec<String> = match &body.camera_ids {
424        Some(list) => list.clone(),
425        None => {
426            sqlx::query_scalar::<_, String>(
427                "SELECT id FROM cameras WHERE enabled = 1 ORDER BY id ASC",
428            )
429            .fetch_all(&st.pool)
430            .await?
431        }
432    };
433
434    let per_camera = Duration::from_millis(
435        st.cfg
436            .isapi_request_timeout_ms
437            .saturating_mul(BULK_REQUEST_FACTOR)
438            .max(2000),
439    );
440
441    // SERIAL loop: one slow/unreachable camera is bounded by `per_camera` and never aborts the run.
442    let mut results: Vec<BulkCameraResult> = Vec::with_capacity(ids.len());
443    for cam_id in &ids {
444        let outcome = run_bulk_for_camera(&st, cam_id, &body.action, per_camera).await;
445        results.push(match outcome {
446            Ok(()) => BulkCameraResult {
447                camera_id: cam_id.clone(),
448                ok: true,
449                error: None,
450            },
451            Err(e) => BulkCameraResult {
452                camera_id: cam_id.clone(),
453                ok: false,
454                error: Some(e.to_string()),
455            },
456        });
457    }
458
459    let succeeded = results.iter().filter(|r| r.ok).count();
460    let failed = results.len() - succeeded;
461    auth::audit(
462        &st.pool,
463        &principal,
464        "camera_config_bulk",
465        "camera",
466        "*",
467        json!({
468            "action": action_name(&body.action),
469            "targets": ids.len(),
470            "succeeded": succeeded,
471            "failed": failed,
472        }),
473    )
474    .await;
475    Ok(Json(BulkConfigResponse {
476        results,
477        succeeded,
478        failed,
479    }))
480}
481
482/// Build a provider for one camera and run the bulk action, bounded by `per_camera`.
483async fn run_bulk_for_camera(
484    st: &AppState,
485    cam_id: &str,
486    action: &BulkAction,
487    per_camera: Duration,
488) -> AppResult<()> {
489    let provider = provider_for(st, cam_id).await?;
490    match tokio::time::timeout(per_camera, apply_bulk_action(provider.as_ref(), action)).await {
491        Ok(res) => res,
492        Err(_) => Err(AppError::Other(anyhow::anyhow!(
493            "camera configuration action timed out"
494        ))),
495    }
496}
497
498/// Apply a single [`BulkAction`] against a live provider.
499async fn apply_bulk_action(
500    provider: &dyn CameraConfigProvider,
501    action: &BulkAction,
502) -> AppResult<()> {
503    match action {
504        BulkAction::EnableOnvif {
505            onvif_username,
506            onvif_password,
507        } => {
508            provider
509                .put_onvif_settings(&OnvifSettings {
510                    onvif_enabled: true,
511                    isapi_enabled: true,
512                })
513                .await?;
514            provider
515                .ensure_onvif_user(onvif_username, onvif_password, DEFAULT_ONVIF_USER_TYPE)
516                .await?;
517        }
518        BulkAction::SyncTime { ntp_server } => {
519            if let Some(server) = ntp_server {
520                provider.put_ntp_config(&ntp_config_for(server)).await?;
521            }
522            provider.sync_time_now().await?;
523        }
524        BulkAction::SetNtp { ntp_server } => {
525            provider.put_ntp_config(&ntp_config_for(ntp_server)).await?;
526        }
527        BulkAction::SetVideo { channel, patch } => {
528            let ch = channel.unwrap_or(MAIN_CHANNEL) as u32;
529            let merged = merge_video(provider.get_video_config(ch).await?, patch);
530            provider.put_video_config(ch, &merged).await?;
531        }
532    }
533    Ok(())
534}
535
536// ========================= helpers =========================
537
538/// Overlay a [`VideoConfigPatch`]'s set fields onto a full [`VideoConfig`] (read-modify-write).
539fn merge_video(mut cfg: VideoConfig, patch: &VideoConfigPatch) -> VideoConfig {
540    if let Some(v) = &patch.codec {
541        cfg.codec = v.clone();
542    }
543    if let Some(v) = patch.width {
544        cfg.width = v;
545    }
546    if let Some(v) = patch.height {
547        cfg.height = v;
548    }
549    if let Some(v) = patch.fps {
550        cfg.fps = v;
551    }
552    if let Some(v) = &patch.quality_control {
553        cfg.quality_control = v.clone();
554    }
555    if let Some(v) = patch.bitrate {
556        cfg.bitrate = v;
557    }
558    if let Some(v) = patch.vbr_upper_cap {
559        cfg.vbr_upper_cap = v;
560    }
561    if let Some(v) = patch.gop {
562        cfg.gop = v;
563    }
564    cfg
565}
566
567/// Build an [`NtpConfig`] from a bare server string, inferring `ipaddress` vs `hostname`.
568fn ntp_config_for(server: &str) -> NtpConfig {
569    let addressing_format = if server.parse::<std::net::IpAddr>().is_ok() {
570        "ipaddress"
571    } else {
572        "hostname"
573    };
574    NtpConfig {
575        addressing_format: addressing_format.to_string(),
576        host_name: server.to_string(),
577        port: 123,
578    }
579}
580
581/// Stable label for a bulk action (audit detail).
582fn action_name(action: &BulkAction) -> &'static str {
583    match action {
584        BulkAction::EnableOnvif { .. } => "enable_onvif",
585        BulkAction::SyncTime { .. } => "sync_time",
586        BulkAction::SetNtp { .. } => "set_ntp",
587        BulkAction::SetVideo { .. } => "set_video",
588    }
589}
590
#[cfg(test)]
mod tests {
    use axum::body::Body;
    use axum::extract::Path;
    use axum::http::{Request, StatusCode};
    use axum::routing::{get, post};
    use axum::Router;
    use tower::Service;

    /// Drive one body-less request through `app` and return its status plus the body as text.
    async fn send(app: &mut Router, method: &str, uri: &str) -> (StatusCode, String) {
        let request = Request::builder()
            .method(method)
            .uri(uri)
            .body(Body::empty())
            .unwrap();
        let response = app.call(request).await.unwrap();
        let status = response.status();
        let raw = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        let text = String::from_utf8_lossy(&raw).into_owned();
        (status, text)
    }

    /// `cameras/config/bulk` is a literal sibling of the `cameras/{id}/config/*` param routes, so
    /// axum/matchit must prefer the static segment over capturing `id = "config"` for a POST to
    /// reach the bulk handler. Building the real kernel router (camera_config merged before
    /// cameras) additionally proves there is no route conflict.
    #[tokio::test]
    async fn bulk_route_beats_id_param() {
        // Constructing the full kernel router would panic on any conflicting-route overlap.
        let _ = crate::routes::api_router();

        // Minimal stand-in with the same path shapes as the real routes.
        let bulk_stub = post(|| async { "bulk" });
        let device_info_stub =
            get(|Path(id): Path<String>| async move { format!("device_info:{id}") });
        let mut app: Router = Router::new()
            .route("/api/v1/cameras/config/bulk", bulk_stub)
            .route("/api/v1/cameras/{id}/config/device_info", device_info_stub);

        // The static bulk path wins over the {id} param.
        let bulk_reply = send(&mut app, "POST", "/api/v1/cameras/config/bulk").await;
        assert_eq!(bulk_reply.0, StatusCode::OK);
        assert_eq!(bulk_reply.1, "bulk");

        // A genuine id still routes to the per-camera param handler.
        let device_reply =
            send(&mut app, "GET", "/api/v1/cameras/cam-1/config/device_info").await;
        assert_eq!(device_reply.0, StatusCode::OK);
        assert_eq!(device_reply.1, "device_info:cam-1");
    }
}