librespot_discovery/
lib.rs

1//! Advertises this device to Spotify clients in the local network.
2//!
3//! This device will show up in the list of "available devices".
4//! Once it is selected from the list, [`Credentials`] are received.
5//! Those can be used to establish a new Session with [`librespot_core`].
6//!
7//! This library uses mDNS and DNS-SD so that other devices can find it,
8//! and spawns an http server to answer requests of Spotify clients.
9
10mod avahi;
11mod server;
12
13use std::{
14    borrow::Cow,
15    error::Error as StdError,
16    pin::Pin,
17    task::{Context, Poll},
18};
19
20use futures_core::Stream;
21use thiserror::Error;
22use tokio::sync::{mpsc, oneshot};
23
24use self::server::DiscoveryServer;
25
26pub use crate::core::Error;
27use librespot_core as core;
28
29/// Credentials to be used in [`librespot`](`librespot_core`).
30pub use crate::core::authentication::Credentials;
31
32/// Determining the icon in the list of available devices.
33pub use crate::core::config::DeviceType;
34
35pub enum DiscoveryEvent {
36    Credentials(Credentials),
37    ServerError(DiscoveryError),
38    ZeroconfError(DiscoveryError),
39}
40
41enum ZeroconfCmd {
42    Shutdown,
43}
44
45pub struct DnsSdHandle {
46    task_handle: tokio::task::JoinHandle<()>,
47    shutdown_tx: oneshot::Sender<ZeroconfCmd>,
48}
49
50impl DnsSdHandle {
51    async fn shutdown(self) {
52        log::debug!("Shutting down zeroconf responder");
53        let Self {
54            task_handle,
55            shutdown_tx,
56        } = self;
57        if shutdown_tx.send(ZeroconfCmd::Shutdown).is_err() {
58            log::warn!("Zeroconf responder unexpectedly disappeared");
59        } else {
60            let _ = task_handle.await;
61            log::debug!("Zeroconf responder stopped");
62        }
63    }
64}
65
66pub type DnsSdServiceBuilder = fn(
67    Cow<'static, str>,
68    Vec<std::net::IpAddr>,
69    u16,
70    mpsc::UnboundedSender<DiscoveryEvent>,
71) -> Result<DnsSdHandle, Error>;
72
73// Default goes first: This matches the behaviour when feature flags were exlusive, i.e. when there
74// was only `feature = "with-dns-sd"` or `not(feature = "with-dns-sd")`
75pub const BACKENDS: &[(
76    &str,
77    // If None, the backend is known but wasn't compiled.
78    Option<DnsSdServiceBuilder>,
79)] = &[
80    #[cfg(feature = "with-avahi")]
81    ("avahi", Some(launch_avahi)),
82    #[cfg(not(feature = "with-avahi"))]
83    ("avahi", None),
84    #[cfg(feature = "with-dns-sd")]
85    ("dns-sd", Some(launch_dns_sd)),
86    #[cfg(not(feature = "with-dns-sd"))]
87    ("dns-sd", None),
88    #[cfg(feature = "with-libmdns")]
89    ("libmdns", Some(launch_libmdns)),
90    #[cfg(not(feature = "with-libmdns"))]
91    ("libmdns", None),
92];
93
94pub fn find(name: Option<&str>) -> Result<DnsSdServiceBuilder, Error> {
95    if let Some(ref name) = name {
96        match BACKENDS.iter().find(|(id, _)| name == id) {
97            Some((_id, Some(launch_svc))) => Ok(*launch_svc),
98            Some((_id, None)) => Err(Error::unavailable(format!(
99                "librespot built without '{name}' support"
100            ))),
101            None => Err(Error::not_found(format!(
102                "unknown zeroconf backend '{name}'"
103            ))),
104        }
105    } else {
106        BACKENDS
107            .iter()
108            .find_map(|(_, launch_svc)| *launch_svc)
109            .ok_or(Error::unavailable(
110                "librespot built without zeroconf backends",
111            ))
112    }
113}
114
115/// Makes this device visible to Spotify clients in the local network.
116///
117/// `Discovery` implements the [`Stream`] trait. Every time this device
118/// is selected in the list of available devices, it yields [`Credentials`].
119pub struct Discovery {
120    server: DiscoveryServer,
121
122    /// An opaque handle to the DNS-SD service. Dropping this will unregister the service.
123    #[allow(unused)]
124    svc: DnsSdHandle,
125
126    event_rx: mpsc::UnboundedReceiver<DiscoveryEvent>,
127}
128
129/// A builder for [`Discovery`].
130pub struct Builder {
131    server_config: server::Config,
132    port: u16,
133    zeroconf_ip: Vec<std::net::IpAddr>,
134    zeroconf_backend: Option<DnsSdServiceBuilder>,
135}
136
137/// Errors that can occur while setting up a [`Discovery`] instance.
138#[derive(Debug, Error)]
139pub enum DiscoveryError {
140    #[error("Creating SHA1 block cipher failed")]
141    AesError(#[from] aes::cipher::InvalidLength),
142
143    #[error("Setting up dns-sd failed: {0}")]
144    DnsSdError(#[source] Box<dyn StdError + Send + Sync>),
145
146    #[error("Creating SHA1 HMAC failed for base key {0:?}")]
147    HmacError(Vec<u8>),
148
149    #[error("Setting up the HTTP server failed: {0}")]
150    HttpServerError(#[from] hyper::Error),
151
152    #[error("Missing params for key {0}")]
153    ParamsError(&'static str),
154}
155
156#[cfg(feature = "with-avahi")]
157impl From<zbus::Error> for DiscoveryError {
158    fn from(error: zbus::Error) -> Self {
159        Self::DnsSdError(Box::new(error))
160    }
161}
162
163impl From<DiscoveryError> for Error {
164    fn from(err: DiscoveryError) -> Self {
165        match err {
166            DiscoveryError::AesError(_) => Error::unavailable(err),
167            DiscoveryError::DnsSdError(_) => Error::unavailable(err),
168            DiscoveryError::HmacError(_) => Error::invalid_argument(err),
169            DiscoveryError::HttpServerError(_) => Error::unavailable(err),
170            DiscoveryError::ParamsError(_) => Error::invalid_argument(err),
171        }
172    }
173}
174
175#[allow(unused)]
176const DNS_SD_SERVICE_NAME: &str = "_spotify-connect._tcp";
177#[allow(unused)]
178const TXT_RECORD: [&str; 2] = ["VERSION=1.0", "CPath=/"];
179
180#[cfg(feature = "with-avahi")]
181async fn avahi_task(
182    name: Cow<'static, str>,
183    port: u16,
184    entry_group: &mut Option<avahi::EntryGroupProxy<'_>>,
185) -> Result<(), DiscoveryError> {
186    use self::avahi::{EntryGroupState, ServerProxy};
187    use futures_util::StreamExt;
188
189    let conn = zbus::Connection::system().await?;
190
191    // Wait for the daemon to show up.
192    // On error: Failed to listen for NameOwnerChanged signal => Fatal DBus issue
193    let bus = zbus::fdo::DBusProxy::new(&conn).await?;
194    let mut stream = bus
195        .receive_name_owner_changed_with_args(&[(0, "org.freedesktop.Avahi")])
196        .await?;
197
198    loop {
199        // Wait for Avahi daemon to be started
200        'wait_avahi: {
201            while let Poll::Ready(Some(_)) = futures_util::poll!(stream.next()) {
202                // Drain queued name owner changes, since we're going to connect in a second
203            }
204
205            // Ping after we connected to the signal since it might have shown up in the meantime
206            if let Ok(avahi_peer) =
207                zbus::fdo::PeerProxy::new(&conn, "org.freedesktop.Avahi", "/").await
208            {
209                if avahi_peer.ping().await.is_ok() {
210                    log::debug!("Pinged Avahi: Available");
211                    break 'wait_avahi;
212                }
213            }
214            log::warn!(
215                "Failed to connect to Avahi, zeroconf discovery will not work until avahi-daemon is started. Check that it is installed and running"
216            );
217
218            // If it didn't, wait for the signal
219            match stream.next().await {
220                Some(_signal) => {
221                    log::debug!("Avahi appeared");
222                    break 'wait_avahi;
223                }
224                // The stream ended, but this should never happen
225                None => {
226                    return Err(zbus::Error::Failure("DBus disappeared".to_owned()).into());
227                }
228            }
229        }
230
231        // Connect to Avahi and publish the service
232        let avahi_server = ServerProxy::new(&conn).await?;
233        log::trace!("Connected to Avahi");
234
235        *entry_group = Some(avahi_server.entry_group_new().await?);
236
237        let mut entry_group_state_stream = entry_group
238            .as_mut()
239            .unwrap()
240            .receive_state_changed()
241            .await?;
242
243        entry_group
244            .as_mut()
245            .unwrap()
246            .add_service(
247                -1, // AVAHI_IF_UNSPEC
248                -1, // IPv4 and IPv6
249                0,  // flags
250                &name,
251                DNS_SD_SERVICE_NAME, // type
252                "",                  // domain: let the server choose
253                "",                  // host: let the server choose
254                port,
255                &TXT_RECORD.map(|s| s.as_bytes()),
256            )
257            .await?;
258
259        entry_group.as_mut().unwrap().commit().await?;
260        log::debug!("Commited zeroconf service with name {}", &name);
261
262        'monitor_service: loop {
263            tokio::select! {
264                Some(state_changed) = entry_group_state_stream.next() => {
265                    let (state, error) = match state_changed.args() {
266                        Ok(sc) => (sc.state, sc.error),
267                        Err(e) => {
268                            log::warn!("Error on receiving EntryGroup state from Avahi: {}", e);
269                            continue 'monitor_service;
270                        }
271                    };
272                    match state {
273                        EntryGroupState::Uncommited | EntryGroupState::Registering => {
274                            // Not yet registered, ignore.
275                        }
276                        EntryGroupState::Established => {
277                            log::info!("Published zeroconf service");
278                        }
279                        EntryGroupState::Collision => {
280                            // This most likely means that librespot has unintentionally been started twice.
281                            // Thus, don't retry with a new name, but abort.
282                            //
283                            // Note that the error would usually already be returned by
284                            // entry_group.add_service above, so this state_changed handler
285                            // won't be hit.
286                            //
287                            // EntryGroup has been withdrawn at this point already!
288                            log::error!("zeroconf collision for name '{}'", &name);
289                            return Err(zbus::Error::Failure(format!("zeroconf collision for name: {name}")).into());
290                        }
291                        EntryGroupState::Failure => {
292                            // TODO: Back off/treat as fatal?
293                            // EntryGroup has been withdrawn at this point already!
294                            // There seems to be no code in Avahi that actually sets this state.
295                            log::error!("zeroconf failure: {}", error);
296                            return Err(zbus::Error::Failure(format!("zeroconf failure: {error}")).into());
297                        }
298                    }
299                }
300                _name_owner_change = stream.next() => {
301                    break 'monitor_service;
302                }
303            }
304        }
305
306        // Avahi disappeared (or the service was immediately taken over by a
307        // new daemon) => drop all handles, and reconnect
308        log::info!("Avahi disappeared, trying to reconnect");
309    }
310}
311
312#[cfg(feature = "with-avahi")]
313fn launch_avahi(
314    name: Cow<'static, str>,
315    _zeroconf_ip: Vec<std::net::IpAddr>,
316    port: u16,
317    status_tx: mpsc::UnboundedSender<DiscoveryEvent>,
318) -> Result<DnsSdHandle, Error> {
319    let (shutdown_tx, shutdown_rx) = oneshot::channel();
320
321    let task_handle = tokio::spawn(async move {
322        let mut entry_group = None;
323        tokio::select! {
324            res = avahi_task(name, port, &mut entry_group) => {
325                if let Err(e) = res {
326                    log::error!("Avahi error: {}", e);
327                    let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e));
328                }
329            },
330            _ = shutdown_rx => {
331                if let Some(entry_group) = entry_group.as_mut() {
332                    if let Err(e) = entry_group.free().await {
333                        log::warn!("Failed to un-publish zeroconf service: {}", e);
334                    } else {
335                        log::debug!("Un-published zeroconf service");
336                    }
337                }
338            },
339        }
340    });
341
342    Ok(DnsSdHandle {
343        task_handle,
344        shutdown_tx,
345    })
346}
347
348#[cfg(feature = "with-dns-sd")]
349fn launch_dns_sd(
350    name: Cow<'static, str>,
351    _zeroconf_ip: Vec<std::net::IpAddr>,
352    port: u16,
353    status_tx: mpsc::UnboundedSender<DiscoveryEvent>,
354) -> Result<DnsSdHandle, Error> {
355    let (shutdown_tx, shutdown_rx) = oneshot::channel();
356
357    let task_handle = tokio::task::spawn_blocking(move || {
358        let inner = move || -> Result<(), DiscoveryError> {
359            let svc = dns_sd::DNSService::register(
360                Some(name.as_ref()),
361                DNS_SD_SERVICE_NAME,
362                None,
363                None,
364                port,
365                &TXT_RECORD,
366            )
367            .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))?;
368
369            let _ = shutdown_rx.blocking_recv();
370
371            std::mem::drop(svc);
372
373            Ok(())
374        };
375
376        if let Err(e) = inner() {
377            log::error!("dns_sd error: {}", e);
378            let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e));
379        }
380    });
381
382    Ok(DnsSdHandle {
383        shutdown_tx,
384        task_handle,
385    })
386}
387
388#[cfg(feature = "with-libmdns")]
389fn launch_libmdns(
390    name: Cow<'static, str>,
391    zeroconf_ip: Vec<std::net::IpAddr>,
392    port: u16,
393    status_tx: mpsc::UnboundedSender<DiscoveryEvent>,
394) -> Result<DnsSdHandle, Error> {
395    let (shutdown_tx, shutdown_rx) = oneshot::channel();
396
397    let task_handle = tokio::task::spawn_blocking(move || {
398        let inner = move || -> Result<(), DiscoveryError> {
399            let responder = if !zeroconf_ip.is_empty() {
400                libmdns::Responder::spawn_with_ip_list(
401                    &tokio::runtime::Handle::current(),
402                    zeroconf_ip,
403                )
404            } else {
405                libmdns::Responder::spawn(&tokio::runtime::Handle::current())
406            }
407            .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))?;
408
409            let svc = responder.register(DNS_SD_SERVICE_NAME, &name, port, &TXT_RECORD);
410
411            let _ = shutdown_rx.blocking_recv();
412
413            std::mem::drop(svc);
414
415            Ok(())
416        };
417
418        if let Err(e) = inner() {
419            log::error!("libmdns error: {e}");
420            let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e));
421        }
422    });
423
424    Ok(DnsSdHandle {
425        shutdown_tx,
426        task_handle,
427    })
428}
429
430impl Builder {
431    /// Starts a new builder using the provided device and client IDs.
432    pub fn new<T: Into<String>>(device_id: T, client_id: T) -> Self {
433        Self {
434            server_config: server::Config {
435                name: "Librespot".into(),
436                device_type: DeviceType::default(),
437                is_group: false,
438                device_id: device_id.into(),
439                client_id: client_id.into(),
440                aliases: Vec::new(),
441            },
442            port: 0,
443            zeroconf_ip: vec![],
444            zeroconf_backend: None,
445        }
446    }
447
448    /// Sets the name to be displayed. Default is `"Librespot"`.
449    pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
450        self.server_config.name = name.into();
451        self
452    }
453
454    /// Sets the device type which is visible as icon in other Spotify clients. Default is `Speaker`.
455    pub fn device_type(mut self, device_type: DeviceType) -> Self {
456        self.server_config.device_type = device_type;
457        self
458    }
459
460    /// Sets whether the device is a group. This affects the icon in Spotify clients. Default is `false`.
461    pub fn is_group(mut self, is_group: bool) -> Self {
462        self.server_config.is_group = is_group;
463        self
464    }
465
466    /// Adds an alias for this device. Multiple aliases can be added by calling this method multiple times.
467    pub fn add_alias(
468        mut self,
469        alias: impl Into<Cow<'static, str>>,
470        id: u32,
471        is_group: bool,
472    ) -> Self {
473        self.server_config.aliases.push(server::Alias {
474            name: alias.into(),
475            id,
476            is_group,
477        });
478        self
479    }
480
481    /// Set the ip addresses on which it should listen to incoming connections. The default is all interfaces.
482    pub fn zeroconf_ip(mut self, zeroconf_ip: Vec<std::net::IpAddr>) -> Self {
483        self.zeroconf_ip = zeroconf_ip;
484        self
485    }
486
487    /// Set the zeroconf (MDNS and DNS-SD) implementation to use.
488    pub fn zeroconf_backend(mut self, zeroconf_backend: DnsSdServiceBuilder) -> Self {
489        self.zeroconf_backend = Some(zeroconf_backend);
490        self
491    }
492
493    /// Sets the port on which it should listen to incoming connections.
494    /// The default value `0` means any port.
495    pub fn port(mut self, port: u16) -> Self {
496        self.port = port;
497        self
498    }
499
500    /// Sets up the [`Discovery`] instance.
501    ///
502    /// # Errors
503    /// If setting up the mdns service or creating the server fails, this function returns an error.
504    pub fn launch(self) -> Result<Discovery, Error> {
505        let name = self.server_config.name.clone();
506        let zeroconf_ip = self.zeroconf_ip;
507
508        let (event_tx, event_rx) = mpsc::unbounded_channel();
509
510        let mut port = self.port;
511        let server = DiscoveryServer::new(self.server_config, &mut port, event_tx.clone())?;
512
513        let launch_svc = self.zeroconf_backend.unwrap_or(find(None)?);
514        let svc = launch_svc(name, zeroconf_ip, port, event_tx)?;
515        Ok(Discovery {
516            server,
517            svc,
518            event_rx,
519        })
520    }
521}
522
523impl Discovery {
524    /// Starts a [`Builder`] with the provided device id.
525    pub fn builder<T: Into<String>>(device_id: T, client_id: T) -> Builder {
526        Builder::new(device_id, client_id)
527    }
528
529    /// Create a new instance with the specified device id and default paramaters.
530    pub fn new<T: Into<String>>(device_id: T, client_id: T) -> Result<Self, Error> {
531        Self::builder(device_id, client_id).launch()
532    }
533
534    pub async fn shutdown(self) {
535        tokio::join!(self.server.shutdown(), self.svc.shutdown(),);
536    }
537}
538
539impl Stream for Discovery {
540    type Item = Credentials;
541
542    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
543        match Pin::new(&mut self.event_rx).poll_recv(cx) {
544            // Yields credentials
545            Poll::Ready(Some(DiscoveryEvent::Credentials(creds))) => Poll::Ready(Some(creds)),
546            // Also terminate the stream on fatal server or MDNS/DNS-SD errors.
547            Poll::Ready(Some(
548                DiscoveryEvent::ServerError(_) | DiscoveryEvent::ZeroconfError(_),
549            )) => Poll::Ready(None),
550            Poll::Ready(None) => Poll::Ready(None),
551            Poll::Pending => Poll::Pending,
552        }
553    }
554}