jsonrpc_ws_server/
server.rs1use std::net::SocketAddr;
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::{cmp, fmt};
5
6use crate::core;
7use crate::server_utils::cors::Origin;
8use crate::server_utils::hosts::{self, Host};
9use crate::server_utils::reactor::{Executor, UninitializedExecutor};
10use crate::server_utils::session::SessionStats;
11use crate::ws;
12
13use crate::error::{Error, Result};
14use crate::metadata;
15use crate::session;
16
17pub struct Server {
19 addr: SocketAddr,
20 handle: Option<thread::JoinHandle<Result<()>>>,
21 executor: Arc<Mutex<Option<Executor>>>,
22 broadcaster: ws::Sender,
23}
24
25impl fmt::Debug for Server {
26 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
27 f.debug_struct("Server")
28 .field("addr", &self.addr)
29 .field("handle", &self.handle)
30 .field("executor", &self.executor)
31 .finish()
32 }
33}
34
35impl Server {
36 pub fn addr(&self) -> &SocketAddr {
38 &self.addr
39 }
40
41 pub fn broadcaster(&self) -> Broadcaster {
43 Broadcaster {
44 broadcaster: self.broadcaster.clone(),
45 }
46 }
47
48 pub fn start<M: core::Metadata, S: core::Middleware<M>>(
51 addr: &SocketAddr,
52 handler: Arc<core::MetaIoHandler<M, S>>,
53 meta_extractor: Arc<dyn metadata::MetaExtractor<M>>,
54 allowed_origins: Option<Vec<Origin>>,
55 allowed_hosts: Option<Vec<Host>>,
56 request_middleware: Option<Arc<dyn session::RequestMiddleware>>,
57 stats: Option<Arc<dyn SessionStats>>,
58 executor: UninitializedExecutor,
59 max_connections: usize,
60 max_payload_bytes: usize,
61 max_in_buffer_capacity: usize,
62 max_out_buffer_capacity: usize,
63 ) -> Result<Server>
64 where
65 S::Future: Unpin,
66 S::CallFuture: Unpin,
67 {
68 let config = {
69 let mut config = ws::Settings::default();
70 config.max_connections = max_connections;
71 config.max_fragment_size = max_payload_bytes;
73 config.in_buffer_capacity_hard_limit = max_in_buffer_capacity;
74 config.out_buffer_capacity_hard_limit = max_out_buffer_capacity;
75 config.fragments_grow = false;
77 config.fragments_capacity = cmp::max(1, max_payload_bytes / config.fragment_size);
78 config.method_strict = true;
80 config.masking_strict = true;
82 config.shutdown_on_interrupt = false;
84 config
85 };
86
87 let allowed_hosts = hosts::update(allowed_hosts, addr);
89
90 let eloop = executor.initialize()?;
92 let executor = eloop.executor();
93
94 let ws = ws::Builder::new().with_settings(config).build(session::Factory::new(
96 handler,
97 meta_extractor,
98 allowed_origins,
99 allowed_hosts,
100 request_middleware,
101 stats,
102 executor,
103 ))?;
104 let broadcaster = ws.broadcaster();
105
106 let ws = ws.bind(addr)?;
108 let local_addr = ws.local_addr()?;
109 debug!("Bound to local address: {}", local_addr);
110
111 let handle = thread::spawn(move || match ws.run().map_err(Error::from) {
113 Err(error) => {
114 error!("Error while running websockets server. Details: {:?}", error);
115 Err(error)
116 }
117 Ok(_server) => Ok(()),
118 });
119
120 Ok(Server {
122 addr: local_addr,
123 handle: Some(handle),
124 executor: Arc::new(Mutex::new(Some(eloop))),
125 broadcaster,
126 })
127 }
128}
129
130impl Server {
131 pub fn wait(mut self) -> Result<()> {
133 self.handle
134 .take()
135 .expect("Handle is always Some at start.")
136 .join()
137 .expect("Non-panic exit")
138 }
139
140 pub fn close(self) {
142 self.close_handle().close();
143 }
144
145 pub fn close_handle(&self) -> CloseHandle {
148 CloseHandle {
149 executor: self.executor.clone(),
150 broadcaster: self.broadcaster.clone(),
151 }
152 }
153}
154
155impl Drop for Server {
156 fn drop(&mut self) {
157 self.close_handle().close();
158 self.handle.take().map(|handle| handle.join());
159 }
160}
161
162#[derive(Clone)]
164pub struct CloseHandle {
165 executor: Arc<Mutex<Option<Executor>>>,
166 broadcaster: ws::Sender,
167}
168
169impl CloseHandle {
170 pub fn close(self) {
172 let _ = self.broadcaster.shutdown();
173 if let Some(executor) = self.executor.lock().unwrap().take() {
174 executor.close()
175 }
176 }
177}
178
179#[derive(Clone)]
181pub struct Broadcaster {
182 broadcaster: ws::Sender,
183}
184
185impl Broadcaster {
186 #[inline]
188 pub fn send<M>(&self, msg: M) -> Result<()>
189 where
190 M: Into<ws::Message>,
191 {
192 match self.broadcaster.send(msg).map_err(Error::from) {
193 Err(error) => {
194 error!("Error while running sending. Details: {:?}", error);
195 Err(error)
196 }
197 Ok(_server) => Ok(()),
198 }
199 }
200}