#![allow(missing_docs)]
use std::fmt::{self, Debug};
use async_trait::async_trait;
use futures_channel::{mpsc, oneshot};
use primitive_types::U256;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::value::{to_raw_value, RawValue};
pub use error::*;
use manager::{RequestManager, SharedChannelMap};
pub use types::ConnectionDetails;
use types::*;
#[cfg(not(target_arch = "wasm32"))]
use super::Authorization;
use crate::neo_clients::{JsonRpcProvider, ProviderError, PubsubClient, RpcClient};
mod backend;
mod manager;
mod error;
mod types;
/// Cloneable handle to a WebSocket JSON-RPC connection.
///
/// The handle holds no socket state of its own: requests and unsubscribe
/// commands are pushed as `Instruction` values over `instructions`, and
/// subscription notification streams are taken out of `channel_map`.
#[derive(Clone)]
pub struct WsClient {
/// Sending half of the unbounded channel that carries `Instruction`s
/// (presumably consumed by the spawned `RequestManager` — confirm in `manager`).
instructions: mpsc::UnboundedSender<Instruction>,
/// Shared map keyed by subscription id; `subscribe` removes and returns
/// the notification receiver stored under the requested id.
channel_map: SharedChannelMap,
}
impl WsClient {
    /// Opens a connection described by `conn` and returns a client handle.
    ///
    /// The connection is set up by `RequestManager::connect`; the manager it
    /// returns is spawned before the handle is handed back to the caller.
    ///
    /// # Errors
    ///
    /// Propagates the `WsClientError` reported by `RequestManager::connect`.
    pub async fn connect(conn: impl Into<ConnectionDetails>) -> Result<Self, WsClientError> {
        let details = conn.into();
        let (manager, client) = RequestManager::connect(details).await?;
        manager.spawn();
        Ok(client)
    }

    /// Same as [`WsClient::connect`], but forwards `reconnects` to
    /// `RequestManager::connect_with_reconnects`.
    ///
    /// `reconnects` is presumably the number of reconnection attempts the
    /// manager may make — confirm against `manager`.
    ///
    /// # Errors
    ///
    /// Propagates the `WsClientError` reported while connecting.
    pub async fn connect_with_reconnects(
        conn: impl Into<ConnectionDetails>,
        reconnects: usize,
    ) -> Result<Self, WsClientError> {
        let details = conn.into();
        let (manager, client) =
            RequestManager::connect_with_reconnects(details, reconnects).await?;
        manager.spawn();
        Ok(client)
    }

    /// Same as [`WsClient::connect`], but additionally passes a
    /// `WebSocketConfig` to `RequestManager::connect_with_config`.
    ///
    /// Not available on `wasm32` targets.
    ///
    /// # Errors
    ///
    /// Propagates the `WsClientError` reported while connecting.
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn connect_with_config(
        conn: impl Into<ConnectionDetails>,
        config: impl Into<WebSocketConfig>,
    ) -> Result<Self, WsClientError> {
        let details = conn.into();
        let ws_config = config.into();
        let (manager, client) = RequestManager::connect_with_config(details, ws_config).await?;
        manager.spawn();
        Ok(client)
    }

    /// Combines [`WsClient::connect_with_config`] and
    /// [`WsClient::connect_with_reconnects`]: both the `WebSocketConfig` and
    /// the `reconnects` value are forwarded to the request manager.
    ///
    /// Not available on `wasm32` targets.
    ///
    /// # Errors
    ///
    /// Propagates the `WsClientError` reported while connecting.
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn connect_with_config_and_reconnects(
        conn: impl Into<ConnectionDetails>,
        config: impl Into<WebSocketConfig>,
        reconnects: usize,
    ) -> Result<Self, WsClientError> {
        let details = conn.into();
        let ws_config = config.into();
        let (manager, client) =
            RequestManager::connect_with_config_and_reconnects(details, ws_config, reconnects)
                .await?;
        manager.spawn();
        Ok(client)
    }

