jsonrpsee_server/middleware/
rpc.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! JSON-RPC service middleware.
28
29pub use jsonrpsee_core::middleware::*;
30pub use jsonrpsee_core::server::MethodResponse;
31
32use std::sync::Arc;
33
34use crate::ConnectionId;
35use jsonrpsee_core::server::{
36	BatchResponseBuilder, BoundedSubscriptions, MethodCallback, MethodSink, Methods, SubscriptionState,
37};
38use jsonrpsee_core::traits::IdProvider;
39use jsonrpsee_types::ErrorObject;
40use jsonrpsee_types::error::{ErrorCode, reject_too_many_subscriptions};
41
42/// JSON-RPC service middleware.
43#[derive(Clone, Debug)]
44pub struct RpcService {
45	conn_id: ConnectionId,
46	methods: Methods,
47	max_response_body_size: usize,
48	cfg: RpcServiceCfg,
49}
50
51/// Configuration of the RpcService.
52#[derive(Clone, Debug)]
53pub(crate) enum RpcServiceCfg {
54	/// The server supports only calls.
55	OnlyCalls,
56	/// The server supports both method calls and subscriptions.
57	CallsAndSubscriptions {
58		bounded_subscriptions: BoundedSubscriptions,
59		sink: MethodSink,
60		id_provider: Arc<dyn IdProvider>,
61		_pending_calls: tokio::sync::mpsc::Sender<()>,
62	},
63}
64
65impl RpcService {
66	/// Create a new service.
67	pub(crate) fn new(
68		methods: Methods,
69		max_response_body_size: usize,
70		conn_id: ConnectionId,
71		cfg: RpcServiceCfg,
72	) -> Self {
73		Self { methods, max_response_body_size, conn_id, cfg }
74	}
75}
76
77impl RpcServiceT for RpcService {
78	type BatchResponse = MethodResponse;
79	type MethodResponse = MethodResponse;
80	type NotificationResponse = MethodResponse;
81
82	fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
83		let conn_id = self.conn_id;
84		let max_response_body_size = self.max_response_body_size;
85
86		let Request { id, method, params, extensions, .. } = req;
87		let params = jsonrpsee_types::Params::new(params.as_ref().map(|p| serde_json::value::RawValue::get(p)));
88
89		match self.methods.method_with_name(&method) {
90			None => {
91				let rp =
92					MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)).with_extensions(extensions);
93				ResponseFuture::ready(rp)
94			}
95			Some((_name, method)) => match method {
96				MethodCallback::Async(callback) => {
97					let params = params.into_owned();
98					let id = id.into_owned();
99					let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
100
101					ResponseFuture::future(fut)
102				}
103				MethodCallback::Sync(callback) => {
104					let rp = (callback)(id, params, max_response_body_size, extensions);
105					ResponseFuture::ready(rp)
106				}
107				MethodCallback::Subscription(callback) => {
108					let RpcServiceCfg::CallsAndSubscriptions {
109						bounded_subscriptions,
110						sink,
111						id_provider,
112						_pending_calls,
113					} = self.cfg.clone()
114					else {
115						tracing::warn!("Subscriptions not supported");
116						let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
117							.with_extensions(extensions);
118						return ResponseFuture::ready(rp);
119					};
120
121					if let Some(p) = bounded_subscriptions.acquire() {
122						let conn_state =
123							SubscriptionState { conn_id, id_provider: &*id_provider.clone(), subscription_permit: p };
124
125						let fut = (callback)(id.clone(), params, sink, conn_state, extensions);
126						ResponseFuture::future(fut)
127					} else {
128						let max = bounded_subscriptions.max();
129						let rp =
130							MethodResponse::error(id, reject_too_many_subscriptions(max)).with_extensions(extensions);
131						ResponseFuture::ready(rp)
132					}
133				}
134				MethodCallback::Unsubscription(callback) => {
135					// Don't adhere to any resource or subscription limits; always let unsubscribing happen!
136
137					let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else {
138						tracing::warn!("Subscriptions not supported");
139						let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
140							.with_extensions(extensions);
141						return ResponseFuture::ready(rp);
142					};
143
144					let rp = callback(id, params, conn_id, max_response_body_size, extensions);
145					ResponseFuture::ready(rp)
146				}
147			},
148		}
149	}
150
151	fn batch<'a>(&self, batch: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
152		let mut batch_rp = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
153		let service = self.clone();
154		async move {
155			let mut got_notification = false;
156
157			for batch_entry in batch.into_iter() {
158				match batch_entry {
159					Ok(BatchEntry::Call(req)) => {
160						let rp = service.call(req).await;
161						if let Err(err) = batch_rp.append(rp) {
162							return err;
163						}
164					}
165					Ok(BatchEntry::Notification(n)) => {
166						got_notification = true;
167						service.notification(n).await;
168					}
169					Err(err) => {
170						let (err, id) = err.into_parts();
171						let rp = MethodResponse::error(id, err);
172						if let Err(err) = batch_rp.append(rp) {
173							return err;
174						}
175					}
176				}
177			}
178
179			// If the batch is empty and we got a notification, we return an empty response.
180			if batch_rp.is_empty() && got_notification {
181				MethodResponse::notification()
182			}
183			// An empty batch is regarded as an invalid request here.
184			else {
185				MethodResponse::from_batch(batch_rp.finish())
186			}
187		}
188	}
189
190	fn notification<'a>(&self, n: Notification<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
191		// The notification should not be replied to with a response
192		// but we propogate the extensions to the response which can be useful
193		// for example HTTP transport to set the headers.
194		async move { MethodResponse::notification().with_extensions(n.extensions) }
195	}
196}