jsonrpsee_core/client/async_client/
rpc_service.rs

1use crate::{
2	client::{
3		BatchMessage, Error, FrontToBack, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse,
4		RequestMessage, SubscriptionMessage, SubscriptionResponse,
5	},
6	middleware::{Batch, IsBatch, IsSubscription, Notification, Request, RpcServiceT},
7};
8
9use jsonrpsee_types::{Response, ResponsePayload};
10use tokio::sync::{mpsc, oneshot};
11
12impl From<mpsc::error::SendError<FrontToBack>> for Error {
13	fn from(_: mpsc::error::SendError<FrontToBack>) -> Self {
14		Error::ServiceDisconnect
15	}
16}
17
18impl From<oneshot::error::RecvError> for Error {
19	fn from(_: oneshot::error::RecvError) -> Self {
20		Error::ServiceDisconnect
21	}
22}
23
24/// RpcService implementation for the async client.
25#[derive(Debug, Clone)]
26pub struct RpcService(mpsc::Sender<FrontToBack>);
27
28impl RpcService {
29	// This is a private interface but we need to expose it for the async client
30	// to be able to create the service.
31	#[allow(private_interfaces)]
32	pub(crate) fn new(tx: mpsc::Sender<FrontToBack>) -> Self {
33		Self(tx)
34	}
35}
36
37impl RpcServiceT for RpcService {
38	type MethodResponse = Result<MiddlewareMethodResponse, Error>;
39	type BatchResponse = Result<MiddlewareBatchResponse, Error>;
40	type NotificationResponse = Result<MiddlewareNotifResponse, Error>;
41
42	fn call<'a>(&self, request: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
43		let tx = self.0.clone();
44
45		async move {
46			let raw = serde_json::to_string(&request)?;
47
48			match request.extensions.get::<IsSubscription>() {
49				Some(sub) => {
50					let (send_back_tx, send_back_rx) = tokio::sync::oneshot::channel();
51
52					tx.clone()
53						.send(FrontToBack::Subscribe(SubscriptionMessage {
54							raw,
55							subscribe_id: sub.sub_req_id(),
56							unsubscribe_id: sub.unsub_req_id(),
57							unsubscribe_method: sub.unsubscribe_method().to_owned(),
58							send_back: send_back_tx,
59						}))
60						.await?;
61
62					let (subscribe_rx, sub_id) = send_back_rx.await??;
63
64					let rp = serde_json::value::to_raw_value(&sub_id)?;
65
66					Ok(MiddlewareMethodResponse::subscription_response(
67						Response::new(ResponsePayload::success(rp), request.id.clone().into_owned()).into(),
68						SubscriptionResponse { sub_id, stream: subscribe_rx },
69					))
70				}
71				None => {
72					let (send_back_tx, send_back_rx) = oneshot::channel();
73
74					tx.send(FrontToBack::Request(RequestMessage {
75						raw,
76						send_back: Some(send_back_tx),
77						id: request.id.clone().into_owned(),
78					}))
79					.await?;
80					let mut rp = send_back_rx.await??;
81
82					rp.0.extensions = request.extensions;
83
84					Ok(MiddlewareMethodResponse::response(rp))
85				}
86			}
87		}
88	}
89
90	fn batch<'a>(&self, mut batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
91		let tx = self.0.clone();
92
93		async move {
94			let (send_back_tx, send_back_rx) = oneshot::channel();
95
96			let raw = serde_json::to_string(&batch)?;
97			let id_range = batch
98				.extensions()
99				.get::<IsBatch>()
100				.map(|b| b.id_range.clone())
101				.expect("Batch ID range must be set in extensions");
102
103			tx.send(FrontToBack::Batch(BatchMessage { raw, ids: id_range, send_back: send_back_tx })).await?;
104			let json = send_back_rx.await??;
105
106			Ok(json)
107		}
108	}
109
110	fn notification<'a>(&self, n: Notification<'a>) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
111		let tx = self.0.clone();
112
113		async move {
114			let raw = serde_json::to_string(&n)?;
115			tx.send(FrontToBack::Notification(raw)).await?;
116			Ok(MiddlewareNotifResponse::from(n.extensions))
117		}
118	}
119}