1cfg_async_client! {
30 pub mod async_client;
31 pub use async_client::{Client, ClientBuilder};
32}
33
34pub mod error;
35
36pub use error::Error;
37
38use std::fmt;
39use std::future::Future;
40use std::ops::{Deref, Range};
41use std::pin::Pin;
42use std::sync::atomic::{AtomicUsize, Ordering};
43use std::sync::{Arc, RwLock};
44use std::task::{self, Poll};
45
46use crate::params::BatchRequestBuilder;
47use crate::traits::{ToJson, ToRpcParams};
48
49use core::marker::PhantomData;
50use futures_util::stream::{Stream, StreamExt};
51use http::Extensions;
52use jsonrpsee_types::{ErrorObject, Id, InvalidRequestId, SubscriptionId};
53use serde::Serialize;
54use serde::de::DeserializeOwned;
55use serde_json::value::RawValue;
56use tokio::sync::mpsc::error::TrySendError;
57use tokio::sync::{mpsc, oneshot};
58
59#[derive(Debug, Clone)]
61pub(crate) struct SubscriptionLagged(Arc<RwLock<bool>>);
62
63pub type RawResponseOwned = RawResponse<'static>;
65
66impl SubscriptionLagged {
67 pub(crate) fn new() -> Self {
69 Self(Arc::new(RwLock::new(false)))
70 }
71
72 pub(crate) fn set_lagged(&self) {
74 *self.0.write().expect("RwLock not poised; qed") = true;
75 }
76
77 pub(crate) fn has_lagged(&self) -> bool {
79 *self.0.read().expect("RwLock not poised; qed")
80 }
81}
82
83#[doc(hidden)]
85pub mod __reexports {
86 pub use crate::traits::ToRpcParams;
88 pub use crate::params::ArrayParams;
90}
91
92pub trait ClientT {
94 fn notification<Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<(), Error>> + Send
96 where
97 Params: ToRpcParams + Send;
98
99 fn request<R, Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<R, Error>> + Send
101 where
102 R: DeserializeOwned,
103 Params: ToRpcParams + Send;
104
105 fn batch_request<'a, R>(
113 &self,
114 batch: BatchRequestBuilder<'a>,
115 ) -> impl Future<Output = Result<BatchResponse<'a, R>, Error>> + Send
116 where
117 R: DeserializeOwned + fmt::Debug + 'a;
118}
119
120pub trait SubscriptionClientT: ClientT {
122 fn subscribe<'a, Notif, Params>(
135 &self,
136 subscribe_method: &'a str,
137 params: Params,
138 unsubscribe_method: &'a str,
139 ) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
140 where
141 Params: ToRpcParams + Send,
142 Notif: DeserializeOwned;
143
144 fn subscribe_to_method<Notif>(
149 &self,
150 method: &str,
151 ) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
152 where
153 Notif: DeserializeOwned;
154}
155
156#[cfg(target_arch = "wasm32")]
158pub trait MaybeSend {}
159
160#[cfg(not(target_arch = "wasm32"))]
162pub trait MaybeSend: Send {}
163
164#[cfg(not(target_arch = "wasm32"))]
165impl<T: Send> MaybeSend for T {}
166
167#[cfg(target_arch = "wasm32")]
168impl<T> MaybeSend for T {}
169
170pub trait TransportSenderT: 'static {
172 type Error: std::error::Error + Send + Sync;
174
175 fn send(&mut self, msg: String) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend;
177
178 fn send_ping(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
183 async { Ok(()) }
184 }
185
186 fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
191 async { Ok(()) }
192 }
193}
194
195#[derive(Debug, Clone)]
198pub enum ReceivedMessage {
199 Text(String),
201 Bytes(Vec<u8>),
203 Pong,
205}
206
207pub trait TransportReceiverT: 'static {
209 type Error: std::error::Error + Send + Sync;
211
212 fn receive(&mut self) -> impl Future<Output = Result<ReceivedMessage, Self::Error>> + MaybeSend;
214}
215
216#[macro_export]
223macro_rules! rpc_params {
224 ($($param:expr),*) => {
225 {
226 let mut params = $crate::client::__reexports::ArrayParams::new();
227 $(
228 if let Err(err) = params.insert($param) {
229 panic!("Parameter `{}` cannot be serialized: {:?}", stringify!($param), err);
230 }
231 )*
232 params
233 }
234 };
235}
236
237#[derive(Debug, Clone)]
239#[non_exhaustive]
240pub enum SubscriptionKind {
241 Subscription(SubscriptionId<'static>),
243 Method(String),
245}
246
247#[derive(Debug, Copy, Clone)]
249pub enum SubscriptionCloseReason {
250 ConnectionClosed,
252 Lagged,
254}
255
256#[derive(Debug)]
275pub struct Subscription<Notif> {
276 is_closed: bool,
277 to_back: mpsc::Sender<FrontToBack>,
279 rx: SubscriptionReceiver,
281 kind: Option<SubscriptionKind>,
283 marker: PhantomData<Notif>,
285}
286
287impl<Notif> std::marker::Unpin for Subscription<Notif> {}
290
291impl<Notif> Subscription<Notif> {
292 fn new(to_back: mpsc::Sender<FrontToBack>, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self {
294 Self { to_back, rx, kind: Some(kind), marker: PhantomData, is_closed: false }
295 }
296
297 pub fn kind(&self) -> &SubscriptionKind {
299 self.kind.as_ref().expect("only None after unsubscribe; qed")
300 }
301
302 pub async fn unsubscribe(mut self) -> Result<(), Error> {
304 let msg = match self.kind.take().expect("only None after unsubscribe; qed") {
305 SubscriptionKind::Method(notif) => FrontToBack::UnregisterNotification(notif),
306 SubscriptionKind::Subscription(sub_id) => FrontToBack::SubscriptionClosed(sub_id),
307 };
308 let _ = self.to_back.send(msg).await;
310
311 while self.rx.next().await.is_some() {}
313
314 Ok(())
315 }
316
317 pub fn close_reason(&self) -> Option<SubscriptionCloseReason> {
322 let lagged = self.rx.lagged.has_lagged();
323
324 if !self.is_closed && !lagged {
327 return None;
328 }
329
330 if lagged { Some(SubscriptionCloseReason::Lagged) } else { Some(SubscriptionCloseReason::ConnectionClosed) }
331 }
332}
333
334#[derive(Debug)]
336struct BatchMessage {
337 raw: String,
339 ids: Range<u64>,
341 send_back: oneshot::Sender<Result<Vec<RawResponseOwned>, InvalidRequestId>>,
343}
344
345#[derive(Debug)]
347struct RequestMessage {
348 raw: String,
350 id: Id<'static>,
352 send_back: Option<oneshot::Sender<Result<RawResponseOwned, InvalidRequestId>>>,
354}
355
356#[derive(Debug)]
358struct SubscriptionMessage {
359 raw: String,
361 subscribe_id: Id<'static>,
363 unsubscribe_id: Id<'static>,
365 unsubscribe_method: String,
367 send_back: oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>,
371}
372
373#[derive(Debug)]
375struct RegisterNotificationMessage {
376 method: String,
378 send_back: oneshot::Sender<Result<(SubscriptionReceiver, String), Error>>,
382}
383
384#[derive(Debug)]
386enum FrontToBack {
387 Batch(BatchMessage),
389 Notification(String),
391 Request(RequestMessage),
393 Subscribe(SubscriptionMessage),
395 RegisterNotification(RegisterNotificationMessage),
397 UnregisterNotification(String),
399 SubscriptionClosed(SubscriptionId<'static>),
405}
406
407impl<Notif> Subscription<Notif>
408where
409 Notif: DeserializeOwned,
410{
411 #[allow(clippy::should_implement_trait)]
419 pub async fn next(&mut self) -> Option<Result<Notif, serde_json::Error>> {
420 StreamExt::next(self).await
421 }
422}
423
424impl<Notif> Stream for Subscription<Notif>
425where
426 Notif: DeserializeOwned,
427{
428 type Item = Result<Notif, serde_json::Error>;
429 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
430 let res = match futures_util::ready!(self.rx.poll_next_unpin(cx)) {
431 Some(v) => Some(serde_json::from_str::<Notif>(v.get())),
432 None => {
433 self.is_closed = true;
434 None
435 }
436 };
437
438 Poll::Ready(res)
439 }
440}
441
442impl<Notif> Drop for Subscription<Notif> {
443 fn drop(&mut self) {
444 let msg = match self.kind.take() {
450 Some(SubscriptionKind::Method(notif)) => FrontToBack::UnregisterNotification(notif),
451 Some(SubscriptionKind::Subscription(sub_id)) => FrontToBack::SubscriptionClosed(sub_id),
452 None => return,
453 };
454 let _ = self.to_back.try_send(msg);
455 }
456}
457
458#[derive(Debug)]
459pub struct RequestIdManager {
461 current_id: CurrentId,
463 id_kind: IdKind,
465}
466
467impl RequestIdManager {
468 pub fn new(id_kind: IdKind) -> Self {
470 Self { current_id: CurrentId::new(), id_kind }
471 }
472
473 pub fn next_request_id(&self) -> Id<'static> {
475 self.id_kind.into_id(self.current_id.next())
476 }
477
478 pub fn as_id_kind(&self) -> IdKind {
480 self.id_kind
481 }
482}
483
484#[derive(Debug, Copy, Clone)]
486pub enum IdKind {
487 String,
489 Number,
491}
492
493impl IdKind {
494 pub fn into_id(self, id: u64) -> Id<'static> {
496 match self {
497 IdKind::Number => Id::Number(id),
498 IdKind::String => Id::Str(format!("{id}").into()),
499 }
500 }
501}
502
503#[derive(Debug)]
504struct CurrentId(AtomicUsize);
505
506impl CurrentId {
507 fn new() -> Self {
508 CurrentId(AtomicUsize::new(0))
509 }
510
511 fn next(&self) -> u64 {
512 self.0
513 .fetch_add(1, Ordering::Relaxed)
514 .try_into()
515 .expect("usize -> u64 infallible, there are no CPUs > 64 bits; qed")
516 }
517}
518
519pub fn generate_batch_id_range(id: Id, len: u64) -> Result<Range<u64>, Error> {
521 let id_start = id.try_parse_inner_as_number()?;
522 let id_end = id_start
523 .checked_add(len)
524 .ok_or_else(|| Error::Custom("BatchID range wrapped; restart the client or try again later".to_string()))?;
525
526 Ok(id_start..id_end)
527}
528
529pub type BatchEntry<'a, R> = Result<R, ErrorObject<'a>>;
531
532#[derive(Debug, Clone)]
534pub struct BatchResponse<'a, R> {
535 successful_calls: usize,
536 failed_calls: usize,
537 responses: Vec<BatchEntry<'a, R>>,
538}
539
540impl<'a, R: fmt::Debug + 'a> BatchResponse<'a, R> {
541 pub fn new(successful_calls: usize, responses: Vec<BatchEntry<'a, R>>, failed_calls: usize) -> Self {
543 Self { successful_calls, responses, failed_calls }
544 }
545
546 pub fn len(&self) -> usize {
548 self.responses.len()
549 }
550
551 pub fn is_empty(&self) -> bool {
553 self.responses.len() == 0
554 }
555
556 pub fn num_successful_calls(&self) -> usize {
558 self.successful_calls
559 }
560
561 pub fn num_failed_calls(&self) -> usize {
563 self.failed_calls
564 }
565
566 pub fn into_ok(
572 self,
573 ) -> Result<impl Iterator<Item = R> + 'a + std::fmt::Debug, impl Iterator<Item = ErrorObject<'a>> + std::fmt::Debug>
574 {
575 if self.failed_calls > 0 {
576 Err(self.into_iter().filter_map(|err| err.err()))
577 } else {
578 Ok(self.into_iter().filter_map(|r| r.ok()))
579 }
580 }
581
582 pub fn ok(
584 &self,
585 ) -> Result<impl Iterator<Item = &R> + std::fmt::Debug, impl Iterator<Item = &ErrorObject<'a>> + std::fmt::Debug> {
586 if self.failed_calls > 0 {
587 Err(self.responses.iter().filter_map(|err| err.as_ref().err()))
588 } else {
589 Ok(self.responses.iter().filter_map(|r| r.as_ref().ok()))
590 }
591 }
592
593 pub fn iter(&self) -> impl Iterator<Item = &BatchEntry<'_, R>> {
595 self.responses.iter()
596 }
597}
598
599impl<'a, R> IntoIterator for BatchResponse<'a, R> {
600 type Item = BatchEntry<'a, R>;
601 type IntoIter = std::vec::IntoIter<Self::Item>;
602
603 fn into_iter(self) -> Self::IntoIter {
604 self.responses.into_iter()
605 }
606}
607
608#[derive(thiserror::Error, Debug)]
609enum TrySubscriptionSendError {
610 #[error("The subscription is closed")]
611 Closed,
612 #[error("A subscription message was dropped")]
613 TooSlow(Box<RawValue>),
614}
615
616#[derive(Debug)]
617pub(crate) struct SubscriptionSender {
618 inner: mpsc::Sender<Box<RawValue>>,
619 lagged: SubscriptionLagged,
620}
621
622impl SubscriptionSender {
623 fn send(&self, msg: Box<RawValue>) -> Result<(), TrySubscriptionSendError> {
624 match self.inner.try_send(msg) {
625 Ok(_) => Ok(()),
626 Err(TrySendError::Closed(_)) => Err(TrySubscriptionSendError::Closed),
627 Err(TrySendError::Full(m)) => {
628 self.lagged.set_lagged();
629 Err(TrySubscriptionSendError::TooSlow(m))
630 }
631 }
632 }
633}
634
635#[derive(Debug)]
636pub(crate) struct SubscriptionReceiver {
637 inner: mpsc::Receiver<Box<RawValue>>,
638 lagged: SubscriptionLagged,
639}
640
641impl Stream for SubscriptionReceiver {
642 type Item = Box<RawValue>;
643
644 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
645 self.inner.poll_recv(cx)
646 }
647}
648
649fn subscription_channel(max_buf_size: usize) -> (SubscriptionSender, SubscriptionReceiver) {
650 let (tx, rx) = mpsc::channel(max_buf_size);
651 let lagged_tx = SubscriptionLagged::new();
652 let lagged_rx = lagged_tx.clone();
653
654 (SubscriptionSender { inner: tx, lagged: lagged_tx }, SubscriptionReceiver { inner: rx, lagged: lagged_rx })
655}
656
657#[derive(Debug)]
659pub struct SubscriptionResponse {
660 sub_id: SubscriptionId<'static>,
662 stream: SubscriptionReceiver,
665}
666
667impl SubscriptionResponse {
668 pub fn subscription_id(&self) -> &SubscriptionId<'static> {
670 &self.sub_id
671 }
672}
673
674#[derive(Debug)]
679pub struct RawResponse<'a>(jsonrpsee_types::Response<'a, Box<RawValue>>);
680
681impl Serialize for RawResponse<'_> {
682 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
683 where
684 S: serde::Serializer,
685 {
686 self.0.serialize(serializer)
687 }
688}
689
690impl<'a> From<jsonrpsee_types::Response<'a, Box<RawValue>>> for RawResponse<'a> {
691 fn from(r: jsonrpsee_types::Response<'a, Box<RawValue>>) -> Self {
692 Self(r)
693 }
694}
695
696impl<'a> RawResponse<'a> {
697 pub fn is_success(&self) -> bool {
699 match self.0.payload {
700 jsonrpsee_types::ResponsePayload::Success(_) => true,
701 jsonrpsee_types::ResponsePayload::Error(_) => false,
702 }
703 }
704
705 pub fn as_error(&self) -> Option<&ErrorObject<'_>> {
707 match self.0.payload {
708 jsonrpsee_types::ResponsePayload::Error(ref err) => Some(err),
709 _ => None,
710 }
711 }
712
713 pub fn as_success(&self) -> Option<&RawValue> {
717 match self.0.payload {
718 jsonrpsee_types::ResponsePayload::Success(ref res) => Some(res),
719 _ => None,
720 }
721 }
722
723 pub fn id(&self) -> &Id<'a> {
725 &self.0.id
726 }
727
728 pub fn into_inner(self) -> jsonrpsee_types::Response<'a, Box<RawValue>> {
730 self.0
731 }
732
733 pub fn into_owned(self) -> RawResponseOwned {
735 RawResponse(self.0.into_owned())
736 }
737}
738
739impl ToJson for RawResponse<'_> {
740 fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
741 serde_json::value::to_raw_value(&self.0)
742 }
743}
744
745pub type MiddlewareBatchResponse = Vec<RawResponseOwned>;
747
748#[derive(Debug)]
751pub struct MiddlewareMethodResponse {
752 rp: RawResponseOwned,
753 subscription: Option<SubscriptionResponse>,
754}
755
756impl Serialize for MiddlewareMethodResponse {
757 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
758 where
759 S: serde::Serializer,
760 {
761 self.rp.serialize(serializer)
762 }
763}
764
765impl Deref for MiddlewareMethodResponse {
766 type Target = RawResponseOwned;
767
768 fn deref(&self) -> &Self::Target {
769 &self.rp
770 }
771}
772
773impl MiddlewareMethodResponse {
774 pub fn response(rp: RawResponseOwned) -> Self {
776 Self { rp, subscription: None }
777 }
778
779 pub fn subscription_response(rp: RawResponseOwned, subscription: SubscriptionResponse) -> Self {
781 Self { rp, subscription: Some(subscription) }
782 }
783
784 pub fn into_parts(self) -> (RawResponseOwned, Option<SubscriptionResponse>) {
786 (self.rp, self.subscription)
787 }
788
789 pub fn into_response(self) -> RawResponseOwned {
791 self.rp
792 }
793
794 pub fn into_subscription(self) -> Option<SubscriptionResponse> {
796 self.subscription
797 }
798}
799
800#[derive(Debug, Clone)]
802pub struct MiddlewareNotifResponse(Extensions);
803
804impl From<Extensions> for MiddlewareNotifResponse {
805 fn from(extensions: Extensions) -> Self {
806 Self(extensions)
807 }
808}
809
810impl MiddlewareNotifResponse {
811 pub fn extensions(&self) -> &Extensions {
813 &self.0
814 }
815
816 pub fn extensions_mut(&mut self) -> &mut Extensions {
818 &mut self.0
819 }
820}
821
822impl<T: Serialize> ToJson for Result<T, Error> {
823 fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
824 match self {
825 Ok(v) => serde_json::value::to_raw_value(v),
826 Err(e) => serde_json::value::to_raw_value(&e.to_string()),
827 }
828 }
829}