jsonrpsee_core/client/
mod.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! Shared utilities for `jsonrpsee` clients.
28
29cfg_async_client! {
30	pub mod async_client;
31	pub use async_client::{Client, ClientBuilder};
32}
33
34pub mod error;
35
36pub use error::Error;
37
38use std::fmt;
39use std::future::Future;
40use std::ops::{Deref, Range};
41use std::pin::Pin;
42use std::sync::atomic::{AtomicUsize, Ordering};
43use std::sync::{Arc, RwLock};
44use std::task::{self, Poll};
45
46use crate::params::BatchRequestBuilder;
47use crate::traits::{ToJson, ToRpcParams};
48
49use core::marker::PhantomData;
50use futures_util::stream::{Stream, StreamExt};
51use http::Extensions;
52use jsonrpsee_types::{ErrorObject, Id, InvalidRequestId, SubscriptionId};
53use serde::Serialize;
54use serde::de::DeserializeOwned;
55use serde_json::value::RawValue;
56use tokio::sync::mpsc::error::TrySendError;
57use tokio::sync::{mpsc, oneshot};
58
/// Flag shared between both halves of a subscription channel that records
/// whether the subscription has lagged, i.e. missed a message.
///
/// Cloning yields another handle to the same flag.
#[derive(Debug, Clone)]
pub(crate) struct SubscriptionLagged(Arc<RwLock<bool>>);
62
63/// Owned version of [`RawResponse`].
64pub type RawResponseOwned = RawResponse<'static>;
65
66impl SubscriptionLagged {
67	/// Create a new [`SubscriptionLagged`].
68	pub(crate) fn new() -> Self {
69		Self(Arc::new(RwLock::new(false)))
70	}
71
72	/// A message has been missed.
73	pub(crate) fn set_lagged(&self) {
74		*self.0.write().expect("RwLock not poised; qed") = true;
75	}
76
77	/// Check whether the subscription has missed a message.
78	pub(crate) fn has_lagged(&self) -> bool {
79		*self.0.read().expect("RwLock not poised; qed")
80	}
81}
82
83// Re-exports for the `rpc_params` macro.
84#[doc(hidden)]
85pub mod __reexports {
86	// Needs to be in scope for `ArrayParams` to implement it.
87	pub use crate::traits::ToRpcParams;
88	// Main builder object for constructing the rpc parameters.
89	pub use crate::params::ArrayParams;
90}
91
92/// [JSON-RPC](https://www.jsonrpc.org/specification) client interface that can make requests and notifications.
93pub trait ClientT {
94	/// Send a [notification request](https://www.jsonrpc.org/specification#notification)
95	fn notification<Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<(), Error>> + Send
96	where
97		Params: ToRpcParams + Send;
98
99	/// Send a [method call request](https://www.jsonrpc.org/specification#request_object).
100	fn request<R, Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<R, Error>> + Send
101	where
102		R: DeserializeOwned,
103		Params: ToRpcParams + Send;
104
105	/// Send a [batch request](https://www.jsonrpc.org/specification#batch).
106	///
107	/// The response to batch are returned in the same order as it was inserted in the batch.
108	///
109	///
110	/// Returns `Ok` if all requests in the batch were answered.
111	/// Returns `Error` if the network failed or any of the responses could be parsed a valid JSON-RPC response.
112	fn batch_request<'a, R>(
113		&self,
114		batch: BatchRequestBuilder<'a>,
115	) -> impl Future<Output = Result<BatchResponse<'a, R>, Error>> + Send
116	where
117		R: DeserializeOwned + fmt::Debug + 'a;
118}
119
120/// [JSON-RPC](https://www.jsonrpc.org/specification) client interface that can make requests, notifications and subscriptions.
121pub trait SubscriptionClientT: ClientT {
122	/// Initiate a subscription by performing a JSON-RPC method call where the server responds with
123	/// a `Subscription ID` that is used to fetch messages on that subscription,
124	///
125	/// The `subscribe_method` and `params` are used to ask for the subscription towards the
126	/// server.
127	///
128	/// The params may be used as input for the subscription for the server to process.
129	///
130	/// The `unsubscribe_method` is used to close the subscription
131	///
132	/// The `Notif` param is a generic type to receive generic subscriptions, see [`Subscription`] for further
133	/// documentation.
134	fn subscribe<'a, Notif, Params>(
135		&self,
136		subscribe_method: &'a str,
137		params: Params,
138		unsubscribe_method: &'a str,
139	) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
140	where
141		Params: ToRpcParams + Send,
142		Notif: DeserializeOwned;
143
144	/// Register a method subscription, this is used to filter only server notifications that a user is interested in.
145	///
146	/// The `Notif` param is a generic type to receive generic subscriptions, see [`Subscription`] for further
147	/// documentation.
148	fn subscribe_to_method<Notif>(
149		&self,
150		method: &str,
151	) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
152	where
153		Notif: DeserializeOwned;
154}
155
/// Marker trait that stands in for `Send` where the target supports it.
///
/// On `wasm32` it carries no bound and is implemented for every type.
#[cfg(target_arch = "wasm32")]
pub trait MaybeSend {}

