onvif/
discovery.rs

1use quick_xml::events::Event;
2use quick_xml::Reader;
3use std::collections::HashSet;
4use std::io;
5use std::net::{SocketAddr, UdpSocket};
6use std::str;
7use std::sync::mpsc::{self, TryRecvError};
8use std::thread;
9use std::time::Duration;
10
11#[derive(Debug, Default, Hash, Eq, PartialEq)]
12pub struct ProbeMatch {
13  urn: String,
14  name: String,
15  hardware: String,
16  location: String,
17  types: Vec<String>,
18  xaddrs: Vec<String>,
19  scopes: Vec<String>,
20}
21impl ProbeMatch {
22  pub fn urn(&self) -> &String {
23    &self.urn
24  }
25  pub fn name(&self) -> &String {
26    &self.name
27  }
28  pub fn hardware(&self) -> &String {
29    &self.hardware
30  }
31  pub fn location(&self) -> &String {
32    &self.location
33  }
34  pub fn types(&self) -> &Vec<String> {
35    &self.types
36  }
37  pub fn xaddrs(&self) -> &Vec<String> {
38    &self.xaddrs
39  }
40  pub fn scopes(&self) -> &Vec<String> {
41    &self.scopes
42  }
43}
44
45fn remove_namespace_prefix(st: &str) -> &str {
46  let parts: Vec<&str> = st.split(':').collect();
47  parts[parts.len() - 1]
48}
49
50fn read_message(socket: &UdpSocket) -> Result<String, io::Error> {
51  let mut buf: [u8; 65_535] = [0; 65_535];
52  // println!("Reading data");
53  let result = socket.recv(&mut buf);
54  result.map(move |_| str::from_utf8(&buf).unwrap().to_string())
55}
56
57fn parse_probe_match(xml: &str) -> Result<ProbeMatch, String> {
58  let mut reader = Reader::from_str(&xml);
59  reader.trim_text(true);
60  let mut buf = Vec::new();
61  loop {
62    match reader.read_event(&mut buf) {
63      Ok(Event::End(ref e)) => {
64        match e.name() {
65          b"SOAP-ENV:Header" => break,
66          _ => (),
67        };
68      }
69      Ok(Event::Eof) => return Err("Finished before header".to_string()),
70      Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e),
71      _ => (),
72    }
73  }
74  buf.clear();
75  let mut buf2 = Vec::new();
76  // Header ended
77  let mut stack = Vec::<String>::new();
78  let mut probe_match = ProbeMatch::default();
79
80  loop {
81    match reader.read_event(&mut buf2) {
82      Ok(Event::Start(ref e)) => {
83        let name = str::from_utf8(e.name()).unwrap().to_string();
84        // println!("Pushing {}", name);
85        stack.push(name);
86      }
87      Ok(Event::End(ref _e)) => {
88        let ended_tag = stack.pop().expect("Stack can't be empty, but it was.");
89        if ended_tag == "SOAP-ENV:Body" {
90          break;
91        }
92        // println!("Popped: {}", ended_tag);
93        // TODO: Verify what is popped is the tag that has ended
94        // (std::str::from_utf8(e.name()).unwrap());
95      }
96      Ok(Event::Text(e)) => {
97        let text = str::from_utf8(e.escaped())
98          .expect("UTF decode error")
99          .to_string();
100        let tag = stack.get(stack.len() - 1).expect("Stack can't be empty");
101        let tag = remove_namespace_prefix(tag);
102        match tag {
103          "Address" => probe_match.urn = text,
104          "Types" => {
105            probe_match.types = text
106              .split(' ')
107              .map(remove_namespace_prefix)
108              .map(String::from)
109              .collect()
110          }
111          "Scopes" => probe_match.scopes = text.split(' ').map(String::from).collect(),
112          "XAddrs" => probe_match.xaddrs = text.split(' ').map(String::from).collect(),
113          _ => {
114            // println!("Ignoring text {}", text);
115          }
116        };
117      }
118      Ok(Event::Eof) => break,
119      Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e),
120      _ => (),
121    }
122  }
123  for scope in &probe_match.scopes {
124    if scope.starts_with("onvif://www.onvif.org/hardware/") {
125      let parts: Vec<&str> = scope.split('/').collect();
126      probe_match.hardware = (parts[parts.len() - 1]).into();
127    } else if scope.starts_with("onvif://www.onvif.org/location/") {
128      let parts: Vec<&str> = scope.split('/').collect();
129      probe_match.location = (parts[parts.len() - 1]).into();
130    } else if scope.starts_with("onvif://www.onvif.org/name/") {
131      let parts: Vec<&str> = scope.split('/').collect();
132      probe_match.name = (parts[parts.len() - 1]).into();
133    }
134  }
135  Ok(probe_match)
136}
137
138pub fn start_probe(probe_duration: &Duration) -> Result<Vec<ProbeMatch>, io::Error> {
139  let multicast_addr: SocketAddr = "239.255.255.250:3702".parse().unwrap();
140  println!("Started probe");
141  let soap_tmpl = r#"
142  <?xml version="1.0" encoding="UTF-8"?>
143<s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" 
144  xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing">
145  <s:Header>
146    <a:Action s:mustUnderstand="1">
147  http://schemas.xmlsoap.org/ws/2005/04/discovery/Probe</a:Action>
148    <a:MessageID>uuid:__uuid__</a:MessageID>
149    <a:ReplyTo>
150      <a:Address>http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous</a:Address>
151    </a:ReplyTo>
152    <a:To s:mustUnderstand="1">urn:schemas-xmlsoap-org:ws:2005:04:discovery</a:To>
153  </s:Header>
154  <s:Body>
155    <Probe xmlns="http://schemas.xmlsoap.org/ws/2005/04/discovery">
156      <d:Types xmlns:d="http://schemas.xmlsoap.org/ws/2005/04/discovery" 
157        xmlns:dp0="http://www.onvif.org/ver10/network/wsdl">
158  dp0:__type__</d:Types>
159    </Probe>
160  </s:Body>
161</s:Envelope>
162  "#;
163  let types = vec!["NetworkVideoTransmitter", "Device", "NetworkVideoDisplay"];
164  let soap_reqs: Vec<String> = types
165    .iter()
166    .map(|device_type| {
167      let s: &str = soap_tmpl.clone();
168      let s = s.replace("__type__", device_type);
169      let s = s.replace("__uuid__", "7c208633-8086-4a83-a9d4-b3fd8673b8f7"); //TODO: Replace hardcoded uuid with a generated one
170      s
171    })
172    .collect();
173  let all_interfaces = SocketAddr::from(([0, 0, 0, 0], 0));
174  let socket = UdpSocket::bind(&all_interfaces).expect("Could not bind to udp socket");
175  let read_socket = socket.try_clone().unwrap();
176  read_socket
177    .set_read_timeout(Some(Duration::from_secs(1)))
178    .expect("set_read_timeout call failed");
179  let (thread_stop_tx, thread_stop_rx) = mpsc::channel();
180  let (devices_tx, devices_rx) = mpsc::channel();
181  let _read_thread_handle = thread::spawn(move || {
182    loop {
183      if let Ok(message) = read_message(&read_socket) {
184        devices_tx
185          .send(parse_probe_match(&message))
186          .expect("Could not send found device over channel");
187      }
188      match thread_stop_rx.try_recv() {
189        Ok(_) | Err(TryRecvError::Disconnected) => {
190          println!("Stopping receive thread");
191          break;
192        }
193        Err(TryRecvError::Empty) => {}
194      }
195      //thread::sleep(Duration::from_millis(100));
196    }
197  });
198  let _broadcast_thread = thread::spawn(move || {
199    for _ in 1..5 {
200      for soap_req in &soap_reqs {
201        socket
202          .send_to(soap_req.as_bytes(), &multicast_addr)
203          .expect("Could not send req");
204        thread::sleep(Duration::from_millis(100));
205      }
206    }
207  });
208  //broadcast_thread.join();
209  //read_thread_handle.join();
210  let mut found_devices = HashSet::new();
211  for _ in 1..10 {
212    let dev = devices_rx.recv().unwrap().unwrap();
213    found_devices.insert(dev);
214  }
215  thread::sleep(*probe_duration);
216  let _ = thread_stop_tx.send(());
217
218  Ok(found_devices.into_iter().collect())
219}
220
221#[cfg(test)]
222mod tests {
223  #[test]
224  fn it_works() {
225    assert_eq!(2 + 2, 4);
226  }
227
228  #[test]
229  fn probe_xml_parse() {
230    use super::*;
231    use std::fs;
232    use std::io::prelude::*;
233
234    let mut fl = fs::File::open("src/resources/probe-discovery-response.xml")
235      .expect("Could not find probe discovery response xml file");
236    let mut probe_discovery_response = String::new();
237    fl.read_to_string(&mut probe_discovery_response)
238      .expect("something went wrong reading the file");
239
240    let probe_match = parse_probe_match(&probe_discovery_response).unwrap();
241    let expected = ProbeMatch {
242      urn: "urn:uuid:a91b83ca-3388-7688-99aa-101806a776fb".into(),
243      name: "NVT".into(),
244      hardware: "IPC-model".into(),
245      location: "china".into(),
246      types: vec!["NetworkVideoTransmitter".into()],
247      xaddrs: vec!["http://192.168.1.70:8899/onvif/device_service".into()],
248      scopes: vec![
249        "onvif://www.onvif.org/type/video_encoder",
250        "onvif://www.onvif.org/type/audio_encoder",
251        "onvif://www.onvif.org/hardware/IPC-model",
252        "onvif://www.onvif.org/location/country/china",
253        "onvif://www.onvif.org/name/NVT",
254        "onvif://www.onvif.org/Profile/Streaming",
255      ].into_iter()
256        .map(String::from)
257        .collect(),
258    };
259    assert_eq!(expected, probe_match);
260    assert_eq!(expected.urn(), probe_match.urn());
261  }
262}