librespot_core/dealer/
manager.rs

1use futures_core::Stream;
2use futures_util::StreamExt;
3use std::{pin::Pin, str::FromStr, sync::OnceLock};
4use thiserror::Error;
5use tokio::sync::mpsc;
6use tokio_stream::wrappers::UnboundedReceiverStream;
7use url::Url;
8
9use super::{
10    Builder, Dealer, GetUrlResult, Request, RequestHandler, Responder, Response, Subscription,
11    protocol::Message,
12};
13use crate::{Error, Session};
14
// Declares the `DealerManager` component together with its inner state struct
// `DealerManagerInner`. The expansion of `component!` is defined elsewhere;
// NOTE(review): presumably it supplies the `lock` and `session` helpers that
// the `impl DealerManager` block relies on — confirm against the macro.
component! {
    DealerManager: DealerManagerInner {
        // Initialised already holding a freshly created `Builder`.
        builder: OnceLock<Builder> = OnceLock::from(Builder::new()),
        // Initialised empty; filled once a `Dealer` has been launched.
        dealer: OnceLock<Dealer> = OnceLock::new(),
    }
}
21
22pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
23pub type BoxedStreamResult<T> = BoxedStream<Result<T, Error>>;
24
25#[derive(Error, Debug)]
26enum DealerError {
27    #[error("Builder wasn't available")]
28    BuilderNotAvailable,
29    #[error("Websocket couldn't be started because: {0}")]
30    LaunchFailure(Error),
31    #[error("Failed to set dealer")]
32    CouldNotSetDealer,
33}
34
35impl From<DealerError> for Error {
36    fn from(err: DealerError) -> Self {
37        Error::failed_precondition(err)
38    }
39}
40
/// Outcome a request consumer reports back for a dealer request.
///
/// The enum carries no data, so it is cheap to copy and can be compared
/// directly (`reply == Reply::Success`) instead of only pattern-matched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Reply {
    /// The request was handled; answered with `success: true`.
    Success,
    /// The request could not be handled; answered with `success: false`.
    Failure,
    /// No answer should be sent for the request.
    Unanswered,
}
47
48pub type RequestReply = (Request, mpsc::UnboundedSender<Reply>);
49type RequestReceiver = mpsc::UnboundedReceiver<RequestReply>;
50type RequestSender = mpsc::UnboundedSender<RequestReply>;
51
52struct DealerRequestHandler(RequestSender);
53
54impl DealerRequestHandler {
55    pub fn new() -> (Self, RequestReceiver) {
56        let (tx, rx) = mpsc::unbounded_channel();
57        (DealerRequestHandler(tx), rx)
58    }
59}
60
61impl RequestHandler for DealerRequestHandler {
62    fn handle_request(&self, request: Request, responder: Responder) {
63        let (tx, mut rx) = mpsc::unbounded_channel();
64
65        if let Err(why) = self.0.send((request, tx)) {
66            error!("failed sending dealer request {why}");
67            responder.send(Response { success: false });
68            return;
69        }
70
71        tokio::spawn(async move {
72            let reply = rx.recv().await.unwrap_or(Reply::Failure);
73            debug!("replying to ws request: {reply:?}");
74            match reply {
75                Reply::Unanswered => responder.force_unanswered(),
76                Reply::Success | Reply::Failure => responder.send(Response {
77                    success: matches!(reply, Reply::Success),
78                }),
79            }
80        });
81    }
82}
83
84impl DealerManager {
85    async fn get_url(session: Session) -> GetUrlResult {
86        let (host, port) = session.apresolver().resolve("dealer").await?;
87        let token = session.login5().auth_token().await?.access_token;
88        let url = format!("wss://{host}:{port}/?access_token={token}");
89        let url = Url::from_str(&url)?;
90        Ok(url)
91    }
92
93    pub fn add_listen_for(&self, url: impl Into<String>) -> Result<Subscription, Error> {
94        let url = url.into();
95        self.lock(|inner| {
96            if let Some(dealer) = inner.dealer.get() {
97                dealer.subscribe(&[&url])
98            } else if let Some(builder) = inner.builder.get_mut() {
99                builder.subscribe(&[&url])
100            } else {
101                Err(DealerError::BuilderNotAvailable.into())
102            }
103        })
104    }
105
106    pub fn listen_for<T>(
107        &self,
108        uri: impl Into<String>,
109        t: impl Fn(Message) -> Result<T, Error> + Send + 'static,
110    ) -> Result<BoxedStreamResult<T>, Error> {
111        Ok(Box::pin(self.add_listen_for(uri)?.map(t)))
112    }
113
114    pub fn add_handle_for(&self, url: impl Into<String>) -> Result<RequestReceiver, Error> {
115        let url = url.into();
116
117        let (handler, receiver) = DealerRequestHandler::new();
118        self.lock(|inner| {
119            if let Some(dealer) = inner.dealer.get() {
120                dealer.add_handler(&url, handler).map(|_| receiver)
121            } else if let Some(builder) = inner.builder.get_mut() {
122                builder.add_handler(&url, handler).map(|_| receiver)
123            } else {
124                Err(DealerError::BuilderNotAvailable.into())
125            }
126        })
127    }
128
129    pub fn handle_for(&self, uri: impl Into<String>) -> Result<BoxedStream<RequestReply>, Error> {
130        Ok(Box::pin(
131            self.add_handle_for(uri).map(UnboundedReceiverStream::new)?,
132        ))
133    }
134
135    pub fn handles(&self, uri: &str) -> bool {
136        self.lock(|inner| {
137            if let Some(dealer) = inner.dealer.get() {
138                dealer.handles(uri)
139            } else if let Some(builder) = inner.builder.get() {
140                builder.handles(uri)
141            } else {
142                false
143            }
144        })
145    }
146
147    pub async fn start(&self) -> Result<(), Error> {
148        debug!("Launching dealer");
149
150        let session = self.session();
151        // the url has to be a function that can retrieve a new url,
152        // otherwise when we later try to reconnect with the initial url/token
153        // and the token is expired we will just get 401 error
154        let get_url = move || Self::get_url(session.clone());
155
156        let dealer = self
157            .lock(move |inner| inner.builder.take())
158            .ok_or(DealerError::BuilderNotAvailable)?
159            .launch(get_url, None)
160            .await
161            .map_err(DealerError::LaunchFailure)?;
162
163        self.lock(|inner| inner.dealer.set(dealer))
164            .map_err(|_| DealerError::CouldNotSetDealer)?;
165
166        Ok(())
167    }
168
169    pub async fn close(&self) {
170        if let Some(dealer) = self.lock(|inner| inner.dealer.take()) {
171            dealer.close().await
172        }
173    }
174}