rs_jsonrpc_pubsub/
subscription.rs

1//! Subscription primitives.
2
3use std::fmt;
4use std::collections::HashMap;
5use std::sync::Arc;
6use parking_lot::Mutex;
7
8use core::{self, BoxFuture};
9use core::futures::{self, future, Sink as FuturesSink, Future};
10use core::futures::sync::oneshot;
11
12use handler::{SubscribeRpcMethod, UnsubscribeRpcMethod};
13use types::{PubSubMetadata, SubscriptionId, TransportSender, TransportError, SinkResult};
14
15/// RPC client session
16/// Keeps track of active subscriptions and unsubscribes from them upon dropping.
17pub struct Session {
18	active_subscriptions: Mutex<HashMap<(SubscriptionId, String), Box<Fn(SubscriptionId) + Send + 'static>>>,
19	transport: TransportSender,
20	on_drop: Mutex<Vec<Box<Fn() + Send>>>,
21}
22
23impl fmt::Debug for Session {
24	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
25		fmt.debug_struct("pubsub::Session")
26			.field("active_subscriptions", &self.active_subscriptions.lock().len())
27			.field("transport", &self.transport)
28			.finish()
29	}
30}
31
32impl Session {
33	/// Creates new session given transport raw send capabilities.
34	/// Session should be created as part of metadata, `sender` should be returned by transport.
35	pub fn new(sender: TransportSender) -> Self {
36		Session {
37			active_subscriptions: Default::default(),
38			transport: sender,
39			on_drop: Default::default(),
40		}
41	}
42
43	/// Returns transport write stream
44	pub fn sender(&self) -> TransportSender {
45		self.transport.clone()
46	}
47
48	/// Adds a function to call when session is dropped.
49	pub fn on_drop(&self, on_drop: Box<Fn() + Send>) {
50		self.on_drop.lock().push(on_drop);
51	}
52
53	/// Adds new active subscription
54	fn add_subscription<F>(&self, name: &str, id: &SubscriptionId, remove: F) where
55		F: Fn(SubscriptionId) + Send + 'static,
56	{
57		let ret = self.active_subscriptions.lock().insert((id.clone(), name.into()), Box::new(remove));
58		if let Some(remove) = ret {
59			warn!("SubscriptionId collision. Unsubscribing previous client.");
60			remove(id.clone());
61		}
62	}
63
64	/// Removes existing subscription.
65	fn remove_subscription(&self, name: &str, id: &SubscriptionId) {
66		self.active_subscriptions.lock().remove(&(id.clone(), name.into()));
67	}
68}
69
70impl Drop for Session {
71	fn drop(&mut self) {
72		let mut active = self.active_subscriptions.lock();
73		for (id, remove) in active.drain() {
74			remove(id.0)
75		}
76
77		let mut on_drop = self.on_drop.lock();
78		for on_drop in on_drop.drain(..) {
79			on_drop();
80		}
81	}
82}
83
84/// A handle to send notifications directly to subscribed client.
85#[derive(Debug, Clone)]
86pub struct Sink {
87	notification: String,
88	transport: TransportSender,
89}
90
91impl Sink {
92	/// Sends a notification to a client.
93	pub fn notify(&self, val: core::Params) -> SinkResult {
94		let val = self.params_to_string(val);
95		self.transport.clone().send(val.0)
96	}
97
98	fn params_to_string(&self, val: core::Params) -> (String, core::Params) {
99		let notification = core::Notification {
100			jsonrpc: Some(core::Version::V2),
101			method: self.notification.clone(),
102			params: Some(val),
103		};
104		(
105			core::to_string(&notification).expect("Notification serialization never fails."),
106			notification.params.expect("Always Some"),
107		)
108	}
109}
110
111impl FuturesSink for Sink {
112	type SinkItem = core::Params;
113	type SinkError = TransportError;
114
115	fn start_send(&mut self, item: Self::SinkItem) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
116		let (val, params) = self.params_to_string(item);
117		self.transport.start_send(val).map(|result| match result {
118			futures::AsyncSink::Ready => futures::AsyncSink::Ready,
119			futures::AsyncSink::NotReady(_) => futures::AsyncSink::NotReady(params),
120		})
121	}
122
123	fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
124		self.transport.poll_complete()
125	}
126
127	fn close(&mut self) -> futures::Poll<(), Self::SinkError> {
128		self.transport.close()
129	}
130}
131
132/// Represents a subscribing client.
133/// Subscription handlers can either reject this subscription request or assign an unique id.
134#[derive(Debug)]
135pub struct Subscriber {
136	notification: String,
137	transport: TransportSender,
138	sender: oneshot::Sender<Result<SubscriptionId, core::Error>>,
139}
140
141impl Subscriber {
142	/// Consumes `Subscriber` and assigns unique id to a requestor.
143	/// Returns `Err` if request has already terminated.
144	pub fn assign_id(self, id: SubscriptionId) -> Result<Sink, ()> {
145		self.sender.send(Ok(id)).map_err(|_| ())?;
146
147		Ok(Sink {
148			notification: self.notification,
149			transport: self.transport,
150		})
151	}
152
153	/// Rejects this subscription request with given error.
154	/// Returns `Err` if request has already terminated.
155	pub fn reject(self, error: core::Error) -> Result<(), ()> {
156		self.sender.send(Err(error)).map_err(|_| ())?;
157		Ok(())
158	}
159}
160
161
162/// Creates new subscribe and unsubscribe RPC methods
163pub fn new_subscription<M, F, G>(notification: &str, subscribe: F, unsubscribe: G) -> (Subscribe<F, G>, Unsubscribe<G>) where
164	M: PubSubMetadata,
165	F: SubscribeRpcMethod<M>,
166	G: UnsubscribeRpcMethod,
167{
168	let unsubscribe = Arc::new(unsubscribe);
169	let subscribe = Subscribe {
170		notification: notification.to_owned(),
171		subscribe: subscribe,
172		unsubscribe: unsubscribe.clone(),
173	};
174
175	let unsubscribe = Unsubscribe {
176		notification: notification.into(),
177		unsubscribe: unsubscribe,
178	};
179
180	(subscribe, unsubscribe)
181}
182
183fn subscription_rejected() -> core::Error {
184	core::Error {
185		code: core::ErrorCode::ServerError(-32091),
186		message: "Subscription rejected".into(),
187		data: None,
188	}
189}
190
191fn subscriptions_unavailable() -> core::Error {
192	core::Error {
193		code: core::ErrorCode::ServerError(-32090),
194		message: "Subscriptions are not available on this transport.".into(),
195		data: None,
196	}
197}
198
/// RPC method that handles subscribe requests.
pub struct Subscribe<F, G> {
	/// Method name used for notifications sent to subscribers.
	notification: String,
	/// User handler that accepts or rejects each subscription request.
	subscribe: F,
	/// Unsubscribe handler, also run when a session drops or replaces a
	/// subscription.
	unsubscribe: Arc<G>,
}
205
206impl<M, F, G> core::RpcMethod<M> for Subscribe<F, G> where
207	M: PubSubMetadata,
208	F: SubscribeRpcMethod<M>,
209	G: UnsubscribeRpcMethod,
210{
211	fn call(&self, params: core::Params, meta: M) -> BoxFuture<core::Value> {
212		match meta.session() {
213			Some(session) => {
214				let (tx, rx) = oneshot::channel();
215
216				// Register the subscription
217				let subscriber = Subscriber {
218					notification: self.notification.clone(),
219					transport: session.sender(),
220					sender: tx,
221				};
222				self.subscribe.call(params, meta, subscriber);
223
224				let unsub = self.unsubscribe.clone();
225				let notification = self.notification.clone();
226				let subscribe_future = rx
227					.map_err(|_| subscription_rejected())
228					.and_then(move |result| {
229						futures::done(match result {
230							Ok(id) => {
231								session.add_subscription(&notification, &id, move |id| {
232									let _ = unsub.call(id).wait();
233								});
234								Ok(id.into())
235							},
236							Err(e) => Err(e),
237						})
238					});
239				Box::new(subscribe_future)
240			},
241			None => Box::new(future::err(subscriptions_unavailable())),
242		}
243	}
244}
245
/// RPC method that handles unsubscribe requests.
pub struct Unsubscribe<G> {
	/// Notification name that identifies, together with the id, which
	/// subscription to remove from the session.
	notification: String,
	/// User handler that tears the subscription down.
	unsubscribe: Arc<G>,
}
251
252impl<M, G> core::RpcMethod<M> for Unsubscribe<G> where
253	M: PubSubMetadata,
254	G: UnsubscribeRpcMethod,
255{
256	fn call(&self, params: core::Params, meta: M) -> BoxFuture<core::Value> {
257		let id = match params {
258			core::Params::Array(ref vec) if vec.len() == 1 => {
259				SubscriptionId::parse_value(&vec[0])
260			},
261			_ => None,
262		};
263		match (meta.session(), id) {
264			(Some(session), Some(id)) => {
265				session.remove_subscription(&self.notification, &id);
266				Box::new(self.unsubscribe.call(id))
267			},
268			(Some(_), None) => Box::new(future::err(core::Error::invalid_params("Expected subscription id."))),
269			_ => Box::new(future::err(subscriptions_unavailable())),
270		}
271	}
272}
273
#[cfg(test)]
mod tests {
	use std::sync::Arc;
	use std::sync::atomic::{AtomicBool, Ordering};
	use core;
	use core::RpcMethod;
	use core::futures::{Async, Future, Stream};
	use core::futures::sync::{mpsc, oneshot};
	use types::{SubscriptionId, PubSubMetadata};

