mod helpers;
mod manager;
mod rpc_service;
mod utils;

pub use rpc_service::RpcService;

use std::borrow::Cow as StdCow;
use std::time::Duration;

use crate::JsonRawValue;
use crate::client::async_client::helpers::process_subscription_close_response;
use crate::client::async_client::utils::MaybePendingFutures;
use crate::client::{
    BatchResponse, ClientT, Error, ReceivedMessage, RegisterNotificationMessage, Subscription, SubscriptionClientT,
    SubscriptionKind, TransportReceiverT, TransportSenderT,
};
use crate::error::RegisterMethodError;
use crate::middleware::layer::{RpcLogger, RpcLoggerLayer};
use crate::middleware::{Batch, IsBatch, IsSubscription, Request, RpcServiceBuilder, RpcServiceT};
use crate::params::{BatchRequestBuilder, EmptyBatchRequest};
use crate::traits::ToRpcParams;
use futures_util::Stream;
use futures_util::future::{self, Either};
use futures_util::stream::StreamExt;
use helpers::{
    build_unsubscribe_message, call_with_timeout, process_batch_response, process_notification,
    process_single_response, process_subscription_response, stop_subscription,
};
use http::Extensions;
use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{InvalidRequestId, ResponseSuccess, TwoPointZero};
use jsonrpsee_types::{Response, SubscriptionResponse};
use manager::RequestManager;
use serde::de::DeserializeOwned;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use tower::layer::util::Identity;

use self::utils::{InactivityCheck, IntervalStream};
use super::{
    FrontToBack, IdKind, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse, RequestIdManager,
    generate_batch_id_range, subscription_channel,
};

pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option<Box<JsonRawValue>>>;

type Logger = tower::layer::util::Stack<RpcLoggerLayer, tower::layer::util::Identity>;

const LOG_TARGET: &str = "jsonrpsee-client";
const NOT_POISONED: &str = "Not poisoned; qed";

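/// Configuration of the WebSocket ping/pong keep-alive mechanism.
///
/// The client sends a ping every `ping_interval`; the connection is regarded as
/// inactive if no incoming message has been observed within `inactive_limit`, and
/// it is closed after `max_failures` consecutive failed inactivity checks.
///
/// A minimal usage sketch (not compiled here; it only uses the builder methods
/// defined below):
///
/// ```ignore
/// let ping_cfg = PingConfig::new()
///     .ping_interval(Duration::from_secs(10))
///     .inactive_limit(Duration::from_secs(30))
///     .max_failures(3);
/// ```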
#[derive(Debug, Copy, Clone)]
pub struct PingConfig {
    pub(crate) ping_interval: Duration,
    pub(crate) inactive_limit: Duration,
    pub(crate) max_failures: usize,
}

impl Default for PingConfig {
    fn default() -> Self {
        Self { ping_interval: Duration::from_secs(30), max_failures: 1, inactive_limit: Duration::from_secs(40) }
    }
}

impl PingConfig {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn ping_interval(mut self, ping_interval: Duration) -> Self {
        self.ping_interval = ping_interval;
        self
    }

    pub fn inactive_limit(mut self, inactivity_limit: Duration) -> Self {
        self.inactive_limit = inactivity_limit;
        self
    }

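    /// The maximum number of failed inactivity checks tolerated before the
    /// connection is closed.
    ///
    /// # Panics
    ///
    /// Panics if `max` is zero.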
    pub fn max_failures(mut self, max: usize) -> Self {
        assert!(max > 0);
        self.max_failures = max;
        self
    }
}

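/// The request manager shared between the frontend and the background tasks,
/// guarded by a mutex.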
#[derive(Debug, Default, Clone)]
pub(crate) struct ThreadSafeRequestManager(Arc<std::sync::Mutex<RequestManager>>);

impl ThreadSafeRequestManager {
    pub(crate) fn new() -> Self {
        Self::default()
    }

    pub(crate) fn lock(&self) -> std::sync::MutexGuard<'_, RequestManager> {
        self.0.lock().expect(NOT_POISONED)
    }
}

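/// The reason why the background task terminated, shared so that
/// [`Client::on_disconnect`] can report it to the frontend.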
pub(crate) type SharedDisconnectReason = Arc<std::sync::RwLock<Option<Arc<Error>>>>;

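/// Pairs the channel to the background task with the shared disconnect reason:
/// once the channel is closed, the recorded error (if any) is surfaced as
/// [`Error::RestartNeeded`].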
#[derive(Debug)]
struct ErrorFromBack {
    conn: mpsc::Sender<FrontToBack>,
    disconnect_reason: SharedDisconnectReason,
}

impl ErrorFromBack {
    fn new(conn: mpsc::Sender<FrontToBack>, disconnect_reason: SharedDisconnectReason) -> Self {
        Self { conn, disconnect_reason }
    }

    async fn read_error(&self) -> Error {
        self.conn.closed().await;

        if let Some(err) = self.disconnect_reason.read().expect(NOT_POISONED).as_ref() {
            Error::RestartNeeded(err.clone())
        } else {
            Error::Custom("Error reason could not be found. This is a bug. Please open an issue.".to_string())
        }
    }
}

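/// Builder for [`Client`].
///
/// A minimal wiring sketch (`sender` and `receiver` stand in for any
/// `TransportSenderT`/`TransportReceiverT` implementations, e.g. a WebSocket
/// transport):
///
/// ```ignore
/// let client = ClientBuilder::new()
///     .request_timeout(Duration::from_secs(30))
///     .enable_ws_ping(PingConfig::default())
///     .build_with_tokio(sender, receiver);
/// ```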
#[derive(Debug, Clone)]
pub struct ClientBuilder<L = Logger> {
    request_timeout: Duration,
    max_concurrent_requests: usize,
    max_buffer_capacity_per_subscription: usize,
    id_kind: IdKind,
    ping_config: Option<PingConfig>,
    tcp_no_delay: bool,
    service_builder: RpcServiceBuilder<L>,
}

impl Default for ClientBuilder {
    fn default() -> Self {
        Self {
            request_timeout: Duration::from_secs(60),
            max_concurrent_requests: 256,
            max_buffer_capacity_per_subscription: 1024,
            id_kind: IdKind::Number,
            ping_config: None,
            tcp_no_delay: true,
            service_builder: RpcServiceBuilder::default().rpc_logger(1024),
        }
    }
}

impl ClientBuilder<Identity> {
    pub fn new() -> ClientBuilder {
        ClientBuilder::default()
    }
}

impl<L> ClientBuilder<L> {
    pub fn request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = timeout;
        self
    }

    pub fn max_concurrent_requests(mut self, max: usize) -> Self {
        self.max_concurrent_requests = max;
        self
    }

    pub fn max_buffer_capacity_per_subscription(mut self, max: usize) -> Self {
        assert!(max > 0);
        self.max_buffer_capacity_per_subscription = max;
        self
    }

    pub fn id_format(mut self, id_kind: IdKind) -> Self {
        self.id_kind = id_kind;
        self
    }

    pub fn enable_ws_ping(mut self, cfg: PingConfig) -> Self {
        self.ping_config = Some(cfg);
        self
    }

    pub fn disable_ws_ping(mut self) -> Self {
        self.ping_config = None;
        self
    }

    pub fn set_tcp_no_delay(mut self, no_delay: bool) -> Self {
        self.tcp_no_delay = no_delay;
        self
    }

    pub fn set_rpc_middleware<T>(self, service_builder: RpcServiceBuilder<T>) -> ClientBuilder<T> {
        ClientBuilder {
            request_timeout: self.request_timeout,
            max_concurrent_requests: self.max_concurrent_requests,
            max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
            id_kind: self.id_kind,
            ping_config: self.ping_config,
            tcp_no_delay: self.tcp_no_delay,
            service_builder,
        }
    }

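    /// Build the client on the tokio runtime, spawning the background send and
    /// read tasks with `tokio::spawn`.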
    #[cfg(feature = "async-client")]
    #[cfg_attr(docsrs, doc(cfg(feature = "async-client")))]
    pub fn build_with_tokio<S, R, Svc>(self, sender: S, receiver: R) -> Client<Svc>
    where
        S: TransportSenderT + Send,
        R: TransportReceiverT + Send,
        L: tower::Layer<RpcService, Service = Svc> + Clone + Send + Sync + 'static,
    {
        let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
        let disconnect_reason = SharedDisconnectReason::default();
        let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
        let (client_dropped_tx, client_dropped_rx) = oneshot::channel();
        let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1);
        let manager = ThreadSafeRequestManager::new();

        let (ping_interval, inactivity_stream, inactivity_check) = match self.ping_config {
            None => (IntervalStream::pending(), IntervalStream::pending(), InactivityCheck::Disabled),
            Some(p) => {
                let ping_interval = IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(
                    tokio::time::interval(p.ping_interval),
                ));

                let inactive_interval = {
                    let start = tokio::time::Instant::now() + p.inactive_limit;
                    IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
                        start,
                        p.inactive_limit,
                    )))
                };

                let inactivity_check = InactivityCheck::new(p.inactive_limit, p.max_failures);

                (ping_interval, inactive_interval, inactivity_check)
            }
        };

        tokio::spawn(send_task(SendTaskParams {
            sender,
            from_frontend: from_front,
            close_tx: send_receive_task_sync_tx.clone(),
            manager: manager.clone(),
            max_buffer_capacity_per_subscription,
            ping_interval,
        }));

        tokio::spawn(read_task(ReadTaskParams {
            receiver,
            close_tx: send_receive_task_sync_tx,
            to_send_task: to_back.clone(),
            manager,
            max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
            inactivity_check,
            inactivity_stream,
        }));

        tokio::spawn(wait_for_shutdown(send_receive_task_sync_rx, client_dropped_rx, disconnect_reason.clone()));

        Client {
            to_back: to_back.clone(),
            service: self.service_builder.service(RpcService::new(to_back.clone())),
            request_timeout: self.request_timeout,
            error: ErrorFromBack::new(to_back, disconnect_reason),
            id_manager: RequestIdManager::new(self.id_kind),
            on_exit: Some(client_dropped_tx),
        }
    }

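    /// Build the client for WebAssembly targets, spawning the background tasks
    /// with `wasm_bindgen_futures::spawn_local`.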
    #[cfg(all(feature = "async-wasm-client", target_arch = "wasm32"))]
    #[cfg_attr(docsrs, doc(cfg(feature = "async-wasm-client")))]
    pub fn build_with_wasm<S, R, Svc>(self, sender: S, receiver: R) -> Client<Svc>
    where
        S: TransportSenderT,
        R: TransportReceiverT,
        L: tower::Layer<RpcService, Service = Svc> + Clone + Send + Sync + 'static,
    {
        use futures_util::stream::Pending;

        type PendingIntervalStream = IntervalStream<Pending<()>>;

        let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
        let disconnect_reason = SharedDisconnectReason::default();
        let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
        let (client_dropped_tx, client_dropped_rx) = oneshot::channel();
        let (send_receive_task_sync_tx, send_receive_task_sync_rx) = mpsc::channel(1);
        let manager = ThreadSafeRequestManager::new();

        let ping_interval = PendingIntervalStream::pending();
        let inactivity_stream = PendingIntervalStream::pending();
        let inactivity_check = InactivityCheck::Disabled;

        wasm_bindgen_futures::spawn_local(send_task(SendTaskParams {
            sender,
            from_frontend: from_front,
            close_tx: send_receive_task_sync_tx.clone(),
            manager: manager.clone(),
            max_buffer_capacity_per_subscription,
            ping_interval,
        }));

        wasm_bindgen_futures::spawn_local(read_task(ReadTaskParams {
            receiver,
            close_tx: send_receive_task_sync_tx,
            to_send_task: to_back.clone(),
            manager,
            max_buffer_capacity_per_subscription: self.max_buffer_capacity_per_subscription,
            inactivity_check,
            inactivity_stream,
        }));

        wasm_bindgen_futures::spawn_local(wait_for_shutdown(
            send_receive_task_sync_rx,
            client_dropped_rx,
            disconnect_reason.clone(),
        ));

        Client {
            to_back: to_back.clone(),
            service: self.service_builder.service(RpcService::new(to_back.clone())),
            request_timeout: self.request_timeout,
            error: ErrorFromBack::new(to_back, disconnect_reason),
            id_manager: RequestIdManager::new(self.id_kind),
            on_exit: Some(client_dropped_tx),
        }
    }
}

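/// An asynchronous JSON-RPC client backed by background tasks that own the
/// transport. Method calls, notifications and subscriptions are forwarded to those
/// tasks over a channel and routed through the configured RPC middleware service `L`.
///
/// A minimal call sketch (the method name and `params` value are placeholders;
/// `request` is provided by the [`ClientT`] implementation below):
///
/// ```ignore
/// let answer: u64 = client.request("some_method", params).await?;
/// ```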
#[derive(Debug)]
pub struct Client<L = RpcLogger<RpcService>> {
    to_back: mpsc::Sender<FrontToBack>,
    error: ErrorFromBack,
    request_timeout: Duration,
    id_manager: RequestIdManager,
    on_exit: Option<oneshot::Sender<()>>,
    service: L,
}

impl Client<Identity> {
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
}

impl<L> Client<L> {
    pub fn is_connected(&self) -> bool {
        !self.to_back.is_closed()
    }

    async fn run_future_until_timeout<T>(&self, fut: impl Future<Output = Result<T, Error>>) -> Result<T, Error> {
        tokio::pin!(fut);

        match futures_util::future::select(fut, futures_timer::Delay::new(self.request_timeout)).await {
            Either::Left((Ok(r), _)) => Ok(r),
            Either::Left((Err(Error::ServiceDisconnect), _)) => Err(self.on_disconnect().await),
            Either::Left((Err(e), _)) => Err(e),
            Either::Right(_) => Err(Error::RequestTimeout),
        }
    }

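    /// Completes once the background task has terminated and returns the reason
    /// for the disconnect (or a generic error if no reason was recorded).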
    pub async fn on_disconnect(&self) -> Error {
        self.error.read_error().await
    }

    pub fn request_timeout(&self) -> Duration {
        self.request_timeout
    }
}

impl<L> Drop for Client<L> {
    fn drop(&mut self) {
        if let Some(e) = self.on_exit.take() {
            let _ = e.send(());
        }
    }
}

impl<L> ClientT for Client<L>
where
    L: RpcServiceT<
            MethodResponse = Result<MiddlewareMethodResponse, Error>,
            BatchResponse = Result<MiddlewareBatchResponse, Error>,
            NotificationResponse = Result<MiddlewareNotifResponse, Error>,
        > + Send
        + Sync,
{
    fn notification<Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<(), Error>> + Send
    where
        Params: ToRpcParams + Send,
    {
        async {
            let _req_id = self.id_manager.next_request_id();
            let params = params.to_rpc_params()?.map(StdCow::Owned);
            let fut = self.service.notification(jsonrpsee_types::Notification::new(method.into(), params));
            self.run_future_until_timeout(fut).await?;
            Ok(())
        }
    }

    fn request<R, Params>(&self, method: &str, params: Params) -> impl Future<Output = Result<R, Error>> + Send
    where
        R: DeserializeOwned,
        Params: ToRpcParams + Send,
    {
        async {
            let id = self.id_manager.next_request_id();
            let params = params.to_rpc_params()?;
            let fut = self.service.call(Request::borrowed(method, params.as_deref(), id.clone()));
            let rp = self.run_future_until_timeout(fut).await?;
            let success = ResponseSuccess::try_from(rp.into_response().into_inner())?;

            serde_json::from_str(success.result.get()).map_err(Into::into)
        }
    }

    fn batch_request<'a, R>(
        &self,
        batch: BatchRequestBuilder<'a>,
    ) -> impl Future<Output = Result<BatchResponse<'a, R>, Error>> + Send
    where
        R: DeserializeOwned + 'a,
    {
        async {
            let batch = batch.build()?;
            let id = self.id_manager.next_request_id();
            let id_range = generate_batch_id_range(id, batch.len() as u64)?;

            let mut b = Batch::with_capacity(batch.len());

            for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
                b.push(Request {
                    jsonrpc: TwoPointZero,
                    id: self.id_manager.as_id_kind().into_id(id),
                    method: method.into(),
                    params: params.map(StdCow::Owned),
                    extensions: Extensions::new(),
                });
            }

            b.extensions_mut().insert(IsBatch { id_range });

            let fut = self.service.batch(b);
            let json_values = self.run_future_until_timeout(fut).await?;

            let mut responses = Vec::with_capacity(json_values.len());
            let mut successful_calls = 0;
            let mut failed_calls = 0;

            for json_val in json_values {
                match ResponseSuccess::try_from(json_val.into_inner()) {
                    Ok(val) => {
                        let result: R = serde_json::from_str(val.result.get()).map_err(Error::ParseError)?;
                        responses.push(Ok(result));
                        successful_calls += 1;
                    }
                    Err(err) => {
                        responses.push(Err(err));
                        failed_calls += 1;
                    }
                }
            }
            Ok(BatchResponse { successful_calls, failed_calls, responses })
        }
    }
}

impl<L> SubscriptionClientT for Client<L>
where
    L: RpcServiceT<
            MethodResponse = Result<MiddlewareMethodResponse, Error>,
            BatchResponse = Result<MiddlewareBatchResponse, Error>,
            NotificationResponse = Result<MiddlewareNotifResponse, Error>,
        > + Send
        + Sync,
{
    fn subscribe<'a, Notif, Params>(
        &self,
        subscribe_method: &'a str,
        params: Params,
        unsubscribe_method: &'a str,
    ) -> impl Future<Output = Result<Subscription<Notif>, Error>> + Send
    where
        Params: ToRpcParams + Send,
        Notif: DeserializeOwned,
    {
        async move {
            if subscribe_method == unsubscribe_method {
                return Err(RegisterMethodError::SubscriptionNameConflict(unsubscribe_method.to_owned()).into());
            }

            let req_id_sub = self.id_manager.next_request_id();
            let req_id_unsub = self.id_manager.next_request_id();
            let params = params.to_rpc_params()?;

            let mut ext = Extensions::new();
            ext.insert(IsSubscription::new(req_id_sub.clone(), req_id_unsub, unsubscribe_method.to_owned()));

            let req = Request {
                jsonrpc: TwoPointZero,
                id: req_id_sub,
                method: subscribe_method.into(),
                params: params.map(StdCow::Owned),
                extensions: ext,
            };

            let fut = self.service.call(req);
            let sub = self
                .run_future_until_timeout(fut)
                .await?
                .into_subscription()
                .expect("Extensions set to subscription, must return subscription; qed");
            Ok(Subscription::new(self.to_back.clone(), sub.stream, SubscriptionKind::Subscription(sub.sub_id)))
        }
    }

    fn subscribe_to_method<N>(&self, method: &str) -> impl Future<Output = Result<Subscription<N>, Error>> + Send
    where
        N: DeserializeOwned,
    {
        async {
            let (send_back_tx, send_back_rx) = oneshot::channel();
            if self
                .to_back
                .clone()
                .send(FrontToBack::RegisterNotification(RegisterNotificationMessage {
                    send_back: send_back_tx,
                    method: method.to_owned(),
                }))
                .await
                .is_err()
            {
                return Err(self.on_disconnect().await);
            }

            let res = call_with_timeout(self.request_timeout, send_back_rx).await;

            let (rx, method) = match res {
                Ok(Ok(val)) => val,
                Ok(Err(err)) => return Err(err),
                Err(_) => return Err(self.on_disconnect().await),
            };

            Ok(Subscription::new(self.to_back.clone(), rx, SubscriptionKind::Method(method)))
        }
    }
}

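/// Handle a single message read from the backend transport: parses plain responses,
/// batch responses, subscription notifications and method notifications, updates the
/// request manager accordingly, and returns any follow-up messages (such as
/// unsubscribe requests) that must be forwarded to the send task.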
fn handle_backend_messages<R: TransportReceiverT>(
    message: Option<Result<ReceivedMessage, R::Error>>,
    manager: &ThreadSafeRequestManager,
    max_buffer_capacity_per_subscription: usize,
) -> Result<Vec<FrontToBack>, Error> {
    fn handle_recv_message(
        raw: &[u8],
        manager: &ThreadSafeRequestManager,
        max_buffer_capacity_per_subscription: usize,
    ) -> Result<Vec<FrontToBack>, Error> {
        let first_non_whitespace = raw.iter().find(|byte| !byte.is_ascii_whitespace());
        let mut messages = Vec::new();

        tracing::trace!(target: LOG_TARGET, "rx: {}", serde_json::from_slice::<&JsonRawValue>(raw).map_or("<invalid json>", |v| v.get()));

        match first_non_whitespace {
            Some(b'{') => {
                // Single response to a request.
                if let Ok(single) = serde_json::from_slice::<Response<_>>(raw) {
                    let maybe_unsub = process_single_response(
                        &mut manager.lock(),
                        single.into_owned().into(),
                        max_buffer_capacity_per_subscription,
                    )?;

                    if let Some(unsub) = maybe_unsub {
                        return Ok(vec![FrontToBack::Request(unsub)]);
                    }
                }
                // Subscription response.
                else if let Ok(response) = serde_json::from_slice::<SubscriptionResponse<_>>(raw) {
                    if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
                        return Ok(vec![FrontToBack::SubscriptionClosed(sub_id)]);
                    }
                }
                // Subscription error response.
                else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(raw) {
                    process_subscription_close_response(&mut manager.lock(), response);
                }
                // Incoming notification.
                else if let Ok(notif) = serde_json::from_slice::<Notification>(raw) {
                    process_notification(&mut manager.lock(), notif);
                } else {
                    return Err(unparse_error(raw));
                }
            }
            Some(b'[') => {
                if let Ok(raw_responses) = serde_json::from_slice::<Vec<&JsonRawValue>>(raw) {
                    let mut batch = Vec::with_capacity(raw_responses.len());

                    let mut range = None;
                    let mut got_notif = false;

                    for r in raw_responses {
                        if let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) {
                            let id = response.id.try_parse_inner_as_number()?;
                            batch.push(response.into_owned().into());

                            let r = range.get_or_insert(id..id);

                            if id < r.start {
                                r.start = id;
                            }

                            if id > r.end {
                                r.end = id;
                            }
                        } else if let Ok(response) = serde_json::from_str::<SubscriptionResponse<_>>(r.get()) {
                            got_notif = true;
                            if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
                                messages.push(FrontToBack::SubscriptionClosed(sub_id));
                            }
                        } else if let Ok(response) = serde_json::from_str::<SubscriptionError<_>>(r.get()) {
                            got_notif = true;
                            process_subscription_close_response(&mut manager.lock(), response);
                        } else if let Ok(notif) = serde_json::from_str::<Notification>(r.get()) {
                            got_notif = true;
                            process_notification(&mut manager.lock(), notif);
                        } else {
                            return Err(unparse_error(raw));
                        };
                    }

                    if let Some(mut range) = range {
                        // The range end is exclusive, so extend it by one to cover the last ID.
                        range.end += 1;
                        process_batch_response(&mut manager.lock(), batch, range)?;
                    } else if !got_notif {
                        return Err(EmptyBatchRequest.into());
                    }
                } else {
                    return Err(unparse_error(raw));
                }
            }
            _ => {
                return Err(unparse_error(raw));
            }
        };

        Ok(messages)
    }

    match message {
        Some(Ok(ReceivedMessage::Pong)) => {
            tracing::debug!(target: LOG_TARGET, "Received pong");
            Ok(vec![])
        }
        Some(Ok(ReceivedMessage::Bytes(raw))) => {
            handle_recv_message(raw.as_ref(), manager, max_buffer_capacity_per_subscription)
        }
        Some(Ok(ReceivedMessage::Text(raw))) => {
            handle_recv_message(raw.as_ref(), manager, max_buffer_capacity_per_subscription)
        }
        Some(Err(e)) => Err(Error::Transport(e.into())),
        None => Err(Error::Custom("TransportReceiver dropped".into())),
    }
}

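/// Handle a message sent by the frontend: registers pending calls, batches,
/// subscriptions and notification handlers in the request manager and writes the
/// corresponding raw message to the transport sender.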
async fn handle_frontend_messages<S: TransportSenderT>(
    message: FrontToBack,
    manager: &ThreadSafeRequestManager,
    sender: &mut S,
    max_buffer_capacity_per_subscription: usize,
) -> Result<(), S::Error> {
    match message {
        FrontToBack::Batch(batch) => {
            if let Err(send_back) = manager.lock().insert_pending_batch(batch.ids.clone(), batch.send_back) {
                tracing::debug!(target: LOG_TARGET, "Batch request already pending: {:?}", batch.ids);
                let _ = send_back.send(Err(InvalidRequestId::Occupied(format!("{:?}", batch.ids))));
                return Ok(());
            }

            sender.send(batch.raw).await?;
        }
        FrontToBack::Notification(notif) => {
            sender.send(notif).await?;
        }
        FrontToBack::Request(request) => {
            if let Err(send_back) = manager.lock().insert_pending_call(request.id.clone(), request.send_back) {
                tracing::debug!(target: LOG_TARGET, "Denied duplicate method call");

                if let Some(s) = send_back {
                    let _ = s.send(Err(InvalidRequestId::Occupied(request.id.to_string())));
                }
                return Ok(());
            }

            sender.send(request.raw).await?;
        }
        FrontToBack::Subscribe(sub) => {
            if let Err(send_back) = manager.lock().insert_pending_subscription(
                sub.subscribe_id.clone(),
                sub.unsubscribe_id.clone(),
                sub.send_back,
                sub.unsubscribe_method,
            ) {
                tracing::debug!(target: LOG_TARGET, "Denied duplicate subscription");

                let _ = send_back.send(Err(InvalidRequestId::Occupied(format!(
                    "sub_id={}:req_id={}",
                    sub.subscribe_id, sub.unsubscribe_id
                ))
                .into()));
                return Ok(());
            }

            sender.send(sub.raw).await?;
        }
        FrontToBack::SubscriptionClosed(sub_id) => {
            tracing::trace!(target: LOG_TARGET, "Closing subscription: {:?}", sub_id);
            let maybe_unsub = {
                let m = &mut *manager.lock();

                m.get_request_id_by_subscription_id(&sub_id)
                    .and_then(|req_id| build_unsubscribe_message(m, req_id, sub_id))
            };

            if let Some(unsub) = maybe_unsub {
                stop_subscription::<S>(sender, unsub).await?;
            }
        }
        FrontToBack::RegisterNotification(reg) => {
            let (subscribe_tx, subscribe_rx) = subscription_channel(max_buffer_capacity_per_subscription);

            if manager.lock().insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
                let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
            } else {
                let _ = reg.send_back.send(Err(RegisterMethodError::AlreadyRegistered(reg.method).into()));
            }
        }
        FrontToBack::UnregisterNotification(method) => {
            let _ = manager.lock().remove_notification_handler(&method);
        }
    };

    Ok(())
}

fn unparse_error(raw: &[u8]) -> Error {
    let json = serde_json::from_slice::<serde_json::Value>(raw);

    let json_str = match json {
        Ok(json) => serde_json::to_string(&json).expect("valid JSON; qed"),
        Err(e) => e.to_string(),
    };

    Error::Custom(format!("Unparseable message: {json_str}"))
}

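/// Parameters passed to the background send task.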
struct SendTaskParams<T: TransportSenderT, S> {
    sender: T,
    from_frontend: mpsc::Receiver<FrontToBack>,
    close_tx: mpsc::Sender<Result<(), Error>>,
    manager: ThreadSafeRequestManager,
    max_buffer_capacity_per_subscription: usize,
    ping_interval: IntervalStream<S>,
}

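/// Background task that forwards frontend messages to the transport and sends
/// periodic pings; it terminates when either side of the connection closes and
/// reports the final result over `close_tx`.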
async fn send_task<T, S>(params: SendTaskParams<T, S>)
where
    T: TransportSenderT,
    S: Stream + Unpin,
{
    let SendTaskParams {
        mut sender,
        mut from_frontend,
        close_tx,
        manager,
        max_buffer_capacity_per_subscription,
        mut ping_interval,
    } = params;

    let res = loop {
        tokio::select! {
            biased;
            _ = close_tx.closed() => break Ok(()),
            maybe_msg = from_frontend.recv() => {
                let Some(msg) = maybe_msg else {
                    break Ok(());
                };

                if let Err(e) =
                    handle_frontend_messages(msg, &manager, &mut sender, max_buffer_capacity_per_subscription).await
                {
                    tracing::debug!(target: LOG_TARGET, "ws send failed: {e}");
                    break Err(Error::Transport(e.into()));
                }
            }
            _ = ping_interval.next() => {
                if let Err(err) = sender.send_ping().await {
                    tracing::debug!(target: LOG_TARGET, "Send ws ping failed: {err}");
                    break Err(Error::Transport(err.into()));
                }
            }
        }
    };

    from_frontend.close();
    let _ = sender.close().await;
    let _ = close_tx.send(res).await;
}

/// Parameters passed to the background read task.
struct ReadTaskParams<R: TransportReceiverT, S> {
    receiver: R,
    close_tx: mpsc::Sender<Result<(), Error>>,
    to_send_task: mpsc::Sender<FrontToBack>,
    manager: ThreadSafeRequestManager,
    max_buffer_capacity_per_subscription: usize,
    inactivity_check: InactivityCheck,
    inactivity_stream: IntervalStream<S>,
}

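/// Background task that reads messages from the transport, dispatches them through
/// the request manager and enforces the ping/pong inactivity check; the final
/// result is reported over `close_tx`.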
async fn read_task<R, S>(params: ReadTaskParams<R, S>)
where
    R: TransportReceiverT,
    S: Stream + Unpin,
{
    let ReadTaskParams {
        receiver,
        close_tx,
        to_send_task,
        manager,
        max_buffer_capacity_per_subscription,
        mut inactivity_check,
        mut inactivity_stream,
    } = params;

    let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async {
        let res = receiver.receive().await;
        Some((res, receiver))
    });

    // Pending sends to the send task (e.g. unsubscribe calls triggered by incoming messages).
    let pending_unsubscribes = MaybePendingFutures::new();

    tokio::pin!(backend_event, pending_unsubscribes);

    let res = loop {
        tokio::select! {
            biased;
            _ = close_tx.closed() => break Ok(()),
            _ = pending_unsubscribes.next() => (),
            maybe_msg = backend_event.next() => {
                inactivity_check.mark_as_active();
                let Some(msg) = maybe_msg else { break Ok(()) };

                match handle_backend_messages::<R>(Some(msg), &manager, max_buffer_capacity_per_subscription) {
                    Ok(messages) => {
                        for msg in messages {
                            pending_unsubscribes.push(to_send_task.send(msg));
                        }
                    }
                    Err(e) => {
                        tracing::debug!(target: LOG_TARGET, "Failed to read message: {e}");
                        break Err(e);
                    }
                }
            }
            _ = inactivity_stream.next() => {
                if inactivity_check.is_inactive() {
                    break Err(Error::Transport("WebSocket ping/pong inactive".into()));
                }
            }
        }
    };

    let _ = close_tx.send(res).await;
}

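/// Waits until a background task finishes or the client is dropped and, if a task
/// ended with an error, stores it as the shared disconnect reason.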
async fn wait_for_shutdown(
    mut close_rx: mpsc::Receiver<Result<(), Error>>,
    client_dropped: oneshot::Receiver<()>,
    err_to_front: SharedDisconnectReason,
) {
    let rx_item = close_rx.recv();

    tokio::pin!(rx_item);

    if let Either::Left((Some(Err(err)), _)) = future::select(rx_item, client_dropped).await {
        *err_to_front.write().expect(NOT_POISONED) = Some(Arc::new(err));
    }
}