async_jsonrpc_client/ws_client/
mod.rs

1mod builder;
2mod manager;
3mod task;
4#[cfg(test)]
5mod tests;
6
7use std::{
8    pin::Pin,
9    task::{Context, Poll},
10    time::Duration,
11};
12
13use futures::{
14    channel::{mpsc, oneshot},
15    future,
16    sink::SinkExt,
17    stream::{Stream, StreamExt},
18};
19use jsonrpc_types::*;
20
21pub use self::builder::WsClientBuilder;
22use crate::{
23    error::WsClientError,
24    transport::{BatchTransport, PubsubTransport, Transport},
25};
26
27/// Message that the client can send to the background task.
28pub(crate) enum ToBackTaskMessage {
29    Request {
30        method: String,
31        params: Option<Params>,
32        /// One-shot channel where to send back the response of the request.
33        send_back: oneshot::Sender<Result<Output, WsClientError>>,
34    },
35    BatchRequest {
36        batch: Vec<(String, Option<Params>)>,
37        /// One-shot channel where to send back the response of the batch request.
38        send_back: oneshot::Sender<Result<Vec<Output>, WsClientError>>,
39    },
40    Subscribe {
41        subscribe_method: String,
42        params: Option<Params>,
43        /// One-shot channel where to send back the response (subscription id) and a `Receiver`
44        /// that will receive subscription notification when we get a response (subscription id)
45        /// from the server about the subscription.
46        send_back: oneshot::Sender<Result<(Id, mpsc::Receiver<SubscriptionNotification>), WsClientError>>,
47    },
48    Unsubscribe {
49        unsubscribe_method: String,
50        subscription_id: Id,
51        /// One-shot channel where to send back the response of the unsubscribe request.
52        send_back: oneshot::Sender<Result<bool, WsClientError>>,
53    },
54}
55
56/// WebSocket JSON-RPC client
57#[derive(Clone)]
58pub struct WsClient {
59    to_back: mpsc::Sender<ToBackTaskMessage>,
60    /// Request timeout.
61    timeout: Option<Duration>,
62}
63
64impl WsClient {
65    /// Creates a new WebSocket JSON-RPC client.
66    pub async fn new(url: impl Into<String>) -> Result<Self, WsClientError> {
67        WsClientBuilder::new()
68            .build(url)
69            .await
70            .map_err(WsClientError::WebSocket)
71    }
72
73    /// Creates a `WsClientBuilder` to configure a `WsClient`.
74    ///
75    /// This is the same as `WsClientBuilder::new()`.
76    pub fn builder() -> WsClientBuilder {
77        WsClientBuilder::new()
78    }
79
80    /// Sends a `method call` request to the server.
81    async fn send_request(&self, method: impl Into<String>, params: Option<Params>) -> Result<Output, WsClientError> {
82        let method = method.into();
83        log::debug!("[frontend] Send request: method={}, params={:?}", method, params);
84
85        let (tx, rx) = oneshot::channel();
86        self.to_back
87            .clone()
88            .send(ToBackTaskMessage::Request {
89                method,
90                params,
91                send_back: tx,
92            })
93            .await
94            .map_err(|_| WsClientError::InternalChannel)?;
95
96        let res = if let Some(duration) = self.timeout {
97            #[cfg(feature = "ws-async-std")]
98            let timeout = async_std::task::sleep(duration);
99            #[cfg(feature = "ws-tokio")]
100            let timeout = tokio::time::sleep(duration);
101            futures::pin_mut!(rx, timeout);
102            match future::select(rx, timeout).await {
103                future::Either::Left((response, _)) => response,
104                future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
105            }
106        } else {
107            rx.await
108        };
109        match res {
110            Ok(Ok(output)) => Ok(output),
111            Ok(Err(err)) => Err(err),
112            Err(_) => Err(WsClientError::InternalChannel),
113        }
114    }
115
116    /// Sends a batch of `method call` requests to the server.
117    async fn send_request_batch<I, M>(&self, batch: I) -> Result<Vec<Output>, WsClientError>
118    where
119        I: IntoIterator<Item = (M, Option<Params>)>,
120        M: Into<String>,
121    {
122        let batch = batch
123            .into_iter()
124            .map(|(method, params)| (method.into(), params))
125            .collect::<Vec<_>>();
126        log::debug!("[frontend] Send a batch of requests: {:?}", batch);
127
128        let (tx, rx) = oneshot::channel();
129        self.to_back
130            .clone()
131            .send(ToBackTaskMessage::BatchRequest { batch, send_back: tx })
132            .await
133            .map_err(|_| WsClientError::InternalChannel)?;
134
135        let res = if let Some(duration) = self.timeout {
136            #[cfg(feature = "ws-async-std")]
137            let timeout = async_std::task::sleep(duration);
138            #[cfg(feature = "ws-tokio")]
139            let timeout = tokio::time::sleep(duration);
140            futures::pin_mut!(rx, timeout);
141            match future::select(rx, timeout).await {
142                future::Either::Left((response, _)) => response,
143                future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
144            }
145        } else {
146            rx.await
147        };
148        match res {
149            Ok(Ok(outputs)) => Ok(outputs),
150            Ok(Err(err)) => Err(err),
151            Err(_) => Err(WsClientError::InternalChannel),
152        }
153    }
154
155    /// Sends a subscribe request to the server.
156    ///
157    /// `subscribe_method` and `params` are used to ask for the subscription towards the server.
158    /// `unsubscribe_method` is used to close the subscription.
159    async fn send_subscribe(
160        &self,
161        subscribe_method: impl Into<String>,
162        params: Option<Params>,
163    ) -> Result<WsSubscription<SubscriptionNotification>, WsClientError> {
164        let subscribe_method = subscribe_method.into();
165        log::debug!("[frontend] Subscribe: method={}, params={:?}", subscribe_method, params);
166        let (tx, rx) = oneshot::channel();
167        self.to_back
168            .clone()
169            .send(ToBackTaskMessage::Subscribe {
170                subscribe_method,
171                params,
172                send_back: tx,
173            })
174            .await
175            .map_err(|_| WsClientError::InternalChannel)?;
176
177        let res = if let Some(duration) = self.timeout {
178            #[cfg(feature = "ws-async-std")]
179            let timeout = async_std::task::sleep(duration);
180            #[cfg(feature = "ws-tokio")]
181            let timeout = tokio::time::sleep(duration);
182            futures::pin_mut!(rx, timeout);
183            match future::select(rx, timeout).await {
184                future::Either::Left((response, _)) => response,
185                future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
186            }
187        } else {
188            rx.await
189        };
190        match res {
191            Ok(Ok((id, notification_rx))) => Ok(WsSubscription { id, notification_rx }),
192            Ok(Err(err)) => Err(err),
193            Err(_) => Err(WsClientError::InternalChannel),
194        }
195    }
196
197    /// Sends an unsubscribe request to the server.
198    async fn send_unsubscribe(
199        &self,
200        unsubscribe_method: impl Into<String>,
201        subscription_id: Id,
202    ) -> Result<bool, WsClientError> {
203        let unsubscribe_method = unsubscribe_method.into();
204        log::debug!(
205            "[frontend] unsubscribe: method={}, id={:?}",
206            unsubscribe_method,
207            subscription_id
208        );
209        let (tx, rx) = oneshot::channel();
210        self.to_back
211            .clone()
212            .send(ToBackTaskMessage::Unsubscribe {
213                unsubscribe_method,
214                subscription_id,
215                send_back: tx,
216            })
217            .await
218            .map_err(|_| WsClientError::InternalChannel)?;
219
220        let res = if let Some(duration) = self.timeout {
221            #[cfg(feature = "ws-async-std")]
222            let timeout = async_std::task::sleep(duration);
223            #[cfg(feature = "ws-tokio")]
224            let timeout = tokio::time::sleep(duration);
225            futures::pin_mut!(rx, timeout);
226            match future::select(rx, timeout).await {
227                future::Either::Left((response, _)) => response,
228                future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
229            }
230        } else {
231            rx.await
232        };
233
234        match res {
235            Ok(Ok(res)) => Ok(res),
236            Ok(Err(err)) => Err(err),
237            Err(_) => Err(WsClientError::InternalChannel),
238        }
239    }
240}
241
242/// Active subscription on a websocket client.
243pub struct WsSubscription<Notif> {
244    /// Subscription ID.
245    pub id: Id,
246    /// Channel from which we receive notifications from the server.
247    notification_rx: mpsc::Receiver<Notif>,
248}
249
250impl<Notif> WsSubscription<Notif> {
251    /// Returns the next notification from the websocket stream.
252    ///
253    /// Ignore any malformed packet.
254    pub async fn next(&mut self) -> Option<Notif> {
255        self.notification_rx.next().await
256    }
257}
258
259impl<Notif> Stream for WsSubscription<Notif> {
260    type Item = Notif;
261
262    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
263        mpsc::Receiver::<Notif>::poll_next(Pin::new(&mut self.notification_rx), cx)
264    }
265}
266
267#[async_trait::async_trait]
268impl Transport for WsClient {
269    type Error = WsClientError;
270
271    async fn request<M>(&self, method: M, params: Option<Params>) -> Result<Output, Self::Error>
272    where
273        M: Into<String> + Send,
274    {
275        self.send_request(method, params).await
276    }
277}
278
279#[async_trait::async_trait]
280impl BatchTransport for WsClient {
281    async fn request_batch<I, M>(&self, batch: I) -> Result<Vec<Output>, <Self as Transport>::Error>
282    where
283        I: IntoIterator<Item = (M, Option<Params>)> + Send,
284        I::IntoIter: Send,
285        M: Into<String>,
286    {
287        self.send_request_batch(batch).await
288    }
289}
290
291#[async_trait::async_trait]
292impl PubsubTransport for WsClient {
293    type NotificationStream = WsSubscription<SubscriptionNotification>;
294
295    async fn subscribe<M>(
296        &self,
297        subscribe_method: M,
298        params: Option<Params>,
299    ) -> Result<(Id, Self::NotificationStream), <Self as Transport>::Error>
300    where
301        M: Into<String> + Send,
302    {
303        let notification_stream = self.send_subscribe(subscribe_method, params).await?;
304        Ok((notification_stream.id.clone(), notification_stream))
305    }
306
307    async fn unsubscribe<M>(
308        &self,
309        unsubscribe_method: M,
310        subscription_id: Id,
311    ) -> Result<bool, <Self as Transport>::Error>
312    where
313        M: Into<String> + Send,
314    {
315        self.send_unsubscribe(unsubscribe_method, subscription_id).await
316    }
317}