sonos/
upnp.rs

1use crate::Error;
2use instant_xml::FromXml;
3use reqwest::{Method, Response, Url};
4use std::net::IpAddr;
5use tokio::io::AsyncReadExt;
6use tokio::net::{TcpListener, TcpStream};
7use tokio::sync::mpsc::{channel, Receiver, Sender};
8use url::Host;
9
/// XML namespace URI of UPnP device description documents. The
/// device-description types in this file pass it to `ns(...)` in their
/// `#[xml(...)]` attributes.
const UPNP_DEVICE: &str = "urn:schemas-upnp-org:device-1-0";
11
/// A `device` element of a UPnP device description document, in the
/// `UPNP_DEVICE` namespace.
///
/// The nested service and device lists are private; read them through
/// `DeviceSpec::services` and `DeviceSpec::get_service`.
#[derive(Debug, FromXml, Clone)]
#[xml(rename = "device", ns(UPNP_DEVICE))]
pub struct DeviceSpec {
    /// Contents of the `friendlyName` element.
    #[xml(rename = "friendlyName")]
    pub friendly_name: String,
    /// Contents of the `deviceType` element.
    #[xml(rename = "deviceType")]
    pub device_type: String,
    /// Contents of the optional `modelNumber` element.
    #[xml(rename = "modelNumber")]
    pub model_number: Option<String>,
    /// Contents of the optional `modelDescription` element.
    #[xml(rename = "modelDescription")]
    pub model_description: Option<String>,
    /// Contents of the optional `modelName` element.
    #[xml(rename = "modelName")]
    pub model_name: Option<String>,
    /// Contents of the optional `SSLPort` element, parsed as a port number.
    #[xml(rename = "SSLPort")]
    pub ssl_port: Option<u16>,

