1#![warn(missing_docs)]
25
26extern crate unicase;
27extern crate bitconch_jsonrpc_server_utils as server_utils;
28extern crate net2;
29
30pub extern crate bitconch_jsonrpc_core as jsonrpc_core;
31pub extern crate hyper;
32
33#[macro_use]
34extern crate log;
35
36mod handler;
37mod response;
38mod utils;
39#[cfg(test)]
40mod tests;
41
42use std::io;
43use std::sync::{mpsc, Arc};
44use std::net::SocketAddr;
45use std::thread;
46
47use hyper::{server, Body};
48use jsonrpc_core as jsonrpc;
49use jsonrpc::MetaIoHandler;
50use jsonrpc::futures::{self, Future, Stream, future};
51use jsonrpc::futures::sync::oneshot;
52use server_utils::reactor::{Executor, UninitializedExecutor};
53
54pub use server_utils::hosts::{Host, DomainsValidation};
55pub use server_utils::cors::{self, AccessControlAllowOrigin, Origin, AllowCors};
56pub use server_utils::tokio;
57pub use handler::ServerHandler;
58pub use utils::{is_host_allowed, cors_allow_origin, cors_allow_headers};
59pub use response::Response;
60
/// Decision returned by a [`RequestMiddleware`] for an incoming HTTP request.
pub enum RequestMiddlewareAction {
    /// Hand `request` on for further processing.
    Proceed {
        /// NOTE(review): presumably tells the handler whether to keep going
        /// when CORS validation fails; the flag is consumed in the `handler`
        /// module, which is not visible here -- confirm.
        should_continue_on_invalid_cors: bool,
        /// The request to continue processing.
        request: hyper::Request<Body>,
    },
    /// Answer the request with the supplied response future.
    Respond {
        /// NOTE(review): presumably controls whether host validation is still
        /// applied before `response` is sent; consumed in the `handler`
        /// module, which is not visible here -- confirm.
        should_validate_hosts: bool,
        /// Future resolving to the HTTP response to send.
        response: Box<Future<Item=hyper::Response<Body>, Error=hyper::Error> + Send>,
    }
}
79
80impl From<Response> for RequestMiddlewareAction {
81 fn from(o: Response) -> Self {
82 RequestMiddlewareAction::Respond {
83 should_validate_hosts: true,
84 response: Box::new(futures::future::ok(o.into())),
85 }
86 }
87}
88
89impl From<hyper::Response<Body>> for RequestMiddlewareAction {
90 fn from(response: hyper::Response<Body>) -> Self {
91 RequestMiddlewareAction::Respond {
92 should_validate_hosts: true,
93 response: Box::new(futures::future::ok(response)),
94 }
95 }
96}
97
98impl From<hyper::Request<Body>> for RequestMiddlewareAction {
99 fn from(request: hyper::Request<Body>) -> Self {
100 RequestMiddlewareAction::Proceed {
101 should_continue_on_invalid_cors: false,
102 request,
103 }
104 }
105}
106
107pub trait RequestMiddleware: Send + Sync + 'static {
109 fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction;
111}
112
113impl<F> RequestMiddleware for F where
114 F: Fn(hyper::Request<Body>) -> RequestMiddlewareAction + Sync + Send + 'static,
115{
116 fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
117 (*self)(request)
118 }
119}
120
121#[derive(Default)]
122struct NoopRequestMiddleware;
123impl RequestMiddleware for NoopRequestMiddleware {
124 fn on_request(&self, request: hyper::Request<Body>) -> RequestMiddlewareAction {
125 RequestMiddlewareAction::Proceed {
126 should_continue_on_invalid_cors: false,
127 request,
128 }
129 }
130}
131
132pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
134 fn read_metadata(&self, _: &hyper::Request<Body>) -> M;
136}
137
138impl<M, F> MetaExtractor<M> for F where
139 M: jsonrpc::Metadata,
140 F: Fn(&hyper::Request<Body>) -> M + Sync + Send + 'static,
141{
142 fn read_metadata(&self, req: &hyper::Request<Body>) -> M {
143 (*self)(req)
144 }
145}
146
147#[derive(Default)]
148struct NoopExtractor;
149impl<M: jsonrpc::Metadata + Default> MetaExtractor<M> for NoopExtractor {
150 fn read_metadata(&self, _: &hyper::Request<Body>) -> M {
151 M::default()
152 }
153}
154pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
157 pub handler: Arc<MetaIoHandler<M, S>>,
159 pub extractor: Arc<MetaExtractor<M>>,
161}
162
163impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
164 fn clone(&self) -> Self {
165 Rpc {
166 handler: self.handler.clone(),
167 extractor: self.extractor.clone(),
168 }
169 }
170}
171
/// Optional list of hosts; `None` is the builder default.
/// NOTE(review): `None` presumably means "no host validation" -- confirm in
/// `server_utils::hosts`.
type AllowedHosts = Option<Vec<Host>>;
/// Optional list of CORS origins; `None` is the builder default.
/// NOTE(review): `None` presumably means "no CORS validation" -- confirm in
/// `server_utils::cors`.
type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
174
/// REST API mode selected on the [`ServerBuilder`] and forwarded to the
/// request handler. The builder default is `Disabled`.
///
/// NOTE(review): the exact behaviour of each mode is implemented in the
/// `handler` module, which is not visible here; the variant notes below are
/// inferred from the names -- confirm.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum RestApi {
    /// REST API enabled in its restricted ("secure") form.
    Secure,
    /// REST API enabled without the restrictions of `Secure`.
    Unsecure,
    /// REST API switched off.
    Disabled,
}
192
193pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
195 handler: Arc<MetaIoHandler<M, S>>,
196 executor: UninitializedExecutor,
197 meta_extractor: Arc<MetaExtractor<M>>,
198 request_middleware: Arc<RequestMiddleware>,
199 cors_domains: CorsDomains,
200 cors_max_age: Option<u32>,
201 allowed_headers: cors::AccessControlAllowHeaders,
202 allowed_hosts: AllowedHosts,
203 rest_api: RestApi,
204 health_api: Option<(String, String)>,
205 keep_alive: bool,
206 threads: usize,
207 max_request_body_size: usize,
208}
209
210impl<M: jsonrpc::Metadata + Default, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
211 pub fn new<T>(handler: T) -> Self where
217 T: Into<MetaIoHandler<M, S>>
218 {
219 Self::with_meta_extractor(handler, NoopExtractor)
220 }
221}
222
223impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
224 pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self where
230 T: Into<MetaIoHandler<M, S>>,
231 E: MetaExtractor<M>,
232 {
233 ServerBuilder {
234 handler: Arc::new(handler.into()),
235 executor: UninitializedExecutor::Unspawned,
236 meta_extractor: Arc::new(extractor),
237 request_middleware: Arc::new(NoopRequestMiddleware::default()),
238 cors_domains: None,
239 cors_max_age: None,
240 allowed_headers: cors::AccessControlAllowHeaders::Any,
241 allowed_hosts: None,
242 rest_api: RestApi::Disabled,
243 health_api: None,
244 keep_alive: true,
245 threads: 1,
246 max_request_body_size: 5 * 1024 * 1024,
247 }
248 }
249
250 pub fn event_loop_executor(mut self, executor: tokio::runtime::TaskExecutor) -> Self {
254 self.executor = UninitializedExecutor::Shared(executor);
255 self
256 }
257
258 pub fn rest_api(mut self, rest_api: RestApi) -> Self {
263 self.rest_api = rest_api;
264 self
265 }
266
267 pub fn health_api<A, B, T>(mut self, health_api: T) -> Self where
275 T: Into<Option<(A, B)>>,
276 A: Into<String>,
277 B: Into<String>,
278 {
279 self.health_api = health_api.into().map(|(a, b)| (a.into(), b.into()));
280 self
281 }
282
283 pub fn keep_alive(mut self, val: bool) -> Self {
287 self.keep_alive = val;
288 self
289 }
290
291 #[cfg(not(unix))]
295 pub fn threads(mut self, _threads: usize) -> Self {
296 warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
297 self
298 }
299
300 #[cfg(unix)]
304 pub fn threads(mut self, threads: usize) -> Self {
305 self.threads = threads;
306 self
307 }
308
309 pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
311 self.cors_domains = cors_domains.into();
312 self
313 }
314
315 pub fn cors_max_age<T: Into<Option<u32>>>(mut self, cors_max_age: T) -> Self {
321 self.cors_max_age = cors_max_age.into();
322 self
323 }
324
325 pub fn cors_allow_headers(mut self, allowed_headers: cors::AccessControlAllowHeaders) -> Self {
327 self.allowed_headers = allowed_headers.into();
328 self
329 }
330
331 pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
333 self.request_middleware = Arc::new(middleware);
334 self
335 }
336
337 pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
339 self.meta_extractor = Arc::new(extractor);
340 self
341 }
342
343 pub fn allow_only_bind_host(mut self) -> Self {
345 self.allowed_hosts = Some(Vec::new());
346 self
347 }
348
349 pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
351 self.allowed_hosts = allowed_hosts.into();
352 self
353 }
354
355 pub fn max_request_body_size(mut self, val: usize) -> Self {
357 self.max_request_body_size = val;
358 self
359 }
360
361 pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
363 let cors_domains = self.cors_domains;
364 let cors_max_age = self.cors_max_age;
365 let allowed_headers = self.allowed_headers;
366 let request_middleware = self.request_middleware;
367 let allowed_hosts = self.allowed_hosts;
368 let jsonrpc_handler = Rpc {
369 handler: self.handler,
370 extractor: self.meta_extractor,
371 };
372 let rest_api = self.rest_api;
373 let health_api = self.health_api;
374 let keep_alive = self.keep_alive;
375 let reuse_port = self.threads > 1;
376
377 let (local_addr_tx, local_addr_rx) = mpsc::channel();
378 let (close, shutdown_signal) = oneshot::channel();
379 let eloop = self.executor.init_with_name("http.worker0")?;
380 let req_max_size = self.max_request_body_size;
381 serve(
382 (shutdown_signal, local_addr_tx),
383 eloop.executor(),
384 addr.to_owned(),
385 cors_domains.clone(),
386 cors_max_age,
387 allowed_headers.clone(),
388 request_middleware.clone(),
389 allowed_hosts.clone(),
390 jsonrpc_handler.clone(),
391 rest_api,
392 health_api.clone(),
393 keep_alive,
394 reuse_port,
395 req_max_size,
396 );
397 let handles = (0..self.threads - 1).map(|i| {
398 let (local_addr_tx, local_addr_rx) = mpsc::channel();
399 let (close, shutdown_signal) = oneshot::channel();
400 let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
401 serve(
402 (shutdown_signal, local_addr_tx),
403 eloop.executor(),
404 addr.to_owned(),
405 cors_domains.clone(),
406 cors_max_age,
407 allowed_headers.clone(),
408 request_middleware.clone(),
409 allowed_hosts.clone(),
410 jsonrpc_handler.clone(),
411 rest_api,
412 health_api.clone(),
413 keep_alive,
414 reuse_port,
415 req_max_size,
416 );
417 Ok((eloop, close, local_addr_rx))
418 }).collect::<io::Result<Vec<_>>>()?;
419
420 let local_addr = recv_address(local_addr_rx);
422 let mut handles = handles.into_iter().map(|(eloop, close, local_addr_rx)| {
424 let _ = recv_address(local_addr_rx)?;
425 Ok((eloop, close))
426 }).collect::<io::Result<(Vec<_>)>>()?;
427 handles.push((eloop, close));
428 let (executors, close) = handles.into_iter().unzip();
429
430 Ok(Server {
431 address: local_addr?,
432 executor: Some(executors),
433 close: Some(close),
434 })
435 }
436}
437
/// Waits for a worker to report the outcome of its bind attempt.
///
/// Returns the reported result as-is. If the sending side is dropped without
/// ever sending, an `io::ErrorKind::Interrupted` error is returned instead.
fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
    match local_addr_rx.recv() {
        Ok(reported) => reported,
        // The channel closed before anything was sent; give the caller a
        // message to go on rather than an empty error string.
        Err(_) => Err(io::Error::new(
            io::ErrorKind::Interrupted,
            "server worker stopped before reporting its bound address",
        )),
    }
}
443
444fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
445 signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
446 executor: tokio::runtime::TaskExecutor,
447 addr: SocketAddr,
448 cors_domains: CorsDomains,
449 cors_max_age: Option<u32>,
450 allowed_headers: cors::AccessControlAllowHeaders,
451 request_middleware: Arc<RequestMiddleware>,
452 allowed_hosts: AllowedHosts,
453 jsonrpc_handler: Rpc<M, S>,
454 rest_api: RestApi,
455 health_api: Option<(String, String)>,
456 keep_alive: bool,
457 reuse_port: bool,
458 max_request_body_size: usize,
459) {
460 let (shutdown_signal, local_addr_tx) = signals;
461 executor.spawn(future::lazy(move || {
462 let handle = tokio::reactor::Handle::current();
463
464 let bind = move || {
465 let listener = match addr {
466 SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
467 SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
468 };
469 configure_port(reuse_port, &listener)?;
470 listener.reuse_address(true)?;
471 listener.bind(&addr)?;
472 let listener = listener.listen(1024)?;
473 let listener = tokio::net::TcpListener::from_std(listener, &handle)?;
474 let local_addr = listener.local_addr()?;
478
479 Ok((listener, local_addr))
480 };
481
482 let bind_result = match bind() {
483 Ok((listener, local_addr)) => {
484 match local_addr_tx.send(Ok(local_addr)) {
486 Ok(_) => futures::future::ok((listener, local_addr)),
487 Err(_) => {
488 warn!("Thread {:?} unable to reach receiver, closing server", thread::current().name());
489 futures::future::err(())
490 },
491 }
492 },
493 Err(err) => {
494 let _send_result = local_addr_tx.send(Err(err));
496
497 futures::future::err(())
498 }
499 };
500
501 bind_result.and_then(move |(listener, local_addr)| {
502 let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
503
504 let mut http = server::conn::Http::new();
505 http.keep_alive(keep_alive);
506
507 listener.incoming()
508 .for_each(move |socket| {
509 let service = ServerHandler::new(
510 jsonrpc_handler.clone(),
511 cors_domains.clone(),
512 cors_max_age,
513 allowed_headers.clone(),
514 allowed_hosts.clone(),
515 request_middleware.clone(),
516 rest_api,
517 health_api.clone(),
518 max_request_body_size,
519 );
520 tokio::spawn(http.serve_connection(socket, service)
521 .map_err(|e| error!("Error serving connection: {:?}", e)));
522 Ok(())
523 })
524 .map_err(|e| {
525 warn!("Incoming streams error, closing sever: {:?}", e);
526 })
527 .select(shutdown_signal.map_err(|e| {
528 debug!("Shutdown signaller dropped, closing server: {:?}", e);
529 }))
530 .map(|_| ())
531 .map_err(|_| ())
532 })
533 }));
534}
535
536#[cfg(unix)]
537fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
538 use net2::unix::*;
539
540 if reuse {
541 try!(tcp.reuse_port(true));
542 }
543
544 Ok(())
545}
546
/// Non-unix counterpart of `configure_port`: nothing is configured and the
/// call always succeeds.
#[cfg(not(unix))]
fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
    Ok(())
}
551
/// Handle to a running HTTP server, as returned by
/// `ServerBuilder::start_http`.
pub struct Server {
    /// Address reported by the first worker after binding.
    address: SocketAddr,
    /// Executors of all workers; taken (set to `None`) by `close`, `wait`
    /// and `drop`.
    executor: Option<Vec<Executor>>,
    /// Shutdown signal senders, one per worker; taken by `close`.
    close: Option<Vec<oneshot::Sender<()>>>,
}
558
559const PROOF: &'static str = "Server is always Some until self is consumed.";
560impl Server {
561 pub fn address(&self) -> &SocketAddr {
563 &self.address
564 }
565
566 pub fn close(mut self) {
568 for close in self.close.take().expect(PROOF) {
569 let _ = close.send(());
570 }
571
572 for executor in self.executor.take().expect(PROOF) {
573 executor.close();
574 }
575 }
576
577 pub fn wait(mut self) {
579 for executor in self.executor.take().expect(PROOF) {
580 executor.wait();
581 }
582 }
583}
584
585impl Drop for Server {
586 fn drop(&mut self) {
587 self.executor.take().map(|executors| {
588 for executor in executors { executor.close(); }
589 });
590 }
591}