use std::fmt;
use std::ops::Range;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task;
use crate::error::Error;
use crate::params::BatchRequestBuilder;
use crate::traits::ToRpcParams;
use async_trait::async_trait;
use core::marker::PhantomData;
use futures_channel::{mpsc, oneshot};
use futures_util::future::FutureExt;
use futures_util::sink::SinkExt;
use futures_util::stream::{Stream, StreamExt};
use jsonrpsee_types::{ErrorObject, Id, SubscriptionId};
use serde::de::DeserializeOwned;
use serde_json::Value as JsonValue;
#[doc(hidden)]
pub mod __reexports {
pub use crate::traits::ToRpcParams;
pub use crate::params::ArrayParams;
}
cfg_async_client! {
pub mod async_client;
pub use async_client::{Client, ClientBuilder};
}
#[async_trait]
pub trait ClientT {
async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
where
Params: ToRpcParams + Send;
async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
where
R: DeserializeOwned,
Params: ToRpcParams + Send;
async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<BatchResponse<'a, R>, Error>
where
R: DeserializeOwned + fmt::Debug + 'a;
}
#[async_trait]
pub trait SubscriptionClientT: ClientT {
async fn subscribe<'a, Notif, Params>(
&self,
subscribe_method: &'a str,
params: Params,
unsubscribe_method: &'a str,
) -> Result<Subscription<Notif>, Error>
where
Params: ToRpcParams + Send,
Notif: DeserializeOwned;
async fn subscribe_to_method<'a, Notif>(&self, method: &'a str) -> Result<Subscription<Notif>, Error>
where
Notif: DeserializeOwned;
}
#[cfg(target_arch = "wasm32")]
pub trait MaybeSend {}
#[cfg(not(target_arch = "wasm32"))]
pub trait MaybeSend: Send {}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Send> MaybeSend for T {}
#[cfg(target_arch = "wasm32")]
impl<T> MaybeSend for T {}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait TransportSenderT: MaybeSend + 'static {
type Error: std::error::Error + Send + Sync;
async fn send(&mut self, msg: String) -> Result<(), Self::Error>;
async fn send_ping(&mut self) -> Result<(), Self::Error> {
Ok(())
}
async fn close(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}
#[derive(Debug, Clone)]
pub enum ReceivedMessage {
Text(String),
Bytes(Vec<u8>),
Pong,
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait TransportReceiverT: 'static {
type Error: std::error::Error + Send + Sync;
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error>;
}
#[macro_export]
macro_rules! rpc_params {
($($param:expr),*) => {
{
let mut params = $crate::client::__reexports::ArrayParams::new();
$(
if let Err(err) = params.insert($param) {
panic!("Parameter `{}` cannot be serialized: {:?}", stringify!($param), err);
}
)*
params
}
};
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum SubscriptionKind {
Subscription(SubscriptionId<'static>),
Method(String),
}
#[derive(Debug)]
pub struct Subscription<Notif> {
to_back: mpsc::Sender<FrontToBack>,
notifs_rx: mpsc::Receiver<JsonValue>,
kind: Option<SubscriptionKind>,
marker: PhantomData<Notif>,
}
impl<Notif> std::marker::Unpin for Subscription<Notif> {}
impl<Notif> Subscription<Notif> {
pub fn new(
to_back: mpsc::Sender<FrontToBack>,
notifs_rx: mpsc::Receiver<JsonValue>,
kind: SubscriptionKind,
) -> Self {
Self { to_back, notifs_rx, kind: Some(kind), marker: PhantomData }
}
pub fn kind(&self) -> &SubscriptionKind {
self.kind.as_ref().expect("only None after unsubscribe; qed")
}
pub async fn unsubscribe(mut self) -> Result<(), Error> {
let msg = match self.kind.take().expect("only None after unsubscribe; qed") {
SubscriptionKind::Method(notif) => FrontToBack::UnregisterNotification(notif),
SubscriptionKind::Subscription(sub_id) => FrontToBack::SubscriptionClosed(sub_id),
};
self.to_back.send(msg).await?;
while self.notifs_rx.next().await.is_some() {}
Ok(())
}
}
#[derive(Debug)]
pub struct BatchMessage {
pub raw: String,
pub ids: Range<u64>,
pub send_back: oneshot::Sender<Result<Vec<BatchEntry<'static, JsonValue>>, Error>>,
}
#[derive(Debug)]
pub struct RequestMessage {
pub raw: String,
pub id: Id<'static>,
pub send_back: Option<oneshot::Sender<Result<JsonValue, Error>>>,
}
#[derive(Debug)]
pub struct SubscriptionMessage {
pub raw: String,
pub subscribe_id: Id<'static>,
pub unsubscribe_id: Id<'static>,
pub unsubscribe_method: String,
pub send_back: oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, SubscriptionId<'static>), Error>>,
}
#[derive(Debug)]
pub struct RegisterNotificationMessage {
pub method: String,
pub send_back: oneshot::Sender<Result<(mpsc::Receiver<JsonValue>, String), Error>>,
}
#[derive(Debug)]
pub enum FrontToBack {
Batch(BatchMessage),
Notification(String),
Request(RequestMessage),
Subscribe(SubscriptionMessage),
RegisterNotification(RegisterNotificationMessage),
UnregisterNotification(String),
SubscriptionClosed(SubscriptionId<'static>),
}
impl<Notif> Subscription<Notif>
where
Notif: DeserializeOwned,
{
#[allow(clippy::should_implement_trait)]
pub async fn next(&mut self) -> Option<Result<Notif, Error>> {
StreamExt::next(self).await
}
}
impl<Notif> Stream for Subscription<Notif>
where
Notif: DeserializeOwned,
{
type Item = Result<Notif, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
let n = futures_util::ready!(self.notifs_rx.poll_next_unpin(cx));
let res = n.map(|n| match serde_json::from_value::<Notif>(n) {
Ok(parsed) => Ok(parsed),
Err(e) => Err(Error::ParseError(e)),
});
task::Poll::Ready(res)
}
}
impl<Notif> Drop for Subscription<Notif> {
fn drop(&mut self) {
let msg = match self.kind.take() {
Some(SubscriptionKind::Method(notif)) => FrontToBack::UnregisterNotification(notif),
Some(SubscriptionKind::Subscription(sub_id)) => FrontToBack::SubscriptionClosed(sub_id),
None => return,
};
let _ = self.to_back.send(msg).now_or_never();
}
}
#[derive(Debug)]
pub struct RequestIdManager {
current_pending: Arc<()>,
max_concurrent_requests: usize,
current_id: AtomicU64,
id_kind: IdKind,
}
impl RequestIdManager {
pub fn new(limit: usize, id_kind: IdKind) -> Self {
Self { current_pending: Arc::new(()), max_concurrent_requests: limit, current_id: AtomicU64::new(0), id_kind }
}
fn get_slot(&self) -> Result<Arc<()>, Error> {
if Arc::strong_count(&self.current_pending) > self.max_concurrent_requests {
Err(Error::MaxSlotsExceeded)
} else {
Ok(self.current_pending.clone())
}
}
pub fn next_request_id(&self) -> Result<RequestIdGuard<Id<'static>>, Error> {
let rc = self.get_slot()?;
let id = self.id_kind.into_id(self.current_id.fetch_add(1, Ordering::SeqCst));
Ok(RequestIdGuard { _rc: rc, id })
}
pub fn next_request_two_ids(&self) -> Result<RequestIdGuard<(Id<'static>, Id<'static>)>, Error> {
let rc = self.get_slot()?;
let id1 = self.id_kind.into_id(self.current_id.fetch_add(1, Ordering::SeqCst));
let id2 = self.id_kind.into_id(self.current_id.fetch_add(1, Ordering::SeqCst));
Ok(RequestIdGuard { _rc: rc, id: (id1, id2) })
}
pub fn as_id_kind(&self) -> IdKind {
self.id_kind
}
}
#[derive(Debug)]
pub struct RequestIdGuard<T: Clone> {
id: T,
_rc: Arc<()>,
}
impl<T: Clone> RequestIdGuard<T> {
pub fn inner(&self) -> T {
self.id.clone()
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum CertificateStore {
Native,
WebPki,
}
#[derive(Debug, Copy, Clone)]
pub enum IdKind {
String,
Number,
}
impl IdKind {
pub fn into_id(self, id: u64) -> Id<'static> {
match self {
IdKind::Number => Id::Number(id),
IdKind::String => Id::Str(format!("{}", id).into()),
}
}
}
pub fn generate_batch_id_range(guard: &RequestIdGuard<Id>, len: u64) -> Result<Range<u64>, Error> {
let id_start = guard.inner().try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;
let id_end = id_start
.checked_add(len)
.ok_or_else(|| Error::Custom("BatchID range wrapped; restart the client or try again later".to_string()))?;
Ok(id_start..id_end)
}
pub type BatchEntry<'a, R> = Result<R, ErrorObject<'a>>;
#[derive(Debug, Clone)]
pub struct BatchResponse<'a, R> {
successful_calls: usize,
failed_calls: usize,
responses: Vec<BatchEntry<'a, R>>,
}
impl<'a, R: fmt::Debug + 'a> BatchResponse<'a, R> {
pub fn new(successful_calls: usize, responses: Vec<BatchEntry<'a, R>>, failed_calls: usize) -> Self {
Self { successful_calls, responses, failed_calls }
}
pub fn len(&self) -> usize {
self.responses.len()
}
pub fn is_empty(&self) -> bool {
self.responses.len() == 0
}
pub fn num_successful_calls(&self) -> usize {
self.successful_calls
}
pub fn num_failed_calls(&self) -> usize {
self.failed_calls
}
pub fn into_ok(
self,
) -> Result<impl Iterator<Item = R> + 'a + std::fmt::Debug, impl Iterator<Item = ErrorObject<'a>> + std::fmt::Debug>
{
if self.failed_calls > 0 {
Err(self.into_iter().filter_map(|err| err.err()))
} else {
Ok(self.into_iter().filter_map(|r| r.ok()))
}
}
pub fn ok(
&self,
) -> Result<impl Iterator<Item = &R> + std::fmt::Debug, impl Iterator<Item = &ErrorObject<'a>> + std::fmt::Debug> {
if self.failed_calls > 0 {
Err(self.responses.iter().filter_map(|err| err.as_ref().err()))
} else {
Ok(self.responses.iter().filter_map(|r| r.as_ref().ok()))
}
}
pub fn iter(&self) -> impl Iterator<Item = &BatchEntry<'_, R>> {
self.responses.iter()
}
}
impl<'a, R> IntoIterator for BatchResponse<'a, R> {
type Item = BatchEntry<'a, R>;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.responses.into_iter()
}
}
#[cfg(test)]
mod tests {
use super::{IdKind, RequestIdManager};
#[test]
fn request_id_guard_works() {
let manager = RequestIdManager::new(2, IdKind::Number);
let _first = manager.next_request_id().unwrap();
{
let _second = manager.next_request_two_ids().unwrap();
assert!(manager.next_request_id().is_err());
}
assert!(manager.next_request_id().is_ok());
}
}