jsonrpsee_core/client/async_client/
rpc_service.rs1use crate::{
2 client::{
3 BatchMessage, Error, FrontToBack, MiddlewareBatchResponse, MiddlewareMethodResponse, MiddlewareNotifResponse,
4 RequestMessage, SubscriptionMessage, SubscriptionResponse,
5 },
6 middleware::{Batch, IsBatch, IsSubscription, Notification, Request, RpcServiceT},
7};
8
9use jsonrpsee_types::{Response, ResponsePayload};
10use tokio::sync::{mpsc, oneshot};
11
12impl From<mpsc::error::SendError<FrontToBack>> for Error {
13 fn from(_: mpsc::error::SendError<FrontToBack>) -> Self {
14 Error::ServiceDisconnect
15 }
16}
17
18impl From<oneshot::error::RecvError> for Error {
19 fn from(_: oneshot::error::RecvError) -> Self {
20 Error::ServiceDisconnect
21 }
22}
23
24#[derive(Debug, Clone)]
26pub struct RpcService(mpsc::Sender<FrontToBack>);
27
28impl RpcService {
29 #[allow(private_interfaces)]
32 pub(crate) fn new(tx: mpsc::Sender<FrontToBack>) -> Self {
33 Self(tx)
34 }
35}
36
37impl RpcServiceT for RpcService {
38 type MethodResponse = Result<MiddlewareMethodResponse, Error>;
39 type BatchResponse = Result<MiddlewareBatchResponse, Error>;
40 type NotificationResponse = Result<MiddlewareNotifResponse, Error>;
41
42 fn call<'a>(&self, request: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
43 let tx = self.0.clone();
44
45 async move {
46 let raw = serde_json::to_string(&request)?;
47
48 match request.extensions.get::<IsSubscription>() {
49 Some(sub) => {
50 let (send_back_tx, send_back_rx) = tokio::sync::oneshot::channel();
51
52 tx.clone()
53 .send(FrontToBack::Subscribe(SubscriptionMessage {
54 raw,
55 subscribe_id: sub.sub_req_id(),
56 unsubscribe_id: sub.unsub_req_id(),
57 unsubscribe_method: sub.unsubscribe_method().to_owned(),
58 send_back: send_back_tx,
59 }))
60 .await?;
61
62 let (subscribe_rx, sub_id) = send_back_rx.await??;
63
64 let rp = serde_json::value::to_raw_value(&sub_id)?;
65
66 Ok(MiddlewareMethodResponse::subscription_response(
67 Response::new(ResponsePayload::success(rp), request.id.clone().into_owned()).into(),
68 SubscriptionResponse { sub_id, stream: subscribe_rx },
69 ))
70 }
71 None => {
72 let (send_back_tx, send_back_rx) = oneshot::channel();
73
74 tx.send(FrontToBack::Request(RequestMessage {
75 raw,
76 send_back: Some(send_back_tx),
77 id: request.id.clone().into_owned(),
78 }))
79 .await?;
80 let mut rp = send_back_rx.await??;
81
82 rp.0.extensions = request.extensions;
83
84 Ok(MiddlewareMethodResponse::response(rp))
85 }
86 }
87 }
88 }
89
90 fn batch<'a>(&self, mut batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
91 let tx = self.0.clone();
92
93 async move {
94 let (send_back_tx, send_back_rx) = oneshot::channel();
95
96 let raw = serde_json::to_string(&batch)?;
97 let id_range = batch
98 .extensions()
99 .get::<IsBatch>()
100 .map(|b| b.id_range.clone())
101 .expect("Batch ID range must be set in extensions");
102
103 tx.send(FrontToBack::Batch(BatchMessage { raw, ids: id_range, send_back: send_back_tx })).await?;
104 let json = send_back_rx.await??;
105
106 Ok(json)
107 }
108 }
109
110 fn notification<'a>(&self, n: Notification<'a>) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
111 let tx = self.0.clone();
112
113 async move {
114 let raw = serde_json::to_string(&n)?;
115 tx.send(FrontToBack::Notification(raw)).await?;
116 Ok(MiddlewareNotifResponse::from(n.extensions))
117 }
118 }
119}