bitconch_jsonrpc_ws_server/
metadata.rs1use std::fmt;
2use std::sync::{atomic, Arc};
3
4use core::{self, futures};
5use core::futures::sync::mpsc;
6use server_utils::{session, tokio::runtime::TaskExecutor};
7use ws;
8
9use error;
10use {Origin};
11
12#[derive(Clone)]
14pub struct Sender {
15 out: ws::Sender,
16 active: Arc<atomic::AtomicBool>,
17}
18
19impl Sender {
20 pub fn new(out: ws::Sender, active: Arc<atomic::AtomicBool>) -> Self {
22 Sender {
23 out: out,
24 active: active,
25 }
26 }
27
28 pub fn check_active(&self) -> error::Result<()> {
30 if self.active.load(atomic::Ordering::SeqCst) {
31 Ok(())
32 } else {
33 bail!(error::ErrorKind::ConnectionClosed)
34 }
35 }
36
37 pub fn send<M>(&self, msg: M) -> error::Result<()>
40 where M: Into<ws::Message>
41 {
42 self.check_active()?;
43 self.out.send(msg)?;
44 Ok(())
45 }
46
47 pub fn broadcast<M>(&self, msg: M) -> error::Result<()> where
50 M: Into<ws::Message>
51 {
52 self.check_active()?;
53 self.out.broadcast(msg)?;
54 Ok(())
55 }
56
57 pub fn close(&self, code: ws::CloseCode) -> error::Result<()> {
60 self.check_active()?;
61 self.out.close(code)?;
62 Ok(())
63 }
64}
65
66pub struct RequestContext {
68 pub session_id: session::SessionId,
70 pub origin: Option<Origin>,
72 pub protocols: Vec<String>,
74 pub out: Sender,
76 pub executor: TaskExecutor,
78}
79
80impl RequestContext {
81 pub fn sender(&self) -> mpsc::Sender<String> {
84 let out = self.out.clone();
85 let (sender, receiver) = mpsc::channel(1);
86 self.executor.spawn(SenderFuture(out, receiver));
87 sender
88 }
89}
90
91impl fmt::Debug for RequestContext {
92 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
93 fmt.debug_struct("RequestContext")
94 .field("session_id", &self.session_id)
95 .field("origin", &self.origin)
96 .field("protocols", &self.protocols)
97 .finish()
98 }
99}
100
101pub trait MetaExtractor<M: core::Metadata>: Send + Sync + 'static {
103 fn extract(&self, _context: &RequestContext) -> M;
105}
106
107impl<M, F> MetaExtractor<M> for F where
108 M: core::Metadata,
109 F: Fn(&RequestContext) -> M + Send + Sync + 'static,
110{
111 fn extract(&self, context: &RequestContext) -> M {
112 (*self)(context)
113 }
114}
115
116#[derive(Debug, Clone)]
118pub struct NoopExtractor;
119impl<M: core::Metadata + Default> MetaExtractor<M> for NoopExtractor {
120 fn extract(&self, _context: &RequestContext) -> M { M::default() }
121}
122
123struct SenderFuture(Sender, mpsc::Receiver<String>);
124impl futures::Future for SenderFuture {
125 type Item = ();
126 type Error = ();
127
128 fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
129 use self::futures::Stream;
130
131 loop {
132 let item = self.1.poll()?;
133 match item {
134 futures::Async::NotReady => {
135 return Ok(futures::Async::NotReady);
136 },
137 futures::Async::Ready(None) => {
138 return Ok(futures::Async::Ready(()));
139 },
140 futures::Async::Ready(Some(val)) => {
141 if let Err(e) = self.0.send(val) {
142 warn!("Error sending a subscription update: {:?}", e);
143 return Ok(futures::Async::Ready(()));
144 }
145 },
146 }
147 }
148 }
149}