// jsonrpc_ws_server/metadata.rs
use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{atomic, Arc};
5use std::task::{Context, Poll};
6
7use crate::core;
8use crate::core::futures::channel::mpsc;
9use crate::server_utils::{reactor::TaskExecutor, session};
10use crate::ws;
11
12use crate::error;
13use crate::Origin;
14
15#[derive(Clone)]
17pub struct Sender {
18 out: ws::Sender,
19 active: Arc<atomic::AtomicBool>,
20}
21
22impl Sender {
23 pub fn new(out: ws::Sender, active: Arc<atomic::AtomicBool>) -> Self {
25 Sender { out, active }
26 }
27
28 fn check_active(&self) -> error::Result<()> {
29 if self.active.load(atomic::Ordering::SeqCst) {
30 Ok(())
31 } else {
32 Err(error::Error::ConnectionClosed)
33 }
34 }
35
36 pub fn send<M>(&self, msg: M) -> error::Result<()>
39 where
40 M: Into<ws::Message>,
41 {
42 self.check_active()?;
43 self.out.send(msg)?;
44 Ok(())
45 }
46
47 pub fn broadcast<M>(&self, msg: M) -> error::Result<()>
50 where
51 M: Into<ws::Message>,
52 {
53 self.check_active()?;
54 self.out.broadcast(msg)?;
55 Ok(())
56 }
57
58 pub fn close(&self, code: ws::CloseCode) -> error::Result<()> {
61 self.check_active()?;
62 self.out.close(code)?;
63 Ok(())
64 }
65}
66
67pub struct RequestContext {
69 pub session_id: session::SessionId,
71 pub origin: Option<Origin>,
73 pub protocols: Vec<String>,
75 pub out: Sender,
77 pub executor: TaskExecutor,
79}
80
81impl RequestContext {
82 pub fn sender(&self) -> mpsc::UnboundedSender<String> {
85 let out = self.out.clone();
86 let (sender, receiver) = mpsc::unbounded();
87 self.executor.spawn(SenderFuture(out, Box::new(receiver)));
88 sender
89 }
90}
91
92impl fmt::Debug for RequestContext {
93 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
94 fmt.debug_struct("RequestContext")
95 .field("session_id", &self.session_id)
96 .field("origin", &self.origin)
97 .field("protocols", &self.protocols)
98 .finish()
99 }
100}
101
102pub trait MetaExtractor<M: core::Metadata>: Send + Sync + 'static {
104 fn extract(&self, _context: &RequestContext) -> M;
106}
107
108impl<M, F> MetaExtractor<M> for F
109where
110 M: core::Metadata,
111 F: Fn(&RequestContext) -> M + Send + Sync + 'static,
112{
113 fn extract(&self, context: &RequestContext) -> M {
114 (*self)(context)
115 }
116}
117
118#[derive(Debug, Clone)]
120pub struct NoopExtractor;
121impl<M: core::Metadata + Default> MetaExtractor<M> for NoopExtractor {
122 fn extract(&self, _context: &RequestContext) -> M {
123 M::default()
124 }
125}
126
127struct SenderFuture(Sender, Box<dyn futures::Stream<Item = String> + Send + Unpin>);
128
129impl Future for SenderFuture {
130 type Output = ();
131
132 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133 use futures::Stream;
134
135 let this = Pin::into_inner(self);
136 loop {
137 match Pin::new(&mut this.1).poll_next(cx) {
138 Poll::Pending => return Poll::Pending,
139 Poll::Ready(None) => return Poll::Ready(()),
140 Poll::Ready(Some(val)) => {
141 if let Err(e) = this.0.send(val) {
142 warn!("Error sending a subscription update: {:?}", e);
143 return Poll::Ready(());
144 }
145 }
146 }
147 }
148 }
149}