jsonrpsee_server/middleware/rpc/layer/
rpc_service.rs1use super::ResponseFuture;
30use std::sync::Arc;
31
32use crate::middleware::rpc::RpcServiceT;
33use crate::ConnectionId;
34use futures_util::future::BoxFuture;
35use jsonrpsee_core::server::{
36 BoundedSubscriptions, MethodCallback, MethodResponse, MethodSink, Methods, SubscriptionState,
37};
38use jsonrpsee_core::traits::IdProvider;
39use jsonrpsee_types::error::{reject_too_many_subscriptions, ErrorCode};
40use jsonrpsee_types::{ErrorObject, Request};
41
42#[derive(Clone, Debug)]
44pub struct RpcService {
45 conn_id: ConnectionId,
46 methods: Methods,
47 max_response_body_size: usize,
48 cfg: RpcServiceCfg,
49}
50
51#[derive(Clone, Debug)]
53pub(crate) enum RpcServiceCfg {
54 OnlyCalls,
56 CallsAndSubscriptions {
58 bounded_subscriptions: BoundedSubscriptions,
59 sink: MethodSink,
60 id_provider: Arc<dyn IdProvider>,
61 _pending_calls: tokio::sync::mpsc::Sender<()>,
62 },
63}
64
65impl RpcService {
66 pub(crate) fn new(
68 methods: Methods,
69 max_response_body_size: usize,
70 conn_id: ConnectionId,
71 cfg: RpcServiceCfg,
72 ) -> Self {
73 Self { methods, max_response_body_size, conn_id, cfg }
74 }
75}
76
77impl<'a> RpcServiceT<'a> for RpcService {
78 type Future = ResponseFuture<BoxFuture<'a, MethodResponse>>;
81
82 fn call(&self, req: Request<'a>) -> Self::Future {
83 let conn_id = self.conn_id;
84 let max_response_body_size = self.max_response_body_size;
85
86 let Request { id, method, params, extensions, .. } = req;
87 let params = jsonrpsee_types::Params::new(params.as_ref().map(|p| serde_json::value::RawValue::get(p)));
88
89 match self.methods.method_with_name(&method) {
90 None => {
91 let rp =
92 MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)).with_extensions(extensions);
93 ResponseFuture::ready(rp)
94 }
95 Some((_name, method)) => match method {
96 MethodCallback::Async(callback) => {
97 let params = params.into_owned();
98 let id = id.into_owned();
99
100 let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
101 ResponseFuture::future(fut)
102 }
103 MethodCallback::Sync(callback) => {
104 let rp = (callback)(id, params, max_response_body_size, extensions);
105 ResponseFuture::ready(rp)
106 }
107 MethodCallback::Subscription(callback) => {
108 let RpcServiceCfg::CallsAndSubscriptions {
109 bounded_subscriptions,
110 sink,
111 id_provider,
112 _pending_calls,
113 } = self.cfg.clone()
114 else {
115 tracing::warn!("Subscriptions not supported");
116 let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
117 .with_extensions(extensions);
118 return ResponseFuture::ready(rp);
119 };
120
121 if let Some(p) = bounded_subscriptions.acquire() {
122 let conn_state =
123 SubscriptionState { conn_id, id_provider: &*id_provider.clone(), subscription_permit: p };
124
125 let fut = callback(id.clone(), params, sink, conn_state, extensions);
126 ResponseFuture::future(fut)
127 } else {
128 let max = bounded_subscriptions.max();
129 let rp =
130 MethodResponse::error(id, reject_too_many_subscriptions(max)).with_extensions(extensions);
131 ResponseFuture::ready(rp)
132 }
133 }
134 MethodCallback::Unsubscription(callback) => {
135 let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else {
138 tracing::warn!("Subscriptions not supported");
139 let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
140 .with_extensions(extensions);
141 return ResponseFuture::ready(rp);
142 };
143
144 let rp = callback(id, params, conn_id, max_response_body_size, extensions);
145 ResponseFuture::ready(rp)
146 }
147 },
148 }
149 }
150}