	use super::{Session, Sink, Subscriber, new_subscription};

	/// Creates a session backed by a bounded in-memory channel and returns it
	/// together with the channel's receiving end.
	fn session() -> (Session, mpsc::Receiver<String>) {
		let (sender, receiver) = mpsc::channel(1);
		(Session::new(sender), receiver)
	}

	/// Dropping a session must run the removal callback of a live subscription.
	#[test]
	fn should_unregister_on_drop() {
		// given
		let subscription_id = SubscriptionId::Number(1);
		let was_called = Arc::new(AtomicBool::new(false));
		let flag = was_called.clone();
		let (session, _) = session();
		session.add_subscription("test", &subscription_id, move |id| {
			assert_eq!(id, SubscriptionId::Number(1));
			flag.store(true, Ordering::SeqCst);
		});

		// when
		drop(session);

		// then
		assert!(was_called.load(Ordering::SeqCst));
	}

	/// A subscription removed explicitly must not be unsubscribed again on drop.
	#[test]
	fn should_remove_subscription() {
		// given
		let subscription_id = SubscriptionId::Number(1);
		let was_called = Arc::new(AtomicBool::new(false));
		let flag = was_called.clone();
		let (session, _) = session();
		session.add_subscription("test", &subscription_id, move |id| {
			assert_eq!(id, SubscriptionId::Number(1));
			flag.store(true, Ordering::SeqCst);
		});

		// when
		session.remove_subscription("test", &subscription_id);
		drop(session);

		// then
		assert!(!was_called.load(Ordering::SeqCst));
	}

