jsonrpc_ws_server/
metadata.rs

1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{atomic, Arc};
5use std::task::{Context, Poll};
6
7use crate::core;
8use crate::core::futures::channel::mpsc;
9use crate::server_utils::{reactor::TaskExecutor, session};
10use crate::ws;
11
12use crate::error;
13use crate::Origin;
14
15/// Output of WebSocket connection. Use this to send messages to the other endpoint.
16#[derive(Clone)]
17pub struct Sender {
18	out: ws::Sender,
19	active: Arc<atomic::AtomicBool>,
20}
21
22impl Sender {
23	/// Creates a new `Sender`.
24	pub fn new(out: ws::Sender, active: Arc<atomic::AtomicBool>) -> Self {
25		Sender { out, active }
26	}
27
28	fn check_active(&self) -> error::Result<()> {
29		if self.active.load(atomic::Ordering::SeqCst) {
30			Ok(())
31		} else {
32			Err(error::Error::ConnectionClosed)
33		}
34	}
35
36	/// Sends a message over the connection.
37	/// Will return error if the connection is not active any more.
38	pub fn send<M>(&self, msg: M) -> error::Result<()>
39	where
40		M: Into<ws::Message>,
41	{
42		self.check_active()?;
43		self.out.send(msg)?;
44		Ok(())
45	}
46
47	/// Sends a message over the endpoints of all connections.
48	/// Will return error if the connection is not active any more.
49	pub fn broadcast<M>(&self, msg: M) -> error::Result<()>
50	where
51		M: Into<ws::Message>,
52	{
53		self.check_active()?;
54		self.out.broadcast(msg)?;
55		Ok(())
56	}
57
58	/// Sends a close code to the other endpoint.
59	/// Will return error if the connection is not active any more.
60	pub fn close(&self, code: ws::CloseCode) -> error::Result<()> {
61		self.check_active()?;
62		self.out.close(code)?;
63		Ok(())
64	}
65}
66
67/// Request context
68pub struct RequestContext {
69	/// Session id
70	pub session_id: session::SessionId,
71	/// Request Origin
72	pub origin: Option<Origin>,
73	/// Requested protocols
74	pub protocols: Vec<String>,
75	/// Direct channel to send messages to a client.
76	pub out: Sender,
77	/// Remote to underlying event loop.
78	pub executor: TaskExecutor,
79}
80
81impl RequestContext {
82	/// Get this session as a `Sink` spawning a new future
83	/// in the underlying event loop.
84	pub fn sender(&self) -> mpsc::UnboundedSender<String> {
85		let out = self.out.clone();
86		let (sender, receiver) = mpsc::unbounded();
87		self.executor.spawn(SenderFuture(out, Box::new(receiver)));
88		sender
89	}
90}
91
92impl fmt::Debug for RequestContext {
93	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
94		fmt.debug_struct("RequestContext")
95			.field("session_id", &self.session_id)
96			.field("origin", &self.origin)
97			.field("protocols", &self.protocols)
98			.finish()
99	}
100}
101
102/// Metadata extractor from session data.
103pub trait MetaExtractor<M: core::Metadata>: Send + Sync + 'static {
104	/// Extract metadata for given session
105	fn extract(&self, _context: &RequestContext) -> M;
106}
107
108impl<M, F> MetaExtractor<M> for F
109where
110	M: core::Metadata,
111	F: Fn(&RequestContext) -> M + Send + Sync + 'static,
112{
113	fn extract(&self, context: &RequestContext) -> M {
114		(*self)(context)
115	}
116}
117
118/// Dummy metadata extractor
119#[derive(Debug, Clone)]
120pub struct NoopExtractor;
121impl<M: core::Metadata + Default> MetaExtractor<M> for NoopExtractor {
122	fn extract(&self, _context: &RequestContext) -> M {
123		M::default()
124	}
125}
126
127struct SenderFuture(Sender, Box<dyn futures::Stream<Item = String> + Send + Unpin>);
128
129impl Future for SenderFuture {
130	type Output = ();
131
132	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133		use futures::Stream;
134
135		let this = Pin::into_inner(self);
136		loop {
137			match Pin::new(&mut this.1).poll_next(cx) {
138				Poll::Pending => return Poll::Pending,
139				Poll::Ready(None) => return Poll::Ready(()),
140				Poll::Ready(Some(val)) => {
141					if let Err(e) = this.0.send(val) {
142						warn!("Error sending a subscription update: {:?}", e);
143						return Poll::Ready(());
144					}
145				}
146			}
147		}
148	}
149}