1mod avahi;
11mod server;
12
13use std::{
14 borrow::Cow,
15 error::Error as StdError,
16 pin::Pin,
17 task::{Context, Poll},
18};
19
20use futures_core::Stream;
21use thiserror::Error;
22use tokio::sync::{mpsc, oneshot};
23
24use self::server::DiscoveryServer;
25
26pub use crate::core::Error;
27use librespot_core as core;
28
29pub use crate::core::authentication::Credentials;
31
32pub use crate::core::config::DeviceType;
34
/// Events carried on the discovery event channel.
pub enum DiscoveryEvent {
    /// Credentials received on the channel; these are what the `Discovery`
    /// stream yields as items.
    Credentials(Credentials),
    /// An error wrapped as a server-side event.
    // NOTE(review): presumably emitted by `DiscoveryServer`; the sender is not
    // visible in this file — confirm.
    ServerError(DiscoveryError),
    /// An error reported by a zeroconf backend task.
    ZeroconfError(DiscoveryError),
}
40
/// Commands sent to a running zeroconf backend task over its one-shot channel.
enum ZeroconfCmd {
    /// Ask the backend task to stop.
    Shutdown,
}
44
/// Handle to a running zeroconf backend task.
pub struct DnsSdHandle {
    /// Join handle of the spawned backend task; awaited during shutdown.
    task_handle: tokio::task::JoinHandle<()>,
    /// One-shot sender used to deliver the shutdown command to the task.
    shutdown_tx: oneshot::Sender<ZeroconfCmd>,
}
49
50impl DnsSdHandle {
51 async fn shutdown(self) {
52 log::debug!("Shutting down zeroconf responder");
53 let Self {
54 task_handle,
55 shutdown_tx,
56 } = self;
57 if shutdown_tx.send(ZeroconfCmd::Shutdown).is_err() {
58 log::warn!("Zeroconf responder unexpectedly disappeared");
59 } else {
60 let _ = task_handle.await;
61 log::debug!("Zeroconf responder stopped");
62 }
63 }
64}
65
/// Signature of a function that starts a zeroconf backend.
///
/// The built-in launchers in this file take, in order: the advertised service
/// name, the IP addresses to use (ignored by some backends), the port to
/// advertise, and a sender on which the backend reports errors as
/// `DiscoveryEvent`s. On success they return a `DnsSdHandle` for the task.
pub type DnsSdServiceBuilder = fn(
    Cow<'static, str>,
    Vec<std::net::IpAddr>,
    u16,
    mpsc::UnboundedSender<DiscoveryEvent>,
) -> Result<DnsSdHandle, Error>;
72
/// Table of the known zeroconf backends.
///
/// Every backend name is always listed; its launcher is `Some` only when the
/// matching cargo feature is enabled. `find(None)` returns the first entry
/// with a launcher, so the order here is the default preference order.
pub const BACKENDS: &[(
    // Backend name as accepted by `find`.
    &str,
    // Launcher function, or `None` when the backend is not compiled in.
    Option<DnsSdServiceBuilder>,
)] = &[
    #[cfg(feature = "with-avahi")]
    ("avahi", Some(launch_avahi)),
    #[cfg(not(feature = "with-avahi"))]
    ("avahi", None),
    #[cfg(feature = "with-dns-sd")]
    ("dns-sd", Some(launch_dns_sd)),
    #[cfg(not(feature = "with-dns-sd"))]
    ("dns-sd", None),
    #[cfg(feature = "with-libmdns")]
    ("libmdns", Some(launch_libmdns)),
    #[cfg(not(feature = "with-libmdns"))]
    ("libmdns", None),
];
93
94pub fn find(name: Option<&str>) -> Result<DnsSdServiceBuilder, Error> {
95 if let Some(ref name) = name {
96 match BACKENDS.iter().find(|(id, _)| name == id) {
97 Some((_id, Some(launch_svc))) => Ok(*launch_svc),
98 Some((_id, None)) => Err(Error::unavailable(format!(
99 "librespot built without '{name}' support"
100 ))),
101 None => Err(Error::not_found(format!(
102 "unknown zeroconf backend '{name}'"
103 ))),
104 }
105 } else {
106 BACKENDS
107 .iter()
108 .find_map(|(_, launch_svc)| *launch_svc)
109 .ok_or(Error::unavailable(
110 "librespot built without zeroconf backends",
111 ))
112 }
113}
114
/// A running discovery instance: the discovery server together with its
/// zeroconf backend task.
///
/// Implements `Stream`, yielding `Credentials`.
pub struct Discovery {
    /// The discovery server; shut down by `Discovery::shutdown`.
    server: DiscoveryServer,

