jsonrpsee_core/client/async_client/mod.rs

// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Abstract async client.

mod helpers;
mod manager;
mod utils;

use crate::client::async_client::helpers::{process_subscription_close_response, InnerBatchResponse};
use crate::client::async_client::utils::MaybePendingFutures;
use crate::client::{
	BatchMessage, BatchResponse, ClientT, Error, ReceivedMessage, RegisterNotificationMessage, RequestMessage,
	Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT,
};
use crate::error::RegisterMethodError;
use crate::params::{BatchRequestBuilder, EmptyBatchRequest};
use crate::tracing::client::{rx_log_from_json, tx_log_from_str};
use crate::traits::ToRpcParams;
use crate::JsonRawValue;
use std::borrow::Cow as StdCow;

use core::time::Duration;
use helpers::{
	build_unsubscribe_message, call_with_timeout, process_batch_response, process_notification,
	process_single_response, process_subscription_response, stop_subscription,
};
use jsonrpsee_types::{InvalidRequestId, ResponseSuccess, TwoPointZero};
use manager::RequestManager;
use std::sync::Arc;

use async_trait::async_trait;
use futures_timer::Delay;
use futures_util::future::{self, Either};
use futures_util::stream::StreamExt;
use futures_util::Stream;
use jsonrpsee_types::response::{ResponsePayload, SubscriptionError};
use jsonrpsee_types::{NotificationSer, RequestSer, Response, SubscriptionResponse};
use serde::de::DeserializeOwned;
use tokio::sync::{mpsc, oneshot};
use tracing::instrument;

use self::utils::{InactivityCheck, IntervalStream};
use super::{generate_batch_id_range, subscription_channel, FrontToBack, IdKind, RequestIdManager};

pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option<serde_json::Value>>;

const LOG_TARGET: &str = "jsonrpsee-client";
const NOT_POISONED: &str = "Not poisoned; qed";

/// Configuration for the WebSocket ping/pong mechanism, which may be used to disconnect
/// an inactive connection.
///
/// jsonrpsee doesn't pair individual ping and pong frames; if a pong frame isn't
/// received within the `inactive_limit`, the ping is regarded as missed.
///
/// Thus, the `inactive_limit` should be configured to be longer than a single
/// WebSocket ping round-trip takes, otherwise pings may be counted as missed and
/// the connection may end up being terminated.
///
/// Default: ping_interval: 30 seconds, max failures: 1, and inactive limit: 40 seconds.
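///
/// A minimal sketch of overriding the defaults with the builder-style methods below
/// (the values are purely illustrative; the import of `PingConfig` is elided):
///
/// ```ignore
/// use core::time::Duration;
///
/// let ping_cfg = PingConfig::new()
///     .ping_interval(Duration::from_secs(15))
///     .inactive_limit(Duration::from_secs(30))
///     .max_failures(2);
/// ```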
#[derive(Debug, Copy, Clone)]
pub struct PingConfig {
	/// Interval at which pings are sent.
	pub(crate) ping_interval: Duration,
	/// Max allowed time for a connection to stay idle.
	pub(crate) inactive_limit: Duration,
	/// Max failures.
	pub(crate) max_failures: usize,
}

impl Default for PingConfig {
	fn default() -> Self {
		Self { ping_interval: Duration::from_secs(30), max_failures: 1, inactive_limit: Duration::from_secs(40) }
	}
}

impl PingConfig {
	/// Create a new PingConfig.
	pub fn new() -> Self {
		Self::default()
	}

	/// Configure the interval at which WebSocket pings are sent out.
	pub fn ping_interval(mut self, ping_interval: Duration) -> Self {
		self.ping_interval = ping_interval;
		self
	}

	/// Configure how long to wait for the WebSocket pong.
	/// When this limit is exceeded the connection is regarded as unresponsive.
	///
	/// You may configure how many times the connection is allowed to
	/// be inactive by [`PingConfig::max_failures`].
	pub fn inactive_limit(mut self, inactivity_limit: Duration) -> Self {
		self.inactive_limit = inactivity_limit;
		self
	}

	/// Configure how many times the connection is allowed to be
	/// inactive before the connection is closed.
	///
	/// # Panics
	///
	/// This method panics if `max` == 0.
	pub fn max_failures(mut self, max: usize) -> Self {
		assert!(max > 0);
		self.max_failures = max;
		self
	}
}

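/// A [`RequestManager`] wrapped in a standard mutex so that it can be shared between
/// the background send and read tasks.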
#[derive(Debug, Default, Clone)]
pub(crate) struct ThreadSafeRequestManager(Arc<std::sync::Mutex<RequestManager>>);

impl ThreadSafeRequestManager {
	pub(crate) fn new() -> Self {
		Self::default()
	}

	pub(crate) fn lock(&self) -> std::sync::MutexGuard<RequestManager> {
		self.0.lock().expect(NOT_POISONED)
	}
}

pub(crate) type SharedDisconnectReason = Arc<std::sync::RwLock<Option<Arc<Error>>>>;

/// If the background task is terminated, this type
/// can be used to read the error cause.
///
// NOTE: The sender half is only used to detect when the background task has shut down;
// the actual disconnect reason is stored in `disconnect_reason`.
#[derive(Debug)]
struct ErrorFromBack {
	conn: mpsc::Sender<FrontToBack>,
	disconnect_reason: SharedDisconnectReason,
}

impl ErrorFromBack {
	fn new(conn: mpsc::Sender<FrontToBack>, disconnect_reason: SharedDisconnectReason) -> Self {
		Self { conn, disconnect_reason }
	}

	async fn read_error(&self) -> Error {
		// When the background task is closed the error is written to `disconnect_reason`.
		self.conn.closed().await;

		if let Some(err) = self.disconnect_reason.read().expect(NOT_POISONED).as_ref() {
			Error::RestartNeeded(err.clone())
		} else {
			Error::Custom("Error reason could not be found. This is a bug. Please open an issue.".to_string())
		}
	}
}

/// Builder for [`Client`].
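///
/// A minimal sketch of typical usage; `sender` and `receiver` are a transport pair
/// implementing [`TransportSenderT`]/[`TransportReceiverT`] and are assumed to be
/// constructed elsewhere:
///
/// ```ignore
/// use core::time::Duration;
///
/// let client = ClientBuilder::default()
///     .request_timeout(Duration::from_secs(30))
///     .enable_ws_ping(PingConfig::default())
///     .build_with_tokio(sender, receiver);
/// assert!(client.is_connected());
/// ```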
#[derive(Debug, Copy, Clone)]
pub struct ClientBuilder {
	request_timeout: Duration,
	max_concurrent_requests: usize,
	max_buffer_capacity_per_subscription: usize,
	id_kind: IdKind,
	max_log_length: u32,
	ping_config: Option<PingConfig>,
	tcp_no_delay: bool,
}

impl Default for ClientBuilder {
	fn default() -> Self {
		Self {
			request_timeout: Duration::from_secs(60),
			max_concurrent_requests: 256,
			max_buffer_capacity_per_subscription: 1024,
			id_kind: IdKind::Number,
			max_log_length: 4096,
			ping_config: None,
			tcp_no_delay: true,
		}
	}
}

impl ClientBuilder {
	/// Create a builder for the client.
	pub fn new() -> ClientBuilder {
		ClientBuilder::default()
	}

	/// Set request timeout (default is 60 seconds).
	pub fn request_timeout(mut self, timeout: Duration) -> Self {
		self.request_timeout = timeout;
		self
	}

	/// Set max concurrent requests (default is 256).
	pub fn max_concurrent_requests(mut self, max: usize) -> Self {
		self.max_concurrent_requests = max;
		self
	}

	/// Set max buffer capacity for each subscription; when the capacity is exceeded the subscription
	/// will be dropped (default is 1024).
	///
	/// You may prevent the subscription from being dropped by polling
	/// [`Subscription::next()`](../../jsonrpsee_core/client/struct.Subscription.html#method.next) often enough
	/// that it keeps up with the rate at which the server produces new items on the subscription.
	///
	/// # Panics
	///
	/// This function panics if `max` is 0.
	pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
		assert!(max > 0);
		self.max_buffer_capacity_per_subscription = max;
		self
	}

	/// Configure the data type of the request object ID (default is number).
	pub fn id_format(mut self, id_kind: IdKind) -> Self {
		self.id_kind = id_kind;
		self
	}

	/// Set maximum length for logging calls and responses.
	///
	/// Logs bigger than this limit will be truncated.
	pub fn set_max_logging_length(mut self, max: u32) -> Self {
		self.max_log_length = max;
		self
	}

	/// Enable WebSocket ping/pong on the client.
	///
	/// This only works if the transport supports WebSocket pings.
	///
	/// Default: pings are disabled.
	pub fn enable_ws_ping(mut self, cfg: PingConfig) -> Self {
		self.ping_config = Some(cfg);
		self
	}

	/// Disable WebSocket ping/pong on the client.
	///
	/// Default: pings are disabled.
	pub fn disable_ws_ping(mut self) -> Self {
		self.ping_config = None;
		self
	}

	/// Configure `TCP_NODELAY` on the socket to the supplied value `no_delay`.
	///
	/// On some transports this may have no effect.
	///
	/// Default is `true`.
	pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
		self.tcp_no_delay = no_delay;
		self
	}

	/// Build the client with the given transport.
	///
	/// ## Panics
	///
	/// Panics if called outside of a `tokio` runtime context.
	#[cfg(feature = "async-client")]
	#[cfg_attr(docsrs, doc(cfg(feature = "async-client")))]
	pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
	where
		S: TransportSenderT + Send,
		R: TransportReceiverT + Send,
	{
		let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
		let disconnect_reason = SharedDisconnectReason::default();
		let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
		let (client_dropped_tx, client_dropped_rx) = oneshot::channel();
		let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1);
		let manager = ThreadSafeRequestManager::new();

		let (ping_interval, inactivity_stream, inactivity_check) = match self.ping_config {
			None => (IntervalStream::pending(), IntervalStream::pending(), InactivityCheck::Disabled),
			Some(p) => {
				// NOTE: This emits a tick immediately to stay in sync with how the `inactive_interval`
				// works, because it starts measuring from when the client starts up.
				let ping_interval = IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(
					tokio::time::interval(p.ping_interval),
				));

				let inactive_interval = {
					let start = tokio::time::Instant::now() + p.inactive_limit;
					IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
						start,
						p.inactive_limit,
					)))
				};

				let inactivity_check = InactivityCheck::new(p.inactive_limit, p.max_failures);

				(ping_interval, inactive_interval, inactivity_check)
			}
		};

		tokio::spawn(send_task(SendTaskParams {
			sender,
			from_frontend: from_front,
			close_tx: send_receive_task_sync_tx.clone(),
			manager: manager.clone(),
			max_buffer_capacity_per_subscription,
			ping_interval,
		}));

		tokio::spawn(read_task(ReadTaskParams {
			receiver,
			close_tx: send_receive_task_sync_tx,
			to_send_task: to_back.clone(),
			manager,
			max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
			inactivity_check,
			inactivity_stream,
		}));

		tokio::spawn(wait_for_shutdown(send_receive_task_sync_rx, client_dropped_rx, disconnect_reason.clone()));

		Client {
			to_back: to_back.clone(),
			request_timeout: self.request_timeout,
			error: ErrorFromBack::new(to_back, disconnect_reason),
			id_manager: RequestIdManager::new(self.id_kind),
			max_log_length: self.max_log_length,
			on_exit: Some(client_dropped_tx),
		}
	}

	/// Build the client with the given transport.
	#[cfg(all(feature = "async-wasm-client", target_arch = "wasm32"))]
	#[cfg_attr(docsrs, doc(cfg(feature = "async-wasm-client")))]
	pub fn build_with_wasm<S, R>(self, sender: S, receiver: R) -> Client
	where
		S: TransportSenderT,
		R: TransportReceiverT,
	{
		use futures_util::stream::Pending;

		type PendingIntervalStream = IntervalStream<Pending<()>>;

		let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
		let disconnect_reason = SharedDisconnectReason::default();
		let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
		let (client_dropped_tx, client_dropped_rx) = oneshot::channel();
		let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1);
		let manager = ThreadSafeRequestManager::new();

		let ping_interval = PendingIntervalStream::pending();
		let inactivity_stream = PendingIntervalStream::pending();
		let inactivity_check = InactivityCheck::Disabled;

		wasm_bindgen_futures::spawn_local(send_task(SendTaskParams {
			sender,
			from_frontend: from_front,
			close_tx: send_receive_task_sync_tx.clone(),
			manager: manager.clone(),
			max_buffer_capacity_per_subscription,
			ping_interval,
		}));

		wasm_bindgen_futures::spawn_local(read_task(ReadTaskParams {
			receiver,
			close_tx: send_receive_task_sync_tx,
			to_send_task: to_back.clone(),
			manager,
			max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
			inactivity_check,
			inactivity_stream,
		}));

		wasm_bindgen_futures::spawn_local(wait_for_shutdown(
			send_receive_task_sync_rx,
			client_dropped_rx,
			disconnect_reason.clone(),
		));

		Client {
			to_back: to_back.clone(),
			request_timeout: self.request_timeout,
			error: ErrorFromBack::new(to_back, disconnect_reason),
			id_manager: RequestIdManager::new(self.id_kind),
			max_log_length: self.max_log_length,
			on_exit: Some(client_dropped_tx),
		}
	}
}

/// Generic asynchronous client.
#[derive(Debug)]
pub struct Client {
	/// Channel to send requests to the background task.
	to_back: mpsc::Sender<FrontToBack>,
	error: ErrorFromBack,
	/// Request timeout. Defaults to 60 seconds.
	request_timeout: Duration,
	/// Request ID manager.
	id_manager: RequestIdManager,
	/// Max length used when logging requests and responses.
	///
	/// Entries bigger than this limit will be truncated.
	max_log_length: u32,
	/// When the client is dropped a message is sent to the background task.
	on_exit: Option<oneshot::Sender<()>>,
}

impl Client {
	/// Create a builder for the client.
	pub fn builder() -> ClientBuilder {
		ClientBuilder::new()
	}

	/// Checks if the client is connected to the target.
	pub fn is_connected(&self) -> bool {
		!self.to_back.is_closed()
	}

	/// Similar to [`Client::on_disconnect`], but this method also returns the reason why
	/// the client was disconnected.
	///
	/// The typical use-case is that this method is called after
	/// [`Client::on_disconnect`] has returned in a "select loop".
	///
	/// # Cancel-safety
	///
	/// This method is cancel-safe.
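	///
	/// A sketch of that pattern (the select loop, its other branches, and the logging are
	/// application-specific and purely illustrative):
	///
	/// ```ignore
	/// loop {
	///     tokio::select! {
	///         _ = client.on_disconnect() => {
	///             // The connection is gone; fetch the reason and bail out.
	///             let err = client.disconnect_reason().await;
	///             tracing::warn!("client disconnected: {err}");
	///             break;
	///         }
	///         // ... other branches driving requests/subscriptions ...
	///     }
	/// }
	/// ```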
	pub async fn disconnect_reason(&self) -> Error {
		self.error.read_error().await
	}

	/// Completes when the client is disconnected or the client's background task encountered an error.
	/// If the client is already disconnected, the future produced by this method will complete immediately.
	///
	/// # Cancel safety
	///
	/// This method is cancel safe.
	pub async fn on_disconnect(&self) {
		self.to_back.closed().await;
	}
}

impl Drop for Client {
	fn drop(&mut self) {
		if let Some(e) = self.on_exit.take() {
			let _ = e.send(());
		}
	}
}

#[async_trait]
impl ClientT for Client {
	#[instrument(name = "notification", skip(self, params), level = "trace")]
	async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
	where
		Params: ToRpcParams + Send,
	{
		// NOTE: we use this to guard against max number of concurrent requests.
		let _req_id = self.id_manager.next_request_id();
		let params = params.to_rpc_params()?;
		let notif = NotificationSer::borrowed(&method, params.as_deref());

		let raw = serde_json::to_string(&notif).map_err(Error::ParseError)?;
		tx_log_from_str(&raw, self.max_log_length);

		let sender = self.to_back.clone();
		let fut = sender.send(FrontToBack::Notification(raw));

		tokio::pin!(fut);

		match future::select(fut, Delay::new(self.request_timeout)).await {
			Either::Left((Ok(()), _)) => Ok(()),
			Either::Left((Err(_), _)) => Err(self.disconnect_reason().await),
			Either::Right((_, _)) => Err(Error::RequestTimeout),
		}
	}

	#[instrument(name = "method_call", skip(self, params), level = "trace")]
	async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
	where
		R: DeserializeOwned,
		Params: ToRpcParams + Send,
	{
		let (send_back_tx, send_back_rx) = oneshot::channel();
		let id = self.id_manager.next_request_id();

		let params = params.to_rpc_params()?;
		let raw =
			serde_json::to_string(&RequestSer::borrowed(&id, &method, params.as_deref())).map_err(Error::ParseError)?;
		tx_log_from_str(&raw, self.max_log_length);

		if self
			.to_back
			.clone()
			.send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
			.await
			.is_err()
		{
			return Err(self.disconnect_reason().await);
		}

		let json_value = match call_with_timeout(self.request_timeout, send_back_rx).await {
			Ok(Ok(v)) => v,
			Ok(Err(err)) => return Err(err),
			Err(_) => return Err(self.disconnect_reason().await),
		};

		rx_log_from_json(&Response::new(ResponsePayload::success_borrowed(&json_value), id), self.max_log_length);

		serde_json::from_value(json_value).map_err(Error::ParseError)
	}

	#[instrument(name = "batch", skip(self, batch), level = "trace")]
	async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<BatchResponse<'a, R>, Error>
	where
		R: DeserializeOwned,
	{
		let batch = batch.build()?;
		let id = self.id_manager.next_request_id();
		let id_range = generate_batch_id_range(id, batch.len() as u64)?;

		let mut batches = Vec::with_capacity(batch.len());
		for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
			let id = self.id_manager.as_id_kind().into_id(id);
			batches.push(RequestSer {
				jsonrpc: TwoPointZero,
				id,
				method: method.into(),
				params: params.map(StdCow::Owned),
			});
		}

		let (send_back_tx, send_back_rx) = oneshot::channel();

		let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;

		tx_log_from_str(&raw, self.max_log_length);

		if self
			.to_back
			.clone()
			.send(FrontToBack::Batch(BatchMessage { raw, ids: id_range, send_back: send_back_tx }))
			.await
			.is_err()
		{
			return Err(self.disconnect_reason().await);
		}

		let res = call_with_timeout(self.request_timeout, send_back_rx).await;
		let json_values = match res {
			Ok(Ok(v)) => v,
			Ok(Err(err)) => return Err(err),
			Err(_) => return Err(self.disconnect_reason().await),
		};

		rx_log_from_json(&json_values, self.max_log_length);

		let mut responses = Vec::with_capacity(json_values.len());
		let mut successful_calls = 0;
		let mut failed_calls = 0;

		for json_val in json_values {
			match json_val {
				Ok(val) => {
					let result: R = serde_json::from_value(val).map_err(Error::ParseError)?;
					responses.push(Ok(result));
					successful_calls += 1;
				}
				Err(err) => {
					responses.push(Err(err));
					failed_calls += 1;
				}
			}
		}
		Ok(BatchResponse { successful_calls, failed_calls, responses })
	}
}

#[async_trait]
impl SubscriptionClientT for Client {
	/// Send a subscription request to the server.
	///
	/// The `subscribe_method` and `params` are used to initiate the subscription on the
	/// server, and the `unsubscribe_method` is used to close it.
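	///
	/// A sketch of a typical call; the `chain_subscribeNewHeads`/`chain_unsubscribeNewHeads`
	/// method names are hypothetical and `rpc_params!` is assumed to be in scope:
	///
	/// ```ignore
	/// let mut sub: Subscription<serde_json::Value> = client
	///     .subscribe("chain_subscribeNewHeads", rpc_params![], "chain_unsubscribeNewHeads")
	///     .await?;
	///
	/// while let Some(item) = sub.next().await {
	///     // Each `item` is the next notification produced by the server.
	///     tracing::info!("received: {:?}", item);
	/// }
	/// ```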
	#[instrument(name = "subscription", fields(method = subscribe_method), skip(self, params, subscribe_method, unsubscribe_method), level = "trace")]
	async fn subscribe<'a, Notif, Params>(
		&self,
		subscribe_method: &'a str,
		params: Params,
		unsubscribe_method: &'a str,
	) -> Result<Subscription<Notif>, Error>
	where
		Params: ToRpcParams + Send,
		Notif: DeserializeOwned,
	{
		if subscribe_method == unsubscribe_method {
			return Err(RegisterMethodError::SubscriptionNameConflict(unsubscribe_method.to_owned()).into());
		}

		let id_sub = self.id_manager.next_request_id();
		let id_unsub = self.id_manager.next_request_id();
		let params = params.to_rpc_params()?;

		let raw = serde_json::to_string(&RequestSer::borrowed(&id_sub, &subscribe_method, params.as_deref()))
			.map_err(Error::ParseError)?;

		tx_log_from_str(&raw, self.max_log_length);

		let (send_back_tx, send_back_rx) = tokio::sync::oneshot::channel();
		if self
			.to_back
			.clone()
			.send(FrontToBack::Subscribe(SubscriptionMessage {
				raw,
				subscribe_id: id_sub,
				unsubscribe_id: id_unsub.clone(),
				unsubscribe_method: unsubscribe_method.to_owned(),
				send_back: send_back_tx,
			}))
			.await
			.is_err()
		{
			return Err(self.disconnect_reason().await);
		}

		let (notifs_rx, sub_id) = match call_with_timeout(self.request_timeout, send_back_rx).await {
			Ok(Ok(val)) => val,
			Ok(Err(err)) => return Err(err),
			Err(_) => return Err(self.disconnect_reason().await),
		};

		rx_log_from_json(&Response::new(ResponsePayload::success_borrowed(&sub_id), id_unsub), self.max_log_length);

		Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
	}

	/// Subscribe to a specific method.
	#[instrument(name = "subscribe_method", skip(self), level = "trace")]
	async fn subscribe_to_method<'a, N>(&self, method: &'a str) -> Result<Subscription<N>, Error>
	where
		N: DeserializeOwned,
	{
		let (send_back_tx, send_back_rx) = oneshot::channel();
		if self
			.to_back
			.clone()
			.send(FrontToBack::RegisterNotification(RegisterNotificationMessage {
				send_back: send_back_tx,
				method: method.to_owned(),
			}))
			.await
			.is_err()
		{
			return Err(self.disconnect_reason().await);
		}

		let res = call_with_timeout(self.request_timeout, send_back_rx).await;

		let (rx, method) = match res {
			Ok(Ok(val)) => val,
			Ok(Err(err)) => return Err(err),
			Err(_) => return Err(self.disconnect_reason().await),
		};

		Ok(Subscription::new(self.to_back.clone(), rx, SubscriptionKind::Method(method)))
	}
}

/// Handle backend messages.
///
/// Returns an error if the main background loop should be terminated.
fn handle_backend_messages<R: TransportReceiverT>(
	message: Option<Result<ReceivedMessage, R::Error>>,
	manager: &ThreadSafeRequestManager,
	max_buffer_capacity_per_subscription: usize,
) -> Result<Vec<FrontToBack>, Error> {
	// Handle raw messages of the form `ReceivedMessage::Bytes` (`Vec<u8>`) or `ReceivedMessage::Text` (`String`).
	fn handle_recv_message(
		raw: &[u8],
		manager: &ThreadSafeRequestManager,
		max_buffer_capacity_per_subscription: usize,
	) -> Result<Vec<FrontToBack>, Error> {
		let first_non_whitespace = raw.iter().find(|byte| !byte.is_ascii_whitespace());
		let mut messages = Vec::new();

		match first_non_whitespace {
			Some(b'{') => {
				// Single response to a request.
				if let Ok(single) = serde_json::from_slice::<Response<_>>(raw) {
					let maybe_unsub =
						process_single_response(&mut manager.lock(), single, max_buffer_capacity_per_subscription)?;

					if let Some(unsub) = maybe_unsub {
						return Ok(vec![FrontToBack::Request(unsub)]);
					}
				}
				// Subscription response.
				else if let Ok(response) = serde_json::from_slice::<SubscriptionResponse<_>>(raw) {
					if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
						return Ok(vec![FrontToBack::SubscriptionClosed(sub_id)]);
					}
				}
				// Subscription error response.
				else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(raw) {
					process_subscription_close_response(&mut manager.lock(), response);
				}
				// Incoming Notification
				else if let Ok(notif) = serde_json::from_slice::<Notification>(raw) {
					process_notification(&mut manager.lock(), notif);
				} else {
					return Err(unparse_error(raw));
				}
			}
			Some(b'[') => {
				// Batch response.
				if let Ok(raw_responses) = serde_json::from_slice::<Vec<&JsonRawValue>>(raw) {
					let mut batch = Vec::with_capacity(raw_responses.len());

					let mut range = None;
					let mut got_notif = false;

					for r in raw_responses {
						if let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) {
							let id = response.id.try_parse_inner_as_number()?;
							let result = ResponseSuccess::try_from(response).map(|s| s.result);
							batch.push(InnerBatchResponse { id, result });

							let r = range.get_or_insert(id..id);

							if id < r.start {
								r.start = id;
							}

							if id > r.end {
								r.end = id;
							}
						} else if let Ok(response) = serde_json::from_str::<SubscriptionResponse<_>>(r.get()) {
							got_notif = true;
							if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
								messages.push(FrontToBack::SubscriptionClosed(sub_id));
							}
						} else if let Ok(response) = serde_json::from_str::<SubscriptionError<_>>(r.get()) {
							got_notif = true;
							process_subscription_close_response(&mut manager.lock(), response);
						} else if let Ok(notif) = serde_json::from_str::<Notification>(r.get()) {
							got_notif = true;
							process_notification(&mut manager.lock(), notif);
						} else {
							return Err(unparse_error(raw));
						};
					}

					if let Some(mut range) = range {
						// The end of the range is exclusive, so we need to add one.
						range.end += 1;
						process_batch_response(&mut manager.lock(), batch, range)?;
					} else if !got_notif {
						return Err(EmptyBatchRequest.into());
					}
				} else {
					return Err(unparse_error(raw));
				}
			}
			_ => {
				return Err(unparse_error(raw));
			}
		};

		Ok(messages)
	}

	match message {
		Some(Ok(ReceivedMessage::Pong)) => {
			tracing::debug!(target: LOG_TARGET, "Received pong");
			Ok(vec![])
		}
		Some(Ok(ReceivedMessage::Bytes(raw))) => {
			handle_recv_message(raw.as_ref(), manager, max_buffer_capacity_per_subscription)
		}
		Some(Ok(ReceivedMessage::Text(raw))) => {
			handle_recv_message(raw.as_ref(), manager, max_buffer_capacity_per_subscription)
		}
		Some(Err(e)) => Err(Error::Transport(e.into())),
		None => Err(Error::Custom("TransportReceiver dropped".into())),
	}
}

/// Handle frontend messages.
async fn handle_frontend_messages<S: TransportSenderT>(
	message: FrontToBack,
	manager: &ThreadSafeRequestManager,
	sender: &mut S,
	max_buffer_capacity_per_subscription: usize,
) -> Result<(), S::Error> {
	match message {
		FrontToBack::Batch(batch) => {
			if let Err(send_back) = manager.lock().insert_pending_batch(batch.ids.clone(), batch.send_back) {
				tracing::debug!(target: LOG_TARGET, "Batch request already pending: {:?}", batch.ids);
				let _ = send_back.send(Err(InvalidRequestId::Occupied(format!("{:?}", batch.ids)).into()));
				return Ok(());
			}

			sender.send(batch.raw).await?;
		}
		// User called `notification` on the front-end
		FrontToBack::Notification(notif) => {
			sender.send(notif).await?;
		}
		// User called `request` on the front-end
		FrontToBack::Request(request) => {
			if let Err(send_back) = manager.lock().insert_pending_call(request.id.clone(), request.send_back) {
				tracing::debug!(target: LOG_TARGET, "Denied duplicate method call");

				if let Some(s) = send_back {
					let _ = s.send(Err(InvalidRequestId::Occupied(request.id.to_string()).into()));
				}
				return Ok(());
			}

			sender.send(request.raw).await?;
		}
		// User called `subscribe` on the front-end.
		FrontToBack::Subscribe(sub) => {
			if let Err(send_back) = manager.lock().insert_pending_subscription(
				sub.subscribe_id.clone(),
				sub.unsubscribe_id.clone(),
				sub.send_back,
				sub.unsubscribe_method,
			) {
				tracing::debug!(target: LOG_TARGET, "Denied duplicate subscription");

				let _ = send_back.send(Err(InvalidRequestId::Occupied(format!(
					"sub_id={}:req_id={}",
					sub.subscribe_id, sub.unsubscribe_id
				))
				.into()));
				return Ok(());
			}

			sender.send(sub.raw).await?;
		}
		// User dropped a subscription.
		FrontToBack::SubscriptionClosed(sub_id) => {
			tracing::trace!(target: LOG_TARGET, "Closing subscription: {:?}", sub_id);
			// NOTE: The subscription may have been closed earlier if
			// the channel was full or disconnected.

			let maybe_unsub = {
				let m = &mut *manager.lock();

				m.get_request_id_by_subscription_id(&sub_id)
					.and_then(|req_id| build_unsubscribe_message(m, req_id, sub_id))
			};

			if let Some(unsub) = maybe_unsub {
				stop_subscription::<S>(sender, unsub).await?;
			}
		}
		// User called `register_notification` on the front-end.
		FrontToBack::RegisterNotification(reg) => {
			let (subscribe_tx, subscribe_rx) = subscription_channel(max_buffer_capacity_per_subscription);

			if manager.lock().insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
				let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
			} else {
				let _ = reg.send_back.send(Err(RegisterMethodError::AlreadyRegistered(reg.method).into()));
			}
		}
		// User dropped the NotificationHandler for this method
		FrontToBack::UnregisterNotification(method) => {
			let _ = manager.lock().remove_notification_handler(&method);
		}
	};

	Ok(())
}

fn unparse_error(raw: &[u8]) -> Error {
	let json = serde_json::from_slice::<serde_json::Value>(raw);

	let json_str = match json {
		Ok(json) => serde_json::to_string(&json).expect("valid JSON; qed"),
		Err(e) => e.to_string(),
	};

	Error::Custom(format!("Unparseable message: {json_str}"))
}

struct SendTaskParams<T: TransportSenderT, S> {
	sender: T,
	from_frontend: mpsc::Receiver<FrontToBack>,
	close_tx: mpsc::Sender<Result<(), Error>>,
	manager: ThreadSafeRequestManager,
	max_buffer_capacity_per_subscription: usize,
	ping_interval: IntervalStream<S>,
}

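/// Background task that writes to the transport: it forwards frontend messages
/// (requests, notifications, batches and subscription bookkeeping) to the transport
/// sender and periodically sends WebSocket pings when a ping interval is configured.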
async fn send_task<T, S>(params: SendTaskParams<T, S>)
where
	T: TransportSenderT,
	S: Stream + Unpin,
{
	let SendTaskParams {
		mut sender,
		mut from_frontend,
		close_tx,
		manager,
		max_buffer_capacity_per_subscription,
		mut ping_interval,
	} = params;

	// This is safe because `tokio::time::Interval`, `tokio::mpsc::Sender` and `tokio::mpsc::Receiver`
	// are cancel-safe.
	let res = loop {
		tokio::select! {
			biased;
			_ = close_tx.closed() => break Ok(()),
			maybe_msg = from_frontend.recv() => {
				let Some(msg) = maybe_msg else {
					break Ok(());
				};

				if let Err(e) =
					handle_frontend_messages(msg, &manager, &mut sender, max_buffer_capacity_per_subscription).await
				{
					tracing::debug!(target: LOG_TARGET, "ws send failed: {e}");
					break Err(Error::Transport(e.into()));
				}
			}
			_ = ping_interval.next() => {
				if let Err(err) = sender.send_ping().await {
					tracing::debug!(target: LOG_TARGET, "Send ws ping failed: {err}");
					break Err(Error::Transport(err.into()));
				}
			}
		}
	};

	from_frontend.close();
	let _ = sender.close().await;
	let _ = close_tx.send(res).await;
}

struct ReadTaskParams<R: TransportReceiverT, S> {
	receiver: R,
	close_tx: mpsc::Sender<Result<(), Error>>,
	to_send_task: mpsc::Sender<FrontToBack>,
	manager: ThreadSafeRequestManager,
	max_buffer_capacity_per_subscription: usize,
	inactivity_check: InactivityCheck,
	inactivity_stream: IntervalStream<S>,
}

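/// Background task that reads from the transport: responses, subscription notifications
/// and method notifications are dispatched to the frontend via the request manager, and
/// the connection is terminated if the ping/pong inactivity check reports it as inactive.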
async fn read_task<R, S>(params: ReadTaskParams<R, S>)
where
	R: TransportReceiverT,
	S: Stream + Unpin,
{
	let ReadTaskParams {
		receiver,
		close_tx,
		to_send_task,
		manager,
		max_buffer_capacity_per_subscription,
		mut inactivity_check,
		mut inactivity_stream,
	} = params;

	let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async {
		let res = receiver.receive().await;
		Some((res, receiver))
	});

	// These unsubscriptions occur if a subscription is dropped by the frontend before it has
	// been acknowledged, or if a subscription couldn't keep up with the server.
	//
	// Thus, they need to be sent to the server in order to tell it not to bother
	// with those messages anymore.
	let pending_unsubscribes = MaybePendingFutures::new();

	tokio::pin!(backend_event, pending_unsubscribes);

	// This is safe because futures::Stream and tokio::mpsc::Sender are cancel-safe.
	let res = loop {
		tokio::select! {
			// Closed.
			biased;
			_ = close_tx.closed() => break Ok(()),
			// Unsubscribe completed.
			_ = pending_unsubscribes.next() => (),
			// New message received.
			maybe_msg = backend_event.next() => {
				inactivity_check.mark_as_active();
				let Some(msg) = maybe_msg else { break Ok(()) };

				match handle_backend_messages::<R>(Some(msg), &manager, max_buffer_capacity_per_subscription) {
					Ok(messages) => {
						for msg in messages {
							pending_unsubscribes.push(to_send_task.send(msg));
						}
					}
					Err(e) => {
						tracing::debug!(target: LOG_TARGET, "Failed to read message: {e}");
						break Err(e);
					}
				}
			}
			_ = inactivity_stream.next() => {
				if inactivity_check.is_inactive() {
					break Err(Error::Transport("WebSocket ping/pong inactive".into()));
				}
			}
		}
	};

	let _ = close_tx.send(res).await;
}

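/// Waits for either the send/read task to finish or the client to be dropped, and stores
/// the error (if any) so that the frontend can read the disconnect reason.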
async fn wait_for_shutdown(
	mut close_rx: mpsc::Receiver<Result<(), Error>>,
	client_dropped: oneshot::Receiver<()>,
	err_to_front: SharedDisconnectReason,
) {
	let rx_item = close_rx.recv();

	tokio::pin!(rx_item);

	// Send an error to the frontend if the send or receive task completed with an error.
	if let Either::Left((Some(Err(err)), _)) = future::select(rx_item, client_dropped).await {
		*err_to_front.write().expect(NOT_POISONED) = Some(Arc::new(err));
	}
}