bitconch_jsonrpc_pubsub/
handler.rs1use core;
2use core::futures::{Future, IntoFuture};
3
4use types::{PubSubMetadata, SubscriptionId};
5use subscription::{Subscriber, new_subscription};
6
7pub trait SubscribeRpcMethod<M: PubSubMetadata>: Send + Sync + 'static {
9 fn call(&self, params: core::Params, meta: M, subscriber: Subscriber);
11}
12
13impl<M, F> SubscribeRpcMethod<M> for F where
14 F: Fn(core::Params, M, Subscriber) + Send + Sync + 'static,
15 M: PubSubMetadata,
16{
17 fn call(&self, params: core::Params, meta: M, subscriber: Subscriber) {
18 (*self)(params, meta, subscriber)
19 }
20}
21
22pub trait UnsubscribeRpcMethod<M: PubSubMetadata>: Send + Sync + 'static {
24 type Out: Future<Item = core::Value, Error = core::Error> + Send + 'static;
26 fn call(&self, meta: M, id: SubscriptionId) -> Self::Out;
28}
29
30impl<F, I, M> UnsubscribeRpcMethod<M> for F where
31 F: Fn(SubscriptionId) -> I + Send + Sync + 'static,
32 I: IntoFuture<Item = core::Value, Error = core::Error>,
33 I::Future: Send + 'static,
34 M: PubSubMetadata,
35{
36 type Out = I::Future;
37 fn call(&self, _meta: M, id: SubscriptionId) -> Self::Out {
38 (*self)(id).into_future()
39 }
40}
41
42pub struct PubSubHandler<T: PubSubMetadata, S: core::Middleware<T> = core::NoopMiddleware> {
44 handler: core::MetaIoHandler<T, S>,
45}
46
47impl<T: PubSubMetadata> Default for PubSubHandler<T> {
48 fn default() -> Self {
49 PubSubHandler {
50 handler: Default::default(),
51 }
52 }
53}
54
55impl<T: PubSubMetadata, S: core::Middleware<T>> PubSubHandler<T, S> {
56 pub fn new(handler: core::MetaIoHandler<T, S>) -> Self {
58 PubSubHandler {
59 handler: handler,
60 }
61 }
62
63 pub fn add_subscription<F, G>(
65 &mut self,
66 notification: &str,
67 subscribe: (&str, F),
68 unsubscribe: (&str, G),
69 ) where
70 F: SubscribeRpcMethod<T>,
71 G: UnsubscribeRpcMethod<T>,
72 {
73 let (sub, unsub) = new_subscription(notification, subscribe.1, unsubscribe.1);
74 self.handler.add_method_with_meta(subscribe.0, sub);
75 self.handler.add_method_with_meta(unsubscribe.0, unsub);
76 }
77}
78
79impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::Deref for PubSubHandler<T, S> {
80 type Target = core::MetaIoHandler<T, S>;
81
82 fn deref(&self) -> &Self::Target {
83 &self.handler
84 }
85}
86
87impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::DerefMut for PubSubHandler<T, S> {
88 fn deref_mut(&mut self) -> &mut Self::Target {
89 &mut self.handler
90 }
91}
92
93impl<T: PubSubMetadata, S: core::Middleware<T>> Into<core::MetaIoHandler<T, S>> for PubSubHandler<T, S> {
94 fn into(self) -> core::MetaIoHandler<T, S> {
95 self.handler
96 }
97}
98
99#[cfg(test)]
100mod tests {
101 use std::sync::Arc;
102 use std::sync::atomic::{AtomicBool, Ordering};
103
104 use core;
105 use core::futures::future;
106 use core::futures::sync::mpsc;
107 use subscription::{Session, Subscriber};
108 use types::{PubSubMetadata, SubscriptionId};
109
110 use super::PubSubHandler;
111
112 #[derive(Clone, Default)]
113 struct Metadata;
114 impl core::Metadata for Metadata {}
115 impl PubSubMetadata for Metadata {
116 fn session(&self) -> Option<Arc<Session>> {
117 let (tx, _rx) = mpsc::channel(1);
118 Some(Arc::new(Session::new(tx)))
119 }
120 }
121
122 #[test]
123 fn should_handle_subscription() {
124 let mut handler = PubSubHandler::default();
126 let called = Arc::new(AtomicBool::new(false));
127 let called2 = called.clone();
128 handler.add_subscription(
129 "hello",
130 ("subscribe_hello", |params, _meta, subscriber: Subscriber| {
131 assert_eq!(params, core::Params::None);
132 let _sink = subscriber.assign_id(SubscriptionId::Number(5));
133 }),
134 ("unsubscribe_hello", move |id| {
135 called2.store(true, Ordering::SeqCst);
137 assert_eq!(id, SubscriptionId::Number(5));
138 future::ok(core::Value::Bool(true))
139 }),
140 );
141
142 let meta = Metadata;
144 let req = r#"{"jsonrpc":"2.0","id":1,"method":"subscribe_hello","params":null}"#;
145 let res = handler.handle_request_sync(req, meta);
146
147 let response = r#"{"jsonrpc":"2.0","result":5,"id":1}"#;
149 assert_eq!(res, Some(response.into()));
150 assert_eq!(called.load(Ordering::SeqCst), true);
151 }
152
153}