1cfg_async_client! {
30 pub mod async_client;
31 pub use async_client::{Client, ClientBuilder};
32}
33
34pub mod error;
35pub use error::Error;
36
37use std::fmt;
38use std::ops::Range;
39use std::pin::Pin;
40use std::sync::atomic::{AtomicUsize, Ordering};
41use std::sync::{Arc, RwLock};
42use std::task::{self, Poll};
43use tokio::sync::mpsc::error::TrySendError;
44
45use crate::params::BatchRequestBuilder;
46use crate::traits::ToRpcParams;
47use async_trait::async_trait;
48use core::marker::PhantomData;
49use futures_util::stream::{Stream, StreamExt};
50use jsonrpsee_types::{ErrorObject, Id, SubscriptionId};
51use serde::de::DeserializeOwned;
52use serde_json::Value as JsonValue;
53use tokio::sync::{mpsc, oneshot};
54
/// Shared boolean flag recording that a subscription has lagged.
///
/// The flag lives behind an `Arc`, so every clone observes and updates the
/// same underlying value.
#[derive(Debug, Clone)]
pub(crate) struct SubscriptionLagged(Arc<RwLock<bool>>);

impl SubscriptionLagged {
    /// Creates a flag that starts out unset (`false`).
    pub(crate) fn new() -> Self {
        let flag = RwLock::new(false);
        Self(Arc::new(flag))
    }

    /// Sets the flag to `true`. Nothing in this type ever resets it.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    pub(crate) fn set_lagged(&self) {
        let mut guard = self.0.write().expect("RwLock not poised; qed");
        *guard = true;
    }

    /// Returns the current value of the flag.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    pub(crate) fn has_lagged(&self) -> bool {
        let guard = self.0.read().expect("RwLock not poised; qed");
        *guard
    }
}
75
76#[doc(hidden)]
78pub mod __reexports {
79 pub use crate::traits::ToRpcParams;
81 pub use crate::params::ArrayParams;
83}
84
85#[async_trait]
87pub trait ClientT {
88 async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
90 where
91 Params: ToRpcParams + Send;
92
93 async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
95 where
96 R: DeserializeOwned,
97 Params: ToRpcParams + Send;
98
99 async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<BatchResponse<'a, R>, Error>
107 where
108 R: DeserializeOwned + fmt::Debug + 'a;
109}
110
111#[async_trait]
113pub trait SubscriptionClientT: ClientT {
114 async fn subscribe<'a, Notif, Params>(
127 &self,
128 subscribe_method: &'a str,
129 params: Params,
130 unsubscribe_method: &'a str,
131 ) -> Result<Subscription<Notif>, Error>
132 where
133 Params: ToRpcParams + Send,
134 Notif: DeserializeOwned;
135
136 async fn subscribe_to_method<'a, Notif>(&self, method: &'a str) -> Result<Subscription<Notif>, Error>
141 where
142 Notif: DeserializeOwned;
143}
144
/// Marker trait that requires `Send` on every target except `wasm32`.
///
/// It is blanket-implemented for every `Send` type.
#[cfg(not(target_arch = "wasm32"))]
pub trait MaybeSend: Send {}

#[cfg(not(target_arch = "wasm32"))]
impl<T> MaybeSend for T where T: Send {}

/// Marker trait with no requirements on `wasm32`.
///
/// It is blanket-implemented for every type.
#[cfg(target_arch = "wasm32")]
pub trait MaybeSend {}

