1#![deny(missing_docs)]
22
23use jsonrpc_server_utils as server_utils;
24use net2;
25
26pub use hyper;
27pub use jsonrpc_core;
28
29#[macro_use]
30extern crate log;
31
32mod handler;
33mod response;
34#[cfg(test)]
35mod tests;
36mod utils;
37
38use std::io;
39use std::net::SocketAddr;
40use std::sync::{mpsc, Arc, Weak};
41use std::thread;
42
43use parking_lot::Mutex;
44
45use crate::jsonrpc::MetaIoHandler;
46use crate::server_utils::reactor::{Executor, UninitializedExecutor};
47use futures01::sync::oneshot;
48use futures01::{future, Future, Stream};
49use hyper::{server, Body};
50use jsonrpc_core as jsonrpc;
51
52pub use crate::handler::ServerHandler;
53pub use crate::response::Response;
54pub use crate::server_utils::cors::{self, AccessControlAllowOrigin, AllowCors, Origin};
55pub use crate::server_utils::hosts::{DomainsValidation, Host};
56pub use crate::server_utils::{tokio, SuspendableStream};
57pub use crate::utils::{cors_allow_headers, cors_allow_origin, is_host_allowed};
58
59pub enum RequestMiddlewareAction {
61 Proceed {
63 should_continue_on_invalid_cors: bool,
66 request: hyper::Request<Body>,
68 },
69 Respond {
71 should_validate_hosts: bool,
73 response: Box<dyn Future<Item = hyper::Response<Body>, Error = hyper::Error> + Send>,
75 },
76}
77
78impl From<Response> for RequestMiddlewareAction {
79 fn from(o: Response) -> Self {
80 RequestMiddlewareAction::Respond {
81 should_validate_hosts: true,
82 response: Box::new(future::ok(o.into())),
83 }
84 }
85}
86
87impl From<hyper::Response<Body>> for RequestMiddlewareAction {
88 fn from(response: hyper::Response<Body>) -> Self {
89 RequestMiddlewareAction::Respond {
90 should_validate_hosts: true,
91 response: Box::new(future::ok(response)),
92 }
93 }
94}
95
96impl From<hyper::Request<Body>> for RequestMiddlewareAction {
97 fn from(request: hyper::Request<Body>) -> Self {
98 RequestMiddlewareAction::Proceed {
99 should_continue_on_invalid_cors: false,
100 request,
101 }
102 }
103}
104
105pub trait RequestMiddleware: Send + Sync + 'static {
107 fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction;
109}
110
111impl<F> RequestMiddleware for F
112where
113 F: Fn(hyper::Request<Body>) -> RequestMiddlewareAction + Sync + Send + 'static,
114{
115 fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
116 (*self)(request)
117 }
118}
119
120#[derive(Default)]
121struct NoopRequestMiddleware;
122impl RequestMiddleware for NoopRequestMiddleware {
123 fn on_request(&self, request: hyper::Request<Body>) -> RequestMiddlewareAction {
124 RequestMiddlewareAction::Proceed {
125 should_continue_on_invalid_cors: false,
126 request,
127 }
128 }
129}
130
131pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
133 fn read_metadata(&self, _: &hyper::Request<Body>) -> M;
135}
136
137impl<M, F> MetaExtractor<M> for F
138where
139 M: jsonrpc::Metadata,
140 F: Fn(&hyper::Request<Body>) -> M + Sync + Send + 'static,
141{
142 fn read_metadata(&self, req: &hyper::Request<Body>) -> M {
143 (*self)(req)
144 }
145}
146
147#[derive(Default)]
148struct NoopExtractor;
149impl<M: jsonrpc::Metadata + Default> MetaExtractor<M> for NoopExtractor {
150 fn read_metadata(&self, _: &hyper::Request<Body>) -> M {
151 M::default()
152 }
153}
154pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
157 pub handler: Arc<MetaIoHandler<M, S>>,
159 pub extractor: Arc<dyn MetaExtractor<M>>,
161}
162
163impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
164 fn clone(&self) -> Self {
165 Rpc {
166 handler: self.handler.clone(),
167 extractor: self.extractor.clone(),
168 }
169 }
170}
171
172impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Rpc<M, S> {
173 pub fn downgrade(&self) -> WeakRpc<M, S> {
177 WeakRpc {
178 handler: Arc::downgrade(&self.handler),
179 extractor: Arc::downgrade(&self.extractor),
180 }
181 }
182}
183pub struct WeakRpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
189 handler: Weak<MetaIoHandler<M, S>>,
190 extractor: Weak<dyn MetaExtractor<M>>,
191}
192
193impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for WeakRpc<M, S> {
194 fn clone(&self) -> Self {
195 WeakRpc {
196 handler: self.handler.clone(),
197 extractor: self.extractor.clone(),
198 }
199 }
200}
201
202impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> WeakRpc<M, S> {
203 pub fn upgrade(&self) -> Option<Rpc<M, S>> {
205 let handler = self.handler.upgrade()?;
206 let extractor = self.extractor.upgrade()?;
207
208 Some(Rpc { handler, extractor })
209 }
210}
211
212type AllowedHosts = Option<Vec<Host>>;
213type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
214
215#[derive(Debug, PartialEq, Clone, Copy)]
217pub enum RestApi {
218 Secure,
224 Unsecure,
229 Disabled,
231}
232
233pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
235 handler: Arc<MetaIoHandler<M, S>>,
236 executor: UninitializedExecutor,
237 meta_extractor: Arc<dyn MetaExtractor<M>>,
238 request_middleware: Arc<dyn RequestMiddleware>,
239 cors_domains: CorsDomains,
240 cors_max_age: Option<u32>,
241 allowed_headers: cors::AccessControlAllowHeaders,
242 allowed_hosts: AllowedHosts,
243 rest_api: RestApi,
244 health_api: Option<(String, String)>,
245 keep_alive: bool,
246 threads: usize,
247 max_request_body_size: usize,
248}
249
250impl<M: jsonrpc::Metadata + Default, S: jsonrpc::Middleware<M>> ServerBuilder<M, S>
251where
252 S::Future: Unpin,
253 S::CallFuture: Unpin,
254{
255 pub fn new<T>(handler: T) -> Self
261 where
262 T: Into<MetaIoHandler<M, S>>,
263 {
264 Self::with_meta_extractor(handler, NoopExtractor)
265 }
266}
267
268impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S>
269where
270 S::Future: Unpin,
271 S::CallFuture: Unpin,
272{
273 pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self
279 where
280 T: Into<MetaIoHandler<M, S>>,
281 E: MetaExtractor<M>,
282 {
283 ServerBuilder {
284 handler: Arc::new(handler.into()),
285 executor: UninitializedExecutor::Unspawned,
286 meta_extractor: Arc::new(extractor),
287 request_middleware: Arc::new(NoopRequestMiddleware::default()),
288 cors_domains: None,
289 cors_max_age: None,
290 allowed_headers: cors::AccessControlAllowHeaders::Any,
291 allowed_hosts: None,
292 rest_api: RestApi::Disabled,
293 health_api: None,
294 keep_alive: true,
295 threads: 1,
296 max_request_body_size: 5 * 1024 * 1024,
297 }
298 }
299
300 pub fn event_loop_executor(mut self, executor: tokio::runtime::TaskExecutor) -> Self {
304 self.executor = UninitializedExecutor::Shared(executor);
305 self
306 }
307
308 pub fn rest_api(mut self, rest_api: RestApi) -> Self {
313 self.rest_api = rest_api;
314 self
315 }
316
317 pub fn health_api<A, B, T>(mut self, health_api: T) -> Self
325 where
326 T: Into<Option<(A, B)>>,
327 A: Into<String>,
328 B: Into<String>,
329 {
330 self.health_api = health_api.into().map(|(a, b)| (a.into(), b.into()));
331 self
332 }
333
334 pub fn keep_alive(mut self, val: bool) -> Self {
338 self.keep_alive = val;
339 self
340 }
341
342 #[cfg(not(unix))]
346 #[allow(unused_mut)]
347 pub fn threads(mut self, _threads: usize) -> Self {
348 warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
349 self
350 }
351
352 #[cfg(unix)]
362 pub fn threads(mut self, threads: usize) -> Self {
363 self.threads = threads;
364 self
365 }
366
367 pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
369 self.cors_domains = cors_domains.into();
370 self
371 }
372
373 pub fn cors_max_age<T: Into<Option<u32>>>(mut self, cors_max_age: T) -> Self {
378 self.cors_max_age = cors_max_age.into();
379 self
380 }
381
382 pub fn cors_allow_headers(mut self, allowed_headers: cors::AccessControlAllowHeaders) -> Self {
384 self.allowed_headers = allowed_headers;
385 self
386 }
387
388 pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
390 self.request_middleware = Arc::new(middleware);
391 self
392 }
393
394 pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
396 self.meta_extractor = Arc::new(extractor);
397 self
398 }
399
400 pub fn allow_only_bind_host(mut self) -> Self {
402 self.allowed_hosts = Some(Vec::new());
403 self
404 }
405
406 pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
408 self.allowed_hosts = allowed_hosts.into();
409 self
410 }
411
412 pub fn max_request_body_size(mut self, val: usize) -> Self {
414 self.max_request_body_size = val;
415 self
416 }
417
418 pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
420 let cors_domains = self.cors_domains;
421 let cors_max_age = self.cors_max_age;
422 let allowed_headers = self.allowed_headers;
423 let request_middleware = self.request_middleware;
424 let allowed_hosts = self.allowed_hosts;
425 let jsonrpc_handler = Rpc {
426 handler: self.handler,
427 extractor: self.meta_extractor,
428 };
429 let rest_api = self.rest_api;
430 let health_api = self.health_api;
431 let keep_alive = self.keep_alive;
432 let reuse_port = self.threads > 1;
433
434 let (local_addr_tx, local_addr_rx) = mpsc::channel();
435 let (close, shutdown_signal) = oneshot::channel();
436 let (done_tx, done_rx) = oneshot::channel();
437 let eloop = self.executor.init_with_name("http.worker0")?;
438 let req_max_size = self.max_request_body_size;
439 serve(
441 (shutdown_signal, local_addr_tx, done_tx),
442 eloop.executor(),
443 addr.to_owned(),
444 cors_domains.clone(),
445 cors_max_age,
446 allowed_headers.clone(),
447 request_middleware.clone(),
448 allowed_hosts.clone(),
449 jsonrpc_handler.clone(),
450 rest_api,
451 health_api.clone(),
452 keep_alive,
453 reuse_port,
454 req_max_size,
455 );
456 let handles = (0..self.threads - 1)
457 .map(|i| {
458 let (local_addr_tx, local_addr_rx) = mpsc::channel();
459 let (close, shutdown_signal) = oneshot::channel();
460 let (done_tx, done_rx) = oneshot::channel();
461 let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
462 serve(
463 (shutdown_signal, local_addr_tx, done_tx),
464 eloop.executor(),
465 addr.to_owned(),
466 cors_domains.clone(),
467 cors_max_age,
468 allowed_headers.clone(),
469 request_middleware.clone(),
470 allowed_hosts.clone(),
471 jsonrpc_handler.clone(),
472 rest_api,
473 health_api.clone(),
474 keep_alive,
475 reuse_port,
476 req_max_size,
477 );
478 Ok((eloop, close, local_addr_rx, done_rx))
479 })
480 .collect::<io::Result<Vec<_>>>()?;
481
482 let local_addr = recv_address(local_addr_rx);
484 let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles
486 .into_iter()
487 .map(|(eloop, close, local_addr_rx, done_rx)| {
488 let _ = recv_address(local_addr_rx)?;
489 Ok((eloop, close, done_rx))
490 })
491 .collect::<io::Result<Vec<_>>>()?;
492 handles.push((eloop, close, done_rx));
493
494 let (executors, done_rxs) = handles
495 .into_iter()
496 .fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| {
497 acc.0.push((eloop, closer));
498 acc.1.push(done_rx);
499 acc
500 });
501
502 Ok(Server {
503 address: local_addr?,
504 executors: Arc::new(Mutex::new(Some(executors))),
505 done: Some(done_rxs),
506 })
507 }
508}
509
510fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
511 local_addr_rx
512 .recv()
513 .map_err(|_| io::Error::new(io::ErrorKind::Interrupted, ""))?
514}
515
516fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
517 signals: (
518 oneshot::Receiver<()>,
519 mpsc::Sender<io::Result<SocketAddr>>,
520 oneshot::Sender<()>,
521 ),
522 executor: tokio::runtime::TaskExecutor,
523 addr: SocketAddr,
524 cors_domains: CorsDomains,
525 cors_max_age: Option<u32>,
526 allowed_headers: cors::AccessControlAllowHeaders,
527 request_middleware: Arc<dyn RequestMiddleware>,
528 allowed_hosts: AllowedHosts,
529 jsonrpc_handler: Rpc<M, S>,
530 rest_api: RestApi,
531 health_api: Option<(String, String)>,
532 keep_alive: bool,
533 reuse_port: bool,
534 max_request_body_size: usize,
535) where
536 S::Future: Unpin,
537 S::CallFuture: Unpin,
538{
539 let (shutdown_signal, local_addr_tx, done_tx) = signals;
540 executor.spawn({
541 let handle = tokio::reactor::Handle::default();
542
543 let bind = move || {
544 let listener = match addr {
545 SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
546 SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
547 };
548 configure_port(reuse_port, &listener)?;
549 listener.reuse_address(true)?;
550 listener.bind(&addr)?;
551 let listener = listener.listen(1024)?;
552 let listener = tokio::net::TcpListener::from_std(listener, &handle)?;
553 let local_addr = listener.local_addr()?;
557
558 Ok((listener, local_addr))
559 };
560
561 let bind_result = match bind() {
562 Ok((listener, local_addr)) => {
563 match local_addr_tx.send(Ok(local_addr)) {
565 Ok(_) => future::ok((listener, local_addr)),
566 Err(_) => {
567 warn!(
568 "Thread {:?} unable to reach receiver, closing server",
569 thread::current().name()
570 );
571 future::err(())
572 }
573 }
574 }
575 Err(err) => {
576 let _send_result = local_addr_tx.send(Err(err));
578
579 future::err(())
580 }
581 };
582
583 bind_result
584 .and_then(move |(listener, local_addr)| {
585 let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
586
587 let mut http = server::conn::Http::new();
588 http.keep_alive(keep_alive);
589 let tcp_stream = SuspendableStream::new(listener.incoming());
590
591 tcp_stream
592 .map(move |socket| {
593 let service = ServerHandler::new(
594 jsonrpc_handler.downgrade(),
595 cors_domains.clone(),
596 cors_max_age,
597 allowed_headers.clone(),
598 allowed_hosts.clone(),
599 request_middleware.clone(),
600 rest_api,
601 health_api.clone(),
602 max_request_body_size,
603 keep_alive,
604 );
605
606 tokio::spawn(
607 http.serve_connection(socket, service)
608 .map_err(|e| error!("Error serving connection: {:?}", e))
609 .then(|_| Ok(())),
610 )
611 })
612 .for_each(|_| Ok(()))
613 .map_err(|e| {
614 warn!("Incoming streams error, closing sever: {:?}", e);
615 })
616 .select(shutdown_signal.map_err(|e| {
617 debug!("Shutdown signaller dropped, closing server: {:?}", e);
618 }))
619 .map_err(|_| ())
620 })
621 .and_then(|(_, server)| {
622 drop(server);
625 done_tx.send(())
626 })
627 });
628}
629
630#[cfg(unix)]
631fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
632 use net2::unix::*;
633
634 if reuse {
635 tcp.reuse_port(true)?;
636 }
637
638 Ok(())
639}
640
641#[cfg(not(unix))]
642fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
643 Ok(())
644}
645
646#[derive(Clone)]
650pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>);
651
652impl CloseHandle {
653 pub fn close(self) {
655 if let Some(executors) = self.0.lock().take() {
656 for (executor, closer) in executors {
657 executor.close();
658 let _ = closer.send(());
659 }
660 }
661 }
662}
663
664pub struct Server {
666 address: SocketAddr,
667 executors: Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>,
668 done: Option<Vec<oneshot::Receiver<()>>>,
669}
670
671impl Server {
672 pub fn address(&self) -> &SocketAddr {
674 &self.address
675 }
676
677 pub fn close(self) {
679 self.close_handle().close()
680 }
681
682 pub fn wait(mut self) {
684 self.wait_internal();
685 }
686
687 pub fn close_handle(&self) -> CloseHandle {
690 CloseHandle(self.executors.clone())
691 }
692
693 fn wait_internal(&mut self) {
694 if let Some(receivers) = self.done.take() {
695 for receiver in receivers {
696 let _ = receiver.wait();
697 }
698 }
699 }
700}
701
702impl Drop for Server {
703 fn drop(&mut self) {
704 self.close_handle().close();
705 self.wait_internal();
706 }
707}