tetsy_jsonrpc_pubsub/
handler.rs

1use crate::core;
2use crate::core::futures::{Future, IntoFuture};
3
4use crate::subscription::{new_subscription, Subscriber};
5use crate::types::{PubSubMetadata, SubscriptionId};
6
7/// Subscribe handler
8pub trait SubscribeRpcMethod<M: PubSubMetadata>: Send + Sync + 'static {
9	/// Called when client is requesting new subscription to be started.
10	fn call(&self, params: core::Params, meta: M, subscriber: Subscriber);
11}
12
13impl<M, F> SubscribeRpcMethod<M> for F
14where
15	F: Fn(core::Params, M, Subscriber) + Send + Sync + 'static,
16	M: PubSubMetadata,
17{
18	fn call(&self, params: core::Params, meta: M, subscriber: Subscriber) {
19		(*self)(params, meta, subscriber)
20	}
21}
22
23/// Unsubscribe handler
24pub trait UnsubscribeRpcMethod<M>: Send + Sync + 'static {
25	/// Output type
26	type Out: Future<Item = core::Value, Error = core::Error> + Send + 'static;
27	/// Called when client is requesting to cancel existing subscription.
28	///
29	/// Metadata is not available if the session was closed without unsubscribing.
30	fn call(&self, id: SubscriptionId, meta: Option<M>) -> Self::Out;
31}
32
33impl<M, F, I> UnsubscribeRpcMethod<M> for F
34where
35	F: Fn(SubscriptionId, Option<M>) -> I + Send + Sync + 'static,
36	I: IntoFuture<Item = core::Value, Error = core::Error>,
37	I::Future: Send + 'static,
38{
39	type Out = I::Future;
40	fn call(&self, id: SubscriptionId, meta: Option<M>) -> Self::Out {
41		(*self)(id, meta).into_future()
42	}
43}
44
45/// Publish-Subscribe extension of `IoHandler`.
46pub struct PubSubHandler<T: PubSubMetadata, S: core::Middleware<T> = core::middleware::Noop> {
47	handler: core::MetaIoHandler<T, S>,
48}
49
50impl<T: PubSubMetadata> Default for PubSubHandler<T> {
51	fn default() -> Self {
52		PubSubHandler {
53			handler: Default::default(),
54		}
55	}
56}
57
58impl<T: PubSubMetadata, S: core::Middleware<T>> PubSubHandler<T, S> {
59	/// Creates new `PubSubHandler`
60	pub fn new(handler: core::MetaIoHandler<T, S>) -> Self {
61		PubSubHandler { handler }
62	}
63
64	/// Adds new subscription.
65	pub fn add_subscription<F, G>(&mut self, notification: &str, subscribe: (&str, F), unsubscribe: (&str, G))
66	where
67		F: SubscribeRpcMethod<T>,
68		G: UnsubscribeRpcMethod<T>,
69	{
70		let (sub, unsub) = new_subscription(notification, subscribe.1, unsubscribe.1);
71		self.handler.add_method_with_meta(subscribe.0, sub);
72		self.handler.add_method_with_meta(unsubscribe.0, unsub);
73	}
74}
75
76impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::Deref for PubSubHandler<T, S> {
77	type Target = core::MetaIoHandler<T, S>;
78
79	fn deref(&self) -> &Self::Target {
80		&self.handler
81	}
82}
83
84impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::DerefMut for PubSubHandler<T, S> {
85	fn deref_mut(&mut self) -> &mut Self::Target {
86		&mut self.handler
87	}
88}
89
90impl<T: PubSubMetadata, S: core::Middleware<T>> Into<core::MetaIoHandler<T, S>> for PubSubHandler<T, S> {
91	fn into(self) -> core::MetaIoHandler<T, S> {
92		self.handler
93	}
94}
95
96#[cfg(test)]
97mod tests {
98	use std::sync::atomic::{AtomicBool, Ordering};
99	use std::sync::Arc;
100
101	use crate::core;
102	use crate::core::futures::future;
103	use crate::core::futures::sync::mpsc;
104	use crate::subscription::{Session, Subscriber};
105	use crate::types::{PubSubMetadata, SubscriptionId};
106
107	use super::PubSubHandler;
108
109	#[derive(Clone)]
110	struct Metadata(Arc<Session>);
111	impl core::Metadata for Metadata {}
112	impl PubSubMetadata for Metadata {
113		fn session(&self) -> Option<Arc<Session>> {
114			Some(self.0.clone())
115		}
116	}
117
118	#[test]
119	fn should_handle_subscription() {
120		// given
121		let mut handler = PubSubHandler::default();
122		let called = Arc::new(AtomicBool::new(false));
123		let called2 = called.clone();
124		handler.add_subscription(
125			"hello",
126			("subscribe_hello", |params, _meta, subscriber: Subscriber| {
127				assert_eq!(params, core::Params::None);
128				let _sink = subscriber.assign_id(SubscriptionId::Number(5));
129			}),
130			("unsubscribe_hello", move |id, _meta| {
131				// Should be called because session is dropped.
132				called2.store(true, Ordering::SeqCst);
133				assert_eq!(id, SubscriptionId::Number(5));
134				future::ok(core::Value::Bool(true))
135			}),
136		);
137
138		// when
139		let (tx, _rx) = mpsc::channel(1);
140		let meta = Metadata(Arc::new(Session::new(tx)));
141		let req = r#"{"jsonrpc":"2.0","id":1,"method":"subscribe_hello","params":null}"#;
142		let res = handler.handle_request_sync(req, meta);
143
144		// then
145		let response = r#"{"jsonrpc":"2.0","result":5,"id":1}"#;
146		assert_eq!(res, Some(response.into()));
147		assert_eq!(called.load(Ordering::SeqCst), true);
148	}
149}