jsonrpsee_server/middleware/
rpc.rs1pub use jsonrpsee_core::middleware::*;
30pub use jsonrpsee_core::server::MethodResponse;
31
32use std::sync::Arc;
33
34use crate::ConnectionId;
35use jsonrpsee_core::server::{
36 BatchResponseBuilder, BoundedSubscriptions, MethodCallback, MethodSink, Methods, SubscriptionState,
37};
38use jsonrpsee_core::traits::IdProvider;
39use jsonrpsee_types::ErrorObject;
40use jsonrpsee_types::error::{ErrorCode, reject_too_many_subscriptions};
41
42#[derive(Clone, Debug)]
44pub struct RpcService {
45 conn_id: ConnectionId,
46 methods: Methods,
47 max_response_body_size: usize,
48 cfg: RpcServiceCfg,
49}
50
51#[derive(Clone, Debug)]
53pub(crate) enum RpcServiceCfg {
54 OnlyCalls,
56 CallsAndSubscriptions {
58 bounded_subscriptions: BoundedSubscriptions,
59 sink: MethodSink,
60 id_provider: Arc<dyn IdProvider>,
61 _pending_calls: tokio::sync::mpsc::Sender<()>,
62 },
63}
64
65impl RpcService {
66 pub(crate) fn new(
68 methods: Methods,
69 max_response_body_size: usize,
70 conn_id: ConnectionId,
71 cfg: RpcServiceCfg,
72 ) -> Self {
73 Self { methods, max_response_body_size, conn_id, cfg }
74 }
75}
76
77impl RpcServiceT for RpcService {
78 type BatchResponse = MethodResponse;
79 type MethodResponse = MethodResponse;
80 type NotificationResponse = MethodResponse;
81
82 fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
83 let conn_id = self.conn_id;
84 let max_response_body_size = self.max_response_body_size;
85
86 let Request { id, method, params, extensions, .. } = req;
87 let params = jsonrpsee_types::Params::new(params.as_ref().map(|p| serde_json::value::RawValue::get(p)));
88
89 match self.methods.method_with_name(&method) {
90 None => {
91 let rp =
92 MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)).with_extensions(extensions);
93 ResponseFuture::ready(rp)
94 }
95 Some((_name, method)) => match method {
96 MethodCallback::Async(callback) => {
97 let params = params.into_owned();
98 let id = id.into_owned();
99 let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
100
101 ResponseFuture::future(fut)
102 }
103 MethodCallback::Sync(callback) => {
104 let rp = (callback)(id, params, max_response_body_size, extensions);
105 ResponseFuture::ready(rp)
106 }
107 MethodCallback::Subscription(callback) => {
108 let RpcServiceCfg::CallsAndSubscriptions {
109 bounded_subscriptions,
110 sink,
111 id_provider,
112 _pending_calls,
113 } = self.cfg.clone()
114 else {
115 tracing::warn!("Subscriptions not supported");
116 let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
117 .with_extensions(extensions);
118 return ResponseFuture::ready(rp);
119 };
120
121 if let Some(p) = bounded_subscriptions.acquire() {
122 let conn_state =
123 SubscriptionState { conn_id, id_provider: &*id_provider.clone(), subscription_permit: p };
124
125 let fut = (callback)(id.clone(), params, sink, conn_state, extensions);
126 ResponseFuture::future(fut)
127 } else {
128 let max = bounded_subscriptions.max();
129 let rp =
130 MethodResponse::error(id, reject_too_many_subscriptions(max)).with_extensions(extensions);
131 ResponseFuture::ready(rp)
132 }
133 }
134 MethodCallback::Unsubscription(callback) => {
135 let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else {
138 tracing::warn!("Subscriptions not supported");
139 let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
140 .with_extensions(extensions);
141 return ResponseFuture::ready(rp);
142 };
143
144 let rp = callback(id, params, conn_id, max_response_body_size, extensions);
145 ResponseFuture::ready(rp)
146 }
147 },
148 }
149 }
150
151 fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
152 let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
153 let service = self.clone();
154 async move {
155 let mut got_notification = false;
156
157 for batch_entry in batch.into_iter() {
158 match batch_entry {
159 Ok(BatchEntry::Call(req)) => {
160 let rp = service.call(req).await;
161 if let Err(err) = batch_rp.append(rp) {
162 return err;
163 }
164 }
165 Ok(BatchEntry::Notification(n)) => {
166 got_notification = true;
167 service.notification(n).await;
168 }
169 Err(err) => {
170 let (err, id) = err.into_parts();
171 let rp = MethodResponse::error(id, err);
172 if let Err(err) = batch_rp.append(rp) {
173 return err;
174 }
175 }
176 }
177 }
178
179 if batch_rp.is_empty() && got_notification {
181 MethodResponse::notification()
182 }
183 else {
185 MethodResponse::from_batch(batch_rp.finish())
186 }
187 }
188 }
189
190 fn notification<'a>(&self, n: Notification<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
191 async move { MethodResponse::notification().with_extensions(n.extensions) }
195 }
196}