    // Services offered directly by this device, if a `serviceList` is present.
    service_list: Option<ServiceList>,
    // Embedded sub-devices, if a `deviceList` is present; each one is itself
    // a full `DeviceSpec`, so the structure is recursive.
    device_list: Option<DeviceList>,
}
31
32impl DeviceSpec {
33    pub fn parse_xml(xml: &str) -> crate::Result<Self> {
34        let spec: Root = instant_xml::from_str(xml).map_err(|error| crate::Error::XmlParse {
35            error,
36            text: xml.to_string(),
37        })?;
38        Ok(spec.device)
39    }
40
41    pub fn services(&self) -> &[Service] {
42        match &self.service_list {
43            None => &[],
44            Some(list) => &list.services,
45        }
46    }
47
48    pub fn get_service(&self, service_type: &str) -> Option<&Service> {
49        if let Some(s) = self
50            .services()
51            .iter()
52            .find(|s| *s.service_type == *service_type)
53        {
54            return Some(s);
55        }
56        if let Some(dev) = &self.device_list {
57            for d in dev.devices.iter() {
58                if let Some(s) = d.get_service(service_type) {
59                    return Some(s);
60                }
61            }
62        }
63
64        None
65    }
66}
67
/// The `serviceList` element of a device: a container for the `service`
/// elements nested inside it.
#[derive(Debug, FromXml, Clone)]
#[xml(rename = "serviceList", ns(UPNP_DEVICE))]
struct ServiceList {
    /// The contained services, in the order they were deserialized.
    pub services: Vec<Service>,
}
73
/// The `deviceList` element of a device: a container for embedded `device`
/// elements.
#[derive(Debug, FromXml, Clone)]
#[xml(rename = "deviceList", ns(UPNP_DEVICE))]
struct DeviceList {
    /// The embedded devices, each a complete `DeviceSpec`.
    pub devices: Vec<DeviceSpec>,
}
79
/// The top-level `root` element of a device description document. Only its
/// `device` child is captured; `DeviceSpec::parse_xml` unwraps it.
#[derive(Debug, FromXml)]
#[xml(rename = "root", ns(UPNP_DEVICE))]
struct Root {
    /// The device described by the document.
    device: DeviceSpec,
}
85
/// A `service` element from a device description.
///
/// The three URL fields hold the strings exactly as they appear in the
/// document. Use `Service::control_url`, `Service::event_sub_url` and
/// `Service::scpd_url` to resolve them against the device URL.
#[derive(Debug, FromXml, Clone)]
#[xml(rename = "service", ns(UPNP_DEVICE))]
pub struct Service {
    /// Contents of the `serviceType` element; this is the key that
    /// `DeviceSpec::get_service` matches on.
    #[xml(rename = "serviceType")]
    pub service_type: String,
    /// Contents of the `serviceId` element.
    #[xml(rename = "serviceId")]
    pub service_id: String,
    /// Contents of the `controlURL` element (unresolved).
    #[xml(rename = "controlURL")]
    pub control_url: String,
    /// Contents of the `eventSubURL` element (unresolved).
    #[xml(rename = "eventSubURL")]
    pub event_sub_url: String,
    /// Contents of the `SCPDURL` element (unresolved).
    #[xml(rename = "SCPDURL")]
    pub scpd_url: String,
}
100
101impl Service {
102    fn join_url(&self, base_url: &Url, url: &str) -> Url {
103        match base_url.join(url) {
104            Ok(url) => url,
105            Err(err) => {
106                log::error!("Cannot join {base_url} with {url}: {err:#}");
107                url.parse().expect("URL to be valid")
108            }
109        }
110    }
111
112    pub fn control_url(&self, url: &Url) -> Url {
113        self.join_url(url, &self.control_url)
114    }
115
116    pub fn event_sub_url(&self, url: &Url) -> Url {
117        self.join_url(url, &self.event_sub_url)
118    }
119
120    /// The URL for the Service Control Protocol Description
121    pub fn scpd_url(&self, url: &Url) -> Url {
122        self.join_url(url, &self.scpd_url)
123    }
124
125    pub async fn subscribe<T: DecodeXml + 'static>(
126        &self,
127        url: &Url,
128    ) -> crate::Result<EventStream<T>> {
129        let sub_url = self.event_sub_url(url);
130
131        // Figure out an appropriate local address to talk to
132        // this device
133        let host = url
134            .host()
135            .ok_or_else(|| Error::NoIpInDeviceUrl(url.clone()))?;
136        let ip: IpAddr = match host {
137            Host::Domain(_s) => return Err(Error::NoIpInDeviceUrl(url.clone())),
138            Host::Ipv4(v4) => v4.into(),
139            Host::Ipv6(v6) => v6.into(),
140        };
141
142        let probe = TcpStream::connect((ip, url.port().unwrap_or(80))).await?;
143        let listener = TcpListener::bind((probe.local_addr()?.ip(), 0)).await?;
144        let local = listener.local_addr()?;
145
146        let response = reqwest::Client::new()
147            .request(
148                Method::from_bytes(b"SUBSCRIBE").expect("SUBSCRIBE to be a valid method"),
149                sub_url.clone(),
150            )
151            .header("CALLBACK", format!("<http://{local}>"))
152            .header("NT", "upnp:event")
153            .header("TIMEOUT", format!("Second-{SUBSCRIPTION_TIMEOUT}"))
154            .send()
155            .await?;
156
157        let response = Error::check_response(response).await?;
158
159        log::trace!("response: {response:?}");
160
161        let sid = response
162            .headers()
163            .get("sid")
164            .ok_or(Error::SubscriptionFailedNoSid)?
165            .to_str()
166            .map_err(|_| Error::SubscriptionFailedNoSid)?
167            .to_string();
168
169        let body = response.text().await?;
170        log::trace!("Got response: {body}");
171
172        let (tx, rx) = channel(16);
173        {
174            let sid = sid.clone();
175            let sub_url = sub_url.clone();
176            tokio::spawn(async move { process_subscription(listener, tx, sid, sub_url).await });
177        }
178
179        Ok(EventStream { sid, rx, sub_url })
180    }
181}
182
/// Subscription lifetime, in seconds, requested through the `TIMEOUT` header
/// of `SUBSCRIBE` requests. `process_subscription` schedules each renewal
/// 10 seconds short of this value.
const SUBSCRIPTION_TIMEOUT: u64 = 60;
184
185async fn process_subscription<T: DecodeXml + 'static>(
186    listener: TcpListener,
187    tx: Sender<SubscriptionMessage<T>>,
188    sid: String,
189    sub_url: Url,
190) -> crate::Result<()> {
191    let mut deadline =
192        tokio::time::Instant::now() + tokio::time::Duration::from_secs(SUBSCRIPTION_TIMEOUT - 10);
193    loop {
194        match tokio::time::timeout_at(deadline, listener.accept()).await {
195            Ok(Ok((client, _addr))) => {
196                let tx = tx.clone();
197                tokio::spawn(async move { handle_subscription_request(client, tx).await });
198            }
199            Ok(Err(err)) => {
200                log::error!("accept failed: {err:#}");
201                return Ok(());
202            }
203            Err(_) => {
204                log::debug!("time to renew!");
205                // Time to renew subscription
206                let renew = match tx.try_send(SubscriptionMessage::Ping) {
207                    Ok(_) | Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => true,
208                    Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
209                        // It's dead; don't bother renewing
210                        false
211                    }
212                };
213
214                renew_or_cancel_sub(&sub_url, renew, &sid).await?;
215
216                if renew {
217                    deadline = tokio::time::Instant::now()
218                        + tokio::time::Duration::from_secs(SUBSCRIPTION_TIMEOUT - 10);
219                } else {
220                    return Ok(());
221                }
222            }
223        }
224    }
225}
226
227async fn handle_subscription_request<T: DecodeXml>(
228    mut client: TcpStream,
229    tx: Sender<SubscriptionMessage<T>>,
230) -> crate::Result<()> {
231    let mut reqbuf = vec![];
232    let mut buf = [0u8; 4096];
233
234    while let Ok(len) = client.read(&mut buf).await {
235        reqbuf.extend_from_slice(&buf[0..len]);
236
237        let mut headers = [httparse::EMPTY_HEADER; 16];
238        let mut req = httparse::Request::new(&mut headers);
239
240        match req.parse(&reqbuf) {
241            Err(err) => {
242                log::error!("Error parsing request: {err:#}");
243                break;
244            }
245            Ok(httparse::Status::Partial) => continue,
246            Ok(httparse::Status::Complete(body_start)) => {
247                // It's only *maybe* complete; check the content-length
248                // vs. the data in the buffer
249                if let Some(cl) = req
250                    .headers
251                    .iter()
252                    .find(|h| h.name.eq_ignore_ascii_case("Content-Length"))
253                {
254                    match std::str::from_utf8(cl.value)
255                        .ok()
256                        .and_then(|s| s.parse::<usize>().ok())
257                    {
258                        Some(cl) => {
259                            let avail = reqbuf.len() - body_start;
260                            if avail < cl {
261                                // We need more data
262                                continue;
263                            }
264                        }
265                        None => {
266                            log::error!("Invalid header: {cl:?}");
267                            break;
268                        }
269                    }
270                }
271                let body = String::from_utf8_lossy(&reqbuf[body_start..]).to_string();
272
273                log::trace!("{req:#?}");
274                log::trace!("{body}");
275
276                match T::decode_xml(&body) {
277                    Ok(event) => {
278                        if let Err(err) = tx.send(SubscriptionMessage::Event(event)).await {
279                            log::error!("Channel is dead {err:#}");
280                            return Ok(());
281                        }
282                    }
283                    Err(err) => {
284                        log::error!("Failed to parse PropertySet: {err:#} from {body}");
285                    }
286                }
287
288                break;
289            }
290        }
291    }
292    Ok(())
293}
294
295async fn renew_or_cancel_sub(sub_url: &Url, subscribe: bool, sid: &str) -> crate::Result<Response> {
296    let mut request = reqwest::Client::new()
297        .request(
298            Method::from_bytes(if subscribe {
299                b"SUBSCRIBE"
300            } else {
301                b"UNSUBSCRIBE"
302            })
303            .expect("SUBSCRIBE to be a valid method"),
304            sub_url.clone(),
305        )
306        .header("SID", sid);
307    if subscribe {
308        request = request.header("TIMEOUT", format!("Second-{SUBSCRIPTION_TIMEOUT}"));
309    }
310    let response = request.send().await?;
311
312    let response = Error::check_response(response).await?;
313
314    Ok(response)
315}
316
/// Messages sent from the subscription tasks to an `EventStream`.
enum SubscriptionMessage<T> {
    /// Liveness probe sent at renewal time to find out whether the receiver
    /// still exists; `EventStream::recv` discards it.
    Ping,
    /// A decoded event, delivered to the caller of `EventStream::recv`.
    Event(T),
}
321
/// A helper trait for parsing a UPnP event stream into
/// a more ergonomic Rust type.
///
/// The `Send` supertrait lets decoded values be passed between the
/// spawned subscription tasks and the `EventStream`.
pub trait DecodeXml: Send {
    /// Decodes the XML body of one event notification into `Self`.
    fn decode_xml(xml: &str) -> crate::Result<Self>
    where
        Self: Sized;
}
329
/// A helper trait for encoding types into an XML representation
pub trait EncodeXml {
    /// Serializes `self` to an XML string, or returns the `instant_xml`
    /// error that prevented it.
    fn encode_xml(&self) -> std::result::Result<String, instant_xml::Error>;
}
334
/// Manages a live subscription to an event stream for a service.
/// While this object is live, the subscription is renewed shortly
/// before its timeout elapses (10 seconds ahead of the 60-second
/// `SUBSCRIPTION_TIMEOUT`).
/// The stream isn't automatically cancelled on Drop because there
/// is no async-Drop, but you can call the `unsubscribe` method
/// to explicitly cancel it.
/// The stream dispatching machinery has liveness checking that pings
/// the internal receiver at each renewal point and cancels the
/// subscription once it finds that the EventStream has been dropped,
/// i.e. within about a minute of the drop.
pub struct EventStream<T: DecodeXml> {
    // Receiving end of the channel fed by the subscription tasks.
    rx: Receiver<SubscriptionMessage<T>>,
    // Subscription id taken from the `sid` header of the SUBSCRIBE response.
    sid: String,
    // Event subscription URL, kept so the subscription can be cancelled.
    sub_url: Url,
}
349
350impl<T: DecodeXml> EventStream<T> {
351    /// Receives the next event from the stream
352    pub async fn recv(&mut self) -> Option<T> {
353        loop {
354            let msg = self.rx.recv().await?;
355            match msg {
356                SubscriptionMessage::Ping => {}
357                SubscriptionMessage::Event(v) => {
358                    return Some(v);
359                }
360            }
361        }
362    }
363
364    /// Explicitly cancel the subscription
365    pub async fn unsubscribe(self) {
366        renew_or_cancel_sub(&self.sub_url, false, &self.sid)
367            .await
368            .ok();
369    }
370}
371
/// XML namespace URI `urn:schemas-upnp-org:event-1-0`. It is not referenced
/// in this file; presumably the event decoders elsewhere in the crate use it
/// — verify against callers.
pub(crate) const UPNP_EVENT: &str = "urn:schemas-upnp-org:event-1-0";
373
#[cfg(test)]
mod test {
    use super::*;

