jsonrpsee_http_client/
rpc_service.rs1use std::sync::Arc;
2
3use hyper::body::Bytes;
4use jsonrpsee_core::{
5 BoxError, JsonRawValue,
6 client::{Error, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse},
7 middleware::{Batch, Notification, Request, RpcServiceT},
8};
9use jsonrpsee_types::Response;
10use tower::Service;
11
12use crate::{
13 HttpRequest, HttpResponse,
14 transport::{Error as TransportError, HttpTransportClient},
15};
16
17#[derive(Clone, Debug)]
22pub struct RpcService<HttpMiddleware> {
23 service: Arc<HttpTransportClient<HttpMiddleware>>,
24}
25
26impl<HttpMiddleware> RpcService<HttpMiddleware> {
27 pub fn new(service: HttpTransportClient<HttpMiddleware>) -> Self {
33 Self { service: Arc::new(service) }
34 }
35}
36
37impl<B, HttpMiddleware> RpcServiceT for RpcService<HttpMiddleware>
38where
39 HttpMiddleware:
40 Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Clone + Send + Sync + 'static,
41 HttpMiddleware::Future: Send,
42 B: http_body::Body<Data = Bytes> + Send + 'static,
43 B::Data: Send,
44 B::Error: Into<BoxError>,
45{
46 type BatchResponse = Result<MiddlewareBatchResponse, Error>;
47 type MethodResponse = Result<MiddlewareMethodResponse, Error>;
48 type NotificationResponse = Result<MiddlewareNotifResponse, Error>;
49
50 fn call<'a>(&self, request: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
51 let service = self.service.clone();
52
53 async move {
54 let raw = serde_json::to_string(&request)?;
55 let bytes = service.send_and_read_body(raw).await.map_err(|e| Error::Transport(e.into()))?;
56 let mut rp: Response<Box<JsonRawValue>> = serde_json::from_slice(&bytes)?;
57 rp.extensions = request.extensions;
58
59 Ok(MiddlewareMethodResponse::response(rp.into_owned().into()))
60 }
61 }
62
63 fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
64 let service = self.service.clone();
65
66 async move {
67 let raw = serde_json::to_string(&batch)?;
68 let bytes = service.send_and_read_body(raw).await.map_err(|e| Error::Transport(e.into()))?;
69 let rp: Vec<_> = serde_json::from_slice::<Vec<Response<Box<JsonRawValue>>>>(&bytes)?
70 .into_iter()
71 .map(|r| r.into_owned().into())
72 .collect();
73
74 Ok(rp)
75 }
76 }
77
78 fn notification<'a>(
79 &self,
80 notif: Notification<'a>,
81 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
82 let service = self.service.clone();
83
84 async move {
85 let raw = serde_json::to_string(¬if)?;
86 service.send(raw).await.map_err(|e| Error::Transport(e.into()))?;
87 Ok(notif.extensions.into())
88 }
89 }
90}