jsonrpsee_http_client/
rpc_service.rs1use std::sync::Arc;
2
3use hyper::body::Bytes;
4use jsonrpsee_core::{
5 BoxError, JsonRawValue,
6 client::{Error, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse},
7 middleware::{Batch, Notification, Request, RpcServiceT},
8};
9use jsonrpsee_types::Response;
10use tower::Service;
11
12use crate::{
13 HttpRequest, HttpResponse,
14 transport::{Error as TransportError, HttpTransportClient},
15};
16
17#[derive(Clone, Debug)]
19pub struct RpcService<HttpMiddleware> {
20 service: Arc<HttpTransportClient<HttpMiddleware>>,
21}
22
23impl<HttpMiddleware> RpcService<HttpMiddleware> {
24 pub fn new(service: HttpTransportClient<HttpMiddleware>) -> Self {
26 Self { service: Arc::new(service) }
27 }
28}
29
30impl<B, HttpMiddleware> RpcServiceT for RpcService<HttpMiddleware>
31where
32 HttpMiddleware:
33 Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Clone + Send + Sync + 'static,
34 HttpMiddleware::Future: Send,
35 B: http_body::Body<Data = Bytes> + Send + 'static,
36 B::Data: Send,
37 B::Error: Into<BoxError>,
38{
39 type BatchResponse = Result<MiddlewareBatchResponse, Error>;
40 type MethodResponse = Result<MiddlewareMethodResponse, Error>;
41 type NotificationResponse = Result<MiddlewareNotifResponse, Error>;
42
43 fn call<'a>(&self, request: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
44 let service = self.service.clone();
45
46 async move {
47 let raw = serde_json::to_string(&request)?;
48 let bytes = service.send_and_read_body(raw).await.map_err(|e| Error::Transport(e.into()))?;
49 let mut rp: Response<Box<JsonRawValue>> = serde_json::from_slice(&bytes)?;
50 rp.extensions = request.extensions;
51
52 Ok(MiddlewareMethodResponse::response(rp.into_owned().into()))
53 }
54 }
55
56 fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
57 let service = self.service.clone();
58
59 async move {
60 let raw = serde_json::to_string(&batch)?;
61 let bytes = service.send_and_read_body(raw).await.map_err(|e| Error::Transport(e.into()))?;
62 let rp: Vec<_> = serde_json::from_slice::<Vec<Response<Box<JsonRawValue>>>>(&bytes)?
63 .into_iter()
64 .map(|r| r.into_owned().into())
65 .collect();
66
67 Ok(rp)
68 }
69 }
70
71 fn notification<'a>(
72 &self,
73 notif: Notification<'a>,
74 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
75 let service = self.service.clone();
76
77 async move {
78 let raw = serde_json::to_string(¬if)?;
79 service.send(raw).await.map_err(|e| Error::Transport(e.into()))?;
80 Ok(notif.extensions.into())
81 }
82 }
83}