jsonrpc_ws_server/
server.rs

1use std::net::SocketAddr;
2use std::sync::{Arc, Mutex};
3use std::thread;
4use std::{cmp, fmt};
5
6use crate::core;
7use crate::server_utils::cors::Origin;
8use crate::server_utils::hosts::{self, Host};
9use crate::server_utils::reactor::{Executor, UninitializedExecutor};
10use crate::server_utils::session::SessionStats;
11use crate::ws;
12
13use crate::error::{Error, Result};
14use crate::metadata;
15use crate::session;
16
17/// `WebSockets` server implementation.
18pub struct Server {
19	addr: SocketAddr,
20	handle: Option<thread::JoinHandle<Result<()>>>,
21	executor: Arc<Mutex<Option<Executor>>>,
22	broadcaster: ws::Sender,
23}
24
25impl fmt::Debug for Server {
26	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
27		f.debug_struct("Server")
28			.field("addr", &self.addr)
29			.field("handle", &self.handle)
30			.field("executor", &self.executor)
31			.finish()
32	}
33}
34
35impl Server {
36	/// Returns the address this server is listening on
37	pub fn addr(&self) -> &SocketAddr {
38		&self.addr
39	}
40
41	/// Returns a Broadcaster that can be used to send messages on all connections.
42	pub fn broadcaster(&self) -> Broadcaster {
43		Broadcaster {
44			broadcaster: self.broadcaster.clone(),
45		}
46	}
47
48	/// Starts a new `WebSocket` server in separate thread.
49	/// Returns a `Server` handle which closes the server when droped.
50	pub fn start<M: core::Metadata, S: core::Middleware<M>>(
51		addr: &SocketAddr,
52		handler: Arc<core::MetaIoHandler<M, S>>,
53		meta_extractor: Arc<dyn metadata::MetaExtractor<M>>,
54		allowed_origins: Option<Vec<Origin>>,
55		allowed_hosts: Option<Vec<Host>>,
56		request_middleware: Option<Arc<dyn session::RequestMiddleware>>,
57		stats: Option<Arc<dyn SessionStats>>,
58		executor: UninitializedExecutor,
59		max_connections: usize,
60		max_payload_bytes: usize,
61		max_in_buffer_capacity: usize,
62		max_out_buffer_capacity: usize,
63	) -> Result<Server>
64	where
65		S::Future: Unpin,
66		S::CallFuture: Unpin,
67	{
68		let config = {
69			let mut config = ws::Settings::default();
70			config.max_connections = max_connections;
71			// don't accept super large requests
72			config.max_fragment_size = max_payload_bytes;
73			config.in_buffer_capacity_hard_limit = max_in_buffer_capacity;
74			config.out_buffer_capacity_hard_limit = max_out_buffer_capacity;
75			// don't grow non-final fragments (to prevent DOS)
76			config.fragments_grow = false;
77			config.fragments_capacity = cmp::max(1, max_payload_bytes / config.fragment_size);
78			// accept only handshakes beginning with GET
79			config.method_strict = true;
80			// require masking
81			config.masking_strict = true;
82			// Was shutting down server when suspending on linux:
83			config.shutdown_on_interrupt = false;
84			config
85		};
86
87		// Update allowed_hosts
88		let allowed_hosts = hosts::update(allowed_hosts, addr);
89
90		// Spawn event loop (if necessary)
91		let eloop = executor.initialize()?;
92		let executor = eloop.executor();
93
94		// Create WebSocket
95		let ws = ws::Builder::new().with_settings(config).build(session::Factory::new(
96			handler,
97			meta_extractor,
98			allowed_origins,
99			allowed_hosts,
100			request_middleware,
101			stats,
102			executor,
103		))?;
104		let broadcaster = ws.broadcaster();
105
106		// Start listening...
107		let ws = ws.bind(addr)?;
108		let local_addr = ws.local_addr()?;
109		debug!("Bound to local address: {}", local_addr);
110
111		// Spawn a thread with event loop
112		let handle = thread::spawn(move || match ws.run().map_err(Error::from) {
113			Err(error) => {
114				error!("Error while running websockets server. Details: {:?}", error);
115				Err(error)
116			}
117			Ok(_server) => Ok(()),
118		});
119
120		// Return a handle
121		Ok(Server {
122			addr: local_addr,
123			handle: Some(handle),
124			executor: Arc::new(Mutex::new(Some(eloop))),
125			broadcaster,
126		})
127	}
128}
129
130impl Server {
131	/// Consumes the server and waits for completion
132	pub fn wait(mut self) -> Result<()> {
133		self.handle
134			.take()
135			.expect("Handle is always Some at start.")
136			.join()
137			.expect("Non-panic exit")
138	}
139
140	/// Closes the server and waits for it to finish
141	pub fn close(self) {
142		self.close_handle().close();
143	}
144
145	/// Returns a handle to the server that can be used to close it while another thread is
146	/// blocking in `wait`.
147	pub fn close_handle(&self) -> CloseHandle {
148		CloseHandle {
149			executor: self.executor.clone(),
150			broadcaster: self.broadcaster.clone(),
151		}
152	}
153}
154
155impl Drop for Server {
156	fn drop(&mut self) {
157		self.close_handle().close();
158		self.handle.take().map(|handle| handle.join());
159	}
160}
161
162/// A handle that allows closing of a server even if it owned by a thread blocked in `wait`.
163#[derive(Clone)]
164pub struct CloseHandle {
165	executor: Arc<Mutex<Option<Executor>>>,
166	broadcaster: ws::Sender,
167}
168
169impl CloseHandle {
170	/// Closes the `Server`.
171	pub fn close(self) {
172		let _ = self.broadcaster.shutdown();
173		if let Some(executor) = self.executor.lock().unwrap().take() {
174			executor.close()
175		}
176	}
177}
178
179/// A Broadcaster that can be used to send messages on all connections.
180#[derive(Clone)]
181pub struct Broadcaster {
182	broadcaster: ws::Sender,
183}
184
185impl Broadcaster {
186	/// Send a message to the endpoints of all connections.
187	#[inline]
188	pub fn send<M>(&self, msg: M) -> Result<()>
189	where
190		M: Into<ws::Message>,
191	{
192		match self.broadcaster.send(msg).map_err(Error::from) {
193			Err(error) => {
194				error!("Error while running sending. Details: {:?}", error);
195				Err(error)
196			}
197			Ok(_server) => Ok(()),
198		}
199	}
200}