use std::collections::HashSet;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::time::{Instant, timeout};
use crate::soap::XmlNode;
/// Destination (`address:port`) that `probe` passes to `probe_inner` as the probe target.
const WSD_MULTICAST: &str = "239.255.255.250:3702";
/// Multicast group address that `listen_inner` joins to receive Hello/Bye announcements.
const WSD_MULTICAST_ADDR: std::net::Ipv4Addr = std::net::Ipv4Addr::new(239, 255, 255, 250);
/// Size in bytes of the receive buffers allocated for incoming datagrams.
const UDP_MAX_SIZE: usize = 65_535;
/// A device description extracted from a discovery message.
///
/// Every field falls back to its empty value when the corresponding XML
/// element is absent from the message.
#[derive(Clone, Debug)]
pub struct DiscoveredDevice {
    /// Text of the `EndpointReference/Address` element; used by the probe
    /// path as the de-duplication key.
    pub endpoint: String,
    /// Whitespace-separated entries of the `Types` element.
    pub types: Vec<String>,
    /// Whitespace-separated entries of the `Scopes` element.
    pub scopes: Vec<String>,
    /// Whitespace-separated entries of the `XAddrs` element.
    pub xaddrs: Vec<String>,
}
impl DiscoveredDevice {
    /// Builds a device record from an element that carries the discovery
    /// fields as children (the callers pass `ProbeMatch` and `Hello` nodes).
    ///
    /// Missing children are tolerated: the endpoint becomes an empty string
    /// and each list becomes an empty vector.
    fn from_xml(node: &XmlNode) -> Self {
        // Text of the named direct child, split on whitespace into owned entries.
        let tokens_of = |tag: &str| -> Vec<String> {
            match node.child(tag) {
                Some(child) => child
                    .text()
                    .split_whitespace()
                    .map(String::from)
                    .collect(),
                None => Vec::new(),
            }
        };

        let endpoint = match node.path(&["EndpointReference", "Address"]) {
            Some(address) => address.text().to_string(),
            None => String::new(),
        };

        Self {
            endpoint,
            types: tokens_of("Types"),
            scopes: tokens_of("Scopes"),
            xaddrs: tokens_of("XAddrs"),
        }
    }
}
/// An announcement observed while listening for discovery traffic.
#[derive(Clone, Debug)]
pub enum DiscoveryEvent {
    /// Produced for a message whose action ends in `/Hello`; carries the
    /// device described by the `Hello` element.
    Hello(DiscoveredDevice),
    /// Produced for a message whose action ends in `/Bye`; `endpoint` is the
    /// text of the `EndpointReference/Address` element (empty when absent).
    Bye { endpoint: String },
}
/// Sends a discovery probe to the default multicast target and returns the
/// devices that answered within `timeout_dur`.
///
/// Any I/O error reported by the underlying probe is swallowed and results
/// in an empty list.
pub async fn probe(timeout_dur: Duration) -> Vec<DiscoveredDevice> {
    match probe_inner(timeout_dur, WSD_MULTICAST).await {
        Ok(devices) => devices,
        Err(_) => Vec::new(),
    }
}
/// Listens for Hello/Bye announcements for up to `timeout_dur` and returns
/// the events that were collected.
///
/// Any I/O error reported by the underlying listener is swallowed and
/// results in an empty list.
pub async fn listen(timeout_dur: Duration) -> Vec<DiscoveryEvent> {
    match listen_inner(timeout_dur).await {
        Ok(events) => events,
        Err(_) => Vec::new(),
    }
}
/// Sends one probe message to `target` from several local sockets and
/// gathers the answers.
///
/// A socket is opened on the unspecified address and on every address
/// returned by `local_ipv4_addrs`. Each socket that was set up and could
/// send the probe gets its own receiver task, which collects datagrams until
/// `timeout_dur` has elapsed. A socket that cannot be created, bound, made
/// non-blocking, registered with the runtime, or used to send the probe is
/// skipped.
///
/// Returns the devices parsed from all received datagrams, de-duplicated by
/// endpoint (the first occurrence wins). Datagrams that are not valid UTF-8
/// or do not parse as XML are ignored. The function currently never returns
/// `Err`; the `Result` is kept for the callers' signature.
async fn probe_inner(
    timeout_dur: Duration,
    target: &str,
) -> std::io::Result<Vec<DiscoveredDevice>> {
    use std::net::{Ipv4Addr, SocketAddrV4};
    use std::sync::{Arc, Mutex};

    let message_id = new_uuid();
    let xml = build_probe(&message_id);

    // Raw datagrams from every receiver task, in arrival order.
    let received: Arc<Mutex<Vec<Vec<u8>>>> = Arc::new(Mutex::new(Vec::new()));
    let mut handles = Vec::new();

    for ip in std::iter::once(Ipv4Addr::UNSPECIFIED).chain(local_ipv4_addrs()) {
        use socket2::{Domain, Protocol, Socket, Type};

        let Ok(raw) = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) else {
            continue;
        };
        let addr: std::net::SocketAddr = SocketAddrV4::new(ip, 0).into();
        if raw.bind(&addr.into()).is_err() {
            continue;
        }
        // Multicast tuning is best-effort; the probe is still attempted if it fails.
        let _ = raw.set_multicast_ttl_v4(4);
        if ip != Ipv4Addr::UNSPECIFIED {
            let _ = raw.set_multicast_if_v4(&ip);
        }
        // The async runtime requires a non-blocking socket; a blocking one
        // would stall the executor thread, so skip it instead of using it.
        if raw.set_nonblocking(true).is_err() {
            continue;
        }
        let Ok(sock) = UdpSocket::from_std(raw.into()) else {
            continue;
        };
        // Replies are addressed to this socket's ephemeral port, so if the
        // probe never left there is nothing to wait for on this socket.
        if sock.send_to(xml.as_bytes(), target).await.is_err() {
            continue;
        }

        let received = Arc::clone(&received);
        let handle = tokio::task::spawn(async move {
            let mut buf = vec![0u8; UDP_MAX_SIZE];
            let deadline = Instant::now() + timeout_dur;
            loop {
                let remaining = deadline.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    break;
                }
                match timeout(remaining, sock.recv_from(&mut buf)).await {
                    Ok(Ok((len, _))) => {
                        received
                            .lock()
                            .unwrap_or_else(|e| e.into_inner())
                            .push(buf[..len].to_vec());
                    }
                    // A failed receive does not end collection; retry until the deadline.
                    Ok(Err(_)) => continue,
                    // The receive window has elapsed.
                    Err(_) => break,
                }
            }
        });
        handles.push(handle);
    }

    for h in handles {
        let _ = h.await;
    }

    // Drain through the lock rather than unwrapping the `Arc`, so collected
    // datagrams can never be silently discarded.
    let raw = {
        let mut packets = received.lock().unwrap_or_else(|e| e.into_inner());
        std::mem::take(&mut *packets)
    };

    let mut devices: Vec<DiscoveredDevice> = Vec::new();
    let mut seen: HashSet<String> = HashSet::new();
    for data in raw {
        let Ok(text) = std::str::from_utf8(&data) else {
            continue;
        };
        if let Ok(root) = XmlNode::parse(text) {
            for d in collect_probe_matches(&root) {
                // Several sockets may hear the same device; keep the first answer.
                if seen.insert(d.endpoint.clone()) {
                    devices.push(d);
                }
            }
        }
    }
    Ok(devices)
}
/// Returns the IPv4 addresses of all non-loopback network interfaces.
///
/// If the interface list cannot be obtained the result is empty.
fn local_ipv4_addrs() -> Vec<std::net::Ipv4Addr> {
    let mut addrs = Vec::new();
    for iface in if_addrs::get_if_addrs().unwrap_or_default() {
        if iface.is_loopback() {
            continue;
        }
        // Only IPv4 interface addresses are of interest.
        if let if_addrs::IfAddr::V4(v4) = iface.addr {
            addrs.push(v4.ip);
        }
    }
    addrs
}
async fn listen_inner(timeout_dur: Duration) -> std::io::Result<Vec<DiscoveryEvent>> {
use std::net::Ipv4Addr;
let socket = UdpSocket::bind("0.0.0.0:3702").await?;
socket.join_multicast_v4(WSD_MULTICAST_ADDR, Ipv4Addr::UNSPECIFIED)?;
let mut buf = vec![0u8; UDP_MAX_SIZE];
let mut events: Vec<DiscoveryEvent> = Vec::new();
let deadline = Instant::now() + timeout_dur;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
break;
}
match timeout(remaining, socket.recv_from(&mut buf)).await {
Ok(Ok((len, _addr))) => {
let Ok(text) = std::str::from_utf8(&buf[..len]) else {
continue;
};
if let Ok(root) = XmlNode::parse(text) {
events.extend(collect_discovery_events(&root));
}
}
_ => break,
}
}
Ok(events)
}
/// Converts a parsed discovery envelope into zero or one `DiscoveryEvent`.
///
/// The `Header/Action` text selects the event kind by its suffix (`/Hello`
/// or `/Bye`). The matching element is then looked up under `Body`, or
/// directly under `root` when there is no `Body` child. Any other action,
/// or a missing payload element, yields an empty vector.
fn collect_discovery_events(root: &XmlNode) -> Vec<DiscoveryEvent> {
    let action = match root.path(&["Header", "Action"]) {
        Some(node) => node.text(),
        None => "",
    };
    let body = root.child("Body").unwrap_or(root);

    let event = if action.ends_with("/Hello") {
        body.child("Hello")
            .map(|hello| DiscoveryEvent::Hello(DiscoveredDevice::from_xml(hello)))
    } else if action.ends_with("/Bye") {
        body.child("Bye").map(|bye| {
            let endpoint = match bye.path(&["EndpointReference", "Address"]) {
                Some(address) => address.text().to_string(),
                None => String::new(),
            };
            DiscoveryEvent::Bye { endpoint }
        })
    } else {
        None
    };

    event.into_iter().collect()
}
/// Extracts every `ProbeMatch` element of a parsed probe response.
///
/// The matches are searched under `Body/ProbeMatches`; each level falls back
/// to its parent when the wrapper element is missing.
fn collect_probe_matches(root: &XmlNode) -> Vec<DiscoveredDevice> {
    let body = match root.child("Body") {
        Some(node) => node,
        None => root,
    };
    let container = match body.child("ProbeMatches") {
        Some(node) => node,
        None => body,
    };

    let mut devices = Vec::new();
    for probe_match in container.children_named("ProbeMatch") {
        devices.push(DiscoveredDevice::from_xml(probe_match));
    }
    devices
}
/// Renders the SOAP probe envelope, embedding `message_id` as
/// `uuid:<message_id>` in the `MessageID` header.
///
/// `message_id` is inserted verbatim, without XML escaping.
///
/// NOTE(review): the envelope declares the 2005/08 WS-Addressing namespace
/// next to the 2005/04 discovery namespace; the 2005/04 discovery
/// specification appears to pair with the 2004/08 addressing namespace —
/// confirm against the spec and target devices before changing.
fn build_probe(message_id: &str) -> String {
    // Everything up to and including the `uuid:` prefix of the message id.
    const ENVELOPE_HEAD: &str = concat!(
        r#"<?xml version="1.0" encoding="UTF-8"?>"#,
        r#"<s:Envelope"#,
        r#" xmlns:s="http://www.w3.org/2003/05/soap-envelope""#,
        r#" xmlns:wsa="http://www.w3.org/2005/08/addressing""#,
        r#" xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery""#,
        r#" xmlns:dn="http://www.onvif.org/ver10/network/wsdl">"#,
        r#"<s:Header>"#,
        r#"<wsa:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/Probe</wsa:Action>"#,
        r#"<wsa:MessageID>uuid:"#,
    );
    // Everything after the message id.
    const ENVELOPE_TAIL: &str = concat!(
        r#"</wsa:MessageID>"#,
        r#"<wsa:To>urn:schemas-xmlsoap-org:ws:2005:04:discovery</wsa:To>"#,
        r#"</s:Header>"#,
        r#"<s:Body>"#,
        r#"<wsd:Probe><wsd:Types>dn:NetworkVideoTransmitter</wsd:Types></wsd:Probe>"#,
        r#"</s:Body>"#,
        r#"</s:Envelope>"#,
    );

    let mut envelope =
        String::with_capacity(ENVELOPE_HEAD.len() + message_id.len() + ENVELOPE_TAIL.len());
    envelope.push_str(ENVELOPE_HEAD);
    envelope.push_str(message_id);
    envelope.push_str(ENVELOPE_TAIL);
    envelope
}
/// Generates a random identifier in the textual 8-4-4-4-12 lowercase-hex
/// UUID layout.
///
/// The third group always starts with the digit `4`, and the top two bits of
/// the fourth group are forced to `10`.
fn new_uuid() -> String {
    let group1 = rand::random::<u32>();
    let group2 = rand::random::<u16>();
    // Only 12 random bits: the leading nibble of this group is the literal `4`.
    let group3 = rand::random::<u16>() & 0x0fff;
    // Clear the top two bits, then set the highest one.
    let group4 = (rand::random::<u16>() & 0x3fff) | 0x8000;
    // Keep the low 48 bits so the group renders as exactly 12 hex digits.
    let group5 = rand::random::<u64>() & 0x0000_ffff_ffff_ffff;

    format!("{group1:08x}-{group2:04x}-4{group3:03x}-{group4:04x}-{group5:012x}")
}
// Unit tests for the XML parsing helpers, the probe/UUID builders, and
// `probe_inner` exercised against a local mock UDP responder.
#[cfg(test)]
mod tests {
use super::*;
// Fixture: a probe response envelope with a single ProbeMatch carrying the
// given endpoint address and XAddrs text.
fn probe_match_xml(endpoint: &str, xaddrs: &str) -> String {
format!(
r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Body>
<wsd:ProbeMatches>
<wsd:ProbeMatch>
<wsa:EndpointReference>
<wsa:Address>{endpoint}</wsa:Address>
</wsa:EndpointReference>
<wsd:Types>dn:NetworkVideoTransmitter</wsd:Types>
<wsd:Scopes>onvif://www.onvif.org/name/Camera1</wsd:Scopes>
<wsd:XAddrs>{xaddrs}</wsd:XAddrs>
<wsd:MetadataVersion>10</wsd:MetadataVersion>
</wsd:ProbeMatch>
</wsd:ProbeMatches>
</s:Body>
</s:Envelope>"#
)
}
// A single ProbeMatch yields one device with endpoint, xaddrs, scopes and
// types populated from the fixture.
#[test]
fn test_parse_probe_match_extracts_fields() {
let xml = probe_match_xml(
"uuid:12345678-0000-0000-0000-000000000001",
"http://192.168.1.100/onvif/device_service",
);
let root = XmlNode::parse(&xml).unwrap();
let devices = collect_probe_matches(&root);
assert_eq!(devices.len(), 1);
let d = &devices[0];
assert_eq!(d.endpoint, "uuid:12345678-0000-0000-0000-000000000001");
assert_eq!(d.xaddrs, ["http://192.168.1.100/onvif/device_service"]);
assert_eq!(d.scopes, ["onvif://www.onvif.org/name/Camera1"]);
assert!(
d.types
.iter()
.any(|t| t.contains("NetworkVideoTransmitter"))
);
}
// Space-separated XAddrs text is split into separate entries.
#[test]
fn test_parse_multiple_xaddrs() {
let xml = probe_match_xml(
"uuid:aabbccdd-0000-0000-0000-000000000002",
"http://192.168.1.101/onvif/device_service http://10.0.0.1/onvif/device_service",
);
let root = XmlNode::parse(&xml).unwrap();
let devices = collect_probe_matches(&root);
assert_eq!(devices[0].xaddrs.len(), 2);
}
// An envelope with an empty Body produces no devices.
#[test]
fn test_parse_empty_body_returns_empty() {
let xml = r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope">
<s:Body/>
</s:Envelope>"#;
let root = XmlNode::parse(xml).unwrap();
assert!(collect_probe_matches(&root).is_empty());
}
// The generated probe parses as XML and contains the type filter and the
// supplied message id.
#[test]
fn test_build_probe_is_valid_xml() {
let xml = build_probe("test-uuid-1234");
assert!(
XmlNode::parse(&xml).is_ok(),
"build_probe output should be valid XML"
);
assert!(xml.contains("NetworkVideoTransmitter"));
assert!(xml.contains("test-uuid-1234"));
}
// The generated UUID consists of five dash-separated groups.
#[test]
fn test_new_uuid_has_five_parts() {
let uuid = new_uuid();
let parts: Vec<&str> = uuid.split('-').collect();
assert_eq!(parts.len(), 5, "UUID should have 5 dash-separated parts");
}
// Fixture: a Hello announcement envelope for the given endpoint and XAddrs.
fn hello_xml(endpoint: &str, xaddrs: &str) -> String {
format!(
r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Header>
<wsa:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/Hello</wsa:Action>
</s:Header>
<s:Body>
<wsd:Hello>
<wsa:EndpointReference>
<wsa:Address>{endpoint}</wsa:Address>
</wsa:EndpointReference>
<wsd:Types>dn:NetworkVideoTransmitter</wsd:Types>
<wsd:Scopes>onvif://www.onvif.org/name/Camera1</wsd:Scopes>
<wsd:XAddrs>{xaddrs}</wsd:XAddrs>
<wsd:MetadataVersion>1</wsd:MetadataVersion>
</wsd:Hello>
</s:Body>
</s:Envelope>"#
)
}
// Fixture: a Bye announcement envelope for the given endpoint.
fn bye_xml(endpoint: &str) -> String {
format!(
r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsd="http://schemas.xmlsoap.org/ws/2005/04/discovery"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Header>
<wsa:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/Bye</wsa:Action>
</s:Header>
<s:Body>
<wsd:Bye>
<wsa:EndpointReference>
<wsa:Address>{endpoint}</wsa:Address>
</wsa:EndpointReference>
</wsd:Bye>
</s:Body>
</s:Envelope>"#
)
}
// A Hello envelope is turned into exactly one Hello event with the
// fixture's endpoint and xaddrs.
#[test]
fn test_collect_hello_event() {
let xml = hello_xml(
"uuid:aaaa-0000-0000-0000-000000000001",
"http://192.168.1.200/onvif/device_service",
);
let root = XmlNode::parse(&xml).unwrap();
let events = collect_discovery_events(&root);
assert_eq!(events.len(), 1);
match &events[0] {
DiscoveryEvent::Hello(d) => {
assert_eq!(d.endpoint, "uuid:aaaa-0000-0000-0000-000000000001");
assert_eq!(d.xaddrs, ["http://192.168.1.200/onvif/device_service"]);
}
DiscoveryEvent::Bye { .. } => panic!("expected Hello"),
}
}
// A Bye envelope is turned into exactly one Bye event with the fixture's
// endpoint.
#[test]
fn test_collect_bye_event() {
let xml = bye_xml("uuid:bbbb-0000-0000-0000-000000000002");
let root = XmlNode::parse(&xml).unwrap();
let events = collect_discovery_events(&root);
assert_eq!(events.len(), 1);
match &events[0] {
DiscoveryEvent::Bye { endpoint } => {
assert_eq!(endpoint, "uuid:bbbb-0000-0000-0000-000000000002");
}
DiscoveryEvent::Hello(_) => panic!("expected Bye"),
}
}
// An action other than Hello/Bye (here: Probe) yields no events.
#[test]
fn test_collect_unknown_action_returns_empty() {
let xml = r#"<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
xmlns:wsa="http://www.w3.org/2005/08/addressing">
<s:Header>
<wsa:Action>http://schemas.xmlsoap.org/ws/2005/04/discovery/Probe</wsa:Action>
</s:Header>
<s:Body/>
</s:Envelope>"#;
let root = XmlNode::parse(xml).unwrap();
assert!(collect_discovery_events(&root).is_empty());
}
// End-to-end: a mock UDP responder answers the probe with one canned
// ProbeMatch, and `probe_inner` returns that device.
#[tokio::test]
async fn test_probe_inner_receives_probe_match() {
use std::time::Duration;
use tokio::net::UdpSocket;
let mock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mock_addr = mock.local_addr().unwrap();
let target = format!("127.0.0.1:{}", mock_addr.port());
let canned = probe_match_xml(
"uuid:mock-device-0001-0000-000000000001",
"http://192.168.1.200/onvif/device_service",
);
tokio::spawn(async move {
let mut buf = vec![0u8; UDP_MAX_SIZE];
if let Ok((_, src)) = mock.recv_from(&mut buf).await {
let _ = mock.send_to(canned.as_bytes(), src).await;
}
});
let devices = probe_inner(Duration::from_millis(500), &target)
.await
.unwrap();
assert_eq!(devices.len(), 1, "should find exactly one device");
assert_eq!(
devices[0].endpoint,
"uuid:mock-device-0001-0000-000000000001"
);
assert_eq!(
devices[0].xaddrs,
["http://192.168.1.200/onvif/device_service"]
);
assert_eq!(devices[0].scopes, ["onvif://www.onvif.org/name/Camera1"]);
}
// The mock answers twice with the same endpoint; the result must contain
// the device only once.
#[tokio::test]
async fn test_probe_inner_deduplicates_responses() {
use std::time::Duration;
use tokio::net::UdpSocket;
let mock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mock_addr = mock.local_addr().unwrap();
let target = format!("127.0.0.1:{}", mock_addr.port());
let canned = probe_match_xml(
"uuid:mock-device-dup-0000-000000000002",
"http://192.168.1.201/onvif/device_service",
);
tokio::spawn(async move {
let mut buf = vec![0u8; UDP_MAX_SIZE];
if let Ok((_, src)) = mock.recv_from(&mut buf).await {
let _ = mock.send_to(canned.as_bytes(), src).await;
let _ = mock.send_to(canned.as_bytes(), src).await;
}
});
let devices = probe_inner(Duration::from_millis(500), &target)
.await
.unwrap();
assert_eq!(devices.len(), 1, "duplicates should be merged into one");
}
// The mock answers with a non-XML payload; no device must be reported.
#[tokio::test]
async fn test_probe_inner_ignores_garbage_response() {
use std::time::Duration;
use tokio::net::UdpSocket;
let mock = UdpSocket::bind("0.0.0.0:0").await.unwrap();
let mock_addr = mock.local_addr().unwrap();
let target = format!("127.0.0.1:{}", mock_addr.port());
tokio::spawn(async move {
let mut buf = vec![0u8; UDP_MAX_SIZE];
if let Ok((_, src)) = mock.recv_from(&mut buf).await {
let _ = mock.send_to(b"not xml at all !!!", src).await;
}
});
let devices = probe_inner(Duration::from_millis(300), &target)
.await
.unwrap();
assert!(
devices.is_empty(),
"garbage response should yield no devices"
);
}
}