1use quick_xml::events::Event;
2use quick_xml::Reader;
3use std::collections::HashSet;
4use std::io;
5use std::net::{SocketAddr, UdpSocket};
6use std::str;
7use std::sync::mpsc::{self, TryRecvError};
8use std::thread;
9use std::time::Duration;
10
/// One device description extracted from a WS-Discovery probe response.
///
/// `name`, `hardware` and `location` are derived from the
/// `onvif://www.onvif.org/...` entries of `scopes`; a field stays empty when
/// the response carries no matching scope.
#[derive(Debug, Clone, Default, Hash, Eq, PartialEq)]
pub struct ProbeMatch {
    /// Text of the `Address` element of the response body.
    urn: String,
    /// Last path segment of the `onvif://www.onvif.org/name/` scope.
    name: String,
    /// Last path segment of the `onvif://www.onvif.org/hardware/` scope.
    hardware: String,
    /// Last path segment of the `onvif://www.onvif.org/location/` scope.
    location: String,
    /// Entries of the `Types` element with their namespace prefixes removed.
    types: Vec<String>,
    /// Entries of the `XAddrs` element.
    xaddrs: Vec<String>,
    /// Entries of the `Scopes` element.
    scopes: Vec<String>,
}

// The getters keep returning `&String` / `&Vec<String>` so that existing
// callers relying on those exact types keep compiling.
impl ProbeMatch {
    /// Returns the endpoint address (URN) of the device.
    pub fn urn(&self) -> &String {
        &self.urn
    }

    /// Returns the device name taken from the scopes (empty if absent).
    pub fn name(&self) -> &String {
        &self.name
    }

    /// Returns the hardware identifier taken from the scopes (empty if absent).
    pub fn hardware(&self) -> &String {
        &self.hardware
    }

    /// Returns the location taken from the scopes (empty if absent).
    pub fn location(&self) -> &String {
        &self.location
    }

    /// Returns the advertised device types without namespace prefixes.
    pub fn types(&self) -> &Vec<String> {
        &self.types
    }

    /// Returns the service addresses advertised by the device.
    pub fn xaddrs(&self) -> &Vec<String> {
        &self.xaddrs
    }

