1use crate::Error;
2use instant_xml::FromXml;
3use reqwest::{Method, Response, Url};
4use std::net::IpAddr;
5use tokio::io::AsyncReadExt;
6use tokio::net::{TcpListener, TcpStream};
7use tokio::sync::mpsc::{channel, Receiver, Sender};
8use url::Host;
9
10const UPNP_DEVICE: &str = "urn:schemas-upnp-org:device-1-0";
11
12#[derive(Debug, FromXml, Clone)]
13#[xml(rename = "device", ns(UPNP_DEVICE))]
14pub struct DeviceSpec {
15 #[xml(rename = "friendlyName")]
16 pub friendly_name: String,
17 #[xml(rename = "deviceType")]
18 pub device_type: String,
19 #[xml(rename = "modelNumber")]
20 pub model_number: Option<String>,
21 #[xml(rename = "modelDescription")]
22 pub model_description: Option<String>,
23 #[xml(rename = "modelName")]
24 pub model_name: Option<String>,
25 #[xml(rename = "SSLPort")]
26 pub ssl_port: Option<u16>,
27
28 service_list: Option<ServiceList>,
29 device_list: Option<DeviceList>,
30}
31
32impl DeviceSpec {
33 pub fn parse_xml(xml: &str) -> crate::Result<Self> {
34 let spec: Root = instant_xml::from_str(xml).map_err(|error| crate::Error::XmlParse {
35 error,
36 text: xml.to_string(),
37 })?;
38 Ok(spec.device)
39 }
40
41 pub fn services(&self) -> &[Service] {
42 match &self.service_list {
43 None => &[],
44 Some(list) => &list.services,
45 }
46 }
47
48 pub fn get_service(&self, service_type: &str) -> Option<&Service> {
49 if let Some(s) = self
50 .services()
51 .iter()
52 .find(|s| *s.service_type == *service_type)
53 {
54 return Some(s);
55 }
56 if let Some(dev) = &self.device_list {
57 for d in dev.devices.iter() {
58 if let Some(s) = d.get_service(service_type) {
59 return Some(s);
60 }
61 }
62 }
63
64 None
65 }
66}
67
68#[derive(Debug, FromXml, Clone)]
69#[xml(rename = "serviceList", ns(UPNP_DEVICE))]
70struct ServiceList {
71 pub services: Vec<Service>,
72}
73
74#[derive(Debug, FromXml, Clone)]
75#[xml(rename = "deviceList", ns(UPNP_DEVICE))]
76struct DeviceList {
77 pub devices: Vec<DeviceSpec>,
78}
79
80#[derive(Debug, FromXml)]
81#[xml(rename = "root", ns(UPNP_DEVICE))]
82struct Root {
83 device: DeviceSpec,
84}
85
86#[derive(Debug, FromXml, Clone)]
87#[xml(rename = "service", ns(UPNP_DEVICE))]
88pub struct Service {
89 #[xml(rename = "serviceType")]
90 pub service_type: String,
91 #[xml(rename = "serviceId")]
92 pub service_id: String,
93 #[xml(rename = "controlURL")]
94 pub control_url: String,
95 #[xml(rename = "eventSubURL")]
96 pub event_sub_url: String,
97 #[xml(rename = "SCPDURL")]
98 pub scpd_url: String,
99}
100
101impl Service {
102 fn join_url(&self, base_url: &Url, url: &str) -> Url {
103 match base_url.join(url) {
104 Ok(url) => url,
105 Err(err) => {
106 log::error!("Cannot join {base_url} with {url}: {err:#}");
107 url.parse().expect("URL to be valid")
108 }
109 }
110 }
111
112 pub fn control_url(&self, url: &Url) -> Url {
113 self.join_url(url, &self.control_url)
114 }
115
116 pub fn event_sub_url(&self, url: &Url) -> Url {
117 self.join_url(url, &self.event_sub_url)
118 }
119
120 pub fn scpd_url(&self, url: &Url) -> Url {
122 self.join_url(url, &self.scpd_url)
123 }
124
125 pub async fn subscribe<T: DecodeXml + 'static>(
126 &self,
127 url: &Url,
128 ) -> crate::Result<EventStream<T>> {
129 let sub_url = self.event_sub_url(url);
130
131 let host = url
134 .host()
135 .ok_or_else(|| Error::NoIpInDeviceUrl(url.clone()))?;
136 let ip: IpAddr = match host {
137 Host::Domain(_s) => return Err(Error::NoIpInDeviceUrl(url.clone())),
138 Host::Ipv4(v4) => v4.into(),
139 Host::Ipv6(v6) => v6.into(),
140 };
141
142 let probe = TcpStream::connect((ip, url.port().unwrap_or(80))).await?;
143 let listener = TcpListener::bind((probe.local_addr()?.ip(), 0)).await?;
144 let local = listener.local_addr()?;
145
146 let response = reqwest::Client::new()
147 .request(
148 Method::from_bytes(b"SUBSCRIBE").expect("SUBSCRIBE to be a valid method"),
149 sub_url.clone(),
150 )
151 .header("CALLBACK", format!("<http://{local}>"))
152 .header("NT", "upnp:event")
153 .header("TIMEOUT", format!("Second-{SUBSCRIPTION_TIMEOUT}"))
154 .send()
155 .await?;
156
157 let response = Error::check_response(response).await?;
158
159 log::trace!("response: {response:?}");
160
161 let sid = response
162 .headers()
163 .get("sid")
164 .ok_or(Error::SubscriptionFailedNoSid)?
165 .to_str()
166 .map_err(|_| Error::SubscriptionFailedNoSid)?
167 .to_string();
168
169 let body = response.text().await?;
170 log::trace!("Got response: {body}");
171
172 let (tx, rx) = channel(16);
173 {
174 let sid = sid.clone();
175 let sub_url = sub_url.clone();
176 tokio::spawn(async move { process_subscription(listener, tx, sid, sub_url).await });
177 }
178
179 Ok(EventStream { sid, rx, sub_url })
180 }
181}
182
183const SUBSCRIPTION_TIMEOUT: u64 = 60;
184
185async fn process_subscription<T: DecodeXml + 'static>(
186 listener: TcpListener,
187 tx: Sender<SubscriptionMessage<T>>,
188 sid: String,
189 sub_url: Url,
190) -> crate::Result<()> {
191 let mut deadline =
192 tokio::time::Instant::now() + tokio::time::Duration::from_secs(SUBSCRIPTION_TIMEOUT - 10);
193 loop {
194 match tokio::time::timeout_at(deadline, listener.accept()).await {
195 Ok(Ok((client, _addr))) => {
196 let tx = tx.clone();
197 tokio::spawn(async move { handle_subscription_request(client, tx).await });
198 }
199 Ok(Err(err)) => {
200 log::error!("accept failed: {err:#}");
201 return Ok(());
202 }
203 Err(_) => {
204 log::debug!("time to renew!");
205 let renew = match tx.try_send(SubscriptionMessage::Ping) {
207 Ok(_) | Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => true,
208 Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
209 false
211 }
212 };
213
214 renew_or_cancel_sub(&sub_url, renew, &sid).await?;
215
216 if renew {
217 deadline = tokio::time::Instant::now()
218 + tokio::time::Duration::from_secs(SUBSCRIPTION_TIMEOUT - 10);
219 } else {
220 return Ok(());
221 }
222 }
223 }
224 }
225}
226
227async fn handle_subscription_request<T: DecodeXml>(
228 mut client: TcpStream,
229 tx: Sender<SubscriptionMessage<T>>,
230) -> crate::Result<()> {
231 let mut reqbuf = vec![];
232 let mut buf = [0u8; 4096];
233
234 while let Ok(len) = client.read(&mut buf).await {
235 reqbuf.extend_from_slice(&buf[0..len]);
236
237 let mut headers = [httparse::EMPTY_HEADER; 16];
238 let mut req = httparse::Request::new(&mut headers);
239
240 match req.parse(&reqbuf) {
241 Err(err) => {
242 log::error!("Error parsing request: {err:#}");
243 break;
244 }
245 Ok(httparse::Status::Partial) => continue,
246 Ok(httparse::Status::Complete(body_start)) => {
247 if let Some(cl) = req
250 .headers
251 .iter()
252 .find(|h| h.name.eq_ignore_ascii_case("Content-Length"))
253 {
254 match std::str::from_utf8(cl.value)
255 .ok()
256 .and_then(|s| s.parse::<usize>().ok())
257 {
258 Some(cl) => {
259 let avail = reqbuf.len() - body_start;
260 if avail < cl {
261 continue;
263 }
264 }
265 None => {
266 log::error!("Invalid header: {cl:?}");
267 break;
268 }
269 }
270 }
271 let body = String::from_utf8_lossy(&reqbuf[body_start..]).to_string();
272
273 log::trace!("{req:#?}");
274 log::trace!("{body}");
275
276 match T::decode_xml(&body) {
277 Ok(event) => {
278 if let Err(err) = tx.send(SubscriptionMessage::Event(event)).await {
279 log::error!("Channel is dead {err:#}");
280 return Ok(());
281 }
282 }
283 Err(err) => {
284 log::error!("Failed to parse PropertySet: {err:#} from {body}");
285 }
286 }
287
288 break;
289 }
290 }
291 }
292 Ok(())
293}
294
295async fn renew_or_cancel_sub(sub_url: &Url, subscribe: bool, sid: &str) -> crate::Result<Response> {
296 let mut request = reqwest::Client::new()
297 .request(
298 Method::from_bytes(if subscribe {
299 b"SUBSCRIBE"
300 } else {
301 b"UNSUBSCRIBE"
302 })
303 .expect("SUBSCRIBE to be a valid method"),
304 sub_url.clone(),
305 )
306 .header("SID", sid);
307 if subscribe {
308 request = request.header("TIMEOUT", format!("Second-{SUBSCRIPTION_TIMEOUT}"));
309 }
310 let response = request.send().await?;
311
312 let response = Error::check_response(response).await?;
313
314 Ok(response)
315}
316
317enum SubscriptionMessage<T> {
318 Ping,
319 Event(T),
320}
321
322pub trait DecodeXml: Send {
325 fn decode_xml(xml: &str) -> crate::Result<Self>
326 where
327 Self: Sized;
328}
329
330pub trait EncodeXml {
332 fn encode_xml(&self) -> std::result::Result<String, instant_xml::Error>;
333}
334
335pub struct EventStream<T: DecodeXml> {
345 rx: Receiver<SubscriptionMessage<T>>,
346 sid: String,
347 sub_url: Url,
348}
349
350impl<T: DecodeXml> EventStream<T> {
351 pub async fn recv(&mut self) -> Option<T> {
353 loop {
354 let msg = self.rx.recv().await?;
355 match msg {
356 SubscriptionMessage::Ping => {}
357 SubscriptionMessage::Event(v) => {
358 return Some(v);
359 }
360 }
361 }
362 }
363
364 pub async fn unsubscribe(self) {
366 renew_or_cancel_sub(&self.sub_url, false, &self.sid)
367 .await
368 .ok();
369 }
370}
371
372pub(crate) const UPNP_EVENT: &str = "urn:schemas-upnp-org:event-1-0";
373
374#[cfg(test)]
375mod test {
376 use super::*;
377
378 #[test]
379 fn parse_device_spec() {
380 let spec_text = include_str!("../data/device_spec.xml");
381 let spec: Root = instant_xml::from_str(&spec_text).unwrap();
382 k9::snapshot!(
383 spec,
384 r#"
385Root {
386 device: DeviceSpec {
387 friendly_name: "192.168.1.157 - Sonos Port - RINCON_XXX",
388 device_type: "urn:schemas-upnp-org:device:ZonePlayer:1",
389 model_number: Some(
390 "S23",
391 ),
392 model_description: Some(
393 "Sonos Port",
394 ),
395 model_name: Some(
396 "Sonos Port",
397 ),
398 ssl_port: Some(
399 1443,
400 ),
401 service_list: Some(
402 ServiceList {
403 services: [
404 Service {
405 service_type: "urn:schemas-upnp-org:service:AlarmClock:1",
406 service_id: "urn:upnp-org:serviceId:AlarmClock",
407 control_url: "/AlarmClock/Control",
408 event_sub_url: "/AlarmClock/Event",
409 scpd_url: "/xml/AlarmClock1.xml",
410 },
411 Service {
412 service_type: "urn:schemas-upnp-org:service:MusicServices:1",
413 service_id: "urn:upnp-org:serviceId:MusicServices",
414 control_url: "/MusicServices/Control",
415 event_sub_url: "/MusicServices/Event",
416 scpd_url: "/xml/MusicServices1.xml",
417 },
418 Service {
419 service_type: "urn:schemas-upnp-org:service:AudioIn:1",
420 service_id: "urn:upnp-org:serviceId:AudioIn",
421 control_url: "/AudioIn/Control",
422 event_sub_url: "/AudioIn/Event",
423 scpd_url: "/xml/AudioIn1.xml",
424 },
425 Service {
426 service_type: "urn:schemas-upnp-org:service:DeviceProperties:1",
427 service_id: "urn:upnp-org:serviceId:DeviceProperties",
428 control_url: "/DeviceProperties/Control",
429 event_sub_url: "/DeviceProperties/Event",
430 scpd_url: "/xml/DeviceProperties1.xml",
431 },
432 Service {
433 service_type: "urn:schemas-upnp-org:service:SystemProperties:1",
434 service_id: "urn:upnp-org:serviceId:SystemProperties",
435 control_url: "/SystemProperties/Control",
436 event_sub_url: "/SystemProperties/Event",
437 scpd_url: "/xml/SystemProperties1.xml",
438 },
439 Service {
440 service_type: "urn:schemas-upnp-org:service:ZoneGroupTopology:1",
441 service_id: "urn:upnp-org:serviceId:ZoneGroupTopology",
442 control_url: "/ZoneGroupTopology/Control",
443 event_sub_url: "/ZoneGroupTopology/Event",
444 scpd_url: "/xml/ZoneGroupTopology1.xml",
445 },
446 Service {
447 service_type: "urn:schemas-upnp-org:service:GroupManagement:1",
448 service_id: "urn:upnp-org:serviceId:GroupManagement",
449 control_url: "/GroupManagement/Control",
450 event_sub_url: "/GroupManagement/Event",
451 scpd_url: "/xml/GroupManagement1.xml",
452 },
453 Service {
454 service_type: "urn:schemas-tencent-com:service:QPlay:1",
455 service_id: "urn:tencent-com:serviceId:QPlay",
456 control_url: "/QPlay/Control",
457 event_sub_url: "/QPlay/Event",
458 scpd_url: "/xml/QPlay1.xml",
459 },
460 ],
461 },
462 ),
463 device_list: Some(
464 DeviceList {
465 devices: [
466 DeviceSpec {
467 friendly_name: "192.168.1.157 - Sonos Port Media Server - RINCON_XXX",
468 device_type: "urn:schemas-upnp-org:device:MediaServer:1",
469 model_number: Some(
470 "S23",
471 ),
472 model_description: Some(
473 "Sonos Port Media Server",
474 ),
475 model_name: Some(
476 "Sonos Port",
477 ),
478 ssl_port: None,
479 service_list: Some(
480 ServiceList {
481 services: [
482 Service {
483 service_type: "urn:schemas-upnp-org:service:ContentDirectory:1",
484 service_id: "urn:upnp-org:serviceId:ContentDirectory",
485 control_url: "/MediaServer/ContentDirectory/Control",
486 event_sub_url: "/MediaServer/ContentDirectory/Event",
487 scpd_url: "/xml/ContentDirectory1.xml",
488 },
489 Service {
490 service_type: "urn:schemas-upnp-org:service:ConnectionManager:1",
491 service_id: "urn:upnp-org:serviceId:ConnectionManager",
492 control_url: "/MediaServer/ConnectionManager/Control",
493 event_sub_url: "/MediaServer/ConnectionManager/Event",
494 scpd_url: "/xml/ConnectionManager1.xml",
495 },
496 ],
497 },
498 ),
499 device_list: None,
500 },
501 DeviceSpec {
502 friendly_name: "Some Room - Sonos Port Media Renderer - RINCON_XXX",
503 device_type: "urn:schemas-upnp-org:device:MediaRenderer:1",
504 model_number: Some(
505 "S23",
506 ),
507 model_description: Some(
508 "Sonos Port Media Renderer",
509 ),
510 model_name: Some(
511 "Sonos Port",
512 ),
513 ssl_port: None,
514 service_list: Some(
515 ServiceList {
516 services: [
517 Service {
518 service_type: "urn:schemas-upnp-org:service:RenderingControl:1",
519 service_id: "urn:upnp-org:serviceId:RenderingControl",
520 control_url: "/MediaRenderer/RenderingControl/Control",
521 event_sub_url: "/MediaRenderer/RenderingControl/Event",
522 scpd_url: "/xml/RenderingControl1.xml",
523 },
524 Service {
525 service_type: "urn:schemas-upnp-org:service:ConnectionManager:1",
526 service_id: "urn:upnp-org:serviceId:ConnectionManager",
527 control_url: "/MediaRenderer/ConnectionManager/Control",
528 event_sub_url: "/MediaRenderer/ConnectionManager/Event",
529 scpd_url: "/xml/ConnectionManager1.xml",
530 },
531 Service {
532 service_type: "urn:schemas-upnp-org:service:AVTransport:1",
533 service_id: "urn:upnp-org:serviceId:AVTransport",
534 control_url: "/MediaRenderer/AVTransport/Control",
535 event_sub_url: "/MediaRenderer/AVTransport/Event",
536 scpd_url: "/xml/AVTransport1.xml",
537 },
538 Service {
539 service_type: "urn:schemas-sonos-com:service:Queue:1",
540 service_id: "urn:sonos-com:serviceId:Queue",
541 control_url: "/MediaRenderer/Queue/Control",
542 event_sub_url: "/MediaRenderer/Queue/Event",
543 scpd_url: "/xml/Queue1.xml",
544 },
545 Service {
546 service_type: "urn:schemas-upnp-org:service:GroupRenderingControl:1",
547 service_id: "urn:upnp-org:serviceId:GroupRenderingControl",
548 control_url: "/MediaRenderer/GroupRenderingControl/Control",
549 event_sub_url: "/MediaRenderer/GroupRenderingControl/Event",
550 scpd_url: "/xml/GroupRenderingControl1.xml",
551 },
552 Service {
553 service_type: "urn:schemas-upnp-org:service:VirtualLineIn:1",
554 service_id: "urn:upnp-org:serviceId:VirtualLineIn",
555 control_url: "/MediaRenderer/VirtualLineIn/Control",
556 event_sub_url: "/MediaRenderer/VirtualLineIn/Event",
557 scpd_url: "/xml/VirtualLineIn1.xml",
558 },
559 ],
560 },
561 ),
562 device_list: None,
563 },
564 ],
565 },
566 ),
567 },
568}
569"#
570 );
571 }
572}