jsonrpsee_core/server/
rpc_module.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27use std::collections::hash_map::Entry;
28use std::fmt::{self, Debug};
29use std::future::Future;
30use std::ops::{Deref, DerefMut};
31use std::sync::Arc;
32
33use crate::error::RegisterMethodError;
34use crate::id_providers::RandomIntegerIdProvider;
35use crate::server::helpers::MethodSink;
36use crate::server::subscription::{
37	BoundedSubscriptions, IntoSubscriptionCloseResponse, PendingSubscriptionSink, Subscribers, Subscription,
38	SubscriptionCloseResponse, SubscriptionKey, SubscriptionPermit, SubscriptionState, sub_message_to_json,
39};
40use crate::server::{LOG_TARGET, MethodResponse, ResponsePayload};
41use crate::traits::ToRpcParams;
42use futures_util::{FutureExt, future::BoxFuture};
43use http::Extensions;
44use jsonrpsee_types::error::{ErrorCode, ErrorObject};
45use jsonrpsee_types::{
46	ErrorObjectOwned, Id, Params, Request, Response, ResponseSuccess, SubscriptionId as RpcSubscriptionId,
47};
48use rustc_hash::FxHashMap;
49use serde::de::DeserializeOwned;
50use serde_json::value::RawValue;
51use tokio::sync::{mpsc, oneshot};
52
53use super::{IntoResponse, sub_err_to_json};
54
55/// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request,
56/// implemented as a function pointer to a `Fn` function taking four arguments:
57/// the `id`, `params`, a channel the function uses to communicate the result (or error)
58/// back to `jsonrpsee`, and the connection ID (useful for the websocket transport).
59pub type SyncMethod = Arc<dyn Send + Sync + Fn(Id, Params, MaxResponseSize, Extensions) -> MethodResponse>;
60/// Similar to [`SyncMethod`], but represents an asynchronous handler.
61pub type AsyncMethod<'a> = Arc<
62	dyn Send
63		+ Sync
64		+ Fn(Id<'a>, Params<'a>, ConnectionId, MaxResponseSize, Extensions) -> BoxFuture<'a, MethodResponse>,
65>;
66
67/// Method callback for subscriptions.
68pub type SubscriptionMethod<'a> =
69	Arc<dyn Send + Sync + Fn(Id, Params, MethodSink, SubscriptionState, Extensions) -> BoxFuture<'a, MethodResponse>>;
70// Method callback to unsubscribe.
71type UnsubscriptionMethod =
72	Arc<dyn Send + Sync + Fn(Id, Params, ConnectionId, MaxResponseSize, Extensions) -> MethodResponse>;
73
74/// Connection ID.
75#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Default, serde::Deserialize, serde::Serialize)]
76pub struct ConnectionId(pub usize);
77
78impl From<u32> for ConnectionId {
79	fn from(id: u32) -> Self {
80		Self(id as usize)
81	}
82}
83
84impl From<usize> for ConnectionId {
85	fn from(id: usize) -> Self {
86		Self(id)
87	}
88}
89
90/// Max response size.
91pub type MaxResponseSize = usize;
92
93/// Raw response from an RPC
94/// A tuple containing:
95///   - Call result as a `String`,
96///   - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
97pub type RawRpcResponse = (Box<RawValue>, mpsc::Receiver<Box<RawValue>>);
98
99/// The error that can occur when [`Methods::call`] or [`Methods::subscribe`] is invoked.
100#[derive(thiserror::Error, Debug)]
101pub enum MethodsError {
102	/// Failed to parse the call as valid JSON-RPC.
103	#[error(transparent)]
104	Parse(#[from] serde_json::Error),
105	/// Specific JSON-RPC error.
106	#[error(transparent)]
107	JsonRpc(#[from] ErrorObjectOwned),
108	#[error("Invalid subscription ID: `{0}`")]
109	/// Invalid subscription ID.
110	InvalidSubscriptionId(String),
111}
112
113/// This represent a response to a RPC call
114/// and `Subscribe` calls are handled differently
115/// because we want to prevent subscriptions to start
116/// before the actual subscription call has been answered.
117#[derive(Debug)]
118pub enum CallOrSubscription {
119	/// The subscription callback itself sends back the result
120	/// so it must not be sent back again.
121	Subscription(MethodResponse),
122	/// Treat it as ordinary call.
123	Call(MethodResponse),
124}
125
126impl CallOrSubscription {
127	/// Extract the JSON-RPC response.
128	pub fn as_response(&self) -> &MethodResponse {
129		match &self {
130			Self::Subscription(r) => r,
131			Self::Call(r) => r,
132		}
133	}
134
135	/// Extract the JSON-RPC response.
136	pub fn into_response(self) -> MethodResponse {
137		match self {
138			Self::Subscription(r) => r,
139			Self::Call(r) => r,
140		}
141	}
142}
143
144/// Callback wrapper that can be either sync or async.
145#[derive(Clone)]
146pub enum MethodCallback {
147	/// Synchronous method handler.
148	Sync(SyncMethod),
149	/// Asynchronous method handler.
150	Async(AsyncMethod<'static>),
151	/// Subscription method handler.
152	Subscription(SubscriptionMethod<'static>),
153	/// Unsubscription method handler.
154	Unsubscription(UnsubscriptionMethod),
155}
156
157/// The kind of the JSON-RPC method call, it can be a subscription, method call or unknown.
158#[derive(Debug, Copy, Clone)]
159pub enum MethodKind {
160	/// Subscription Call.
161	Subscription,
162	/// Unsubscription Call.
163	Unsubscription,
164	/// Method call.
165	MethodCall,
166	/// The method was not found.
167	NotFound,
168}
169
170impl std::fmt::Display for MethodKind {
171	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172		let s = match self {
173			Self::Subscription => "subscription",
174			Self::MethodCall => "method call",
175			Self::NotFound => "method not found",
176			Self::Unsubscription => "unsubscription",
177		};
178
179		write!(f, "{s}")
180	}
181}
182
183/// Result of a method, either direct value or a future of one.
184pub enum MethodResult<T> {
185	/// Result by value
186	Sync(T),
187	/// Future of a value
188	Async(BoxFuture<'static, T>),
189}
190
191impl<T: Debug> Debug for MethodResult<T> {
192	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
193		match self {
194			MethodResult::Sync(result) => result.fmt(f),
195			MethodResult::Async(_) => f.write_str("<future>"),
196		}
197	}
198}
199
200impl Debug for MethodCallback {
201	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202		match self {
203			Self::Async(_) => write!(f, "Async"),
204			Self::Sync(_) => write!(f, "Sync"),
205			Self::Subscription(_) => write!(f, "Subscription"),
206			Self::Unsubscription(_) => write!(f, "Unsubscription"),
207		}
208	}
209}
210
211/// Reference-counted, clone-on-write collection of synchronous and asynchronous methods.
212#[derive(Default, Debug, Clone)]
213pub struct Methods {
214	callbacks: Arc<FxHashMap<&'static str, MethodCallback>>,
215	extensions: Extensions,
216}
217
218impl Methods {
219	/// Creates a new empty [`Methods`].
220	pub fn new() -> Self {
221		Self::default()
222	}
223
224	/// Verifies that the method name is not already taken, and returns an error if it is.
225	pub fn verify_method_name(&mut self, name: &'static str) -> Result<(), RegisterMethodError> {
226		if self.callbacks.contains_key(name) {
227			return Err(RegisterMethodError::AlreadyRegistered(name.into()));
228		}
229
230		Ok(())
231	}
232
233	/// Inserts the method callback for a given name, or returns an error if the name was already taken.
234	/// On success it returns a mut reference to the [`MethodCallback`] just inserted.
235	pub fn verify_and_insert(
236		&mut self,
237		name: &'static str,
238		callback: MethodCallback,
239	) -> Result<&mut MethodCallback, RegisterMethodError> {
240		match self.mut_callbacks().entry(name) {
241			Entry::Occupied(_) => Err(RegisterMethodError::AlreadyRegistered(name.into())),
242			Entry::Vacant(vacant) => Ok(vacant.insert(callback)),
243		}
244	}
245
246	/// Helper for obtaining a mut ref to the callbacks HashMap.
247	fn mut_callbacks(&mut self) -> &mut FxHashMap<&'static str, MethodCallback> {
248		Arc::make_mut(&mut self.callbacks)
249	}
250
251	/// Merge two [`Methods`]'s by adding all [`MethodCallback`]s from `other` into `self`.
252	/// Fails if any of the methods in `other` is present already.
253	pub fn merge(&mut self, other: impl Into<Methods>) -> Result<(), RegisterMethodError> {
254		let mut other = other.into();
255
256		for name in other.callbacks.keys() {
257			self.verify_method_name(name)?;
258		}
259
260		let callbacks = self.mut_callbacks();
261
262		for (name, callback) in other.mut_callbacks().drain() {
263			callbacks.insert(name, callback);
264		}
265
266		Ok(())
267	}
268
269	/// Returns the method callback.
270	pub fn method(&self, method_name: &str) -> Option<&MethodCallback> {
271		self.callbacks.get(method_name)
272	}
273
274	/// Returns the method callback along with its name. The returned name is same as the
275	/// `method_name`, but its lifetime bound is `'static`.
276	pub fn method_with_name(&self, method_name: &str) -> Option<(&'static str, &MethodCallback)> {
277		self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v))
278	}
279
280	/// Helper to call a method on the `RPC module` without having to spin up a server.
281	///
282	/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
283	///
284	/// Returns the decoded value of the `result field` in JSON-RPC response if successful.
285	///
286	/// # Examples
287	///
288	/// ```
289	/// #[tokio::main]
290	/// async fn main() {
291	///     use jsonrpsee::{RpcModule, IntoResponse};
292	///     use jsonrpsee::core::RpcResult;
293	///
294	///     let mut module = RpcModule::new(());
295	///     module.register_method::<RpcResult<u64>, _>("echo_call", |params, _, _| {
296	///         params.one::<u64>().map_err(Into::into)
297	///     }).unwrap();
298	///
299	///     let echo: u64 = module.call("echo_call", [1_u64]).await.unwrap();
300	///     assert_eq!(echo, 1);
301	/// }
302	/// ```
303	pub async fn call<Params: ToRpcParams, T: DeserializeOwned + Clone>(
304		&self,
305		method: &str,
306		params: Params,
307	) -> Result<T, MethodsError> {
308		let params = params.to_rpc_params()?;
309		let req = Request::borrowed(method, params.as_ref().map(|p| p.as_ref()), Id::Number(0));
310		tracing::trace!(target: LOG_TARGET, "[Methods::call] Method: {:?}, params: {:?}", method, params);
311		let (rp, _) = self.inner_call(req, 1, mock_subscription_permit()).await;
312
313		let rp = serde_json::from_str::<Response<T>>(rp.get())?;
314		ResponseSuccess::try_from(rp).map(|s| s.result).map_err(|e| MethodsError::JsonRpc(e.into_owned()))
315	}
316
317	/// Make a request (JSON-RPC method call or subscription) by using raw JSON.
318	///
319	/// Returns the raw JSON response to the call and a stream to receive notifications if the call was a subscription.
320	///
321	/// # Examples
322	///
323	/// ```
324	/// #[tokio::main]
325	/// async fn main() {
326	///     use jsonrpsee::{RpcModule, SubscriptionMessage};
327	///     use jsonrpsee::types::{response::Success, Response};
328	///     use jsonrpsee::core::to_json_raw_value;
329	///     use futures_util::StreamExt;
330	///
331	///     let mut module = RpcModule::new(());
332	///     module.register_subscription("hi", "hi", "goodbye", |_, pending, _, _| async {
333	///         let sink = pending.accept().await?;
334	///
335	///         // see comment above.
336	///         let msg = to_json_raw_value(&"one answer").unwrap();
337	///         sink.send(msg).await?;
338	///
339	///         Ok(())
340	///     }).unwrap();
341	///     let (resp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"hi","id":0}"#, 1).await.unwrap();
342	///     // If the response is an error converting it to `Success` will fail.
343	///     let resp: Success<u64> = serde_json::from_str::<Response<u64>>(resp.get()).unwrap().try_into().unwrap();
344	///     let sub_resp = stream.recv().await.unwrap();
345	///     assert_eq!(
346	///         format!(r#"{{"jsonrpc":"2.0","method":"hi","params":{{"subscription":{},"result":"one answer"}}}}"#, resp.result),
347	///         sub_resp.get()
348	///     );
349	/// }
350	/// ```
351	pub async fn raw_json_request(
352		&self,
353		request: &str,
354		buf_size: usize,
355	) -> Result<(Box<RawValue>, mpsc::Receiver<Box<RawValue>>), serde_json::Error> {
356		tracing::trace!("[Methods::raw_json_request] Request: {:?}", request);
357		let req: Request = serde_json::from_str(request)?;
358		let (resp, rx) = self.inner_call(req, buf_size, mock_subscription_permit()).await;
359
360		Ok((resp, rx))
361	}
362
363	/// Execute a callback.
364	async fn inner_call(
365		&self,
366		req: Request<'_>,
367		buf_size: usize,
368		subscription_permit: SubscriptionPermit,
369	) -> RawRpcResponse {
370		let (tx, mut rx) = mpsc::channel(buf_size);
371		// The extensions is always empty when calling the method directly because decoding an JSON-RPC
372		// request doesn't have any extensions.
373		let Request { id, method, params, .. } = req;
374		let params = Params::new(params.as_ref().map(|params| params.as_ref().get()));
375		let max_response_size = usize::MAX;
376		let conn_id = ConnectionId(0);
377		let mut ext = self.extensions.clone();
378		ext.insert(conn_id);
379
380		let response = match self.method(&method) {
381			None => MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)),
382			Some(MethodCallback::Sync(cb)) => (cb)(id, params, max_response_size, ext),
383			Some(MethodCallback::Async(cb)) => {
384				(cb)(id.into_owned(), params.into_owned(), conn_id, max_response_size, ext).await
385			}
386			Some(MethodCallback::Subscription(cb)) => {
387				let conn_state =
388					SubscriptionState { conn_id, id_provider: &RandomIntegerIdProvider, subscription_permit };
389				let res = (cb)(id, params, MethodSink::new(tx.clone()), conn_state, ext).await;
390
391				// This message is not used because it's used for metrics so we discard in other to
392				// not read once this is used for subscriptions.
393				//
394				// The same information is part of `res` above.
395				let _ = rx.recv().await.expect("Every call must at least produce one response; qed");
396
397				res
398			}
399			Some(MethodCallback::Unsubscription(cb)) => (cb)(id, params, conn_id, max_response_size, ext),
400		};
401
402		let is_success = response.is_success();
403		let (rp, notif, _) = response.into_parts();
404
405		if let Some(n) = notif {
406			n.notify(is_success);
407		}
408
409		tracing::trace!(target: LOG_TARGET, "[Methods::inner_call] Method: {}, response: {}", method, rp);
410
411		(rp, rx)
412	}
413
414	/// Helper to create a subscription on the `RPC module` without having to spin up a server.
415	///
416	/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
417	///
418	/// Returns [`Subscription`] on success which can used to get results from the subscription.
419	///
420	/// # Examples
421	///
422	/// ```
423	/// #[tokio::main]
424	/// async fn main() {
425	///     use jsonrpsee::{RpcModule, SubscriptionMessage};
426	///     use jsonrpsee::core::{EmptyServerParams, RpcResult, to_json_raw_value};
427	///
428	///     let mut module = RpcModule::new(());
429	///     module.register_subscription("hi", "hi", "goodbye", |_, pending, _, _| async move {
430	///         let sink = pending.accept().await?;
431	///         let msg = to_json_raw_value(&"one answer").unwrap();
432	///         sink.send(msg).await?;
433	///         Ok(())
434	///     }).unwrap();
435	///
436	///     let mut sub = module.subscribe_unbounded("hi", EmptyServerParams::new()).await.unwrap();
437	///     // In this case we ignore the subscription ID,
438	///     let (sub_resp, _sub_id) = sub.next::<String>().await.unwrap().unwrap();
439	///     assert_eq!(&sub_resp, "one answer");
440	/// }
441	/// ```
442	pub async fn subscribe_unbounded(
443		&self,
444		sub_method: &str,
445		params: impl ToRpcParams,
446	) -> Result<Subscription, MethodsError> {
447		self.subscribe(sub_method, params, u32::MAX as usize).await
448	}
449
450	/// Similar to [`Methods::subscribe_unbounded`] but it's using a bounded channel and the buffer capacity must be
451	/// provided.
452	///
453	pub async fn subscribe(
454		&self,
455		sub_method: &str,
456		params: impl ToRpcParams,
457		buf_size: usize,
458	) -> Result<Subscription, MethodsError> {
459		let params = params.to_rpc_params()?;
460		let req = Request::borrowed(sub_method, params.as_ref().map(|p| p.as_ref()), Id::Number(0));
461
462		tracing::trace!(target: LOG_TARGET, "[Methods::subscribe] Method: {}, params: {:?}", sub_method, params);
463
464		let (resp, rx) = self.inner_call(req, buf_size, mock_subscription_permit()).await;
465		let as_success: ResponseSuccess<&RawValue> = serde_json::from_str::<Response<_>>(resp.get())?.try_into()?;
466		let sub_id: RpcSubscriptionId = serde_json::from_str(as_success.result.get())?;
467
468		Ok(Subscription { sub_id: sub_id.into_owned(), rx })
469	}
470
471	/// Returns an `Iterator` with all the method names registered on this server.
472	pub fn method_names(&self) -> impl Iterator<Item = &'static str> + '_ {
473		self.callbacks.keys().copied()
474	}
475
476	/// Similar to [`Methods::extensions_mut`] but it's immutable.
477	pub fn extensions(&mut self) -> &Extensions {
478		&self.extensions
479	}
480
481	/// Get a mutable reference to the extensions to add or remove data from
482	/// the extensions.
483	///
484	/// This only affects direct calls to the methods and subscriptions
485	/// and can be used for example to unit test the API without a server.
486	///
487	/// # Examples
488	///
489	/// ```
490	/// #[tokio::main]
491	/// async fn main() {
492	///     use jsonrpsee::{RpcModule, IntoResponse, Extensions};
493	///     use jsonrpsee::core::RpcResult;
494	///
495	///     let mut module = RpcModule::new(());
496	///     module.register_method::<RpcResult<u64>, _>("magic_multiply", |params, _, ext| {
497	///         let magic = ext.get::<u64>().copied().unwrap();
498	///         let val = params.one::<u64>()?;
499	///         Ok(val * magic)
500	///     }).unwrap();
501	///
502	///     // inject arbitrary data into the extensions.
503	///     module.extensions_mut().insert(33_u64);
504	///
505	///     let magic: u64 = module.call("magic_multiply", [1_u64]).await.unwrap();
506	///     assert_eq!(magic, 33);
507	/// }
508	/// ```
509	pub fn extensions_mut(&mut self) -> &mut Extensions {
510		&mut self.extensions
511	}
512}
513
514impl<Context> Deref for RpcModule<Context> {
515	type Target = Methods;
516
517	fn deref(&self) -> &Methods {
518		&self.methods
519	}
520}
521
522impl<Context> DerefMut for RpcModule<Context> {
523	fn deref_mut(&mut self) -> &mut Methods {
524		&mut self.methods
525	}
526}
527
528/// Sets of JSON-RPC methods can be organized into "module"s that are in turn registered on the server or,
529/// alternatively, merged with other modules to construct a cohesive API. [`RpcModule`] wraps an additional context
530/// argument that can be used to access data during call execution.
531#[derive(Debug, Clone)]
532pub struct RpcModule<Context> {
533	ctx: Arc<Context>,
534	methods: Methods,
535}
536
537impl<Context> RpcModule<Context> {
538	/// Create a new module with a given shared `Context`.
539	pub fn new(ctx: Context) -> Self {
540		Self::from_arc(Arc::new(ctx))
541	}
542
543	/// Create a new module from an already shared `Context`.
544	///
545	/// This is useful if `Context` needs to be shared outside of an [`RpcModule`].
546	pub fn from_arc(ctx: Arc<Context>) -> Self {
547		Self { ctx, methods: Default::default() }
548	}
549
550	/// Transform a module into an `RpcModule<()>` (unit context).
551	pub fn remove_context(self) -> RpcModule<()> {
552		let mut module = RpcModule::new(());
553		module.methods = self.methods;
554		module
555	}
556}
557
558impl<Context> From<RpcModule<Context>> for Methods {
559	fn from(module: RpcModule<Context>) -> Methods {
560		module.methods
561	}
562}
563
564impl<Context: Send + Sync + 'static> RpcModule<Context> {
565	/// Register a new synchronous RPC method, which computes the response with the given callback.
566	///
567	/// ## Examples
568	///
569	/// ```
570	/// use jsonrpsee_core::server::RpcModule;
571	///
572	/// let mut module = RpcModule::new(());
573	/// module.register_method("say_hello", |_params, _ctx, _| "lo").unwrap();
574	/// ```
575	pub fn register_method<R, F>(
576		&mut self,
577		method_name: &'static str,
578		callback: F,
579	) -> Result<&mut MethodCallback, RegisterMethodError>
580	where
581		Context: Send + Sync + 'static,
582		R: IntoResponse + 'static,
583		F: Fn(Params, &Context, &Extensions) -> R + Send + Sync + 'static,
584	{
585		let ctx = self.ctx.clone();
586		self.methods.verify_and_insert(
587			method_name,
588			MethodCallback::Sync(Arc::new(move |id, params, max_response_size, extensions| {
589				let rp = callback(params, &*ctx, &extensions).into_response();
590				MethodResponse::response(id, rp, max_response_size).with_extensions(extensions)
591			})),
592		)
593	}
594
595	/// Removes the method if it exists.
596	///
597	/// Be aware that a subscription consist of two methods, `subscribe` and `unsubscribe` and
598	/// it's the caller responsibility to remove both `subscribe` and `unsubscribe` methods for subscriptions.
599	pub fn remove_method(&mut self, method_name: &'static str) -> Option<MethodCallback> {
600		self.methods.mut_callbacks().remove(method_name)
601	}
602
603	/// Register a new asynchronous RPC method, which computes the response with the given callback.
604	///
605	/// ## Examples
606	///
607	/// ```
608	/// use jsonrpsee_core::server::RpcModule;
609	///
610	/// let mut module = RpcModule::new(());
611	/// module.register_async_method("say_hello", |_params, _ctx, _| async { "lo" }).unwrap();
612	///
613	/// ```
614	///
615	pub fn register_async_method<R, Fun, Fut>(
616		&mut self,
617		method_name: &'static str,
618		callback: Fun,
619	) -> Result<&mut MethodCallback, RegisterMethodError>
620	where
621		R: IntoResponse + 'static,
622		Fut: Future<Output = R> + Send,
623		Fun: (Fn(Params<'static>, Arc<Context>, Extensions) -> Fut) + Clone + Send + Sync + 'static,
624	{
625		let ctx = self.ctx.clone();
626		self.methods.verify_and_insert(
627			method_name,
628			MethodCallback::Async(Arc::new(move |id, params, _, max_response_size, extensions| {
629				let ctx = ctx.clone();
630				let callback = callback.clone();
631
632				// NOTE: the extensions can't be mutated at this point so
633				// it's safe to clone it.
634				let future = async move {
635					let rp = callback(params, ctx, extensions.clone()).await.into_response();
636					MethodResponse::response(id, rp, max_response_size).with_extensions(extensions)
637				};
638				future.boxed()
639			})),
640		)
641	}
642
643	/// Register a new **blocking** synchronous RPC method, which computes the response with the given callback.
644	/// Unlike the regular [`register_method`](RpcModule::register_method), this method can block its thread and perform
645	/// expensive computations.
646	pub fn register_blocking_method<R, F>(
647		&mut self,
648		method_name: &'static str,
649		callback: F,
650	) -> Result<&mut MethodCallback, RegisterMethodError>
651	where
652		Context: Send + Sync + 'static,
653		R: IntoResponse + 'static,
654		F: Fn(Params, Arc<Context>, Extensions) -> R + Clone + Send + Sync + 'static,
655	{
656		let ctx = self.ctx.clone();
657		let callback = self.methods.verify_and_insert(
658			method_name,
659			MethodCallback::Async(Arc::new(move |id, params, _, max_response_size, extensions| {
660				let ctx = ctx.clone();
661				let callback = callback.clone();
662
663				// NOTE: the extensions can't be mutated at this point so
664				// it's safe to clone it.
665				let extensions2 = extensions.clone();
666
667				tokio::task::spawn_blocking(move || {
668					let rp = callback(params, ctx, extensions2.clone()).into_response();
669					MethodResponse::response(id, rp, max_response_size).with_extensions(extensions2)
670				})
671				.map(|result| match result {
672					Ok(r) => r,
673					Err(err) => {
674						tracing::error!(target: LOG_TARGET, "Join error for blocking RPC method: {:?}", err);
675						MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError))
676							.with_extensions(extensions)
677					}
678				})
679				.boxed()
680			})),
681		)?;
682
683		Ok(callback)
684	}
685
686	/// Register a new publish/subscribe interface using JSON-RPC notifications.
687	///
688	/// It implements the [ethereum pubsub specification](https://geth.ethereum.org/docs/rpc/pubsub)
689	/// with an option to choose custom subscription ID generation.
690	///
691	/// Furthermore, it generates the `unsubscribe implementation` where a `bool` is used as
692	/// the result to indicate whether the subscription was successfully unsubscribed to or not.
693	/// For instance an `unsubscribe call` may fail if a non-existent subscription ID is used in the call.
694	///
695	/// This method ensures that the `subscription_method_name` and `unsubscription_method_name` are unique.
696	/// The `notif_method_name` argument sets the content of the `method` field in the JSON document that
697	/// the server sends back to the client. The uniqueness of this value is not machine checked and it's up to
698	/// the user to ensure it is not used in any other [`RpcModule`] used in the server.
699	///
700	/// # Arguments
701	///
702	/// * `subscription_method_name` - name of the method to call to initiate a subscription
703	/// * `notif_method_name` - name of method to be used in the subscription payload (technically a JSON-RPC
704	///   notification)
705	/// * `unsubscription_method` - name of the method to call to terminate a subscription
706	/// * `callback` - A callback to invoke on each subscription; it takes three parameters:
707	///     - [`Params`]: JSON-RPC parameters in the subscription call.
708	///     - [`PendingSubscriptionSink`]: A pending subscription waiting to be accepted, in order to send out messages
709	///       on the subscription
710	///     - Context: Any type that can be embedded into the [`RpcModule`].
711	///
712	/// # Returns
713	///
714	/// An async block which returns something that implements [`crate::server::IntoSubscriptionCloseResponse`] which
715	/// decides what action to take when the subscription ends whether such as to sent out another message
716	/// on the subscription stream before closing down it.
717	///
718	/// NOTE: The return value is ignored if [`PendingSubscriptionSink`] hasn't been called or is unsuccessful, as the subscription
719	/// is not allowed to send out subscription notifications before the actual subscription has been established.
720	///
721	/// This is implemented for `Result<T, E>` and `()`.
722	///
723	/// It's recommended to use `Result` if you want to propagate the error as special error notification
724	/// Another option is to implement [`crate::server::IntoSubscriptionCloseResponse`] if you want customized behaviour.
725	///
726	/// The error notification has the following format:
727	///
728	/// ```json
729	/// {
730	///  "jsonrpc": "2.0",
731	///  "method": "<method>",
732	///  "params": {
733	///    "subscription": "<subscriptionID>",
734	///    "error": <your msg>
735	///    }
736	///  }
737	/// }
738	/// ```
739	///
740	/// # Examples
741	///
742	/// ```no_run
743	///
744	/// use jsonrpsee_core::server::{RpcModule, SubscriptionSink, SubscriptionMessage};
745	/// use jsonrpsee_types::ErrorObjectOwned;
746	///
747	/// let mut ctx = RpcModule::new(99_usize);
748	/// ctx.register_subscription("sub", "notif_name", "unsub", |params, pending, ctx, _| async move {
749	///
750	///     let x = match params.one::<usize>() {
751	///         Ok(x) => x,
752	///         Err(e) => {
753	///            pending.reject(ErrorObjectOwned::from(e)).await;
754	///            // If the subscription has not been "accepted" then
755	///            // the return value will be "ignored" as it's not
756	///            // allowed to send out any further notifications on
757	///            // on the subscription.
758	///            return Ok(());
759	///         }
760	///     };
761	///
762	///     // Mark the subscription is accepted after the params has been parsed successful.
763	///     // This is actually responds the underlying RPC method call and may fail if the
764	///     // connection is closed.
765	///     let sink = pending.accept().await?;
766	///     let sum = x + (*ctx);
767	///
768	///     // This will send out an error notification if it fails.
769	///     //
770	///     // If you need some other behavior implement or custom format of the error field
771	///     // you need to manually handle that.
772	///     let msg = serde_json::value::to_raw_value(&sum).unwrap();
773	///
774	///     // This fails only if the connection is closed
775	///     sink.send(msg).await?;
776	///
777	///     Ok(())
778	/// });
779	/// ```
780	pub fn register_subscription<R, F, Fut>(
781		&mut self,
782		subscribe_method_name: &'static str,
783		notif_method_name: &'static str,
784		unsubscribe_method_name: &'static str,
785		callback: F,
786	) -> Result<&mut MethodCallback, RegisterMethodError>
787	where
788		Context: Send + Sync + 'static,
789		F: (Fn(Params<'static>, PendingSubscriptionSink, Arc<Context>, Extensions) -> Fut)
790			+ Send
791			+ Sync
792			+ Clone
793			+ 'static,
794		Fut: Future<Output = R> + Send + 'static,
795		R: IntoSubscriptionCloseResponse + Send,
796	{
797		let subscribers = self.verify_and_register_unsubscribe(subscribe_method_name, unsubscribe_method_name)?;
798		let ctx = self.ctx.clone();
799
800		// Subscribe
801		let callback = {
802			self.methods.verify_and_insert(
803				subscribe_method_name,
804				MethodCallback::Subscription(Arc::new(move |id, params, method_sink, conn, extensions| {
805					let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
806
807					// response to the subscription call.
808					let (tx, rx) = oneshot::channel();
809					let (accepted_tx, accepted_rx) = oneshot::channel();
810
811					let sub_id = uniq_sub.sub_id.clone();
812					let method = notif_method_name;
813
814					let sink = PendingSubscriptionSink {
815						inner: method_sink.clone(),
816						method: notif_method_name,
817						subscribers: subscribers.clone(),
818						uniq_sub,
819						id: id.clone().into_owned(),
820						subscribe: tx,
821						permit: conn.subscription_permit,
822					};
823
824					// The subscription callback is a future from the subscription
825					// definition and not the as same when the subscription call has been completed.
826					//
827					// This runs until the subscription callback has completed.
828					//
829					// NOTE: the extensions can't be mutated at this point so
830					// it's safe to clone it.
831					let sub_fut = callback(params.into_owned(), sink, ctx.clone(), extensions.clone());
832
833					tokio::spawn(async move {
834						// This will wait for the subscription future to be resolved
835						let response = match futures_util::future::try_join(sub_fut.map(|f| Ok(f)), accepted_rx).await {
836							Ok((r, _)) => r.into_response(),
837							// The accept call failed i.e, the subscription was not accepted.
838							Err(_) => return,
839						};
840
841						match response {
842							SubscriptionCloseResponse::Notif(msg) => {
843								let json = sub_message_to_json(msg, &sub_id, method);
844								let _ = method_sink.send(json).await;
845							}
846							SubscriptionCloseResponse::NotifErr(err) => {
847								let json = sub_err_to_json(err, sub_id, method);
848								let _ = method_sink.send(json).await;
849							}
850							SubscriptionCloseResponse::None => (),
851						}
852					});
853
854					let id = id.clone().into_owned();
855
856					Box::pin(async move {
857						let rp = match rx.await {
858							Ok(rp) => {
859								// If the subscription was accepted then send a message
860								// to subscription task otherwise rely on the drop impl.
861								if rp.is_success() {
862									let _ = accepted_tx.send(());
863								}
864								rp
865							}
866							Err(_) => MethodResponse::error(id, ErrorCode::InternalError),
867						};
868
869						rp.with_extensions(extensions)
870					})
871				})),
872			)?
873		};
874
875		Ok(callback)
876	}
877
878	/// Similar to [`RpcModule::register_subscription`] but a little lower-level API
879	/// where handling the subscription is managed the user i.e, polling the subscription
880	/// such as spawning a separate task to do so.
881	///
882	/// This is more efficient as this doesn't require cloning the `params` in the subscription
883	/// and it won't send out a close message. Such things are delegated to the user of this API
884	///
885	/// # Examples
886	///
887	/// ```no_run
888	///
889	/// use jsonrpsee_core::server::{RpcModule, SubscriptionSink, SubscriptionMessage};
890	/// use jsonrpsee_types::ErrorObjectOwned;
891	///
892	/// let mut ctx = RpcModule::new(99_usize);
893	/// ctx.register_subscription_raw("sub", "notif_name", "unsub", |params, pending, ctx, _| {
894	///
895	///     // The params are parsed outside the async block below to avoid cloning the bytes.
896	///     let val = match params.one::<usize>() {
897	///         Ok(val) => val,
898	///         Err(e) => {
899	///             // If the subscription has not been "accepted" then
900	///             // the return value will be "ignored" as it's not
901	///             // allowed to send out any further notifications on
902	///             // on the subscription.
903	///             tokio::spawn(pending.reject(ErrorObjectOwned::from(e)));
904	///             return;
905	///         }
906	///     };
907	///
908	///     tokio::spawn(async move {
909	///         // Mark the subscription is accepted after the params has been parsed successful.
910	///         // This is actually responds the underlying RPC method call and may fail if the
911	///         // connection is closed.
912	///         let sink = pending.accept().await.unwrap();
913	///         let sum = val + (*ctx);
914	///
915	///         let msg = serde_json::value::to_raw_value(&sum).unwrap();
916	///
917	///         // This fails only if the connection is closed
918	///         sink.send(msg).await.unwrap();
919	///     });
920	/// });
921	/// ```
922	///
923	pub fn register_subscription_raw<R, F>(
924		&mut self,
925		subscribe_method_name: &'static str,
926		notif_method_name: &'static str,
927		unsubscribe_method_name: &'static str,
928		callback: F,
929	) -> Result<&mut MethodCallback, RegisterMethodError>
930	where
931		Context: Send + Sync + 'static,
932		F: (Fn(Params, PendingSubscriptionSink, Arc<Context>, &Extensions) -> R) + Send + Sync + Clone + 'static,
933		R: IntoSubscriptionCloseResponse,
934	{
935		let subscribers = self.verify_and_register_unsubscribe(subscribe_method_name, unsubscribe_method_name)?;
936		let ctx = self.ctx.clone();
937
938		// Subscribe
939		let callback = {
940			self.methods.verify_and_insert(
941				subscribe_method_name,
942				MethodCallback::Subscription(Arc::new(move |id, params, method_sink, conn, extensions| {
943					let uniq_sub = SubscriptionKey { conn_id: conn.conn_id, sub_id: conn.id_provider.next_id() };
944
945					// response to the subscription call.
946					let (tx, rx) = oneshot::channel();
947
948					let sink = PendingSubscriptionSink {
949						inner: method_sink.clone(),
950						method: notif_method_name,
951						subscribers: subscribers.clone(),
952						uniq_sub,
953						id: id.clone().into_owned(),
954						subscribe: tx,
955						permit: conn.subscription_permit,
956					};
957
958					callback(params, sink, ctx.clone(), &extensions);
959
960					let id = id.clone().into_owned();
961
962					Box::pin(async move {
963						let rp = match rx.await {
964							Ok(rp) => rp,
965							Err(_) => MethodResponse::error(id, ErrorCode::InternalError),
966						};
967
968						rp.with_extensions(extensions)
969					})
970				})),
971			)?
972		};
973
974		Ok(callback)
975	}
976
977	/// Helper to verify the subscription can be created
978	/// and register the unsubscribe handler.
979	fn verify_and_register_unsubscribe(
980		&mut self,
981		subscribe_method_name: &'static str,
982		unsubscribe_method_name: &'static str,
983	) -> Result<Subscribers, RegisterMethodError> {
984		if subscribe_method_name == unsubscribe_method_name {
985			return Err(RegisterMethodError::SubscriptionNameConflict(subscribe_method_name.into()));
986		}
987
988		self.methods.verify_method_name(subscribe_method_name)?;
989		self.methods.verify_method_name(unsubscribe_method_name)?;
990
991		let subscribers = Subscribers::default();
992
993		// Unsubscribe
994		{
995			let subscribers = subscribers.clone();
996			self.methods.mut_callbacks().insert(
997				unsubscribe_method_name,
998				MethodCallback::Unsubscription(Arc::new(move |id, params, conn_id, max_response_size, extensions| {
999					let sub_id = match params.one::<RpcSubscriptionId>() {
1000						Ok(sub_id) => sub_id,
1001						Err(_) => {
1002							tracing::warn!(
1003								target: LOG_TARGET,
1004								"Unsubscribe call `{}` failed: couldn't parse subscription id={:?} request id={:?}",
1005								unsubscribe_method_name,
1006								params,
1007								id
1008							);
1009
1010							return MethodResponse::response(id, ResponsePayload::success(false), max_response_size)
1011								.with_extensions(extensions);
1012						}
1013					};
1014
1015					let key = SubscriptionKey { conn_id, sub_id: sub_id.into_owned() };
1016					let result = subscribers.lock().remove(&key).is_some();
1017
1018					if !result {
1019						tracing::debug!(
1020							target: LOG_TARGET,
1021							"Unsubscribe call `{}` subscription key={:?} not an active subscription",
1022							unsubscribe_method_name,
1023							key,
1024						);
1025					}
1026
1027					MethodResponse::response(id, ResponsePayload::success(result), max_response_size)
1028				})),
1029			);
1030		}
1031
1032		Ok(subscribers)
1033	}
1034
1035	/// Register an alias for an existing_method. Alias uniqueness is enforced.
1036	pub fn register_alias(
1037		&mut self,
1038		alias: &'static str,
1039		existing_method: &'static str,
1040	) -> Result<(), RegisterMethodError> {
1041		self.methods.verify_method_name(alias)?;
1042
1043		let callback = match self.methods.callbacks.get(existing_method) {
1044			Some(callback) => callback.clone(),
1045			None => return Err(RegisterMethodError::MethodNotFound(existing_method.into())),
1046		};
1047
1048		self.methods.mut_callbacks().insert(alias, callback);
1049
1050		Ok(())
1051	}
1052}
1053
1054fn mock_subscription_permit() -> SubscriptionPermit {
1055	BoundedSubscriptions::new(1).acquire().expect("1 permit should exist; qed")
1056}