jsonrpsee_core/client/async_client/
manager.rs

1// Copyright 2019-2021 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any
4// person obtaining a copy of this software and associated
5// documentation files (the "Software"), to deal in the
6// Software without restriction, including without
7// limitation the rights to use, copy, modify, merge,
8// publish, distribute, sublicense, and/or sell copies of
9// the Software, and to permit persons to whom the Software
10// is furnished to do so, subject to the following
11// conditions:
12//
13// The above copyright notice and this permission notice
14// shall be included in all copies or substantial portions
15// of the Software.
16//
17// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25// DEALINGS IN THE SOFTWARE.
26
27//! Handles and monitors JSONRPC v2 method calls and subscriptions
28//!
29//! Definitions:
30//!
31//!    - RequestId: request ID in the JSONRPC-v2 specification
//!    > **Note**: The spec allows a number, a string or null, but this crate only supports numbers.
33//!    - SubscriptionId: unique ID generated by server
34
35use std::{
36	collections::{HashMap, hash_map::Entry},
37	ops::Range,
38};
39
40use crate::{
41	client::{Error, RawResponseOwned, SubscriptionReceiver, SubscriptionSender},
42	error::RegisterMethodError,
43};
44use jsonrpsee_types::{Id, InvalidRequestId, SubscriptionId};
45use rustc_hash::FxHashMap;
46use tokio::sync::oneshot;
47
#[derive(Debug)]
/// Internal state stored for a single request ID in the `RequestManager`.
enum Kind {
	/// A method call that is waiting for its response.
	///
	/// The sender is `None` for the slots that this module reserves for unsubscribe calls.
	PendingMethodCall(PendingCallOneshot),
	/// A subscription call that is waiting for its response.
	///
	/// Holds `(unsubscribe request ID, oneshot sender for the outcome, unsubscribe method name)`.
	PendingSubscription((RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)),
	/// An active subscription.
	///
	/// Holds `(unsubscribe request ID, sink for the subscription, unsubscribe method name)`.
	Subscription((RequestId, SubscriptionSink, UnsubscribeMethod)),
}
54
#[derive(Debug, Clone)]
/// Indicates the status of a given request/response.
///
/// This is the payload-free view of the internal per-request state, as reported by
/// `RequestManager::request_status`.
pub(crate) enum RequestStatus {
	/// The method call is waiting for a response.
	PendingMethodCall,
	/// The subscription is waiting for a response to become an active subscription.
	PendingSubscription,
	/// An active subscription.
	Subscription,
	/// Invalid request ID, i.e. the ID is not tracked by the manager.
	Invalid,
}
67
/// Optional oneshot sender used to deliver the outcome of a single method call.
///
/// `None` is stored for request IDs that are only reserved for an unsubscribe call.
type PendingCallOneshot = Option<oneshot::Sender<Result<RawResponseOwned, InvalidRequestId>>>;
/// Oneshot sender used to deliver the outcome of a whole batch request.
type PendingBatchOneshot = oneshot::Sender<Result<Vec<RawResponseOwned>, InvalidRequestId>>;
/// Oneshot sender used to deliver the outcome of a subscription call: on success the receiving half of the
/// subscription together with the subscription ID.
type PendingSubscriptionOneshot = oneshot::Sender<Result<(SubscriptionReceiver, SubscriptionId<'static>), Error>>;
/// Sending half stored for an active subscription or a notification handler.
type SubscriptionSink = SubscriptionSender;
/// Name of the method used to unsubscribe.
type UnsubscribeMethod = String;
/// Owned (`'static`) request ID used as the key for tracked requests.
type RequestId = Id<'static>;
74
#[derive(Debug)]
/// Batch state.
///
/// Stored per pending batch request and handed back by `RequestManager::complete_pending_batch`.
pub(crate) struct BatchState {
	/// Oneshot sender used to deliver the outcome of the batch request.
	pub(crate) send_back: PendingBatchOneshot,
}
81
#[derive(Debug, Default)]
/// Manages and monitors JSONRPC v2 method calls and subscriptions.
pub(crate) struct RequestManager {
	/// List of requests that are waiting for a response from the server.
	// NOTE: FxHashMap is used here because RequestId is not under the caller's control and is known to be a short
	// key.
	requests: FxHashMap<RequestId, Kind>,
	/// Reverse lookup, to find a request ID in constant time by `subscription ID` instead of looking through all
	/// requests.
	subscriptions: HashMap<SubscriptionId<'static>, RequestId>,
	/// Pending batch requests, keyed by the `Range<u64>` the batch was registered with.
	// NOTE(review): the range presumably spans the request IDs of the batch — confirm against the caller.
	batches: FxHashMap<Range<u64>, BatchState>,
	/// Registered Methods for incoming notifications, keyed by method name.
	notification_handlers: HashMap<String, SubscriptionSink>,
}
97
98impl RequestManager {
99	/// Create a new `RequestManager`.
100	#[allow(unused)]
101	pub(crate) fn new() -> Self {
102		Self::default()
103	}
104
105	/// Tries to insert a new pending request.
106	///
107	/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
108	pub(crate) fn insert_pending_call(
109		&mut self,
110		id: RequestId,
111		send_back: PendingCallOneshot,
112	) -> Result<(), PendingCallOneshot> {
113		if let Entry::Vacant(v) = self.requests.entry(id) {
114			v.insert(Kind::PendingMethodCall(send_back));
115			Ok(())
116		} else {
117			Err(send_back)
118		}
119	}
120
121	/// Tries to insert a new batch request.
122	///
123	/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
124	pub(crate) fn insert_pending_batch(
125		&mut self,
126		batch: Range<u64>,
127		send_back: PendingBatchOneshot,
128	) -> Result<(), PendingBatchOneshot> {
129		if let Entry::Vacant(v) = self.batches.entry(batch) {
130			v.insert(BatchState { send_back });
131			Ok(())
132		} else {
133			Err(send_back)
134		}
135	}
136
137	/// Tries to insert a new pending subscription and reserves a slot for a "potential" unsubscription request.
138	///
139	/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
140	pub(crate) fn insert_pending_subscription(
141		&mut self,
142		sub_req_id: RequestId,
143		unsub_req_id: RequestId,
144		send_back: PendingSubscriptionOneshot,
145		unsubscribe_method: UnsubscribeMethod,
146	) -> Result<(), PendingSubscriptionOneshot> {
147		// The request IDs are not in the manager and the `sub_id` and `unsub_id` are not equal.
148		if !self.requests.contains_key(&sub_req_id)
149			&& !self.requests.contains_key(&unsub_req_id)
150			&& sub_req_id != unsub_req_id
151		{
152			self.requests
153				.insert(sub_req_id, Kind::PendingSubscription((unsub_req_id.clone(), send_back, unsubscribe_method)));
154			self.requests.insert(unsub_req_id, Kind::PendingMethodCall(None));
155			Ok(())
156		} else {
157			Err(send_back)
158		}
159	}
160
161	/// Tries to insert a new subscription.
162	///
163	/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
164	pub(crate) fn insert_subscription(
165		&mut self,
166		sub_req_id: RequestId,
167		unsub_req_id: RequestId,
168		subscription_id: SubscriptionId<'static>,
169		send_back: SubscriptionSink,
170		unsubscribe_method: UnsubscribeMethod,
171	) -> Result<(), SubscriptionSink> {
172		if let (Entry::Vacant(request), Entry::Vacant(subscription)) =
173			(self.requests.entry(sub_req_id.clone()), self.subscriptions.entry(subscription_id))
174		{
175			request.insert(Kind::Subscription((unsub_req_id, send_back, unsubscribe_method)));
176			subscription.insert(sub_req_id);
177			Ok(())
178		} else {
179			Err(send_back)
180		}
181	}
182
183	/// Inserts a handler for incoming notifications.
184	pub(crate) fn insert_notification_handler(
185		&mut self,
186		method: &str,
187		send_back: SubscriptionSink,
188	) -> Result<(), RegisterMethodError> {
189		if let Entry::Vacant(handle) = self.notification_handlers.entry(method.to_owned()) {
190			handle.insert(send_back);
191			Ok(())
192		} else {
193			Err(RegisterMethodError::AlreadyRegistered(method.to_owned()))
194		}
195	}
196
197	/// Removes a notification handler.
198	pub(crate) fn remove_notification_handler(&mut self, method: &str) -> Option<SubscriptionSink> {
199		self.notification_handlers.remove(method)
200	}
201
202	/// Tries to complete a pending subscription.
203	///
204	/// Returns `Some` if the subscription was completed otherwise `None`.
205	pub(crate) fn complete_pending_subscription(
206		&mut self,
207		request_id: RequestId,
208	) -> Option<(RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)> {
209		match self.requests.entry(request_id) {
210			Entry::Occupied(request) if matches!(request.get(), Kind::PendingSubscription(_)) => {
211				let (_req_id, kind) = request.remove_entry();
212				if let Kind::PendingSubscription(send_back) = kind {
213					Some(send_back)
214				} else {
215					unreachable!("Pending subscription is Pending subscription checked above; qed");
216				}
217			}
218			_ => None,
219		}
220	}
221
222	/// Tries to complete a pending batch request.
223	///
224	/// Returns `Some` if the subscription was completed otherwise `None`.
225	pub(crate) fn complete_pending_batch(&mut self, batch: Range<u64>) -> Option<BatchState> {
226		match self.batches.entry(batch) {
227			Entry::Occupied(request) => {
228				let (_digest, state) = request.remove_entry();
229				Some(state)
230			}
231			_ => None,
232		}
233	}
234
235	/// Tries to complete a pending call.
236	///
237	/// Returns `Some` if the call was completed otherwise `None`.
238	pub(crate) fn complete_pending_call(&mut self, request_id: RequestId) -> Option<PendingCallOneshot> {
239		match self.requests.entry(request_id) {
240			Entry::Occupied(request) if matches!(request.get(), Kind::PendingMethodCall(_)) => {
241				let (_req_id, kind) = request.remove_entry();
242				if let Kind::PendingMethodCall(send_back) = kind {
243					Some(send_back)
244				} else {
245					unreachable!("Pending call is Pending call checked above; qed");
246				}
247			}
248			_ => None,
249		}
250	}
251
252	/// Removes the subscription without waiting for the unsubscribe call.
253	///
254	/// Returns `Some` if the subscription was removed.
255	pub(crate) fn remove_subscription(
256		&mut self,
257		request_id: RequestId,
258		subscription_id: SubscriptionId<'static>,
259	) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId<'_>)> {
260		match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
261			(Entry::Occupied(request), Entry::Occupied(subscription))
262				if matches!(request.get(), Kind::Subscription(_)) =>
263			{
264				// Mark the request ID as pending unsubscription.
265				let (_req_id, kind) = request.remove_entry();
266				let (sub_id, _req_id) = subscription.remove_entry();
267				if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
268					Some((unsub_req_id, send_back, unsub, sub_id))
269				} else {
270					unreachable!("Subscription is Subscription checked above; qed");
271				}
272			}
273			_ => None,
274		}
275	}
276
277	/// Initiates an unsubscribe which is not completed until the unsubscribe call
278	/// has been acknowledged.
279	///
280	/// Returns `Some` if the subscription was unsubscribed.
281	pub(crate) fn unsubscribe(
282		&mut self,
283		request_id: RequestId,
284		subscription_id: SubscriptionId<'static>,
285	) -> Option<(RequestId, SubscriptionSink, UnsubscribeMethod, SubscriptionId<'_>)> {
286		match (self.requests.entry(request_id), self.subscriptions.entry(subscription_id)) {
287			(Entry::Occupied(mut request), Entry::Occupied(subscription))
288				if matches!(request.get(), Kind::Subscription(_)) =>
289			{
290				// Mark the request ID as "pending unsubscription" which will be resolved once the
291				// unsubscribe call has been acknowledged.
292				let kind = std::mem::replace(request.get_mut(), Kind::PendingMethodCall(None));
293				let (sub_id, _req_id) = subscription.remove_entry();
294				if let Kind::Subscription((unsub_req_id, send_back, unsub)) = kind {
295					Some((unsub_req_id, send_back, unsub, sub_id))
296				} else {
297					unreachable!("Subscription is Subscription checked above; qed");
298				}
299			}
300			_ => None,
301		}
302	}
303
304	/// Returns the status of a request ID
305	pub(crate) fn request_status(&mut self, id: &RequestId) -> RequestStatus {
306		self.requests.get(id).map_or(RequestStatus::Invalid, |kind| match kind {
307			Kind::PendingMethodCall(_) => RequestStatus::PendingMethodCall,
308			Kind::PendingSubscription(_) => RequestStatus::PendingSubscription,
309			Kind::Subscription(_) => RequestStatus::Subscription,
310		})
311	}
312
313	/// Get a mutable reference to underlying `Sink` in order to send messages to the subscription.
314	///
315	/// Returns `Some` if the `request_id` was registered as a subscription otherwise `None`.
316	pub(crate) fn as_subscription_mut(&mut self, request_id: &RequestId) -> Option<&mut SubscriptionSink> {
317		if let Some(Kind::Subscription((_, sink, _))) = self.requests.get_mut(request_id) { Some(sink) } else { None }
318	}
319
320	/// Get a mutable reference to underlying `Sink` in order to send incoming notifications to the subscription.
321	///
322	/// Returns `Some` if the `method` was registered as a NotificationHandler otherwise `None`.
323	pub(crate) fn as_notification_handler_mut(&mut self, method: String) -> Option<&mut SubscriptionSink> {
324		self.notification_handlers.get_mut(&method)
325	}
326
327	/// Reverse lookup to get the request ID for a subscription ID.
328	///
329	/// Returns `Some` if the subscription ID was registered as a subscription otherwise `None`.
330	pub(crate) fn get_request_id_by_subscription_id(&self, sub_id: &SubscriptionId) -> Option<RequestId> {
331		self.subscriptions.get(sub_id).map(|id| id.clone().into_owned())
332	}
333}
334
#[cfg(test)]
mod tests {
	use crate::client::subscription_channel;

