bitconch_jsonrpc_ws_server/
metadata.rs

1use std::fmt;
2use std::sync::{atomic, Arc};
3
4use core::{self, futures};
5use core::futures::sync::mpsc;
6use server_utils::{session, tokio::runtime::TaskExecutor};
7use ws;
8
9use error;
10use {Origin};
11
12/// Output of WebSocket connection. Use this to send messages to the other endpoint.
13#[derive(Clone)]
14pub struct Sender {
15	out: ws::Sender,
16	active: Arc<atomic::AtomicBool>,
17}
18
19impl Sender {
20	/// Creates a new `Sender`.
21	pub fn new(out: ws::Sender, active: Arc<atomic::AtomicBool>) -> Self {
22		Sender {
23			out: out,
24			active: active,
25		}
26	}
27
28	/// Checks whether connection is still active
29	pub fn check_active(&self) -> error::Result<()> {
30		if self.active.load(atomic::Ordering::SeqCst) {
31			Ok(())
32		} else {
33			bail!(error::ErrorKind::ConnectionClosed)
34		}
35	}
36
37	/// Sends a message over the connection.
38	/// Will return error if the connection is not active any more.
39	pub fn send<M>(&self, msg: M) -> error::Result<()>
40		where M: Into<ws::Message>
41	{
42		self.check_active()?;
43		self.out.send(msg)?;
44		Ok(())
45	}
46
47	/// Sends a message over the endpoints of all connections.
48	/// Will return error if the connection is not active any more.
49	pub fn broadcast<M>(&self, msg: M) -> error::Result<()> where
50		M: Into<ws::Message>
51	{
52		self.check_active()?;
53		self.out.broadcast(msg)?;
54		Ok(())
55	}
56
57	/// Sends a close code to the other endpoint.
58	/// Will return error if the connection is not active any more.
59	pub fn close(&self, code: ws::CloseCode) -> error::Result<()> {
60		self.check_active()?;
61		self.out.close(code)?;
62		Ok(())
63	}
64}
65
66/// Request context
67pub struct RequestContext {
68	/// Session id
69	pub session_id: session::SessionId,
70	/// Request Origin
71	pub origin: Option<Origin>,
72	/// Requested protocols
73	pub protocols: Vec<String>,
74	/// Direct channel to send messages to a client.
75	pub out: Sender,
76	/// Remote to underlying event loop.
77	pub executor: TaskExecutor,
78}
79
80impl RequestContext {
81	/// Get this session as a `Sink` spawning a new future
82	/// in the underlying event loop.
83	pub fn sender(&self) -> mpsc::Sender<String> {
84		let out = self.out.clone();
85		let (sender, receiver) = mpsc::channel(1);
86		self.executor.spawn(SenderFuture(out, receiver));
87		sender
88	}
89}
90
91impl fmt::Debug for RequestContext {
92	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
93		fmt.debug_struct("RequestContext")
94			.field("session_id", &self.session_id)
95			.field("origin", &self.origin)
96			.field("protocols", &self.protocols)
97			.finish()
98	}
99}
100
101/// Metadata extractor from session data.
102pub trait MetaExtractor<M: core::Metadata>: Send + Sync + 'static {
103	/// Extract metadata for given session
104	fn extract(&self, _context: &RequestContext) -> M;
105}
106
107impl<M, F> MetaExtractor<M> for F where
108	M: core::Metadata,
109	F: Fn(&RequestContext) -> M + Send + Sync + 'static,
110{
111	fn extract(&self, context: &RequestContext) -> M {
112		(*self)(context)
113	}
114}
115
116/// Dummy metadata extractor
117#[derive(Debug, Clone)]
118pub struct NoopExtractor;
119impl<M: core::Metadata + Default> MetaExtractor<M> for NoopExtractor {
120	fn extract(&self, _context: &RequestContext) -> M { M::default() }
121}
122
123struct SenderFuture(Sender, mpsc::Receiver<String>);
124impl futures::Future for SenderFuture {
125	type Item = ();
126	type Error = ();
127
128	fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
129		use self::futures::Stream;
130
131		loop {
132			let item = self.1.poll()?;
133			match item {
134				futures::Async::NotReady => {
135					return Ok(futures::Async::NotReady);
136				},
137				futures::Async::Ready(None) => {
138					return Ok(futures::Async::Ready(()));
139				},
140				futures::Async::Ready(Some(val)) => {
141					if let Err(e) = self.0.send(val) {
142						warn!("Error sending a subscription update: {:?}", e);
143						return Ok(futures::Async::Ready(()));
144					}
145				},
146			}
147		}
148	}
149}