jsonrpsee_core/client/
mod.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! Shared utilities for `jsonrpsee` clients.
28
29cfg_async_client! {
30	pub mod async_client;
31	pub use async_client::{Client, ClientBuilder};
32}
33
34pub mod error;
35pub use error::Error;
36
37use std::fmt;
38use std::ops::Range;
39use std::pin::Pin;
40use std::sync::atomic::{AtomicUsize, Ordering};
41use std::sync::{Arc, RwLock};
42use std::task::{self, Poll};
43use tokio::sync::mpsc::error::TrySendError;
44
45use crate::params::BatchRequestBuilder;
46use crate::traits::ToRpcParams;
47use async_trait::async_trait;
48use core::marker::PhantomData;
49use futures_util::stream::{Stream, StreamExt};
50use jsonrpsee_types::{ErrorObject, Id, SubscriptionId};
51use serde::de::DeserializeOwned;
52use serde_json::Value as JsonValue;
53use tokio::sync::{mpsc, oneshot};
54
55/// Shared state whether a subscription has lagged or not.
56#[derive(Debug, Clone)]
57pub(crate) struct SubscriptionLagged(Arc<RwLock<bool>>);
58
59impl SubscriptionLagged {
60	/// Create a new [`SubscriptionLagged`].
61	pub(crate) fn new() -> Self {
62		Self(Arc::new(RwLock::new(false)))
63	}
64
65	/// A message has been missed.
66	pub(crate) fn set_lagged(&self) {
67		*self.0.write().expect("RwLock not poised; qed") = true;
68	}
69
70	/// Check whether the subscription has missed a message.
71	pub(crate) fn has_lagged(&self) -> bool {
72		*self.0.read().expect("RwLock not poised; qed")
73	}
74}
75
76// Re-exports for the `rpc_params` macro.
77#[doc(hidden)]
78pub mod __reexports {
79	// Needs to be in scope for `ArrayParams` to implement it.
80	pub use crate::traits::ToRpcParams;
81	// Main builder object for constructing the rpc parameters.
82	pub use crate::params::ArrayParams;
83}
84
85/// [JSON-RPC](https://www.jsonrpc.org/specification) client interface that can make requests and notifications.
86#[async_trait]
87pub trait ClientT {
88	/// Send a [notification request](https://www.jsonrpc.org/specification#notification)
89	async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
90	where
91		Params: ToRpcParams + Send;
92
93	/// Send a [method call request](https://www.jsonrpc.org/specification#request_object).
94	async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
95	where
96		R: DeserializeOwned,
97		Params: ToRpcParams + Send;
98
99	/// Send a [batch request](https://www.jsonrpc.org/specification#batch).
100	///
101	/// The response to batch are returned in the same order as it was inserted in the batch.
102	///
103	///
104	/// Returns `Ok` if all requests in the batch were answered.
105	/// Returns `Error` if the network failed or any of the responses could be parsed a valid JSON-RPC response.
106	async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<BatchResponse<'a, R>, Error>
107	where
108		R: DeserializeOwned + fmt::Debug + 'a;
109}
110
111/// [JSON-RPC](https://www.jsonrpc.org/specification) client interface that can make requests, notifications and subscriptions.
112#[async_trait]
113pub trait SubscriptionClientT: ClientT {
114	/// Initiate a subscription by performing a JSON-RPC method call where the server responds with
115	/// a `Subscription ID` that is used to fetch messages on that subscription,
116	///
117	/// The `subscribe_method` and `params` are used to ask for the subscription towards the
118	/// server.
119	///
120	/// The params may be used as input for the subscription for the server to process.
121	///
122	/// The `unsubscribe_method` is used to close the subscription
123	///
124	/// The `Notif` param is a generic type to receive generic subscriptions, see [`Subscription`] for further
125	/// documentation.
126	async fn subscribe<'a, Notif, Params>(
127		&self,
128		subscribe_method: &'a str,
129		params: Params,
130		unsubscribe_method: &'a str,
131	) -> Result<Subscription<Notif>, Error>
132	where
133		Params: ToRpcParams + Send,
134		Notif: DeserializeOwned;
135
136	/// Register a method subscription, this is used to filter only server notifications that a user is interested in.
137	///
138	/// The `Notif` param is a generic type to receive generic subscriptions, see [`Subscription`] for further
139	/// documentation.
140	async fn subscribe_to_method<'a, Notif>(&self, method: &'a str) -> Result<Subscription<Notif>, Error>
141	where
142		Notif: DeserializeOwned;
143}
144
145/// Marker trait to determine whether a type implements `Send` or not.
146#[cfg(target_arch = "wasm32")]
147pub trait MaybeSend {}
148
149/// Marker trait to determine whether a type implements `Send` or not.
150#[cfg(not(target_arch = "wasm32"))]
151pub trait MaybeSend: Send {}
152
153#[cfg(not(target_arch = "wasm32"))]
154impl<T: Send> MaybeSend for T {}
155
156#[cfg(target_arch = "wasm32")]
157impl<T> MaybeSend for T {}
158
159/// Transport interface to send data asynchronous.
160#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
161#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
162pub trait TransportSenderT: MaybeSend + 'static {
163	/// Error that may occur during sending a message.
164	type Error: std::error::Error + Send + Sync;
165
166	/// Send.
167	async fn send(&mut self, msg: String) -> Result<(), Self::Error>;
168
169	/// This is optional because it's most likely relevant for WebSocket transports only.
170	/// You should only implement this is your transport supports sending periodic pings.
171	///
172	/// Send ping frame (opcode of 0x9).
173	async fn send_ping(&mut self) -> Result<(), Self::Error> {
174		Ok(())
175	}
176
177	/// This is optional because it's most likely relevant for WebSocket transports only.
178	/// You should only implement this is your transport supports being closed.
179	///
180	/// Send customized close message.
181	async fn close(&mut self) -> Result<(), Self::Error> {
182		Ok(())
183	}
184}
185
186/// Message type received from the RPC server.
187/// It can either be plain text data, bytes, or `Pong` messages.
188#[derive(Debug, Clone)]
189pub enum ReceivedMessage {
190	/// Incoming packet contains plain `String` data.
191	Text(String),
192	/// Incoming packet contains bytes.
193	Bytes(Vec<u8>),
194	/// Incoming `Pong` frame as a reply to a previously submitted `Ping` frame.
195	Pong,
196}
197
198/// Transport interface to receive data asynchronous.
199#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
200#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
201pub trait TransportReceiverT: 'static {
202	/// Error that may occur during receiving a message.
203	type Error: std::error::Error + Send + Sync;
204
205	/// Receive.
206	async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error>;
207}
208
209/// Convert the given values to a [`crate::params::ArrayParams`] as expected by a
210/// jsonrpsee Client (http or websocket).
211///
212/// # Panics
213///
214/// Panics if the serialization of parameters fails.
215#[macro_export]
216macro_rules! rpc_params {
217	($($param:expr),*) => {
218		{
219			let mut params = $crate::client::__reexports::ArrayParams::new();
220			$(
221				if let Err(err) = params.insert($param) {
222					panic!("Parameter `{}` cannot be serialized: {:?}", stringify!($param), err);
223				}
224			)*
225			params
226		}
227	};
228}
229
230/// Subscription kind
231#[derive(Debug, Clone)]
232#[non_exhaustive]
233pub enum SubscriptionKind {
234	/// Get notifications based on Subscription ID.
235	Subscription(SubscriptionId<'static>),
236	/// Get notifications based on method name.
237	Method(String),
238}
239
240/// The reason why the subscription was closed.
241#[derive(Debug, Copy, Clone)]
242pub enum SubscriptionCloseReason {
243	/// The connection was closed.
244	ConnectionClosed,
245	/// The subscription could not keep up with the server.
246	Lagged,
247}
248
249/// Represent a client-side subscription which is implemented on top of
250/// a bounded channel where it's possible that the receiver may
251/// not keep up with the sender side a.k.a "slow receiver problem"
252///
253/// The Subscription will try to `unsubscribe` in the drop implementation
254/// but it may fail if the underlying buffer is full.
255/// Thus, if you want to ensure it's actually unsubscribed then
256/// [`Subscription::unsubscribe`] is recommended to use.
257///
258/// ## Lagging
259///
260/// All messages from the server must be kept in a buffer in the client
261/// until they are read by polling the [`Subscription`]. If you don't
262/// poll the client subscription quickly enough, the buffer may fill
263/// up and when subscription is full the subscription is then closed.
264///
265/// You can call [`Subscription::close_reason`] to determine why
266/// the subscription was closed.
267#[derive(Debug)]
268pub struct Subscription<Notif> {
269	is_closed: bool,
270	/// Channel to send requests to the background task.
271	to_back: mpsc::Sender<FrontToBack>,
272	/// Channel from which we receive notifications from the server, as encoded `JsonValue`s.
273	rx: SubscriptionReceiver,
274	/// Callback kind.
275	kind: Option<SubscriptionKind>,
276	/// Marker in order to pin the `Notif` parameter.
277	marker: PhantomData<Notif>,
278}
279
280// `Subscription` does not automatically implement this due to `PhantomData<Notif>`,
281// but type type has no need to be pinned.
282impl<Notif> std::marker::Unpin for Subscription<Notif> {}
283
284impl<Notif> Subscription<Notif> {
285	/// Create a new subscription.
286	fn new(to_back: mpsc::Sender<FrontToBack>, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self {
287		Self { to_back, rx, kind: Some(kind), marker: PhantomData, is_closed: false }
288	}
289
290	/// Return the subscription type and, if applicable, ID.
291	pub fn kind(&self) -> &SubscriptionKind {
292		self.kind.as_ref().expect("only None after unsubscribe; qed")
293	}
294
295	/// Unsubscribe and consume the subscription.
296	pub async fn unsubscribe(mut self) -> Result<(), Error> {
297		let msg = match self.kind.take().expect("only None after unsubscribe; qed") {
298			SubscriptionKind::Method(notif) => FrontToBack::UnregisterNotification(notif),
299			SubscriptionKind::Subscription(sub_id) => FrontToBack::SubscriptionClosed(sub_id),
300		};
301		// If this fails the connection was already closed i.e, already "unsubscribed".
302		let _ = self.to_back.send(msg).await;
303
304		// wait until notif channel is closed then the subscription was closed.
305		while self.rx.next().await.is_some() {}
306
307		Ok(())
308	}
309
310	/// The reason why the subscription was closed.
311	///
312	/// Returns Some(reason) is the subscription was closed otherwise
313	/// None is returned.
314	pub fn close_reason(&self) -> Option<SubscriptionCloseReason> {
315		let lagged = self.rx.lagged.has_lagged();
316
317		// `is_closed` is only set if the subscription has been polled
318		// and that is why lagged is checked here as well.
319		if !self.is_closed && !lagged {
320			return None;
321		}
322
323		if lagged {
324			Some(SubscriptionCloseReason::Lagged)
325		} else {
326			Some(SubscriptionCloseReason::ConnectionClosed)
327		}
328	}
329}
330
331/// Batch request message.
332#[derive(Debug)]
333struct BatchMessage {
334	/// Serialized batch request.
335	raw: String,
336	/// Request IDs.
337	ids: Range<u64>,
338	/// One-shot channel over which we send back the result of this request.
339	send_back: oneshot::Sender<Result<Vec<BatchEntry<'static, JsonValue>>, Error>>,
340}
341
342/// Request message.
343#[derive(Debug)]
344struct RequestMessage {
345	/// Serialized message.
346	raw: String,
347	/// Request ID.
348	id: Id<'static>,
349	/// One-shot channel over which we send back the result of this request.
350	send_back: Option<oneshot::Sender<Result<JsonValue, Error>>>,
351}
352
353/// Subscription message.
354#[derive(Debug)]
355struct SubscriptionMessage {
356	/// Serialized message.
357	raw: String,
358	/// Request ID of the subscribe message.
359	subscribe_id: Id<'static>,
360	/// Request ID of the unsubscribe message.
361	unsubscribe_id: Id<'static>,
362	/// Method to use to unsubscribe later. Used if the channel unexpectedly closes.
363	unsubscribe_method: String,
364	/// If the subscription succeeds, we return a [`mpsc::Receiver`] that will receive notifications.
365	/// When we get a response from the server about that subscription, we send the result over
366	/// this channel.
367	send_back: oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>,
368}
369
370/// RegisterNotification message.
371#[derive(Debug)]
372struct RegisterNotificationMessage {
373	/// Method name this notification handler is attached to
374	method: String,
375	/// We return a [`mpsc::Receiver`] that will receive notifications.
376	/// When we get a response from the server about that subscription, we send the result over
377	/// this channel.
378	send_back: oneshot::Sender<Result<(SubscriptionReceiver, String), Error>>,
379}
380
381/// Message that the Client can send to the background task.
382#[derive(Debug)]
383enum FrontToBack {
384	/// Send a batch request to the server.
385	Batch(BatchMessage),
386	/// Send a notification to the server.
387	Notification(String),
388	/// Send a request to the server.
389	Request(RequestMessage),
390	/// Send a subscription request to the server.
391	Subscribe(SubscriptionMessage),
392	/// Register a notification handler
393	RegisterNotification(RegisterNotificationMessage),
394	/// Unregister a notification handler
395	UnregisterNotification(String),
396	/// When a subscription channel is closed, we send this message to the background
397	/// task to mark it ready for garbage collection.
398	// NOTE: It is not possible to cancel pending subscriptions or pending requests.
399	// Such operations will be blocked until a response is received or the background
400	// thread has been terminated.
401	SubscriptionClosed(SubscriptionId<'static>),
402}
403
404impl<Notif> Subscription<Notif>
405where
406	Notif: DeserializeOwned,
407{
408	/// Returns the next notification from the stream.
409	/// This may return `None` if the subscription has been terminated,
410	/// which may happen if the channel becomes full or is dropped.
411	///
412	/// **Note:** This has an identical signature to the [`StreamExt::next`]
413	/// method (and delegates to that). Import [`StreamExt`] if you'd like
414	/// access to other stream combinator methods.
415	#[allow(clippy::should_implement_trait)]
416	pub async fn next(&mut self) -> Option<Result<Notif, serde_json::Error>> {
417		StreamExt::next(self).await
418	}
419}
420
421impl<Notif> Stream for Subscription<Notif>
422where
423	Notif: DeserializeOwned,
424{
425	type Item = Result<Notif, serde_json::Error>;
426	fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
427		let res = match futures_util::ready!(self.rx.poll_next_unpin(cx)) {
428			Some(v) => Some(serde_json::from_value::<Notif>(v).map_err(Into::into)),
429			None => {
430				self.is_closed = true;
431				None
432			}
433		};
434
435		Poll::Ready(res)
436	}
437}
438
439impl<Notif> Drop for Subscription<Notif> {
440	fn drop(&mut self) {
441		// We can't actually guarantee that this goes through. If the background task is busy, then
442		// the channel's buffer will be full.
443		// However, when a notification arrives, the background task will realize that the channel
444		// to the `Callback` has been closed.
445
446		let msg = match self.kind.take() {
447			Some(SubscriptionKind::Method(notif)) => FrontToBack::UnregisterNotification(notif),
448			Some(SubscriptionKind::Subscription(sub_id)) => FrontToBack::SubscriptionClosed(sub_id),
449			None => return,
450		};
451		let _ = self.to_back.try_send(msg);
452	}
453}
454
455#[derive(Debug)]
456/// Keep track of request IDs.
457pub struct RequestIdManager {
458	/// Get the next request ID.
459	current_id: CurrentId,
460	/// Request ID type.
461	id_kind: IdKind,
462}
463
464impl RequestIdManager {
465	/// Create a new `RequestIdGuard` with the provided concurrency limit.
466	pub fn new(id_kind: IdKind) -> Self {
467		Self { current_id: CurrentId::new(), id_kind }
468	}
469
470	/// Attempts to get the next request ID.
471	pub fn next_request_id(&self) -> Id<'static> {
472		self.id_kind.into_id(self.current_id.next())
473	}
474
475	/// Get a handle to the `IdKind`.
476	pub fn as_id_kind(&self) -> IdKind {
477		self.id_kind
478	}
479}
480
481/// JSON-RPC request object id data type.
482#[derive(Debug, Copy, Clone)]
483pub enum IdKind {
484	/// String.
485	String,
486	/// Number.
487	Number,
488}
489
490impl IdKind {
491	/// Generate an `Id` from number.
492	pub fn into_id(self, id: u64) -> Id<'static> {
493		match self {
494			IdKind::Number => Id::Number(id),
495			IdKind::String => Id::Str(format!("{id}").into()),
496		}
497	}
498}
499
500#[derive(Debug)]
501struct CurrentId(AtomicUsize);
502
503impl CurrentId {
504	fn new() -> Self {
505		CurrentId(AtomicUsize::new(0))
506	}
507
508	fn next(&self) -> u64 {
509		self.0
510			.fetch_add(1, Ordering::Relaxed)
511			.try_into()
512			.expect("usize -> u64 infallible, there are no CPUs > 64 bits; qed")
513	}
514}
515
516/// Generate a range of IDs to be used in a batch request.
517pub fn generate_batch_id_range(id: Id, len: u64) -> Result<Range<u64>, Error> {
518	let id_start = id.try_parse_inner_as_number()?;
519	let id_end = id_start
520		.checked_add(len)
521		.ok_or_else(|| Error::Custom("BatchID range wrapped; restart the client or try again later".to_string()))?;
522
523	Ok(id_start..id_end)
524}
525
526/// Represent a single entry in a batch response.
527pub type BatchEntry<'a, R> = Result<R, ErrorObject<'a>>;
528
529/// Batch response.
530#[derive(Debug, Clone)]
531pub struct BatchResponse<'a, R> {
532	successful_calls: usize,
533	failed_calls: usize,
534	responses: Vec<BatchEntry<'a, R>>,
535}
536
537impl<'a, R: fmt::Debug + 'a> BatchResponse<'a, R> {
538	/// Create a new [`BatchResponse`].
539	pub fn new(successful_calls: usize, responses: Vec<BatchEntry<'a, R>>, failed_calls: usize) -> Self {
540		Self { successful_calls, responses, failed_calls }
541	}
542
543	/// Get the length of the batch response.
544	pub fn len(&self) -> usize {
545		self.responses.len()
546	}
547
548	/// Is empty.
549	pub fn is_empty(&self) -> bool {
550		self.responses.len() == 0
551	}
552
553	/// Get the number of successful calls in the batch.
554	pub fn num_successful_calls(&self) -> usize {
555		self.successful_calls
556	}
557
558	/// Get the number of failed calls in the batch.
559	pub fn num_failed_calls(&self) -> usize {
560		self.failed_calls
561	}
562
563	/// Returns `Ok(iterator)` if all responses were successful
564	/// otherwise `Err(iterator)` is returned.
565	///
566	/// If you want get all responses if an error responses occurs use [`BatchResponse::into_iter`]
567	/// instead where it's possible to implement customized logic.
568	pub fn into_ok(
569		self,
570	) -> Result<impl Iterator<Item = R> + 'a + std::fmt::Debug, impl Iterator<Item = ErrorObject<'a>> + std::fmt::Debug>
571	{
572		if self.failed_calls > 0 {
573			Err(self.into_iter().filter_map(|err| err.err()))
574		} else {
575			Ok(self.into_iter().filter_map(|r| r.ok()))
576		}
577	}
578
579	/// Similar to [`BatchResponse::into_ok`] but takes the responses by reference instead.
580	pub fn ok(
581		&self,
582	) -> Result<impl Iterator<Item = &R> + std::fmt::Debug, impl Iterator<Item = &ErrorObject<'a>> + std::fmt::Debug> {
583		if self.failed_calls > 0 {
584			Err(self.responses.iter().filter_map(|err| err.as_ref().err()))
585		} else {
586			Ok(self.responses.iter().filter_map(|r| r.as_ref().ok()))
587		}
588	}
589
590	/// Returns an iterator over all responses.
591	pub fn iter(&self) -> impl Iterator<Item = &BatchEntry<'_, R>> {
592		self.responses.iter()
593	}
594}
595
596impl<'a, R> IntoIterator for BatchResponse<'a, R> {
597	type Item = BatchEntry<'a, R>;
598	type IntoIter = std::vec::IntoIter<Self::Item>;
599
600	fn into_iter(self) -> Self::IntoIter {
601		self.responses.into_iter()
602	}
603}
604
605#[derive(thiserror::Error, Debug)]
606enum TrySubscriptionSendError {
607	#[error("The subscription is closed")]
608	Closed,
609	#[error("A subscription message was dropped")]
610	TooSlow(JsonValue),
611}
612
613#[derive(Debug)]
614pub(crate) struct SubscriptionSender {
615	inner: mpsc::Sender<JsonValue>,
616	lagged: SubscriptionLagged,
617}
618
619impl SubscriptionSender {
620	fn send(&self, msg: JsonValue) -> Result<(), TrySubscriptionSendError> {
621		match self.inner.try_send(msg) {
622			Ok(_) => Ok(()),
623			Err(TrySendError::Closed(_)) => Err(TrySubscriptionSendError::Closed),
624			Err(TrySendError::Full(m)) => {
625				self.lagged.set_lagged();
626				Err(TrySubscriptionSendError::TooSlow(m))
627			}
628		}
629	}
630}
631
632#[derive(Debug)]
633pub(crate) struct SubscriptionReceiver {
634	inner: mpsc::Receiver<JsonValue>,
635	lagged: SubscriptionLagged,
636}
637
638impl Stream for SubscriptionReceiver {
639	type Item = JsonValue;
640
641	fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
642		self.inner.poll_recv(cx)
643	}
644}
645
646fn subscription_channel(max_buf_size: usize) -> (SubscriptionSender, SubscriptionReceiver) {
647	let (tx, rx) = mpsc::channel(max_buf_size);
648	let lagged_tx = SubscriptionLagged::new();
649	let lagged_rx = lagged_tx.clone();
650
651	(SubscriptionSender { inner: tx, lagged: lagged_tx }, SubscriptionReceiver { inner: rx, lagged: lagged_rx })
652}