	use super::{RequestManager, RequestStatus};
	use jsonrpsee_types::{Id, SubscriptionId};
	use tokio::sync::oneshot;

	/// A pending call can be inserted once and completed afterwards.
	#[test]
	fn insert_remove_pending_request_works() {
		let (request_tx, _) = oneshot::channel();

		let mut manager = RequestManager::new();
		assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx)).is_ok());
		assert!(manager.complete_pending_call(Id::Number(0)).is_some());
	}

	/// Full life cycle: pending subscription -> active subscription -> removed.
	#[test]
	fn insert_remove_subscription_works() {
		let (pending_sub_tx, _) = oneshot::channel();
		let (sub_tx, _) = subscription_channel(1);
		let mut manager = RequestManager::new();
		assert!(
			manager
				.insert_pending_subscription(Id::Number(1), Id::Number(2), pending_sub_tx, "unsubscribe_method".into())
				.is_ok()
		);
		let (unsub_req_id, _send_back_oneshot, unsubscribe_method) =
			manager.complete_pending_subscription(Id::Number(1)).unwrap();
		assert_eq!(unsub_req_id, Id::Number(2));
		assert!(
			manager
				.insert_subscription(
					Id::Number(1),
					Id::Number(2),
					SubscriptionId::Str("uniq_id_from_server".into()),
					sub_tx,
					unsubscribe_method
				)
				.is_ok()
		);

		assert!(manager.as_subscription_mut(&Id::Number(1)).is_some());
		assert!(
			manager.remove_subscription(Id::Number(1), SubscriptionId::Str("uniq_id_from_server".into())).is_some()
		);
	}

	/// Pending subscriptions are rejected when the two request IDs are equal or when either of them is
	/// already tracked.
	#[test]
	fn insert_subscription_with_same_sub_and_unsub_id_should_err() {
		let (tx1, _) = oneshot::channel();
		let (tx2, _) = oneshot::channel();
		let (tx3, _) = oneshot::channel();
		let (tx4, _) = oneshot::channel();
		let (tx5, _) = oneshot::channel();
		let mut manager = RequestManager::new();
		assert!(
			manager
				.insert_pending_subscription(Id::Str("1".into()), Id::Str("1".into()), tx1, "unsubscribe_method".into())
				.is_err()
		);
		// After this call "0" is a pending subscription and "1" is its reserved unsubscribe slot.
		assert!(
			manager
				.insert_pending_subscription(Id::Str("0".into()), Id::Str("1".into()), tx2, "unsubscribe_method".into())
				.is_ok()
		);
		assert!(
			manager
				.insert_pending_subscription(
					Id::Str("99".into()),
					Id::Str("0".into()),
					tx3,
					"unsubscribe_method".into()
				)
				.is_err(),
			"unsub request ID already occupied"
		);
		assert!(
			manager
				.insert_pending_subscription(
					Id::Str("99".into()),
					Id::Str("1".into()),
					tx4,
					"unsubscribe_method".into()
				)
				.is_err(),
			"unsub request ID already reserved as an unsubscribe slot"
		);
		assert!(
			manager
				.insert_pending_subscription(
					Id::Str("0".into()),
					Id::Str("99".into()),
					tx5,
					"unsubscribe_method".into()
				)
				.is_err(),
			"sub request ID already occupied"
		);
	}

	/// A request ID registered as a pending method call cannot be reused or completed as anything else.
	#[test]
	fn pending_method_call_faulty() {
		let (request_tx1, _) = oneshot::channel();
		let (request_tx2, _) = oneshot::channel();
		let (pending_sub_tx, _) = oneshot::channel();
		let (sub_tx, _) = subscription_channel(1);

		let mut manager = RequestManager::new();
		assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx1)).is_ok());
		assert!(manager.insert_pending_call(Id::Number(0), Some(request_tx2)).is_err());
		assert!(
			manager
				.insert_pending_subscription(Id::Number(0), Id::Number(1), pending_sub_tx, "beef".to_string())
				.is_err()
		);
		assert!(
			manager
				.insert_subscription(
					Id::Number(0),
					Id::Number(99),
					SubscriptionId::Num(137),
					sub_tx,
					"bibimbap".to_string()
				)
				.is_err()
		);

		assert!(manager.remove_subscription(Id::Number(0), SubscriptionId::Num(137)).is_none());
		assert!(manager.complete_pending_subscription(Id::Number(0)).is_none());
		assert!(manager.complete_pending_call(Id::Number(0)).is_some());
	}

	/// A request ID registered as a pending subscription cannot be reused or completed as anything else.
	#[test]
	fn pending_subscription_faulty() {
		let (request_tx, _) = oneshot::channel();
		let (pending_sub_tx1, _) = oneshot::channel();
		let (pending_sub_tx2, _) = oneshot::channel();
		let (sub_tx, _) = subscription_channel(1);

		let mut manager = RequestManager::new();
		assert!(
			manager
				.insert_pending_subscription(Id::Number(99), Id::Number(100), pending_sub_tx1, "beef".to_string())
				.is_ok()
		);
		assert!(manager.insert_pending_call(Id::Number(99), Some(request_tx)).is_err());
		assert!(
			manager
				.insert_pending_subscription(Id::Number(99), Id::Number(1337), pending_sub_tx2, "vegan".to_string())
				.is_err()
		);

		assert!(
			manager
				.insert_subscription(
					Id::Number(99),
					Id::Number(100),
					SubscriptionId::Num(0),
					sub_tx,
					"bibimbap".to_string()
				)
				.is_err()
		);

		assert!(manager.remove_subscription(Id::Number(99), SubscriptionId::Num(0)).is_none());
		assert!(manager.complete_pending_call(Id::Number(99)).is_none());
		assert!(manager.complete_pending_subscription(Id::Number(99)).is_some());
	}

	/// A request ID registered as an active subscription cannot be reused or completed as anything else, and
	/// it can only be removed together with its matching subscription ID.
	#[test]
	fn active_subscriptions_faulty() {
		let (request_tx, _) = oneshot::channel();
		let (pending_sub_tx, _) = oneshot::channel();
		let (sub_tx1, _) = subscription_channel(1);
		let (sub_tx2, _) = subscription_channel(1);

		let mut manager = RequestManager::new();

		assert!(
			manager
				.insert_subscription(
					Id::Number(3),
					Id::Number(4),
					SubscriptionId::Num(0),
					sub_tx1,
					"bibimbap".to_string()
				)
				.is_ok()
		);
		assert!(
			manager
				.insert_subscription(
					Id::Number(3),
					Id::Number(4),
					SubscriptionId::Num(1),
					sub_tx2,
					"bibimbap".to_string()
				)
				.is_err()
		);
		assert!(
			manager
				.insert_pending_subscription(Id::Number(3), Id::Number(4), pending_sub_tx, "beef".to_string())
				.is_err()
		);
		assert!(manager.insert_pending_call(Id::Number(3), Some(request_tx)).is_err());

		assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(7)).is_none());
		assert!(manager.complete_pending_call(Id::Number(3)).is_none());
		assert!(manager.complete_pending_subscription(Id::Number(3)).is_none());
		assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(1)).is_none());
		assert!(manager.remove_subscription(Id::Number(3), SubscriptionId::Num(0)).is_some());

		assert!(manager.requests.is_empty());
		assert!(manager.subscriptions.is_empty());
	}

	/// `unsubscribe` drops the reverse lookup and leaves a sender-less pending call behind under the
	/// subscription's request ID.
	#[test]
	fn unsubscribe_leaves_pending_call_placeholder() {
		let (sub_tx, _) = subscription_channel(1);

		let mut manager = RequestManager::new();
		assert!(matches!(manager.request_status(&Id::Number(1)), RequestStatus::Invalid));
		assert!(
			manager
				.insert_subscription(
					Id::Number(1),
					Id::Number(2),
					SubscriptionId::Num(0),
					sub_tx,
					"bibimbap".to_string()
				)
				.is_ok()
		);
		assert!(matches!(manager.request_status(&Id::Number(1)), RequestStatus::Subscription));
		assert_eq!(manager.get_request_id_by_subscription_id(&SubscriptionId::Num(0)), Some(Id::Number(1)));

		assert!(manager.unsubscribe(Id::Number(1), SubscriptionId::Num(0)).is_some());

		assert!(matches!(manager.request_status(&Id::Number(1)), RequestStatus::PendingMethodCall));
		assert!(manager.get_request_id_by_subscription_id(&SubscriptionId::Num(0)).is_none());
		assert!(manager.as_subscription_mut(&Id::Number(1)).is_none());
		assert!(matches!(manager.complete_pending_call(Id::Number(1)), Some(None)));
		assert!(manager.requests.is_empty());
		assert!(manager.subscriptions.is_empty());
	}
}