librespot_core/dealer/
manager.rs1use futures_core::Stream;
2use futures_util::StreamExt;
3use std::{pin::Pin, str::FromStr, sync::OnceLock};
4use thiserror::Error;
5use tokio::sync::mpsc;
6use tokio_stream::wrappers::UnboundedReceiverStream;
7use url::Url;
8
9use super::{
10 Builder, Dealer, GetUrlResult, Request, RequestHandler, Responder, Response, Subscription,
11 protocol::Message,
12};
13use crate::{Error, Session};
14
// Declares the `DealerManager` component together with its inner state,
// `DealerManagerInner`. The `impl DealerManager` block in this file reaches
// these fields through `self.lock(..)`.
//
// - `builder` starts out holding a fresh `Builder`; `start` takes it out.
// - `dealer` starts out empty; `start` fills it in after a successful launch.
component! {
    DealerManager: DealerManagerInner {
        builder: OnceLock<Builder> = OnceLock::from(Builder::new()),
        dealer: OnceLock<Dealer> = OnceLock::new(),
    }
}
21
/// A pinned, heap-allocated, `Send` stream of `T` items behind a trait object.
pub type BoxedStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
/// A [`BoxedStream`] whose items are `Result<T, Error>`.
pub type BoxedStreamResult<T> = BoxedStream<Result<T, Error>>;
24
/// Failures specific to the dealer manager.
///
/// Converted into the crate-wide `Error` through the `From` impl in this file.
#[derive(Error, Debug)]
enum DealerError {
    /// Neither a running dealer nor a pending builder was available.
    #[error("Builder wasn't available")]
    BuilderNotAvailable,
    /// Launching the dealer from the builder failed; carries the underlying error.
    #[error("Websocket couldn't be started because: {0}")]
    LaunchFailure(Error),
    /// Storing the launched dealer failed because the `dealer` slot was already set.
    #[error("Failed to set dealer")]
    CouldNotSetDealer,
}
34
35impl From<DealerError> for Error {
36 fn from(err: DealerError) -> Self {
37 Error::failed_precondition(err)
38 }
39}
40
/// Outcome that the consumer of a dealer request reports back to the
/// request handler.
///
/// The enum is a plain, field-less value type, so it derives the common
/// value traits (`Clone`, `Copy`, `PartialEq`, `Eq`) in addition to `Debug`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Reply {
    /// The request was handled; answered with `Response { success: true }`.
    Success,
    /// The request could not be handled; answered with
    /// `Response { success: false }`.
    Failure,
    /// The request is deliberately left without a response
    /// (the handler calls `Responder::force_unanswered`).
    Unanswered,
}
47
/// A dealer request paired with the sender used to report its [`Reply`].
pub type RequestReply = (Request, mpsc::UnboundedSender<Reply>);
// Receiving half of the channel that carries `RequestReply` items to consumers.
type RequestReceiver = mpsc::UnboundedReceiver<RequestReply>;
// Sending half of that channel; held by `DealerRequestHandler`.
type RequestSender = mpsc::UnboundedSender<RequestReply>;
51
52struct DealerRequestHandler(RequestSender);
53
54impl DealerRequestHandler {
55 pub fn new() -> (Self, RequestReceiver) {
56 let (tx, rx) = mpsc::unbounded_channel();
57 (DealerRequestHandler(tx), rx)
58 }
59}
60
61impl RequestHandler for DealerRequestHandler {
62 fn handle_request(&self, request: Request, responder: Responder) {
63 let (tx, mut rx) = mpsc::unbounded_channel();
64
65 if let Err(why) = self.0.send((request, tx)) {
66 error!("failed sending dealer request {why}");
67 responder.send(Response { success: false });
68 return;
69 }
70
71 tokio::spawn(async move {
72 let reply = rx.recv().await.unwrap_or(Reply::Failure);
73 debug!("replying to ws request: {reply:?}");
74 match reply {
75 Reply::Unanswered => responder.force_unanswered(),
76 Reply::Success | Reply::Failure => responder.send(Response {
77 success: matches!(reply, Reply::Success),
78 }),
79 }
80 });
81 }
82}
83
84impl DealerManager {
85 async fn get_url(session: Session) -> GetUrlResult {
86 let (host, port) = session.apresolver().resolve("dealer").await?;
87 let token = session.login5().auth_token().await?.access_token;
88 let url = format!("wss://{host}:{port}/?access_token={token}");
89 let url = Url::from_str(&url)?;
90 Ok(url)
91 }
92
93 pub fn add_listen_for(&self, url: impl Into<String>) -> Result<Subscription, Error> {
94 let url = url.into();
95 self.lock(|inner| {
96 if let Some(dealer) = inner.dealer.get() {
97 dealer.subscribe(&[&url])
98 } else if let Some(builder) = inner.builder.get_mut() {
99 builder.subscribe(&[&url])
100 } else {
101 Err(DealerError::BuilderNotAvailable.into())
102 }
103 })
104 }
105
106 pub fn listen_for<T>(
107 &self,
108 uri: impl Into<String>,
109 t: impl Fn(Message) -> Result<T, Error> + Send + 'static,
110 ) -> Result<BoxedStreamResult<T>, Error> {
111 Ok(Box::pin(self.add_listen_for(uri)?.map(t)))
112 }
113
114 pub fn add_handle_for(&self, url: impl Into<String>) -> Result<RequestReceiver, Error> {
115 let url = url.into();
116
117 let (handler, receiver) = DealerRequestHandler::new();
118 self.lock(|inner| {
119 if let Some(dealer) = inner.dealer.get() {
120 dealer.add_handler(&url, handler).map(|_| receiver)
121 } else if let Some(builder) = inner.builder.get_mut() {
122 builder.add_handler(&url, handler).map(|_| receiver)
123 } else {
124 Err(DealerError::BuilderNotAvailable.into())
125 }
126 })
127 }
128
129 pub fn handle_for(&self, uri: impl Into<String>) -> Result<BoxedStream<RequestReply>, Error> {
130 Ok(Box::pin(
131 self.add_handle_for(uri).map(UnboundedReceiverStream::new)?,
132 ))
133 }
134
135 pub fn handles(&self, uri: &str) -> bool {
136 self.lock(|inner| {
137 if let Some(dealer) = inner.dealer.get() {
138 dealer.handles(uri)
139 } else if let Some(builder) = inner.builder.get() {
140 builder.handles(uri)
141 } else {
142 false
143 }
144 })
145 }
146
147 pub async fn start(&self) -> Result<(), Error> {
148 debug!("Launching dealer");
149
150 let session = self.session();
151 let get_url = move || Self::get_url(session.clone());
155
156 let dealer = self
157 .lock(move |inner| inner.builder.take())
158 .ok_or(DealerError::BuilderNotAvailable)?
159 .launch(get_url, None)
160 .await
161 .map_err(DealerError::LaunchFailure)?;
162
163 self.lock(|inner| inner.dealer.set(dealer))
164 .map_err(|_| DealerError::CouldNotSetDealer)?;
165
166 Ok(())
167 }
168
169 pub async fn close(&self) {
170 if let Some(dealer) = self.lock(|inner| inner.dealer.take()) {
171 dealer.close().await
172 }
173 }
174}