1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::thread;
use std::{cmp, fmt};

use crate::core;
use crate::server_utils::cors::Origin;
use crate::server_utils::hosts::{self, Host};
use crate::server_utils::reactor::{Executor, UninitializedExecutor};
use crate::server_utils::session::SessionStats;
use crate::ws;

use crate::error::{Error, Result};
use crate::metadata;
use crate::session;

/// `WebSockets` server implementation.
pub struct Server {
	addr: SocketAddr,
	handle: Option<thread::JoinHandle<Result<()>>>,
	executor: Arc<Mutex<Option<Executor>>>,
	broadcaster: ws::Sender,
}

impl fmt::Debug for Server {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		f.debug_struct("Server")
			.field("addr", &self.addr)
			.field("handle", &self.handle)
			.field("executor", &self.executor)
			.finish()
	}
}

impl Server {
	/// Returns the address this server is listening on
	pub fn addr(&self) -> &SocketAddr {
		&self.addr
	}

	/// Returns a Broadcaster that can be used to send messages on all connections.
	pub fn broadcaster(&self) -> Broadcaster {
		Broadcaster {
			broadcaster: self.broadcaster.clone(),
		}
	}

	/// Starts a new `WebSocket` server in separate thread.
	/// Returns a `Server` handle which closes the server when droped.
	pub fn start<M: core::Metadata, S: core::Middleware<M>>(
		addr: &SocketAddr,
		handler: Arc<core::MetaIoHandler<M, S>>,
		meta_extractor: Arc<dyn metadata::MetaExtractor<M>>,
		allowed_origins: Option<Vec<Origin>>,
		allowed_hosts: Option<Vec<Host>>,
		request_middleware: Option<Arc<dyn session::RequestMiddleware>>,
		stats: Option<Arc<dyn SessionStats>>,
		executor: UninitializedExecutor,
		max_connections: usize,
		max_payload_bytes: usize,
		max_in_buffer_capacity: usize,
		max_out_buffer_capacity: usize,
	) -> Result<Server>
	where
		S::Future: Unpin,
		S::CallFuture: Unpin,
	{
		let config = {
			let mut config = ws::Settings::default();
			config.max_connections = max_connections;
			// don't accept super large requests
			config.max_fragment_size = max_payload_bytes;
			config.in_buffer_capacity_hard_limit = max_in_buffer_capacity;
			config.out_buffer_capacity_hard_limit = max_out_buffer_capacity;
			// don't grow non-final fragments (to prevent DOS)
			config.fragments_grow = false;
			config.fragments_capacity = cmp::max(1, max_payload_bytes / config.fragment_size);
			// accept only handshakes beginning with GET
			config.method_strict = true;
			// require masking
			config.masking_strict = true;
			// Was shutting down server when suspending on linux:
			config.shutdown_on_interrupt = false;
			config
		};

		// Update allowed_hosts
		let allowed_hosts = hosts::update(allowed_hosts, addr);

		// Spawn event loop (if necessary)
		let eloop = executor.initialize()?;
		let executor = eloop.executor();

		// Create WebSocket
		let ws = ws::Builder::new().with_settings(config).build(session::Factory::new(
			handler,
			meta_extractor,
			allowed_origins,
			allowed_hosts,
			request_middleware,
			stats,
			executor,
		))?;
		let broadcaster = ws.broadcaster();

		// Start listening...
		let ws = ws.bind(addr)?;
		let local_addr = ws.local_addr()?;
		debug!("Bound to local address: {}", local_addr);

		// Spawn a thread with event loop
		let handle = thread::spawn(move || match ws.run().map_err(Error::from) {
			Err(error) => {
				error!("Error while running websockets server. Details: {:?}", error);
				Err(error)
			}
			Ok(_server) => Ok(()),
		});

		// Return a handle
		Ok(Server {
			addr: local_addr,
			handle: Some(handle),
			executor: Arc::new(Mutex::new(Some(eloop))),
			broadcaster,
		})
	}
}

impl Server {
	/// Consumes the server and waits for completion
	pub fn wait(mut self) -> Result<()> {
		self.handle
			.take()
			.expect("Handle is always Some at start.")
			.join()
			.expect("Non-panic exit")
	}

	/// Closes the server and waits for it to finish
	pub fn close(self) {
		self.close_handle().close();
	}

	/// Returns a handle to the server that can be used to close it while another thread is
	/// blocking in `wait`.
	pub fn close_handle(&self) -> CloseHandle {
		CloseHandle {
			executor: self.executor.clone(),
			broadcaster: self.broadcaster.clone(),
		}
	}
}

impl Drop for Server {
	fn drop(&mut self) {
		self.close_handle().close();
		self.handle.take().map(|handle| handle.join());
	}
}

/// A handle that allows closing of a server even if it owned by a thread blocked in `wait`.
#[derive(Clone)]
pub struct CloseHandle {
	executor: Arc<Mutex<Option<Executor>>>,
	broadcaster: ws::Sender,
}

impl CloseHandle {
	/// Closes the `Server`.
	pub fn close(self) {
		let _ = self.broadcaster.shutdown();
		if let Some(executor) = self.executor.lock().unwrap().take() {
			executor.close()
		}
	}
}

/// A Broadcaster that can be used to send messages on all connections.
#[derive(Clone)]
pub struct Broadcaster {
	broadcaster: ws::Sender,
}

impl Broadcaster {
	/// Send a message to the endpoints of all connections.
	#[inline]
	pub fn send<M>(&self, msg: M) -> Result<()>
	where
		M: Into<ws::Message>,
	{
		match self.broadcaster.send(msg).map_err(Error::from) {
			Err(error) => {
				error!("Error while running sending. Details: {:?}", error);
				Err(error)
			}
			Ok(_server) => Ok(()),
		}
	}
}