1mod helpers;
30mod manager;
31mod utils;
32
33use crate::client::async_client::helpers::{process_subscription_close_response, InnerBatchResponse};
34use crate::client::async_client::utils::MaybePendingFutures;
35use crate::client::{
36 BatchMessage, BatchResponse, ClientT, Error, ReceivedMessage, RegisterNotificationMessage, RequestMessage,
37 Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT,
38};
39use crate::error::RegisterMethodError;
40use crate::params::{BatchRequestBuilder, EmptyBatchRequest};
41use crate::tracing::client::{rx_log_from_json, tx_log_from_str};
42use crate::traits::ToRpcParams;
43use crate::JsonRawValue;
44use std::borrow::Cow as StdCow;
45
46use core::time::Duration;
47use helpers::{
48 build_unsubscribe_message, call_with_timeout, process_batch_response, process_notification,
49 process_single_response, process_subscription_response, stop_subscription,
50};
51use jsonrpsee_types::{InvalidRequestId, ResponseSuccess, TwoPointZero};
52use manager::RequestManager;
53use std::sync::Arc;
54
55use async_trait::async_trait;
56use futures_timer::Delay;
57use futures_util::future::{self, Either};
58use futures_util::stream::StreamExt;
59use futures_util::Stream;
60use jsonrpsee_types::response::{ResponsePayload, SubscriptionError};
61use jsonrpsee_types::{NotificationSer, RequestSer, Response, SubscriptionResponse};
62use serde::de::DeserializeOwned;
63use tokio::sync::{mpsc, oneshot};
64use tracing::instrument;
65
66use self::utils::{InactivityCheck, IntervalStream};
67use super::{generate_batch_id_range, subscription_channel, FrontToBack, IdKind, RequestIdManager};
68
/// Crate-internal alias for [`jsonrpsee_types::Notification`] instantiated with
/// `Option<serde_json::Value>` as its generic payload type.
pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option<serde_json::Value>>;

/// `tracing` target attached to the log events emitted by this module.
const LOG_TARGET: &str = "jsonrpsee-client";
/// Message passed to `expect` when acquiring the std `Mutex`/`RwLock` guards in this module.
const NOT_POISONED: &str = "Not poisoned; qed";
73
/// Settings for the WebSocket ping / inactivity machinery.
///
/// The default configuration uses a 30 second ping interval, a 40 second
/// inactivity limit and a failure budget of one.
#[derive(Debug, Copy, Clone)]
pub struct PingConfig {
    /// Period of the timer that triggers outgoing pings.
    pub(crate) ping_interval: Duration,
    /// Period of the inactivity timer; also handed to the inactivity check.
    pub(crate) inactive_limit: Duration,
    /// Failure budget handed to the inactivity check.
    pub(crate) max_failures: usize,
}

impl Default for PingConfig {
    fn default() -> Self {
        let ping_interval = Duration::from_secs(30);
        let inactive_limit = Duration::from_secs(40);

        Self { ping_interval, inactive_limit, max_failures: 1 }
    }
}

impl PingConfig {
    /// Returns the default configuration.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the configuration with the ping interval replaced.
    pub fn ping_interval(self, ping_interval: Duration) -> Self {
        Self { ping_interval, ..self }
    }

    /// Returns the configuration with the inactivity limit replaced.
    pub fn inactive_limit(self, inactivity_limit: Duration) -> Self {
        Self { inactive_limit: inactivity_limit, ..self }
    }

