jsonrpsee_http_client/
rpc_service.rs

1use std::sync::Arc;
2
3use hyper::body::Bytes;
4use jsonrpsee_core::{
5	BoxError, JsonRawValue,
6	client::{Error, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse},
7	middleware::{Batch, Notification, Request, RpcServiceT},
8};
9use jsonrpsee_types::Response;
10use tower::Service;
11
12use crate::{
13	HttpRequest, HttpResponse,
14	transport::{Error as TransportError, HttpTransportClient},
15};
16
17/// An [`RpcServiceT`] compliant implementation for the HTTP client.
18#[derive(Clone, Debug)]
19pub struct RpcService<HttpMiddleware> {
20	service: Arc<HttpTransportClient<HttpMiddleware>>,
21}
22
23impl<HttpMiddleware> RpcService<HttpMiddleware> {
24	/// Convert an [`HttpTransportClient`] into an [`RpcService`].
25	pub fn new(service: HttpTransportClient<HttpMiddleware>) -> Self {
26		Self { service: Arc::new(service) }
27	}
28}
29
30impl<B, HttpMiddleware> RpcServiceT for RpcService<HttpMiddleware>
31where
32	HttpMiddleware:
33		Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Clone + Send + Sync + 'static,
34	HttpMiddleware::Future: Send,
35	B: http_body::Body<Data = Bytes> + Send + 'static,
36	B::Data: Send,
37	B::Error: Into<BoxError>,
38{
39	type BatchResponse = Result<MiddlewareBatchResponse, Error>;
40	type MethodResponse = Result<MiddlewareMethodResponse, Error>;
41	type NotificationResponse = Result<MiddlewareNotifResponse, Error>;
42
43	fn call<'a>(&self, request: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
44		let service = self.service.clone();
45
46		async move {
47			let raw = serde_json::to_string(&request)?;
48			let bytes = service.send_and_read_body(raw).await.map_err(|e| Error::Transport(e.into()))?;
49			let mut rp: Response<Box<JsonRawValue>> = serde_json::from_slice(&bytes)?;
50			rp.extensions = request.extensions;
51
52			Ok(MiddlewareMethodResponse::response(rp.into_owned().into()))
53		}
54	}
55
56	fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
57		let service = self.service.clone();
58
59		async move {
60			let raw = serde_json::to_string(&batch)?;
61			let bytes = service.send_and_read_body(raw).await.map_err(|e| Error::Transport(e.into()))?;
62			let rp: Vec<_> = serde_json::from_slice::<Vec<Response<Box<JsonRawValue>>>>(&bytes)?
63				.into_iter()
64				.map(|r| r.into_owned().into())
65				.collect();
66
67			Ok(rp)
68		}
69	}
70
71	fn notification<'a>(
72		&self,
73		notif: Notification<'a>,
74	) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
75		let service = self.service.clone();
76
77		async move {
78			let raw = serde_json::to_string(&notif)?;
79			service.send(raw).await.map_err(|e| Error::Transport(e.into()))?;
80			Ok(notif.extensions.into())
81		}
82	}
83}