async_jsonrpc_client/ws_client/
mod.rs1mod builder;
2mod manager;
3mod task;
4#[cfg(test)]
5mod tests;
6
7use std::{
8 pin::Pin,
9 task::{Context, Poll},
10 time::Duration,
11};
12
13use futures::{
14 channel::{mpsc, oneshot},
15 future,
16 sink::SinkExt,
17 stream::{Stream, StreamExt},
18};
19use jsonrpc_types::*;
20
21pub use self::builder::WsClientBuilder;
22use crate::{
23 error::WsClientError,
24 transport::{BatchTransport, PubsubTransport, Transport},
25};
26
27pub(crate) enum ToBackTaskMessage {
29 Request {
30 method: String,
31 params: Option<Params>,
32 send_back: oneshot::Sender<Result<Output, WsClientError>>,
34 },
35 BatchRequest {
36 batch: Vec<(String, Option<Params>)>,
37 send_back: oneshot::Sender<Result<Vec<Output>, WsClientError>>,
39 },
40 Subscribe {
41 subscribe_method: String,
42 params: Option<Params>,
43 send_back: oneshot::Sender<Result<(Id, mpsc::Receiver<SubscriptionNotification>), WsClientError>>,
47 },
48 Unsubscribe {
49 unsubscribe_method: String,
50 subscription_id: Id,
51 send_back: oneshot::Sender<Result<bool, WsClientError>>,
53 },
54}
55
56#[derive(Clone)]
58pub struct WsClient {
59 to_back: mpsc::Sender<ToBackTaskMessage>,
60 timeout: Option<Duration>,
62}
63
64impl WsClient {
65 pub async fn new(url: impl Into<String>) -> Result<Self, WsClientError> {
67 WsClientBuilder::new()
68 .build(url)
69 .await
70 .map_err(WsClientError::WebSocket)
71 }
72
73 pub fn builder() -> WsClientBuilder {
77 WsClientBuilder::new()
78 }
79
80 async fn send_request(&self, method: impl Into<String>, params: Option<Params>) -> Result<Output, WsClientError> {
82 let method = method.into();
83 log::debug!("[frontend] Send request: method={}, params={:?}", method, params);
84
85 let (tx, rx) = oneshot::channel();
86 self.to_back
87 .clone()
88 .send(ToBackTaskMessage::Request {
89 method,
90 params,
91 send_back: tx,
92 })
93 .await
94 .map_err(|_| WsClientError::InternalChannel)?;
95
96 let res = if let Some(duration) = self.timeout {
97 #[cfg(feature = "ws-async-std")]
98 let timeout = async_std::task::sleep(duration);
99 #[cfg(feature = "ws-tokio")]
100 let timeout = tokio::time::sleep(duration);
101 futures::pin_mut!(rx, timeout);
102 match future::select(rx, timeout).await {
103 future::Either::Left((response, _)) => response,
104 future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
105 }
106 } else {
107 rx.await
108 };
109 match res {
110 Ok(Ok(output)) => Ok(output),
111 Ok(Err(err)) => Err(err),
112 Err(_) => Err(WsClientError::InternalChannel),
113 }
114 }
115
116 async fn send_request_batch<I, M>(&self, batch: I) -> Result<Vec<Output>, WsClientError>
118 where
119 I: IntoIterator<Item = (M, Option<Params>)>,
120 M: Into<String>,
121 {
122 let batch = batch
123 .into_iter()
124 .map(|(method, params)| (method.into(), params))
125 .collect::<Vec<_>>();
126 log::debug!("[frontend] Send a batch of requests: {:?}", batch);
127
128 let (tx, rx) = oneshot::channel();
129 self.to_back
130 .clone()
131 .send(ToBackTaskMessage::BatchRequest { batch, send_back: tx })
132 .await
133 .map_err(|_| WsClientError::InternalChannel)?;
134
135 let res = if let Some(duration) = self.timeout {
136 #[cfg(feature = "ws-async-std")]
137 let timeout = async_std::task::sleep(duration);
138 #[cfg(feature = "ws-tokio")]
139 let timeout = tokio::time::sleep(duration);
140 futures::pin_mut!(rx, timeout);
141 match future::select(rx, timeout).await {
142 future::Either::Left((response, _)) => response,
143 future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
144 }
145 } else {
146 rx.await
147 };
148 match res {
149 Ok(Ok(outputs)) => Ok(outputs),
150 Ok(Err(err)) => Err(err),
151 Err(_) => Err(WsClientError::InternalChannel),
152 }
153 }
154
155 async fn send_subscribe(
160 &self,
161 subscribe_method: impl Into<String>,
162 params: Option<Params>,
163 ) -> Result<WsSubscription<SubscriptionNotification>, WsClientError> {
164 let subscribe_method = subscribe_method.into();
165 log::debug!("[frontend] Subscribe: method={}, params={:?}", subscribe_method, params);
166 let (tx, rx) = oneshot::channel();
167 self.to_back
168 .clone()
169 .send(ToBackTaskMessage::Subscribe {
170 subscribe_method,
171 params,
172 send_back: tx,
173 })
174 .await
175 .map_err(|_| WsClientError::InternalChannel)?;
176
177 let res = if let Some(duration) = self.timeout {
178 #[cfg(feature = "ws-async-std")]
179 let timeout = async_std::task::sleep(duration);
180 #[cfg(feature = "ws-tokio")]
181 let timeout = tokio::time::sleep(duration);
182 futures::pin_mut!(rx, timeout);
183 match future::select(rx, timeout).await {
184 future::Either::Left((response, _)) => response,
185 future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
186 }
187 } else {
188 rx.await
189 };
190 match res {
191 Ok(Ok((id, notification_rx))) => Ok(WsSubscription { id, notification_rx }),
192 Ok(Err(err)) => Err(err),
193 Err(_) => Err(WsClientError::InternalChannel),
194 }
195 }
196
197 async fn send_unsubscribe(
199 &self,
200 unsubscribe_method: impl Into<String>,
201 subscription_id: Id,
202 ) -> Result<bool, WsClientError> {
203 let unsubscribe_method = unsubscribe_method.into();
204 log::debug!(
205 "[frontend] unsubscribe: method={}, id={:?}",
206 unsubscribe_method,
207 subscription_id
208 );
209 let (tx, rx) = oneshot::channel();
210 self.to_back
211 .clone()
212 .send(ToBackTaskMessage::Unsubscribe {
213 unsubscribe_method,
214 subscription_id,
215 send_back: tx,
216 })
217 .await
218 .map_err(|_| WsClientError::InternalChannel)?;
219
220 let res = if let Some(duration) = self.timeout {
221 #[cfg(feature = "ws-async-std")]
222 let timeout = async_std::task::sleep(duration);
223 #[cfg(feature = "ws-tokio")]
224 let timeout = tokio::time::sleep(duration);
225 futures::pin_mut!(rx, timeout);
226 match future::select(rx, timeout).await {
227 future::Either::Left((response, _)) => response,
228 future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
229 }
230 } else {
231 rx.await
232 };
233
234 match res {
235 Ok(Ok(res)) => Ok(res),
236 Ok(Err(err)) => Err(err),
237 Err(_) => Err(WsClientError::InternalChannel),
238 }
239 }
240}
241
242pub struct WsSubscription<Notif> {
244 pub id: Id,
246 notification_rx: mpsc::Receiver<Notif>,
248}
249
250impl<Notif> WsSubscription<Notif> {
251 pub async fn next(&mut self) -> Option<Notif> {
255 self.notification_rx.next().await
256 }
257}
258
259impl<Notif> Stream for WsSubscription<Notif> {
260 type Item = Notif;
261
262 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
263 mpsc::Receiver::<Notif>::poll_next(Pin::new(&mut self.notification_rx), cx)
264 }
265}
266
267#[async_trait::async_trait]
268impl Transport for WsClient {
269 type Error = WsClientError;
270
271 async fn request<M>(&self, method: M, params: Option<Params>) -> Result<Output, Self::Error>
272 where
273 M: Into<String> + Send,
274 {
275 self.send_request(method, params).await
276 }
277}
278
279#[async_trait::async_trait]
280impl BatchTransport for WsClient {
281 async fn request_batch<I, M>(&self, batch: I) -> Result<Vec<Output>, <Self as Transport>::Error>
282 where
283 I: IntoIterator<Item = (M, Option<Params>)> + Send,
284 I::IntoIter: Send,
285 M: Into<String>,
286 {
287 self.send_request_batch(batch).await
288 }
289}
290
291#[async_trait::async_trait]
292impl PubsubTransport for WsClient {
293 type NotificationStream = WsSubscription<SubscriptionNotification>;
294
295 async fn subscribe<M>(
296 &self,
297 subscribe_method: M,
298 params: Option<Params>,
299 ) -> Result<(Id, Self::NotificationStream), <Self as Transport>::Error>
300 where
301 M: Into<String> + Send,
302 {
303 let notification_stream = self.send_subscribe(subscribe_method, params).await?;
304 Ok((notification_stream.id.clone(), notification_stream))
305 }
306
307 async fn unsubscribe<M>(
308 &self,
309 unsubscribe_method: M,
310 subscription_id: Id,
311 ) -> Result<bool, <Self as Transport>::Error>
312 where
313 M: Into<String> + Send,
314 {
315 self.send_unsubscribe(unsubscribe_method, subscription_id).await
316 }
317}