use super::{RawRpcSubscription, RpcClientT};
use crate::Error;
use futures::{Stream, StreamExt};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::value::RawValue;
use std::{pin::Pin, sync::Arc, task::Poll};
#[derive(Clone)]
pub struct RpcClient {
client: Arc<dyn RpcClientT>,
}
impl RpcClient {
#[cfg(feature = "jsonrpsee")]
#[cfg_attr(docsrs, doc(cfg(feature = "jsonrpsee")))]
pub async fn from_url<U: AsRef<str>>(url: U) -> Result<Self, Error> {
crate::utils::validate_url_is_secure(url.as_ref())?;
RpcClient::from_insecure_url(url).await
}
#[cfg(feature = "jsonrpsee")]
pub async fn from_insecure_url<U: AsRef<str>>(url: U) -> Result<Self, Error> {
let client = super::jsonrpsee_client(url.as_ref())
.await
.map_err(|e| Error::Client(Box::new(e)))?;
Ok(Self::new(client))
}
pub fn new<R: RpcClientT>(client: R) -> Self {
RpcClient {
client: Arc::new(client),
}
}
pub async fn request<Res: DeserializeOwned>(
&self,
method: &str,
params: RpcParams,
) -> Result<Res, Error> {
let res = self.client.request_raw(method, params.build()).await?;
let val = serde_json::from_str(res.get()).map_err(Error::Deserialization)?;
Ok(val)
}
pub async fn subscribe<Res: DeserializeOwned>(
&self,
sub: &str,
params: RpcParams,
unsub: &str,
) -> Result<RpcSubscription<Res>, Error> {
let sub = self
.client
.subscribe_raw(sub, params.build(), unsub)
.await?;
Ok(RpcSubscription::new(sub))
}
}
impl<C: RpcClientT> From<C> for RpcClient {
fn from(client: C) -> Self {
RpcClient::new(client)
}
}
impl std::fmt::Debug for RpcClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("RpcClient").finish()
}
}
impl std::ops::Deref for RpcClient {
type Target = dyn RpcClientT;
fn deref(&self) -> &Self::Target {
&*self.client
}
}
#[macro_export]
macro_rules! rpc_params {
($($p:expr), *) => {{
#[allow(unused_mut)]
let mut params = $crate::client::RpcParams::new();
$(
params.push($p).expect("values passed to rpc_params! must be serializable to JSON");
)*
params
}}
}
pub use rpc_params;
#[derive(Debug, Clone, Default)]
pub struct RpcParams(Vec<u8>);
impl RpcParams {
pub fn new() -> Self {
Self(Vec::new())
}
pub fn push<P: Serialize>(&mut self, param: P) -> Result<(), Error> {
if self.0.is_empty() {
self.0.push(b'[');
} else {
self.0.push(b',')
}
serde_json::to_writer(&mut self.0, ¶m).map_err(Error::Serialization)?;
Ok(())
}
pub fn build(mut self) -> Option<Box<RawValue>> {
if self.0.is_empty() {
None
} else {
self.0.push(b']');
let s = unsafe { String::from_utf8_unchecked(self.0) };
Some(RawValue::from_string(s).expect("Should be valid JSON"))
}
}
}
pub struct RpcSubscription<Res> {
inner: RawRpcSubscription,
_marker: std::marker::PhantomData<Res>,
}
impl<Res> std::fmt::Debug for RpcSubscription<Res> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RpcSubscription")
.field("inner", &"RawRpcSubscription")
.field("_marker", &self._marker)
.finish()
}
}
impl<Res> RpcSubscription<Res> {
pub fn new(inner: RawRpcSubscription) -> Self {
Self {
inner,
_marker: std::marker::PhantomData,
}
}
pub fn subscription_id(&self) -> Option<&str> {
self.inner.id.as_deref()
}
}
impl<Res: DeserializeOwned> RpcSubscription<Res> {
pub async fn next(&mut self) -> Option<Result<Res, Error>> {
StreamExt::next(self).await
}
}
impl<Res> std::marker::Unpin for RpcSubscription<Res> {}
impl<Res: DeserializeOwned> Stream for RpcSubscription<Res> {
type Item = Result<Res, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let res = futures::ready!(self.inner.stream.poll_next_unpin(cx));
let res = res.map(|r| {
r.and_then(|raw_val| {
serde_json::from_str(raw_val.get()).map_err(Error::Deserialization)
})
});
Poll::Ready(res)
}
}