    /// Returns the configuration with the failure budget replaced.
    ///
    /// # Panics
    ///
    /// Panics if `max` is zero.
    pub fn max_failures(self, max: usize) -> Self {
        assert!(max > 0);
        Self { max_failures: max, ..self }
    }
}
136
137#[derive(Debug, Default, Clone)]
138pub(crate) struct ThreadSafeRequestManager(Arc<std::sync::Mutex<RequestManager>>);
139
140impl ThreadSafeRequestManager {
141 pub(crate) fn new() -> Self {
142 Self::default()
143 }
144
145 pub(crate) fn lock(&self) -> std::sync::MutexGuard<RequestManager> {
146 self.0.lock().expect(NOT_POISONED)
147 }
148}
149
/// Shared, lock-protected slot for the error that terminated the background
/// tasks; it stays `None` until such an error has been recorded.
pub(crate) type SharedDisconnectReason = Arc<std::sync::RwLock<Option<Arc<Error>>>>;
151
152#[derive(Debug)]
157struct ErrorFromBack {
158 conn: mpsc::Sender<FrontToBack>,
159 disconnect_reason: SharedDisconnectReason,
160}
161
162impl ErrorFromBack {
163 fn new(conn: mpsc::Sender<FrontToBack>, disconnect_reason: SharedDisconnectReason) -> Self {
164 Self { conn, disconnect_reason }
165 }
166
167 async fn read_error(&self) -> Error {
168 self.conn.closed().await;
170
171 if let Some(err) = self.disconnect_reason.read().expect(NOT_POISONED).as_ref() {
172 Error::RestartNeeded(err.clone())
173 } else {
174 Error::Custom("Error reason could not be found. This is a bug. Please open an issue.".to_string())
175 }
176 }
177}
178
179#[derive(Debug, Copy, Clone)]
181pub struct ClientBuilder {
182 request_timeout: Duration,
183 max_concurrent_requests: usize,
184 max_buffer_capacity_per_subscription: usize,
185 id_kind: IdKind,
186 max_log_length: u32,
187 ping_config: Option<PingConfig>,
188 tcp_no_delay: bool,
189}
190
191impl Default for ClientBuilder {
192 fn default() -> Self {
193 Self {
194 request_timeout: Duration::from_secs(60),
195 max_concurrent_requests: 256,
196 max_buffer_capacity_per_subscription: 1024,
197 id_kind: IdKind::Number,
198 max_log_length: 4096,
199 ping_config: None,
200 tcp_no_delay: true,
201 }
202 }
203}
204
205impl ClientBuilder {
206 pub fn new() -> ClientBuilder {
208 ClientBuilder::default()
209 }
210
211 pub fn request_timeout(mut self, timeout: Duration) -> Self {
213 self.request_timeout = timeout;
214 self
215 }
216
217 pub fn max_concurrent_requests(mut self, max: usize) -> Self {
219 self.max_concurrent_requests = max;
220 self
221 }
222
223 pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
235 assert!(max > 0);
236 self.max_buffer_capacity_per_subscription = max;
237 self
238 }
239
240 pub fn id_format(mut self, id_kind: IdKind) -> Self {
242 self.id_kind = id_kind;
243 self
244 }
245
246 pub fn set_max_logging_length(mut self, max: u32) -> Self {
250 self.max_log_length = max;
251 self
252 }
253
254 pub fn enable_ws_ping(mut self, cfg: PingConfig) -> Self {
260 self.ping_config = Some(cfg);
261 self
262 }
263
264 pub fn disable_ws_ping(mut self) -> Self {
268 self.ping_config = None;
269 self
270 }
271
272 pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
278 self.tcp_no_delay = no_delay;
279 self
280 }
281
282 #[cfg(feature = "async-client")]
288 #[cfg_attr(docsrs, doc(cfg(feature = "async-client")))]
289 pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
290 where
291 S: TransportSenderT + Send,
292 R: TransportReceiverT + Send,
293 {
294 let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
295 let disconnect_reason = SharedDisconnectReason::default();
296 let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
297 let (client_dropped_tx, client_dropped_rx) = oneshot::channel();
298 let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1);
299 let manager = ThreadSafeRequestManager::new();
300
301 let (ping_interval, inactivity_stream, inactivity_check) = match self.ping_config {
302 None => (IntervalStream::pending(), IntervalStream::pending(), InactivityCheck::Disabled),
303 Some(p) => {
304 let ping_interval = IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(
307 tokio::time::interval(p.ping_interval),
308 ));
309
310 let inactive_interval = {
311 let start = tokio::time::Instant::now() + p.inactive_limit;
312 IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
313 start,
314 p.inactive_limit,
315 )))
316 };
317
318 let inactivity_check = InactivityCheck::new(p.inactive_limit, p.max_failures);
319
320 (ping_interval, inactive_interval, inactivity_check)
321 }
322 };
323
324 tokio::spawn(send_task(SendTaskParams {
325 sender,
326 from_frontend: from_front,
327 close_tx: send_receive_task_sync_tx.clone(),
328 manager: manager.clone(),
329 max_buffer_capacity_per_subscription,
330 ping_interval,
331 }));
332
333 tokio::spawn(read_task(ReadTaskParams {
334 receiver,
335 close_tx: send_receive_task_sync_tx,
336 to_send_task: to_back.clone(),
337 manager,
338 max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
339 inactivity_check,
340 inactivity_stream,
341 }));
342
343 tokio::spawn(wait_for_shutdown(send_receive_task_sync_rx, client_dropped_rx, disconnect_reason.clone()));
344
345 Client {
346 to_back: to_back.clone(),
347 request_timeout: self.request_timeout,
348 error: ErrorFromBack::new(to_back, disconnect_reason),
349 id_manager: RequestIdManager::new(self.id_kind),
350 max_log_length: self.max_log_length,
351 on_exit: Some(client_dropped_tx),
352 }
353 }
354
355 #[cfg(all(feature = "async-wasm-client", target_arch = "wasm32"))]
357 #[cfg_attr(docsrs, doc(cfg(feature = "async-wasm-client")))]
358 pub fn build_with_wasm<S, R>(self, sender: S, receiver: R) -> Client
359 where
360 S: TransportSenderT,
361 R: TransportReceiverT,
362 {
363 use futures_util::stream::Pending;
364
365 type PendingIntervalStream = IntervalStream<Pending<()>>;
366
367 let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
368 let disconnect_reason = SharedDisconnectReason::default();
369 let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
370 let (client_dropped_tx, client_dropped_rx) = oneshot::channel();
371 let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1);
372 let manager = ThreadSafeRequestManager::new();
373
374 let ping_interval = PendingIntervalStream::pending();
375 let inactivity_stream = PendingIntervalStream::pending();
376 let inactivity_check = InactivityCheck::Disabled;
377
378 wasm_bindgen_futures::spawn_local(send_task(SendTaskParams {
379 sender,
380 from_frontend: from_front,
381 close_tx: send_receive_task_sync_tx.clone(),
382 manager: manager.clone(),
383 max_buffer_capacity_per_subscription,
384 ping_interval,
385 }));
386
387 wasm_bindgen_futures::spawn_local(read_task(ReadTaskParams {
388 receiver,
389 close_tx: send_receive_task_sync_tx,
390 to_send_task: to_back.clone(),
391 manager,
392 max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
393 inactivity_check,
394 inactivity_stream,
395 }));
396
397 wasm_bindgen_futures::spawn_local(wait_for_shutdown(
398 send_receive_task_sync_rx,
399 client_dropped_rx,
400 disconnect_reason.clone(),
401 ));
402
403 Client {
404 to_back: to_back.clone(),
405 request_timeout: self.request_timeout,
406 error: ErrorFromBack::new(to_back, disconnect_reason),
407 id_manager: RequestIdManager::new(self.id_kind),
408 max_log_length: self.max_log_length,
409 on_exit: Some(client_dropped_tx),
410 }
411 }
412}
413
414#[derive(Debug)]
416pub struct Client {
417 to_back: mpsc::Sender<FrontToBack>,
419 error: ErrorFromBack,
420 request_timeout: Duration,
422 id_manager: RequestIdManager,
424 max_log_length: u32,
428 on_exit: Option<oneshot::Sender<()>>,
430}
431
432impl Client {
433 pub fn builder() -> ClientBuilder {
435 ClientBuilder::new()
436 }
437
438 pub fn is_connected(&self) -> bool {
440 !self.to_back.is_closed()
441 }
442
443 pub async fn disconnect_reason(&self) -> Error {
453 self.error.read_error().await
454 }
455
456 pub async fn on_disconnect(&self) {
463 self.to_back.closed().await;
464 }
465}
466
467impl Drop for Client {
468 fn drop(&mut self) {
469 if let Some(e) = self.on_exit.take() {
470 let _ = e.send(());
471 }
472 }
473}
474
475#[async_trait]
476impl ClientT for Client {
477 #[instrument(name = "notification", skip(self, params), level = "trace")]
478 async fn notification<Params>(&self, method: &str, params: Params) -> Result<(), Error>
479 where
480 Params: ToRpcParams + Send,
481 {
482 let _req_id = self.id_manager.next_request_id();
484 let params = params.to_rpc_params()?;
485 let notif = NotificationSer::borrowed(&method, params.as_deref());
486
487 let raw = serde_json::to_string(¬if).map_err(Error::ParseError)?;
488 tx_log_from_str(&raw, self.max_log_length);
489
490 let sender = self.to_back.clone();
491 let fut = sender.send(FrontToBack::Notification(raw));
492
493 tokio::pin!(fut);
494
495 match future::select(fut, Delay::new(self.request_timeout)).await {
496 Either::Left((Ok(()), _)) => Ok(()),
497 Either::Left((Err(_), _)) => Err(self.disconnect_reason().await),
498 Either::Right((_, _)) => Err(Error::RequestTimeout),
499 }
500 }
501
502 #[instrument(name = "method_call", skip(self, params), level = "trace")]
503 async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, Error>
504 where
505 R: DeserializeOwned,
506 Params: ToRpcParams + Send,
507 {
508 let (send_back_tx, send_back_rx) = oneshot::channel();
509 let id = self.id_manager.next_request_id();
510
511 let params = params.to_rpc_params()?;
512 let raw =
513 serde_json::to_string(&RequestSer::borrowed(&id, &method, params.as_deref())).map_err(Error::ParseError)?;
514 tx_log_from_str(&raw, self.max_log_length);
515
516 if self
517 .to_back
518 .clone()
519 .send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) }))
520 .await
521 .is_err()
522 {
523 return Err(self.disconnect_reason().await);
524 }
525
526 let json_value = match call_with_timeout(self.request_timeout, send_back_rx).await {
527 Ok(Ok(v)) => v,
528 Ok(Err(err)) => return Err(err),
529 Err(_) => return Err(self.disconnect_reason().await),
530 };
531
532 rx_log_from_json(&Response::new(ResponsePayload::success_borrowed(&json_value), id), self.max_log_length);
533
534 serde_json::from_value(json_value).map_err(Error::ParseError)
535 }
536
537 #[instrument(name = "batch", skip(self, batch), level = "trace")]
538 async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<BatchResponse<'a, R>, Error>
539 where
540 R: DeserializeOwned,
541 {
542 let batch = batch.build()?;
543 let id = self.id_manager.next_request_id();
544 let id_range = generate_batch_id_range(id, batch.len() as u64)?;
545
546 let mut batches = Vec::with_capacity(batch.len());
547 for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
548 let id = self.id_manager.as_id_kind().into_id(id);
549 batches.push(RequestSer {
550 jsonrpc: TwoPointZero,
551 id,
552 method: method.into(),
553 params: params.map(StdCow::Owned),
554 });
555 }
556
557 let (send_back_tx, send_back_rx) = oneshot::channel();
558
559 let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?;
560
561 tx_log_from_str(&raw, self.max_log_length);
562
563 if self
564 .to_back
565 .clone()
566 .send(FrontToBack::Batch(BatchMessage { raw, ids: id_range, send_back: send_back_tx }))
567 .await
568 .is_err()
569 {
570 return Err(self.disconnect_reason().await);
571 }
572
573 let res = call_with_timeout(self.request_timeout, send_back_rx).await;
574 let json_values = match res {
575 Ok(Ok(v)) => v,
576 Ok(Err(err)) => return Err(err),
577 Err(_) => return Err(self.disconnect_reason().await),
578 };
579
580 rx_log_from_json(&json_values, self.max_log_length);
581
582 let mut responses = Vec::with_capacity(json_values.len());
583 let mut successful_calls = 0;
584 let mut failed_calls = 0;
585
586 for json_val in json_values {
587 match json_val {
588 Ok(val) => {
589 let result: R = serde_json::from_value(val).map_err(Error::ParseError)?;
590 responses.push(Ok(result));
591 successful_calls += 1;
592 }
593 Err(err) => {
594 responses.push(Err(err));
595 failed_calls += 1;
596 }
597 }
598 }
599 Ok(BatchResponse { successful_calls, failed_calls, responses })
600 }
601}
602
603#[async_trait]
604impl SubscriptionClientT for Client {
605 #[instrument(name = "subscription", fields(method = subscribe_method), skip(self, params, subscribe_method, unsubscribe_method), level = "trace")]
610 async fn subscribe<'a, Notif, Params>(
611 &self,
612 subscribe_method: &'a str,
613 params: Params,
614 unsubscribe_method: &'a str,
615 ) -> Result<Subscription<Notif>, Error>
616 where
617 Params: ToRpcParams + Send,
618 Notif: DeserializeOwned,
619 {
620 if subscribe_method == unsubscribe_method {
621 return Err(RegisterMethodError::SubscriptionNameConflict(unsubscribe_method.to_owned()).into());
622 }
623
624 let id_sub = self.id_manager.next_request_id();
625 let id_unsub = self.id_manager.next_request_id();
626 let params = params.to_rpc_params()?;
627
628 let raw = serde_json::to_string(&RequestSer::borrowed(&id_sub, &subscribe_method, params.as_deref()))
629 .map_err(Error::ParseError)?;
630
631 tx_log_from_str(&raw, self.max_log_length);
632
633 let (send_back_tx, send_back_rx) = tokio::sync::oneshot::channel();
634 if self
635 .to_back
636 .clone()
637 .send(FrontToBack::Subscribe(SubscriptionMessage {
638 raw,
639 subscribe_id: id_sub,
640 unsubscribe_id: id_unsub.clone(),
641 unsubscribe_method: unsubscribe_method.to_owned(),
642 send_back: send_back_tx,
643 }))
644 .await
645 .is_err()
646 {
647 return Err(self.disconnect_reason().await);
648 }
649
650 let (notifs_rx, sub_id) = match call_with_timeout(self.request_timeout, send_back_rx).await {
651 Ok(Ok(val)) => val,
652 Ok(Err(err)) => return Err(err),
653 Err(_) => return Err(self.disconnect_reason().await),
654 };
655
656 rx_log_from_json(&Response::new(ResponsePayload::success_borrowed(&sub_id), id_unsub), self.max_log_length);
657
658 Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
659 }
660
661 #[instrument(name = "subscribe_method", skip(self), level = "trace")]
663 async fn subscribe_to_method<'a, N>(&self, method: &'a str) -> Result<Subscription<N>, Error>
664 where
665 N: DeserializeOwned,
666 {
667 let (send_back_tx, send_back_rx) = oneshot::channel();
668 if self
669 .to_back
670 .clone()
671 .send(FrontToBack::RegisterNotification(RegisterNotificationMessage {
672 send_back: send_back_tx,
673 method: method.to_owned(),
674 }))
675 .await
676 .is_err()
677 {
678 return Err(self.disconnect_reason().await);
679 }
680
681 let res = call_with_timeout(self.request_timeout, send_back_rx).await;
682
683 let (rx, method) = match res {
684 Ok(Ok(val)) => val,
685 Ok(Err(err)) => return Err(err),
686 Err(_) => return Err(self.disconnect_reason().await),
687 };
688
689 Ok(Subscription::new(self.to_back.clone(), rx, SubscriptionKind::Method(method)))
690 }
691}
692
693fn handle_backend_messages<R: TransportReceiverT>(
697 message: Option<Result<ReceivedMessage, R::Error>>,
698 manager: &ThreadSafeRequestManager,
699 max_buffer_capacity_per_subscription: usize,
700) -> Result<Vec<FrontToBack>, Error> {
701 fn handle_recv_message(
703 raw: &[u8],
704 manager: &ThreadSafeRequestManager,
705 max_buffer_capacity_per_subscription: usize,
706 ) -> Result<Vec<FrontToBack>, Error> {
707 let first_non_whitespace = raw.iter().find(|byte| !byte.is_ascii_whitespace());
708 let mut messages = Vec::new();
709
710 match first_non_whitespace {
711 Some(b'{') => {
712 if let Ok(single) = serde_json::from_slice::<Response<_>>(raw) {
714 let maybe_unsub =
715 process_single_response(&mut manager.lock(), single, max_buffer_capacity_per_subscription)?;
716
717 if let Some(unsub) = maybe_unsub {
718 return Ok(vec![FrontToBack::Request(unsub)]);
719 }
720 }
721 else if let Ok(response) = serde_json::from_slice::<SubscriptionResponse<_>>(raw) {
723 if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
724 return Ok(vec![FrontToBack::SubscriptionClosed(sub_id)]);
725 }
726 }
727 else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(raw) {
729 process_subscription_close_response(&mut manager.lock(), response);
730 }
731 else if let Ok(notif) = serde_json::from_slice::<Notification>(raw) {
733 process_notification(&mut manager.lock(), notif);
734 } else {
735 return Err(unparse_error(raw));
736 }
737 }
738 Some(b'[') => {
739 if let Ok(raw_responses) = serde_json::from_slice::<Vec<&JsonRawValue>>(raw) {
741 let mut batch = Vec::with_capacity(raw_responses.len());
742
743 let mut range = None;
744 let mut got_notif = false;
745
746 for r in raw_responses {
747 if let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) {
748 let id = response.id.try_parse_inner_as_number()?;
749 let result = ResponseSuccess::try_from(response).map(|s| s.result);
750 batch.push(InnerBatchResponse { id, result });
751
752 let r = range.get_or_insert(id..id);
753
754 if id < r.start {
755 r.start = id;
756 }
757
758 if id > r.end {
759 r.end = id;
760 }
761 } else if let Ok(response) = serde_json::from_str::<SubscriptionResponse<_>>(r.get()) {
762 got_notif = true;
763 if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
764 messages.push(FrontToBack::SubscriptionClosed(sub_id));
765 }
766 } else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(raw) {
767 got_notif = true;
768 process_subscription_close_response(&mut manager.lock(), response);
769 } else if let Ok(notif) = serde_json::from_str::<Notification>(r.get()) {
770 got_notif = true;
771 process_notification(&mut manager.lock(), notif);
772 } else {
773 return Err(unparse_error(raw));
774 };
775 }
776
777 if let Some(mut range) = range {
778 range.end += 1;
780 process_batch_response(&mut manager.lock(), batch, range)?;
781 } else if !got_notif {
782 return Err(EmptyBatchRequest.into());
783 }
784 } else {
785 return Err(unparse_error(raw));
786 }
787 }
788 _ => {
789 return Err(unparse_error(raw));
790 }
791 };
792
793 Ok(messages)
794 }
795
796 match message {
797 Some(Ok(ReceivedMessage::Pong)) => {
798 tracing::debug!(target: LOG_TARGET, "Received pong");
799 Ok(vec![])
800 }
801 Some(Ok(ReceivedMessage::Bytes(raw))) => {
802 handle_recv_message(raw.as_ref(), manager, max_buffer_capacity_per_subscription)
803 }
804 Some(Ok(ReceivedMessage::Text(raw))) => {
805 handle_recv_message(raw.as_ref(), manager, max_buffer_capacity_per_subscription)
806 }
807 Some(Err(e)) => Err(Error::Transport(e.into())),
808 None => Err(Error::Custom("TransportReceiver dropped".into())),
809 }
810}
811
812async fn handle_frontend_messages<S: TransportSenderT>(
814 message: FrontToBack,
815 manager: &ThreadSafeRequestManager,
816 sender: &mut S,
817 max_buffer_capacity_per_subscription: usize,
818) -> Result<(), S::Error> {
819 match message {
820 FrontToBack::Batch(batch) => {
821 if let Err(send_back) = manager.lock().insert_pending_batch(batch.ids.clone(), batch.send_back) {
822 tracing::debug!(target: LOG_TARGET, "Batch request already pending: {:?}", batch.ids);
823 let _ = send_back.send(Err(InvalidRequestId::Occupied(format!("{:?}", batch.ids)).into()));
824 return Ok(());
825 }
826
827 sender.send(batch.raw).await?;
828 }
829 FrontToBack::Notification(notif) => {
831 sender.send(notif).await?;
832 }
833 FrontToBack::Request(request) => {
835 if let Err(send_back) = manager.lock().insert_pending_call(request.id.clone(), request.send_back) {
836 tracing::debug!(target: LOG_TARGET, "Denied duplicate method call");
837
838 if let Some(s) = send_back {
839 let _ = s.send(Err(InvalidRequestId::Occupied(request.id.to_string()).into()));
840 }
841 return Ok(());
842 }
843
844 sender.send(request.raw).await?;
845 }
846 FrontToBack::Subscribe(sub) => {
848 if let Err(send_back) = manager.lock().insert_pending_subscription(
849 sub.subscribe_id.clone(),
850 sub.unsubscribe_id.clone(),
851 sub.send_back,
852 sub.unsubscribe_method,
853 ) {
854 tracing::debug!(target: LOG_TARGET, "Denied duplicate subscription");
855
856 let _ = send_back.send(Err(InvalidRequestId::Occupied(format!(
857 "sub_id={}:req_id={}",
858 sub.subscribe_id, sub.unsubscribe_id
859 ))
860 .into()));
861 return Ok(());
862 }
863
864 sender.send(sub.raw).await?;
865 }
866 FrontToBack::SubscriptionClosed(sub_id) => {
868 tracing::trace!(target: LOG_TARGET, "Closing subscription: {:?}", sub_id);
869 let maybe_unsub = {
873 let m = &mut *manager.lock();
874
875 m.get_request_id_by_subscription_id(&sub_id)
876 .and_then(|req_id| build_unsubscribe_message(m, req_id, sub_id))
877 };
878
879 if let Some(unsub) = maybe_unsub {
880 stop_subscription::<S>(sender, unsub).await?;
881 }
882 }
883 FrontToBack::RegisterNotification(reg) => {
885 let (subscribe_tx, subscribe_rx) = subscription_channel(max_buffer_capacity_per_subscription);
886
887 if manager.lock().insert_notification_handler(®.method, subscribe_tx).is_ok() {
888 let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
889 } else {
890 let _ = reg.send_back.send(Err(RegisterMethodError::AlreadyRegistered(reg.method).into()));
891 }
892 }
893 FrontToBack::UnregisterNotification(method) => {
895 let _ = manager.lock().remove_notification_handler(&method);
896 }
897 };
898
899 Ok(())
900}
901
902fn unparse_error(raw: &[u8]) -> Error {
903 let json = serde_json::from_slice::<serde_json::Value>(raw);
904
905 let json_str = match json {
906 Ok(json) => serde_json::to_string(&json).expect("valid JSON; qed"),
907 Err(e) => e.to_string(),
908 };
909
910 Error::Custom(format!("Unparseable message: {json_str}"))
911}
912
913struct SendTaskParams<T: TransportSenderT, S> {
914 sender: T,
915 from_frontend: mpsc::Receiver<FrontToBack>,
916 close_tx: mpsc::Sender<Result<(), Error>>,
917 manager: ThreadSafeRequestManager,
918 max_buffer_capacity_per_subscription: usize,
919 ping_interval: IntervalStream<S>,
920}
921
922async fn send_task<T, S>(params: SendTaskParams<T, S>)
923where
924 T: TransportSenderT,
925 S: Stream + Unpin,
926{
927 let SendTaskParams {
928 mut sender,
929 mut from_frontend,
930 close_tx,
931 manager,
932 max_buffer_capacity_per_subscription,
933 mut ping_interval,
934 } = params;
935
936 let res = loop {
939 tokio::select! {
940 biased;
941 _ = close_tx.closed() => break Ok(()),
942 maybe_msg = from_frontend.recv() => {
943 let Some(msg) = maybe_msg else {
944 break Ok(());
945 };
946
947 if let Err(e) =
948 handle_frontend_messages(msg, &manager, &mut sender, max_buffer_capacity_per_subscription).await
949 {
950 tracing::debug!(target: LOG_TARGET, "ws send failed: {e}");
951 break Err(Error::Transport(e.into()));
952 }
953 }
954 _ = ping_interval.next() => {
955 if let Err(err) = sender.send_ping().await {
956 tracing::debug!(target: LOG_TARGET, "Send ws ping failed: {err}");
957 break Err(Error::Transport(err.into()));
958 }
959 }
960 }
961 };
962
963 from_frontend.close();
964 let _ = sender.close().await;
965 let _ = close_tx.send(res).await;
966}
967
968struct ReadTaskParams<R: TransportReceiverT, S> {
969 receiver: R,
970 close_tx: mpsc::Sender<Result<(), Error>>,
971 to_send_task: mpsc::Sender<FrontToBack>,
972 manager: ThreadSafeRequestManager,
973 max_buffer_capacity_per_subscription: usize,
974 inactivity_check: InactivityCheck,
975 inactivity_stream: IntervalStream<S>,
976}
977
978async fn read_task<R, S>(params: ReadTaskParams<R, S>)
979where
980 R: TransportReceiverT,
981 S: Stream + Unpin,
982{
983 let ReadTaskParams {
984 receiver,
985 close_tx,
986 to_send_task,
987 manager,
988 max_buffer_capacity_per_subscription,
989 mut inactivity_check,
990 mut inactivity_stream,
991 } = params;
992
993 let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async {
994 let res = receiver.receive().await;
995 Some((res, receiver))
996 });
997
998 let pending_unsubscribes = MaybePendingFutures::new();
1004
1005 tokio::pin!(backend_event, pending_unsubscribes);
1006
1007 let res = loop {
1009 tokio::select! {
1010 biased;
1012 _ = close_tx.closed() => break Ok(()),
1013 _ = pending_unsubscribes.next() => (),
1015 maybe_msg = backend_event.next() => {
1017 inactivity_check.mark_as_active();
1018 let Some(msg) = maybe_msg else { break Ok(()) };
1019
1020 match handle_backend_messages::<R>(Some(msg), &manager, max_buffer_capacity_per_subscription) {
1021 Ok(messages) => {
1022 for msg in messages {
1023 pending_unsubscribes.push(to_send_task.send(msg));
1024 }
1025 }
1026 Err(e) => {
1027 tracing::debug!(target: LOG_TARGET, "Failed to read message: {e}");
1028 break Err(e);
1029 }
1030 }
1031 }
1032 _ = inactivity_stream.next() => {
1033 if inactivity_check.is_inactive() {
1034 break Err(Error::Transport("WebSocket ping/pong inactive".into()));
1035 }
1036 }
1037 }
1038 };
1039
1040 let _ = close_tx.send(res).await;
1041}
1042
1043async fn wait_for_shutdown(
1044 mut close_rx: mpsc::Receiver<Result<(), Error>>,
1045 client_dropped: oneshot::Receiver<()>,
1046 err_to_front: SharedDisconnectReason,
1047) {
1048 let rx_item = close_rx.recv();
1049
1050 tokio::pin!(rx_item);
1051
1052 if let Either::Left((Some(Err(err)), _)) = future::select(rx_item, client_dropped).await {
1054 *err_to_front.write().expect(NOT_POISONED) = Some(Arc::new(err));
1055 }
1056}