Skip to main content

heldar_kernel/services/
discovery.rs

1//! Network discovery: scan an IPv4 range / CIDR for cameras (open RTSP), then identify each by
2//! probing a list of vendor RTSP paths with one or more credential sets (via ffprobe). Vendor-
3//! agnostic: HikVision, Dahua, Axis, and generic/ONVIF paths are all tried. Optionally auto-
4//! registers verified devices (recording disabled by default).
5
6use std::net::Ipv4Addr;
7use std::sync::Arc;
8use std::time::Duration;
9
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use sqlx::types::Json as SqlxJson;
14use sqlx::SqlitePool;
15use tokio::net::TcpStream;
16use tokio::sync::Semaphore;
17use tokio::task::JoinSet;
18
19use crate::camera_url;
20use crate::config::Config;
21use crate::util::{self, ProbeInfo};
22
23/// Cap on how many addresses a single scan may cover (a /22 worth), to bound work.
24const MAX_TARGETS: usize = 1024;
25const SCAN_CONCURRENCY: usize = 64;
26/// How many hosts to credential-probe (ffprobe) at once.
27const PROBE_CONCURRENCY: usize = 8;
28/// Per-attempt ffprobe budget, and a cap on attempts per host (bounds time + lockout risk).
29const PROBE_TIMEOUT: Duration = Duration::from_secs(5);
30const MAX_PROBE_ATTEMPTS: usize = 18;
31
32/// Candidate RTSP stream paths, tagged with the vendor they imply. Tried in order, but paths whose
33/// vendor matches the HTTP banner guess are moved to the front.
34const PROBE_PATHS: &[(&str, &str)] = &[
35    ("hikvision", "/Streaming/Channels/101"),
36    ("hikvision", "/Streaming/Channels/102"),
37    ("dahua", "/cam/realmonitor?channel=1&subtype=0"),
38    ("dahua", "/cam/realmonitor?channel=1&subtype=1"),
39    ("axis", "/axis-media/media.amp"),
40    ("generic", "/live"),
41    ("generic", "/live.sdp"),
42    ("generic", "/Streaming/Channels/1"),
43    ("generic", "/h264"),
44    ("generic", "/11"),
45    ("generic", "/stream1"),
46    ("generic", "/video1"),
47    ("generic", "/media/video1"),
48    ("generic", "/ch0_0.h264"),
49    ("generic", "/onvif1"),
50    ("generic", "/"),
51];
52
53/// Well-known default credentials, tried only when `try_default_creds` is set AND the device is not
54/// identified as HikVision (HikVision locks out after a few failures — we never brute-force it).
55const DEFAULT_CREDS: &[(&str, &str)] = &[
56    ("admin", "admin"),
57    ("admin", "12345"),
58    ("admin", "123456"),
59    ("admin", ""),
60    ("root", "root"),
61    ("root", "admin"),
62    ("admin", "9999"),
63];
64
65#[derive(Debug, Clone, Deserialize)]
66pub struct Credential {
67    pub username: String,
68    pub password: String,
69}
70
71#[derive(Debug, Deserialize)]
72pub struct DiscoverOptions {
73    /// CIDR ("192.168.0.0/24"), range ("192.168.0.2-192.168.0.12"), single IP, or comma list.
74    pub targets: String,
75    /// Single credential (convenience). Combined with `credentials` if both are given.
76    pub username: Option<String>,
77    pub password: Option<String>,
78    /// Additional credential sets to try, in order.
79    pub credentials: Option<Vec<Credential>>,
80    /// Probe RTSP paths + credentials with ffprobe to confirm a working stream.
81    #[serde(default)]
82    pub verify: bool,
83    /// Also try a built-in default-credentials list (non-HikVision hosts only).
84    #[serde(default)]
85    pub try_default_creds: bool,
86    /// Register verified, not-yet-known devices as cameras (recording disabled by default).
87    #[serde(default)]
88    pub auto_add: bool,
89    pub rtsp_port: Option<u16>,
90    pub connect_timeout_ms: Option<u64>,
91}
92
93#[derive(Debug, Clone, Serialize)]
94pub struct DiscoveredDevice {
95    pub address: String,
96    pub rtsp_port: u16,
97    pub rtsp_open: bool,
98    pub http_open: bool,
99    pub vendor_guess: String,
100    pub http_server: Option<String>,
101    pub verified: bool,
102    pub codec: Option<String>,
103    pub width: Option<i64>,
104    pub height: Option<i64>,
105    /// The RTSP path that verified (e.g. `/Streaming/Channels/101`).
106    pub stream_path: Option<String>,
107    /// The username that verified (password is never serialized).
108    pub matched_username: Option<String>,
109    #[serde(skip)]
110    pub matched_password: Option<String>,
111    pub suggested_id: String,
112    pub already_registered: bool,
113}
114
115/// Expand a targets spec into a bounded list of IPv4 addresses.
116pub fn parse_targets(spec: &str) -> Result<Vec<Ipv4Addr>, String> {
117    let mut out: Vec<Ipv4Addr> = Vec::new();
118    let push = |a: u32, out: &mut Vec<Ipv4Addr>| -> Result<(), String> {
119        if out.len() >= MAX_TARGETS {
120            return Err(format!("too many targets (> {MAX_TARGETS})"));
121        }
122        out.push(Ipv4Addr::from(a));
123        Ok(())
124    };
125
126    for token in spec.split(',').map(|s| s.trim()).filter(|s| !s.is_empty()) {
127        if let Some((base, prefix)) = token.split_once('/') {
128            let base: Ipv4Addr = base.parse().map_err(|_| format!("bad CIDR base: {base}"))?;
129            let prefix: u32 = prefix
130                .parse()
131                .map_err(|_| format!("bad CIDR prefix: {prefix}"))?;
132            if prefix > 32 {
133                return Err(format!("bad CIDR prefix: {prefix}"));
134            }
135            let base_u = u32::from(base);
136            let mask = if prefix == 0 {
137                0
138            } else {
139                u32::MAX << (32 - prefix)
140            };
141            let network = base_u & mask;
142            let broadcast = network | !mask;
143            let (start, end) = if prefix <= 30 {
144                (network + 1, broadcast - 1)
145            } else {
146                (network, broadcast)
147            };
148            for a in start..=end {
149                push(a, &mut out)?;
150            }
151        } else if let Some((a, b)) = token.split_once('-') {
152            let a: Ipv4Addr = a
153                .trim()
154                .parse()
155                .map_err(|_| format!("bad range start: {a}"))?;
156            let b: Ipv4Addr = b
157                .trim()
158                .parse()
159                .map_err(|_| format!("bad range end: {b}"))?;
160            let (a, b) = (u32::from(a), u32::from(b));
161            if b < a {
162                return Err("range end precedes start".into());
163            }
164            for x in a..=b {
165                push(x, &mut out)?;
166            }
167        } else {
168            let ip: Ipv4Addr = token.parse().map_err(|_| format!("bad IP: {token}"))?;
169            out.push(ip);
170        }
171    }
172    if out.is_empty() {
173        return Err("no targets specified".into());
174    }
175    Ok(out)
176}
177
178async fn port_open(ip: Ipv4Addr, port: u16, timeout: Duration) -> bool {
179    matches!(
180        tokio::time::timeout(timeout, TcpStream::connect((ip, port))).await,
181        Ok(Ok(_))
182    )
183}
184
185fn guess_vendor(server: Option<&str>, body: &str) -> String {
186    let s = server.unwrap_or("").to_ascii_lowercase();
187    let b = body.to_ascii_lowercase();
188    if s.contains("hikvision")
189        || b.contains("hikvision")
190        || s == "webserver"
191        || b.contains("/doc/page/login")
192    {
193        "hikvision".into()
194    } else if s.contains("app-webs") || b.contains("dahua") {
195        "dahua".into()
196    } else if s.contains("axis") || b.contains("axis") {
197        "axis".into()
198    } else if b.contains("boa")
199        || s.contains("boa")
200        || s.contains("hipcam")
201        || s.contains("uc-httpd")
202    {
203        "generic".into()
204    } else {
205        "unknown".into()
206    }
207}
208
209/// Build an RTSP URL with (optionally empty) credentials.
210fn build_rtsp_url(host: &str, port: u16, user: &str, pass: &str, path: &str) -> String {
211    if user.is_empty() {
212        format!("rtsp://{host}:{port}{path}")
213    } else {
214        format!(
215            "rtsp://{}:{}@{host}:{port}{path}",
216            camera_url::encode_userinfo(user),
217            camera_url::encode_userinfo(pass)
218        )
219    }
220}
221
222struct ProbeMatch {
223    vendor: String,
224    path: String,
225    username: String,
226    password: String,
227    info: ProbeInfo,
228}
229
230/// Try credential/path combinations against a host until one yields a readable stream.
231async fn probe_host(
232    ffprobe_bin: &str,
233    host: &str,
234    port: u16,
235    banner_vendor: &str,
236    creds: &[(String, String)],
237    try_default_creds: bool,
238) -> Option<ProbeMatch> {
239    // Vendor-ordered paths: matches for the banner vendor first.
240    let mut paths: Vec<(&str, &str)> = PROBE_PATHS.to_vec();
241    paths.sort_by_key(|(v, _)| if *v == banner_vendor { 0 } else { 1 });
242
243    // Credential order: provided creds first; default creds only for non-HikVision hosts.
244    let mut cred_list: Vec<(String, String)> = creds.to_vec();
245    if try_default_creds && banner_vendor != "hikvision" {
246        for (u, p) in DEFAULT_CREDS {
247            cred_list.push((u.to_string(), p.to_string()));
248        }
249    }
250    if cred_list.is_empty() {
251        cred_list.push((String::new(), String::new()));
252    }
253
254    let mut attempts = 0usize;
255    for (user, pass) in &cred_list {
256        for (vendor, path) in &paths {
257            if attempts >= MAX_PROBE_ATTEMPTS {
258                return None;
259            }
260            attempts += 1;
261            let url = build_rtsp_url(host, port, user, pass, path);
262            match tokio::time::timeout(PROBE_TIMEOUT, util::ffprobe_stream(ffprobe_bin, &url)).await
263            {
264                Ok(Ok(info)) if info.codec.is_some() => {
265                    return Some(ProbeMatch {
266                        vendor: (*vendor).to_string(),
267                        path: (*path).to_string(),
268                        username: user.clone(),
269                        password: pass.clone(),
270                        info,
271                    });
272                }
273                _ => {}
274            }
275        }
276    }
277    None
278}
279
280pub async fn discover(
281    pool: &SqlitePool,
282    cfg: &Config,
283    http: &reqwest::Client,
284    opts: &DiscoverOptions,
285) -> Result<Vec<DiscoveredDevice>, String> {
286    let ips = parse_targets(&opts.targets)?;
287    let rtsp_port = opts.rtsp_port.unwrap_or(554);
288    let timeout = Duration::from_millis(opts.connect_timeout_ms.unwrap_or(700));
289
290    let existing: Vec<String> =
291        sqlx::query_scalar("SELECT address FROM cameras WHERE address IS NOT NULL")
292            .fetch_all(pool)
293            .await
294            .unwrap_or_default();
295
296    // Assemble the credential list (single convenience cred + explicit list).
297    let mut creds: Vec<(String, String)> = Vec::new();
298    if let Some(u) = opts.username.as_deref().filter(|s| !s.is_empty()) {
299        creds.push((u.to_string(), opts.password.clone().unwrap_or_default()));
300    }
301    if let Some(list) = &opts.credentials {
302        for c in list {
303            creds.push((c.username.clone(), c.password.clone()));
304        }
305    }
306
307    // 1) Bounded-concurrency port scan for open RTSP (and HTTP, for vendor identification).
308    let sem = Arc::new(Semaphore::new(SCAN_CONCURRENCY));
309    let mut set: JoinSet<(Ipv4Addr, bool, bool)> = JoinSet::new();
310    for ip in ips {
311        let sem = sem.clone();
312        set.spawn(async move {
313            let _permit = sem.acquire_owned().await.expect("semaphore");
314            let rtsp = port_open(ip, rtsp_port, timeout).await;
315            let http = if rtsp {
316                port_open(ip, 80, timeout).await
317            } else {
318                false
319            };
320            (ip, rtsp, http)
321        });
322    }
323    let mut candidates: Vec<(Ipv4Addr, bool)> = Vec::new();
324    while let Some(res) = set.join_next().await {
325        if let Ok((ip, rtsp, http)) = res {
326            if rtsp {
327                candidates.push((ip, http));
328            }
329        }
330    }
331    candidates.sort_by_key(|(ip, _)| u32::from(*ip));
332
333    // 2) Identify (HTTP banner) + optionally verify (ffprobe paths x creds), parallel across hosts.
334    let probe_sem = Arc::new(Semaphore::new(PROBE_CONCURRENCY));
335    let mut probe_set: JoinSet<DiscoveredDevice> = JoinSet::new();
336    for (ip, http_open) in candidates {
337        let http = http.clone();
338        let probe_sem = probe_sem.clone();
339        let ffprobe_bin = cfg.ffprobe_bin.clone();
340        let creds = creds.clone();
341        let verify = opts.verify;
342        let try_default = opts.try_default_creds;
343        let existing = existing.clone();
344        probe_set.spawn(async move {
345            let _permit = probe_sem.acquire_owned().await.expect("semaphore");
346            let addr = ip.to_string();
347
348            let mut http_server = None;
349            let mut vendor_guess = "unknown".to_string();
350            if http_open {
351                if let Ok(resp) = http
352                    .get(format!("http://{addr}/"))
353                    .timeout(Duration::from_secs(3))
354                    .send()
355                    .await
356                {
357                    let server = resp
358                        .headers()
359                        .get("server")
360                        .and_then(|v| v.to_str().ok())
361                        .map(|s| s.to_string());
362                    http_server = server.clone();
363                    let body = resp.text().await.unwrap_or_default();
364                    vendor_guess = guess_vendor(server.as_deref(), &body);
365                }
366            }
367
368            let mut device = DiscoveredDevice {
369                address: addr.clone(),
370                rtsp_port,
371                rtsp_open: true,
372                http_open,
373                vendor_guess: vendor_guess.clone(),
374                http_server,
375                verified: false,
376                codec: None,
377                width: None,
378                height: None,
379                stream_path: None,
380                matched_username: None,
381                matched_password: None,
382                suggested_id: format!("cam_{}", addr.replace('.', "_")),
383                already_registered: existing.iter().any(|a| a == &addr),
384            };
385
386            if verify {
387                if let Some(m) = probe_host(
388                    &ffprobe_bin,
389                    &addr,
390                    rtsp_port,
391                    &vendor_guess,
392                    &creds,
393                    try_default,
394                )
395                .await
396                {
397                    device.verified = true;
398                    // The working path is stronger vendor evidence than the banner.
399                    if m.vendor != "generic" {
400                        device.vendor_guess = m.vendor;
401                    } else if vendor_guess == "unknown" {
402                        device.vendor_guess = "generic".into();
403                    }
404                    device.codec = m.info.codec;
405                    device.width = m.info.width;
406                    device.height = m.info.height;
407                    device.stream_path = Some(m.path);
408                    device.matched_username = Some(m.username);
409                    device.matched_password = Some(m.password);
410                }
411            }
412            device
413        });
414    }
415
416    let mut devices = Vec::new();
417    while let Some(res) = probe_set.join_next().await {
418        if let Ok(d) = res {
419            devices.push(d);
420        }
421    }
422    devices.sort_by_key(|d| {
423        d.address
424            .parse::<Ipv4Addr>()
425            .map(u32::from)
426            .unwrap_or(u32::MAX)
427    });
428    Ok(devices)
429}
430
431/// Register a discovered device as a camera with recording DISABLED (operator enables it later).
432/// HikVision/Dahua use the vendor template (so the sub-stream is derivable); other vendors store the
433/// exact discovered RTSP URL. Returns the new camera id.
434pub async fn add_device(pool: &SqlitePool, device: &DiscoveredDevice) -> sqlx::Result<String> {
435    let vendor = device.vendor_guess.as_str();
436    let username = device.matched_username.as_deref();
437    let password = device.matched_password.as_deref();
438
439    // For non-template vendors, store the exact URL that verified (path can't be guessed).
440    let main_stream_url = if matches!(vendor, "hikvision" | "dahua") {
441        None
442    } else {
443        device.stream_path.as_deref().map(|path| {
444            build_rtsp_url(
445                &device.address,
446                device.rtsp_port,
447                username.unwrap_or(""),
448                password.unwrap_or(""),
449                path,
450            )
451        })
452    };
453    let store_vendor = if vendor == "unknown" {
454        "generic"
455    } else {
456        vendor
457    };
458
459    let now = Utc::now();
460    sqlx::query(
461        "INSERT INTO cameras
462           (id, name, vendor, address, rtsp_port, username, password, main_stream_url, record_stream,
463            capabilities, record_enabled, segment_seconds, retention_hours, enabled, created_at, updated_at)
464         VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'main', ?, 0, 60, 24, 1, ?, ?)",
465    )
466    .bind(&device.suggested_id)
467    .bind(format!("Camera {}", device.address))
468    .bind(store_vendor)
469    .bind(&device.address)
470    .bind(device.rtsp_port as i64)
471    .bind(username)
472    .bind(password)
473    .bind(&main_stream_url)
474    .bind(SqlxJson(json!({
475        "discovered": true,
476        "stream_path": device.stream_path,
477        "codec": device.codec,
478    })))
479    .bind(now)
480    .bind(now)
481    .execute(pool)
482    .await?;
483    sqlx::query(
484        "INSERT INTO camera_status (camera_id, state, updated_at) VALUES (?, 'unknown', ?)
485         ON CONFLICT(camera_id) DO NOTHING",
486    )
487    .bind(&device.suggested_id)
488    .bind(now)
489    .execute(pool)
490    .await?;
491    Ok(device.suggested_id.clone())
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497
498    #[test]
499    fn parse_cidr_excludes_network_and_broadcast() {
500        let ips = parse_targets("192.168.0.0/30").unwrap();
501        assert_eq!(
502            ips,
503            vec![
504                "192.168.0.1".parse::<Ipv4Addr>().unwrap(),
505                "192.168.0.2".parse().unwrap()
506            ]
507        );
508    }
509
510    #[test]
511    fn parse_range_and_list() {
512        let ips = parse_targets("192.168.0.2-192.168.0.4, 10.0.0.5").unwrap();
513        assert_eq!(ips.len(), 4);
514        assert_eq!(ips[3], "10.0.0.5".parse::<Ipv4Addr>().unwrap());
515    }
516
517    #[test]
518    fn parse_rejects_oversized_and_bad() {
519        assert!(parse_targets("10.0.0.0/8").is_err());
520        assert!(parse_targets("not-an-ip").is_err());
521    }
522
523    #[test]
524    fn build_rtsp_url_with_and_without_creds() {
525        assert_eq!(
526            build_rtsp_url("10.0.0.5", 554, "admin", "p@ss", "/live"),
527            "rtsp://admin:p%40ss@10.0.0.5:554/live"
528        );
529        assert_eq!(
530            build_rtsp_url("10.0.0.5", 554, "", "", "/live"),
531            "rtsp://10.0.0.5:554/live"
532        );
533    }
534}