tetsy_jsonrpc_pubsub/
manager.rs1use std::collections::HashMap;
18use std::iter;
19use std::sync::{
20 atomic::{AtomicUsize, Ordering},
21 Arc,
22};
23
24use crate::core::futures::sync::oneshot;
25use crate::core::futures::{future as future01, Future as Future01};
26use crate::{
27 typed::{Sink, Subscriber},
28 SubscriptionId,
29};
30
31use log::{error, warn};
32use parking_lot::Mutex;
33use rand::distributions::Alphanumeric;
34use rand::{thread_rng, Rng};
35
36pub type TaskExecutor = Arc<dyn future01::Executor<Box<dyn Future01<Item = (), Error = ()> + Send>> + Send + Sync>;
38
39type ActiveSubscriptions = Arc<Mutex<HashMap<SubscriptionId, oneshot::Sender<()>>>>;
40
41pub trait IdProvider {
43 type Id: Default + Into<SubscriptionId>;
45
46 fn next_id(&self) -> Self::Id;
48}
49
/// Id provider backed by an atomic counter.
///
/// The counter lives behind an `Arc`, so every clone of a provider draws from
/// (and advances) the same counter.
#[derive(Debug, Clone)]
pub struct NumericIdProvider {
    /// Counter holding the next id to hand out.
    current_id: Arc<AtomicUsize>,
}
56
57impl NumericIdProvider {
58 pub fn new() -> Self {
60 Default::default()
61 }
62
63 pub fn with_id(id: AtomicUsize) -> Self {
66 Self {
67 current_id: Arc::new(id),
68 }
69 }
70}
71
72impl IdProvider for NumericIdProvider {
73 type Id = u64;
74
75 fn next_id(&self) -> Self::Id {
76 self.current_id.fetch_add(1, Ordering::AcqRel) as u64
77 }
78}
79
80impl Default for NumericIdProvider {
81 fn default() -> Self {
82 NumericIdProvider {
83 current_id: Arc::new(AtomicUsize::new(1)),
84 }
85 }
86}
87
/// Id provider that produces random alphanumeric strings of a fixed length.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RandomStringIdProvider {
    /// Number of characters in every generated id.
    len: usize,
}
94
95impl RandomStringIdProvider {
96 pub fn new() -> Self {
98 Default::default()
99 }
100
101 pub fn with_len(len: usize) -> Self {
104 Self { len }
105 }
106}
107
108impl IdProvider for RandomStringIdProvider {
109 type Id = String;
110
111 fn next_id(&self) -> Self::Id {
112 let mut rng = thread_rng();
113 let id: String = iter::repeat(())
114 .map(|()| rng.sample(Alphanumeric))
115 .take(self.len)
116 .collect();
117 id
118 }
119}
120
121impl Default for RandomStringIdProvider {
122 fn default() -> Self {
123 Self { len: 16 }
124 }
125}
126
127#[derive(Clone)]
132pub struct SubscriptionManager<I: IdProvider = RandomStringIdProvider> {
133 id_provider: I,
134 active_subscriptions: ActiveSubscriptions,
135 executor: TaskExecutor,
136}
137
138impl SubscriptionManager {
139 pub fn new(executor: TaskExecutor) -> Self {
143 Self {
144 id_provider: RandomStringIdProvider::default(),
145 active_subscriptions: Default::default(),
146 executor,
147 }
148 }
149}
150
151impl<I: IdProvider> SubscriptionManager<I> {
152 pub fn with_id_provider(id_provider: I, executor: TaskExecutor) -> Self {
155 Self {
156 id_provider,
157 active_subscriptions: Default::default(),
158 executor,
159 }
160 }
161
162 pub fn executor(&self) -> &TaskExecutor {
166 &self.executor
167 }
168
169 pub fn add<T, E, G, R, F>(&self, subscriber: Subscriber<T, E>, into_future: G) -> SubscriptionId
174 where
175 G: FnOnce(Sink<T, E>) -> R,
176 R: future01::IntoFuture<Future = F, Item = (), Error = ()>,
177 F: future01::Future<Item = (), Error = ()> + Send + 'static,
178 {
179 let id = self.id_provider.next_id();
180 let subscription_id: SubscriptionId = id.into();
181 if let Ok(sink) = subscriber.assign_id(subscription_id.clone()) {
182 let (tx, rx) = oneshot::channel();
183 let future = into_future(sink)
184 .into_future()
185 .select(rx.map_err(|e| warn!("Error timing out: {:?}", e)))
186 .then(|_| Ok(()));
187
188 self.active_subscriptions.lock().insert(subscription_id.clone(), tx);
189 if self.executor.execute(Box::new(future)).is_err() {
190 error!("Failed to spawn RPC subscription task");
191 }
192 }
193
194 subscription_id
195 }
196
197 pub fn cancel(&self, id: SubscriptionId) -> bool {
201 if let Some(tx) = self.active_subscriptions.lock().remove(&id) {
202 let _ = tx.send(());
203 return true;
204 }
205
206 false
207 }
208}
209
210impl<I: Default + IdProvider> SubscriptionManager<I> {
211 pub fn with_executor(executor: TaskExecutor) -> Self {
213 Self {
214 id_provider: Default::default(),
215 active_subscriptions: Default::default(),
216 executor,
217 }
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use crate::typed::Subscriber;
    use futures::{compat::Future01CompatExt, executor, FutureExt};
    use futures::{stream, StreamExt, TryStreamExt};

    use crate::core::futures::sink::Sink as Sink01;
    use crate::core::futures::stream::Stream as Stream01;

    lazy_static::lazy_static! {
        // A single thread pool, created on first use and shared by all tests.
        static ref EXECUTOR: executor::ThreadPool = executor::ThreadPool::new()
            .expect("Failed to create thread pool executor for tests");
    }

    /// Executor that forwards every task to the shared test thread pool.
    pub struct TestTaskExecutor;
    type Boxed01Future01 = Box<dyn future01::Future<Item = (), Error = ()> + Send + 'static>;

    impl future01::Executor<Boxed01Future01> for TestTaskExecutor {
        fn execute(
            &self,
            future: Boxed01Future01,
        ) -> std::result::Result<(), future01::ExecuteError<Boxed01Future01>> {
            // Adapt the boxed future for the thread pool and discard its result.
            let task = future.compat().map(drop);
            EXECUTOR.spawn_ok(task);
            Ok(())
        }
    }

    /// Wraps a `TestTaskExecutor` in the handle type the manager expects.
    fn test_executor() -> TaskExecutor {
        Arc::new(TestTaskExecutor)
    }

    /// Adds a subscription to `manager` that forwards a one-item stream into
    /// the sink, and returns the id the manager generated for it.
    fn add_single_item_subscription<I: IdProvider>(
        manager: &SubscriptionManager<I>,
    ) -> SubscriptionId {
        let subscriber = Subscriber::<u64>::new_test("test_subTest").0;
        let items = stream::iter(vec![Ok(1)]).compat();

        manager.add(subscriber, |sink| {
            sink.sink_map_err(|_| ())
                .send_all(items.map(Ok))
                .map(|_| ())
        })
    }

    #[test]
    fn making_a_numeric_id_provider_works() {
        let provider = NumericIdProvider::new();

        assert_eq!(provider.next_id(), 1);
    }

    #[test]
    fn default_numeric_id_provider_works() {
        let provider: NumericIdProvider = Default::default();

        assert_eq!(provider.next_id(), 1);
    }

    #[test]
    fn numeric_id_provider_with_id_works() {
        let provider = NumericIdProvider::with_id(AtomicUsize::new(5));

        assert_eq!(provider.next_id(), 5);
    }

    #[test]
    fn random_string_provider_returns_id_with_correct_default_len() {
        let generated = RandomStringIdProvider::new().next_id();

        assert_eq!(generated.len(), 16);
    }

    #[test]
    fn random_string_provider_returns_id_with_correct_user_given_len() {
        let requested_len = 10;
        let generated = RandomStringIdProvider::with_len(requested_len).next_id();

        assert_eq!(generated.len(), requested_len);
    }

    #[test]
    fn new_subscription_manager_defaults_to_random_string_provider() {
        let manager = SubscriptionManager::new(test_executor());

        let id = add_single_item_subscription(&manager);

        assert!(matches!(id, SubscriptionId::String(_)))
    }

    #[test]
    fn new_subscription_manager_works_with_numeric_id_provider() {
        let manager =
            SubscriptionManager::with_id_provider(NumericIdProvider::default(), test_executor());

        let id = add_single_item_subscription(&manager);

        assert!(matches!(id, SubscriptionId::Number(_)))
    }

    #[test]
    fn new_subscription_manager_works_with_random_string_provider() {
        let manager = SubscriptionManager::with_id_provider(
            RandomStringIdProvider::default(),
            test_executor(),
        );

        let id = add_single_item_subscription(&manager);

        assert!(matches!(id, SubscriptionId::String(_)))
    }

    #[test]
    fn subscription_is_canceled_if_it_existed() {
        let manager = SubscriptionManager::<NumericIdProvider>::with_executor(test_executor());
        // The second tuple element is bound to a name so it is not dropped
        // before the end of the test.
        let (subscriber, _recv, _) = Subscriber::<u64>::new_test("test_subTest");

        // `tx` stays in scope for the whole test, so the channel feeding the
        // subscription is not closed by dropping its sender.
        let (mut tx, rx) = futures::channel::mpsc::channel(8);
        tx.start_send(1).unwrap();
        let items = rx.map(|value| Ok::<_, ()>(value)).compat();

        let id = manager.add(subscriber, |sink| {
            sink.sink_map_err(|_| ())
                .send_all(items.map(Ok))
                .map(|_| ())
        });

        assert!(manager.cancel(id));
    }

    #[test]
    fn subscription_is_not_canceled_because_it_didnt_exist() {
        let manager = SubscriptionManager::new(test_executor());

        // Nothing was ever registered under this id.
        let unknown_id: SubscriptionId = 23u32.into();

        assert!(!manager.cancel(unknown_id));
    }
}