#[cfg(target_arch = "wasm32")]
impl<T> MaybeSend for T {}
158
159#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
161#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
162pub trait TransportSenderT: MaybeSend + 'static {
163 type Error: std::error::Error + Send + Sync;
165
166 async fn send(&mut self, msg: String) -> Result<(), Self::Error>;
168
169 async fn send_ping(&mut self) -> Result<(), Self::Error> {
174 Ok(())
175 }
176
177 async fn close(&mut self) -> Result<(), Self::Error> {
182 Ok(())
183 }
184}
185
/// A message produced by a [`TransportReceiverT`].
#[derive(Debug, Clone)]
pub enum ReceivedMessage {
    /// A text payload.
    Text(String),
    /// A binary payload.
    Bytes(Vec<u8>),
    /// A pong message; carries no payload.
    Pong,
}
197
198#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
200#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
201pub trait TransportReceiverT: 'static {
202 type Error: std::error::Error + Send + Sync;
204
205 async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error>;
207}
208
/// Builds an `ArrayParams` value from a comma-separated list of expressions.
///
/// The expressions are inserted in the order written. An empty list yields
/// empty params, and an optional trailing comma is accepted, so both
/// `rpc_params![a, b]` and `rpc_params![a, b,]` are valid.
///
/// # Panics
///
/// Panics if inserting any parameter returns an error; the panic message
/// names the offending expression and includes the error.
#[macro_export]
macro_rules! rpc_params {
    // `$(,)?` tolerates a trailing comma, matching the convention of
    // list-like macros such as `vec!`.
    ($($param:expr),* $(,)?) => {
        {
            let mut params = $crate::client::__reexports::ArrayParams::new();
            $(
                if let Err(err) = params.insert($param) {
                    panic!("Parameter `{}` cannot be serialized: {:?}", stringify!($param), err);
                }
            )*
            params
        }
    };
}
229
230#[derive(Debug, Clone)]
232#[non_exhaustive]
233pub enum SubscriptionKind {
234 Subscription(SubscriptionId<'static>),
236 Method(String),
238}
239
/// Why a [`Subscription`] stopped, as reported by `Subscription::close_reason`.
#[derive(Debug, Copy, Clone)]
pub enum SubscriptionCloseReason {
    /// The subscription's stream ended without the lagged flag being set.
    ConnectionClosed,
    /// The subscription's lagged flag was set.
    Lagged,
}
248
249#[derive(Debug)]
268pub struct Subscription<Notif> {
269 is_closed: bool,
270 to_back: mpsc::Sender<FrontToBack>,
272 rx: SubscriptionReceiver,
274 kind: Option<SubscriptionKind>,
276 marker: PhantomData<Notif>,
278}
279
280impl<Notif> std::marker::Unpin for Subscription<Notif> {}
283
284impl<Notif> Subscription<Notif> {
285 fn new(to_back: mpsc::Sender<FrontToBack>, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self {
287 Self { to_back, rx, kind: Some(kind), marker: PhantomData, is_closed: false }
288 }
289
290 pub fn kind(&self) -> &SubscriptionKind {
292 self.kind.as_ref().expect("only None after unsubscribe; qed")
293 }
294
295 pub async fn unsubscribe(mut self) -> Result<(), Error> {
297 let msg = match self.kind.take().expect("only None after unsubscribe; qed") {
298 SubscriptionKind::Method(notif) => FrontToBack::UnregisterNotification(notif),
299 SubscriptionKind::Subscription(sub_id) => FrontToBack::SubscriptionClosed(sub_id),
300 };
301 let _ = self.to_back.send(msg).await;
303
304 while self.rx.next().await.is_some() {}
306
307 Ok(())
308 }
309
310 pub fn close_reason(&self) -> Option<SubscriptionCloseReason> {
315 let lagged = self.rx.lagged.has_lagged();
316
317 if !self.is_closed && !lagged {
320 return None;
321 }
322
323 if lagged {
324 Some(SubscriptionCloseReason::Lagged)
325 } else {
326 Some(SubscriptionCloseReason::ConnectionClosed)
327 }
328 }
329}
330
331#[derive(Debug)]
333struct BatchMessage {
334 raw: String,
336 ids: Range<u64>,
338 send_back: oneshot::Sender<Result<Vec<BatchEntry<'static, JsonValue>>, Error>>,
340}
341
342#[derive(Debug)]
344struct RequestMessage {
345 raw: String,
347 id: Id<'static>,
349 send_back: Option<oneshot::Sender<Result<JsonValue, Error>>>,
351}
352
353#[derive(Debug)]
355struct SubscriptionMessage {
356 raw: String,
358 subscribe_id: Id<'static>,
360 unsubscribe_id: Id<'static>,
362 unsubscribe_method: String,
364 send_back: oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>,
368}
369
370#[derive(Debug)]
372struct RegisterNotificationMessage {
373 method: String,
375 send_back: oneshot::Sender<Result<(SubscriptionReceiver, String), Error>>,
379}
380
381#[derive(Debug)]
383enum FrontToBack {
384 Batch(BatchMessage),
386 Notification(String),
388 Request(RequestMessage),
390 Subscribe(SubscriptionMessage),
392 RegisterNotification(RegisterNotificationMessage),
394 UnregisterNotification(String),
396 SubscriptionClosed(SubscriptionId<'static>),
402}
403
404impl<Notif> Subscription<Notif>
405where
406 Notif: DeserializeOwned,
407{
408 #[allow(clippy::should_implement_trait)]
416 pub async fn next(&mut self) -> Option<Result<Notif, serde_json::Error>> {
417 StreamExt::next(self).await
418 }
419}
420
421impl<Notif> Stream for Subscription<Notif>
422where
423 Notif: DeserializeOwned,
424{
425 type Item = Result<Notif, serde_json::Error>;
426 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
427 let res = match futures_util::ready!(self.rx.poll_next_unpin(cx)) {
428 Some(v) => Some(serde_json::from_value::<Notif>(v).map_err(Into::into)),
429 None => {
430 self.is_closed = true;
431 None
432 }
433 };
434
435 Poll::Ready(res)
436 }
437}
438
439impl<Notif> Drop for Subscription<Notif> {
440 fn drop(&mut self) {
441 let msg = match self.kind.take() {
447 Some(SubscriptionKind::Method(notif)) => FrontToBack::UnregisterNotification(notif),
448 Some(SubscriptionKind::Subscription(sub_id)) => FrontToBack::SubscriptionClosed(sub_id),
449 None => return,
450 };
451 let _ = self.to_back.try_send(msg);
452 }
453}
454
455#[derive(Debug)]
456pub struct RequestIdManager {
458 current_id: CurrentId,
460 id_kind: IdKind,
462}
463
464impl RequestIdManager {
465 pub fn new(id_kind: IdKind) -> Self {
467 Self { current_id: CurrentId::new(), id_kind }
468 }
469
470 pub fn next_request_id(&self) -> Id<'static> {
472 self.id_kind.into_id(self.current_id.next())
473 }
474
475 pub fn as_id_kind(&self) -> IdKind {
477 self.id_kind
478 }
479}
480
481#[derive(Debug, Copy, Clone)]
483pub enum IdKind {
484 String,
486 Number,
488}
489
490impl IdKind {
491 pub fn into_id(self, id: u64) -> Id<'static> {
493 match self {
494 IdKind::Number => Id::Number(id),
495 IdKind::String => Id::Str(format!("{id}").into()),
496 }
497 }
498}
499
/// Atomic counter that supplies successive id numbers, starting at zero.
#[derive(Debug)]
struct CurrentId(AtomicUsize);

