jsonrpsee_core/client/async_client/
mod.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! Abstract async client.
28
29mod helpers;
30mod manager;
31mod rpc_service;
32mod utils;
33
34pub use rpc_service::RpcService;
35
36use std::borrow::Cow as StdCow;
37use std::time::Duration;
38
39use crate::JsonRawValue;
40use crate::client::async_client::helpers::process_subscription_close_response;
41use crate::client::async_client::utils::MaybePendingFutures;
42use crate::client::{
43	BatchResponse, ClientT, Error, ReceivedMessage, RegisterNotificationMessage, Subscription, SubscriptionClientT,
44	SubscriptionKind, TransportReceiverT, TransportSenderT,
45};
46use crate::error::RegisterMethodError;
47use crate::middleware::layer::{RpcLogger, RpcLoggerLayer};
48use crate::middleware::{Batch, IsBatch, IsSubscription, Request, RpcServiceBuilder, RpcServiceT};
49use crate::params::{BatchRequestBuilder, EmptyBatchRequest};
50use crate::traits::ToRpcParams;
51use futures_util::Stream;
52use futures_util::future::{self, Either};
53use futures_util::stream::StreamExt;
54use helpers::{
55	build_unsubscribe_message, call_with_timeout, process_batch_response, process_notification,
56	process_single_response, process_subscription_response, stop_subscription,
57};
58use http::Extensions;
59use jsonrpsee_types::response::SubscriptionError;
60use jsonrpsee_types::{InvalidRequestId, ResponseSuccess, TwoPointZero};
61use jsonrpsee_types::{Response, SubscriptionResponse};
62use manager::RequestManager;
63use serde::de::DeserializeOwned;
64use std::sync::Arc;
65use tokio::sync::{mpsc, oneshot};
66use tower::layer::util::Identity;
67
68use self::utils::{InactivityCheck, IntervalStream};
69use super::{
70	FrontToBack, IdKind, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse, RequestIdManager,
71	generate_batch_id_range, subscription_channel,
72};
73
74pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option<Box<JsonRawValue>>>;
75
76type Logger = tower::layer::util::Stack<RpcLoggerLayer, tower::layer::util::Identity>;
77
78const LOG_TARGET: &str = "jsonrpsee-client";
79const NOT_POISONED: &str = "Not poisoned; qed";
80
/// Settings for the WebSocket ping/pong mechanism, which may also be used to
/// disconnect an inactive connection.
///
/// jsonrpsee doesn't associate ping frames with pong frames; a pong is simply
/// regarded as missed if none is received within the `inactive_limit`.
///
/// The `inactive_limit` should therefore be configured to be longer than a single
/// WebSocket ping takes, otherwise pongs might be counted as missed and the
/// connection may end up being terminated.
///
/// Default: ping_interval: 30 seconds, max failures: 1 and inactive limit: 40 seconds.
#[derive(Debug, Copy, Clone)]
pub struct PingConfig {
	/// How often pings are sent.
	pub(crate) ping_interval: Duration,
	/// Longest time a connection may stay idle.
	pub(crate) inactive_limit: Duration,
	/// Number of tolerated failures.
	pub(crate) max_failures: usize,
}

impl Default for PingConfig {
	fn default() -> Self {
		let ping_interval = Duration::from_secs(30);
		let inactive_limit = Duration::from_secs(40);

		Self { ping_interval, inactive_limit, max_failures: 1 }
	}
}

impl PingConfig {
	/// Create a `PingConfig` populated with the default values.
	pub fn new() -> Self {
		Default::default()
	}

	/// Set the interval at which WebSocket pings are sent out.
	pub fn ping_interval(self, ping_interval: Duration) -> Self {
		Self { ping_interval, ..self }
	}

	/// Set how long to wait for the WebSocket pong.
	/// Once this limit has expired the connection is regarded as unresponsive.
	///
	/// How many times the connection is allowed to be inactive
	/// is configured by [`PingConfig::max_failures`].
	pub fn inactive_limit(self, inactivity_limit: Duration) -> Self {
		Self { inactive_limit: inactivity_limit, ..self }
	}

