use crate::rpc::{Error, Request, Result, RpcParams, Subscribe};
use jsonrpsee::{
client_transport::ws::{Url, WsTransportClientBuilder},
core::{
client::{Client, ClientBuilder, ClientT, Error as JsonrpseeError, SubscriptionClientT},
traits::ToRpcParams,
},
};
use serde::de::DeserializeOwned;
use serde_json::value::RawValue;
use std::sync::Arc;
pub use subscription::SubscriptionWrapper;
mod subscription;
/// RPC client handle wrapping a jsonrpsee [`Client`].
///
/// This type implements the crate's `Request` and `Subscribe` traits by
/// delegating to the wrapped client.
#[derive(Clone)]
pub struct JsonrpseeClient {
// Stored behind an `Arc`, so the derived `Clone` copies the pointer and every
// clone refers to the same underlying client.
inner: Arc<Client>,
}
impl JsonrpseeClient {
pub async fn with_default_url() -> Result<Self> {
Self::new("ws://127.0.0.1:9944").await
}
pub async fn new(url: &str) -> Result<Self> {
let parsed_url: Url = url.parse().map_err(|e| Error::Client(Box::new(e)))?;
let (tx, rx) = WsTransportClientBuilder::default()
.build(parsed_url)
.await
.map_err(|e| Error::Client(Box::new(e)))?;
let client = ClientBuilder::default()
.max_buffer_capacity_per_subscription(4096)
.build_with_tokio(tx, rx);
Ok(Self { inner: Arc::new(client) })
}
pub async fn new_with_port(address: &str, port: u32) -> Result<Self> {
let url = format!("{address}:{port:?}");
Self::new(&url).await
}
pub fn new_with_client(client: Client) -> Self {
let inner = Arc::new(client);
Self { inner }
}
}
impl JsonrpseeClient {
    /// Returns the result of the wrapped client's `is_connected` check.
    pub fn is_connected(&self) -> bool {
        let Self { inner } = self;
        inner.is_connected()
    }

    /// Awaits the wrapped client's `on_disconnect` future and returns its output.
    ///
    /// NOTE(review): this awaits the same inner future as [`Self::on_disconnect`];
    /// confirm against the jsonrpsee version in use that it yields the declared
    /// `JsonrpseeError`.
    #[deprecated = "Use on_disconnect instead."]
    pub async fn disconnect_reason(&self) -> JsonrpseeError {
        let Self { inner } = self;
        inner.on_disconnect().await
    }

    /// Awaits the wrapped client's `on_disconnect` future and discards its output.
    pub async fn on_disconnect(&self) {
        let Self { inner } = self;
        inner.on_disconnect().await;
    }
}
#[maybe_async::async_impl(?Send)]
impl Request for JsonrpseeClient {
async fn request<R: DeserializeOwned>(&self, method: &str, params: RpcParams) -> Result<R> {
self.inner
.request(method, RpcParamsWrapper(params))
.await
.map_err(|e| Error::Client(Box::new(e)))
}
}
#[maybe_async::async_impl(?Send)]
impl Subscribe for JsonrpseeClient {
type Subscription<Notification>
= SubscriptionWrapper<Notification>
where
Notification: DeserializeOwned;
async fn subscribe<Notification: DeserializeOwned>(
&self,
sub: &str,
params: RpcParams,
unsub: &str,
) -> Result<Self::Subscription<Notification>> {
self.inner
.subscribe(sub, RpcParamsWrapper(params), unsub)
.await
.map(|sub| sub.into())
.map_err(|e| Error::Client(Box::new(e)))
}
}
/// Private newtype around [`RpcParams`] that carries the [`ToRpcParams`]
/// implementation used when passing parameters to the jsonrpsee client.
struct RpcParamsWrapper(RpcParams);
impl ToRpcParams for RpcParamsWrapper {
    /// Converts the wrapped parameters into an optional raw JSON value.
    ///
    /// Returns `Ok(None)` when `build` produces nothing. Otherwise the built
    /// string is turned into a [`RawValue`], and any failure from
    /// `RawValue::from_string` is returned as the error.
    fn to_rpc_params(self) -> core::result::Result<Option<Box<RawValue>>, serde_json::Error> {
        // `Option<Result<_, _>>` -> `Result<Option<_>, _>`: `None` becomes `Ok(None)`.
        self.0.build().map(RawValue::from_string).transpose()
    }
}