bitconch_jsonrpc_ws_server/
server.rs

1use std::fmt;
2use std::net::SocketAddr;
3use std::sync::{Arc, Mutex};
4use std::thread;
5
6use core;
7use server_utils::cors::Origin;
8use server_utils::hosts::{self, Host};
9use server_utils::reactor::{UninitializedExecutor, Executor};
10use server_utils::session::SessionStats;
11use ws;
12
13use error::{Error, Result};
14use metadata;
15use session;
16
17/// `WebSockets` server implementation.
18pub struct Server {
19	addr: SocketAddr,
20	handle: Option<thread::JoinHandle<Result<()>>>,
21	executor: Arc<Mutex<Option<Executor>>>,
22	broadcaster: ws::Sender,
23}
24
25impl fmt::Debug for Server {
26    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
27		f.debug_struct("Server")
28			.field("addr", &self.addr)
29			.field("handle", &self.handle)
30			.field("executor", &self.executor)
31			.finish()
32    }
33}
34
35impl Server {
36	/// Returns the address this server is listening on
37	pub fn addr(&self) -> &SocketAddr {
38		&self.addr
39	}
40
41	/// Starts a new `WebSocket` server in separate thread.
42	/// Returns a `Server` handle which closes the server when droped.
43	pub fn start<M: core::Metadata, S: core::Middleware<M>>(
44		addr: &SocketAddr,
45		handler: Arc<core::MetaIoHandler<M, S>>,
46		meta_extractor: Arc<metadata::MetaExtractor<M>>,
47		allowed_origins: Option<Vec<Origin>>,
48		allowed_hosts: Option<Vec<Host>>,
49		request_middleware: Option<Arc<session::RequestMiddleware>>,
50		stats: Option<Arc<SessionStats>>,
51		executor: UninitializedExecutor,
52		max_connections: usize,
53	) -> Result<Server> {
54		let config = {
55			let mut config = ws::Settings::default();
56			config.max_connections = max_connections;
57			// don't grow non-final fragments (to prevent DOS)
58			config.fragments_grow = false;
59			// don't accept super large requests
60			config.max_in_buffer = 5 * 1024 * 1024; // 5MB
61			// accept only handshakes beginning with GET
62			config.method_strict = true;
63			// require masking
64			config.masking_strict = true;
65			// Was shutting down server when suspending on linux:
66			config.shutdown_on_interrupt = false;
67			config
68		};
69
70		// Update allowed_hosts
71		let allowed_hosts = hosts::update(allowed_hosts, addr);
72
73		// Spawn event loop (if necessary)
74		let eloop = executor.initialize()?;
75		let executor = eloop.executor();
76
77		// Create WebSocket
78		let ws = ws::Builder::new().with_settings(config).build(session::Factory::new(
79			handler, meta_extractor, allowed_origins, allowed_hosts, request_middleware, stats, executor
80		))?;
81		let broadcaster = ws.broadcaster();
82
83		// Start listening...
84		let ws = ws.bind(addr)?;
85		let local_addr = ws.local_addr()?;
86		debug!("Bound to local address: {}", local_addr);
87
88		// Spawn a thread with event loop
89		let handle = thread::spawn(move || {
90			match ws.run().map_err(Error::from) {
91				Err(error) => {
92					error!("Error while running websockets server. Details: {:?}", error);
93					Err(error)
94				},
95				Ok(_server) => Ok(()),
96			}
97		});
98
99		// Return a handle
100		Ok(Server {
101			addr: local_addr,
102			handle: Some(handle),
103			executor: Arc::new(Mutex::new(Some(eloop))),
104			broadcaster: broadcaster,
105		})
106	}
107}
108
109impl Server {
110	/// Consumes the server and waits for completion
111	pub fn wait(mut self) -> Result<()> {
112		self.handle.take().expect("Handle is always Some at start.").join().expect("Non-panic exit")
113	}
114
115	/// Closes the server and waits for it to finish
116	pub fn close(self) {
117		self.close_handle().close();
118	}
119
120	/// Returns a handle to the server that can be used to close it while another thread is
121	/// blocking in `wait`.
122	pub fn close_handle(&self) -> CloseHandle {
123		CloseHandle {
124			executor: self.executor.clone(),
125			broadcaster: self.broadcaster.clone(),
126		}
127	}
128}
129
130impl Drop for Server {
131	fn drop(&mut self) {
132		self.close_handle().close();
133		self.handle.take().map(|handle| handle.join());
134	}
135}
136
137
138/// A handle that allows closing of a server even if it owned by a thread blocked in `wait`.
139#[derive(Clone)]
140pub struct CloseHandle {
141	executor: Arc<Mutex<Option<Executor>>>,
142	broadcaster: ws::Sender,
143}
144
145impl CloseHandle {
146	/// Closes the `Server`.
147	pub fn close(self) {
148		let _ = self.broadcaster.shutdown();
149		self.executor.lock().unwrap().take().map(|executor| executor.close());
150	}
151}