1use std::fmt;
4use std::collections::HashMap;
5use std::sync::Arc;
6use parking_lot::Mutex;
7
8use core::{self, BoxFuture};
9use core::futures::{self, future, Sink as FuturesSink, Future};
10use core::futures::sync::oneshot;
11
12use handler::{SubscribeRpcMethod, UnsubscribeRpcMethod};
13use types::{PubSubMetadata, SubscriptionId, TransportSender, TransportError, SinkResult};
14
15pub struct Session {
18 active_subscriptions: Mutex<HashMap<(SubscriptionId, String), Box<Fn(SubscriptionId) + Send + 'static>>>,
19 transport: TransportSender,
20 on_drop: Mutex<Vec<Box<Fn() + Send>>>,
21}
22
23impl fmt::Debug for Session {
24 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
25 fmt.debug_struct("pubsub::Session")
26 .field("active_subscriptions", &self.active_subscriptions.lock().len())
27 .field("transport", &self.transport)
28 .finish()
29 }
30}
31
32impl Session {
33 pub fn new(sender: TransportSender) -> Self {
36 Session {
37 active_subscriptions: Default::default(),
38 transport: sender,
39 on_drop: Default::default(),
40 }
41 }
42
43 pub fn sender(&self) -> TransportSender {
45 self.transport.clone()
46 }
47
48 pub fn on_drop(&self, on_drop: Box<Fn() + Send>) {
50 self.on_drop.lock().push(on_drop);
51 }
52
53 fn add_subscription<F>(&self, name: &str, id: &SubscriptionId, remove: F) where
55 F: Fn(SubscriptionId) + Send + 'static,
56 {
57 let ret = self.active_subscriptions.lock().insert((id.clone(), name.into()), Box::new(remove));
58 if let Some(remove) = ret {
59 warn!("SubscriptionId collision. Unsubscribing previous client.");
60 remove(id.clone());
61 }
62 }
63
64 fn remove_subscription(&self, name: &str, id: &SubscriptionId) {
66 self.active_subscriptions.lock().remove(&(id.clone(), name.into()));
67 }
68}
69
70impl Drop for Session {
71 fn drop(&mut self) {
72 let mut active = self.active_subscriptions.lock();
73 for (id, remove) in active.drain() {
74 remove(id.0)
75 }
76
77 let mut on_drop = self.on_drop.lock();
78 for on_drop in on_drop.drain(..) {
79 on_drop();
80 }
81 }
82}
83
84#[derive(Debug, Clone)]
86pub struct Sink {
87 notification: String,
88 transport: TransportSender,
89}
90
91impl Sink {
92 pub fn notify(&self, val: core::Params) -> SinkResult {
94 let val = self.params_to_string(val);
95 self.transport.clone().send(val.0)
96 }
97
98 fn params_to_string(&self, val: core::Params) -> (String, core::Params) {
99 let notification = core::Notification {
100 jsonrpc: Some(core::Version::V2),
101 method: self.notification.clone(),
102 params: Some(val),
103 };
104 (
105 core::to_string(¬ification).expect("Notification serialization never fails."),
106 notification.params.expect("Always Some"),
107 )
108 }
109}
110
111impl FuturesSink for Sink {
112 type SinkItem = core::Params;
113 type SinkError = TransportError;
114
115 fn start_send(&mut self, item: Self::SinkItem) -> futures::StartSend<Self::SinkItem, Self::SinkError> {
116 let (val, params) = self.params_to_string(item);
117 self.transport.start_send(val).map(|result| match result {
118 futures::AsyncSink::Ready => futures::AsyncSink::Ready,
119 futures::AsyncSink::NotReady(_) => futures::AsyncSink::NotReady(params),
120 })
121 }
122
123 fn poll_complete(&mut self) -> futures::Poll<(), Self::SinkError> {
124 self.transport.poll_complete()
125 }
126
127 fn close(&mut self) -> futures::Poll<(), Self::SinkError> {
128 self.transport.close()
129 }
130}
131
132#[derive(Debug)]
135pub struct Subscriber {
136 notification: String,
137 transport: TransportSender,
138 sender: oneshot::Sender<Result<SubscriptionId, core::Error>>,
139}
140
141impl Subscriber {
142 pub fn assign_id(self, id: SubscriptionId) -> Result<Sink, ()> {
145 self.sender.send(Ok(id)).map_err(|_| ())?;
146
147 Ok(Sink {
148 notification: self.notification,
149 transport: self.transport,
150 })
151 }
152
153 pub fn reject(self, error: core::Error) -> Result<(), ()> {
156 self.sender.send(Err(error)).map_err(|_| ())?;
157 Ok(())
158 }
159}
160
161
162pub fn new_subscription<M, F, G>(notification: &str, subscribe: F, unsubscribe: G) -> (Subscribe<F, G>, Unsubscribe<G>) where
164 M: PubSubMetadata,
165 F: SubscribeRpcMethod<M>,
166 G: UnsubscribeRpcMethod,
167{
168 let unsubscribe = Arc::new(unsubscribe);
169 let subscribe = Subscribe {
170 notification: notification.to_owned(),
171 subscribe: subscribe,
172 unsubscribe: unsubscribe.clone(),
173 };
174
175 let unsubscribe = Unsubscribe {
176 notification: notification.into(),
177 unsubscribe: unsubscribe,
178 };
179
180 (subscribe, unsubscribe)
181}
182
183fn subscription_rejected() -> core::Error {
184 core::Error {
185 code: core::ErrorCode::ServerError(-32091),
186 message: "Subscription rejected".into(),
187 data: None,
188 }
189}
190
191fn subscriptions_unavailable() -> core::Error {
192 core::Error {
193 code: core::ErrorCode::ServerError(-32090),
194 message: "Subscriptions are not available on this transport.".into(),
195 data: None,
196 }
197}
198
199pub struct Subscribe<F, G> {
201 notification: String,
202 subscribe: F,
203 unsubscribe: Arc<G>,
204}
205
206impl<M, F, G> core::RpcMethod<M> for Subscribe<F, G> where
207 M: PubSubMetadata,
208 F: SubscribeRpcMethod<M>,
209 G: UnsubscribeRpcMethod,
210{
211 fn call(&self, params: core::Params, meta: M) -> BoxFuture<core::Value> {
212 match meta.session() {
213 Some(session) => {
214 let (tx, rx) = oneshot::channel();
215
216 let subscriber = Subscriber {
218 notification: self.notification.clone(),
219 transport: session.sender(),
220 sender: tx,
221 };
222 self.subscribe.call(params, meta, subscriber);
223
224 let unsub = self.unsubscribe.clone();
225 let notification = self.notification.clone();
226 let subscribe_future = rx
227 .map_err(|_| subscription_rejected())
228 .and_then(move |result| {
229 futures::done(match result {
230 Ok(id) => {
231 session.add_subscription(¬ification, &id, move |id| {
232 let _ = unsub.call(id).wait();
233 });
234 Ok(id.into())
235 },
236 Err(e) => Err(e),
237 })
238 });
239 Box::new(subscribe_future)
240 },
241 None => Box::new(future::err(subscriptions_unavailable())),
242 }
243 }
244}
245
246pub struct Unsubscribe<G> {
248 notification: String,
249 unsubscribe: Arc<G>,
250}
251
252impl<M, G> core::RpcMethod<M> for Unsubscribe<G> where
253 M: PubSubMetadata,
254 G: UnsubscribeRpcMethod,
255{
256 fn call(&self, params: core::Params, meta: M) -> BoxFuture<core::Value> {
257 let id = match params {
258 core::Params::Array(ref vec) if vec.len() == 1 => {
259 SubscriptionId::parse_value(&vec[0])
260 },
261 _ => None,
262 };
263 match (meta.session(), id) {
264 (Some(session), Some(id)) => {
265 session.remove_subscription(&self.notification, &id);
266 Box::new(self.unsubscribe.call(id))
267 },
268 (Some(_), None) => Box::new(future::err(core::Error::invalid_params("Expected subscription id."))),
269 _ => Box::new(future::err(subscriptions_unavailable())),
270 }
271 }
272}
273
274#[cfg(test)]
275mod tests {
276 use std::sync::Arc;
277 use std::sync::atomic::{AtomicBool, Ordering};
278 use core;
279 use core::RpcMethod;
280 use core::futures::{Async, Future, Stream};
281 use core::futures::sync::{mpsc, oneshot};
282 use types::{SubscriptionId, PubSubMetadata};
283
284 use super::{Session, Sink, Subscriber, new_subscription};
285
286 fn session() -> (Session, mpsc::Receiver<String>) {
287 let (tx, rx) = mpsc::channel(1);
288 (Session::new(tx), rx)
289 }
290
291 #[test]
292 fn should_unregister_on_drop() {
293 let id = SubscriptionId::Number(1);
295 let called = Arc::new(AtomicBool::new(false));
296 let called2 = called.clone();
297 let session = session().0;
298 session.add_subscription("test", &id, move |id| {
299 assert_eq!(id, SubscriptionId::Number(1));
300 called2.store(true, Ordering::SeqCst);
301 });
302
303 drop(session);
305
306 assert_eq!(called.load(Ordering::SeqCst), true);
308 }
309
310 #[test]
311 fn should_remove_subscription() {
312 let id = SubscriptionId::Number(1);
314 let called = Arc::new(AtomicBool::new(false));
315 let called2 = called.clone();
316 let session = session().0;
317 session.add_subscription("test", &id, move |id| {
318 assert_eq!(id, SubscriptionId::Number(1));
319 called2.store(true, Ordering::SeqCst);
320 });
321
322 session.remove_subscription("test", &id);
324 drop(session);
325
326 assert_eq!(called.load(Ordering::SeqCst), false);
328 }
329
330 #[test]
331 fn should_unregister_in_case_of_collision() {
332 let id = SubscriptionId::Number(1);
334 let called = Arc::new(AtomicBool::new(false));
335 let called2 = called.clone();
336 let session = session().0;
337 session.add_subscription("test", &id, move |id| {
338 assert_eq!(id, SubscriptionId::Number(1));
339 called2.store(true, Ordering::SeqCst);
340 });
341
342 session.add_subscription("test", &id, |_| {});
344
345 assert_eq!(called.load(Ordering::SeqCst), true);
347 }
348
349 #[test]
350 fn should_send_notification_to_the_transport() {
351 let (tx, mut rx) = mpsc::channel(1);
353 let sink = Sink {
354 notification: "test".into(),
355 transport: tx,
356 };
357
358 sink.notify(core::Params::Array(vec![core::Value::Number(10.into())])).wait().unwrap();
360
361 assert_eq!(
363 rx.poll().unwrap(),
364 Async::Ready(Some(r#"{"jsonrpc":"2.0","method":"test","params":[10]}"#.into()))
365 );
366 }
367
368 #[test]
369 fn should_assign_id() {
370 let (transport, _) = mpsc::channel(1);
372 let (tx, mut rx) = oneshot::channel();
373 let subscriber = Subscriber {
374 notification: "test".into(),
375 transport: transport,
376 sender: tx,
377 };
378
379 let sink = subscriber.assign_id(SubscriptionId::Number(5)).unwrap();
381
382 assert_eq!(
384 rx.poll().unwrap(),
385 Async::Ready(Ok(SubscriptionId::Number(5)))
386 );
387 assert_eq!(sink.notification, "test".to_owned());
388 }
389
390 #[test]
391 fn should_reject() {
392 let (transport, _) = mpsc::channel(1);
394 let (tx, mut rx) = oneshot::channel();
395 let subscriber = Subscriber {
396 notification: "test".into(),
397 transport: transport,
398 sender: tx,
399 };
400 let error = core::Error {
401 code: core::ErrorCode::InvalidRequest,
402 message: "Cannot start subscription now.".into(),
403 data: None,
404 };
405
406 subscriber.reject(error.clone()).unwrap();
408
409 assert_eq!(
411 rx.poll().unwrap(),
412 Async::Ready(Err(error))
413 );
414 }
415
416 #[derive(Clone, Default)]
417 struct Metadata;
418 impl core::Metadata for Metadata {}
419 impl PubSubMetadata for Metadata {
420 fn session(&self) -> Option<Arc<Session>> {
421 Some(Arc::new(session().0))
422 }
423 }
424
425 #[test]
426 fn should_subscribe() {
427 let called = Arc::new(AtomicBool::new(false));
429 let called2 = called.clone();
430 let (subscribe, _) = new_subscription(
431 "test".into(),
432 move |params, _meta, _subscriber| {
433 assert_eq!(params, core::Params::None);
434 called2.store(true, Ordering::SeqCst);
435 },
436 |_id| Ok(core::Value::Bool(true)),
437 );
438 let meta = Metadata;
439
440 let result = subscribe.call(core::Params::None, meta);
442
443 assert_eq!(called.load(Ordering::SeqCst), true);
445 assert_eq!(result.wait(), Err(core::Error {
446 code: core::ErrorCode::ServerError(-32091),
447 message: "Subscription rejected".into(),
448 data: None,
449 }));
450 }
451}