tetsy_jsonrpc_pubsub/
manager.rs

1//! The SubscriptionManager used to manage subscription based RPCs.
2//!
3//! The manager provides four main things in terms of functionality:
4//!
//! 1. The ability to create unique subscription IDs through the
//! use of the `IdProvider` trait. Two implementations are available
//! out of the box, a `NumericIdProvider` and a `RandomStringIdProvider`.
8//!
9//! 2. An executor with which to drive `Future`s to completion.
10//!
11//! 3. A way to add new subscriptions. Subscriptions should come in the form
12//! of a `Stream`. These subscriptions will be transformed into notifications
13//! by the manager, which can be consumed by the client.
14//!
15//! 4. A way to cancel any currently active subscription.
16
17use std::collections::HashMap;
18use std::iter;
19use std::sync::{
20	atomic::{AtomicUsize, Ordering},
21	Arc,
22};
23
24use crate::core::futures::sync::oneshot;
25use crate::core::futures::{future as future01, Future as Future01};
26use crate::{
27	typed::{Sink, Subscriber},
28	SubscriptionId,
29};
30
31use log::{error, warn};
32use parking_lot::Mutex;
33use rand::distributions::Alphanumeric;
34use rand::{thread_rng, Rng};
35
36/// Alias for an implementation of `futures::future::Executor`.
37pub type TaskExecutor = Arc<dyn future01::Executor<Box<dyn Future01<Item = (), Error = ()> + Send>> + Send + Sync>;
38
39type ActiveSubscriptions = Arc<Mutex<HashMap<SubscriptionId, oneshot::Sender<()>>>>;
40
41/// Trait used to provide unique subscription IDs.
42pub trait IdProvider {
43	/// A unique ID used to identify a subscription.
44	type Id: Default + Into<SubscriptionId>;
45
46	/// Returns the next ID for the subscription.
47	fn next_id(&self) -> Self::Id;
48}
49
/// Hands out subscription IDs from a thread-safe, incrementing integer.
///
/// The counter lives behind an `Arc`, so clones of a provider share the
/// same counter.
#[derive(Debug, Clone)]
pub struct NumericIdProvider {
	// Value that will be returned by the next ID request.
	current_id: Arc<AtomicUsize>,
}
56
57impl NumericIdProvider {
58	/// Create a new NumericIdProvider.
59	pub fn new() -> Self {
60		Default::default()
61	}
62
63	/// Create a new NumericIdProvider starting from
64	/// the given ID.
65	pub fn with_id(id: AtomicUsize) -> Self {
66		Self {
67			current_id: Arc::new(id),
68		}
69	}
70}
71
72impl IdProvider for NumericIdProvider {
73	type Id = u64;
74
75	fn next_id(&self) -> Self::Id {
76		self.current_id.fetch_add(1, Ordering::AcqRel) as u64
77	}
78}
79
80impl Default for NumericIdProvider {
81	fn default() -> Self {
82		NumericIdProvider {
83			current_id: Arc::new(AtomicUsize::new(1)),
84		}
85	}
86}
87
/// Generates random strings of a configured length for use as
/// subscription IDs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RandomStringIdProvider {
	// Number of characters in every generated ID.
	len: usize,
}
94
95impl RandomStringIdProvider {
96	/// Create a new RandomStringIdProvider.
97	pub fn new() -> Self {
98		Default::default()
99	}
100
101	/// Create a new RandomStringIdProvider, which will generate
102	/// random id strings of the given length.
103	pub fn with_len(len: usize) -> Self {
104		Self { len }
105	}
106}
107
108impl IdProvider for RandomStringIdProvider {
109	type Id = String;
110
111	fn next_id(&self) -> Self::Id {
112		let mut rng = thread_rng();
113		let id: String = iter::repeat(())
114			.map(|()| rng.sample(Alphanumeric))
115			.take(self.len)
116			.collect();
117		id
118	}
119}
120
121impl Default for RandomStringIdProvider {
122	fn default() -> Self {
123		Self { len: 16 }
124	}
125}
126
127/// Subscriptions manager.
128///
129/// Takes care of assigning unique subscription ids and
130/// driving the sinks into completion.
131#[derive(Clone)]
132pub struct SubscriptionManager<I: IdProvider = RandomStringIdProvider> {
133	id_provider: I,
134	active_subscriptions: ActiveSubscriptions,
135	executor: TaskExecutor,
136}
137
138impl SubscriptionManager {
139	/// Creates a new SubscriptionManager.
140	///
141	/// Uses `RandomStringIdProvider` as the ID provider.
142	pub fn new(executor: TaskExecutor) -> Self {
143		Self {
144			id_provider: RandomStringIdProvider::default(),
145			active_subscriptions: Default::default(),
146			executor,
147		}
148	}
149}
150
151impl<I: IdProvider> SubscriptionManager<I> {
152	/// Creates a new SubscriptionManager with the specified
153	/// ID provider.
154	pub fn with_id_provider(id_provider: I, executor: TaskExecutor) -> Self {
155		Self {
156			id_provider,
157			active_subscriptions: Default::default(),
158			executor,
159		}
160	}
161
162	/// Borrows the internal task executor.
163	///
164	/// This can be used to spawn additional tasks on the underlying event loop.
165	pub fn executor(&self) -> &TaskExecutor {
166		&self.executor
167	}
168
169	/// Creates new subscription for given subscriber.
170	///
171	/// Second parameter is a function that converts Subscriber Sink into a Future.
172	/// This future will be driven to completion by the underlying event loop
173	pub fn add<T, E, G, R, F>(&self, subscriber: Subscriber<T, E>, into_future: G) -> SubscriptionId
174	where
175		G: FnOnce(Sink<T, E>) -> R,
176		R: future01::IntoFuture<Future = F, Item = (), Error = ()>,
177		F: future01::Future<Item = (), Error = ()> + Send + 'static,
178	{
179		let id = self.id_provider.next_id();
180		let subscription_id: SubscriptionId = id.into();
181		if let Ok(sink) = subscriber.assign_id(subscription_id.clone()) {
182			let (tx, rx) = oneshot::channel();
183			let future = into_future(sink)
184				.into_future()
185				.select(rx.map_err(|e| warn!("Error timing out: {:?}", e)))
186				.then(|_| Ok(()));
187
188			self.active_subscriptions.lock().insert(subscription_id.clone(), tx);
189			if self.executor.execute(Box::new(future)).is_err() {
190				error!("Failed to spawn RPC subscription task");
191			}
192		}
193
194		subscription_id
195	}
196
197	/// Cancel subscription.
198	///
199	/// Returns true if subscription existed or false otherwise.
200	pub fn cancel(&self, id: SubscriptionId) -> bool {
201		if let Some(tx) = self.active_subscriptions.lock().remove(&id) {
202			let _ = tx.send(());
203			return true;
204		}
205
206		false
207	}
208}
209
210impl<I: Default + IdProvider> SubscriptionManager<I> {
211	/// Creates a new SubscriptionManager.
212	pub fn with_executor(executor: TaskExecutor) -> Self {
213		Self {
214			id_provider: Default::default(),
215			active_subscriptions: Default::default(),
216			executor,
217		}
218	}
219}
220
#[cfg(test)]
mod tests {
	use super::*;
	use crate::typed::Subscriber;
	use futures::{compat::Future01CompatExt, executor, FutureExt};
	use futures::{stream, StreamExt, TryStreamExt};