    /// Handle to the zeroconf backend task; shut down by `Discovery::shutdown`.
    #[allow(unused)]
    svc: DnsSdHandle,

    /// Receiving end of the event channel whose senders are handed to the
    /// server and to the zeroconf backend.
    event_rx: mpsc::UnboundedReceiver<DiscoveryEvent>,
}
128
/// Collects the settings needed to start a `Discovery` instance.
pub struct Builder {
    /// Configuration passed to the discovery server on launch.
    server_config: server::Config,
    /// Requested port; defaults to `0` in `Builder::new`.
    port: u16,
    /// IP addresses handed to the zeroconf backend; empty by default.
    zeroconf_ip: Vec<std::net::IpAddr>,
    /// Explicitly chosen backend launcher; `None` means "pick one via `find`".
    zeroconf_backend: Option<DnsSdServiceBuilder>,
}
136
/// Errors produced by the discovery subsystem.
#[derive(Debug, Error)]
pub enum DiscoveryError {
    /// Wraps an `aes::cipher::InvalidLength` error.
    // NOTE(review): the message mentions SHA1 although the source error comes
    // from the AES crate — confirm the wording is intended.
    #[error("Creating SHA1 block cipher failed")]
    AesError(#[from] aes::cipher::InvalidLength),

    /// A zeroconf backend failed; carries the backend's own error as source.
    #[error("Setting up dns-sd failed: {0}")]
    DnsSdError(#[source] Box<dyn StdError + Send + Sync>),

    /// HMAC construction failed; carries the base key bytes.
    #[error("Creating SHA1 HMAC failed for base key {0:?}")]
    HmacError(Vec<u8>),

    /// Wraps a `hyper::Error`.
    #[error("Setting up the HTTP server failed: {0}")]
    HttpServerError(#[from] hyper::Error),

    /// A required parameter was missing; carries the parameter key.
    #[error("Missing params for key {0}")]
    ParamsError(&'static str),
}
155
/// Lets `?` turn D-Bus errors from the Avahi backend into `DiscoveryError`.
#[cfg(feature = "with-avahi")]
impl From<zbus::Error> for DiscoveryError {
    fn from(error: zbus::Error) -> Self {
        // Box the D-Bus error so it can be carried as the `DnsSdError` source.
        let source: Box<dyn StdError + Send + Sync> = Box::new(error);
        Self::DnsSdError(source)
    }
}
162
163impl From<DiscoveryError> for Error {
164 fn from(err: DiscoveryError) -> Self {
165 match err {
166 DiscoveryError::AesError(_) => Error::unavailable(err),
167 DiscoveryError::DnsSdError(_) => Error::unavailable(err),
168 DiscoveryError::HmacError(_) => Error::invalid_argument(err),
169 DiscoveryError::HttpServerError(_) => Error::unavailable(err),
170 DiscoveryError::ParamsError(_) => Error::invalid_argument(err),
171 }
172 }
173}
174
/// Service type registered by every zeroconf backend in this file.
// Only referenced from feature-gated backend launchers, so it is unused when
// the crate is built without any backend feature.
#[allow(unused)]
const DNS_SD_SERVICE_NAME: &str = "_spotify-connect._tcp";
/// TXT record entries published alongside the service.
// Same reason for the `allow` as above.
#[allow(unused)]
const TXT_RECORD: [&str; 2] = ["VERSION=1.0", "CPath=/"];
179
/// Publishes the service through Avahi over D-Bus and keeps it published,
/// re-registering whenever the owner of the `org.freedesktop.Avahi` bus name
/// changes.
///
/// * `name` – service name to publish.
/// * `port` – port to advertise.
/// * `entry_group` – out-slot that receives the current Avahi entry group, so
///   the caller can still reach it (e.g. to free it) after this future is
///   dropped.
///
/// Only returns on error: a D-Bus failure, the name-owner stream ending while
/// waiting for Avahi, or Avahi reporting `Collision` / `Failure` for the entry
/// group. Otherwise it loops forever.
#[cfg(feature = "with-avahi")]
async fn avahi_task(
    name: Cow<'static, str>,
    port: u16,
    entry_group: &mut Option<avahi::EntryGroupProxy<'_>>,
) -> Result<(), DiscoveryError> {
    use self::avahi::{EntryGroupState, ServerProxy};
    use futures_util::StreamExt;

    let conn = zbus::Connection::system().await?;

    // Subscribe to owner changes of the Avahi bus name; used both to wait for
    // Avahi to appear and to notice when it goes away.
    let bus = zbus::fdo::DBusProxy::new(&conn).await?;
    let mut stream = bus
        .receive_name_owner_changed_with_args(&[(0, "org.freedesktop.Avahi")])
        .await?;

    loop {
        // Phase 1: wait until Avahi is reachable.
        'wait_avahi: {
            // Drain owner-change signals that are already queued, without
            // waiting for new ones.
            while let Poll::Ready(Some(_)) = futures_util::poll!(stream.next()) {
            }

            // Ping the Avahi peer; success means it is up right now.
            if let Ok(avahi_peer) =
                zbus::fdo::PeerProxy::new(&conn, "org.freedesktop.Avahi", "/").await
            {
                if avahi_peer.ping().await.is_ok() {
                    log::debug!("Pinged Avahi: Available");
                    break 'wait_avahi;
                }
            }
            log::warn!(
                "Failed to connect to Avahi, zeroconf discovery will not work until avahi-daemon is started. Check that it is installed and running"
            );

            // Not reachable: block until the next owner change on the name.
            match stream.next().await {
                Some(_signal) => {
                    log::debug!("Avahi appeared");
                    break 'wait_avahi;
                }
                None => {
                    // The signal stream ended, so no further change can be seen.
                    return Err(zbus::Error::Failure("DBus disappeared".to_owned()).into());
                }
            }
        }

        // Phase 2: register the service in a fresh entry group.
        let avahi_server = ServerProxy::new(&conn).await?;
        log::trace!("Connected to Avahi");

        // Stored in the caller's slot before use, so the caller keeps access
        // to the group even if this future is cancelled.
        *entry_group = Some(avahi_server.entry_group_new().await?);

        // The `unwrap`s below cannot fail: the slot was set to `Some` just above.
        let mut entry_group_state_stream = entry_group
            .as_mut()
            .unwrap()
            .receive_state_changed()
            .await?;

        entry_group
            .as_mut()
            .unwrap()
            .add_service(
                -1, // presumably AVAHI_IF_UNSPEC (any interface) — TODO confirm
                -1, // presumably AVAHI_PROTO_UNSPEC (IPv4 and IPv6) — TODO confirm
                0,  // presumably "no flags" — TODO confirm
                &name,
                DNS_SD_SERVICE_NAME,
                "", // presumably domain, left to the server — TODO confirm
                "", // presumably host, left to the server — TODO confirm
                port,
                &TXT_RECORD.map(|s| s.as_bytes()),
            )
            .await?;

        entry_group.as_mut().unwrap().commit().await?;
        log::debug!("Commited zeroconf service with name {}", &name);

        // Phase 3: watch the entry group until Avahi's bus name changes owner.
        'monitor_service: loop {
            tokio::select! {
                Some(state_changed) = entry_group_state_stream.next() => {
                    let (state, error) = match state_changed.args() {
                        Ok(sc) => (sc.state, sc.error),
                        Err(e) => {
                            // An undecodable signal is logged and skipped.
                            log::warn!("Error on receiving EntryGroup state from Avahi: {}", e);
                            continue 'monitor_service;
                        }
                    };
                    match state {
                        EntryGroupState::Uncommited | EntryGroupState::Registering => {
                            // Intermediate states: nothing to do.
                        }
                        EntryGroupState::Established => {
                            log::info!("Published zeroconf service");
                        }
                        EntryGroupState::Collision => {
                            // Treated as fatal: the task ends with an error.
                            log::error!("zeroconf collision for name '{}'", &name);
                            return Err(zbus::Error::Failure(format!("zeroconf collision for name: {name}")).into());
                        }
                        EntryGroupState::Failure => {
                            log::error!("zeroconf failure: {}", error);
                            return Err(zbus::Error::Failure(format!("zeroconf failure: {error}")).into());
                        }
                    }
                }
                // Any item from the owner-change stream (including its end)
                // stops monitoring and restarts the outer loop.
                _name_owner_change = stream.next() => {
                    break 'monitor_service;
                }
            }
        }

        log::info!("Avahi disappeared, trying to reconnect");
    }
}
311
/// Starts the Avahi backend on a spawned tokio task.
///
/// The task runs `avahi_task` until it fails or a shutdown is signalled.
/// A failure is logged and forwarded on `status_tx` as
/// `DiscoveryEvent::ZeroconfError`. On shutdown the current entry group, if
/// any, is freed to un-publish the service. `_zeroconf_ip` is ignored.
#[cfg(feature = "with-avahi")]
fn launch_avahi(
    name: Cow<'static, str>,
    _zeroconf_ip: Vec<std::net::IpAddr>,
    port: u16,
    status_tx: mpsc::UnboundedSender<DiscoveryEvent>,
) -> Result<DnsSdHandle, Error> {
    let (shutdown_tx, shutdown_rx) = oneshot::channel();

    let responder = async move {
        // Owned here rather than inside `avahi_task` so that it outlives the
        // cancelled task future and can be freed in the shutdown branch.
        let mut entry_group = None;
        tokio::select! {
            outcome = avahi_task(name, port, &mut entry_group) => {
                if let Err(e) = outcome {
                    log::error!("Avahi error: {}", e);
                    let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e));
                }
            },
            _ = shutdown_rx => {
                if let Some(group) = entry_group.as_mut() {
                    match group.free().await {
                        Ok(_) => log::debug!("Un-published zeroconf service"),
                        Err(e) => log::warn!("Failed to un-publish zeroconf service: {}", e),
                    }
                }
            },
        }
    };

