Skip to main content

jsonrpsee_http_client/
rpc_service.rs

1use std::sync::Arc;
2
3use hyper::body::Bytes;
4use jsonrpsee_core::{
5	BoxError, JsonRawValue,
6	client::{Error, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse},
7	middleware::{Batch, Notification, Request, RpcServiceT},
8};
9use jsonrpsee_types::Response;
10use tower::Service;
11
12use crate::{
13	HttpRequest, HttpResponse,
14	transport::{Error as TransportError, HttpTransportClient},
15};
16
17/// RPC service that provides JSON-RPC functionality over HTTP transport.
18///
19/// This service implements the `RpcServiceT` trait to handle JSON-RPC requests,
20/// batches, and notifications using an underlying HTTP transport client.
21#[derive(Clone, Debug)]
22pub struct RpcService<HttpMiddleware> {
23	service: Arc<HttpTransportClient<HttpMiddleware>>,
24}
25
26impl<HttpMiddleware> RpcService<HttpMiddleware> {
27	/// Creates a new RPC service with the provided HTTP transport client.
28	///
29	/// # Arguments
30	///
31	/// * `service` - The HTTP transport client to use for sending requests
32	pub fn new(service: HttpTransportClient<HttpMiddleware>) -> Self {
33		Self { service: Arc::new(service) }
34	}
35}
36
37impl<B, HttpMiddleware> RpcServiceT for RpcService<HttpMiddleware>
38where
39	HttpMiddleware:
40		Service<HttpRequest, Response = HttpResponse<B>, Error = TransportError> + Clone + Send + Sync + 'static,
41	HttpMiddleware::Future: Send,
42	B: http_body::Body<Data = Bytes> + Send + 'static,
43	B::Data: Send,
44	B::Error: Into<BoxError>,
45{
46	type BatchResponse = Result<MiddlewareBatchResponse, Error>;
47	type MethodResponse = Result<MiddlewareMethodResponse, Error>;
48	type NotificationResponse = Result<MiddlewareNotifResponse, Error>;
49
50	fn call<'a>(&self, request: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
51		let service = self.service.clone();
52
53		async move {
54			let raw = serde_json::to_string(&request)?;
55			let bytes = service.send_and_read_body(raw).await.map_err(|e| Error::Transport(e.into()))?;
56			let mut rp: Response<Box<JsonRawValue>> = serde_json::from_slice(&bytes)?;
57			rp.extensions = request.extensions;
58
59			Ok(MiddlewareMethodResponse::response(rp.into_owned().into()))
60		}
61	}
62
63	fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
64		let service = self.service.clone();
65
66		async move {
67			let raw = serde_json::to_string(&batch)?;
68			let bytes = service.send_and_read_body(raw).await.map_err(|e| Error::Transport(e.into()))?;
69			let rp: Vec<_> = serde_json::from_slice::<Vec<Response<Box<JsonRawValue>>>>(&bytes)?
70				.into_iter()
71				.map(|r| r.into_owned().into())
72				.collect();
73
74			Ok(rp)
75		}
76	}
77
78	fn notification<'a>(
79		&self,
80		notif: Notification<'a>,
81	) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
82		let service = self.service.clone();
83
84		async move {
85			let raw = serde_json::to_string(&notif)?;
86			service.send(raw).await.map_err(|e| Error::Transport(e.into()))?;
87			Ok(notif.extensions.into())
88		}
89	}
90}