	/// Set how many times the connection is allowed to be
	/// inactive before the connection is closed.
	///
	/// # Panics
	///
	/// This method panics if `max` == 0.
	pub fn max_failures(self, max: usize) -> Self {
		assert!(max > 0);
		Self { max_failures: max, ..self }
	}
}
143
144#[derive(Debug, Default, Clone)]
145pub(crate) struct ThreadSafeRequestManager(Arc<std::sync::Mutex<RequestManager>>);
146
147impl ThreadSafeRequestManager {
148	pub(crate) fn new() -> Self {
149		Self::default()
150	}
151
152	pub(crate) fn lock(&self) -> std::sync::MutexGuard<'_, RequestManager> {
153		self.0.lock().expect(NOT_POISONED)
154	}
155}
156
157pub(crate) type SharedDisconnectReason = Arc<std::sync::RwLock<Option<Arc<Error>>>>;
158
159/// If the background thread is terminated, this type
160/// can be used to read the error cause.
161///
162// NOTE: This is an AsyncRwLock to be &self.
163#[derive(Debug)]
164struct ErrorFromBack {
165	conn: mpsc::Sender<FrontToBack>,
166	disconnect_reason: SharedDisconnectReason,
167}
168
169impl ErrorFromBack {
170	fn new(conn: mpsc::Sender<FrontToBack>, disconnect_reason: SharedDisconnectReason) -> Self {
171		Self { conn, disconnect_reason }
172	}
173
174	async fn read_error(&self) -> Error {
175		// When the background task is closed the error is written to `disconnect_reason`.
176		self.conn.closed().await;
177
178		if let Some(err) = self.disconnect_reason.read().expect(NOT_POISONED).as_ref() {
179			Error::RestartNeeded(err.clone())
180		} else {
181			Error::Custom("Error reason could not be found. This is a bug. Please open an issue.".to_string())
182		}
183	}
184}
185
186/// Builder for [`Client`].
187#[derive(Debug, Clone)]
188pub struct ClientBuilder<L = Logger> {
189	request_timeout: Duration,
190	max_concurrent_requests: usize,
191	max_buffer_capacity_per_subscription: usize,
192	id_kind: IdKind,
193	ping_config: Option<PingConfig>,
194	tcp_no_delay: bool,
195	service_builder: RpcServiceBuilder<L>,
196}
197
198impl Default for ClientBuilder {
199	fn default() -> Self {
200		Self {
201			request_timeout: Duration::from_secs(60),
202			max_concurrent_requests: 256,
203			max_buffer_capacity_per_subscription: 1024,
204			id_kind: IdKind::Number,
205			ping_config: None,
206			tcp_no_delay: true,
207			service_builder: RpcServiceBuilder::default().rpc_logger(1024),
208		}
209	}
210}
211
212impl ClientBuilder<Identity> {
213	/// Create a new client builder.
214	pub fn new() -> ClientBuilder {
215		ClientBuilder::default()
216	}
217}
218
219impl<L> ClientBuilder<L> {
220	/// Set request timeout (default is 60 seconds).
221	pub fn request_timeout(mut self, timeout: Duration) -> Self {
222		self.request_timeout = timeout;
223		self
224	}
225
226	/// Set max concurrent requests (default is 256).
227	pub fn max_concurrent_requests(mut self, max: usize) -> Self {
228		self.max_concurrent_requests = max;
229		self
230	}
231
232	/// Set max buffer capacity for each subscription; when the capacity is exceeded the subscription
233	/// will be dropped (default is 1024).
234	///
235	/// You may prevent the subscription from being dropped by polling often enough
236	/// [`Subscription::next()`](../../jsonrpsee_core/client/struct.Subscription.html#method.next) such that
237	/// it can keep with the rate as server produces new items on the subscription.
238	///
239	///
240	/// # Panics
241	///
242	/// This function panics if `max` is 0.
243	pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
244		assert!(max > 0);
245		self.max_buffer_capacity_per_subscription = max;
246		self
247	}
248
249	/// Configure the data type of the request object ID (default is number).
250	pub fn id_format(mut self, id_kind: IdKind) -> Self {
251		self.id_kind = id_kind;
252		self
253	}
254
255	/// Enable WebSocket ping/pong on the client.
256	///
257	/// This only works if the transport supports WebSocket pings.
258	///
259	/// Default: pings are disabled.
260	pub fn enable_ws_ping(mut self, cfg: PingConfig) -> Self {
261		self.ping_config = Some(cfg);
262		self
263	}
264
265	/// Disable WebSocket ping/pong on the server.
266	///
267	/// Default: pings are disabled.
268	pub fn disable_ws_ping(mut self) -> Self {
269		self.ping_config = None;
270		self
271	}
272
273	/// Configure `TCP_NODELAY` on the socket to the supplied value `nodelay`.
274	///
275	/// On some transports this may have no effect.
276	///
277	/// Default is `true`.
278	pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
279		self.tcp_no_delay = no_delay;
280		self
281	}
282
283	/// Configure the client to a specific RPC middleware which
284	/// runs for every JSON-RPC call.
285	///
286	/// This is useful for adding a custom logger or something similar.
287	pub fn set_rpc_middleware<T>(self, service_builder: RpcServiceBuilder<T>) -> ClientBuilder<T> {
288		ClientBuilder {
289			request_timeout: self.request_timeout,
290			max_concurrent_requests: self.max_concurrent_requests,
291			max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
292			id_kind: self.id_kind,
293			ping_config: self.ping_config,
294			tcp_no_delay: self.tcp_no_delay,
295			service_builder,
296		}
297	}
298
299	/// Build the client with given transport.
300	///
301	/// ## Panics
302	///
303	/// Panics if called outside of `tokio` runtime context.
304	#[cfg(feature = "async-client")]
305	#[cfg_attr(docsrs, doc(cfg(feature = "async-client")))]
306	pub fn build_with_tokio<S, R, Svc>(self, sender: S, receiver: R) -> Client<Svc>
307	where
308		S: TransportSenderT + Send,
309		R: TransportReceiverT + Send,
310		L: tower::Layer<RpcService, Service = Svc> + Clone + Send + Sync + 'static,
311	{
312		let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
313		let disconnect_reason = SharedDisconnectReason::default();
314		let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
315		let (client_dropped_tx, client_dropped_rx) = oneshot::channel();
316		let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1);
317		let manager = ThreadSafeRequestManager::new();
318
319		let (ping_interval, inactivity_stream, inactivity_check) = match self.ping_config {
320			None => (IntervalStream::pending(), IntervalStream::pending(), InactivityCheck::Disabled),
321			Some(p) => {
322				// NOTE: This emits a tick immediately to sync how the `inactive_interval` works
323				// because it starts measuring when the client start-ups.
324				let ping_interval = IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(
325					tokio::time::interval(p.ping_interval),
326				));
327
328				let inactive_interval = {
329					let start = tokio::time::Instant::now() + p.inactive_limit;
330					IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
331						start,
332						p.inactive_limit,
333					)))
334				};
335
336				let inactivity_check = InactivityCheck::new(p.inactive_limit, p.max_failures);
337
338				(ping_interval, inactive_interval, inactivity_check)
339			}
340		};
341
342		tokio::spawn(send_task(SendTaskParams {
343			sender,
344			from_frontend: from_front,
345			close_tx: send_receive_task_sync_tx.clone(),
346			manager: manager.clone(),
347			max_buffer_capacity_per_subscription,
348			ping_interval,
349		}));
350
351		tokio::spawn(read_task(ReadTaskParams {
352			receiver,
353			close_tx: send_receive_task_sync_tx,
354			to_send_task: to_back.clone(),
355			manager,
356			max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
357			inactivity_check,
358			inactivity_stream,
359		}));
360
361		tokio::spawn(wait_for_shutdown(send_receive_task_sync_rx, client_dropped_rx, disconnect_reason.clone()));
362
363		Client {
364			to_back: to_back.clone(),
365			service: self.service_builder.service(RpcService::new(to_back.clone())),
366			request_timeout: self.request_timeout,
367			error: ErrorFromBack::new(to_back, disconnect_reason),
368			id_manager: RequestIdManager::new(self.id_kind),
369			on_exit: Some(client_dropped_tx),
370		}
371	}
372
373	/// Build the client with given transport.
374	#[cfg(all(feature = "async-wasm-client", target_arch = "wasm32"))]
375	#[cfg_attr(docsrs, doc(cfg(feature = "async-wasm-client")))]
376	pub fn build_with_wasm<S, R, Svc>(self, sender: S, receiver: R) -> Client<Svc>
377	where
378		S: TransportSenderT,
379		R: TransportReceiverT,
380		L: tower::Layer<RpcService, Service = Svc> + Clone + Send + Sync + 'static,
381	{
382		use futures_util::stream::Pending;
383
384		type PendingIntervalStream = IntervalStream<Pending<()>>;
385
386		let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
387		let disconnect_reason = SharedDisconnectReason::default();
388		let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
389		let (client_dropped_tx, client_dropped_rx) = oneshot::channel();
390		let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1);
391		let manager = ThreadSafeRequestManager::new();
392
393		let ping_interval = PendingIntervalStream::pending();
394		let inactivity_stream = PendingIntervalStream::pending();
395		let inactivity_check = InactivityCheck::Disabled;
396
397		wasm_bindgen_futures::spawn_local(send_task(SendTaskParams {
398			sender,
399			from_frontend: from_front,
400			close_tx: send_receive_task_sync_tx.clone(),
401			manager: manager.clone(),
402			max_buffer_capacity_per_subscription,
403			ping_interval,
404		}));
405
406		wasm_bindgen_futures::spawn_local(read_task(ReadTaskParams {
407			receiver,
408			close_tx: send_receive_task_sync_tx,
409			to_send_task: to_back.clone(),
410			manager,
411			max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
412			inactivity_check,
413			inactivity_stream,
414		}));
415
416		wasm_bindgen_futures::spawn_local(wait_for_shutdown(
417			send_receive_task_sync_rx,
418			client_dropped_rx,
419			disconnect_reason.clone(),
420		));
421
422		Client {
423			to_back: to_back.clone(),
424			service: self.service_builder.service(RpcService::new(to_back.clone())),
425			request_timeout: self.request_timeout,
426			error: ErrorFromBack::new(to_back, disconnect_reason),
427			id_manager: RequestIdManager::new(self.id_kind),
428			on_exit: Some(client_dropped_tx),
429		}
430	}
431}
432
433/// Generic asynchronous client.
434#[derive(Debug)]
435pub struct Client<L = RpcLogger<RpcService>> {
436	/// Channel to send requests to the background task.
437	to_back: mpsc::Sender<FrontToBack>,
438	error: ErrorFromBack,
439	/// Request timeout. Defaults to 60sec.
440	request_timeout: Duration,
441	/// Request ID manager.
442	id_manager: RequestIdManager,
443	/// When the client is dropped a message is sent to the background thread.
444	on_exit: Option<oneshot::Sender<()>>,
445	service: L,
446}
447
448impl Client<Identity> {
449	/// Create a builder for the client.
450	pub fn builder() -> ClientBuilder {
451		ClientBuilder::new()
452	}
453}
454
455impl<L> Client<L> {
456	/// Checks if the client is connected to the target.
457	pub fn is_connected(&self) -> bool {
458		!self.to_back.is_closed()
459	}
460
461	async fn run_future_until_timeout<T>(&self, fut: impl Future<Output = Result<T, Error>>) -> Result<T, Error> {
462		tokio::pin!(fut);
463
464		match futures_util::future::select(fut, futures_timer::Delay::new(self.request_timeout)).await {
465			Either::Left((Ok(r), _)) => Ok(r),
466			Either::Left((Err(Error::ServiceDisconnect), _)) => Err(self.on_disconnect().await),
467			Either::Left((Err(e), _)) => Err(e),
468			Either::Right(_) => Err(Error::RequestTimeout),
469		}
470	}
471
472	/// Completes when the client is disconnected or the client's background task encountered an error.
473	/// If the client is already disconnected, the future produced by this method will complete immediately.
474	///
475	/// # Cancel safety
476	///
477	/// This method is cancel safe.
478	pub async fn on_disconnect(&self) -> Error {
479		self.error.read_error().await
480	}
481
482	/// Returns configured request timeout.
483	pub fn request_timeout(&self) -> Duration {
484		self.request_timeout
485	}
486}
487
488impl<L> Drop for Client<L> {
489	fn drop(&mut self) {
490		if let Some(e) = self.on_exit.take() {
491			let _ = e.send(());
492		}
493	}
494}
495
496impl<L> ClientT for Client<L>
497where
498	L: RpcServiceT<
499			MethodResponse = Result<MiddlewareMethodResponse, Error>,
500			BatchResponse = Result<MiddlewareBatchResponse, Error>,
501			NotificationResponse = Result<MiddlewareNotifResponse, Error>,
502		> + Send
503		+ Sync,
504{
505	fn notification<Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<(), Error>> + Send
506	where
507		Params: ToRpcParams + Send,
508	{
509		async {
510			// NOTE: we use this to guard against max number of concurrent requests.
511			let _req_id = self.id_manager.next_request_id();
512			let params = params.to_rpc_params()?.map(StdCow::Owned);
513			let fut = self.service.notification(jsonrpsee_types::Notification::new(method.into(), params));
514			self.run_future_until_timeout(fut).await?;
515			Ok(())
516		}
517	}
518
519	fn request<R, Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<R, Error>> + Send
520	where
521		R: DeserializeOwned,
522		Params: ToRpcParams + Send,
523	{
524		async {
525			let id = self.id_manager.next_request_id();
526			let params = params.to_rpc_params()?;
527			let fut = self.service.call(Request::borrowed(method, params.as_deref(), id.clone()));
528			let rp = self.run_future_until_timeout(fut).await?;
529			let success = ResponseSuccess::try_from(rp.into_response().into_inner())?;
530
531			serde_json::from_str(success.result.get()).map_err(Into::into)
532		}
533	}
534
535	fn batch_request<'a, R>(
536		&self,
537		batch: BatchRequestBuilder<'a>,
538	) -> impl Future<Output = Result<BatchResponse<'a, R>, Error>> + Send
539	where
540		R: DeserializeOwned + 'a,
541	{
542		async {
543			let batch = batch.build()?;
544			let id = self.id_manager.next_request_id();
545			let id_range = generate_batch_id_range(id, batch.len() as u64)?;
546
547			let mut b = Batch::with_capacity(batch.len());
548
549			for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
550				b.push(Request {
551					jsonrpc: TwoPointZero,
552					id: self.id_manager.as_id_kind().into_id(id),
553					method: method.into(),
554					params: params.map(StdCow::Owned),
555					extensions: Extensions::new(),
556				});
557			}
558
559			b.extensions_mut().insert(IsBatch { id_range });
560
561			let fut = self.service.batch(b);
562			let json_values = self.run_future_until_timeout(fut).await?;
563
564			let mut responses = Vec::with_capacity(json_values.len());
565			let mut successful_calls = 0;
566			let mut failed_calls = 0;
567
568			for json_val in json_values {
569				match ResponseSuccess::try_from(json_val.into_inner()) {
570					Ok(val) => {
571						let result: R = serde_json::from_str(val.result.get()).map_err(Error::ParseError)?;
572						responses.push(Ok(result));
573						successful_calls += 1;
574					}
575					Err(err) => {
576						responses.push(Err(err));
577						failed_calls += 1;
578					}
579				}
580			}
581			Ok(BatchResponse { successful_calls, failed_calls, responses })
582		}
583	}
584}
585
586impl<L> SubscriptionClientT for Client<L>
587where
588	L: RpcServiceT<
589			MethodResponse = Result<MiddlewareMethodResponse, Error>,
590			BatchResponse = Result<MiddlewareBatchResponse, Error>,
591			NotificationResponse = Result<MiddlewareNotifResponse, Error>,
592		> + Send
593		+ Sync,
594{
595	/// Send a subscription request to the server.
596	///
597	/// The `subscribe_method` and `params` are used to ask for the subscription towards the
598	/// server. The `unsubscribe_method` is used to close the subscription.
599	fn subscribe<'a, Notif, Params>(
600		&self,
601		subscribe_method: &'a str,
602		params: Params,
603		unsubscribe_method: &'a str,
604	) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
605	where
606		Params: ToRpcParams + Send,
607		Notif: DeserializeOwned,
608	{
609		async move {
610			if subscribe_method == unsubscribe_method {
611				return Err(RegisterMethodError::SubscriptionNameConflict(unsubscribe_method.to_owned()).into());
612			}
613
614			let req_id_sub = self.id_manager.next_request_id();
615			let req_id_unsub = self.id_manager.next_request_id();
616			let params = params.to_rpc_params()?;
617
618			let mut ext = Extensions::new();
619			ext.insert(IsSubscription::new(req_id_sub.clone(), req_id_unsub, unsubscribe_method.to_owned()));
620
621			let req = Request {
622				jsonrpc: TwoPointZero,
623				id: req_id_sub,
624				method: subscribe_method.into(),
625				params: params.map(StdCow::Owned),
626				extensions: ext,
627			};
628
629			let fut = self.service.call(req);
630			let sub = self
631				.run_future_until_timeout(fut)
632				.await?
633				.into_subscription()
634				.expect("Extensions set to subscription, must return subscription; qed");
635			Ok(Subscription::new(self.to_back.clone(), sub.stream, SubscriptionKind::Subscription(sub.sub_id)))
636		}
637	}
638
639	/// Subscribe to a specific method.
640	fn subscribe_to_method<N>(&self, method: &str) -> impl Future<Output = Result<Subscription<N>, Error>> + Send
641	where
642		N: DeserializeOwned,
643	{
644		async {
645			let (send_back_tx, send_back_rx) = oneshot::channel();
646			if self
647				.to_back
648				.clone()
649				.send(FrontToBack::RegisterNotification(RegisterNotificationMessage {
650					send_back: send_back_tx,
651					method: method.to_owned(),
652				}))
653				.await
654				.is_err()
655			{
656				return Err(self.on_disconnect().await);
657			}
658
659			let res = call_with_timeout(self.request_timeout, send_back_rx).await;
660
661			let (rx, method) = match res {
662				Ok(Ok(val)) => val,
663				Ok(Err(err)) => return Err(err),
664				Err(_) => return Err(self.on_disconnect().await),
665			};
666
667			Ok(Subscription::new(self.to_back.clone(), rx, SubscriptionKind::Method(method)))
668		}
669	}
670}
671
672/// Handle backend messages.
673///
674/// Returns an error if the main background loop should be terminated.
675fn handle_backend_messages<R: TransportReceiverT>(
676	message: Option<Result<ReceivedMessage, R::Error>>,
677	manager: &ThreadSafeRequestManager,
678	max_buffer_capacity_per_subscription: usize,
679) -> Result<Vec<FrontToBack>, Error> {
680	// Handle raw messages of form `ReceivedMessage::Bytes` (Vec<u8>) or ReceivedMessage::Data` (String).
681	fn handle_recv_message(
682		raw: &[u8],
683		manager: &ThreadSafeRequestManager,
684		max_buffer_capacity_per_subscription: usize,
685	) -> Result<Vec<FrontToBack>, Error> {
686		let first_non_whitespace = raw.iter().find(|byte| !byte.is_ascii_whitespace());
687		let mut messages = Vec::new();
688
689		tracing::trace!(target: LOG_TARGET, "rx: {}", serde_json::from_slice::<&JsonRawValue>(raw).map_or("<invalid json>", |v| v.get()));
690
691		match first_non_whitespace {
692			Some(b'{') => {
693				// Single response to a request.
694				if let Ok(single) = serde_json::from_slice::<Response<_>>(raw) {
695					let maybe_unsub = process_single_response(
696						&mut manager.lock(),
697						single.into_owned().into(),
698						max_buffer_capacity_per_subscription,
699					)?;
700
701					if let Some(unsub) = maybe_unsub {
702						return Ok(vec![FrontToBack::Request(unsub)]);
703					}
704				}
705				// Subscription response.
706				else if let Ok(response) = serde_json::from_slice::<SubscriptionResponse<_>>(raw) {
707					if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
708						return Ok(vec![FrontToBack::SubscriptionClosed(sub_id)]);
709					}
710				}
711				// Subscription error response.
712				else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(raw) {
713					process_subscription_close_response(&mut manager.lock(), response);
714				}
715				// Incoming Notification
716				else if let Ok(notif) = serde_json::from_slice::<Notification>(raw) {
717					process_notification(&mut manager.lock(), notif);
718				} else {
719					return Err(unparse_error(raw));
720				}
721			}
722			Some(b'[') => {
723				// Batch response.
724				if let Ok(raw_responses) = serde_json::from_slice::<Vec<&JsonRawValue>>(raw) {
725					let mut batch = Vec::with_capacity(raw_responses.len());
726
727					let mut range = None;
728					let mut got_notif = false;
729
730					for r in raw_responses {
731						if let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) {
732							let id = response.id.try_parse_inner_as_number()?;
733							batch.push(response.into_owned().into());
734
735							let r = range.get_or_insert(id..id);
736
737							if id < r.start {
738								r.start = id;
739							}
740
741							if id > r.end {
742								r.end = id;
743							}
744						} else if let Ok(response) = serde_json::from_str::<SubscriptionResponse<_>>(r.get()) {
745							got_notif = true;
746							if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
747								messages.push(FrontToBack::SubscriptionClosed(sub_id));
748							}
749						} else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(raw) {
750							got_notif = true;
751							process_subscription_close_response(&mut manager.lock(), response);
752						} else if let Ok(notif) = serde_json::from_str::<Notification>(r.get()) {
753							got_notif = true;
754							process_notification(&mut manager.lock(), notif);
755						} else {
756							return Err(unparse_error(raw));
757						};
758					}
759
760					if let Some(mut range) = range {
761						// the range is exclusive so need to add one.
762						range.end += 1;
763						process_batch_response(&mut manager.lock(), batch, range)?;
764					} else if !got_notif {
765						return Err(EmptyBatchRequest.into());
766					}
767				} else {
768					return Err(unparse_error(raw));
769				}
770			}
771			_ => {
772				return Err(unparse_error(raw));
773			}
774		};
775
776		Ok(messages)
777	}
778
779	match message {
780		Some(Ok(ReceivedMessage::Pong)) => {
781			tracing::debug!(target: LOG_TARGET, "Received pong");
782			Ok(vec![])
783		}
784		Some(Ok(ReceivedMessage::Bytes(raw))) => {
785			handle_recv_message(raw.as_ref(), manager, max_buffer_capacity_per_subscription)
786		}
787		Some(Ok(ReceivedMessage::Text(raw))) => {
788			handle_recv_message(raw.as_ref(), manager, max_buffer_capacity_per_subscription)
789		}
790		Some(Err(e)) => Err(Error::Transport(e.into())),
791		None => Err(Error::Custom("TransportReceiver dropped".into())),
792	}
793}
794
795/// Handle frontend messages.
796async fn handle_frontend_messages<S: TransportSenderT>(
797	message: FrontToBack,
798	manager: &ThreadSafeRequestManager,
799	sender: &mut S,
800	max_buffer_capacity_per_subscription: usize,
801) -> Result<(), S::Error> {
802	match message {
803		FrontToBack::Batch(batch) => {
804			if let Err(send_back) = manager.lock().insert_pending_batch(batch.ids.clone(), batch.send_back) {
805				tracing::debug!(target: LOG_TARGET, "Batch request already pending: {:?}", batch.ids);
806				let _ = send_back.send(Err(InvalidRequestId::Occupied(format!("{:?}", batch.ids))));
807				return Ok(());
808			}
809
810			sender.send(batch.raw).await?;
811		}
812		// User called `notification` on the front-end
813		FrontToBack::Notification(notif) => {
814			sender.send(notif).await?;
815		}
816		// User called `request` on the front-end
817		FrontToBack::Request(request) => {
818			if let Err(send_back) = manager.lock().insert_pending_call(request.id.clone(), request.send_back) {
819				tracing::debug!(target: LOG_TARGET, "Denied duplicate method call");
820
821				if let Some(s) = send_back {
822					let _ = s.send(Err(InvalidRequestId::Occupied(request.id.to_string())));
823				}
824				return Ok(());
825			}
826
827			sender.send(request.raw).await?;
828		}
829		// User called `subscribe` on the front-end.
830		FrontToBack::Subscribe(sub) => {
831			if let Err(send_back) = manager.lock().insert_pending_subscription(
832				sub.subscribe_id.clone(),
833				sub.unsubscribe_id.clone(),
834				sub.send_back,
835				sub.unsubscribe_method,
836			) {
837				tracing::debug!(target: LOG_TARGET, "Denied duplicate subscription");
838
839				let _ = send_back.send(Err(InvalidRequestId::Occupied(format!(
840					"sub_id={}:req_id={}",
841					sub.subscribe_id, sub.unsubscribe_id
842				))
843				.into()));
844				return Ok(());
845			}
846
847			sender.send(sub.raw).await?;
848		}
849		// User dropped a subscription.
850		FrontToBack::SubscriptionClosed(sub_id) => {
851			tracing::trace!(target: LOG_TARGET, "Closing subscription: {:?}", sub_id);
852			// NOTE: The subscription may have been closed earlier if
853			// the channel was full or disconnected.
854
855			let maybe_unsub = {
856				let m = &mut *manager.lock();
857
858				m.get_request_id_by_subscription_id(&sub_id)
859					.and_then(|req_id| build_unsubscribe_message(m, req_id, sub_id))
860			};
861
862			if let Some(unsub) = maybe_unsub {
863				stop_subscription::<S>(sender, unsub).await?;
864			}
865		}
866		// User called `register_notification` on the front-end.
867		FrontToBack::RegisterNotification(reg) => {
868			let (subscribe_tx, subscribe_rx) = subscription_channel(max_buffer_capacity_per_subscription);
869
870			if manager.lock().insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
871				let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
872			} else {
873				let _ = reg.send_back.send(Err(RegisterMethodError::AlreadyRegistered(reg.method).into()));
874			}
875		}
876		// User dropped the NotificationHandler for this method
877		FrontToBack::UnregisterNotification(method) => {
878			let _ = manager.lock().remove_notification_handler(&method);
879		}
880	};
881
882	Ok(())
883}
884
885fn unparse_error(raw: &[u8]) -> Error {
886	let json = serde_json::from_slice::<serde_json::Value>(raw);
887
888	let json_str = match json {
889		Ok(json) => serde_json::to_string(&json).expect("valid JSON; qed"),
890		Err(e) => e.to_string(),
891	};
892
893	Error::Custom(format!("Unparseable message: {json_str}"))
894}
895
896struct SendTaskParams<T: TransportSenderT, S> {
897	sender: T,
898	from_frontend: mpsc::Receiver<FrontToBack>,
899	close_tx: mpsc::Sender<Result<(), Error>>,
900	manager: ThreadSafeRequestManager,
901	max_buffer_capacity_per_subscription: usize,
902	ping_interval: IntervalStream<S>,
903}
904
905async fn send_task<T, S>(params: SendTaskParams<T, S>)
906where
907	T: TransportSenderT,
908	S: Stream + Unpin,
909{
910	let SendTaskParams {
911		mut sender,
912		mut from_frontend,
913		close_tx,
914		manager,
915		max_buffer_capacity_per_subscription,
916		mut ping_interval,
917	} = params;
918
919	// This is safe because `tokio::time::Interval`, `tokio::mpsc::Sender` and `tokio::mpsc::Receiver`
920	// are cancel-safe.
921	let res = loop {
922		tokio::select! {
923			biased;
924			_ = close_tx.closed() => break Ok(()),
925			maybe_msg = from_frontend.recv() => {
926				let Some(msg) = maybe_msg else {
927					break Ok(());
928				};
929
930				if let Err(e) =
931					handle_frontend_messages(msg, &manager, &mut sender, max_buffer_capacity_per_subscription).await
932				{
933					tracing::debug!(target: LOG_TARGET, "ws send failed: {e}");
934					break Err(Error::Transport(e.into()));
935				}
936			}
937			_ = ping_interval.next() => {
938				if let Err(err) = sender.send_ping().await {
939					tracing::debug!(target: LOG_TARGET, "Send ws ping failed: {err}");
940					break Err(Error::Transport(err.into()));
941				}
942			}
943		}
944	};
945
946	from_frontend.close();
947	let _ = sender.close().await;
948	let _ = close_tx.send(res).await;
949}
950
951struct ReadTaskParams<R: TransportReceiverT, S> {
952	receiver: R,
953	close_tx: mpsc::Sender<Result<(), Error>>,
954	to_send_task: mpsc::Sender<FrontToBack>,
955	manager: ThreadSafeRequestManager,
956	max_buffer_capacity_per_subscription: usize,
957	inactivity_check: InactivityCheck,
958	inactivity_stream: IntervalStream<S>,
959}
960
961async fn read_task<R, S>(params: ReadTaskParams<R, S>)
962where
963	R: TransportReceiverT,
964	S: Stream + Unpin,
965{
966	let ReadTaskParams {
967		receiver,
968		close_tx,
969		to_send_task,
970		manager,
971		max_buffer_capacity_per_subscription,
972		mut inactivity_check,
973		mut inactivity_stream,
974	} = params;
975
976	let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async {
977		let res = receiver.receive().await;
978		Some((res, receiver))
979	});
980
981	// These "unsubscription" occurs if a subscription gets dropped by frontend before ack:ed or that if
982	// a subscription couldn't keep with the server.
983	//
984	// Thus, these needs to be sent to the server inorder to tell the server to not bother
985	// with those messages anymore.
986	let pending_unsubscribes = MaybePendingFutures::new();
987
988	tokio::pin!(backend_event, pending_unsubscribes);
989
990	// This is safe because futures::Stream and tokio::mpsc::Sender are cancel-safe.
991	let res = loop {
992		tokio::select! {
993			// Closed.
994			biased;
995			_ = close_tx.closed() => break Ok(()),
996			// Unsubscribe completed.
997			_ = pending_unsubscribes.next() => (),
998			// New message received.
999			maybe_msg = backend_event.next() => {
1000				inactivity_check.mark_as_active();
1001				let Some(msg) = maybe_msg else { break Ok(()) };
1002
1003				match handle_backend_messages::<R>(Some(msg), &manager, max_buffer_capacity_per_subscription) {
1004					Ok(messages) => {
1005						for msg in messages {
1006							pending_unsubscribes.push(to_send_task.send(msg));
1007						}
1008					}
1009					Err(e) => {
1010						tracing::debug!(target: LOG_TARGET, "Failed to read message: {e}");
1011						break Err(e);
1012					}
1013				}
1014			}
1015			_ = inactivity_stream.next() => {
1016				if inactivity_check.is_inactive() {
1017					break Err(Error::Transport("WebSocket ping/pong inactive".into()));
1018				}
1019			}
1020		}
1021	};
1022
1023	let _ = close_tx.send(res).await;
1024}
1025
1026async fn wait_for_shutdown(
1027	mut close_rx: mpsc::Receiver<Result<(), Error>>,
1028	client_dropped: oneshot::Receiver<()>,
1029	err_to_front: SharedDisconnectReason,
1030) {
1031	let rx_item = close_rx.recv();
1032
1033	tokio::pin!(rx_item);
1034
1035	// Send an error to the frontend if the send or receive task completed with an error.
1036	if let Either::Left((Some(Err(err)), _)) = future::select(rx_item, client_dropped).await {
1037		*err_to_front.write().expect(NOT_POISONED) = Some(Arc::new(err));
1038	}
1039}