	use crate::core::futures::sink::Sink as Sink01;
	use crate::core::futures::stream::Stream as Stream01;

	// A single thread pool shared by every test in this module.
	//
	// Sharing it prevents `Too many open files` errors on systems with a
	// lot of cores.
	lazy_static::lazy_static! {
		static ref EXECUTOR: executor::ThreadPool = executor::ThreadPool::new()
			.expect("Failed to create thread pool executor for tests");
	}

	/// Executor for the tests: forwards every boxed 0.1 future to the
	/// shared thread pool.
	pub struct TestTaskExecutor;
	type Boxed01Future01 = Box<dyn future01::Future<Item = (), Error = ()> + Send + 'static>;

	impl future01::Executor<Boxed01Future01> for TestTaskExecutor {
		fn execute(&self, future: Boxed01Future01) -> std::result::Result<(), future01::ExecuteError<Boxed01Future01>> {
			// Bridge the 0.1 future through the compat layer and discard its result.
			let task = future.compat().map(drop);
			EXECUTOR.spawn_ok(task);
			Ok(())
		}
	}

	#[test]
	fn making_a_numeric_id_provider_works() {
		let first_id = NumericIdProvider::new().next_id();

		assert_eq!(first_id, 1);
	}

	#[test]
	fn default_numeric_id_provider_works() {
		let provider: NumericIdProvider = Default::default();

		assert_eq!(provider.next_id(), 1);
	}

