use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::thread;
use std::{cmp, fmt};
use crate::core;
use crate::server_utils::cors::Origin;
use crate::server_utils::hosts::{self, Host};
use crate::server_utils::reactor::{Executor, UninitializedExecutor};
use crate::server_utils::session::SessionStats;
use crate::ws;
use crate::error::{Error, Result};
use crate::metadata;
use crate::session;
/// A WebSocket server whose event loop runs on a dedicated background thread.
///
/// Instances are created with `Server::start`. Dropping a `Server` requests
/// shutdown and joins the background thread.
pub struct Server {
/// Local address reported by the listener after binding.
addr: SocketAddr,
/// Join handle of the thread running the WebSocket loop. It is `Some` after
/// `start` and is taken by `wait` or by `Drop`.
handle: Option<thread::JoinHandle<Result<()>>>,
/// Initialized executor, shared with every `CloseHandle`; taken (left as
/// `None`) by the first close.
executor: Arc<Mutex<Option<Executor>>>,
/// Sender obtained from the WebSocket before binding; cloned into
/// `Broadcaster` and `CloseHandle` values.
broadcaster: ws::Sender,
}
/// Debug output lists the address, the thread handle and the executor; the
/// `broadcaster` field is not included.
impl fmt::Debug for Server {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Server");
        out.field("addr", &self.addr);
        out.field("handle", &self.handle);
        out.field("executor", &self.executor);
        out.finish()
    }
}
impl Server {
pub fn addr(&self) -> &SocketAddr {
&self.addr
}
pub fn broadcaster(&self) -> Broadcaster {
Broadcaster {
broadcaster: self.broadcaster.clone(),
}
}
pub fn start<M: core::Metadata, S: core::Middleware<M>>(
addr: &SocketAddr,
handler: Arc<core::MetaIoHandler<M, S>>,
meta_extractor: Arc<metadata::MetaExtractor<M>>,
allowed_origins: Option<Vec<Origin>>,
allowed_hosts: Option<Vec<Host>>,
request_middleware: Option<Arc<session::RequestMiddleware>>,
stats: Option<Arc<SessionStats>>,
executor: UninitializedExecutor,
max_connections: usize,
max_payload_bytes: usize,
) -> Result<Server> {
let config = {
let mut config = ws::Settings::default();
config.max_connections = max_connections;
config.max_fragment_size = max_payload_bytes;
config.fragments_grow = false;
config.fragments_capacity = cmp::max(1, max_payload_bytes / config.fragment_size);
config.method_strict = true;
config.masking_strict = true;
config.shutdown_on_interrupt = false;
config
};
let allowed_hosts = hosts::update(allowed_hosts, addr);
let eloop = executor.initialize()?;
let executor = eloop.executor();
let ws = ws::Builder::new().with_settings(config).build(session::Factory::new(
handler,
meta_extractor,
allowed_origins,
allowed_hosts,
request_middleware,
stats,
executor,
))?;
let broadcaster = ws.broadcaster();
let ws = ws.bind(addr)?;
let local_addr = ws.local_addr()?;
debug!("Bound to local address: {}", local_addr);
let handle = thread::spawn(move || match ws.run().map_err(Error::from) {
Err(error) => {
error!("Error while running websockets server. Details: {:?}", error);
Err(error)
}
Ok(_server) => Ok(()),
});
Ok(Server {
addr: local_addr,
handle: Some(handle),
executor: Arc::new(Mutex::new(Some(eloop))),
broadcaster,
})
}
}
impl Server {
    /// Blocks until the server thread finishes and returns its result.
    ///
    /// # Panics
    ///
    /// Panics if the thread handle has already been taken or if the server
    /// thread itself panicked.
    pub fn wait(mut self) -> Result<()> {
        let worker = self.handle.take().expect("Handle is always Some at start.");
        worker.join().expect("Non-panic exit")
    }

    /// Closes the server, consuming it.
    pub fn close(self) {
        let closer = self.close_handle();
        closer.close();
    }

    /// Returns a handle that can close the server without owning it.
    pub fn close_handle(&self) -> CloseHandle {
        let executor = self.executor.clone();
        let broadcaster = self.broadcaster.clone();
        CloseHandle { executor, broadcaster }
    }
}
/// Dropping the server requests shutdown and then waits for the background
/// thread to finish.
impl Drop for Server {
    fn drop(&mut self) {
        self.close_handle().close();
        // The handle is already gone if `wait` consumed it; the join result
        // is deliberately ignored.
        if let Some(worker) = self.handle.take() {
            let _ = worker.join();
        }
    }
}
/// A cloneable handle that can close a `Server` without owning it.
///
/// Obtained from `Server::close_handle`.
#[derive(Clone)]
pub struct CloseHandle {
/// Executor slot shared with the `Server`; the first close takes the
/// executor out and closes it.
executor: Arc<Mutex<Option<Executor>>>,
/// Clone of the server's sender, used to request shutdown.
broadcaster: ws::Sender,
}
impl CloseHandle {
    /// Requests shutdown of the WebSocket server and closes its executor.
    ///
    /// The executor is taken out of the shared slot, so only the first call
    /// across all clones of this handle closes it; later calls only repeat
    /// the shutdown request. The result of the shutdown request is ignored.
    ///
    /// This method does not panic on a poisoned executor mutex. It is also
    /// invoked from `Server`'s `Drop`, where a panic during unwinding would
    /// abort the process.
    pub fn close(self) {
        // Best effort: the shutdown request may fail if the server is already gone.
        let _ = self.broadcaster.shutdown();

        // A poisoned lock only means another thread panicked while holding
        // it; the `Option` inside is still valid, so recover the guard
        // instead of panicking.
        let mut slot = self
            .executor
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        if let Some(executor) = slot.take() {
            executor.close();
        }
    }
}
/// A cloneable handle for sending messages through the server's sender.
///
/// Obtained from `Server::broadcaster`.
#[derive(Clone)]
pub struct Broadcaster {
/// Clone of the server's sender.
broadcaster: ws::Sender,
}
impl Broadcaster {
#[inline]
pub fn send<M>(&self, msg: M) -> Result<()>
where
M: Into<ws::Message>,
{
match self.broadcaster.send(msg).map_err(Error::from) {
Err(error) => {
error!("Error while running sending. Details: {:?}", error);
Err(error)
}
Ok(_server) => Ok(()),
}
}
}