bitconch_jsonrpc_pubsub/
handler.rs

1use core;
2use core::futures::{Future, IntoFuture};
3
4use types::{PubSubMetadata, SubscriptionId};
5use subscription::{Subscriber, new_subscription};
6
7/// Subscribe handler
8pub trait SubscribeRpcMethod<M: PubSubMetadata>: Send + Sync + 'static {
9	/// Called when client is requesting new subscription to be started.
10	fn call(&self, params: core::Params, meta: M, subscriber: Subscriber);
11}
12
13impl<M, F> SubscribeRpcMethod<M> for F where
14	F: Fn(core::Params, M, Subscriber) + Send + Sync + 'static,
15	M: PubSubMetadata,
16{
17	fn call(&self, params: core::Params, meta: M, subscriber: Subscriber) {
18		(*self)(params, meta, subscriber)
19	}
20}
21
22/// Unsubscribe handler
23pub trait UnsubscribeRpcMethod<M: PubSubMetadata>: Send + Sync + 'static {
24	/// Output type
25	type Out: Future<Item = core::Value, Error = core::Error> + Send + 'static;
26	/// Called when client is requesting to cancel existing subscription.
27	fn call(&self, meta: M, id: SubscriptionId) -> Self::Out;
28}
29
30impl<F, I, M> UnsubscribeRpcMethod<M> for F where
31	F: Fn(SubscriptionId) -> I + Send + Sync + 'static,
32	I: IntoFuture<Item = core::Value, Error = core::Error>,
33	I::Future: Send + 'static,
34	M: PubSubMetadata,
35{
36	type Out = I::Future;
37	fn call(&self, _meta: M, id: SubscriptionId) -> Self::Out {
38		(*self)(id).into_future()
39	}
40}
41
42/// Publish-Subscribe extension of `IoHandler`.
43pub struct PubSubHandler<T: PubSubMetadata, S: core::Middleware<T> = core::NoopMiddleware> {
44	handler: core::MetaIoHandler<T, S>,
45}
46
47impl<T: PubSubMetadata> Default for PubSubHandler<T> {
48	fn default() -> Self {
49		PubSubHandler {
50			handler: Default::default(),
51		}
52	}
53}
54
55impl<T: PubSubMetadata, S: core::Middleware<T>> PubSubHandler<T, S> {
56	/// Creates new `PubSubHandler`
57	pub fn new(handler: core::MetaIoHandler<T, S>) -> Self {
58		PubSubHandler {
59			handler: handler,
60		}
61	}
62
63	/// Adds new subscription.
64	pub fn add_subscription<F, G>(
65		&mut self,
66		notification: &str,
67		subscribe: (&str, F),
68		unsubscribe: (&str, G),
69	) where
70		F: SubscribeRpcMethod<T>,
71		G: UnsubscribeRpcMethod<T>,
72	{
73		let (sub, unsub) = new_subscription(notification, subscribe.1, unsubscribe.1);
74		self.handler.add_method_with_meta(subscribe.0, sub);
75		self.handler.add_method_with_meta(unsubscribe.0, unsub);
76	}
77}
78
79impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::Deref for PubSubHandler<T, S> {
80	type Target = core::MetaIoHandler<T, S>;
81
82	fn deref(&self) -> &Self::Target {
83		&self.handler
84	}
85}
86
87impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::DerefMut for PubSubHandler<T, S> {
88	fn deref_mut(&mut self) -> &mut Self::Target {
89		&mut self.handler
90	}
91}
92
93impl<T: PubSubMetadata, S: core::Middleware<T>> Into<core::MetaIoHandler<T, S>> for PubSubHandler<T, S> {
94	fn into(self) -> core::MetaIoHandler<T, S> {
95		self.handler
96	}
97}
98
#[cfg(test)]
mod tests {
	use std::sync::Arc;
	use std::sync::atomic::{AtomicBool, Ordering};

	use core;
	use core::futures::future;
	use core::futures::sync::mpsc;
	use subscription::{Session, Subscriber};
	use types::{PubSubMetadata, SubscriptionId};

	use super::PubSubHandler;

	/// Test metadata whose `session()` builds a brand-new `Session` on every call.
	#[derive(Clone, Default)]
	struct Metadata;

	impl core::Metadata for Metadata {}

	impl PubSubMetadata for Metadata {
		fn session(&self) -> Option<Arc<Session>> {
			// Only the sending half goes into the session; the receiving half
			// is discarded when this function returns.
			let (sender, _receiver) = mpsc::channel(1);
			let session = Session::new(sender);
			Some(Arc::new(session))
		}
	}

	#[test]
	fn should_handle_subscription() {
		// given
		let mut handler = PubSubHandler::default();
		let unsubscribed = Arc::new(AtomicBool::new(false));
		let unsubscribed_flag = Arc::clone(&unsubscribed);
		handler.add_subscription(
			"hello",
			("subscribe_hello", |params, _meta, subscriber: Subscriber| {
				assert_eq!(params, core::Params::None);
				let _sink = subscriber.assign_id(SubscriptionId::Number(5));
			}),
			("unsubscribe_hello", move |id| {
				// Should be called because session is dropped.
				unsubscribed_flag.store(true, Ordering::SeqCst);
				assert_eq!(id, SubscriptionId::Number(5));
				future::ok(core::Value::Bool(true))
			}),
		);

		// when
		let request = r#"{"jsonrpc":"2.0","id":1,"method":"subscribe_hello","params":null}"#;
		let actual = handler.handle_request_sync(request, Metadata);

		// then
		let expected = r#"{"jsonrpc":"2.0","result":5,"id":1}"#;
		assert_eq!(actual, Some(expected.to_string()));
		assert!(unsubscribed.load(Ordering::SeqCst));
	}
}