    /// Sends a single request over the instruction channel and decodes the reply.
    ///
    /// A one-shot channel is created for the reply and its sender travels
    /// inside the `Instruction::Request`. The raw JSON that comes back is
    /// deserialized into `R`.
    ///
    /// # Errors
    ///
    /// * `WsClientError::UnexpectedClose` if the instruction cannot be sent or
    ///   the reply sender is dropped before answering.
    /// * Whatever error the reply itself carries.
    /// * A deserialization error if the payload does not decode as `R`.
    #[tracing::instrument(skip(self, params), err)]
    async fn make_request<R>(&self, method: &str, params: Box<RawValue>) -> Result<R, WsClientError>
    where
        R: DeserializeOwned,
    {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.instructions
            .unbounded_send(Instruction::Request {
                method: method.to_owned(),
                params,
                sender: reply_tx,
            })
            .map_err(|_| WsClientError::UnexpectedClose)?;

        // Outer layer: did the reply channel survive? Inner layer: did the request succeed?
        let outcome = reply_rx.await.map_err(|_| WsClientError::UnexpectedClose)?;
        let raw = outcome?;
        tracing::trace!(len = raw.get().len(), "Received response from request manager");

        let parsed = serde_json::from_str(raw.get())?;
        tracing::trace!("Deserialization success");
        Ok(parsed)
    }
}
/// Opaque `Debug` output for [`WsClient`].
///
/// Only the type name is printed, followed by `..`; the channel handle and
/// the shared map are intentionally left out of the output.
impl fmt::Debug for WsClient {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Label the output with the actual type name so log lines can be
        // traced back to `WsClient`.
        f.debug_struct("WsClient").finish_non_exhaustive()
    }
}
/// JSON-RPC request/response support for [`WsClient`].
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl JsonRpcProvider for WsClient {
    type Error = WsClientError;

    /// Serializes `params`, sends the request named `method`, and decodes the
    /// response into `R`.
    ///
    /// # Errors
    ///
    /// Returns a `WsClientError` if `params` cannot be serialized to raw JSON,
    /// or if `make_request` fails (channel closed, error reply, or a response
    /// that does not deserialize as `R`).
    async fn fetch<T, R>(&self, method: &str, params: T) -> Result<R, WsClientError>
    where
        T: Debug + Serialize + Send + Sync,
        R: DeserializeOwned,
    {
        // Serialize by reference; the raw JSON value is what travels to the
        // request manager.
        let params = to_raw_value(&params)?;
        self.make_request(method, params).await
    }
}
/// Subscription support for [`WsClient`].
impl PubsubClient for WsClient {
    type NotificationStream = mpsc::UnboundedReceiver<Box<RawValue>>;

    /// Takes the notification stream registered under `id` out of the shared
    /// channel map.
    ///
    /// The entry is removed, so a second call with the same id fails unless
    /// the entry has been re-inserted elsewhere. A poisoned lock is recovered
    /// rather than treated as an error.
    ///
    /// # Errors
    ///
    /// Returns `WsClientError::UnknownSubscription` when no stream is stored
    /// for `id`.
    fn subscribe<T: Into<U256>>(&self, id: T) -> Result<Self::NotificationStream, WsClientError> {
        let id = id.into();
        let removed = {
            let mut channels = self
                .channel_map
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            channels.remove(&id)
        };
        match removed {
            Some(stream) => Ok(stream),
            None => Err(WsClientError::UnknownSubscription(id)),
        }
    }

    /// Queues an `Instruction::Unsubscribe` for `id` on the instruction channel.
    ///
    /// # Errors
    ///
    /// Returns `WsClientError::UnexpectedClose` when the instruction cannot be
    /// sent.
    fn unsubscribe<T: Into<U256>>(&self, id: T) -> Result<(), WsClientError> {
        let instruction = Instruction::Unsubscribe { id: id.into() };
        self.instructions
            .unbounded_send(instruction)
            .map_err(|_| WsClientError::UnexpectedClose)
    }
}
/// Convenience constructors that build an `RpcClient` on top of a [`WsClient`].
impl RpcClient<WsClient> {
    /// Connects a [`WsClient`] to `url` and wraps it in an `RpcClient`.
    ///
    /// # Errors
    ///
    /// Returns the connection failure converted into a `ProviderError`.
    pub async fn connect(url: impl Into<ConnectionDetails>) -> Result<Self, ProviderError> {
        Ok(Self::new(WsClient::connect(url).await?))
    }

    /// Like [`RpcClient::connect`], forwarding `reconnects` to
    /// [`WsClient::connect_with_reconnects`].
    ///
    /// # Errors
    ///
    /// Returns the connection failure converted into a `ProviderError`.
    pub async fn connect_with_reconnects(
        url: impl Into<ConnectionDetails>,
        reconnects: usize,
    ) -> Result<Self, ProviderError> {
        Ok(Self::new(WsClient::connect_with_reconnects(url, reconnects).await?))
    }

    /// Connects to `url` with connection details built from
    /// `ConnectionDetails::new(url, Some(auth))`.
    ///
    /// Not available on `wasm32` targets.
    ///
    /// # Errors
    ///
    /// Returns the connection failure converted into a `ProviderError`.
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn connect_with_auth(
        url: impl AsRef<str>,
        auth: Authorization,
    ) -> Result<Self, ProviderError> {
        let details = ConnectionDetails::new(url, Some(auth));
        Ok(Self::new(WsClient::connect(details).await?))
    }

    /// Like [`RpcClient::connect_with_auth`], forwarding `reconnects` to
    /// [`WsClient::connect_with_reconnects`].
    ///
    /// Not available on `wasm32` targets.
    ///
    /// # Errors
    ///
    /// Returns the connection failure converted into a `ProviderError`.
    #[cfg(not(target_arch = "wasm32"))]
    pub async fn connect_with_auth_and_reconnects(
        url: impl AsRef<str>,
        auth: Authorization,
        reconnects: usize,
    ) -> Result<Self, ProviderError> {
        let details = ConnectionDetails::new(url, Some(auth));
        Ok(Self::new(WsClient::connect_with_reconnects(details, reconnects).await?))
    }
}