#[cfg(target_arch = "wasm32")]
impl<T> MaybeSend for T {}

/// Marker trait that stands in for `Send` where the target supports it.
///
/// On every target except `wasm32` it requires `Send` and is implemented for
/// every `Send` type.
#[cfg(not(target_arch = "wasm32"))]
pub trait MaybeSend: Send {}

#[cfg(not(target_arch = "wasm32"))]
impl<T> MaybeSend for T where T: Send {}
169
170/// Transport interface to send data asynchronous.
171pub trait TransportSenderT: 'static {
172	/// Error that may occur during sending a message.
173	type Error: std::error::Error + Send + Sync;
174
175	/// Send.
176	fn send(&mut self, msg: String) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend;
177
178	/// This is optional because it's most likely relevant for WebSocket transports only.
179	/// You should only implement this is your transport supports sending periodic pings.
180	///
181	/// Send ping frame (opcode of 0x9).
182	fn send_ping(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
183		async { Ok(()) }
184	}
185
186	/// This is optional because it's most likely relevant for WebSocket transports only.
187	/// You should only implement this is your transport supports being closed.
188	///
189	/// Send customized close message.
190	fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend {
191		async { Ok(()) }
192	}
193}
194
/// A message received from the RPC server.
///
/// It is either plain text, raw bytes, or a `Pong` frame.
#[derive(Debug, Clone)]
pub enum ReceivedMessage {
	/// The incoming packet carries plain `String` data.
	Text(String),
	/// The incoming packet carries bytes.
	Bytes(Vec<u8>),
	/// A `Pong` frame, received as the reply to a previously submitted `Ping` frame.
	Pong,
}
206
207/// Transport interface to receive data asynchronous.
208pub trait TransportReceiverT: 'static {
209	/// Error that may occur during receiving a message.
210	type Error: std::error::Error + Send + Sync;
211
212	/// Receive.
213	fn receive(&mut self) -> impl Future<Output = Result<ReceivedMessage, Self::Error>> + MaybeSend;
214}
215
/// Convert the given values to a [`crate::params::ArrayParams`] as expected by a
/// jsonrpsee Client (http or websocket).
///
/// The values are given as a comma-separated list of expressions and are inserted
/// in order; a trailing comma after the last value is accepted.
///
/// # Panics
///
/// Panics if the serialization of parameters fails.
#[macro_export]
macro_rules! rpc_params {
	// `$(,)?` allows an optional trailing comma without affecting existing call sites.
	($($param:expr),* $(,)?) => {
		{
			let mut params = $crate::client::__reexports::ArrayParams::new();
			$(
				if let Err(err) = params.insert($param) {
					panic!("Parameter `{}` cannot be serialized: {:?}", stringify!($param), err);
				}
			)*
			params
		}
	};
}
236
237/// Subscription kind
238#[derive(Debug, Clone)]
239#[non_exhaustive]
240pub enum SubscriptionKind {
241	/// Get notifications based on Subscription ID.
242	Subscription(SubscriptionId<'static>),
243	/// Get notifications based on method name.
244	Method(String),
245}
246
/// Why a subscription was closed.
#[derive(Debug, Copy, Clone)]
pub enum SubscriptionCloseReason {
	/// The connection was closed.
	ConnectionClosed,
	/// The subscription could not keep up with the server.
	Lagged,
}
255
256/// Represent a client-side subscription which is implemented on top of
257/// a bounded channel where it's possible that the receiver may
258/// not keep up with the sender side a.k.a "slow receiver problem"
259///
260/// The Subscription will try to `unsubscribe` in the drop implementation
261/// but it may fail if the underlying buffer is full.
262/// Thus, if you want to ensure it's actually unsubscribed then
263/// [`Subscription::unsubscribe`] is recommended to use.
264///
265/// ## Lagging
266///
267/// All messages from the server must be kept in a buffer in the client
268/// until they are read by polling the [`Subscription`]. If you don't
269/// poll the client subscription quickly enough, the buffer may fill
270/// up and when subscription is full the subscription is then closed.
271///
272/// You can call [`Subscription::close_reason`] to determine why
273/// the subscription was closed.
274#[derive(Debug)]
275pub struct Subscription<Notif> {
276	is_closed: bool,
277	/// Channel to send requests to the background task.
278	to_back: mpsc::Sender<FrontToBack>,
279	/// Channel from which we receive notifications from the server, as encoded JSON.
280	rx: SubscriptionReceiver,
281	/// Callback kind.
282	kind: Option<SubscriptionKind>,
283	/// Marker in order to pin the `Notif` parameter.
284	marker: PhantomData<Notif>,
285}
286
287// `Subscription` does not automatically implement this due to `PhantomData<Notif>`,
288// but type type has no need to be pinned.
289impl<Notif> std::marker::Unpin for Subscription<Notif> {}
290
291impl<Notif> Subscription<Notif> {
292	/// Create a new subscription.
293	fn new(to_back: mpsc::Sender<FrontToBack>, rx: SubscriptionReceiver, kind: SubscriptionKind) -> Self {
294		Self { to_back, rx, kind: Some(kind), marker: PhantomData, is_closed: false }
295	}
296
297	/// Return the subscription type and, if applicable, ID.
298	pub fn kind(&self) -> &SubscriptionKind {
299		self.kind.as_ref().expect("only None after unsubscribe; qed")
300	}
301
302	/// Unsubscribe and consume the subscription.
303	pub async fn unsubscribe(mut self) -> Result<(), Error> {
304		let msg = match self.kind.take().expect("only None after unsubscribe; qed") {
305			SubscriptionKind::Method(notif) => FrontToBack::UnregisterNotification(notif),
306			SubscriptionKind::Subscription(sub_id) => FrontToBack::SubscriptionClosed(sub_id),
307		};
308		// If this fails the connection was already closed i.e, already "unsubscribed".
309		let _ = self.to_back.send(msg).await;
310
311		// wait until notif channel is closed then the subscription was closed.
312		while self.rx.next().await.is_some() {}
313
314		Ok(())
315	}
316
317	/// The reason why the subscription was closed.
318	///
319	/// Returns Some(reason) is the subscription was closed otherwise
320	/// None is returned.
321	pub fn close_reason(&self) -> Option<SubscriptionCloseReason> {
322		let lagged = self.rx.lagged.has_lagged();
323
324		// `is_closed` is only set if the subscription has been polled
325		// and that is why lagged is checked here as well.
326		if !self.is_closed && !lagged {
327			return None;
328		}
329
330		if lagged { Some(SubscriptionCloseReason::Lagged) } else { Some(SubscriptionCloseReason::ConnectionClosed) }
331	}
332}
333
334/// Batch request message.
335#[derive(Debug)]
336struct BatchMessage {
337	/// Serialized batch request.
338	raw: String,
339	/// Request IDs.
340	ids: Range<u64>,
341	/// One-shot channel over which we send back the result of this request.
342	send_back: oneshot::Sender<Result<Vec<RawResponseOwned>, InvalidRequestId>>,
343}
344
345/// Request message.
346#[derive(Debug)]
347struct RequestMessage {
348	/// Serialized message.
349	raw: String,
350	/// Request ID.
351	id: Id<'static>,
352	/// One-shot channel over which we send back the result of this request.
353	send_back: Option<oneshot::Sender<Result<RawResponseOwned, InvalidRequestId>>>,
354}
355
356/// Subscription message.
357#[derive(Debug)]
358struct SubscriptionMessage {
359	/// Serialized message.
360	raw: String,
361	/// Request ID of the subscribe message.
362	subscribe_id: Id<'static>,
363	/// Request ID of the unsubscribe message.
364	unsubscribe_id: Id<'static>,
365	/// Method to use to unsubscribe later. Used if the channel unexpectedly closes.
366	unsubscribe_method: String,
367	/// If the subscription succeeds, we return a [`mpsc::Receiver`] that will receive notifications.
368	/// When we get a response from the server about that subscription, we send the result over
369	/// this channel.
370	send_back: oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>,
371}
372
373/// RegisterNotification message.
374#[derive(Debug)]
375struct RegisterNotificationMessage {
376	/// Method name this notification handler is attached to
377	method: String,
378	/// We return a [`mpsc::Receiver`] that will receive notifications.
379	/// When we get a response from the server about that subscription, we send the result over
380	/// this channel.
381	send_back: oneshot::Sender<Result<(SubscriptionReceiver, String), Error>>,
382}
383
384/// Message that the Client can send to the background task.
385#[derive(Debug)]
386enum FrontToBack {
387	/// Send a batch request to the server.
388	Batch(BatchMessage),
389	/// Send a notification to the server.
390	Notification(String),
391	/// Send a request to the server.
392	Request(RequestMessage),
393	/// Send a subscription request to the server.
394	Subscribe(SubscriptionMessage),
395	/// Register a notification handler
396	RegisterNotification(RegisterNotificationMessage),
397	/// Unregister a notification handler
398	UnregisterNotification(String),
399	/// When a subscription channel is closed, we send this message to the background
400	/// task to mark it ready for garbage collection.
401	// NOTE: It is not possible to cancel pending subscriptions or pending requests.
402	// Such operations will be blocked until a response is received or the background
403	// thread has been terminated.
404	SubscriptionClosed(SubscriptionId<'static>),
405}
406
407impl<Notif> Subscription<Notif>
408where
409	Notif: DeserializeOwned,
410{
411	/// Returns the next notification from the stream.
412	/// This may return `None` if the subscription has been terminated,
413	/// which may happen if the channel becomes full or is dropped.
414	///
415	/// **Note:** This has an identical signature to the [`StreamExt::next`]
416	/// method (and delegates to that). Import [`StreamExt`] if you'd like
417	/// access to other stream combinator methods.
418	#[allow(clippy::should_implement_trait)]
419	pub async fn next(&mut self) -> Option<Result<Notif, serde_json::Error>> {
420		StreamExt::next(self).await
421	}
422}
423
424impl<Notif> Stream for Subscription<Notif>
425where
426	Notif: DeserializeOwned,
427{
428	type Item = Result<Notif, serde_json::Error>;
429	fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
430		let res = match futures_util::ready!(self.rx.poll_next_unpin(cx)) {
431			Some(v) => Some(serde_json::from_str::<Notif>(v.get())),
432			None => {
433				self.is_closed = true;
434				None
435			}
436		};
437
438		Poll::Ready(res)
439	}
440}
441
442impl<Notif> Drop for Subscription<Notif> {
443	fn drop(&mut self) {
444		// We can't actually guarantee that this goes through. If the background task is busy, then
445		// the channel's buffer will be full.
446		// However, when a notification arrives, the background task will realize that the channel
447		// to the `Callback` has been closed.
448
449		let msg = match self.kind.take() {
450			Some(SubscriptionKind::Method(notif)) => FrontToBack::UnregisterNotification(notif),
451			Some(SubscriptionKind::Subscription(sub_id)) => FrontToBack::SubscriptionClosed(sub_id),
452			None => return,
453		};
454		let _ = self.to_back.try_send(msg);
455	}
456}
457
458#[derive(Debug)]
459/// Keep track of request IDs.
460pub struct RequestIdManager {
461	/// Get the next request ID.
462	current_id: CurrentId,
463	/// Request ID type.
464	id_kind: IdKind,
465}
466
467impl RequestIdManager {
468	/// Create a new `RequestIdGuard` with the provided concurrency limit.
469	pub fn new(id_kind: IdKind) -> Self {
470		Self { current_id: CurrentId::new(), id_kind }
471	}
472
473	/// Attempts to get the next request ID.
474	pub fn next_request_id(&self) -> Id<'static> {
475		self.id_kind.into_id(self.current_id.next())
476	}
477
478	/// Get a handle to the `IdKind`.
479	pub fn as_id_kind(&self) -> IdKind {
480		self.id_kind
481	}
482}
483
/// Data type used for the `id` of a JSON-RPC request object.
#[derive(Debug, Copy, Clone)]
pub enum IdKind {
	/// The ID is a string.
	String,
	/// The ID is a number.
	Number,
}
492
493impl IdKind {
494	/// Generate an `Id` from number.
495	pub fn into_id(self, id: u64) -> Id<'static> {
496		match self {
497			IdKind::Number => Id::Number(id),
498			IdKind::String => Id::Str(format!("{id}").into()),
499		}
500	}
501}
502
/// Atomic counter from which request IDs are drawn.
#[derive(Debug)]
struct CurrentId(AtomicUsize);