    /// Deserializes the bundled sample device description and compares the
    /// `Debug` rendering of the whole tree (root device, its services and
    /// both embedded devices) against a snapshot.
    #[test]
    fn parse_device_spec() {
        let spec_text = include_str!("../data/device_spec.xml");
        // `include_str!` already yields a `&str`, so it is passed as-is.
        let spec: Root = instant_xml::from_str(spec_text).unwrap();
        k9::snapshot!(
            spec,
            r#"
Root {
    device: DeviceSpec {
        friendly_name: "192.168.1.157 - Sonos Port - RINCON_XXX",
        device_type: "urn:schemas-upnp-org:device:ZonePlayer:1",
        model_number: Some(
            "S23",
        ),
        model_description: Some(
            "Sonos Port",
        ),
        model_name: Some(
            "Sonos Port",
        ),
        ssl_port: Some(
            1443,
        ),
        service_list: Some(
            ServiceList {
                services: [
                    Service {
                        service_type: "urn:schemas-upnp-org:service:AlarmClock:1",
                        service_id: "urn:upnp-org:serviceId:AlarmClock",
                        control_url: "/AlarmClock/Control",
                        event_sub_url: "/AlarmClock/Event",
                        scpd_url: "/xml/AlarmClock1.xml",
                    },
                    Service {
                        service_type: "urn:schemas-upnp-org:service:MusicServices:1",
                        service_id: "urn:upnp-org:serviceId:MusicServices",
                        control_url: "/MusicServices/Control",
                        event_sub_url: "/MusicServices/Event",
                        scpd_url: "/xml/MusicServices1.xml",
                    },
                    Service {
                        service_type: "urn:schemas-upnp-org:service:AudioIn:1",
                        service_id: "urn:upnp-org:serviceId:AudioIn",
                        control_url: "/AudioIn/Control",
                        event_sub_url: "/AudioIn/Event",
                        scpd_url: "/xml/AudioIn1.xml",
                    },
                    Service {
                        service_type: "urn:schemas-upnp-org:service:DeviceProperties:1",
                        service_id: "urn:upnp-org:serviceId:DeviceProperties",
                        control_url: "/DeviceProperties/Control",
                        event_sub_url: "/DeviceProperties/Event",
                        scpd_url: "/xml/DeviceProperties1.xml",
                    },
                    Service {
                        service_type: "urn:schemas-upnp-org:service:SystemProperties:1",
                        service_id: "urn:upnp-org:serviceId:SystemProperties",
                        control_url: "/SystemProperties/Control",
                        event_sub_url: "/SystemProperties/Event",
                        scpd_url: "/xml/SystemProperties1.xml",
                    },
                    Service {
                        service_type: "urn:schemas-upnp-org:service:ZoneGroupTopology:1",
                        service_id: "urn:upnp-org:serviceId:ZoneGroupTopology",
                        control_url: "/ZoneGroupTopology/Control",
                        event_sub_url: "/ZoneGroupTopology/Event",
                        scpd_url: "/xml/ZoneGroupTopology1.xml",
                    },
                    Service {
                        service_type: "urn:schemas-upnp-org:service:GroupManagement:1",
                        service_id: "urn:upnp-org:serviceId:GroupManagement",
                        control_url: "/GroupManagement/Control",
                        event_sub_url: "/GroupManagement/Event",
                        scpd_url: "/xml/GroupManagement1.xml",
                    },
                    Service {
                        service_type: "urn:schemas-tencent-com:service:QPlay:1",
                        service_id: "urn:tencent-com:serviceId:QPlay",
                        control_url: "/QPlay/Control",
                        event_sub_url: "/QPlay/Event",
                        scpd_url: "/xml/QPlay1.xml",
                    },
                ],
            },
        ),
        device_list: Some(
            DeviceList {
                devices: [
                    DeviceSpec {
                        friendly_name: "192.168.1.157 - Sonos Port Media Server - RINCON_XXX",
                        device_type: "urn:schemas-upnp-org:device:MediaServer:1",
                        model_number: Some(
                            "S23",
                        ),
                        model_description: Some(
                            "Sonos Port Media Server",
                        ),
                        model_name: Some(
                            "Sonos Port",
                        ),
                        ssl_port: None,
                        service_list: Some(
                            ServiceList {
                                services: [
                                    Service {
                                        service_type: "urn:schemas-upnp-org:service:ContentDirectory:1",
                                        service_id: "urn:upnp-org:serviceId:ContentDirectory",
                                        control_url: "/MediaServer/ContentDirectory/Control",
                                        event_sub_url: "/MediaServer/ContentDirectory/Event",
                                        scpd_url: "/xml/ContentDirectory1.xml",
                                    },
                                    Service {
                                        service_type: "urn:schemas-upnp-org:service:ConnectionManager:1",
                                        service_id: "urn:upnp-org:serviceId:ConnectionManager",
                                        control_url: "/MediaServer/ConnectionManager/Control",
                                        event_sub_url: "/MediaServer/ConnectionManager/Event",
                                        scpd_url: "/xml/ConnectionManager1.xml",
                                    },
                                ],
                            },
                        ),
                        device_list: None,
                    },
                    DeviceSpec {
                        friendly_name: "Some Room - Sonos Port Media Renderer - RINCON_XXX",
                        device_type: "urn:schemas-upnp-org:device:MediaRenderer:1",
                        model_number: Some(
                            "S23",
                        ),
                        model_description: Some(
                            "Sonos Port Media Renderer",
                        ),
                        model_name: Some(
                            "Sonos Port",
                        ),
                        ssl_port: None,
                        service_list: Some(
                            ServiceList {
                                services: [
                                    Service {
                                        service_type: "urn:schemas-upnp-org:service:RenderingControl:1",
                                        service_id: "urn:upnp-org:serviceId:RenderingControl",
                                        control_url: "/MediaRenderer/RenderingControl/Control",
                                        event_sub_url: "/MediaRenderer/RenderingControl/Event",
                                        scpd_url: "/xml/RenderingControl1.xml",
                                    },
                                    Service {
                                        service_type: "urn:schemas-upnp-org:service:ConnectionManager:1",
                                        service_id: "urn:upnp-org:serviceId:ConnectionManager",
                                        control_url: "/MediaRenderer/ConnectionManager/Control",
                                        event_sub_url: "/MediaRenderer/ConnectionManager/Event",
                                        scpd_url: "/xml/ConnectionManager1.xml",
                                    },
                                    Service {
                                        service_type: "urn:schemas-upnp-org:service:AVTransport:1",
                                        service_id: "urn:upnp-org:serviceId:AVTransport",
                                        control_url: "/MediaRenderer/AVTransport/Control",
                                        event_sub_url: "/MediaRenderer/AVTransport/Event",
                                        scpd_url: "/xml/AVTransport1.xml",
                                    },
                                    Service {
                                        service_type: "urn:schemas-sonos-com:service:Queue:1",
                                        service_id: "urn:sonos-com:serviceId:Queue",
                                        control_url: "/MediaRenderer/Queue/Control",
                                        event_sub_url: "/MediaRenderer/Queue/Event",
                                        scpd_url: "/xml/Queue1.xml",
                                    },
                                    Service {
                                        service_type: "urn:schemas-upnp-org:service:GroupRenderingControl:1",
                                        service_id: "urn:upnp-org:serviceId:GroupRenderingControl",
                                        control_url: "/MediaRenderer/GroupRenderingControl/Control",
                                        event_sub_url: "/MediaRenderer/GroupRenderingControl/Event",
                                        scpd_url: "/xml/GroupRenderingControl1.xml",
                                    },
                                    Service {
                                        service_type: "urn:schemas-upnp-org:service:VirtualLineIn:1",
                                        service_id: "urn:upnp-org:serviceId:VirtualLineIn",
                                        control_url: "/MediaRenderer/VirtualLineIn/Control",
                                        event_sub_url: "/MediaRenderer/VirtualLineIn/Event",
                                        scpd_url: "/xml/VirtualLineIn1.xml",
                                    },
                                ],
                            },
                        ),
                        device_list: None,
                    },
                ],
            },
        ),
    },
}
"#
        );
    }
}