1#[cfg(not(target_arch = "wasm32"))]
8pub use self::native::Client;
9
10#[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))]
11pub use self::wasm::Client;
12
13#[cfg(not(target_arch = "wasm32"))]
14mod native {
15 use std::fmt;
16 use std::result::Result;
17 use std::sync::Arc;
18 use std::time::Duration;
19
20 use http::{HeaderValue, header};
21 use jsonrpsee::core::ClientError;
22 use jsonrpsee::core::client::{BatchResponse, ClientT, Subscription, SubscriptionClientT};
23 use jsonrpsee::core::params::BatchRequestBuilder;
24 use jsonrpsee::core::traits::ToRpcParams;
25 use jsonrpsee::http_client::{HeaderMap, HttpClient, HttpClientBuilder};
26 use jsonrpsee::ws_client::{PingConfig, WsClient, WsClientBuilder};
27 use serde::de::DeserializeOwned;
28 use tokio::sync::RwLock;
29 use tracing::warn;
30
31 use crate::Error;
32
33 const MAX_RESPONSE_SIZE: usize = 256 * 1024 * 1024;
34
35 pub enum Client {
37 Http(HttpClient),
39 Ws(WsReconnectClient<WsClient>),
41 }
42
43 impl Client {
44 pub async fn new(
53 url: &str,
54 auth_token: Option<&str>,
55 connect_timeout: Option<Duration>,
56 request_timeout: Option<Duration>,
57 ) -> Result<Self, Error> {
58 let protocol = url.split_once(':').map(|(proto, _)| proto);
59 let client = match protocol {
60 Some("http") | Some("https") => {
61 let headers = build_headers(auth_token)?;
62 let mut builder = HttpClientBuilder::default()
63 .max_response_size(MAX_RESPONSE_SIZE as u32)
64 .set_headers(headers);
65 if let Some(timeout) = request_timeout {
66 builder = builder.request_timeout(timeout);
67 }
68 if connect_timeout.is_some() {
69 warn!("ignored connect_timeout: not supported with http(s)");
70 }
71 Client::Http(builder.build(url)?)
72 }
73 Some("ws") | Some("wss") => Client::Ws(
74 WsReconnectClient::new(url, auth_token, connect_timeout, request_timeout)
75 .await?,
76 ),
77 _ => return Err(Error::ProtocolNotSupported(url.into())),
78 };
79
80 Ok(client)
81 }
82 }
83
84 pub struct WsReconnectClient<C> {
85 state: RwLock<WsState<C>>,
86 build: BuildFn<C>,
87 }
88
89 struct WsState<C> {
90 inner: Arc<C>,
91 poisoned: bool,
92 epoch: u64,
93 }
94
95 type BuildFn<C> =
96 Arc<dyn Fn() -> futures_util::future::BoxFuture<'static, Result<C, Error>> + Send + Sync>;
97
98 impl WsReconnectClient<WsClient> {
99 async fn new(
100 url: &str,
101 auth_token: Option<&str>,
102 connect_timeout: Option<Duration>,
103 request_timeout: Option<Duration>,
104 ) -> Result<Self, Error> {
105 let url = url.to_owned();
106 let auth_token = auth_token.map(str::to_owned);
107 let build: BuildFn<WsClient> = Arc::new(move || {
108 let url = url.clone();
109 let auth_token = auth_token.clone();
110 Box::pin(async move {
111 build_ws_client(
112 &url,
113 auth_token.as_deref(),
114 connect_timeout,
115 request_timeout,
116 )
117 .await
118 })
119 });
120 WsReconnectClient::new_with_factory(build).await
121 }
122 }
123
124 impl<C> WsReconnectClient<C>
125 where
126 C: ClientT + SubscriptionClientT + Send + Sync + 'static,
127 {
128 async fn new_with_factory(build: BuildFn<C>) -> Result<Self, Error> {
129 let inner = Arc::new((build)().await?);
130 Ok(Self {
131 state: RwLock::new(WsState {
132 inner,
133 poisoned: false,
134 epoch: 0,
135 }),
136 build,
137 })
138 }
139
140 async fn get_inner(&self) -> Result<Arc<C>, ClientError> {
141 let (epoch, poisoned, inner) = {
142 let state = self.state.read().await;
143 (state.epoch, state.poisoned, state.inner.clone())
144 };
145 if !poisoned {
146 return Ok(inner);
147 }
148
149 let mut state = self.state.write().await;
150 if state.poisoned && state.epoch == epoch {
151 let new_inner = Arc::new(
152 (self.build)()
153 .await
154 .map_err(|err| ClientError::Custom(err.to_string()))?,
155 );
156 state.inner = new_inner;
157 state.poisoned = false;
158 }
159 Ok(state.inner.clone())
160 }
161
162 async fn mark_poisoned(&self) {
163 let mut state = self.state.write().await;
164 if !state.poisoned {
165 state.poisoned = true;
166 state.epoch += 1;
167 }
168 }
169
170 async fn notification<Params>(
171 &self,
172 method: &str,
173 params: Params,
174 ) -> Result<(), ClientError>
175 where
176 Params: ToRpcParams + Send,
177 {
178 let inner = self.get_inner().await?;
179 let err = match inner.notification(method, params).await {
180 Err(err @ ClientError::RestartNeeded(_)) => err,
181 res => return res,
182 };
183 self.mark_poisoned().await;
184 Err(err)
185 }
186
187 async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, ClientError>
188 where
189 R: DeserializeOwned,
190 Params: ToRpcParams + Send,
191 {
192 let inner = self.get_inner().await?;
193 let err = match inner.request(method, params).await {
194 Err(err @ ClientError::RestartNeeded(_)) => err,
195 res => return res,
196 };
197 self.mark_poisoned().await;
198 Err(err)
199 }
200
201 async fn subscribe<'a, N, Params>(
202 &self,
203 subscribe_method: &'a str,
204 params: Params,
205 unsubscribe_method: &'a str,
206 ) -> Result<Subscription<N>, ClientError>
207 where
208 Params: ToRpcParams + Send,
209 N: DeserializeOwned,
210 {
211 let inner = self.get_inner().await?;
212 let err = match inner
213 .subscribe(subscribe_method, params, unsubscribe_method)
214 .await
215 {
216 Err(err @ ClientError::RestartNeeded(_)) => err,
217 res => return res,
218 };
219 self.mark_poisoned().await;
220 Err(err)
221 }
222
223 async fn subscribe_to_method<N>(&self, method: &str) -> Result<Subscription<N>, ClientError>
224 where
225 N: DeserializeOwned,
226 {
227 let inner = self.get_inner().await?;
228 let err = match inner.subscribe_to_method(method).await {
229 Err(err @ ClientError::RestartNeeded(_)) => err,
230 res => return res,
231 };
232 self.mark_poisoned().await;
233 Err(err)
234 }
235 }
236
237 fn build_headers(auth_token: Option<&str>) -> Result<HeaderMap, Error> {
238 let mut headers = HeaderMap::new();
239
240 if let Some(token) = auth_token {
241 let val = HeaderValue::from_str(&format!("Bearer {token}"))?;
242 headers.insert(header::AUTHORIZATION, val);
243 }
244
245 Ok(headers)
246 }
247
248 async fn build_ws_client(
249 url: &str,
250 auth_token: Option<&str>,
251 connect_timeout: Option<Duration>,
252 request_timeout: Option<Duration>,
253 ) -> Result<WsClient, Error> {
254 let headers = build_headers(auth_token)?;
255 let mut builder = WsClientBuilder::default()
256 .max_response_size(MAX_RESPONSE_SIZE as u32)
257 .set_headers(headers)
258 .enable_ws_ping(PingConfig::default());
259 if let Some(timeout) = request_timeout {
260 builder = builder.request_timeout(timeout);
261 }
262 if let Some(timeout) = connect_timeout {
263 builder = builder.connection_timeout(timeout);
264 }
265 Ok(builder.build(url).await?)
266 }
267
268 impl ClientT for Client {
269 async fn notification<Params>(
270 &self,
271 method: &str,
272 params: Params,
273 ) -> Result<(), ClientError>
274 where
275 Params: ToRpcParams + Send,
276 {
277 match self {
278 Client::Http(client) => client.notification(method, params).await,
279 Client::Ws(client) => client.notification(method, params).await,
280 }
281 }
282
283 async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, ClientError>
284 where
285 R: DeserializeOwned,
286 Params: ToRpcParams + Send,
287 {
288 match self {
289 Client::Http(client) => client.request(method, params).await,
290 Client::Ws(client) => client.request(method, params).await,
291 }
292 }
293
294 async fn batch_request<'a, R>(
295 &self,
296 batch: BatchRequestBuilder<'a>,
297 ) -> Result<BatchResponse<'a, R>, ClientError>
298 where
299 R: DeserializeOwned + fmt::Debug + 'a,
300 {
301 match self {
302 Client::Http(client) => client.batch_request(batch).await,
303 Client::Ws(client) => {
304 let inner = client.get_inner().await?;
305 let err = match inner.batch_request(batch).await {
306 Err(err @ ClientError::RestartNeeded(_)) => err,
307 res => return res,
308 };
309 client.mark_poisoned().await;
310 Err(err)
311 }
312 }
313 }
314 }
315
316 impl SubscriptionClientT for Client {
317 async fn subscribe<'a, N, Params>(
318 &self,
319 subscribe_method: &'a str,
320 params: Params,
321 unsubscribe_method: &'a str,
322 ) -> Result<Subscription<N>, ClientError>
323 where
324 Params: ToRpcParams + Send,
325 N: DeserializeOwned,
326 {
327 match self {
328 Client::Http(client) => {
329 client
330 .subscribe(subscribe_method, params, unsubscribe_method)
331 .await
332 }
333 Client::Ws(client) => {
334 client
335 .subscribe(subscribe_method, params, unsubscribe_method)
336 .await
337 }
338 }
339 }
340
341 async fn subscribe_to_method<N>(&self, method: &str) -> Result<Subscription<N>, ClientError>
342 where
343 N: DeserializeOwned,
344 {
345 match self {
346 Client::Http(client) => client.subscribe_to_method(method).await,
347 Client::Ws(client) => client.subscribe_to_method(method).await,
348 }
349 }
350 }
351
352 impl fmt::Debug for Client {
353 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354 f.write_str("Client { .. }")
355 }
356 }
357
358 #[cfg(test)]
359 mod tests {
360 use std::sync::Arc;
361 use std::sync::atomic::{AtomicUsize, Ordering};
362
363 use jsonrpsee::core::ClientError;
364 use jsonrpsee::core::params::BatchRequestBuilder;
365 use serde::de::DeserializeOwned;
366 use serde_json::Value as JsonValue;
367 use tokio::join;
368 use tokio::sync::Barrier;
369
370 use super::{BuildFn, WsReconnectClient};
371
372 struct FakeWsClient {
373 remaining_failures: AtomicUsize,
374 response: JsonValue,
375 barrier: Option<Arc<tokio::sync::Barrier>>,
376 }
377
378 impl FakeWsClient {
379 fn new(remaining_failures: usize, response: JsonValue) -> Self {
380 Self {
381 remaining_failures: AtomicUsize::new(remaining_failures),
382 response,
383 barrier: None,
384 }
385 }
386
387 fn new_with_barrier(
388 remaining_failures: usize,
389 response: JsonValue,
390 barrier: Arc<tokio::sync::Barrier>,
391 ) -> Self {
392 Self {
393 remaining_failures: AtomicUsize::new(remaining_failures),
394 response,
395 barrier: Some(barrier),
396 }
397 }
398 }
399
400 impl jsonrpsee::core::client::ClientT for FakeWsClient {
401 async fn notification<Params>(
402 &self,
403 _method: &str,
404 _params: Params,
405 ) -> Result<(), ClientError>
406 where
407 Params: jsonrpsee::core::traits::ToRpcParams + Send,
408 {
409 Ok(())
410 }
411
412 fn request<R, Params>(
413 &self,
414 _method: &str,
415 _params: Params,
416 ) -> impl std::future::Future<Output = Result<R, ClientError>> + Send
417 where
418 R: DeserializeOwned,
419 Params: jsonrpsee::core::traits::ToRpcParams + Send,
420 {
421 let barrier = self.barrier.clone();
422 let response = self.response.clone();
423 async move {
424 if let Some(barrier) = barrier {
425 barrier.wait().await;
426 }
427 let should_fail = self
428 .remaining_failures
429 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
430 value.checked_sub(1)
431 })
432 .is_ok();
433 if should_fail {
434 return Err(ClientError::RestartNeeded(Arc::new(ClientError::Custom(
435 "restart".into(),
436 ))));
437 }
438 serde_json::from_value(response).map_err(ClientError::ParseError)
439 }
440 }
441
442 async fn batch_request<'a, R>(
443 &self,
444 _batch: BatchRequestBuilder<'a>,
445 ) -> Result<jsonrpsee::core::client::BatchResponse<'a, R>, ClientError>
446 where
447 R: DeserializeOwned + std::fmt::Debug + 'a,
448 {
449 Err(ClientError::Custom(
450 "batch_request not implemented in FakeWsClient".into(),
451 ))
452 }
453 }
454
455 impl jsonrpsee::core::client::SubscriptionClientT for FakeWsClient {
456 async fn subscribe<'a, N, Params>(
457 &self,
458 _subscribe_method: &'a str,
459 _params: Params,
460 _unsubscribe_method: &'a str,
461 ) -> Result<jsonrpsee::core::client::Subscription<N>, ClientError>
462 where
463 Params: jsonrpsee::core::traits::ToRpcParams + Send,
464 N: DeserializeOwned,
465 {
466 Err(ClientError::Custom(
467 "subscribe not implemented in FakeWsClient".into(),
468 ))
469 }
470
471 async fn subscribe_to_method<N>(
472 &self,
473 _method: &str,
474 ) -> Result<jsonrpsee::core::client::Subscription<N>, ClientError>
475 where
476 N: DeserializeOwned,
477 {
478 Err(ClientError::Custom(
479 "subscribe_to_method not implemented in FakeWsClient".into(),
480 ))
481 }
482 }
483
484 #[tokio::test]
485 async fn request_marks_poisoned_on_restart_needed() {
486 let build_count = Arc::new(AtomicUsize::new(0));
487 let response = serde_json::json!(7u64);
488 let build: BuildFn<FakeWsClient> = {
489 let build_count = build_count.clone();
490 Arc::new(move || {
491 let build_count = build_count.clone();
492 let response = response.clone();
493 Box::pin(async move {
494 let id = build_count.fetch_add(1, Ordering::SeqCst);
495 let failures = if id == 0 { 1 } else { 0 };
496 Ok(FakeWsClient::new(failures, response))
497 })
498 })
499 };
500
501 let client = WsReconnectClient::new_with_factory(build).await.unwrap();
502 let err = client
503 .request::<u64, _>("test", Vec::<u8>::new())
504 .await
505 .unwrap_err();
506 assert!(matches!(err, ClientError::RestartNeeded(_)));
507 assert_eq!(build_count.load(Ordering::SeqCst), 1);
508
509 let value: u64 = client.request("test", Vec::<u8>::new()).await.unwrap();
510 assert_eq!(value, 7);
511 assert_eq!(build_count.load(Ordering::SeqCst), 2);
512 }
513
514 #[tokio::test]
515 async fn concurrent_requests_share_single_reconnect() {
516 let build_count = Arc::new(AtomicUsize::new(0));
517 let response = serde_json::json!(5u64);
518 let build: BuildFn<FakeWsClient> = {
519 let build_count = build_count.clone();
520 Arc::new(move || {
521 let build_count = build_count.clone();
522 let response = response.clone();
523 Box::pin(async move {
524 let id = build_count.fetch_add(1, Ordering::SeqCst);
525 let failures = if id == 0 { 1 } else { 0 };
526 Ok(FakeWsClient::new(failures, response))
527 })
528 })
529 };
530
531 let client = WsReconnectClient::new_with_factory(build).await.unwrap();
532 let err = client
533 .request::<u64, _>("test", Vec::<u8>::new())
534 .await
535 .unwrap_err();
536 assert!(matches!(err, ClientError::RestartNeeded(_)));
537 assert_eq!(build_count.load(Ordering::SeqCst), 1);
538
539 let (a, b) = join!(
540 client.request::<u64, _>("test", Vec::<u8>::new()),
541 client.request::<u64, _>("test", Vec::<u8>::new())
542 );
543 assert_eq!(a.unwrap(), 5);
544 assert_eq!(b.unwrap(), 5);
545 assert_eq!(build_count.load(Ordering::SeqCst), 2);
546 }
547
548 #[tokio::test]
549 async fn concurrent_restart_needed_dedupes_reconnect() {
550 let build_count = Arc::new(AtomicUsize::new(0));
551 let response = serde_json::json!(9u64);
552 let barrier = Arc::new(Barrier::new(2));
553 let build: BuildFn<FakeWsClient> = {
554 let build_count = build_count.clone();
555 let barrier = barrier.clone();
556 Arc::new(move || {
557 let build_count = build_count.clone();
558 let response = response.clone();
559 let barrier = barrier.clone();
560 Box::pin(async move {
561 let id = build_count.fetch_add(1, Ordering::SeqCst);
562 let failures = if id == 0 { 2 } else { 0 };
563 if id == 0 {
564 Ok(FakeWsClient::new_with_barrier(failures, response, barrier))
565 } else {
566 Ok(FakeWsClient::new(failures, response))
567 }
568 })
569 })
570 };
571
572 let client = WsReconnectClient::new_with_factory(build).await.unwrap();
573 let (a, b) = join!(
574 client.request::<u64, _>("test", Vec::<u8>::new()),
575 client.request::<u64, _>("test", Vec::<u8>::new())
576 );
577 assert!(matches!(a.unwrap_err(), ClientError::RestartNeeded(_)));
578 assert!(matches!(b.unwrap_err(), ClientError::RestartNeeded(_)));
579 assert_eq!(build_count.load(Ordering::SeqCst), 1);
580
581 let value: u64 = client.request("test", Vec::<u8>::new()).await.unwrap();
582 assert_eq!(value, 9);
583 assert_eq!(build_count.load(Ordering::SeqCst), 2);
584 }
585 }
586}
587
588#[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))]
589mod wasm {
590 use std::fmt;
591 use std::result::Result;
592 use std::sync::atomic::{AtomicU64, Ordering};
593 use std::time::Duration;
594
595 use gloo_net::http::{Request as JsRequest, Response as JsResponse};
596 use gloo_timers::callback::Timeout;
597 use jsonrpsee::core::client::{BatchResponse, ClientT, Subscription, SubscriptionClientT};
598 use jsonrpsee::core::middleware::Batch;
599 use jsonrpsee::core::params::BatchRequestBuilder;
600 use jsonrpsee::core::traits::ToRpcParams;
601 use jsonrpsee::core::{ClientError, JsonRawValue};
602 use jsonrpsee::types::{
603 Id, InvalidRequestId, Notification, Request, Response, ResponseSuccess,
604 };
605 use send_wrapper::SendWrapper;
606 use serde::Serialize;
607 use serde::de::DeserializeOwned;
608 use tracing::warn;
609 use web_sys::AbortController;
610
611 use crate::Error;
612
613 const ABORT_ERROR_NAME: &str = "AbortError";
614
615 pub struct Client {
617 id: AtomicU64,
618 url: String,
619 auth_token: Option<String>,
620 timeout_ms: Option<u32>,
621 }
622
623 impl Client {
624 pub async fn new(
631 url: &str,
632 auth_token: Option<&str>,
633 connect_timeout: Option<Duration>,
634 request_timeout: Option<Duration>,
635 ) -> Result<Self, Error> {
636 let protocol = url.split_once(':').map(|(proto, _)| proto);
637 match protocol {
638 Some("http") | Some("https") => (),
639 _ => return Err(Error::ProtocolNotSupported(url.into())),
640 };
641 let timeout_ms = request_timeout
642 .map(|t| t.as_millis().try_into())
643 .transpose()
644 .map_err(|_| Error::TimeoutOutOfRange)?;
645 if connect_timeout.is_some() {
646 warn!("ignored connect_timeout: not supported in wasm");
647 }
648
649 Ok(Client {
650 id: AtomicU64::new(0),
651 url: url.into(),
652 auth_token: auth_token.map(ToOwned::to_owned),
653 timeout_ms,
654 })
655 }
656
657 async fn send<T: Serialize>(&self, request: T) -> Result<JsResponse, ClientError> {
658 let (fut, _timeout) = {
659 let mut req = JsRequest::post(&self.url);
660 let mut timeout_callback = None;
661
662 if let Some(timeout) = self.timeout_ms {
663 let abort_controller =
664 AbortController::new().expect("AbortController should be available");
665 let abort_signal = abort_controller.signal();
666 let _ = timeout_callback.insert(Timeout::new(timeout, move || {
667 abort_controller.abort();
668 }));
669 req = req.abort_signal(Some(&abort_signal));
670 }
671
672 if let Some(token) = self.auth_token.as_ref() {
673 req = req.header("Authorization", &format!("Bearer {token}"));
674 }
675
676 (
677 req.json(&request).map_err(into_parse_error)?.send(),
678 SendWrapper::new(timeout_callback),
679 )
680 };
681
682 SendWrapper::new(fut).await.map_err(|e| match e {
683 gloo_net::Error::JsError(e) => {
684 if e.name == ABORT_ERROR_NAME {
685 ClientError::RequestTimeout
686 } else {
687 ClientError::Transport(e.into())
688 }
689 }
690 gloo_net::Error::SerdeError(e) => ClientError::ParseError(e),
691 gloo_net::Error::GlooError(e) => ClientError::Transport(e.into()),
692 })
693 }
694
695 async fn send_and_read_body<T: Serialize>(
696 &self,
697 request: T,
698 ) -> Result<Vec<u8>, ClientError> {
699 let response = SendWrapper::new(self.send(request).await?);
700
701 if !response.ok() {
702 let error = format!(
703 "Request failed: {} {}",
704 response.status(),
705 response.status_text(),
706 );
707 return Err(ClientError::Transport(error.into()));
708 }
709
710 SendWrapper::new(response.binary())
711 .await
712 .map_err(|err| ClientError::Transport(err.into()))
713 }
714 }
715
716 impl ClientT for Client {
717 async fn notification<Params>(
718 &self,
719 method: &str,
720 params: Params,
721 ) -> Result<(), ClientError>
722 where
723 Params: ToRpcParams + Send,
724 {
725 let params = params.to_rpc_params()?;
726 let notification = Notification::new(method.into(), params);
727
728 self.send(notification).await?;
729
730 Ok(())
731 }
732
733 async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, ClientError>
734 where
735 R: DeserializeOwned,
736 Params: ToRpcParams + Send,
737 {
738 let id = Id::Number(self.id.fetch_add(1, Ordering::Relaxed));
739 let params = params.to_rpc_params()?;
740 let request = Request::borrowed(method, params.as_deref(), id.clone());
741 let body = self.send_and_read_body(request).await?;
742
743 let response: Response<&JsonRawValue> = serde_json::from_slice(&body)?;
746 let success = ResponseSuccess::try_from(response)?;
748
749 if success.id == id {
750 let result = serde_json::from_str(success.result.get())?;
752 Ok(result)
753 } else {
754 Err(InvalidRequestId::NotPendingRequest(success.id.to_string()).into())
755 }
756 }
757
758 async fn batch_request<'a, R>(
759 &self,
760 batch: BatchRequestBuilder<'a>,
761 ) -> Result<BatchResponse<'a, R>, ClientError>
762 where
763 R: DeserializeOwned + fmt::Debug + 'a,
764 {
765 let batch = batch.build()?;
767 let batch_len = batch.len();
768
769 let mut ids = Vec::with_capacity(batch_len);
771 let mut batch_request = Batch::with_capacity(batch_len);
772
773 for (method, params) in batch.into_iter() {
774 let id = self.id.fetch_add(1, Ordering::Relaxed);
775 let request = Request::owned(method.into(), params, Id::Number(id));
776
777 ids.push(id);
778 batch_request.push(request);
779 }
780
781 let body = self.send_and_read_body(batch_request).await?;
783 let mut resps: Vec<Response<&JsonRawValue>> = serde_json::from_slice(&body)?;
784
785 resps.sort_by(|lhs, rhs| {
788 let lhs_id = lhs.id.try_parse_inner_as_number().unwrap_or(0);
791 let rhs_id = rhs.id.try_parse_inner_as_number().unwrap_or(0);
792 lhs_id.cmp(&rhs_id)
793 });
794
795 let mut successful = 0;
797 let mut failed = 0;
798
799 let mut batch_resp = Vec::with_capacity(batch_len);
800 for resp in resps.into_iter() {
801 let id = resp.id.try_parse_inner_as_number()?;
802 if !ids.contains(&id) {
803 return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into());
804 }
805
806 let res = match ResponseSuccess::try_from(resp) {
807 Ok(success) => {
808 successful += 1;
809 Ok(serde_json::from_str(success.result.get())?)
810 }
811 Err(error) => {
812 failed += 1;
813 Err(error)
814 }
815 };
816
817 batch_resp.push(res);
818 }
819
820 Ok(BatchResponse::new(successful, batch_resp, failed))
821 }
822 }
823
824 impl SubscriptionClientT for Client {
825 async fn subscribe<'a, N, Params>(
826 &self,
827 _subscribe_method: &'a str,
828 _params: Params,
829 _unsubscribe_method: &'a str,
830 ) -> Result<Subscription<N>, ClientError>
831 where
832 Params: ToRpcParams + Send,
833 N: DeserializeOwned,
834 {
835 Err(ClientError::HttpNotImplemented)
836 }
837
838 async fn subscribe_to_method<N>(
839 &self,
840 _method: &str,
841 ) -> Result<Subscription<N>, ClientError>
842 where
843 N: DeserializeOwned,
844 {
845 Err(ClientError::HttpNotImplemented)
846 }
847 }
848
849 impl fmt::Debug for Client {
850 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
851 f.write_str("Client { .. }")
852 }
853 }
854
855 fn into_parse_error(err: gloo_net::Error) -> ClientError {
858 match err {
859 gloo_net::Error::SerdeError(e) => ClientError::ParseError(e),
860 _ => unreachable!("this can only fail on a call to serde_json"),
861 }
862 }
863}