impl CurrentId {
    /// Creates a counter whose first value is `0`.
    fn new() -> Self {
        Self(AtomicUsize::new(0))
    }

    /// Returns the current value and advances the counter by one.
    fn next(&self) -> u64 {
        let previous = self.0.fetch_add(1, Ordering::Relaxed);
        u64::try_from(previous).expect("usize -> u64 infallible, there are no CPUs > 64 bits; qed")
    }
}
515
516pub fn generate_batch_id_range(id: Id, len: u64) -> Result<Range<u64>, Error> {
518 let id_start = id.try_parse_inner_as_number()?;
519 let id_end = id_start
520 .checked_add(len)
521 .ok_or_else(|| Error::Custom("BatchID range wrapped; restart the client or try again later".to_string()))?;
522
523 Ok(id_start..id_end)
524}
525
526pub type BatchEntry<'a, R> = Result<R, ErrorObject<'a>>;
528
529#[derive(Debug, Clone)]
531pub struct BatchResponse<'a, R> {
532 successful_calls: usize,
533 failed_calls: usize,
534 responses: Vec<BatchEntry<'a, R>>,
535}
536
537impl<'a, R: fmt::Debug + 'a> BatchResponse<'a, R> {
538 pub fn new(successful_calls: usize, responses: Vec<BatchEntry<'a, R>>, failed_calls: usize) -> Self {
540 Self { successful_calls, responses, failed_calls }
541 }
542
543 pub fn len(&self) -> usize {
545 self.responses.len()
546 }
547
548 pub fn is_empty(&self) -> bool {
550 self.responses.len() == 0
551 }
552
553 pub fn num_successful_calls(&self) -> usize {
555 self.successful_calls
556 }
557
558 pub fn num_failed_calls(&self) -> usize {
560 self.failed_calls
561 }
562
563 pub fn into_ok(
569 self,
570 ) -> Result<impl Iterator<Item = R> + 'a + std::fmt::Debug, impl Iterator<Item = ErrorObject<'a>> + std::fmt::Debug>
571 {
572 if self.failed_calls > 0 {
573 Err(self.into_iter().filter_map(|err| err.err()))
574 } else {
575 Ok(self.into_iter().filter_map(|r| r.ok()))
576 }
577 }
578
579 pub fn ok(
581 &self,
582 ) -> Result<impl Iterator<Item = &R> + std::fmt::Debug, impl Iterator<Item = &ErrorObject<'a>> + std::fmt::Debug> {
583 if self.failed_calls > 0 {
584 Err(self.responses.iter().filter_map(|err| err.as_ref().err()))
585 } else {
586 Ok(self.responses.iter().filter_map(|r| r.as_ref().ok()))
587 }
588 }
589
590 pub fn iter(&self) -> impl Iterator<Item = &BatchEntry<'_, R>> {
592 self.responses.iter()
593 }
594}
595
596impl<'a, R> IntoIterator for BatchResponse<'a, R> {
597 type Item = BatchEntry<'a, R>;
598 type IntoIter = std::vec::IntoIter<Self::Item>;
599
600 fn into_iter(self) -> Self::IntoIter {
601 self.responses.into_iter()
602 }
603}
604
605#[derive(thiserror::Error, Debug)]
606enum TrySubscriptionSendError {
607 #[error("The subscription is closed")]
608 Closed,
609 #[error("A subscription message was dropped")]
610 TooSlow(JsonValue),
611}
612
613#[derive(Debug)]
614pub(crate) struct SubscriptionSender {
615 inner: mpsc::Sender<JsonValue>,
616 lagged: SubscriptionLagged,
617}
618
619impl SubscriptionSender {
620 fn send(&self, msg: JsonValue) -> Result<(), TrySubscriptionSendError> {
621 match self.inner.try_send(msg) {
622 Ok(_) => Ok(()),
623 Err(TrySendError::Closed(_)) => Err(TrySubscriptionSendError::Closed),
624 Err(TrySendError::Full(m)) => {
625 self.lagged.set_lagged();
626 Err(TrySubscriptionSendError::TooSlow(m))
627 }
628 }
629 }
630}
631
632#[derive(Debug)]
633pub(crate) struct SubscriptionReceiver {
634 inner: mpsc::Receiver<JsonValue>,
635 lagged: SubscriptionLagged,
636}
637
638impl Stream for SubscriptionReceiver {
639 type Item = JsonValue;
640
641 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
642 self.inner.poll_recv(cx)
643 }
644}
645
646fn subscription_channel(max_buf_size: usize) -> (SubscriptionSender, SubscriptionReceiver) {
647 let (tx, rx) = mpsc::channel(max_buf_size);
648 let lagged_tx = SubscriptionLagged::new();
649 let lagged_rx = lagged_tx.clone();
650
651 (SubscriptionSender { inner: tx, lagged: lagged_tx }, SubscriptionReceiver { inner: rx, lagged: lagged_rx })
652}