rs_jsonrpc_pubsub/
handler.rs1use core;
2use core::futures::{Future, IntoFuture};
3
4use types::{PubSubMetadata, SubscriptionId};
5use subscription::{Subscriber, new_subscription};
6
7pub trait SubscribeRpcMethod<M: PubSubMetadata>: Send + Sync + 'static {
9 fn call(&self, params: core::Params, meta: M, subscriber: Subscriber);
11}
12
13impl<M, F> SubscribeRpcMethod<M> for F where
14 F: Fn(core::Params, M, Subscriber) + Send + Sync + 'static,
15 M: PubSubMetadata,
16{
17 fn call(&self, params: core::Params, meta: M, subscriber: Subscriber) {
18 (*self)(params, meta, subscriber)
19 }
20}
21
22pub trait UnsubscribeRpcMethod: Send + Sync + 'static {
24 type Out: Future<Item = core::Value, Error = core::Error> + Send + 'static;
26 fn call(&self, id: SubscriptionId) -> Self::Out;
28}
29
30impl<F, I> UnsubscribeRpcMethod for F where
31 F: Fn(SubscriptionId) -> I + Send + Sync + 'static,
32 I: IntoFuture<Item = core::Value, Error = core::Error>,
33 I::Future: Send + 'static,
34{
35 type Out = I::Future;
36 fn call(&self, id: SubscriptionId) -> Self::Out {
37 (*self)(id).into_future()
38 }
39}
40
41pub struct PubSubHandler<T: PubSubMetadata, S: core::Middleware<T> = core::NoopMiddleware> {
43 handler: core::MetaIoHandler<T, S>,
44}
45
46impl<T: PubSubMetadata> Default for PubSubHandler<T> {
47 fn default() -> Self {
48 PubSubHandler {
49 handler: Default::default(),
50 }
51 }
52}
53
54impl<T: PubSubMetadata, S: core::Middleware<T>> PubSubHandler<T, S> {
55 pub fn new(handler: core::MetaIoHandler<T, S>) -> Self {
57 PubSubHandler {
58 handler: handler,
59 }
60 }
61
62 pub fn add_subscription<F, G>(
64 &mut self,
65 notification: &str,
66 subscribe: (&str, F),
67 unsubscribe: (&str, G),
68 ) where
69 F: SubscribeRpcMethod<T>,
70 G: UnsubscribeRpcMethod,
71 {
72 let (sub, unsub) = new_subscription(notification, subscribe.1, unsubscribe.1);
73 self.handler.add_method_with_meta(subscribe.0, sub);
74 self.handler.add_method_with_meta(unsubscribe.0, unsub);
75 }
76}
77
78impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::Deref for PubSubHandler<T, S> {
79 type Target = core::MetaIoHandler<T, S>;
80
81 fn deref(&self) -> &Self::Target {
82 &self.handler
83 }
84}
85
86impl<T: PubSubMetadata, S: core::Middleware<T>> ::std::ops::DerefMut for PubSubHandler<T, S> {
87 fn deref_mut(&mut self) -> &mut Self::Target {
88 &mut self.handler
89 }
90}
91
92impl<T: PubSubMetadata, S: core::Middleware<T>> Into<core::MetaIoHandler<T, S>> for PubSubHandler<T, S> {
93 fn into(self) -> core::MetaIoHandler<T, S> {
94 self.handler
95 }
96}
97
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    use core;
    use core::futures::future;
    use core::futures::sync::mpsc;
    use subscription::{Session, Subscriber};
    use types::{PubSubMetadata, SubscriptionId};

    use super::PubSubHandler;

    /// Minimal metadata type that creates a new session on every `session()` call.
    #[derive(Clone, Default)]
    struct Metadata;

    impl core::Metadata for Metadata {}

    impl PubSubMetadata for Metadata {
        fn session(&self) -> Option<Arc<Session>> {
            // The receiving half is never read in this test.
            let (sender, _receiver) = mpsc::channel(1);
            let session = Session::new(sender);
            Some(Arc::new(session))
        }
    }

    #[test]
    fn should_handle_subscription() {
        // Flag set by the unsubscribe handler so the test can observe that it ran.
        let unsubscribed = Arc::new(AtomicBool::new(false));
        let unsubscribed_flag = unsubscribed.clone();

        let mut handler = PubSubHandler::default();
        handler.add_subscription(
            "hello",
            ("subscribe_hello", |params, _meta, subscriber: Subscriber| {
                assert_eq!(params, core::Params::None);
                let _assigned = subscriber.assign_id(SubscriptionId::Number(5));
            }),
            ("unsubscribe_hello", move |id| {
                unsubscribed_flag.store(true, Ordering::SeqCst);
                assert_eq!(id, SubscriptionId::Number(5));
                future::ok(core::Value::Bool(true))
            }),
        );

        let req = r#"{"jsonrpc":"2.0","id":1,"method":"subscribe_hello","params":null}"#;
        let res = handler.handle_request_sync(req, Metadata);

        // The subscribe call answers with the id assigned by the handler.
        let response = r#"{"jsonrpc":"2.0","result":5,"id":1}"#;
        assert_eq!(res, Some(response.into()));

        // The unsubscribe handler must have run even though no unsubscribe
        // request was sent. NOTE(review): presumably triggered when the
        // session is dropped; confirm in the `subscription` module.
        assert_eq!(unsubscribed.load(Ordering::SeqCst), true);
    }
}