mod builder;
mod manager;
mod task;
#[cfg(test)]
mod tests;
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::{
channel::{mpsc, oneshot},
future,
sink::SinkExt,
stream::{Stream, StreamExt},
};
use jsonrpc_types::*;
pub use self::builder::WsClientBuilder;
use crate::{
error::WsClientError,
transport::{BatchTransport, PubsubTransport, Transport},
};
pub(crate) enum ToBackTaskMessage {
Request {
method: String,
params: Option<Params>,
send_back: oneshot::Sender<Result<Output, WsClientError>>,
},
BatchRequest {
batch: Vec<(String, Option<Params>)>,
send_back: oneshot::Sender<Result<Vec<Output>, WsClientError>>,
},
Subscribe {
subscribe_method: String,
params: Option<Params>,
send_back: oneshot::Sender<Result<(Id, mpsc::Receiver<SubscriptionNotification>), WsClientError>>,
},
Unsubscribe {
unsubscribe_method: String,
subscription_id: Id,
send_back: oneshot::Sender<Result<bool, WsClientError>>,
},
}
#[derive(Clone)]
pub struct WsClient {
to_back: mpsc::Sender<ToBackTaskMessage>,
timeout: Option<Duration>,
}
impl WsClient {
pub async fn new(url: impl Into<String>) -> Result<Self, WsClientError> {
WsClientBuilder::new()
.build(url)
.await
.map_err(WsClientError::WebSocket)
}
pub fn builder() -> WsClientBuilder {
WsClientBuilder::new()
}
async fn send_request(&self, method: impl Into<String>, params: Option<Params>) -> Result<Output, WsClientError> {
let method = method.into();
log::debug!("[frontend] Send request: method={}, params={:?}", method, params);
let (tx, rx) = oneshot::channel();
self.to_back
.clone()
.send(ToBackTaskMessage::Request {
method,
params,
send_back: tx,
})
.await
.map_err(|_| WsClientError::InternalChannel)?;
let res = if let Some(duration) = self.timeout {
#[cfg(feature = "ws-async-std")]
let timeout = async_std::task::sleep(duration);
#[cfg(feature = "ws-tokio")]
let timeout = tokio::time::sleep(duration);
futures::pin_mut!(rx, timeout);
match future::select(rx, timeout).await {
future::Either::Left((response, _)) => response,
future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
}
} else {
rx.await
};
match res {
Ok(Ok(output)) => Ok(output),
Ok(Err(err)) => Err(err),
Err(_) => Err(WsClientError::InternalChannel),
}
}
async fn send_request_batch<I, M>(&self, batch: I) -> Result<Vec<Output>, WsClientError>
where
I: IntoIterator<Item = (M, Option<Params>)>,
M: Into<String>,
{
let batch = batch
.into_iter()
.map(|(method, params)| (method.into(), params))
.collect::<Vec<_>>();
log::debug!("[frontend] Send a batch of requests: {:?}", batch);
let (tx, rx) = oneshot::channel();
self.to_back
.clone()
.send(ToBackTaskMessage::BatchRequest { batch, send_back: tx })
.await
.map_err(|_| WsClientError::InternalChannel)?;
let res = if let Some(duration) = self.timeout {
#[cfg(feature = "ws-async-std")]
let timeout = async_std::task::sleep(duration);
#[cfg(feature = "ws-tokio")]
let timeout = tokio::time::sleep(duration);
futures::pin_mut!(rx, timeout);
match future::select(rx, timeout).await {
future::Either::Left((response, _)) => response,
future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
}
} else {
rx.await
};
match res {
Ok(Ok(outputs)) => Ok(outputs),
Ok(Err(err)) => Err(err),
Err(_) => Err(WsClientError::InternalChannel),
}
}
async fn send_subscribe(
&self,
subscribe_method: impl Into<String>,
params: Option<Params>,
) -> Result<WsSubscription<SubscriptionNotification>, WsClientError> {
let subscribe_method = subscribe_method.into();
log::debug!("[frontend] Subscribe: method={}, params={:?}", subscribe_method, params);
let (tx, rx) = oneshot::channel();
self.to_back
.clone()
.send(ToBackTaskMessage::Subscribe {
subscribe_method,
params,
send_back: tx,
})
.await
.map_err(|_| WsClientError::InternalChannel)?;
let res = if let Some(duration) = self.timeout {
#[cfg(feature = "ws-async-std")]
let timeout = async_std::task::sleep(duration);
#[cfg(feature = "ws-tokio")]
let timeout = tokio::time::sleep(duration);
futures::pin_mut!(rx, timeout);
match future::select(rx, timeout).await {
future::Either::Left((response, _)) => response,
future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
}
} else {
rx.await
};
match res {
Ok(Ok((id, notification_rx))) => Ok(WsSubscription { id, notification_rx }),
Ok(Err(err)) => Err(err),
Err(_) => Err(WsClientError::InternalChannel),
}
}
async fn send_unsubscribe(
&self,
unsubscribe_method: impl Into<String>,
subscription_id: Id,
) -> Result<bool, WsClientError> {
let unsubscribe_method = unsubscribe_method.into();
log::debug!(
"[frontend] unsubscribe: method={}, id={:?}",
unsubscribe_method,
subscription_id
);
let (tx, rx) = oneshot::channel();
self.to_back
.clone()
.send(ToBackTaskMessage::Unsubscribe {
unsubscribe_method,
subscription_id,
send_back: tx,
})
.await
.map_err(|_| WsClientError::InternalChannel)?;
let res = if let Some(duration) = self.timeout {
#[cfg(feature = "ws-async-std")]
let timeout = async_std::task::sleep(duration);
#[cfg(feature = "ws-tokio")]
let timeout = tokio::time::sleep(duration);
futures::pin_mut!(rx, timeout);
match future::select(rx, timeout).await {
future::Either::Left((response, _)) => response,
future::Either::Right((_, _)) => return Err(WsClientError::RequestTimeout),
}
} else {
rx.await
};
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Err(_) => Err(WsClientError::InternalChannel),
}
}
}
pub struct WsSubscription<Notif> {
pub id: Id,
notification_rx: mpsc::Receiver<Notif>,
}
impl<Notif> WsSubscription<Notif> {
pub async fn next(&mut self) -> Option<Notif> {
self.notification_rx.next().await
}
}
impl<Notif> Stream for WsSubscription<Notif> {
type Item = Notif;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
mpsc::Receiver::<Notif>::poll_next(Pin::new(&mut self.notification_rx), cx)
}
}
#[async_trait::async_trait]
impl Transport for WsClient {
type Error = WsClientError;
async fn request<M>(&self, method: M, params: Option<Params>) -> Result<Output, Self::Error>
where
M: Into<String> + Send,
{
self.send_request(method, params).await
}
}
#[async_trait::async_trait]
impl BatchTransport for WsClient {
async fn request_batch<I, M>(&self, batch: I) -> Result<Vec<Output>, <Self as Transport>::Error>
where
I: IntoIterator<Item = (M, Option<Params>)> + Send,
I::IntoIter: Send,
M: Into<String>,
{
self.send_request_batch(batch).await
}
}
#[async_trait::async_trait]
impl PubsubTransport for WsClient {
type NotificationStream = WsSubscription<SubscriptionNotification>;
async fn subscribe<M>(
&self,
subscribe_method: M,
params: Option<Params>,
) -> Result<(Id, Self::NotificationStream), <Self as Transport>::Error>
where
M: Into<String> + Send,
{
let notification_stream = self.send_subscribe(subscribe_method, params).await?;
Ok((notification_stream.id.clone(), notification_stream))
}
async fn unsubscribe<M>(
&self,
unsubscribe_method: M,
subscription_id: Id,
) -> Result<bool, <Self as Transport>::Error>
where
M: Into<String> + Send,
{
self.send_unsubscribe(unsubscribe_method, subscription_id).await
}
}