cfg_async_client! {
pub mod async_client;
pub use async_client::{Client, ClientBuilder};
}
pub mod error;
pub use error::Error;
use std::fmt;
use std::future::Future;
use std::ops::{Deref, Range};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::task::{self, Poll};
use crate::params::BatchRequestBuilder;
use crate::traits::{ToJson, ToRpcParams};
use core::marker::PhantomData;
use futures_util::stream::{Stream, StreamExt};
use http::Extensions;
use jsonrpsee_types::{ErrorObject, Id, InvalidRequestId, SubscriptionId};
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::value::RawValue;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};
#[derive(Debug, Clone)]
pub(crate) struct SubscriptionLagged(Arc<RwLock<bool>>);
pub type RawResponseOwned = RawResponse<'static>;
impl SubscriptionLagged {
pub(crate) fn new() -> Self {
Self(Arc::new(RwLock::new(false)))
}
pub(crate) fn set_lagged(&self) {
*self.0.write().expect("RwLock not poised; qed") = true;
}
pub(crate) fn has_lagged(&self) -> bool {
*self.0.read().expect("RwLock not poised; qed")
}
}
#[doc(hidden)]
pub mod __reexports {
pub use crate::traits::ToRpcParams;
pub use crate::params::ArrayParams;
}
pub trait ClientT {
fn notification<Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<(), Error>> + Send
where
Params: ToRpcParams + Send;
fn request<R, Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<R, Error>> + Send
where
R: DeserializeOwned,
Params: ToRpcParams + Send;
fn batch_request<'a, R>(
&self,
batch: BatchRequestBuilder<'a>,
) -> impl Future<Output = Result<BatchResponse<'a, R>, Error>> + Send
where
R: DeserializeOwned + fmt::Debug + 'a;
}
pub trait SubscriptionClientT: ClientT {
fn subscribe<'a, Notif, Params>(
&self,
subscribe_method: &'a str,
params: Params,
unsubscribe_method: &'a str,
) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
where
Params: ToRpcParams + Send,
Notif: DeserializeOwned;
fn subscribe_to_method<Notif>(
&self,
method: &str,
) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
where
Notif: DeserializeOwned;
}
#[cfg(target_arch = "wasm32")]
pub trait MaybeSend {}
#[cfg(not(target_arch = "wasm32"))]
pub trait MaybeSend: Send {}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Send> MaybeSend for T {}
#[cfg(target_arch = "wasm32")]
impl<T> MaybeSend for T {}
pub trait TransportSenderT: 'static {
type Error: std::error::Error + Send + Sync;
fn send(&mut self, msg: String) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend;
fn send_ping(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
async { Ok(()) }
}
fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
async { Ok(()) }
}
}
#[derive(Debug, Clone)]
pub enum ReceivedMessage {
Text(String),
Bytes(Vec<u8>),
Pong,
}
pub trait TransportReceiverT: 'static {
type Error: std::error::Error + Send + Sync;
fn receive(&mut self) -> impl Future<Output = Result<ReceivedMessage, Self::Error>> + MaybeSend;
}
#[macro_export]
macro_rules! rpc_params {
($($param:expr),*) => {
{
let mut params = $crate::client::__reexports::ArrayParams::new();
$(
if let Err(err) = params.insert($param) {
panic!("Parameter `{}` cannot be serialized: {:?}", stringify!($param), err);
}
)*
params
}
};
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum SubscriptionKind {
Subscription(SubscriptionId<'static>),
Method(String),
}
#[derive(Debug, Copy, Clone)]
pub enum SubscriptionCloseReason {
ConnectionClosed,
Lagged,
}
#[derive(Debug)]
pub struct Subscription<Notif> {
is_closed: bool,
to_back: mpsc::Sender<FrontToBack>,
rx: SubscriptionReceiver,
kind: Option<SubscriptionKind>,
marker: PhantomData<Notif>,
}
impl<Notif> std::marker::Unpin for Subscription<Notif> {}
impl<Notif> Subscription<Notif> {
fn new(to_back: mpsc::Sender<FrontToBack>, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self {
Self { to_back, rx, kind: Some(kind), marker: PhantomData, is_closed: false }
}
pub fn kind(&self) -> &SubscriptionKind {
self.kind.as_ref().expect("only None after unsubscribe; qed")
}
pub async fn unsubscribe(mut self) -> Result<(), Error> {
let msg = match self.kind.take().expect("only None after unsubscribe; qed") {
SubscriptionKind::Method(notif) => FrontToBack::UnregisterNotification(notif),
SubscriptionKind::Subscription(sub_id) => FrontToBack::SubscriptionClosed(sub_id),
};
let _ = self.to_back.send(msg).await;
while self.rx.next().await.is_some() {}
Ok(())
}
pub fn close_reason(&self) -> Option<SubscriptionCloseReason> {
let lagged = self.rx.lagged.has_lagged();
if !self.is_closed && !lagged {
return None;
}
if lagged { Some(SubscriptionCloseReason::Lagged) } else { Some(SubscriptionCloseReason::ConnectionClosed) }
}
}
#[derive(Debug)]
struct BatchMessage {
raw: String,
ids: Range<u64>,
send_back: oneshot::Sender<Result<Vec<RawResponseOwned>, InvalidRequestId>>,
}
#[derive(Debug)]
struct RequestMessage {
raw: String,
id: Id<'static>,
send_back: Option<oneshot::Sender<Result<RawResponseOwned, InvalidRequestId>>>,
}
#[derive(Debug)]
struct SubscriptionMessage {
raw: String,
subscribe_id: Id<'static>,
unsubscribe_id: Id<'static>,
unsubscribe_method: String,
send_back: oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>,
}
#[derive(Debug)]
struct RegisterNotificationMessage {
method: String,
send_back: oneshot::Sender<Result<(SubscriptionReceiver, String), Error>>,
}
#[derive(Debug)]
enum FrontToBack {
Batch(BatchMessage),
Notification(String),
Request(RequestMessage),
Subscribe(SubscriptionMessage),
RegisterNotification(RegisterNotificationMessage),
UnregisterNotification(String),
SubscriptionClosed(SubscriptionId<'static>),
}
impl<Notif> Subscription<Notif>
where
Notif: DeserializeOwned,
{
#[allow(clippy::should_implement_trait)]
pub async fn next(&mut self) -> Option<Result<Notif, serde_json::Error>> {
StreamExt::next(self).await
}
}
impl<Notif> Stream for Subscription<Notif>
where
Notif: DeserializeOwned,
{
type Item = Result<Notif, serde_json::Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
let res = match futures_util::ready!(self.rx.poll_next_unpin(cx)) {
Some(v) => Some(serde_json::from_str::<Notif>(v.get())),
None => {
self.is_closed = true;
None
}
};
Poll::Ready(res)
}
}
impl<Notif> Drop for Subscription<Notif> {
fn drop(&mut self) {
let msg = match self.kind.take() {
Some(SubscriptionKind::Method(notif)) => FrontToBack::UnregisterNotification(notif),
Some(SubscriptionKind::Subscription(sub_id)) => FrontToBack::SubscriptionClosed(sub_id),
None => return,
};
let _ = self.to_back.try_send(msg);
}
}
#[derive(Debug)]
pub struct RequestIdManager {
current_id: CurrentId,
id_kind: IdKind,
}
impl RequestIdManager {
pub fn new(id_kind: IdKind) -> Self {
Self { current_id: CurrentId::new(), id_kind }
}
pub fn next_request_id(&self) -> Id<'static> {
self.id_kind.into_id(self.current_id.next())
}
pub fn as_id_kind(&self) -> IdKind {
self.id_kind
}
}
#[derive(Debug, Copy, Clone)]
pub enum IdKind {
String,
Number,
}
impl IdKind {
pub fn into_id(self, id: u64) -> Id<'static> {
match self {
IdKind::Number => Id::Number(id),
IdKind::String => Id::Str(format!("{id}").into()),
}
}
}
#[derive(Debug)]
struct CurrentId(AtomicUsize);
impl CurrentId {
fn new() -> Self {
CurrentId(AtomicUsize::new(0))
}
fn next(&self) -> u64 {
self.0
.fetch_add(1, Ordering::Relaxed)
.try_into()
.expect("usize -> u64 infallible, there are no CPUs > 64 bits; qed")
}
}
pub fn generate_batch_id_range(id: Id, len: u64) -> Result<Range<u64>, Error> {
let id_start = id.try_parse_inner_as_number()?;
let id_end = id_start
.checked_add(len)
.ok_or_else(|| Error::Custom("BatchID range wrapped; restart the client or try again later".to_string()))?;
Ok(id_start..id_end)
}
pub type BatchEntry<'a, R> = Result<R, ErrorObject<'a>>;
#[derive(Debug, Clone)]
pub struct BatchResponse<'a, R> {
successful_calls: usize,
failed_calls: usize,
responses: Vec<BatchEntry<'a, R>>,
}
impl<'a, R: fmt::Debug + 'a> BatchResponse<'a, R> {
pub fn new(successful_calls: usize, responses: Vec<BatchEntry<'a, R>>, failed_calls: usize) -> Self {
Self { successful_calls, responses, failed_calls }
}
pub fn len(&self) -> usize {
self.responses.len()
}
pub fn is_empty(&self) -> bool {
self.responses.len() == 0
}
pub fn num_successful_calls(&self) -> usize {
self.successful_calls
}
pub fn num_failed_calls(&self) -> usize {
self.failed_calls
}
pub fn into_ok(
self,
) -> Result<impl Iterator<Item = R> + 'a + std::fmt::Debug, impl Iterator<Item = ErrorObject<'a>> + std::fmt::Debug>
{
if self.failed_calls > 0 {
Err(self.into_iter().filter_map(|err| err.err()))
} else {
Ok(self.into_iter().filter_map(|r| r.ok()))
}
}
pub fn ok(
&self,
) -> Result<impl Iterator<Item = &R> + std::fmt::Debug, impl Iterator<Item = &ErrorObject<'a>> + std::fmt::Debug> {
if self.failed_calls > 0 {
Err(self.responses.iter().filter_map(|err| err.as_ref().err()))
} else {
Ok(self.responses.iter().filter_map(|r| r.as_ref().ok()))
}
}
pub fn iter(&self) -> impl Iterator<Item = &BatchEntry<'_, R>> {
self.responses.iter()
}
}
impl<'a, R> IntoIterator for BatchResponse<'a, R> {
type Item = BatchEntry<'a, R>;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.responses.into_iter()
}
}
#[derive(thiserror::Error, Debug)]
enum TrySubscriptionSendError {
#[error("The subscription is closed")]
Closed,
#[error("A subscription message was dropped")]
TooSlow(Box<RawValue>),
}
#[derive(Debug)]
pub(crate) struct SubscriptionSender {
inner: mpsc::Sender<Box<RawValue>>,
lagged: SubscriptionLagged,
}
impl SubscriptionSender {
fn send(&self, msg: Box<RawValue>) -> Result<(), TrySubscriptionSendError> {
match self.inner.try_send(msg) {
Ok(_) => Ok(()),
Err(TrySendError::Closed(_)) => Err(TrySubscriptionSendError::Closed),
Err(TrySendError::Full(m)) => {
self.lagged.set_lagged();
Err(TrySubscriptionSendError::TooSlow(m))
}
}
}
}
#[derive(Debug)]
pub(crate) struct SubscriptionReceiver {
inner: mpsc::Receiver<Box<RawValue>>,
lagged: SubscriptionLagged,
}
impl Stream for SubscriptionReceiver {
type Item = Box<RawValue>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
self.inner.poll_recv(cx)
}
}
fn subscription_channel(max_buf_size: usize) -> (SubscriptionSender, SubscriptionReceiver) {
let (tx, rx) = mpsc::channel(max_buf_size);
let lagged_tx = SubscriptionLagged::new();
let lagged_rx = lagged_tx.clone();
(SubscriptionSender { inner: tx, lagged: lagged_tx }, SubscriptionReceiver { inner: rx, lagged: lagged_rx })
}
#[derive(Debug)]
pub struct SubscriptionResponse {
sub_id: SubscriptionId<'static>,
stream: SubscriptionReceiver,
}
impl SubscriptionResponse {
pub fn subscription_id(&self) -> &SubscriptionId<'static> {
&self.sub_id
}
}
#[derive(Debug)]
pub struct RawResponse<'a>(jsonrpsee_types::Response<'a, Box<RawValue>>);
impl Serialize for RawResponse<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.0.serialize(serializer)
}
}
impl<'a> From<jsonrpsee_types::Response<'a, Box<RawValue>>> for RawResponse<'a> {
fn from(r: jsonrpsee_types::Response<'a, Box<RawValue>>) -> Self {
Self(r)
}
}
impl<'a> RawResponse<'a> {
pub fn is_success(&self) -> bool {
match self.0.payload {
jsonrpsee_types::ResponsePayload::Success(_) => true,
jsonrpsee_types::ResponsePayload::Error(_) => false,
}
}
pub fn as_error(&self) -> Option<&ErrorObject<'_>> {
match self.0.payload {
jsonrpsee_types::ResponsePayload::Error(ref err) => Some(err),
_ => None,
}
}
pub fn as_success(&self) -> Option<&RawValue> {
match self.0.payload {
jsonrpsee_types::ResponsePayload::Success(ref res) => Some(res),
_ => None,
}
}
pub fn id(&self) -> &Id<'a> {
&self.0.id
}
pub fn into_inner(self) -> jsonrpsee_types::Response<'a, Box<RawValue>> {
self.0
}
pub fn into_owned(self) -> RawResponseOwned {
RawResponse(self.0.into_owned())
}
}
impl ToJson for RawResponse<'_> {
fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
serde_json::value::to_raw_value(&self.0)
}
}
pub type MiddlewareBatchResponse = Vec<RawResponseOwned>;
#[derive(Debug)]
pub struct MiddlewareMethodResponse {
rp: RawResponseOwned,
subscription: Option<SubscriptionResponse>,
}
impl Serialize for MiddlewareMethodResponse {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.rp.serialize(serializer)
}
}
impl Deref for MiddlewareMethodResponse {
type Target = RawResponseOwned;
fn deref(&self) -> &Self::Target {
&self.rp
}
}
impl MiddlewareMethodResponse {
pub fn response(rp: RawResponseOwned) -> Self {
Self { rp, subscription: None }
}
pub fn subscription_response(rp: RawResponseOwned, subscription: SubscriptionResponse) -> Self {
Self { rp, subscription: Some(subscription) }
}
pub fn into_parts(self) -> (RawResponseOwned, Option<SubscriptionResponse>) {
(self.rp, self.subscription)
}
pub fn into_response(self) -> RawResponseOwned {
self.rp
}
pub fn into_subscription(self) -> Option<SubscriptionResponse> {
self.subscription
}
}
#[derive(Debug, Clone)]
pub struct MiddlewareNotifResponse(Extensions);
impl From<Extensions> for MiddlewareNotifResponse {
fn from(extensions: Extensions) -> Self {
Self(extensions)
}
}
impl MiddlewareNotifResponse {
pub fn extensions(&self) -> &Extensions {
&self.0
}
pub fn extensions_mut(&mut self) -> &mut Extensions {
&mut self.0
}
}
impl<T: Serialize> ToJson for Result<T, Error> {
fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
match self {
Ok(v) => serde_json::value::to_raw_value(v),
Err(e) => serde_json::value::to_raw_value(&e.to_string()),
}
}
}