    Ok(DnsSdHandle {
        task_handle: tokio::spawn(responder),
        shutdown_tx,
    })
}
347
/// Starts the `dns_sd` backend on a blocking tokio task.
///
/// The task registers the service, then blocks until a shutdown is signalled
/// (or the sender is dropped) and drops the registration. A registration
/// failure is logged and forwarded on `status_tx` as
/// `DiscoveryEvent::ZeroconfError`. `_zeroconf_ip` is ignored.
#[cfg(feature = "with-dns-sd")]
fn launch_dns_sd(
    name: Cow<'static, str>,
    _zeroconf_ip: Vec<std::net::IpAddr>,
    port: u16,
    status_tx: mpsc::UnboundedSender<DiscoveryEvent>,
) -> Result<DnsSdHandle, Error> {
    let (shutdown_tx, shutdown_rx) = oneshot::channel();

    let task_handle = tokio::task::spawn_blocking(move || {
        let outcome = dns_sd::DNSService::register(
            Some(name.as_ref()),
            DNS_SD_SERVICE_NAME,
            None,
            None,
            port,
            &TXT_RECORD,
        )
        .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))
        .map(move |svc| {
            // Keep the registration alive until shutdown is requested.
            let _ = shutdown_rx.blocking_recv();
            drop(svc);
        });

        if let Err(e) = outcome {
            log::error!("dns_sd error: {}", e);
            let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e));
        }
    });

    Ok(DnsSdHandle {
        shutdown_tx,
        task_handle,
    })
}
387
/// Starts the `libmdns` backend on a blocking tokio task.
///
/// The task spawns a responder (restricted to `zeroconf_ip` when that list is
/// non-empty), registers the service, then blocks until a shutdown is
/// signalled (or the sender is dropped) and drops the registration. A failure
/// to spawn the responder is logged and forwarded on `status_tx` as
/// `DiscoveryEvent::ZeroconfError`.
#[cfg(feature = "with-libmdns")]
fn launch_libmdns(
    name: Cow<'static, str>,
    zeroconf_ip: Vec<std::net::IpAddr>,
    port: u16,
    status_tx: mpsc::UnboundedSender<DiscoveryEvent>,
) -> Result<DnsSdHandle, Error> {
    let (shutdown_tx, shutdown_rx) = oneshot::channel();

    let task_handle = tokio::task::spawn_blocking(move || {
        let spawned = {
            let runtime = tokio::runtime::Handle::current();
            if zeroconf_ip.is_empty() {
                libmdns::Responder::spawn(&runtime)
            } else {
                libmdns::Responder::spawn_with_ip_list(&runtime, zeroconf_ip)
            }
        };

        let outcome = spawned
            .map_err(|e| DiscoveryError::DnsSdError(Box::new(e)))
            .map(move |responder| {
                let svc = responder.register(DNS_SD_SERVICE_NAME, &name, port, &TXT_RECORD);
                // Keep the registration alive until shutdown is requested.
                let _ = shutdown_rx.blocking_recv();
                drop(svc);
            });

        if let Err(e) = outcome {
            log::error!("libmdns error: {e}");
            let _ = status_tx.send(DiscoveryEvent::ZeroconfError(e));
        }
    });

    Ok(DnsSdHandle {
        shutdown_tx,
        task_handle,
    })
}
429
430impl Builder {
431 pub fn new<T: Into<String>>(device_id: T, client_id: T) -> Self {
433 Self {
434 server_config: server::Config {
435 name: "Librespot".into(),
436 device_type: DeviceType::default(),
437 is_group: false,
438 device_id: device_id.into(),
439 client_id: client_id.into(),
440 aliases: Vec::new(),
441 },
442 port: 0,
443 zeroconf_ip: vec![],
444 zeroconf_backend: None,
445 }
446 }
447
448 pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
450 self.server_config.name = name.into();
451 self
452 }
453
454 pub fn device_type(mut self, device_type: DeviceType) -> Self {
456 self.server_config.device_type = device_type;
457 self
458 }
459
460 pub fn is_group(mut self, is_group: bool) -> Self {
462 self.server_config.is_group = is_group;
463 self
464 }
465
466 pub fn add_alias(
468 mut self,
469 alias: impl Into<Cow<'static, str>>,
470 id: u32,
471 is_group: bool,
472 ) -> Self {
473 self.server_config.aliases.push(server::Alias {
474 name: alias.into(),
475 id,
476 is_group,
477 });
478 self
479 }
480
481 pub fn zeroconf_ip(mut self, zeroconf_ip: Vec<std::net::IpAddr>) -> Self {
483 self.zeroconf_ip = zeroconf_ip;
484 self
485 }
486
487 pub fn zeroconf_backend(mut self, zeroconf_backend: DnsSdServiceBuilder) -> Self {
489 self.zeroconf_backend = Some(zeroconf_backend);
490 self
491 }
492
493 pub fn port(mut self, port: u16) -> Self {
496 self.port = port;
497 self
498 }
499
500 pub fn launch(self) -> Result<Discovery, Error> {
505 let name = self.server_config.name.clone();
506 let zeroconf_ip = self.zeroconf_ip;
507
508 let (event_tx, event_rx) = mpsc::unbounded_channel();
509
510 let mut port = self.port;
511 let server = DiscoveryServer::new(self.server_config, &mut port, event_tx.clone())?;
512
513 let launch_svc = self.zeroconf_backend.unwrap_or(find(None)?);
514 let svc = launch_svc(name, zeroconf_ip, port, event_tx)?;
515 Ok(Discovery {
516 server,
517 svc,
518 event_rx,
519 })
520 }
521}
522
523impl Discovery {
524 pub fn builder<T: Into<String>>(device_id: T, client_id: T) -> Builder {
526 Builder::new(device_id, client_id)
527 }
528
529 pub fn new<T: Into<String>>(device_id: T, client_id: T) -> Result<Self, Error> {
531 Self::builder(device_id, client_id).launch()
532 }
533
534 pub async fn shutdown(self) {
535 tokio::join!(self.server.shutdown(), self.svc.shutdown(),);
536 }
537}
538
539impl Stream for Discovery {
540 type Item = Credentials;
541
542 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
543 match Pin::new(&mut self.event_rx).poll_recv(cx) {
544 Poll::Ready(Some(DiscoveryEvent::Credentials(creds))) => Poll::Ready(Some(creds)),
546 Poll::Ready(Some(
548 DiscoveryEvent::ServerError(_) | DiscoveryEvent::ZeroconfError(_),
549 )) => Poll::Ready(None),
550 Poll::Ready(None) => Poll::Ready(None),
551 Poll::Pending => Poll::Pending,
552 }
553 }
554}