Skip to main content

celestia_rpc/
client.rs

1//! Clients for the celestia Json-RPC.
2//!
3//! This module aims to provide a convenient way to create a Json-RPC clients. If
4//! you need more configuration options and / or some custom client you can create
5//! one using [`jsonrpsee`] crate directly.
6
7#[cfg(not(target_arch = "wasm32"))]
8pub use self::native::Client;
9
10#[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))]
11pub use self::wasm::Client;
12
13#[cfg(not(target_arch = "wasm32"))]
14mod native {
15    use std::fmt;
16    use std::result::Result;
17    use std::sync::Arc;
18    use std::time::Duration;
19
20    use http::{HeaderValue, header};
21    use jsonrpsee::core::ClientError;
22    use jsonrpsee::core::client::{BatchResponse, ClientT, Subscription, SubscriptionClientT};
23    use jsonrpsee::core::params::BatchRequestBuilder;
24    use jsonrpsee::core::traits::ToRpcParams;
25    use jsonrpsee::http_client::{HeaderMap, HttpClient, HttpClientBuilder};
26    use jsonrpsee::ws_client::{PingConfig, WsClient, WsClientBuilder};
27    use serde::de::DeserializeOwned;
28    use tokio::sync::RwLock;
29    use tracing::warn;
30
31    use crate::Error;
32
33    const MAX_RESPONSE_SIZE: usize = 256 * 1024 * 1024;
34
35    /// Json RPC client.
36    pub enum Client {
37        /// A client using 'http\[s\]' protocol.
38        Http(HttpClient),
39        /// A client using 'ws\[s\]' protocol.
40        Ws(WsReconnectClient<WsClient>),
41    }
42
43    impl Client {
44        /// Create a new Json RPC client.
45        ///
46        /// Only 'http\[s\]' and 'ws\[s\]' protocols are supported and they should
47        /// be specified in the provided `url`. For more flexibility
48        /// consider creating the client using [`jsonrpsee`] directly.
49        ///
50        /// Please note that currently the celestia-node supports only 'http' and 'ws'.
51        /// For a secure connection you have to hide it behind a proxy.
52        pub async fn new(
53            url: &str,
54            auth_token: Option<&str>,
55            connect_timeout: Option<Duration>,
56            request_timeout: Option<Duration>,
57        ) -> Result<Self, Error> {
58            let protocol = url.split_once(':').map(|(proto, _)| proto);
59            let client = match protocol {
60                Some("http") | Some("https") => {
61                    let headers = build_headers(auth_token)?;
62                    let mut builder = HttpClientBuilder::default()
63                        .max_response_size(MAX_RESPONSE_SIZE as u32)
64                        .set_headers(headers);
65                    if let Some(timeout) = request_timeout {
66                        builder = builder.request_timeout(timeout);
67                    }
68                    if connect_timeout.is_some() {
69                        warn!("ignored connect_timeout: not supported with http(s)");
70                    }
71                    Client::Http(builder.build(url)?)
72                }
73                Some("ws") | Some("wss") => Client::Ws(
74                    WsReconnectClient::new(url, auth_token, connect_timeout, request_timeout)
75                        .await?,
76                ),
77                _ => return Err(Error::ProtocolNotSupported(url.into())),
78            };
79
80            Ok(client)
81        }
82    }
83
84    pub struct WsReconnectClient<C> {
85        state: RwLock<WsState<C>>,
86        build: BuildFn<C>,
87    }
88
89    struct WsState<C> {
90        inner: Arc<C>,
91        poisoned: bool,
92        epoch: u64,
93    }
94
95    type BuildFn<C> =
96        Arc<dyn Fn() -> futures_util::future::BoxFuture<'static, Result<C, Error>> + Send + Sync>;
97
98    impl WsReconnectClient<WsClient> {
99        async fn new(
100            url: &str,
101            auth_token: Option<&str>,
102            connect_timeout: Option<Duration>,
103            request_timeout: Option<Duration>,
104        ) -> Result<Self, Error> {
105            let url = url.to_owned();
106            let auth_token = auth_token.map(str::to_owned);
107            let build: BuildFn<WsClient> = Arc::new(move || {
108                let url = url.clone();
109                let auth_token = auth_token.clone();
110                Box::pin(async move {
111                    build_ws_client(
112                        &url,
113                        auth_token.as_deref(),
114                        connect_timeout,
115                        request_timeout,
116                    )
117                    .await
118                })
119            });
120            WsReconnectClient::new_with_factory(build).await
121        }
122    }
123
124    impl<C> WsReconnectClient<C>
125    where
126        C: ClientT + SubscriptionClientT + Send + Sync + 'static,
127    {
128        async fn new_with_factory(build: BuildFn<C>) -> Result<Self, Error> {
129            let inner = Arc::new((build)().await?);
130            Ok(Self {
131                state: RwLock::new(WsState {
132                    inner,
133                    poisoned: false,
134                    epoch: 0,
135                }),
136                build,
137            })
138        }
139
140        async fn get_inner(&self) -> Result<Arc<C>, ClientError> {
141            let (epoch, poisoned, inner) = {
142                let state = self.state.read().await;
143                (state.epoch, state.poisoned, state.inner.clone())
144            };
145            if !poisoned {
146                return Ok(inner);
147            }
148
149            let mut state = self.state.write().await;
150            if state.poisoned && state.epoch == epoch {
151                let new_inner = Arc::new(
152                    (self.build)()
153                        .await
154                        .map_err(|err| ClientError::Custom(err.to_string()))?,
155                );
156                state.inner = new_inner;
157                state.poisoned = false;
158            }
159            Ok(state.inner.clone())
160        }
161
162        async fn mark_poisoned(&self) {
163            let mut state = self.state.write().await;
164            if !state.poisoned {
165                state.poisoned = true;
166                state.epoch += 1;
167            }
168        }
169
170        async fn notification<Params>(
171            &self,
172            method: &str,
173            params: Params,
174        ) -> Result<(), ClientError>
175        where
176            Params: ToRpcParams + Send,
177        {
178            let inner = self.get_inner().await?;
179            let err = match inner.notification(method, params).await {
180                Err(err @ ClientError::RestartNeeded(_)) => err,
181                res => return res,
182            };
183            self.mark_poisoned().await;
184            Err(err)
185        }
186
187        async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, ClientError>
188        where
189            R: DeserializeOwned,
190            Params: ToRpcParams + Send,
191        {
192            let inner = self.get_inner().await?;
193            let err = match inner.request(method, params).await {
194                Err(err @ ClientError::RestartNeeded(_)) => err,
195                res => return res,
196            };
197            self.mark_poisoned().await;
198            Err(err)
199        }
200
201        async fn subscribe<'a, N, Params>(
202            &self,
203            subscribe_method: &'a str,
204            params: Params,
205            unsubscribe_method: &'a str,
206        ) -> Result<Subscription<N>, ClientError>
207        where
208            Params: ToRpcParams + Send,
209            N: DeserializeOwned,
210        {
211            let inner = self.get_inner().await?;
212            let err = match inner
213                .subscribe(subscribe_method, params, unsubscribe_method)
214                .await
215            {
216                Err(err @ ClientError::RestartNeeded(_)) => err,
217                res => return res,
218            };
219            self.mark_poisoned().await;
220            Err(err)
221        }
222
223        async fn subscribe_to_method<N>(&self, method: &str) -> Result<Subscription<N>, ClientError>
224        where
225            N: DeserializeOwned,
226        {
227            let inner = self.get_inner().await?;
228            let err = match inner.subscribe_to_method(method).await {
229                Err(err @ ClientError::RestartNeeded(_)) => err,
230                res => return res,
231            };
232            self.mark_poisoned().await;
233            Err(err)
234        }
235    }
236
237    fn build_headers(auth_token: Option<&str>) -> Result<HeaderMap, Error> {
238        let mut headers = HeaderMap::new();
239
240        if let Some(token) = auth_token {
241            let val = HeaderValue::from_str(&format!("Bearer {token}"))?;
242            headers.insert(header::AUTHORIZATION, val);
243        }
244
245        Ok(headers)
246    }
247
248    async fn build_ws_client(
249        url: &str,
250        auth_token: Option<&str>,
251        connect_timeout: Option<Duration>,
252        request_timeout: Option<Duration>,
253    ) -> Result<WsClient, Error> {
254        let headers = build_headers(auth_token)?;
255        let mut builder = WsClientBuilder::default()
256            .max_response_size(MAX_RESPONSE_SIZE as u32)
257            .set_headers(headers)
258            .enable_ws_ping(PingConfig::default());
259        if let Some(timeout) = request_timeout {
260            builder = builder.request_timeout(timeout);
261        }
262        if let Some(timeout) = connect_timeout {
263            builder = builder.connection_timeout(timeout);
264        }
265        Ok(builder.build(url).await?)
266    }
267
268    impl ClientT for Client {
269        async fn notification<Params>(
270            &self,
271            method: &str,
272            params: Params,
273        ) -> Result<(), ClientError>
274        where
275            Params: ToRpcParams + Send,
276        {
277            match self {
278                Client::Http(client) => client.notification(method, params).await,
279                Client::Ws(client) => client.notification(method, params).await,
280            }
281        }
282
283        async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, ClientError>
284        where
285            R: DeserializeOwned,
286            Params: ToRpcParams + Send,
287        {
288            match self {
289                Client::Http(client) => client.request(method, params).await,
290                Client::Ws(client) => client.request(method, params).await,
291            }
292        }
293
294        async fn batch_request<'a, R>(
295            &self,
296            batch: BatchRequestBuilder<'a>,
297        ) -> Result<BatchResponse<'a, R>, ClientError>
298        where
299            R: DeserializeOwned + fmt::Debug + 'a,
300        {
301            match self {
302                Client::Http(client) => client.batch_request(batch).await,
303                Client::Ws(client) => {
304                    let inner = client.get_inner().await?;
305                    let err = match inner.batch_request(batch).await {
306                        Err(err @ ClientError::RestartNeeded(_)) => err,
307                        res => return res,
308                    };
309                    client.mark_poisoned().await;
310                    Err(err)
311                }
312            }
313        }
314    }
315
316    impl SubscriptionClientT for Client {
317        async fn subscribe<'a, N, Params>(
318            &self,
319            subscribe_method: &'a str,
320            params: Params,
321            unsubscribe_method: &'a str,
322        ) -> Result<Subscription<N>, ClientError>
323        where
324            Params: ToRpcParams + Send,
325            N: DeserializeOwned,
326        {
327            match self {
328                Client::Http(client) => {
329                    client
330                        .subscribe(subscribe_method, params, unsubscribe_method)
331                        .await
332                }
333                Client::Ws(client) => {
334                    client
335                        .subscribe(subscribe_method, params, unsubscribe_method)
336                        .await
337                }
338            }
339        }
340
341        async fn subscribe_to_method<N>(&self, method: &str) -> Result<Subscription<N>, ClientError>
342        where
343            N: DeserializeOwned,
344        {
345            match self {
346                Client::Http(client) => client.subscribe_to_method(method).await,
347                Client::Ws(client) => client.subscribe_to_method(method).await,
348            }
349        }
350    }
351
352    impl fmt::Debug for Client {
353        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354            f.write_str("Client { .. }")
355        }
356    }
357
358    #[cfg(test)]
359    mod tests {
360        use std::sync::Arc;
361        use std::sync::atomic::{AtomicUsize, Ordering};
362
363        use jsonrpsee::core::ClientError;
364        use jsonrpsee::core::params::BatchRequestBuilder;
365        use serde::de::DeserializeOwned;
366        use serde_json::Value as JsonValue;
367        use tokio::join;
368        use tokio::sync::Barrier;
369
370        use super::{BuildFn, WsReconnectClient};
371
372        struct FakeWsClient {
373            remaining_failures: AtomicUsize,
374            response: JsonValue,
375            barrier: Option<Arc<tokio::sync::Barrier>>,
376        }
377
378        impl FakeWsClient {
379            fn new(remaining_failures: usize, response: JsonValue) -> Self {
380                Self {
381                    remaining_failures: AtomicUsize::new(remaining_failures),
382                    response,
383                    barrier: None,
384                }
385            }
386
387            fn new_with_barrier(
388                remaining_failures: usize,
389                response: JsonValue,
390                barrier: Arc<tokio::sync::Barrier>,
391            ) -> Self {
392                Self {
393                    remaining_failures: AtomicUsize::new(remaining_failures),
394                    response,
395                    barrier: Some(barrier),
396                }
397            }
398        }
399
400        impl jsonrpsee::core::client::ClientT for FakeWsClient {
401            async fn notification<Params>(
402                &self,
403                _method: &str,
404                _params: Params,
405            ) -> Result<(), ClientError>
406            where
407                Params: jsonrpsee::core::traits::ToRpcParams + Send,
408            {
409                Ok(())
410            }
411
412            fn request<R, Params>(
413                &self,
414                _method: &str,
415                _params: Params,
416            ) -> impl std::future::Future<Output = Result<R, ClientError>> + Send
417            where
418                R: DeserializeOwned,
419                Params: jsonrpsee::core::traits::ToRpcParams + Send,
420            {
421                let barrier = self.barrier.clone();
422                let response = self.response.clone();
423                async move {
424                    if let Some(barrier) = barrier {
425                        barrier.wait().await;
426                    }
427                    let should_fail = self
428                        .remaining_failures
429                        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
430                            value.checked_sub(1)
431                        })
432                        .is_ok();
433                    if should_fail {
434                        return Err(ClientError::RestartNeeded(Arc::new(ClientError::Custom(
435                            "restart".into(),
436                        ))));
437                    }
438                    serde_json::from_value(response).map_err(ClientError::ParseError)
439                }
440            }
441
442            async fn batch_request<'a, R>(
443                &self,
444                _batch: BatchRequestBuilder<'a>,
445            ) -> Result<jsonrpsee::core::client::BatchResponse<'a, R>, ClientError>
446            where
447                R: DeserializeOwned + std::fmt::Debug + 'a,
448            {
449                Err(ClientError::Custom(
450                    "batch_request not implemented in FakeWsClient".into(),
451                ))
452            }
453        }
454
455        impl jsonrpsee::core::client::SubscriptionClientT for FakeWsClient {
456            async fn subscribe<'a, N, Params>(
457                &self,
458                _subscribe_method: &'a str,
459                _params: Params,
460                _unsubscribe_method: &'a str,
461            ) -> Result<jsonrpsee::core::client::Subscription<N>, ClientError>
462            where
463                Params: jsonrpsee::core::traits::ToRpcParams + Send,
464                N: DeserializeOwned,
465            {
466                Err(ClientError::Custom(
467                    "subscribe not implemented in FakeWsClient".into(),
468                ))
469            }
470
471            async fn subscribe_to_method<N>(
472                &self,
473                _method: &str,
474            ) -> Result<jsonrpsee::core::client::Subscription<N>, ClientError>
475            where
476                N: DeserializeOwned,
477            {
478                Err(ClientError::Custom(
479                    "subscribe_to_method not implemented in FakeWsClient".into(),
480                ))
481            }
482        }
483
484        #[tokio::test]
485        async fn request_marks_poisoned_on_restart_needed() {
486            let build_count = Arc::new(AtomicUsize::new(0));
487            let response = serde_json::json!(7u64);
488            let build: BuildFn<FakeWsClient> = {
489                let build_count = build_count.clone();
490                Arc::new(move || {
491                    let build_count = build_count.clone();
492                    let response = response.clone();
493                    Box::pin(async move {
494                        let id = build_count.fetch_add(1, Ordering::SeqCst);
495                        let failures = if id == 0 { 1 } else { 0 };
496                        Ok(FakeWsClient::new(failures, response))
497                    })
498                })
499            };
500
501            let client = WsReconnectClient::new_with_factory(build).await.unwrap();
502            let err = client
503                .request::<u64, _>("test", Vec::<u8>::new())
504                .await
505                .unwrap_err();
506            assert!(matches!(err, ClientError::RestartNeeded(_)));
507            assert_eq!(build_count.load(Ordering::SeqCst), 1);
508
509            let value: u64 = client.request("test", Vec::<u8>::new()).await.unwrap();
510            assert_eq!(value, 7);
511            assert_eq!(build_count.load(Ordering::SeqCst), 2);
512        }
513
514        #[tokio::test]
515        async fn concurrent_requests_share_single_reconnect() {
516            let build_count = Arc::new(AtomicUsize::new(0));
517            let response = serde_json::json!(5u64);
518            let build: BuildFn<FakeWsClient> = {
519                let build_count = build_count.clone();
520                Arc::new(move || {
521                    let build_count = build_count.clone();
522                    let response = response.clone();
523                    Box::pin(async move {
524                        let id = build_count.fetch_add(1, Ordering::SeqCst);
525                        let failures = if id == 0 { 1 } else { 0 };
526                        Ok(FakeWsClient::new(failures, response))
527                    })
528                })
529            };
530
531            let client = WsReconnectClient::new_with_factory(build).await.unwrap();
532            let err = client
533                .request::<u64, _>("test", Vec::<u8>::new())
534                .await
535                .unwrap_err();
536            assert!(matches!(err, ClientError::RestartNeeded(_)));
537            assert_eq!(build_count.load(Ordering::SeqCst), 1);
538
539            let (a, b) = join!(
540                client.request::<u64, _>("test", Vec::<u8>::new()),
541                client.request::<u64, _>("test", Vec::<u8>::new())
542            );
543            assert_eq!(a.unwrap(), 5);
544            assert_eq!(b.unwrap(), 5);
545            assert_eq!(build_count.load(Ordering::SeqCst), 2);
546        }
547
548        #[tokio::test]
549        async fn concurrent_restart_needed_dedupes_reconnect() {
550            let build_count = Arc::new(AtomicUsize::new(0));
551            let response = serde_json::json!(9u64);
552            let barrier = Arc::new(Barrier::new(2));
553            let build: BuildFn<FakeWsClient> = {
554                let build_count = build_count.clone();
555                let barrier = barrier.clone();
556                Arc::new(move || {
557                    let build_count = build_count.clone();
558                    let response = response.clone();
559                    let barrier = barrier.clone();
560                    Box::pin(async move {
561                        let id = build_count.fetch_add(1, Ordering::SeqCst);
562                        let failures = if id == 0 { 2 } else { 0 };
563                        if id == 0 {
564                            Ok(FakeWsClient::new_with_barrier(failures, response, barrier))
565                        } else {
566                            Ok(FakeWsClient::new(failures, response))
567                        }
568                    })
569                })
570            };
571
572            let client = WsReconnectClient::new_with_factory(build).await.unwrap();
573            let (a, b) = join!(
574                client.request::<u64, _>("test", Vec::<u8>::new()),
575                client.request::<u64, _>("test", Vec::<u8>::new())
576            );
577            assert!(matches!(a.unwrap_err(), ClientError::RestartNeeded(_)));
578            assert!(matches!(b.unwrap_err(), ClientError::RestartNeeded(_)));
579            assert_eq!(build_count.load(Ordering::SeqCst), 1);
580
581            let value: u64 = client.request("test", Vec::<u8>::new()).await.unwrap();
582            assert_eq!(value, 9);
583            assert_eq!(build_count.load(Ordering::SeqCst), 2);
584        }
585    }
586}
587
588#[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))]
589mod wasm {
590    use std::fmt;
591    use std::result::Result;
592    use std::sync::atomic::{AtomicU64, Ordering};
593    use std::time::Duration;
594
595    use gloo_net::http::{Request as JsRequest, Response as JsResponse};
596    use gloo_timers::callback::Timeout;
597    use jsonrpsee::core::client::{BatchResponse, ClientT, Subscription, SubscriptionClientT};
598    use jsonrpsee::core::middleware::Batch;
599    use jsonrpsee::core::params::BatchRequestBuilder;
600    use jsonrpsee::core::traits::ToRpcParams;
601    use jsonrpsee::core::{ClientError, JsonRawValue};
602    use jsonrpsee::types::{
603        Id, InvalidRequestId, Notification, Request, Response, ResponseSuccess,
604    };
605    use send_wrapper::SendWrapper;
606    use serde::Serialize;
607    use serde::de::DeserializeOwned;
608    use tracing::warn;
609    use web_sys::AbortController;
610
611    use crate::Error;
612
613    const ABORT_ERROR_NAME: &str = "AbortError";
614
615    /// Json RPC client.
616    pub struct Client {
617        id: AtomicU64,
618        url: String,
619        auth_token: Option<String>,
620        timeout_ms: Option<u32>,
621    }
622
623    impl Client {
624        /// Create a new Json RPC client.
625        ///
626        /// Only the 'http\[s\]' protocols are supported because JavaScript
627        /// doesn't allow setting headers with websocket. If you want to
628        /// use the websocket client anyway, you can use the one from the
629        /// `jsonrpsee` directly, but you need a node with `--rpc.skip-auth`.
630        pub async fn new(
631            url: &str,
632            auth_token: Option<&str>,
633            connect_timeout: Option<Duration>,
634            request_timeout: Option<Duration>,
635        ) -> Result<Self, Error> {
636            let protocol = url.split_once(':').map(|(proto, _)| proto);
637            match protocol {
638                Some("http") | Some("https") => (),
639                _ => return Err(Error::ProtocolNotSupported(url.into())),
640            };
641            let timeout_ms = request_timeout
642                .map(|t| t.as_millis().try_into())
643                .transpose()
644                .map_err(|_| Error::TimeoutOutOfRange)?;
645            if connect_timeout.is_some() {
646                warn!("ignored connect_timeout: not supported in wasm");
647            }
648
649            Ok(Client {
650                id: AtomicU64::new(0),
651                url: url.into(),
652                auth_token: auth_token.map(ToOwned::to_owned),
653                timeout_ms,
654            })
655        }
656
657        async fn send<T: Serialize>(&self, request: T) -> Result<JsResponse, ClientError> {
658            let (fut, _timeout) = {
659                let mut req = JsRequest::post(&self.url);
660                let mut timeout_callback = None;
661
662                if let Some(timeout) = self.timeout_ms {
663                    let abort_controller =
664                        AbortController::new().expect("AbortController should be available");
665                    let abort_signal = abort_controller.signal();
666                    let _ = timeout_callback.insert(Timeout::new(timeout, move || {
667                        abort_controller.abort();
668                    }));
669                    req = req.abort_signal(Some(&abort_signal));
670                }
671
672                if let Some(token) = self.auth_token.as_ref() {
673                    req = req.header("Authorization", &format!("Bearer {token}"));
674                }
675
676                (
677                    req.json(&request).map_err(into_parse_error)?.send(),
678                    SendWrapper::new(timeout_callback),
679                )
680            };
681
682            SendWrapper::new(fut).await.map_err(|e| match e {
683                gloo_net::Error::JsError(e) => {
684                    if e.name == ABORT_ERROR_NAME {
685                        ClientError::RequestTimeout
686                    } else {
687                        ClientError::Transport(e.into())
688                    }
689                }
690                gloo_net::Error::SerdeError(e) => ClientError::ParseError(e),
691                gloo_net::Error::GlooError(e) => ClientError::Transport(e.into()),
692            })
693        }
694
695        async fn send_and_read_body<T: Serialize>(
696            &self,
697            request: T,
698        ) -> Result<Vec<u8>, ClientError> {
699            let response = SendWrapper::new(self.send(request).await?);
700
701            if !response.ok() {
702                let error = format!(
703                    "Request failed: {} {}",
704                    response.status(),
705                    response.status_text(),
706                );
707                return Err(ClientError::Transport(error.into()));
708            }
709
710            SendWrapper::new(response.binary())
711                .await
712                .map_err(|err| ClientError::Transport(err.into()))
713        }
714    }
715
716    impl ClientT for Client {
717        async fn notification<Params>(
718            &self,
719            method: &str,
720            params: Params,
721        ) -> Result<(), ClientError>
722        where
723            Params: ToRpcParams + Send,
724        {
725            let params = params.to_rpc_params()?;
726            let notification = Notification::new(method.into(), params);
727
728            self.send(notification).await?;
729
730            Ok(())
731        }
732
733        async fn request<R, Params>(&self, method: &str, params: Params) -> Result<R, ClientError>
734        where
735            R: DeserializeOwned,
736            Params: ToRpcParams + Send,
737        {
738            let id = Id::Number(self.id.fetch_add(1, Ordering::Relaxed));
739            let params = params.to_rpc_params()?;
740            let request = Request::borrowed(method, params.as_deref(), id.clone());
741            let body = self.send_and_read_body(request).await?;
742
743            // deserialize whole jsonrpc response except the `result` field, which is
744            // kept in raw form (if it is present at all)
745            let response: Response<&JsonRawValue> = serde_json::from_slice(&body)?;
746            // bail if the response is an error
747            let success = ResponseSuccess::try_from(response)?;
748
749            if success.id == id {
750                // read the actual `result` field
751                let result = serde_json::from_str(success.result.get())?;
752                Ok(result)
753            } else {
754                Err(InvalidRequestId::NotPendingRequest(success.id.to_string()).into())
755            }
756        }
757
758        async fn batch_request<'a, R>(
759            &self,
760            batch: BatchRequestBuilder<'a>,
761        ) -> Result<BatchResponse<'a, R>, ClientError>
762        where
763            R: DeserializeOwned + fmt::Debug + 'a,
764        {
765            // this will throw an error if batch is empty
766            let batch = batch.build()?;
767            let batch_len = batch.len();
768
769            // fill in batch request
770            let mut ids = Vec::with_capacity(batch_len);
771            let mut batch_request = Batch::with_capacity(batch_len);
772
773            for (method, params) in batch.into_iter() {
774                let id = self.id.fetch_add(1, Ordering::Relaxed);
775                let request = Request::owned(method.into(), params, Id::Number(id));
776
777                ids.push(id);
778                batch_request.push(request);
779            }
780
781            // send it and grab response, using the same trick with not deserializing `result` field yet
782            let body = self.send_and_read_body(batch_request).await?;
783            let mut resps: Vec<Response<&JsonRawValue>> = serde_json::from_slice(&body)?;
784
785            // no docs mention that responses should be returned in order
786            // but jsonrpsee implementations always take care to do that
787            resps.sort_by(|lhs, rhs| {
788                // put those with non-numeric ids first, we'll error out on them
789                // at the beginning of the next loop
790                let lhs_id = lhs.id.try_parse_inner_as_number().unwrap_or(0);
791                let rhs_id = rhs.id.try_parse_inner_as_number().unwrap_or(0);
792                lhs_id.cmp(&rhs_id)
793            });
794
795            // prepare the batch result
796            let mut successful = 0;
797            let mut failed = 0;
798
799            let mut batch_resp = Vec::with_capacity(batch_len);
800            for resp in resps.into_iter() {
801                let id = resp.id.try_parse_inner_as_number()?;
802                if !ids.contains(&id) {
803                    return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into());
804                }
805
806                let res = match ResponseSuccess::try_from(resp) {
807                    Ok(success) => {
808                        successful += 1;
809                        Ok(serde_json::from_str(success.result.get())?)
810                    }
811                    Err(error) => {
812                        failed += 1;
813                        Err(error)
814                    }
815                };
816
817                batch_resp.push(res);
818            }
819
820            Ok(BatchResponse::new(successful, batch_resp, failed))
821        }
822    }
823
824    impl SubscriptionClientT for Client {
825        async fn subscribe<'a, N, Params>(
826            &self,
827            _subscribe_method: &'a str,
828            _params: Params,
829            _unsubscribe_method: &'a str,
830        ) -> Result<Subscription<N>, ClientError>
831        where
832            Params: ToRpcParams + Send,
833            N: DeserializeOwned,
834        {
835            Err(ClientError::HttpNotImplemented)
836        }
837
838        async fn subscribe_to_method<N>(
839            &self,
840            _method: &str,
841        ) -> Result<Subscription<N>, ClientError>
842        where
843            N: DeserializeOwned,
844        {
845            Err(ClientError::HttpNotImplemented)
846        }
847    }
848
849    impl fmt::Debug for Client {
850        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
851            f.write_str("Client { .. }")
852        }
853    }
854
855    /// Used to translate gloo errors originating from serde into client errors.
856    /// Can panic if used in a place that has other gloo error types possible.
857    fn into_parse_error(err: gloo_net::Error) -> ClientError {
858        match err {
859            gloo_net::Error::SerdeError(e) => ClientError::ParseError(e),
860            _ => unreachable!("this can only fail on a call to serde_json"),
861        }
862    }
863}