impl CurrentId {
	/// Create a counter starting at zero.
	fn new() -> Self {
		Self(AtomicUsize::new(0))
	}

	/// Return the current counter value and advance the counter by one.
	fn next(&self) -> u64 {
		let previous = self.0.fetch_add(1, Ordering::Relaxed);
		u64::try_from(previous).expect("usize -> u64 infallible, there are no CPUs > 64 bits; qed")
	}
}
518
519/// Generate a range of IDs to be used in a batch request.
520pub fn generate_batch_id_range(id: Id, len: u64) -> Result<Range<u64>, Error> {
521	let id_start = id.try_parse_inner_as_number()?;
522	let id_end = id_start
523		.checked_add(len)
524		.ok_or_else(|| Error::Custom("BatchID range wrapped; restart the client or try again later".to_string()))?;
525
526	Ok(id_start..id_end)
527}
528
529/// Represent a single entry in a batch response.
530pub type BatchEntry<'a, R> = Result<R, ErrorObject<'a>>;
531
532/// Batch response.
533#[derive(Debug, Clone)]
534pub struct BatchResponse<'a, R> {
535	successful_calls: usize,
536	failed_calls: usize,
537	responses: Vec<BatchEntry<'a, R>>,
538}
539
540impl<'a, R: fmt::Debug + 'a> BatchResponse<'a, R> {
541	/// Create a new [`BatchResponse`].
542	pub fn new(successful_calls: usize, responses: Vec<BatchEntry<'a, R>>, failed_calls: usize) -> Self {
543		Self { successful_calls, responses, failed_calls }
544	}
545
546	/// Get the length of the batch response.
547	pub fn len(&self) -> usize {
548		self.responses.len()
549	}
550
551	/// Is empty.
552	pub fn is_empty(&self) -> bool {
553		self.responses.len() == 0
554	}
555
556	/// Get the number of successful calls in the batch.
557	pub fn num_successful_calls(&self) -> usize {
558		self.successful_calls
559	}
560
561	/// Get the number of failed calls in the batch.
562	pub fn num_failed_calls(&self) -> usize {
563		self.failed_calls
564	}
565
566	/// Returns `Ok(iterator)` if all responses were successful
567	/// otherwise `Err(iterator)` is returned.
568	///
569	/// If you want get all responses if an error responses occurs use [`BatchResponse::into_iter`]
570	/// instead where it's possible to implement customized logic.
571	pub fn into_ok(
572		self,
573	) -> Result<impl Iterator<Item = R> + 'a + std::fmt::Debug, impl Iterator<Item = ErrorObject<'a>> + std::fmt::Debug>
574	{
575		if self.failed_calls > 0 {
576			Err(self.into_iter().filter_map(|err| err.err()))
577		} else {
578			Ok(self.into_iter().filter_map(|r| r.ok()))
579		}
580	}
581
582	/// Similar to [`BatchResponse::into_ok`] but takes the responses by reference instead.
583	pub fn ok(
584		&self,
585	) -> Result<impl Iterator<Item = &R> + std::fmt::Debug, impl Iterator<Item = &ErrorObject<'a>> + std::fmt::Debug> {
586		if self.failed_calls > 0 {
587			Err(self.responses.iter().filter_map(|err| err.as_ref().err()))
588		} else {
589			Ok(self.responses.iter().filter_map(|r| r.as_ref().ok()))
590		}
591	}
592
593	/// Returns an iterator over all responses.
594	pub fn iter(&self) -> impl Iterator<Item = &BatchEntry<'_, R>> {
595		self.responses.iter()
596	}
597}
598
599impl<'a, R> IntoIterator for BatchResponse<'a, R> {
600	type Item = BatchEntry<'a, R>;
601	type IntoIter = std::vec::IntoIter<Self::Item>;
602
603	fn into_iter(self) -> Self::IntoIter {
604		self.responses.into_iter()
605	}
606}
607
608#[derive(thiserror::Error, Debug)]
609enum TrySubscriptionSendError {
610	#[error("The subscription is closed")]
611	Closed,
612	#[error("A subscription message was dropped")]
613	TooSlow(Box<RawValue>),
614}
615
616#[derive(Debug)]
617pub(crate) struct SubscriptionSender {
618	inner: mpsc::Sender<Box<RawValue>>,
619	lagged: SubscriptionLagged,
620}
621
622impl SubscriptionSender {
623	fn send(&self, msg: Box<RawValue>) -> Result<(), TrySubscriptionSendError> {
624		match self.inner.try_send(msg) {
625			Ok(_) => Ok(()),
626			Err(TrySendError::Closed(_)) => Err(TrySubscriptionSendError::Closed),
627			Err(TrySendError::Full(m)) => {
628				self.lagged.set_lagged();
629				Err(TrySubscriptionSendError::TooSlow(m))
630			}
631		}
632	}
633}
634
635#[derive(Debug)]
636pub(crate) struct SubscriptionReceiver {
637	inner: mpsc::Receiver<Box<RawValue>>,
638	lagged: SubscriptionLagged,
639}
640
641impl Stream for SubscriptionReceiver {
642	type Item = Box<RawValue>;
643
644	fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
645		self.inner.poll_recv(cx)
646	}
647}
648
649fn subscription_channel(max_buf_size: usize) -> (SubscriptionSender, SubscriptionReceiver) {
650	let (tx, rx) = mpsc::channel(max_buf_size);
651	let lagged_tx = SubscriptionLagged::new();
652	let lagged_rx = lagged_tx.clone();
653
654	(SubscriptionSender { inner: tx, lagged: lagged_tx }, SubscriptionReceiver { inner: rx, lagged: lagged_rx })
655}
656
657/// Represents an active subscription returned by the server.
658#[derive(Debug)]
659pub struct SubscriptionResponse {
660	/// The ID of the subscription.
661	sub_id: SubscriptionId<'static>,
662	// The receiver is used to receive notifications from the server and shouldn't be exposed to the user
663	// from the middleware.
664	stream: SubscriptionReceiver,
665}
666
667impl SubscriptionResponse {
668	/// Get the subscription ID.
669	pub fn subscription_id(&self) -> &SubscriptionId<'static> {
670		&self.sub_id
671	}
672}
673
674/// A raw JSON-RPC response object which can be either a JSON-RPC success or error response.
675///
676/// This is a wrapper around the `jsonrpsee_types::Response` type for ease of use
677/// for middleware client implementations.
678#[derive(Debug)]
679pub struct RawResponse<'a>(jsonrpsee_types::Response<'a, Box<RawValue>>);
680
681impl Serialize for RawResponse<'_> {
682	fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
683	where
684		S: serde::Serializer,
685	{
686		self.0.serialize(serializer)
687	}
688}
689
690impl<'a> From<jsonrpsee_types::Response<'a, Box<RawValue>>> for RawResponse<'a> {
691	fn from(r: jsonrpsee_types::Response<'a, Box<RawValue>>) -> Self {
692		Self(r)
693	}
694}
695
696impl<'a> RawResponse<'a> {
697	/// Whether this response is successful JSON-RPC response.
698	pub fn is_success(&self) -> bool {
699		match self.0.payload {
700			jsonrpsee_types::ResponsePayload::Success(_) => true,
701			jsonrpsee_types::ResponsePayload::Error(_) => false,
702		}
703	}
704
705	/// Extract the error object from the response if it is an error.
706	pub fn as_error(&self) -> Option<&ErrorObject<'_>> {
707		match self.0.payload {
708			jsonrpsee_types::ResponsePayload::Error(ref err) => Some(err),
709			_ => None,
710		}
711	}
712
713	// Extract the result field the response if it is a success.
714	///
715	/// Omits JSON-RPC specific fields like `jsonrpc` and `id`.
716	pub fn as_success(&self) -> Option<&RawValue> {
717		match self.0.payload {
718			jsonrpsee_types::ResponsePayload::Success(ref res) => Some(res),
719			_ => None,
720		}
721	}
722
723	/// Get the request ID.
724	pub fn id(&self) -> &Id<'a> {
725		&self.0.id
726	}
727
728	/// Consume the response and extract the inner value.
729	pub fn into_inner(self) -> jsonrpsee_types::Response<'a, Box<RawValue>> {
730		self.0
731	}
732
733	/// Convert the response into an owned version.
734	pub fn into_owned(self) -> RawResponseOwned {
735		RawResponse(self.0.into_owned())
736	}
737}
738
739impl ToJson for RawResponse<'_> {
740	fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
741		serde_json::value::to_raw_value(&self.0)
742	}
743}
744
745/// Middleware batch response.
746pub type MiddlewareBatchResponse = Vec<RawResponseOwned>;
747
748/// Middleware method response which can be either a method call or a subscription response.
749/// and treated differently.
750#[derive(Debug)]
751pub struct MiddlewareMethodResponse {
752	rp: RawResponseOwned,
753	subscription: Option<SubscriptionResponse>,
754}
755
756impl Serialize for MiddlewareMethodResponse {
757	fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
758	where
759		S: serde::Serializer,
760	{
761		self.rp.serialize(serializer)
762	}
763}
764
765impl Deref for MiddlewareMethodResponse {
766	type Target = RawResponseOwned;
767
768	fn deref(&self) -> &Self::Target {
769		&self.rp
770	}
771}
772
773impl MiddlewareMethodResponse {
774	/// Create a new [`MiddlewareMethodResponse`] which is plain response.
775	pub fn response(rp: RawResponseOwned) -> Self {
776		Self { rp, subscription: None }
777	}
778
779	/// Create a new [`MiddlewareMethodResponse`] which is a subscription response.
780	pub fn subscription_response(rp: RawResponseOwned, subscription: SubscriptionResponse) -> Self {
781		Self { rp, subscription: Some(subscription) }
782	}
783
784	/// Convert the response into parts.
785	pub fn into_parts(self) -> (RawResponseOwned, Option<SubscriptionResponse>) {
786		(self.rp, self.subscription)
787	}
788
789	/// Extract the response.
790	pub fn into_response(self) -> RawResponseOwned {
791		self.rp
792	}
793
794	/// Extract the subscription response if it is a subscription.
795	pub fn into_subscription(self) -> Option<SubscriptionResponse> {
796		self.subscription
797	}
798}
799
800/// Middleware notification response.
801#[derive(Debug, Clone)]
802pub struct MiddlewareNotifResponse(Extensions);
803
804impl From<Extensions> for MiddlewareNotifResponse {
805	fn from(extensions: Extensions) -> Self {
806		Self(extensions)
807	}
808}
809
810impl MiddlewareNotifResponse {
811	/// Get the extensions.
812	pub fn extensions(&self) -> &Extensions {
813		&self.0
814	}
815
816	/// Get the extensions mutable.
817	pub fn extensions_mut(&mut self) -> &mut Extensions {
818		&mut self.0
819	}
820}
821
822impl<T: Serialize> ToJson for Result<T, Error> {
823	fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
824		match self {
825			Ok(v) => serde_json::value::to_raw_value(v),
826			Err(e) => serde_json::value::to_raw_value(&e.to_string()),
827		}
828	}
829}