	#[test]
	fn numeric_id_provider_with_id_works() {
		let start = AtomicUsize::new(5);
		let provider = NumericIdProvider::with_id(start);

		assert_eq!(provider.next_id(), 5);
	}

	#[test]
	fn random_string_provider_returns_id_with_correct_default_len() {
		let generated = RandomStringIdProvider::new().next_id();

		assert_eq!(generated.len(), 16);
	}

	#[test]
	fn random_string_provider_returns_id_with_correct_user_given_len() {
		let wanted_len = 10;
		let generated = RandomStringIdProvider::with_len(wanted_len).next_id();

		assert_eq!(generated.len(), wanted_len);
	}

	#[test]
	fn new_subscription_manager_defaults_to_random_string_provider() {
		let manager = SubscriptionManager::new(Arc::new(TestTaskExecutor));
		let subscriber = Subscriber::<u64>::new_test("test_subTest").0;
		let items = stream::iter(vec![Ok(1)]).compat();

		let id = manager.add(subscriber, |sink| {
			let notifications = items.map(Ok);

			sink.sink_map_err(|_| ()).send_all(notifications).map(|_| ())
		});

		assert!(matches!(id, SubscriptionId::String(_)));
	}

	#[test]
	fn new_subscription_manager_works_with_numeric_id_provider() {
		let manager = SubscriptionManager::with_id_provider(NumericIdProvider::default(), Arc::new(TestTaskExecutor));

		let subscriber = Subscriber::<u64>::new_test("test_subTest").0;
		let items = stream::iter(vec![Ok(1)]).compat();

		let id = manager.add(subscriber, |sink| {
			let notifications = items.map(Ok);

			sink.sink_map_err(|_| ()).send_all(notifications).map(|_| ())
		});

		assert!(matches!(id, SubscriptionId::Number(_)));
	}

	#[test]
	fn new_subscription_manager_works_with_random_string_provider() {
		let manager =
			SubscriptionManager::with_id_provider(RandomStringIdProvider::default(), Arc::new(TestTaskExecutor));

		let subscriber = Subscriber::<u64>::new_test("test_subTest").0;
		let items = stream::iter(vec![Ok(1)]).compat();

		let id = manager.add(subscriber, |sink| {
			let notifications = items.map(Ok);

			sink.sink_map_err(|_| ()).send_all(notifications).map(|_| ())
		});

		assert!(matches!(id, SubscriptionId::String(_)));
	}

	#[test]
	fn subscription_is_canceled_if_it_existed() {
		let manager = SubscriptionManager::<NumericIdProvider>::with_executor(Arc::new(TestTaskExecutor));
		// The receiver has to stay bound here (unlike in the other tests),
		// otherwise the subscriber will think the client has disconnected
		// and `active_subscriptions` will not be updated.
		let (subscriber, _recv, _) = Subscriber::<u64>::new_test("test_subTest");

		let (mut feed, receiver) = futures::channel::mpsc::channel(8);
		feed.start_send(1).unwrap();
		let items = receiver.map(|value| Ok::<_, ()>(value)).compat();

		let id = manager.add(subscriber, |sink| {
			let notifications = items.map(Ok);

			sink.sink_map_err(|_| ()).send_all(notifications).map(|_| ())
		});

		assert!(manager.cancel(id));
	}

	#[test]
	fn subscription_is_not_canceled_because_it_didnt_exist() {
		let manager = SubscriptionManager::new(Arc::new(TestTaskExecutor));

		let unknown_id: SubscriptionId = 23u32.into();

		assert!(!manager.cancel(unknown_id));
	}
}