1#![warn(missing_docs)]
21
22extern crate unicase;
23extern crate rs_jsonrpc_core as jsonrpc;
24extern crate rs_jsonrpc_server_utils as server_utils;
25extern crate net2;
26
27pub extern crate hyper;
28
29#[macro_use]
30extern crate log;
31
32mod handler;
33mod response;
34mod utils;
35#[cfg(test)]
36mod tests;
37
38use std::io;
39use std::sync::{mpsc, Arc};
40use std::net::SocketAddr;
41
42use hyper::server;
43use jsonrpc::MetaIoHandler;
44use jsonrpc::futures::{self, Future, Stream};
45use jsonrpc::futures::sync::oneshot;
46use server_utils::reactor::{Remote, UninitializedRemote};
47
48pub use server_utils::hosts::{Host, DomainsValidation};
49pub use server_utils::cors::{AccessControlAllowOrigin, Origin};
50pub use server_utils::tokio_core;
51pub use handler::ServerHandler;
52pub use utils::{is_host_allowed, cors_header, CorsHeader};
53pub use response::Response;
54
55pub enum RequestMiddlewareAction {
57 Proceed {
59 should_continue_on_invalid_cors: bool,
62 request: server::Request,
64 },
65 Respond {
67 should_validate_hosts: bool,
69 response: Box<Future<Item=server::Response, Error=hyper::Error> + Send>,
71 }
72}
73
74impl From<Response> for RequestMiddlewareAction {
75 fn from(o: Response) -> Self {
76 RequestMiddlewareAction::Respond {
77 should_validate_hosts: true,
78 response: Box::new(futures::future::ok(o.into())),
79 }
80 }
81}
82
83impl From<server::Response> for RequestMiddlewareAction {
84 fn from(response: server::Response) -> Self {
85 RequestMiddlewareAction::Respond {
86 should_validate_hosts: true,
87 response: Box::new(futures::future::ok(response)),
88 }
89 }
90}
91
92impl From<server::Request> for RequestMiddlewareAction {
93 fn from(request: server::Request) -> Self {
94 RequestMiddlewareAction::Proceed {
95 should_continue_on_invalid_cors: false,
96 request,
97 }
98 }
99}
100
101pub trait RequestMiddleware: Send + Sync + 'static {
103 fn on_request(&self, request: server::Request) -> RequestMiddlewareAction;
105}
106
107impl<F> RequestMiddleware for F where
108 F: Fn(server::Request) -> RequestMiddlewareAction + Sync + Send + 'static,
109{
110 fn on_request(&self, request: server::Request) -> RequestMiddlewareAction {
111 (*self)(request)
112 }
113}
114
115#[derive(Default)]
116struct NoopRequestMiddleware;
117impl RequestMiddleware for NoopRequestMiddleware {
118 fn on_request(&self, request: server::Request) -> RequestMiddlewareAction {
119 RequestMiddlewareAction::Proceed {
120 should_continue_on_invalid_cors: false,
121 request,
122 }
123 }
124}
125
126pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
128 fn read_metadata(&self, _: &server::Request) -> M {
130 Default::default()
131 }
132}
133
134impl<M, F> MetaExtractor<M> for F where
135 M: jsonrpc::Metadata,
136 F: Fn(&server::Request) -> M + Sync + Send + 'static,
137{
138 fn read_metadata(&self, req: &server::Request) -> M {
139 (*self)(req)
140 }
141}
142
143#[derive(Default)]
144struct NoopExtractor;
145impl<M: jsonrpc::Metadata> MetaExtractor<M> for NoopExtractor {}
146
147pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
149 pub handler: Arc<MetaIoHandler<M, S>>,
151 pub extractor: Arc<MetaExtractor<M>>,
153}
154
155impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
156 fn clone(&self) -> Self {
157 Rpc {
158 handler: self.handler.clone(),
159 extractor: self.extractor.clone(),
160 }
161 }
162}
163
164type AllowedHosts = Option<Vec<Host>>;
165type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
166
167#[derive(Debug, PartialEq, Clone, Copy)]
169pub enum RestApi {
170 Secure,
176 Unsecure,
181 Disabled,
183}
184
185pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
187 handler: Arc<MetaIoHandler<M, S>>,
188 remote: UninitializedRemote,
189 meta_extractor: Arc<MetaExtractor<M>>,
190 request_middleware: Arc<RequestMiddleware>,
191 cors_domains: CorsDomains,
192 allowed_hosts: AllowedHosts,
193 rest_api: RestApi,
194 keep_alive: bool,
195 threads: usize,
196}
197
198const SENDER_PROOF: &'static str = "Server initialization awaits local address.";
199
200impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
201 pub fn new<T>(handler: T) -> Self where
210 T: Into<MetaIoHandler<M, S>>
211 {
212 ServerBuilder {
213 handler: Arc::new(handler.into()),
214 remote: UninitializedRemote::Unspawned,
215 meta_extractor: Arc::new(NoopExtractor::default()),
216 request_middleware: Arc::new(NoopRequestMiddleware::default()),
217 cors_domains: None,
218 allowed_hosts: None,
219 rest_api: RestApi::Disabled,
220 keep_alive: true,
221 threads: 1,
222 }
223 }
224
225 pub fn event_loop_remote(mut self, remote: tokio_core::reactor::Remote) -> Self {
228 self.remote = UninitializedRemote::Shared(remote);
229 self
230 }
231
232 pub fn rest_api(mut self, rest_api: RestApi) -> Self {
236 self.rest_api = rest_api;
237 self
238 }
239
240 pub fn keep_alive(mut self, val: bool) -> Self {
243 self.keep_alive = val;
244 self
245 }
246
247 #[cfg(not(unix))]
250 pub fn threads(mut self, _threads: usize) -> Self {
251 warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
252 self
253 }
254
255 #[cfg(unix)]
258 pub fn threads(mut self, threads: usize) -> Self {
259 self.threads = threads;
260 self
261 }
262
263 pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
265 self.cors_domains = cors_domains.into();
266 self
267 }
268
269 pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
271 self.request_middleware = Arc::new(middleware);
272 self
273 }
274
275 pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
277 self.meta_extractor = Arc::new(extractor);
278 self
279 }
280
281 pub fn allow_only_bind_host(mut self) -> Self {
283 self.allowed_hosts = Some(Vec::new());
284 self
285 }
286
287 pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
289 self.allowed_hosts = allowed_hosts.into();
290 self
291 }
292
293 pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
295 let cors_domains = self.cors_domains;
296 let request_middleware = self.request_middleware;
297 let allowed_hosts = self.allowed_hosts;
298 let rs_jsonrpc_handler = Rpc {
299 handler: self.handler,
300 extractor: self.meta_extractor,
301 };
302 let rest_api = self.rest_api;
303 let keep_alive = self.keep_alive;
304 let reuse_port = self.threads > 1;
305
306 let (local_addr_tx, local_addr_rx) = mpsc::channel();
307 let (close, shutdown_signal) = oneshot::channel();
308 let eloop = self.remote.init_with_name("http.worker0")?;
309 serve(
310 (shutdown_signal, local_addr_tx),
311 eloop.remote(),
312 addr.to_owned(),
313 cors_domains.clone(),
314 request_middleware.clone(),
315 allowed_hosts.clone(),
316 rs_jsonrpc_handler.clone(),
317 rest_api,
318 keep_alive,
319 reuse_port,
320 );
321 let handles = (0..self.threads - 1).map(|i| {
322 let (local_addr_tx, local_addr_rx) = mpsc::channel();
323 let (close, shutdown_signal) = oneshot::channel();
324 let eloop = UninitializedRemote::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
325 serve(
326 (shutdown_signal, local_addr_tx),
327 eloop.remote(),
328 addr.to_owned(),
329 cors_domains.clone(),
330 request_middleware.clone(),
331 allowed_hosts.clone(),
332 rs_jsonrpc_handler.clone(),
333 rest_api,
334 keep_alive,
335 reuse_port,
336 );
337 Ok((eloop, close, local_addr_rx))
338 }).collect::<io::Result<Vec<_>>>()?;
339
340 let local_addr = recv_address(local_addr_rx);
342 let mut handles = handles.into_iter().map(|(eloop, close, local_addr_rx)| {
344 let _ = recv_address(local_addr_rx)?;
345 Ok((eloop, close))
346 }).collect::<io::Result<(Vec<_>)>>()?;
347 handles.push((eloop, close));
348 let (remotes, close) = handles.into_iter().unzip();
349
350 Ok(Server {
351 address: local_addr?,
352 remote: Some(remotes),
353 close: Some(close),
354 })
355 }
356}
357
358fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
359 local_addr_rx.recv().map_err(|_| {
360 io::Error::new(io::ErrorKind::Interrupted, "")
361 })?
362}
363
364fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
365 signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
366 remote: tokio_core::reactor::Remote,
367 addr: SocketAddr,
368 cors_domains: CorsDomains,
369 request_middleware: Arc<RequestMiddleware>,
370 allowed_hosts: AllowedHosts,
371 rs_jsonrpc_handler: Rpc<M, S>,
372 rest_api: RestApi,
373 keep_alive: bool,
374 reuse_port: bool,
375) {
376 let (shutdown_signal, local_addr_tx) = signals;
377 remote.spawn(move |handle| {
378 let handle1 = handle.clone();
379 let bind = move || {
380 let listener = match addr {
381 SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
382 SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
383 };
384 configure_port(reuse_port, &listener)?;
385 listener.reuse_address(true)?;
386 listener.bind(&addr)?;
387 let listener = listener.listen(1024)?;
388 let listener = tokio_core::net::TcpListener::from_listener(listener, &addr, &handle1)?;
389 let local_addr = listener.local_addr()?;
393
394 Ok((listener, local_addr))
395 };
396
397 let bind_result = match bind() {
398 Ok((listener, local_addr)) => {
399 local_addr_tx.send(Ok(local_addr)).expect(SENDER_PROOF);
401
402 futures::future::ok((listener, local_addr))
403 },
404 Err(err) => {
405 local_addr_tx.send(Err(err)).expect(SENDER_PROOF);
407
408 futures::future::err(())
409 }
410 };
411
412 let handle = handle.clone();
413 bind_result.and_then(move |(listener, local_addr)| {
414 let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
415
416 let http = {
417 let mut http = server::Http::new();
418 http.keep_alive(keep_alive);
419 http
420 };
421 listener.incoming()
422 .for_each(move |(socket, addr)| {
423 http.bind_connection(&handle, socket, addr, ServerHandler::new(
424 rs_jsonrpc_handler.clone(),
425 cors_domains.clone(),
426 allowed_hosts.clone(),
427 request_middleware.clone(),
428 rest_api,
429 ));
430 Ok(())
431 })
432 .map_err(|e| {
433 warn!("Incoming streams error, closing sever: {:?}", e);
434 })
435 .select(shutdown_signal.map_err(|e| {
436 debug!("Shutdown signaller dropped, closing server: {:?}", e);
437 }))
438 .map(|_| ())
439 .map_err(|_| ())
440 })
441 });
442}
443
444#[cfg(unix)]
445fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
446 use net2::unix::*;
447
448 if reuse {
449 try!(tcp.reuse_port(true));
450 }
451
452 Ok(())
453}
454
455#[cfg(not(unix))]
456fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
457 Ok(())
458}
459
460pub struct Server {
462 address: SocketAddr,
463 remote: Option<Vec<Remote>>,
464 close: Option<Vec<oneshot::Sender<()>>>,
465}
466
467const PROOF: &'static str = "Server is always Some until self is consumed.";
468impl Server {
469 pub fn address(&self) -> &SocketAddr {
471 &self.address
472 }
473
474 pub fn close(mut self) {
476 for close in self.close.take().expect(PROOF) {
477 let _ = close.send(());
478 }
479
480 for remote in self.remote.take().expect(PROOF) {
481 remote.close();
482 }
483 }
484
485 pub fn wait(mut self) {
487 for remote in self.remote.take().expect(PROOF) {
488 remote.wait();
489 }
490 }
491}
492
493impl Drop for Server {
494 fn drop(&mut self) {
495 self.remote.take().map(|remotes| {
496 for remote in remotes { remote.close(); }
497 });
498 }
499}
500