Skip to main content

heldar_kernel/services/
onvif.rs

1//! ONVIF (Profile S MVP): WS-Discovery, a device probe, and PTZ control.
2//!
3//! All SOAP is hand-rolled with `format!` and parsed by substring extraction (no XML crate, per the
4//! kernel's offline-build constraint). Authentication uses a WS-Security UsernameToken with a
5//! `PasswordDigest` = `base64(sha1(nonce + created + password))` built from the existing `sha1` +
6//! `base64` crates.
7//!
8//! Scope (intentionally narrow): device identification (GetDeviceInformation), service discovery
9//! (GetCapabilities, falling back to GetServices), media profiles (GetProfiles), best-effort stream
10//! URI (GetStreamUri), and PTZ (ContinuousMove / Stop / GetPresets / GotoPreset). Events, Profile G
11//! (recording/replay), Profile T, imaging, and absolute/relative moves are out of scope.
12
13use std::collections::HashSet;
14use std::time::Duration;
15
16use argon2::password_hash::rand_core::OsRng;
17use base64::engine::general_purpose::STANDARD as BASE64;
18use base64::Engine as _;
19use chrono::{SecondsFormat, Utc};
20use rand_core::RngCore;
21use serde::Serialize;
22use serde_json::{json, Value};
23use sha1::{Digest, Sha1};
24use sqlx::types::Json as SqlxJson;
25use sqlx::SqlitePool;
26use tokio::net::UdpSocket;
27use uuid::Uuid;
28
29use crate::camera_url;
30use crate::config::Config;
31use crate::error::{AppError, AppResult};
32use crate::models::{Camera, CameraOnvif, PtzPreset};
33use crate::state::AppState;
34
35const WSDD_MULTICAST: &str = "239.255.255.250:3702";
36
37// ONVIF / WS-* namespace + action constants.
38const NS_DEVICE: &str = "http://www.onvif.org/ver10/device/wsdl";
39const NS_MEDIA: &str = "http://www.onvif.org/ver10/media/wsdl";
40const NS_PTZ: &str = "http://www.onvif.org/ver20/ptz/wsdl";
41
42// ========================= XML helpers (substring extraction) =========================
43//
44// These tolerate namespace prefixes and attributes on tags. They assume the small, well-formed SOAP
45// bodies ONVIF devices return (no same-name nesting), which holds for every element we read here.
46
47/// Locate the first element with local name `local` at/after byte `from`. Returns
48/// `(open_lt, open_gt, self_closing)`: index of the opening `<`, index of that tag's `>`, and whether
49/// the element is self-closing (`/>`). Comments, declarations, and closing tags are skipped.
50fn find_open(xml: &str, local: &str, from: usize) -> Option<(usize, usize, bool)> {
51    let bytes = xml.as_bytes();
52    let mut i = from.min(xml.len());
53    while let Some(rel) = xml[i..].find('<') {
54        let lt = i + rel;
55        match bytes.get(lt + 1).copied() {
56            Some(b'/') | Some(b'!') | Some(b'?') => {
57                i = lt + 1;
58                continue;
59            }
60            _ => {}
61        }
62        let name_start = lt + 1;
63        let gt_rel = xml[name_start..].find('>')?;
64        let gt = name_start + gt_rel;
65        let self_closing = gt > name_start && bytes.get(gt - 1).copied() == Some(b'/');
66        let tag = &xml[name_start..gt];
67        let head = tag.split([' ', '\t', '\n', '\r', '/']).next().unwrap_or("");
68        let local_name = head.rsplit(':').next().unwrap_or(head);
69        if local_name == local {
70            return Some((lt, gt, self_closing));
71        }
72        i = gt + 1;
73    }
74    None
75}
76
77/// Find the byte offset of the first closing tag `</...local>` in `xml`.
78fn find_close(xml: &str, local: &str) -> Option<usize> {
79    let mut i = 0;
80    while let Some(rel) = xml[i..].find("</") {
81        let pos = i + rel;
82        let after = &xml[pos + 2..];
83        let gt_rel = after.find('>')?;
84        let name = after[..gt_rel].trim();
85        let local_name = name.rsplit(':').next().unwrap_or(name);
86        if local_name == local {
87            return Some(pos);
88        }
89        i = pos + 2;
90    }
91    None
92}
93
94/// Inner XML (raw) of the first element with local name `local`.
95fn first_inner<'a>(xml: &'a str, local: &str) -> Option<&'a str> {
96    let (_lt, gt, self_closing) = find_open(xml, local, 0)?;
97    if self_closing {
98        return Some("");
99    }
100    let cs = gt + 1;
101    let close_rel = find_close(&xml[cs..], local)?;
102    Some(&xml[cs..cs + close_rel])
103}
104
105/// Trimmed, entity-decoded text content of the first element with local name `local`. Returns `None`
106/// when the element is absent or its text is empty.
107fn first_text(xml: &str, local: &str) -> Option<String> {
108    let inner = first_inner(xml, local)?;
109    let t = inner.trim();
110    if t.is_empty() {
111        None
112    } else {
113        Some(xml_unescape(t))
114    }
115}
116
117/// All elements with local name `local`, returned as `(opening_tag, inner_xml)` pairs.
118fn elements<'a>(xml: &'a str, local: &str) -> Vec<(&'a str, &'a str)> {
119    let mut out = Vec::new();
120    let mut from = 0;
121    while let Some((lt, gt, self_closing)) = find_open(xml, local, from) {
122        let open = &xml[lt..=gt];
123        if self_closing {
124            out.push((open, ""));
125            from = gt + 1;
126            continue;
127        }
128        let cs = gt + 1;
129        match find_close(&xml[cs..], local) {
130            Some(close_rel) => {
131                out.push((open, &xml[cs..cs + close_rel]));
132                from = cs + close_rel;
133            }
134            None => break,
135        }
136    }
137    out
138}
139
140/// Extract the value of attribute `name` from an opening tag string (e.g. `<tptz:Preset token="P1">`).
141fn attr_in_tag(tag: &str, name: &str) -> Option<String> {
142    let bytes = tag.as_bytes();
143    let mut i = 0;
144    while let Some(rel) = tag[i..].find(name) {
145        let pos = i + rel;
146        let before_ok = pos == 0
147            || matches!(bytes.get(pos - 1), Some(b) if b.is_ascii_whitespace() || *b == b'<');
148        let after = &tag[pos + name.len()..];
149        let after_trim = after.trim_start();
150        if before_ok && after_trim.starts_with('=') {
151            let rest = after_trim[1..].trim_start();
152            let quote = rest.chars().next()?;
153            if quote == '"' || quote == '\'' {
154                let val = &rest[1..];
155                let end = val.find(quote)?;
156                return Some(xml_unescape(&val[..end]));
157            }
158        }
159        i = pos + name.len();
160    }
161    None
162}
163
164/// Decode the five predefined XML entities (enough for ONVIF text/attribute values).
165fn xml_unescape(s: &str) -> String {
166    s.replace("&lt;", "<")
167        .replace("&gt;", ">")
168        .replace("&quot;", "\"")
169        .replace("&apos;", "'")
170        .replace("&amp;", "&")
171}
172
173/// Escape the characters that are not safe in XML text / attribute values.
174fn xml_escape(s: &str) -> String {
175    s.replace('&', "&amp;")
176        .replace('<', "&lt;")
177        .replace('>', "&gt;")
178        .replace('"', "&quot;")
179        .replace('\'', "&apos;")
180}
181
182/// Extract a SOAP fault reason from a fault body, if present.
183fn fault_reason(xml: &str) -> Option<String> {
184    first_text(xml, "Text").or_else(|| first_text(xml, "faultstring"))
185}
186
187/// Extract the host (no scheme/userinfo/port/path) from a URL.
188fn host_of(url: &str) -> Option<String> {
189    let after = url.split("://").nth(1)?;
190    let authority = after.split('/').next()?;
191    let authority = authority.rsplit('@').next().unwrap_or(authority);
192    let host = authority.split(':').next()?;
193    (!host.is_empty()).then(|| host.to_string())
194}
195
196// ========================= SOAP envelope + WS-Security =========================
197
198/// Build the WS-Security UsernameToken header (PasswordDigest). Empty when no username is set
199/// (anonymous request — many devices allow GetDeviceInformation/GetCapabilities without auth).
200fn security_header(user: Option<&str>, pass: Option<&str>) -> String {
201    let Some(user) = user.filter(|u| !u.is_empty()) else {
202        return String::new();
203    };
204    let pass = pass.unwrap_or("");
205    let mut nonce = [0u8; 16];
206    OsRng.fill_bytes(&mut nonce);
207    let created = Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true);
208    let mut hasher = Sha1::new();
209    hasher.update(nonce);
210    hasher.update(created.as_bytes());
211    hasher.update(pass.as_bytes());
212    let digest = BASE64.encode(hasher.finalize());
213    let nonce_b64 = BASE64.encode(nonce);
214    format!(
215        "<wsse:Security s:mustUnderstand=\"1\">\
216<wsse:UsernameToken>\
217<wsse:Username>{user}</wsse:Username>\
218<wsse:Password Type=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-username-token-profile-1.0#PasswordDigest\">{digest}</wsse:Password>\
219<wsse:Nonce EncodingType=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary\">{nonce_b64}</wsse:Nonce>\
220<wsu:Created>{created}</wsu:Created>\
221</wsse:UsernameToken>\
222</wsse:Security>",
223        user = xml_escape(user),
224    )
225}
226
227/// Wrap a SOAP body in an envelope carrying every namespace prefix the service calls use.
228fn envelope(security: &str, body: &str) -> String {
229    format!(
230        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
231<s:Envelope \
232xmlns:s=\"http://www.w3.org/2003/05/soap-envelope\" \
233xmlns:tds=\"{NS_DEVICE}\" \
234xmlns:trt=\"{NS_MEDIA}\" \
235xmlns:tptz=\"{NS_PTZ}\" \
236xmlns:tt=\"http://www.onvif.org/ver10/schema\" \
237xmlns:wsse=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd\" \
238xmlns:wsu=\"http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd\">\
239<s:Header>{security}</s:Header><s:Body>{body}</s:Body></s:Envelope>"
240    )
241}
242
243/// POST a SOAP envelope to an ONVIF service endpoint and return the response body text. ONVIF SOAP
244/// faults (often returned with HTTP 4xx/5xx) are surfaced as a clear error with the fault reason.
245async fn soap_call(
246    state: &AppState,
247    url: &str,
248    action: &str,
249    envelope: String,
250) -> AppResult<String> {
251    let timeout = Duration::from_millis(state.cfg.onvif_request_timeout_ms.max(500));
252    let content_type = format!("application/soap+xml; charset=utf-8; action=\"{action}\"");
253    let resp = state
254        .http
255        .post(url)
256        .header(reqwest::header::CONTENT_TYPE, content_type)
257        .timeout(timeout)
258        .body(envelope)
259        .send()
260        .await
261        .map_err(|e| AppError::Other(anyhow::anyhow!("ONVIF request to {url} failed: {e}")))?;
262    let status = resp.status();
263    let text = resp.text().await.unwrap_or_default();
264    if !status.is_success() {
265        let reason = fault_reason(&text).unwrap_or_else(|| format!("HTTP {status}"));
266        return Err(AppError::Other(anyhow::anyhow!("ONVIF fault: {reason}")));
267    }
268    Ok(text)
269}
270
271// ========================= WS-Discovery =========================
272
273/// A device found by WS-Discovery.
274#[derive(Debug, Clone, Serialize)]
275pub struct DiscoveredOnvifDevice {
276    /// The device's `wsa:EndpointReference/Address` (a `urn:uuid:` URN), if present.
277    pub endpoint_reference: Option<String>,
278    /// The first transport address (the ONVIF device service URL we would probe).
279    pub device_url: String,
280    /// All advertised transport addresses.
281    pub xaddrs: Vec<String>,
282    /// Host extracted from `device_url` (matches a camera's `address`).
283    pub address: Option<String>,
284    /// Advertised device types (e.g. `dn:NetworkVideoTransmitter`).
285    pub types: Option<String>,
286    /// Advertised scope URIs (name/hardware/location hints).
287    pub scopes: Vec<String>,
288}
289
290fn discovery_probe(msg_id: &str) -> String {
291    format!(
292        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
293<e:Envelope \
294xmlns:e=\"http://www.w3.org/2003/05/soap-envelope\" \
295xmlns:w=\"http://schemas.xmlsoap.org/ws/2004/08/addressing\" \
296xmlns:d=\"http://schemas.xmlsoap.org/ws/2005/04/discovery\" \
297xmlns:dn=\"http://www.onvif.org/ver10/network/wsdl\">\
298<e:Header>\
299<w:MessageID>urn:uuid:{msg_id}</w:MessageID>\
300<w:To e:mustUnderstand=\"true\">urn:schemas-xmlsoap-org:ws:2005:04:discovery</w:To>\
301<w:Action e:mustUnderstand=\"true\">http://schemas.xmlsoap.org/ws/2005/04/discovery/Probe</w:Action>\
302</e:Header>\
303<e:Body><d:Probe><d:Types>dn:NetworkVideoTransmitter</d:Types></d:Probe></e:Body>\
304</e:Envelope>"
305    )
306}
307
308/// Multicast a WS-Discovery Probe and collect ProbeMatch replies for the configured window.
309pub async fn discover(cfg: &Config) -> AppResult<Vec<DiscoveredOnvifDevice>> {
310    let window = Duration::from_millis(cfg.onvif_discovery_timeout_ms.max(200));
311    let socket = UdpSocket::bind("0.0.0.0:0")
312        .await
313        .map_err(|e| AppError::Other(anyhow::anyhow!("ONVIF discovery: bind UDP socket: {e}")))?;
314    let _ = socket.set_broadcast(true);
315
316    let probe = discovery_probe(&Uuid::new_v4().to_string());
317    socket
318        .send_to(probe.as_bytes(), WSDD_MULTICAST)
319        .await
320        .map_err(|e| AppError::Other(anyhow::anyhow!("ONVIF discovery: send Probe: {e}")))?;
321
322    let mut devices: Vec<DiscoveredOnvifDevice> = Vec::new();
323    let mut seen: HashSet<String> = HashSet::new();
324    let deadline = tokio::time::Instant::now() + window;
325    let mut buf = vec![0u8; 65_535];
326    loop {
327        let now = tokio::time::Instant::now();
328        if now >= deadline {
329            break;
330        }
331        let remaining = deadline - now;
332        match tokio::time::timeout(remaining, socket.recv_from(&mut buf)).await {
333            Ok(Ok((n, _src))) => {
334                let xml = String::from_utf8_lossy(&buf[..n]);
335                parse_probe_matches(&xml, &mut devices, &mut seen);
336            }
337            Ok(Err(e)) => {
338                tracing::warn!(error = %e, "ONVIF discovery: recv error");
339                break;
340            }
341            Err(_) => break, // window elapsed
342        }
343    }
344    Ok(devices)
345}
346
347/// Parse every `ProbeMatch` in a discovery reply envelope into deduplicated devices.
348fn parse_probe_matches(
349    xml: &str,
350    out: &mut Vec<DiscoveredOnvifDevice>,
351    seen: &mut HashSet<String>,
352) {
353    for (_open, inner) in elements(xml, "ProbeMatch") {
354        let xaddrs: Vec<String> = first_text(inner, "XAddrs")
355            .map(|s| s.split_whitespace().map(|x| x.to_string()).collect())
356            .unwrap_or_default();
357        let Some(device_url) = xaddrs.first().cloned() else {
358            continue;
359        };
360        if !seen.insert(device_url.clone()) {
361            continue;
362        }
363        let scopes: Vec<String> = first_text(inner, "Scopes")
364            .map(|s| s.split_whitespace().map(|x| x.to_string()).collect())
365            .unwrap_or_default();
366        out.push(DiscoveredOnvifDevice {
367            endpoint_reference: first_text(inner, "Address"),
368            address: host_of(&device_url),
369            types: first_text(inner, "Types"),
370            scopes,
371            xaddrs,
372            device_url,
373        });
374    }
375}
376
377// ========================= Probe (identify + capabilities + profiles) =========================
378
379/// The chosen media profile and PTZ binding extracted from GetProfiles.
380struct ProfileChoice {
381    token: Option<String>,
382    node_token: Option<String>,
383    has_ptz_config: bool,
384}
385
386/// Pick a media profile: prefer the first one carrying a PTZConfiguration; otherwise the first
387/// profile. Returns its token, bound PTZ node token, and whether it has a PTZConfiguration.
388fn parse_profiles(xml: &str) -> ProfileChoice {
389    let mut first: Option<(Option<String>, Option<String>, bool)> = None;
390    for (open, inner) in elements(xml, "Profiles") {
391        let token = attr_in_tag(open, "token");
392        let ptz_cfg = first_inner(inner, "PTZConfiguration");
393        let has_ptz = ptz_cfg.is_some();
394        let node_token = ptz_cfg.and_then(|c| first_text(c, "NodeToken"));
395        if has_ptz {
396            return ProfileChoice {
397                token,
398                node_token,
399                has_ptz_config: true,
400            };
401        }
402        if first.is_none() {
403            first = Some((token, node_token, has_ptz));
404        }
405    }
406    match first {
407        Some((token, node_token, has_ptz_config)) => ProfileChoice {
408            token,
409            node_token,
410            has_ptz_config,
411        },
412        None => ProfileChoice {
413            token: None,
414            node_token: None,
415            has_ptz_config: false,
416        },
417    }
418}
419
420/// Resolve the ONVIF device service URL for a camera: explicit override, then any previously probed
421/// URL, then the standard path derived from the camera's address.
422async fn resolve_device_url(
423    pool: &SqlitePool,
424    cam: &Camera,
425    override_url: Option<String>,
426) -> AppResult<String> {
427    if let Some(u) = override_url
428        .map(|s| s.trim().to_string())
429        .filter(|s| !s.is_empty())
430    {
431        camera_url::validate_stream_url(&u).map_err(AppError::BadRequest)?;
432        return Ok(u);
433    }
434    let existing: Option<String> =
435        sqlx::query_scalar("SELECT device_url FROM camera_onvif WHERE camera_id = ?")
436            .bind(&cam.id)
437            .fetch_optional(pool)
438            .await?;
439    if let Some(u) = existing.filter(|s| !s.trim().is_empty()) {
440        return Ok(u);
441    }
442    let host = cam
443        .address
444        .as_deref()
445        .map(str::trim)
446        .filter(|s| !s.is_empty())
447        .ok_or_else(|| {
448            AppError::BadRequest(
449                "camera has no address; set its address or pass an explicit `device_url`".into(),
450            )
451        })?;
452    Ok(format!("http://{host}/onvif/device_service"))
453}
454
455/// Probe a camera's ONVIF interface and persist the result into `camera_onvif`.
456pub async fn probe(
457    state: &AppState,
458    camera_id: &str,
459    device_url_override: Option<String>,
460) -> AppResult<CameraOnvif> {
461    let cam: Camera = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
462        .bind(camera_id)
463        .fetch_optional(&state.pool)
464        .await?
465        .ok_or_else(|| AppError::NotFound(format!("camera {camera_id} not found")))?;
466
467    let device_url = resolve_device_url(&state.pool, &cam, device_url_override).await?;
468    let user = cam.username.as_deref();
469    let pass = cam.password.as_deref();
470
471    // 1) Device identification.
472    let info = soap_call(
473        state,
474        &device_url,
475        &format!("{NS_DEVICE}/GetDeviceInformation"),
476        envelope(&security_header(user, pass), "<tds:GetDeviceInformation/>"),
477    )
478    .await?;
479    let manufacturer = first_text(&info, "Manufacturer");
480    let model = first_text(&info, "Model");
481    let firmware_version = first_text(&info, "FirmwareVersion");
482    let serial_number = first_text(&info, "SerialNumber");
483    let hardware_id = first_text(&info, "HardwareId");
484
485    // 2) Service endpoints: GetCapabilities first, GetServices as a fallback.
486    let (mut media_url, mut ptz_url) = match soap_call(
487        state,
488        &device_url,
489        &format!("{NS_DEVICE}/GetCapabilities"),
490        envelope(
491            &security_header(user, pass),
492            "<tds:GetCapabilities><tds:Category>All</tds:Category></tds:GetCapabilities>",
493        ),
494    )
495    .await
496    {
497        Ok(caps) => parse_capabilities(&caps),
498        Err(e) => {
499            tracing::warn!(%camera_id, error = %e, "ONVIF: GetCapabilities failed; trying GetServices");
500            (None, None)
501        }
502    };
503    if media_url.is_none() {
504        if let Ok(services) = soap_call(
505            state,
506            &device_url,
507            &format!("{NS_DEVICE}/GetServices"),
508            envelope(
509                &security_header(user, pass),
510                "<tds:GetServices><tds:IncludeCapability>false</tds:IncludeCapability></tds:GetServices>",
511            ),
512        )
513        .await
514        {
515            let (m, p) = parse_services(&services);
516            media_url = media_url.or(m);
517            ptz_url = ptz_url.or(p);
518        }
519    }
520
521    // 3) Media profiles (profile token + PTZ binding). Needs the media service URL.
522    let mut profile = ProfileChoice {
523        token: None,
524        node_token: None,
525        has_ptz_config: false,
526    };
527    if let Some(murl) = media_url.as_deref() {
528        match soap_call(
529            state,
530            murl,
531            &format!("{NS_MEDIA}/GetProfiles"),
532            envelope(&security_header(user, pass), "<trt:GetProfiles/>"),
533        )
534        .await
535        {
536            Ok(profiles) => profile = parse_profiles(&profiles),
537            Err(e) => tracing::warn!(%camera_id, error = %e, "ONVIF: GetProfiles failed"),
538        }
539    }
540
541    let ptz_enabled = ptz_url.is_some() && profile.has_ptz_config && profile.token.is_some();
542
543    // 4) Best-effort stream URI: only used to FILL a camera that has no recordable URL yet.
544    if let (Some(murl), Some(token)) = (media_url.as_deref(), profile.token.as_deref()) {
545        if camera_url::record_url(&cam).is_none() {
546            if let Some(uri) = get_stream_uri(state, murl, token, user, pass).await {
547                let with_creds = inject_creds(&uri, user, pass);
548                let _ = sqlx::query(
549                    "UPDATE cameras SET main_stream_url = ?, updated_at = ? WHERE id = ? AND (main_stream_url IS NULL OR main_stream_url = '')",
550                )
551                .bind(&with_creds)
552                .bind(Utc::now())
553                .bind(camera_id)
554                .execute(&state.pool)
555                .await;
556                tracing::info!(%camera_id, "ONVIF: filled main_stream_url from GetStreamUri");
557            }
558        }
559    }
560
561    // Preserve any scopes captured by a prior discovery (probe itself does not fetch scopes).
562    let scopes: Value =
563        sqlx::query_scalar::<_, String>("SELECT scopes FROM camera_onvif WHERE camera_id = ?")
564            .bind(camera_id)
565            .fetch_optional(&state.pool)
566            .await?
567            .and_then(|s| serde_json::from_str(&s).ok())
568            .unwrap_or_else(|| json!([]));
569
570    let now = Utc::now();
571    sqlx::query(
572        "INSERT INTO camera_onvif
573           (camera_id, device_url, manufacturer, model, firmware_version, serial_number, hardware_id,
574            scopes, media_url, ptz_url, profile_token, ptz_node_token, ptz_enabled, probed_at)
575         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
576         ON CONFLICT(camera_id) DO UPDATE SET
577            device_url = excluded.device_url,
578            manufacturer = excluded.manufacturer,
579            model = excluded.model,
580            firmware_version = excluded.firmware_version,
581            serial_number = excluded.serial_number,
582            hardware_id = excluded.hardware_id,
583            scopes = excluded.scopes,
584            media_url = excluded.media_url,
585            ptz_url = excluded.ptz_url,
586            profile_token = excluded.profile_token,
587            ptz_node_token = excluded.ptz_node_token,
588            ptz_enabled = excluded.ptz_enabled,
589            probed_at = excluded.probed_at",
590    )
591    .bind(camera_id)
592    .bind(&device_url)
593    .bind(&manufacturer)
594    .bind(&model)
595    .bind(&firmware_version)
596    .bind(&serial_number)
597    .bind(&hardware_id)
598    .bind(SqlxJson(scopes))
599    .bind(&media_url)
600    .bind(&ptz_url)
601    .bind(&profile.token)
602    .bind(&profile.node_token)
603    .bind(ptz_enabled)
604    .bind(now)
605    .execute(&state.pool)
606    .await?;
607
608    load_onvif(&state.pool, camera_id).await
609}
610
611/// Parse media + PTZ service URLs from a GetCapabilities response.
612fn parse_capabilities(xml: &str) -> (Option<String>, Option<String>) {
613    let media_url = first_inner(xml, "Media").and_then(|b| first_text(b, "XAddr"));
614    let ptz_url = first_inner(xml, "PTZ").and_then(|b| first_text(b, "XAddr"));
615    (media_url, ptz_url)
616}
617
618/// Parse media + PTZ service URLs from a GetServices response (one `Service` per namespace).
619fn parse_services(xml: &str) -> (Option<String>, Option<String>) {
620    let mut media_url = None;
621    let mut ptz_url = None;
622    for (_open, inner) in elements(xml, "Service") {
623        let ns = first_text(inner, "Namespace").unwrap_or_default();
624        let xaddr = first_text(inner, "XAddr");
625        if ns.contains("/media/") && media_url.is_none() {
626            media_url = xaddr;
627        } else if ns.contains("/ptz/") && ptz_url.is_none() {
628            ptz_url = xaddr;
629        }
630    }
631    (media_url, ptz_url)
632}
633
634/// Best-effort GetStreamUri (RTSP unicast) for a media profile.
635async fn get_stream_uri(
636    state: &AppState,
637    media_url: &str,
638    profile_token: &str,
639    user: Option<&str>,
640    pass: Option<&str>,
641) -> Option<String> {
642    let body = format!(
643        "<trt:GetStreamUri>\
644<trt:StreamSetup><tt:Stream>RTP-Unicast</tt:Stream>\
645<tt:Transport><tt:Protocol>RTSP</tt:Protocol></tt:Transport></trt:StreamSetup>\
646<trt:ProfileToken>{}</trt:ProfileToken>\
647</trt:GetStreamUri>",
648        xml_escape(profile_token)
649    );
650    match soap_call(
651        state,
652        media_url,
653        &format!("{NS_MEDIA}/GetStreamUri"),
654        envelope(&security_header(user, pass), &body),
655    )
656    .await
657    {
658        Ok(resp) => first_text(&resp, "Uri").filter(|u| u.starts_with("rtsp://")),
659        Err(e) => {
660            tracing::warn!(error = %e, "ONVIF: GetStreamUri failed");
661            None
662        }
663    }
664}
665
666/// Inject `user:pass@` userinfo into an `rtsp://` URI that lacks it (so the recorder can authenticate).
667fn inject_creds(uri: &str, user: Option<&str>, pass: Option<&str>) -> String {
668    let Some(user) = user.filter(|u| !u.is_empty()) else {
669        return uri.to_string();
670    };
671    let Some(rest) = uri.strip_prefix("rtsp://") else {
672        return uri.to_string();
673    };
674    let authority = rest.split('/').next().unwrap_or("");
675    if authority.contains('@') {
676        return uri.to_string(); // already has userinfo
677    }
678    let creds = match pass.filter(|p| !p.is_empty()) {
679        Some(p) => format!(
680            "{}:{}@",
681            camera_url::encode_userinfo(user),
682            camera_url::encode_userinfo(p)
683        ),
684        None => format!("{}@", camera_url::encode_userinfo(user)),
685    };
686    format!("rtsp://{creds}{rest}")
687}
688
689// ========================= PTZ control =========================
690
691/// Load a camera's persisted ONVIF profile (404 when the camera has not been probed yet).
692pub async fn load_onvif(pool: &SqlitePool, camera_id: &str) -> AppResult<CameraOnvif> {
693    sqlx::query_as::<_, CameraOnvif>("SELECT * FROM camera_onvif WHERE camera_id = ?")
694        .bind(camera_id)
695        .fetch_optional(pool)
696        .await?
697        .ok_or_else(|| {
698            AppError::NotFound(format!(
699                "camera {camera_id} has no ONVIF profile; run the ONVIF probe first"
700            ))
701        })
702}
703
704/// Load a camera + its ONVIF profile, asserting the PTZ service + profile token are present.
705async fn load_ptz_target(
706    pool: &SqlitePool,
707    camera_id: &str,
708) -> AppResult<(Camera, CameraOnvif, String, String)> {
709    let cam: Camera = sqlx::query_as::<_, Camera>("SELECT * FROM cameras WHERE id = ?")
710        .bind(camera_id)
711        .fetch_optional(pool)
712        .await?
713        .ok_or_else(|| AppError::NotFound(format!("camera {camera_id} not found")))?;
714    let onvif = load_onvif(pool, camera_id).await?;
715    let ptz_url = onvif
716        .ptz_url
717        .clone()
718        .filter(|s| !s.trim().is_empty())
719        .ok_or_else(|| AppError::BadRequest("camera exposes no ONVIF PTZ service".into()))?;
720    let profile_token = onvif
721        .profile_token
722        .clone()
723        .filter(|s| !s.trim().is_empty())
724        .ok_or_else(|| AppError::BadRequest("camera has no ONVIF media profile token".into()))?;
725    Ok((cam, onvif, ptz_url, profile_token))
726}
727
728/// Continuously pan/tilt/zoom at the given normalized velocities (each clamped to -1.0..=1.0). The
729/// motion runs until [`stop`] (or the device's own timeout).
730pub async fn continuous_move(
731    state: &AppState,
732    camera_id: &str,
733    pan: f64,
734    tilt: f64,
735    zoom: f64,
736) -> AppResult<()> {
737    let (cam, _onvif, ptz_url, token) = load_ptz_target(&state.pool, camera_id).await?;
738    let pan = pan.clamp(-1.0, 1.0);
739    let tilt = tilt.clamp(-1.0, 1.0);
740    let zoom = zoom.clamp(-1.0, 1.0);
741    let body = format!(
742        "<tptz:ContinuousMove>\
743<tptz:ProfileToken>{token}</tptz:ProfileToken>\
744<tptz:Velocity>\
745<tt:PanTilt x=\"{pan:.4}\" y=\"{tilt:.4}\"/>\
746<tt:Zoom x=\"{zoom:.4}\"/>\
747</tptz:Velocity>\
748</tptz:ContinuousMove>",
749        token = xml_escape(&token),
750    );
751    soap_call(
752        state,
753        &ptz_url,
754        &format!("{NS_PTZ}/ContinuousMove"),
755        envelope(
756            &security_header(cam.username.as_deref(), cam.password.as_deref()),
757            &body,
758        ),
759    )
760    .await?;
761    Ok(())
762}
763
764/// Stop all PTZ motion (pan/tilt + zoom).
765pub async fn stop(state: &AppState, camera_id: &str) -> AppResult<()> {
766    let (cam, _onvif, ptz_url, token) = load_ptz_target(&state.pool, camera_id).await?;
767    let body = format!(
768        "<tptz:Stop>\
769<tptz:ProfileToken>{token}</tptz:ProfileToken>\
770<tptz:PanTilt>true</tptz:PanTilt>\
771<tptz:Zoom>true</tptz:Zoom>\
772</tptz:Stop>",
773        token = xml_escape(&token),
774    );
775    soap_call(
776        state,
777        &ptz_url,
778        &format!("{NS_PTZ}/Stop"),
779        envelope(
780            &security_header(cam.username.as_deref(), cam.password.as_deref()),
781            &body,
782        ),
783    )
784    .await?;
785    Ok(())
786}
787
788/// Fetch the camera's PTZ presets, persist them (upsert + prune stale), and return the current set.
789pub async fn get_presets(state: &AppState, camera_id: &str) -> AppResult<Vec<PtzPreset>> {
790    let (cam, _onvif, ptz_url, token) = load_ptz_target(&state.pool, camera_id).await?;
791    let body = format!(
792        "<tptz:GetPresets><tptz:ProfileToken>{token}</tptz:ProfileToken></tptz:GetPresets>",
793        token = xml_escape(&token),
794    );
795    let resp = soap_call(
796        state,
797        &ptz_url,
798        &format!("{NS_PTZ}/GetPresets"),
799        envelope(
800            &security_header(cam.username.as_deref(), cam.password.as_deref()),
801            &body,
802        ),
803    )
804    .await?;
805
806    // Parse <tptz:Preset token="X"><tt:Name>Y</tt:Name>...</tptz:Preset>.
807    let mut fetched: Vec<(String, Option<String>)> = Vec::new();
808    for (open, inner) in elements(&resp, "Preset") {
809        if let Some(tok) = attr_in_tag(open, "token").filter(|t| !t.is_empty()) {
810            fetched.push((tok, first_text(inner, "Name")));
811        }
812    }
813
814    let now = Utc::now();
815    for (tok, name) in &fetched {
816        let id = format!("ptz_{}", Uuid::new_v4().simple());
817        sqlx::query(
818            "INSERT INTO camera_ptz_presets (id, camera_id, token, name, fetched_at)
819             VALUES (?, ?, ?, ?, ?)
820             ON CONFLICT(camera_id, token) DO UPDATE SET name = excluded.name, fetched_at = excluded.fetched_at",
821        )
822        .bind(&id)
823        .bind(camera_id)
824        .bind(tok)
825        .bind(name)
826        .bind(now)
827        .execute(&state.pool)
828        .await?;
829    }
830    // Prune presets that the device no longer reports.
831    if fetched.is_empty() {
832        sqlx::query("DELETE FROM camera_ptz_presets WHERE camera_id = ?")
833            .bind(camera_id)
834            .execute(&state.pool)
835            .await?;
836    } else {
837        let placeholders = vec!["?"; fetched.len()].join(",");
838        let sql = format!(
839            "DELETE FROM camera_ptz_presets WHERE camera_id = ? AND token NOT IN ({placeholders})"
840        );
841        let mut q = sqlx::query(&sql).bind(camera_id);
842        for (tok, _) in &fetched {
843            q = q.bind(tok);
844        }
845        q.execute(&state.pool).await?;
846    }
847
848    sqlx::query_as::<_, PtzPreset>(
849        "SELECT * FROM camera_ptz_presets WHERE camera_id = ? ORDER BY token ASC",
850    )
851    .bind(camera_id)
852    .fetch_all(&state.pool)
853    .await
854    .map_err(AppError::from)
855}
856
857/// Move the camera to a stored preset by its device token.
858pub async fn goto_preset(state: &AppState, camera_id: &str, preset_token: &str) -> AppResult<()> {
859    let (cam, _onvif, ptz_url, token) = load_ptz_target(&state.pool, camera_id).await?;
860    if preset_token.trim().is_empty() {
861        return Err(AppError::BadRequest("`token` is required".into()));
862    }
863    let body = format!(
864        "<tptz:GotoPreset>\
865<tptz:ProfileToken>{token}</tptz:ProfileToken>\
866<tptz:PresetToken>{preset}</tptz:PresetToken>\
867</tptz:GotoPreset>",
868        token = xml_escape(&token),
869        preset = xml_escape(preset_token),
870    );
871    soap_call(
872        state,
873        &ptz_url,
874        &format!("{NS_PTZ}/GotoPreset"),
875        envelope(
876            &security_header(cam.username.as_deref(), cam.password.as_deref()),
877            &body,
878        ),
879    )
880    .await?;
881    Ok(())
882}
883
884#[cfg(test)]
885mod tests {
886    use super::*;
887
888    #[test]
889    fn extracts_simple_text_with_prefix() {
890        let xml = "<tds:Manufacturer>HIKVISION</tds:Manufacturer><tds:Model>DS-2CD</tds:Model>";
891        assert_eq!(
892            first_text(xml, "Manufacturer").as_deref(),
893            Some("HIKVISION")
894        );
895        assert_eq!(first_text(xml, "Model").as_deref(), Some("DS-2CD"));
896        assert_eq!(first_text(xml, "SerialNumber"), None);
897    }
898
899    #[test]
900    fn unescapes_entities() {
901        let xml = "<x>A &amp; B &lt;c&gt;</x>";
902        assert_eq!(first_text(xml, "x").as_deref(), Some("A & B <c>"));
903    }
904
905    #[test]
906    fn parses_capabilities_blocks() {
907        let xml = "<tt:Capabilities>\
908<tt:Media><tt:XAddr>http://10.0.0.2/onvif/Media</tt:XAddr></tt:Media>\
909<tt:PTZ><tt:XAddr>http://10.0.0.2/onvif/PTZ</tt:XAddr></tt:PTZ>\
910</tt:Capabilities>";
911        let (m, p) = parse_capabilities(xml);
912        assert_eq!(m.as_deref(), Some("http://10.0.0.2/onvif/Media"));
913        assert_eq!(p.as_deref(), Some("http://10.0.0.2/onvif/PTZ"));
914    }
915
916    #[test]
917    fn parses_services_by_namespace() {
918        let xml = "<tds:GetServicesResponse>\
919<tds:Service><tds:Namespace>http://www.onvif.org/ver10/media/wsdl</tds:Namespace>\
920<tds:XAddr>http://10.0.0.2/onvif/Media</tds:XAddr></tds:Service>\
921<tds:Service><tds:Namespace>http://www.onvif.org/ver20/ptz/wsdl</tds:Namespace>\
922<tds:XAddr>http://10.0.0.2/onvif/PTZ</tds:XAddr></tds:Service>\
923</tds:GetServicesResponse>";
924        let (m, p) = parse_services(xml);
925        assert_eq!(m.as_deref(), Some("http://10.0.0.2/onvif/Media"));
926        assert_eq!(p.as_deref(), Some("http://10.0.0.2/onvif/PTZ"));
927    }
928
929    #[test]
930    fn prefers_profile_with_ptz_config() {
931        let xml = "<trt:GetProfilesResponse>\
932<trt:Profiles token=\"P0\" fixed=\"true\"><tt:VideoSourceConfiguration/></trt:Profiles>\
933<trt:Profiles token=\"P1\"><tt:PTZConfiguration><tt:NodeToken>NODE0</tt:NodeToken></tt:PTZConfiguration></trt:Profiles>\
934</trt:GetProfilesResponse>";
935        let c = parse_profiles(xml);
936        assert_eq!(c.token.as_deref(), Some("P1"));
937        assert_eq!(c.node_token.as_deref(), Some("NODE0"));
938        assert!(c.has_ptz_config);
939    }
940
941    #[test]
942    fn falls_back_to_first_profile_without_ptz() {
943        let xml = "<trt:Profiles token=\"OnlyOne\"><tt:VideoSourceConfiguration/></trt:Profiles>";
944        let c = parse_profiles(xml);
945        assert_eq!(c.token.as_deref(), Some("OnlyOne"));
946        assert!(!c.has_ptz_config);
947    }
948
949    #[test]
950    fn parses_preset_token_and_name() {
951        let xml = "<tptz:GetPresetsResponse>\
952<tptz:Preset token=\"1\"><tt:Name>Gate</tt:Name></tptz:Preset>\
953<tptz:Preset token=\"2\"/>\
954</tptz:GetPresetsResponse>";
955        let presets: Vec<_> = elements(xml, "Preset")
956            .into_iter()
957            .filter_map(|(open, inner)| {
958                attr_in_tag(open, "token").map(|t| (t, first_text(inner, "Name")))
959            })
960            .collect();
961        assert_eq!(presets.len(), 2);
962        assert_eq!(presets[0].0, "1");
963        assert_eq!(presets[0].1.as_deref(), Some("Gate"));
964        assert_eq!(presets[1].0, "2");
965        assert_eq!(presets[1].1, None);
966    }
967
968    #[test]
969    fn parses_xaddrs_from_probe_match() {
970        let xml = "<d:ProbeMatches><d:ProbeMatch>\
971<wsa:EndpointReference><wsa:Address>urn:uuid:abc</wsa:Address></wsa:EndpointReference>\
972<d:Types>dn:NetworkVideoTransmitter</d:Types>\
973<d:Scopes>onvif://www.onvif.org/name/Cam onvif://www.onvif.org/hardware/DS</d:Scopes>\
974<d:XAddrs>http://192.168.0.2/onvif/device_service</d:XAddrs>\
975</d:ProbeMatch></d:ProbeMatches>";
976        let mut out = Vec::new();
977        let mut seen = HashSet::new();
978        parse_probe_matches(xml, &mut out, &mut seen);
979        assert_eq!(out.len(), 1);
980        assert_eq!(out[0].device_url, "http://192.168.0.2/onvif/device_service");
981        assert_eq!(out[0].address.as_deref(), Some("192.168.0.2"));
982        assert_eq!(out[0].scopes.len(), 2);
983        assert_eq!(out[0].endpoint_reference.as_deref(), Some("urn:uuid:abc"));
984    }
985
986    #[test]
987    fn injects_creds_into_stream_uri() {
988        assert_eq!(
989            inject_creds(
990                "rtsp://10.0.0.2:554/Streaming/101",
991                Some("admin"),
992                Some("p@ss")
993            ),
994            "rtsp://admin:p%40ss@10.0.0.2:554/Streaming/101"
995        );
996        // Already has userinfo: unchanged.
997        assert_eq!(
998            inject_creds("rtsp://u:p@10.0.0.2/s", Some("admin"), Some("x")),
999            "rtsp://u:p@10.0.0.2/s"
1000        );
1001        // No username: unchanged.
1002        assert_eq!(
1003            inject_creds("rtsp://10.0.0.2/s", None, None),
1004            "rtsp://10.0.0.2/s"
1005        );
1006    }
1007
1008    #[test]
1009    fn fault_reason_extracted() {
1010        let xml = "<s:Fault><s:Reason><s:Text>Sender not authorized</s:Text></s:Reason></s:Fault>";
1011        assert_eq!(fault_reason(xml).as_deref(), Some("Sender not authorized"));
1012    }
1013
1014    #[test]
1015    fn host_of_strips_everything() {
1016        assert_eq!(
1017            host_of("http://192.168.0.2/onvif/x").as_deref(),
1018            Some("192.168.0.2")
1019        );
1020        assert_eq!(
1021            host_of("http://u:p@10.0.0.5:8000/x").as_deref(),
1022            Some("10.0.0.5")
1023        );
1024    }
1025}