// bitconch_jsonrpc_ws_server/server.rs
use std::fmt;
2use std::net::SocketAddr;
3use std::sync::{Arc, Mutex};
4use std::thread;
5
6use core;
7use server_utils::cors::Origin;
8use server_utils::hosts::{self, Host};
9use server_utils::reactor::{UninitializedExecutor, Executor};
10use server_utils::session::SessionStats;
11use ws;
12
13use error::{Error, Result};
14use metadata;
15use session;
16
17pub struct Server {
19 addr: SocketAddr,
20 handle: Option<thread::JoinHandle<Result<()>>>,
21 executor: Arc<Mutex<Option<Executor>>>,
22 broadcaster: ws::Sender,
23}
24
25impl fmt::Debug for Server {
26 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
27 f.debug_struct("Server")
28 .field("addr", &self.addr)
29 .field("handle", &self.handle)
30 .field("executor", &self.executor)
31 .finish()
32 }
33}
34
35impl Server {
36 pub fn addr(&self) -> &SocketAddr {
38 &self.addr
39 }
40
41 pub fn start<M: core::Metadata, S: core::Middleware<M>>(
44 addr: &SocketAddr,
45 handler: Arc<core::MetaIoHandler<M, S>>,
46 meta_extractor: Arc<metadata::MetaExtractor<M>>,
47 allowed_origins: Option<Vec<Origin>>,
48 allowed_hosts: Option<Vec<Host>>,
49 request_middleware: Option<Arc<session::RequestMiddleware>>,
50 stats: Option<Arc<SessionStats>>,
51 executor: UninitializedExecutor,
52 max_connections: usize,
53 ) -> Result<Server> {
54 let config = {
55 let mut config = ws::Settings::default();
56 config.max_connections = max_connections;
57 config.fragments_grow = false;
59 config.max_in_buffer = 5 * 1024 * 1024; config.method_strict = true;
63 config.masking_strict = true;
65 config.shutdown_on_interrupt = false;
67 config
68 };
69
70 let allowed_hosts = hosts::update(allowed_hosts, addr);
72
73 let eloop = executor.initialize()?;
75 let executor = eloop.executor();
76
77 let ws = ws::Builder::new().with_settings(config).build(session::Factory::new(
79 handler, meta_extractor, allowed_origins, allowed_hosts, request_middleware, stats, executor
80 ))?;
81 let broadcaster = ws.broadcaster();
82
83 let ws = ws.bind(addr)?;
85 let local_addr = ws.local_addr()?;
86 debug!("Bound to local address: {}", local_addr);
87
88 let handle = thread::spawn(move || {
90 match ws.run().map_err(Error::from) {
91 Err(error) => {
92 error!("Error while running websockets server. Details: {:?}", error);
93 Err(error)
94 },
95 Ok(_server) => Ok(()),
96 }
97 });
98
99 Ok(Server {
101 addr: local_addr,
102 handle: Some(handle),
103 executor: Arc::new(Mutex::new(Some(eloop))),
104 broadcaster: broadcaster,
105 })
106 }
107}
108
109impl Server {
110 pub fn wait(mut self) -> Result<()> {
112 self.handle.take().expect("Handle is always Some at start.").join().expect("Non-panic exit")
113 }
114
115 pub fn close(self) {
117 self.close_handle().close();
118 }
119
120 pub fn close_handle(&self) -> CloseHandle {
123 CloseHandle {
124 executor: self.executor.clone(),
125 broadcaster: self.broadcaster.clone(),
126 }
127 }
128}
129
130impl Drop for Server {
131 fn drop(&mut self) {
132 self.close_handle().close();
133 self.handle.take().map(|handle| handle.join());
134 }
135}
136
137
138#[derive(Clone)]
140pub struct CloseHandle {
141 executor: Arc<Mutex<Option<Executor>>>,
142 broadcaster: ws::Sender,
143}
144
145impl CloseHandle {
146 pub fn close(self) {
148 let _ = self.broadcaster.shutdown();
149 self.executor.lock().unwrap().take().map(|executor| executor.close());
150 }
151}