    /// Returns all scopes advertised by the device.
    pub fn scopes(&self) -> &Vec<String> {
        &self.scopes
    }
}
44
/// Returns the part of `st` that follows its last `:`.
///
/// A string without any `:` is returned unchanged, so `"d:Types"` becomes
/// `"Types"` and `"Types"` stays `"Types"`.
fn remove_namespace_prefix(st: &str) -> &str {
    // Splitting from the right always yields at least one piece, so the
    // fallback is only there to avoid an `unwrap`.
    st.rsplit(':').next().unwrap_or(st)
}
49
/// Receives one datagram from `socket` and returns its payload as a `String`.
///
/// Only the bytes actually received are converted; the unused remainder of
/// the receive buffer is not part of the result.
///
/// # Errors
///
/// Returns the error reported by `UdpSocket::recv` (for example when a read
/// timeout configured on the socket expires), or an error of kind
/// `io::ErrorKind::InvalidData` when the payload is not valid UTF-8.
fn read_message(socket: &UdpSocket) -> Result<String, io::Error> {
    // 65 535 is the largest value of the 16-bit UDP length field, so a
    // datagram always fits into this buffer.
    let mut buf = [0u8; 65_535];
    let len = socket.recv(&mut buf)?;
    // The payload comes from the network: report bad UTF-8 instead of panicking.
    str::from_utf8(&buf[..len])
        .map(String::from)
        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
}
56
57fn parse_probe_match(xml: &str) -> Result<ProbeMatch, String> {
58 let mut reader = Reader::from_str(&xml);
59 reader.trim_text(true);
60 let mut buf = Vec::new();
61 loop {
62 match reader.read_event(&mut buf) {
63 Ok(Event::End(ref e)) => {
64 match e.name() {
65 b"SOAP-ENV:Header" => break,
66 _ => (),
67 };
68 }
69 Ok(Event::Eof) => return Err("Finished before header".to_string()),
70 Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e),
71 _ => (),
72 }
73 }
74 buf.clear();
75 let mut buf2 = Vec::new();
76 let mut stack = Vec::<String>::new();
78 let mut probe_match = ProbeMatch::default();
79
80 loop {
81 match reader.read_event(&mut buf2) {
82 Ok(Event::Start(ref e)) => {
83 let name = str::from_utf8(e.name()).unwrap().to_string();
84 stack.push(name);
86 }
87 Ok(Event::End(ref _e)) => {
88 let ended_tag = stack.pop().expect("Stack can't be empty, but it was.");
89 if ended_tag == "SOAP-ENV:Body" {
90 break;
91 }
92 }
96 Ok(Event::Text(e)) => {
97 let text = str::from_utf8(e.escaped())
98 .expect("UTF decode error")
99 .to_string();
100 let tag = stack.get(stack.len() - 1).expect("Stack can't be empty");
101 let tag = remove_namespace_prefix(tag);
102 match tag {
103 "Address" => probe_match.urn = text,
104 "Types" => {
105 probe_match.types = text
106 .split(' ')
107 .map(remove_namespace_prefix)
108 .map(String::from)
109 .collect()
110 }
111 "Scopes" => probe_match.scopes = text.split(' ').map(String::from).collect(),
112 "XAddrs" => probe_match.xaddrs = text.split(' ').map(String::from).collect(),
113 _ => {
114 }
116 };
117 }
118 Ok(Event::Eof) => break,
119 Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e),
120 _ => (),
121 }
122 }
123 for scope in &probe_match.scopes {
124 if scope.starts_with("onvif://www.onvif.org/hardware/") {
125 let parts: Vec<&str> = scope.split('/').collect();
126 probe_match.hardware = (parts[parts.len() - 1]).into();
127 } else if scope.starts_with("onvif://www.onvif.org/location/") {
128 let parts: Vec<&str> = scope.split('/').collect();
129 probe_match.location = (parts[parts.len() - 1]).into();
130 } else if scope.starts_with("onvif://www.onvif.org/name/") {
131 let parts: Vec<&str> = scope.split('/').collect();
132 probe_match.name = (parts[parts.len() - 1]).into();
133 }
134 }
135 Ok(probe_match)
136}
137
138pub fn start_probe(probe_duration: &Duration) -> Result<Vec<ProbeMatch>, io::Error> {
139 let multicast_addr: SocketAddr = "239.255.255.250:3702".parse().unwrap();
140 println!("Started probe");
141 let soap_tmpl = r#"
142 <?xml version="1.0" encoding="UTF-8"?>
143<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope"
144 xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing">
145 <s:Header>
146 <a:Action s:mustUnderstand="1">
147 http://schemas.xmlsoap.org/ws/2005/04/discovery/Probe</a:Action>
148 <a:MessageID>uuid:__uuid__</a:MessageID>
149 <a:ReplyTo>
150 <a:Address>http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</a:Address>
151 </a:ReplyTo>
152 <a:To s:mustUnderstand="1">urn:schemas-xmlsoap-org:ws:2005:04:discovery</a:To>
153 </s:Header>
154 <s:Body>
155 <Probe xmlns="http://schemas.xmlsoap.org/ws/2005/04/discovery">
156 <d:Types xmlns:d="http://schemas.xmlsoap.org/ws/2005/04/discovery"
157 xmlns:dp0="http://www.onvif.org/ver10/network/wsdl">
158 dp0:__type__</d:Types>
159 </Probe>
160 </s:Body>
161</s:Envelope>
162 "#;
163 let types = vec!["NetworkVideoTransmitter", "Device", "NetworkVideoDisplay"];
164 let soap_reqs: Vec<String> = types
165 .iter()
166 .map(|device_type| {
167 let s: &str = soap_tmpl.clone();
168 let s = s.replace("__type__", device_type);
169 let s = s.replace("__uuid__", "7c208633-8086-4a83-a9d4-b3fd8673b8f7"); s
171 })
172 .collect();
173 let all_interfaces = SocketAddr::from(([0, 0, 0, 0], 0));
174 let socket = UdpSocket::bind(&all_interfaces).expect("Could not bind to udp socket");
175 let read_socket = socket.try_clone().unwrap();
176 read_socket
177 .set_read_timeout(Some(Duration::from_secs(1)))
178 .expect("set_read_timeout call failed");
179 let (thread_stop_tx, thread_stop_rx) = mpsc::channel();
180 let (devices_tx, devices_rx) = mpsc::channel();
181 let _read_thread_handle = thread::spawn(move || {
182 loop {
183 if let Ok(message) = read_message(&read_socket) {
184 devices_tx
185 .send(parse_probe_match(&message))
186 .expect("Could not send found device over channel");
187 }
188 match thread_stop_rx.try_recv() {
189 Ok(_) | Err(TryRecvError::Disconnected) => {
190 println!("Stopping receive thread");
191 break;
192 }
193 Err(TryRecvError::Empty) => {}
194 }
195 }
197 });
198 let _broadcast_thread = thread::spawn(move || {
199 for _ in 1..5 {
200 for soap_req in &soap_reqs {
201 socket
202 .send_to(soap_req.as_bytes(), &multicast_addr)
203 .expect("Could not send req");
204 thread::sleep(Duration::from_millis(100));
205 }
206 }
207 });
208 let mut found_devices = HashSet::new();
211 for _ in 1..10 {
212 let dev = devices_rx.recv().unwrap().unwrap();
213 found_devices.insert(dev);
214 }
215 thread::sleep(*probe_duration);
216 let _ = thread_stop_tx.send(());
217
218 Ok(found_devices.into_iter().collect())
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs::File;
    use std::io::Read;

    /// Smoke test confirming that the test harness itself runs.
    #[test]
    fn it_works() {
        assert_eq!(2 + 2, 4);
    }

    /// Parses the recorded probe response and compares every field with the
    /// values expected for that fixture.
    #[test]
    fn probe_xml_parse() {
        let mut response_xml = String::new();
        File::open("src/resources/probe-discovery-response.xml")
            .expect("Could not find probe discovery response xml file")
            .read_to_string(&mut response_xml)
            .expect("something went wrong reading the file");

        let parsed = parse_probe_match(&response_xml).unwrap();

        let expected_scopes: Vec<String> = [
            "onvif://www.onvif.org/type/video_encoder",
            "onvif://www.onvif.org/type/audio_encoder",
            "onvif://www.onvif.org/hardware/IPC-model",
            "onvif://www.onvif.org/location/country/china",
            "onvif://www.onvif.org/name/NVT",
            "onvif://www.onvif.org/Profile/Streaming",
        ]
        .iter()
        .map(|scope| scope.to_string())
        .collect();
        let expected = ProbeMatch {
            urn: String::from("urn:uuid:a91b83ca-3388-7688-99aa-101806a776fb"),
            name: String::from("NVT"),
            hardware: String::from("IPC-model"),
            location: String::from("china"),
            types: vec![String::from("NetworkVideoTransmitter")],
            xaddrs: vec![String::from("http://192.168.1.70:8899/onvif/device_service")],
            scopes: expected_scopes,
        };

        assert_eq!(expected, parsed);
        assert_eq!(expected.urn(), parsed.urn());
    }
}