use std::collections::HashSet;
use std::net::IpAddr;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::time::{Instant, timeout};
use crate::soap::XmlNode;
const WSD_MULTICAST: &str = "239.255.255.250:3702";
const WSD_MULTICAST_ADDR: std::net::Ipv4Addr = std::net::Ipv4Addr::new(239, 255, 255, 250);
const UDP_MAX_SIZE: usize = 65_535;
#[derive(Debug, Clone)]
pub struct DiscoveredDevice {
pub endpoint: String,
pub types: Vec<String>,
pub scopes: Vec<String>,
pub xaddrs: Vec<String>,
}
impl DiscoveredDevice {
fn from_xml(node: &XmlNode) -> Self {
let endpoint = node
.path(&["EndpointReference", "Address"])
.map(|n| n.text().to_string())
.unwrap_or_default();
let types = node
.child("Types")
.map(|n| n.text().split_whitespace().map(str::to_string).collect())
.unwrap_or_default();
let scopes = node
.child("Scopes")
.map(|n| n.text().split_whitespace().map(str::to_string).collect())
.unwrap_or_default();
let xaddrs = node
.child("XAddrs")
.map(|n| n.text().split_whitespace().map(str::to_string).collect())
.unwrap_or_default();
Self {
endpoint,
types,
scopes,
xaddrs,
}
}
}
#[derive(Debug, Clone)]
pub enum DiscoveryEvent {
Hello(DiscoveredDevice),
Bye {
endpoint: String,
},
}
pub async fn probe(timeout_dur: Duration) -> Vec<DiscoveredDevice> {
probe_inner(1, timeout_dur, Duration::ZERO, WSD_MULTICAST)
.await
.unwrap_or_default()
}
pub async fn probe_rounds(
rounds: usize,
timeout_per_round: Duration,
interval: Duration,
) -> Vec<DiscoveredDevice> {
probe_inner(rounds, timeout_per_round, interval, WSD_MULTICAST)
.await
.unwrap_or_default()
}
pub async fn probe_unicast(ip: IpAddr, timeout_dur: Duration) -> Vec<DiscoveredDevice> {
let target = format!("{ip}:3702");
probe_unicast_inner(timeout_dur, &target)
.await
.unwrap_or_default()
}
pub async fn listen(timeout_dur: Duration) -> Vec<DiscoveryEvent> {
listen_inner(timeout_dur).await.unwrap_or_default()
}
async fn probe_inner(
rounds: usize,
timeout_per_round: Duration,
interval: Duration,
target: &str,
) -> std::io::Result<Vec<DiscoveredDevice>> {
let mut devices: Vec<DiscoveredDevice> = Vec::new();
let mut seen: HashSet<String> = HashSet::new();
for round in 0..rounds {
if round > 0 && !interval.is_zero() {
tokio::time::sleep(interval).await;
}
let raw = probe_once(timeout_per_round, target).await?;
merge_probe_responses(raw, &mut devices, &mut seen);
}
Ok(devices)
}
fn merge_probe_responses(
raw: Vec<Vec<u8>>,
out: &mut Vec<DiscoveredDevice>,
seen: &mut HashSet<String>,
) {
for data in raw {
let Ok(text) = std::str::from_utf8(&data) else {
continue;
};
let matches = match XmlNode::parse(text) {
Ok(root) => collect_probe_matches(&root),
Err(_) => collect_probe_matches_lenient(text),
};
for d in matches {
if seen.insert(d.endpoint.clone()) {
out.push(d);
}
}
}
}
async fn probe_unicast_inner(
timeout_dur: Duration,
target: &str,
) -> std::io::Result<Vec<DiscoveredDevice>> {
let bind_addr: &str = match target.parse::<std::net::SocketAddr>() {
Ok(addr) if addr.is_ipv6() => "[::]:0",
_ => "0.0.0.0:0",
};
let sock = UdpSocket::bind(bind_addr).await?;
let nvt_probe = build_probe(&new_uuid(), ProbeTarget::NetworkVideoTransmitter);
let device_probe = build_probe(&new_uuid(), ProbeTarget::Device);
let _ = sock.send_to(nvt_probe.as_bytes(), target).await;
let _ = sock.send_to(device_probe.as_bytes(), target).await;
let mut raw: Vec<Vec<u8>> = Vec::new();
let mut buf = vec![0u8; UDP_MAX_SIZE];
let deadline = Instant::now() + timeout_dur;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
break;
}
match timeout(remaining, sock.recv_from(&mut buf)).await {
Ok(Ok((len, _))) => raw.push(buf[..len].to_vec()),
Ok(Err(_)) => continue, Err(_) => break, }
}
let mut devices: Vec<DiscoveredDevice> = Vec::new();
let mut seen: HashSet<String> = HashSet::new();
merge_probe_responses(raw, &mut devices, &mut seen);
Ok(devices)
}
async fn probe_once(timeout_dur: Duration, target: &str) -> std::io::Result<Vec<Vec<u8>>> {
use std::net::{Ipv4Addr, SocketAddrV4};
use std::sync::{Arc, Mutex};
let nvt_probe = Arc::new(build_probe(
&new_uuid(),
ProbeTarget::NetworkVideoTransmitter,
));
let device_probe = Arc::new(build_probe(&new_uuid(), ProbeTarget::Device));
let bind_ips: Vec<Ipv4Addr> = std::iter::once(Ipv4Addr::UNSPECIFIED)
.chain(local_ipv4_addrs())
.collect();
let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
let mut join_set: tokio::task::JoinSet<()> = tokio::task::JoinSet::new();
for ip in bind_ips {
use socket2::{Domain, Protocol, Socket, Type};
let Ok(raw) = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) else {
continue;
};
let addr: std::net::SocketAddr = SocketAddrV4::new(ip, 0).into();
if raw.bind(&addr.into()).is_err() {
continue;
}
let _ = raw.set_multicast_ttl_v4(32);
if ip != Ipv4Addr::UNSPECIFIED {
let _ = raw.set_multicast_if_v4(&ip);
}
let _ = raw.set_nonblocking(true);
let Ok(sock) = UdpSocket::from_std(raw.into()) else {
continue;
};
let _ = sock.send_to(nvt_probe.as_bytes(), target).await;
let _ = sock.send_to(device_probe.as_bytes(), target).await;
let received = Arc::clone(&received);
join_set.spawn(async move {
let mut buf = vec![0u8; UDP_MAX_SIZE];
let deadline = Instant::now() + timeout_dur;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
break;
}
match timeout(remaining, sock.recv_from(&mut buf)).await {
Ok(Ok((len, _))) => {
received
.lock()
.unwrap_or_else(|e| e.into_inner())
.push(buf[..len].to_vec());
}
Ok(Err(_)) => continue, Err(_) => break, }
}
});
}
join_set.join_all().await;
let raw = Arc::try_unwrap(received)
.unwrap_or_default()
.into_inner()
.unwrap_or_else(|e| e.into_inner());
Ok(raw)
}
fn local_ipv4_addrs() -> Vec<std::net::Ipv4Addr> {
if_addrs::get_if_addrs()
.unwrap_or_default()
.into_iter()
.filter_map(|iface| {
if iface.is_loopback() {
return None;
}
match iface.addr {
if_addrs::IfAddr::V4(addr) => Some(addr.ip),
_ => None,
}
})
.collect()
}
async fn listen_inner(timeout_dur: Duration) -> std::io::Result<Vec<DiscoveryEvent>> {
use std::collections::HashMap;
use std::net::Ipv4Addr;
let socket = UdpSocket::bind("0.0.0.0:3702").await?;
socket.join_multicast_v4(WSD_MULTICAST_ADDR, Ipv4Addr::UNSPECIFIED)?;
let mut buf = vec![0u8; UDP_MAX_SIZE];
let mut events: Vec<DiscoveryEvent> = Vec::new();
let mut last_seq: HashMap<String, AppSequence> = HashMap::new();
let deadline = Instant::now() + timeout_dur;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
break;
}
match timeout(remaining, socket.recv_from(&mut buf)).await {
Ok(Ok((len, _addr))) => {
let Ok(text) = std::str::from_utf8(&buf[..len]) else {
continue;
};
if let Ok(root) = XmlNode::parse(text) {
for (ev, seq) in collect_discovery_events(&root) {
if !accept_event(&ev, seq.as_ref(), &mut last_seq) {
continue;
}
events.push(ev);
}
}
}
_ => break,
}
}
Ok(events)
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct AppSequence {
instance_id: u64,
sequence_id: Option<String>,
message_number: u64,
}
impl AppSequence {
fn comparable_to(&self, other: &Self) -> bool {
self.instance_id == other.instance_id && self.sequence_id == other.sequence_id
}
fn from_node(node: &XmlNode) -> Option<Self> {
let instance_id = node.attr("InstanceId")?.parse().ok()?;
let message_number = node.attr("MessageNumber")?.parse().ok()?;
let sequence_id = node.attr("SequenceId").map(str::to_string);
Some(Self {
instance_id,
message_number,
sequence_id,
})
}
}
fn accept_event(
event: &DiscoveryEvent,
seq: Option<&AppSequence>,
last_seq: &mut std::collections::HashMap<String, AppSequence>,
) -> bool {
let endpoint = match event {
DiscoveryEvent::Hello(d) => d.endpoint.clone(),
DiscoveryEvent::Bye { endpoint } => endpoint.clone(),
};
if matches!(event, DiscoveryEvent::Bye { .. })
&& let Some(new) = seq
&& let Some(prev) = last_seq.get(&endpoint)
&& new.comparable_to(prev)
&& new.message_number <= prev.message_number
{
return false;
}
if let Some(new) = seq {
match last_seq.get(&endpoint) {
Some(prev) if new.comparable_to(prev) && new.message_number < prev.message_number => {
}
_ => {
last_seq.insert(endpoint, new.clone());
}
}
}
true
}
fn collect_discovery_events(root: &XmlNode) -> Vec<(DiscoveryEvent, Option<AppSequence>)> {
let action = root
.path(&["Header", "Action"])
.map(|n| n.text())
.unwrap_or("");
let seq = root
.path(&["Header", "AppSequence"])
.and_then(AppSequence::from_node);
let body = root.child("Body").unwrap_or(root);
if action.ends_with("/Hello") {
if let Some(hello) = body.child("Hello") {
return vec![(
DiscoveryEvent::Hello(DiscoveredDevice::from_xml(hello)),
seq,
)];
}
} else if action.ends_with("/Bye") {
if let Some(bye) = body.child("Bye") {
let endpoint = bye
.path(&["EndpointReference", "Address"])
.map(|n| n.text().to_string())
.unwrap_or_default();
return vec![(DiscoveryEvent::Bye { endpoint }, seq)];
}
}
vec![]
}
fn collect_probe_matches(root: &XmlNode) -> Vec<DiscoveredDevice> {
let body = root.child("Body").unwrap_or(root);
let matches = body.child("ProbeMatches").unwrap_or(body);
matches
.children_named("ProbeMatch")
.map(DiscoveredDevice::from_xml)
.collect()
}
fn collect_probe_matches_lenient(text: &str) -> Vec<DiscoveredDevice> {
let mut devices = Vec::new();
for block in extract_blocks(text, "ProbeMatch") {
let endpoint = extract_first_tag(&block, "Address").unwrap_or_default();
if endpoint.is_empty() {
continue;
}
let types = extract_first_tag(&block, "Types")
.map(|s| split_ws(&s))
.unwrap_or_default();
let scopes = extract_first_tag(&block, "Scopes")
.map(|s| split_ws(&s))
.unwrap_or_default();
let xaddrs = extract_first_tag(&block, "XAddrs")
.map(|s| split_ws(&s))
.unwrap_or_default();
devices.push(DiscoveredDevice {
endpoint,
types,
scopes,
xaddrs,
});
}
devices
}
fn split_ws(s: &str) -> Vec<String> {
s.split_whitespace()
.filter(|s| !s.is_empty())
.map(str::to_string)
.collect()
}
fn extract_blocks(xml: &str, local_name: &str) -> Vec<String> {
let mut blocks = Vec::new();
let mut search_from = 0;
while search_from < xml.len() {
let open = match find_open_tag(&xml[search_from..], local_name) {
Some((start, end)) => (search_from + start, search_from + end),
None => break,
};
let close = match find_close_tag(&xml[open.1..], local_name) {
Some(pos) => open.1 + pos,
None => break,
};
blocks.push(xml[open.1..close].to_string());
search_from = close;
}
blocks
}
fn find_open_tag(xml: &str, local_name: &str) -> Option<(usize, usize)> {
let mut pos = 0;
while pos < xml.len() {
let rest = &xml[pos..];
let lt = rest.find('<')?;
let abs_lt = pos + lt;
let after_lt = &xml[abs_lt + 1..];
if after_lt.starts_with('/') || after_lt.starts_with('?') || after_lt.starts_with('!') {
pos = abs_lt + 1;
continue;
}
let gt = match after_lt.find('>') {
Some(p) => p,
None => break,
};
let tag_content = &after_lt[..gt]; let tag_name = tag_content.split_whitespace().next().unwrap_or("");
let local = tag_name.rsplit(':').next().unwrap_or(tag_name);
let local = local.trim_end_matches('/');
if local == local_name {
return Some((abs_lt, abs_lt + 1 + gt + 1));
}
pos = abs_lt + 1;
}
None
}
fn find_close_tag(xml: &str, local_name: &str) -> Option<usize> {
let mut pos = 0;
while pos < xml.len() {
let rest = &xml[pos..];
let lt = rest.find("</")?;
let abs_lt = pos + lt;
let after_close = &xml[abs_lt + 2..];
let gt = after_close.find('>')?;
let tag_name = after_close[..gt].trim();
let local = tag_name.rsplit(':').next().unwrap_or(tag_name);
if local == local_name {
return Some(abs_lt);
}
pos = abs_lt + 2;
}
None
}
fn extract_first_tag(xml: &str, local_name: &str) -> Option<String> {
extract_blocks(xml, local_name)
.into_iter()
.next()
.map(|s| s.trim().to_string())
}
#[derive(Clone, Copy)]
enum ProbeTarget {
NetworkVideoTransmitter,
Device,
}
fn build_probe(message_id: &str, target: ProbeTarget) -> String {
let (types, onvif_ns_decl) = match target {
ProbeTarget::NetworkVideoTransmitter => (
"dn:NetworkVideoTransmitter",
r#" xmlns:dn="http://www.onvif.org/ver10/network/wsdl""#,
),
ProbeTarget::Device => (
"tds:Device",
r#" xmlns:tds="http://www.onvif.org/ver10/device/wsdl""#,
),
};
format!(
concat!(
r#"<?xml version="1.0" encoding="utf-8"?>"#,
r#"<s:Envelope"#,
r#" xmlns:s="http://www.w3.org/2003/05/soap-envelope""#,
r#" xmlns:wsa="http://schemas.xmlsoap.org/ws/2004/08/addressing""#,
r#" xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery""#,
r#"{}>"#,
r#"<s:Header>"#,
r#"<wsa:Action s:mustUnderstand="1">http://schemas.xmlsoap.org/ws/2005/04/discovery/Probe</wsa:Action>"#,
r#"<wsa:MessageID>uuid:{}</wsa:MessageID>"#,
r#"<wsa:ReplyTo><wsa:Address>http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</wsa:Address></wsa:ReplyTo>"#,
r#"<wsa:To s:mustUnderstand="1">urn:schemas-xmlsoap-org:ws:2005:04:discovery</wsa:To>"#,
r#"</s:Header>"#,
r#"<s:Body>"#,
r#"<wsd:Probe><wsd:Types>{}</wsd:Types></wsd:Probe>"#,
r#"</s:Body>"#,
r#"</s:Envelope>"#,
),
onvif_ns_decl, message_id, types
)
}
fn new_uuid() -> String {
format!(
"{:08x}-{:04x}-4{:03x}-{:04x}-{:012x}",
rand::random::<u32>(),
rand::random::<u16>(),
rand::random::<u16>() & 0x0fff,
(rand::random::<u16>() & 0x3fff) | 0x8000,
rand::random::<u64>() & 0x0000_ffff_ffff_ffff,
)
}
#[cfg(test)]
mod tests {
use super::*;
fn probe_match_xml(endpoint: &str, xaddrs: &str) -> String {
format!(
r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Body>
<wsd:ProbeMatches>
<wsd:ProbeMatch>
<wsa:EndpointReference>
<wsa:Address>{endpoint}</wsa:Address>
</wsa:EndpointReference>
<wsd:Types>dn:NetworkVideoTransmitter</wsd:Types>
<wsd:Scopes>onvif://www.onvif.org/name/Camera1</wsd:Scopes>
<wsd:XAddrs>{xaddrs}</wsd:XAddrs>
<wsd:MetadataVersion>10</wsd:MetadataVersion>
</wsd:ProbeMatch>
</wsd:ProbeMatches>
</s:Body>
</s:Envelope>"#
)
}
#[test]
fn test_parse_probe_match_extracts_fields() {
let xml = probe_match_xml(
"uuid:12345678-0000-0000-0000-000000000001",
"http://192.168.1.100/onvif/device_service",
);
let root = XmlNode::parse(&xml).unwrap();
let devices = collect_probe_matches(&root);
assert_eq!(devices.len(), 1);
let d = &devices[0];
assert_eq!(d.endpoint, "uuid:12345678-0000-0000-0000-000000000001");
assert_eq!(d.xaddrs, ["http://192.168.1.100/onvif/device_service"]);
assert_eq!(d.scopes, ["onvif://www.onvif.org/name/Camera1"]);
assert!(
d.types
.iter()
.any(|t| t.contains("NetworkVideoTransmitter"))
);
}
#[test]
fn test_parse_multiple_xaddrs() {
let xml = probe_match_xml(
"uuid:aabbccdd-0000-0000-0000-000000000002",
"http://192.168.1.101/onvif/device_service http://10.0.0.1/onvif/device_service",
);
let root = XmlNode::parse(&xml).unwrap();
let devices = collect_probe_matches(&root);
assert_eq!(devices[0].xaddrs.len(), 2);
}
#[test]
fn test_parse_empty_body_returns_empty() {
let xml = r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope">
<s:Body/>
</s:Envelope>"#;
let root = XmlNode::parse(xml).unwrap();
assert!(collect_probe_matches(&root).is_empty());
}
#[test]
fn test_build_nvt_probe_is_valid_xml() {
let xml = build_probe("test-uuid-1234", ProbeTarget::NetworkVideoTransmitter);
assert!(
XmlNode::parse(&xml).is_ok(),
"NVT probe output should be valid XML"
);
assert!(xml.contains("NetworkVideoTransmitter"));
assert!(xml.contains("onvif.org/ver10/network/wsdl"));
assert!(xml.contains("test-uuid-1234"));
assert!(
!xml.contains("Device</"),
"NVT probe must not accidentally request Device type"
);
}
#[test]
fn test_build_device_probe_is_valid_xml() {
let xml = build_probe("test-uuid-5678", ProbeTarget::Device);
assert!(
XmlNode::parse(&xml).is_ok(),
"Device probe output should be valid XML"
);
assert!(xml.contains("tds:Device"));
assert!(xml.contains("onvif.org/ver10/device/wsdl"));
assert!(xml.contains("test-uuid-5678"));
assert!(
!xml.contains("NetworkVideoTransmitter"),
"Device probe must not accidentally request NVT type"
);
}
#[test]
fn test_new_uuid_has_five_parts() {
let uuid = new_uuid();
let parts: Vec<&str> = uuid.split('-').collect();
assert_eq!(parts.len(), 5, "UUID should have 5 dash-separated parts");
}
fn hello_xml(endpoint: &str, xaddrs: &str) -> String {
format!(
r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Header>
<wsa:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/Hello</wsa:Action>
</s:Header>
<s:Body>
<wsd:Hello>
<wsa:EndpointReference>
<wsa:Address>{endpoint}</wsa:Address>
</wsa:EndpointReference>
<wsd:Types>dn:NetworkVideoTransmitter</wsd:Types>
<wsd:Scopes>onvif://www.onvif.org/name/Camera1</wsd:Scopes>
<wsd:XAddrs>{xaddrs}</wsd:XAddrs>
<wsd:MetadataVersion>1</wsd:MetadataVersion>
</wsd:Hello>
</s:Body>
</s:Envelope>"#
)
}
fn bye_xml(endpoint: &str) -> String {
format!(
r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Header>
<wsa:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/Bye</wsa:Action>
</s:Header>
<s:Body>
<wsd:Bye>
<wsa:EndpointReference>
<wsa:Address>{endpoint}</wsa:Address>
</wsa:EndpointReference>
</wsd:Bye>
</s:Body>
</s:Envelope>"#
)
}
#[test]
fn test_collect_hello_event() {
let xml = hello_xml(
"uuid:aaaa-0000-0000-0000-000000000001",
"http://192.168.1.200/onvif/device_service",
);
let root = XmlNode::parse(&xml).unwrap();
let events = collect_discovery_events(&root);
assert_eq!(events.len(), 1);
match &events[0].0 {
DiscoveryEvent::Hello(d) => {
assert_eq!(d.endpoint, "uuid:aaaa-0000-0000-0000-000000000001");
assert_eq!(d.xaddrs, ["http://192.168.1.200/onvif/device_service"]);
}
DiscoveryEvent::Bye { .. } => panic!("expected Hello"),
}
}
#[test]
fn test_collect_bye_event() {
let xml = bye_xml("uuid:bbbb-0000-0000-0000-000000000002");
let root = XmlNode::parse(&xml).unwrap();
let events = collect_discovery_events(&root);
assert_eq!(events.len(), 1);
match &events[0].0 {
DiscoveryEvent::Bye { endpoint } => {
assert_eq!(endpoint, "uuid:bbbb-0000-0000-0000-000000000002");
}
DiscoveryEvent::Hello(_) => panic!("expected Bye"),
}
}
#[test]
fn test_collect_unknown_action_returns_empty() {
let xml = r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Header>
<wsa:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/Probe</wsa:Action>
</s:Header>
<s:Body/>
</s:Envelope>"#;
let root = XmlNode::parse(xml).unwrap();
assert!(collect_discovery_events(&root).is_empty());
}
fn hello_with_seq_xml(endpoint: &str, instance_id: u64, msg_num: u64) -> String {
format!(
r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Header>
<wsa:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/Hello</wsa:Action>
<wsd:AppSequence InstanceId="{instance_id}" MessageNumber="{msg_num}"/>
</s:Header>
<s:Body>
<wsd:Hello>
<wsa:EndpointReference><wsa:Address>{endpoint}</wsa:Address></wsa:EndpointReference>
<wsd:Types>dn:NetworkVideoTransmitter</wsd:Types>
<wsd:XAddrs>http://192.168.1.50/onvif/device_service</wsd:XAddrs>
</wsd:Hello>
</s:Body>
</s:Envelope>"#
)
}
fn bye_with_seq_xml(endpoint: &str, instance_id: u64, msg_num: u64) -> String {
format!(
r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Header>
<wsa:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/Bye</wsa:Action>
<wsd:AppSequence InstanceId="{instance_id}" MessageNumber="{msg_num}"/>
</s:Header>
<s:Body>
<wsd:Bye>
<wsa:EndpointReference><wsa:Address>{endpoint}</wsa:Address></wsa:EndpointReference>
</wsd:Bye>
</s:Body>
</s:Envelope>"#
)
}
fn replay(xmls: &[String]) -> Vec<DiscoveryEvent> {
use std::collections::HashMap;
let mut last_seq: HashMap<String, AppSequence> = HashMap::new();
let mut out = Vec::new();
for xml in xmls {
let root = XmlNode::parse(xml).unwrap();
for (ev, seq) in collect_discovery_events(&root) {
if accept_event(&ev, seq.as_ref(), &mut last_seq) {
out.push(ev);
}
}
}
out
}
#[test]
fn test_reorder_filter_in_order_passes() {
let ep = "uuid:device-aa-0000-0000-000000000001";
let stream = vec![hello_with_seq_xml(ep, 100, 1), bye_with_seq_xml(ep, 100, 2)];
let events = replay(&stream);
assert_eq!(events.len(), 2, "in-order Hello/Bye should both pass");
assert!(matches!(events[0], DiscoveryEvent::Hello(_)));
assert!(matches!(events[1], DiscoveryEvent::Bye { .. }));
}
#[test]
fn test_reorder_filter_drops_stale_bye() {
let ep = "uuid:device-bb-0000-0000-000000000002";
let stream = vec![hello_with_seq_xml(ep, 100, 5), bye_with_seq_xml(ep, 100, 3)];
let events = replay(&stream);
assert_eq!(events.len(), 1, "stale Bye must be dropped");
assert!(matches!(events[0], DiscoveryEvent::Hello(_)));
}
#[test]
fn test_reorder_filter_drops_equal_message_number_bye() {
let ep = "uuid:device-cc-0000-0000-000000000003";
let stream = vec![hello_with_seq_xml(ep, 100, 7), bye_with_seq_xml(ep, 100, 7)];
let events = replay(&stream);
assert_eq!(events.len(), 1, "Bye with same msg# is a duplicate");
assert!(matches!(events[0], DiscoveryEvent::Hello(_)));
}
#[test]
fn test_reorder_filter_accepts_bye_after_restart() {
let ep = "uuid:device-dd-0000-0000-000000000004";
let stream = vec![hello_with_seq_xml(ep, 100, 9), bye_with_seq_xml(ep, 200, 1)];
let events = replay(&stream);
assert_eq!(events.len(), 2, "Bye after device restart is real");
assert!(matches!(events[1], DiscoveryEvent::Bye { .. }));
}
#[test]
fn test_reorder_filter_per_endpoint_isolation() {
let ep_a = "uuid:device-ee-0000-0000-00000000000a";
let ep_b = "uuid:device-ee-0000-0000-00000000000b";
let stream = vec![
hello_with_seq_xml(ep_a, 100, 50),
bye_with_seq_xml(ep_b, 100, 1),
];
let events = replay(&stream);
assert_eq!(events.len(), 2);
assert!(matches!(events[1], DiscoveryEvent::Bye { ref endpoint } if endpoint == ep_b));
}
#[test]
fn test_reorder_filter_accepts_bye_when_no_appsequence() {
let ep = "uuid:device-ff-0000-0000-000000000006";
let stream = vec![hello_xml(ep, "http://x"), bye_xml(ep)];
let events = replay(&stream);
assert_eq!(events.len(), 2);
}
#[tokio::test]
async fn test_probe_inner_receives_probe_match() {
use std::time::Duration;
use tokio::net::UdpSocket;
let mock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mock_addr = mock.local_addr().unwrap();
let target = format!("127.0.0.1:{}", mock_addr.port());
let canned = probe_match_xml(
"uuid:mock-device-0001-0000-000000000001",
"http://192.168.1.200/onvif/device_service",
);
tokio::spawn(async move {
let mut buf = vec![0u8; UDP_MAX_SIZE];
if let Ok((_, src)) = mock.recv_from(&mut buf).await {
let _ = mock.send_to(canned.as_bytes(), src).await;
}
});
let devices = probe_inner(1, Duration::from_millis(500), Duration::ZERO, &target)
.await
.unwrap();
assert_eq!(devices.len(), 1, "should find exactly one device");
assert_eq!(
devices[0].endpoint,
"uuid:mock-device-0001-0000-000000000001"
);
assert_eq!(
devices[0].xaddrs,
["http://192.168.1.200/onvif/device_service"]
);
assert_eq!(devices[0].scopes, ["onvif://www.onvif.org/name/Camera1"]);
}
#[tokio::test]
async fn test_probe_inner_deduplicates_responses() {
use std::time::Duration;
use tokio::net::UdpSocket;
let mock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mock_addr = mock.local_addr().unwrap();
let target = format!("127.0.0.1:{}", mock_addr.port());
let canned = probe_match_xml(
"uuid:mock-device-dup-0000-000000000002",
"http://192.168.1.201/onvif/device_service",
);
tokio::spawn(async move {
let mut buf = vec![0u8; UDP_MAX_SIZE];
if let Ok((_, src)) = mock.recv_from(&mut buf).await {
let _ = mock.send_to(canned.as_bytes(), src).await;
let _ = mock.send_to(canned.as_bytes(), src).await;
}
});
let devices = probe_inner(1, Duration::from_millis(500), Duration::ZERO, &target)
.await
.unwrap();
assert_eq!(devices.len(), 1, "duplicates should be merged into one");
}
#[tokio::test]
async fn test_probe_inner_ignores_garbage_response() {
use std::time::Duration;
use tokio::net::UdpSocket;
let mock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mock_addr = mock.local_addr().unwrap();
let target = format!("127.0.0.1:{}", mock_addr.port());
tokio::spawn(async move {
let mut buf = vec![0u8; UDP_MAX_SIZE];
if let Ok((_, src)) = mock.recv_from(&mut buf).await {
let _ = mock.send_to(b"not xml at all !!!", src).await;
}
});
let devices = probe_inner(1, Duration::from_millis(300), Duration::ZERO, &target)
.await
.unwrap();
assert!(
devices.is_empty(),
"garbage response should yield no devices"
);
}
#[tokio::test]
async fn test_probe_inner_multi_round_dedups_and_sleeps() {
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
let mock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mock_addr = mock.local_addr().unwrap();
let target = format!("127.0.0.1:{}", mock_addr.port());
let canned = probe_match_xml(
"uuid:mock-device-multi-0000-000000000003",
"http://192.168.1.202/onvif/device_service",
);
tokio::spawn(async move {
let mut buf = vec![0u8; UDP_MAX_SIZE];
for _ in 0..3 {
if let Ok((_, src)) = mock.recv_from(&mut buf).await {
let _ = mock.send_to(canned.as_bytes(), src).await;
}
}
});
let started = Instant::now();
let devices = probe_inner(
2,
Duration::from_millis(200),
Duration::from_millis(300),
&target,
)
.await
.unwrap();
let elapsed = started.elapsed();
assert_eq!(
devices.len(),
1,
"same endpoint across rounds should dedup to one"
);
assert!(
elapsed >= Duration::from_millis(700),
"expected >= 700ms for 2 rounds + 1 interval, got {elapsed:?}"
);
}
const MALFORMED_PROBE_MATCH: &str = r#"<?xml version="1.0" encoding="UTF-8"?>
<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Body>
<wsd:ProbeMatches>
<wsd:ProbeMatch>
<wsa:EndpointReference>
<wsa:Address>uuid:malformed-scope-0000-0000-000000000099</wsa:Address>
</wsa:EndpointReference>
<wsd:Types>dn:NetworkVideoTransmitter</wsd:Types>
<wsd:Scopes>onvif://www.onvif.org/name/Alpha&Beta</wsd:Scopes>
<wsd:XAddrs>http://192.168.1.99/onvif/device_service</wsd:XAddrs>
</wsd:ProbeMatch>
</wsd:ProbeMatches>
</s:Body>
</s:Envelope>"#;
#[test]
fn test_strict_parser_rejects_malformed_xml() {
assert!(
XmlNode::parse(MALFORMED_PROBE_MATCH).is_err(),
"fixture should break XmlNode so the lenient fallback is exercised"
);
}
#[test]
fn test_lenient_parser_recovers_malformed_xml() {
let devices = collect_probe_matches_lenient(MALFORMED_PROBE_MATCH);
assert_eq!(
devices.len(),
1,
"lenient scanner should extract one device"
);
let d = &devices[0];
assert_eq!(d.endpoint, "uuid:malformed-scope-0000-0000-000000000099");
assert_eq!(d.xaddrs, ["http://192.168.1.99/onvif/device_service"]);
assert!(
d.types
.iter()
.any(|t| t.contains("NetworkVideoTransmitter")),
"types should include NetworkVideoTransmitter"
);
}
#[test]
fn test_lenient_parser_drops_missing_endpoint() {
let xml = r#"<wsd:ProbeMatch xmlns:wsd="...">
<wsd:Types>dn:NetworkVideoTransmitter</wsd:Types>
<wsd:XAddrs>http://10.0.0.1/onvif/device_service</wsd:XAddrs>
</wsd:ProbeMatch>"#;
let devices = collect_probe_matches_lenient(xml);
assert!(devices.is_empty());
}
#[test]
fn test_lenient_parser_distinguishes_probematch_from_probematches() {
let xml = r#"<wsd:ProbeMatches xmlns:wsd="..."/>"#;
assert!(collect_probe_matches_lenient(xml).is_empty());
}
#[tokio::test]
async fn test_probe_once_sends_nvt_and_device_probes() {
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::net::UdpSocket;
let mock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mock_addr = mock.local_addr().unwrap();
let target = format!("127.0.0.1:{}", mock_addr.port());
let captured: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let captured_task = Arc::clone(&captured);
tokio::spawn(async move {
let mut buf = vec![0u8; UDP_MAX_SIZE];
for _ in 0..16 {
match tokio::time::timeout(Duration::from_millis(400), mock.recv_from(&mut buf))
.await
{
Ok(Ok((len, _))) => {
if let Ok(text) = std::str::from_utf8(&buf[..len]) {
captured_task.lock().unwrap().push(text.to_string());
}
}
_ => break,
}
}
});
let _ = probe_inner(1, Duration::from_millis(300), Duration::ZERO, &target)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
let probes = captured.lock().unwrap().clone();
assert!(
probes.len() >= 2,
"expected at least one NVT + one Device probe, got {}",
probes.len()
);
assert!(
probes.iter().any(|p| p.contains("NetworkVideoTransmitter")),
"no probe asked for NetworkVideoTransmitter: {probes:?}"
);
assert!(
probes.iter().any(|p| p.contains("tds:Device")),
"no probe asked for tds:Device: {probes:?}"
);
}
#[tokio::test]
async fn test_probe_inner_zero_rounds_is_noop() {
use std::time::{Duration, Instant};
let started = Instant::now();
let devices = probe_inner(0, Duration::from_secs(30), Duration::ZERO, "127.0.0.1:1")
.await
.unwrap();
assert!(devices.is_empty(), "zero rounds should yield no devices");
assert!(
started.elapsed() < Duration::from_millis(200),
"zero rounds must not block on the 30s timeout"
);
}
#[tokio::test]
async fn test_probe_unicast_finds_device() {
use std::time::Duration;
use tokio::net::UdpSocket;
let mock = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let mock_addr = mock.local_addr().unwrap();
let canned = probe_match_xml(
"uuid:unicast-mock-0000-0000-000000000010",
"http://127.0.0.1/onvif/device_service",
);
tokio::spawn(async move {
let mut buf = vec![0u8; UDP_MAX_SIZE];
for _ in 0..2 {
if let Ok((_, src)) = mock.recv_from(&mut buf).await {
let _ = mock.send_to(canned.as_bytes(), src).await;
}
}
});
let devices = probe_unicast_inner(Duration::from_millis(500), &mock_addr.to_string())
.await
.unwrap();
assert_eq!(devices.len(), 1, "duplicate responses should dedup");
assert_eq!(
devices[0].endpoint,
"uuid:unicast-mock-0000-0000-000000000010"
);
}
#[tokio::test]
async fn test_probe_inner_drop_returns_promptly() {
use std::time::{Duration, Instant};
let started = Instant::now();
tokio::select! {
_ = probe_inner(1, Duration::from_secs(30), Duration::ZERO, "127.0.0.1:1") => {}
_ = tokio::time::sleep(Duration::from_millis(50)) => {}
}
assert!(
started.elapsed() < Duration::from_secs(2),
"select cancellation should release the probe future, took {:?}",
started.elapsed()
);
}
#[tokio::test]
async fn test_probe_unicast_silent_target_returns_empty() {
use std::time::{Duration, Instant};
let probe_target = {
let s = tokio::net::UdpSocket::bind("127.0.0.1:0").await.unwrap();
let addr = s.local_addr().unwrap();
drop(s);
addr.to_string()
};
let started = Instant::now();
let devices = probe_unicast_inner(Duration::from_millis(150), &probe_target)
.await
.unwrap();
assert!(devices.is_empty());
assert!(
started.elapsed() < Duration::from_millis(500),
"should return shortly after the configured timeout"
);
}
}