	/// Registering the same id and name twice must unsubscribe the first entry.
	#[test]
	fn should_unregister_in_case_of_collision() {
		// given
		let subscription_id = SubscriptionId::Number(1);
		let was_called = Arc::new(AtomicBool::new(false));
		let flag = was_called.clone();
		let (session, _) = session();
		session.add_subscription("test", &subscription_id, move |id| {
			assert_eq!(id, SubscriptionId::Number(1));
			flag.store(true, Ordering::SeqCst);
		});

		// when
		session.add_subscription("test", &subscription_id, |_| {});

		// then
		assert!(was_called.load(Ordering::SeqCst));
	}

	/// `Sink::notify` must put the serialized notification on the transport.
	#[test]
	fn should_send_notification_to_the_transport() {
		// given
		let (sender, mut receiver) = mpsc::channel(1);
		let sink = Sink {
			notification: "test".into(),
			transport: sender,
		};
		let params = core::Params::Array(vec![core::Value::Number(10.into())]);

		// when
		sink.notify(params).wait().unwrap();

		// then
		let expected: String = r#"{"jsonrpc":"2.0","method":"test","params":[10]}"#.to_owned();
		assert_eq!(receiver.poll().unwrap(), Async::Ready(Some(expected)));
	}

	/// Assigning an id must deliver it to the waiting request and yield a sink
	/// for the same notification name.
	#[test]
	fn should_assign_id() {
		// given
		let (transport, _) = mpsc::channel(1);
		let (decision_tx, mut decision_rx) = oneshot::channel();
		let subscriber = Subscriber {
			notification: "test".into(),
			transport: transport,
			sender: decision_tx,
		};

		// when
		let sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap();

		// then
		let received = decision_rx.poll().unwrap();
		assert_eq!(received, Async::Ready(Ok(SubscriptionId::Number(5))));
		assert_eq!(sink.notification, "test");
	}

	/// Rejecting must deliver the given error to the waiting request.
	#[test]
	fn should_reject() {
		// given
		let (transport, _) = mpsc::channel(1);
		let (decision_tx, mut decision_rx) = oneshot::channel();
		let subscriber = Subscriber {
			notification: "test".into(),
			transport: transport,
			sender: decision_tx,
		};
		let error = core::Error {
			code: core::ErrorCode::InvalidRequest,
			message: "Cannot start subscription now.".into(),
			data: None,
		};

		// when
		subscriber.reject(error.clone()).unwrap();

		// then
		let received = decision_rx.poll().unwrap();
		assert_eq!(received, Async::Ready(Err(error)));
	}

	/// Metadata stub that hands out a brand-new session on every call.
	#[derive(Clone, Default)]
	struct Metadata;
	impl core::Metadata for Metadata {}
	impl PubSubMetadata for Metadata {
		fn session(&self) -> Option<Arc<Session>> {
			let (session, _) = session();
			Some(Arc::new(session))
		}
	}

	/// The subscribe method must invoke the user handler; since this handler
	/// drops the subscriber without deciding, the call resolves to
	/// "Subscription rejected".
	#[test]
	fn should_subscribe() {
		// given
		let was_called = Arc::new(AtomicBool::new(false));
		let flag = was_called.clone();
		let (subscribe, _) = new_subscription(
			"test",
			move |params, _meta, _subscriber| {
				assert_eq!(params, core::Params::None);
				flag.store(true, Ordering::SeqCst);
			},
			|_id| Ok(core::Value::Bool(true)),
		);
		let meta = Metadata;

		// when
		let result = subscribe.call(core::Params::None, meta);

		// then
		assert!(was_called.load(Ordering::SeqCst));
		let expected = core::Error {
			code: core::ErrorCode::ServerError(-32091),
			message: "Subscription rejected".into(),
			data: None,
		};
		assert_eq!(result.wait(), Err(expected));
	}
}