1#![deny(missing_docs)]
22
23use tetsy_jsonrpc_server_utils as server_utils;
24use net2;
25
26pub use hyper;
27pub use tetsy_jsonrpc_core;
28
29#[macro_use]
30extern crate log;
31
32mod handler;
33mod response;
34#[cfg(test)]
35mod tests;
36mod utils;
37
38use std::io;
39use std::net::SocketAddr;
40use std::sync::{mpsc, Arc, Weak};
41use std::thread;
42
43use parking_lot::Mutex;
44
45use crate::jsonrpc::futures::sync::oneshot;
46use crate::jsonrpc::futures::{self, Future, Stream};
47use crate::jsonrpc::MetaIoHandler;
48use crate::server_utils::reactor::{Executor, UninitializedExecutor};
49use hyper::{server, Body};
50use tetsy_jsonrpc_core as jsonrpc;
51
52pub use crate::handler::ServerHandler;
53pub use crate::response::Response;
54pub use crate::server_utils::cors::{self, AccessControlAllowOrigin, AllowCors, Origin};
55pub use crate::server_utils::hosts::{DomainsValidation, Host};
56pub use crate::server_utils::{tokio, SuspendableStream};
57pub use crate::utils::{cors_allow_headers, cors_allow_origin, is_host_allowed};
58
59pub enum RequestMiddlewareAction {
61 Proceed {
63 should_continue_on_invalid_cors: bool,
66 request: hyper::Request<Body>,
68 },
69 Respond {
71 should_validate_hosts: bool,
73 response: Box<dyn Future<Item = hyper::Response<Body>, Error = hyper::Error> + Send>,
75 },
76}
77
78impl From<Response> for RequestMiddlewareAction {
79 fn from(o: Response) -> Self {
80 RequestMiddlewareAction::Respond {
81 should_validate_hosts: true,
82 response: Box::new(futures::future::ok(o.into())),
83 }
84 }
85}
86
87impl From<hyper::Response<Body>> for RequestMiddlewareAction {
88 fn from(response: hyper::Response<Body>) -> Self {
89 RequestMiddlewareAction::Respond {
90 should_validate_hosts: true,
91 response: Box::new(futures::future::ok(response)),
92 }
93 }
94}
95
96impl From<hyper::Request<Body>> for RequestMiddlewareAction {
97 fn from(request: hyper::Request<Body>) -> Self {
98 RequestMiddlewareAction::Proceed {
99 should_continue_on_invalid_cors: false,
100 request,
101 }
102 }
103}
104
105pub trait RequestMiddleware: Send + Sync + 'static {
107 fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction;
109}
110
111impl<F> RequestMiddleware for F
112where
113 F: Fn(hyper::Request<Body>) -> RequestMiddlewareAction + Sync + Send + 'static,
114{
115 fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
116 (*self)(request)
117 }
118}
119
120#[derive(Default)]
121struct NoopRequestMiddleware;
122impl RequestMiddleware for NoopRequestMiddleware {
123 fn on_request(&self, request: hyper::Request<Body>) -> RequestMiddlewareAction {
124 RequestMiddlewareAction::Proceed {
125 should_continue_on_invalid_cors: false,
126 request,
127 }
128 }
129}
130
131pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
133 fn read_metadata(&self, _: &hyper::Request<Body>) -> M;
135}
136
137impl<M, F> MetaExtractor<M> for F
138where
139 M: jsonrpc::Metadata,
140 F: Fn(&hyper::Request<Body>) -> M + Sync + Send + 'static,
141{
142 fn read_metadata(&self, req: &hyper::Request<Body>) -> M {
143 (*self)(req)
144 }
145}
146
147#[derive(Default)]
148struct NoopExtractor;
149impl<M: jsonrpc::Metadata + Default> MetaExtractor<M> for NoopExtractor {
150 fn read_metadata(&self, _: &hyper::Request<Body>) -> M {
151 M::default()
152 }
153}
154pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
157 pub handler: Arc<MetaIoHandler<M, S>>,
159 pub extractor: Arc<dyn MetaExtractor<M>>,
161}
162
163impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
164 fn clone(&self) -> Self {
165 Rpc {
166 handler: self.handler.clone(),
167 extractor: self.extractor.clone(),
168 }
169 }
170}
171
172impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Rpc<M, S> {
173 pub fn downgrade(&self) -> WeakRpc<M, S> {
177 WeakRpc {
178 handler: Arc::downgrade(&self.handler),
179 extractor: Arc::downgrade(&self.extractor),
180 }
181 }
182}
183pub struct WeakRpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
189 handler: Weak<MetaIoHandler<M, S>>,
190 extractor: Weak<dyn MetaExtractor<M>>,
191}
192
193impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for WeakRpc<M, S> {
194 fn clone(&self) -> Self {
195 WeakRpc {
196 handler: self.handler.clone(),
197 extractor: self.extractor.clone(),
198 }
199 }
200}
201
202impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> WeakRpc<M, S> {
203 pub fn upgrade(&self) -> Option<Rpc<M, S>> {
205 let handler = self.handler.upgrade()?;
206 let extractor = self.extractor.upgrade()?;
207
208 Some(Rpc { handler, extractor })
209 }
210}
211
212type AllowedHosts = Option<Vec<Host>>;
213type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
214
/// State of the REST -> RPC converter.
///
/// NOTE(review): the behaviour behind each variant is implemented by the
/// request handler, which is not part of this file; the variant descriptions
/// below follow the names and should be confirmed against it.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum RestApi {
    /// The converter is enabled in its restricted ("secure") mode.
    Secure,
    /// The converter is enabled without the restrictions of `Secure`.
    Unsecure,
    /// The converter is disabled. This is the builder's default.
    Disabled,
}
232
233pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
235 handler: Arc<MetaIoHandler<M, S>>,
236 executor: UninitializedExecutor,
237 meta_extractor: Arc<dyn MetaExtractor<M>>,
238 request_middleware: Arc<dyn RequestMiddleware>,
239 cors_domains: CorsDomains,
240 cors_max_age: Option<u32>,
241 allowed_headers: cors::AccessControlAllowHeaders,
242 allowed_hosts: AllowedHosts,
243 rest_api: RestApi,
244 health_api: Option<(String, String)>,
245 keep_alive: bool,
246 threads: usize,
247 max_request_body_size: usize,
248}
249
250impl<M: jsonrpc::Metadata + Default, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
251 pub fn new<T>(handler: T) -> Self
257 where
258 T: Into<MetaIoHandler<M, S>>,
259 {
260 Self::with_meta_extractor(handler, NoopExtractor)
261 }
262}
263
264impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
265 pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self
271 where
272 T: Into<MetaIoHandler<M, S>>,
273 E: MetaExtractor<M>,
274 {
275 ServerBuilder {
276 handler: Arc::new(handler.into()),
277 executor: UninitializedExecutor::Unspawned,
278 meta_extractor: Arc::new(extractor),
279 request_middleware: Arc::new(NoopRequestMiddleware::default()),
280 cors_domains: None,
281 cors_max_age: None,
282 allowed_headers: cors::AccessControlAllowHeaders::Any,
283 allowed_hosts: None,
284 rest_api: RestApi::Disabled,
285 health_api: None,
286 keep_alive: true,
287 threads: 1,
288 max_request_body_size: 5 * 1024 * 1024,
289 }
290 }
291
292 pub fn event_loop_executor(mut self, executor: tokio::runtime::TaskExecutor) -> Self {
296 self.executor = UninitializedExecutor::Shared(executor);
297 self
298 }
299
300 pub fn rest_api(mut self, rest_api: RestApi) -> Self {
305 self.rest_api = rest_api;
306 self
307 }
308
309 pub fn health_api<A, B, T>(mut self, health_api: T) -> Self
317 where
318 T: Into<Option<(A, B)>>,
319 A: Into<String>,
320 B: Into<String>,
321 {
322 self.health_api = health_api.into().map(|(a, b)| (a.into(), b.into()));
323 self
324 }
325
326 pub fn keep_alive(mut self, val: bool) -> Self {
330 self.keep_alive = val;
331 self
332 }
333
334 #[cfg(not(unix))]
338 #[allow(unused_mut)]
339 pub fn threads(mut self, _threads: usize) -> Self {
340 warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
341 self
342 }
343
344 #[cfg(unix)]
354 pub fn threads(mut self, threads: usize) -> Self {
355 self.threads = threads;
356 self
357 }
358
359 pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
361 self.cors_domains = cors_domains.into();
362 self
363 }
364
365 pub fn cors_max_age<T: Into<Option<u32>>>(mut self, cors_max_age: T) -> Self {
370 self.cors_max_age = cors_max_age.into();
371 self
372 }
373
374 pub fn cors_allow_headers(mut self, allowed_headers: cors::AccessControlAllowHeaders) -> Self {
376 self.allowed_headers = allowed_headers;
377 self
378 }
379
380 pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
382 self.request_middleware = Arc::new(middleware);
383 self
384 }
385
386 pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
388 self.meta_extractor = Arc::new(extractor);
389 self
390 }
391
392 pub fn allow_only_bind_host(mut self) -> Self {
394 self.allowed_hosts = Some(Vec::new());
395 self
396 }
397
398 pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
400 self.allowed_hosts = allowed_hosts.into();
401 self
402 }
403
404 pub fn max_request_body_size(mut self, val: usize) -> Self {
406 self.max_request_body_size = val;
407 self
408 }
409
410 pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
412 let cors_domains = self.cors_domains;
413 let cors_max_age = self.cors_max_age;
414 let allowed_headers = self.allowed_headers;
415 let request_middleware = self.request_middleware;
416 let allowed_hosts = self.allowed_hosts;
417 let tetsy_jsonrpc_handler = Rpc {
418 handler: self.handler,
419 extractor: self.meta_extractor,
420 };
421 let rest_api = self.rest_api;
422 let health_api = self.health_api;
423 let keep_alive = self.keep_alive;
424 let reuse_port = self.threads > 1;
425
426 let (local_addr_tx, local_addr_rx) = mpsc::channel();
427 let (close, shutdown_signal) = oneshot::channel();
428 let (done_tx, done_rx) = oneshot::channel();
429 let eloop = self.executor.init_with_name("http.worker0")?;
430 let req_max_size = self.max_request_body_size;
431 serve(
433 (shutdown_signal, local_addr_tx, done_tx),
434 eloop.executor(),
435 addr.to_owned(),
436 cors_domains.clone(),
437 cors_max_age,
438 allowed_headers.clone(),
439 request_middleware.clone(),
440 allowed_hosts.clone(),
441 tetsy_jsonrpc_handler.clone(),
442 rest_api,
443 health_api.clone(),
444 keep_alive,
445 reuse_port,
446 req_max_size,
447 );
448 let handles = (0..self.threads - 1)
449 .map(|i| {
450 let (local_addr_tx, local_addr_rx) = mpsc::channel();
451 let (close, shutdown_signal) = oneshot::channel();
452 let (done_tx, done_rx) = oneshot::channel();
453 let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
454 serve(
455 (shutdown_signal, local_addr_tx, done_tx),
456 eloop.executor(),
457 addr.to_owned(),
458 cors_domains.clone(),
459 cors_max_age,
460 allowed_headers.clone(),
461 request_middleware.clone(),
462 allowed_hosts.clone(),
463 tetsy_jsonrpc_handler.clone(),
464 rest_api,
465 health_api.clone(),
466 keep_alive,
467 reuse_port,
468 req_max_size,
469 );
470 Ok((eloop, close, local_addr_rx, done_rx))
471 })
472 .collect::<io::Result<Vec<_>>>()?;
473
474 let local_addr = recv_address(local_addr_rx);
476 let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles
478 .into_iter()
479 .map(|(eloop, close, local_addr_rx, done_rx)| {
480 let _ = recv_address(local_addr_rx)?;
481 Ok((eloop, close, done_rx))
482 })
483 .collect::<io::Result<Vec<_>>>()?;
484 handles.push((eloop, close, done_rx));
485
486 let (executors, done_rxs) = handles
487 .into_iter()
488 .fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| {
489 acc.0.push((eloop, closer));
490 acc.1.push(done_rx);
491 acc
492 });
493
494 Ok(Server {
495 address: local_addr?,
496 executors: Arc::new(Mutex::new(Some(executors))),
497 done: Some(done_rxs),
498 })
499 }
500}
501
/// Waits for a worker to report the outcome of its bind attempt.
///
/// Returns whatever the worker sent (the bound address or the bind error).
/// If the sending side is dropped without sending anything, an
/// `io::ErrorKind::Interrupted` error is returned.
fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
    local_addr_rx.recv().unwrap_or_else(|_| {
        Err(io::Error::new(
            io::ErrorKind::Interrupted,
            "server worker terminated before reporting its bind result",
        ))
    })
}
507
508fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
509 signals: (
510 oneshot::Receiver<()>,
511 mpsc::Sender<io::Result<SocketAddr>>,
512 oneshot::Sender<()>,
513 ),
514 executor: tokio::runtime::TaskExecutor,
515 addr: SocketAddr,
516 cors_domains: CorsDomains,
517 cors_max_age: Option<u32>,
518 allowed_headers: cors::AccessControlAllowHeaders,
519 request_middleware: Arc<dyn RequestMiddleware>,
520 allowed_hosts: AllowedHosts,
521 tetsy_jsonrpc_handler: Rpc<M, S>,
522 rest_api: RestApi,
523 health_api: Option<(String, String)>,
524 keep_alive: bool,
525 reuse_port: bool,
526 max_request_body_size: usize,
527) {
528 let (shutdown_signal, local_addr_tx, done_tx) = signals;
529 executor.spawn({
530 let handle = tokio::reactor::Handle::default();
531
532 let bind = move || {
533 let listener = match addr {
534 SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
535 SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
536 };
537 configure_port(reuse_port, &listener)?;
538 listener.reuse_address(true)?;
539 listener.bind(&addr)?;
540 let listener = listener.listen(1024)?;
541 let listener = tokio::net::TcpListener::from_std(listener, &handle)?;
542 let local_addr = listener.local_addr()?;
546
547 Ok((listener, local_addr))
548 };
549
550 let bind_result = match bind() {
551 Ok((listener, local_addr)) => {
552 match local_addr_tx.send(Ok(local_addr)) {
554 Ok(_) => futures::future::ok((listener, local_addr)),
555 Err(_) => {
556 warn!(
557 "Thread {:?} unable to reach receiver, closing server",
558 thread::current().name()
559 );
560 futures::future::err(())
561 }
562 }
563 }
564 Err(err) => {
565 let _send_result = local_addr_tx.send(Err(err));
567
568 futures::future::err(())
569 }
570 };
571
572 bind_result
573 .and_then(move |(listener, local_addr)| {
574 let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
575
576 let mut http = server::conn::Http::new();
577 http.keep_alive(keep_alive);
578 let tcp_stream = SuspendableStream::new(listener.incoming());
579
580 tcp_stream
581 .map(move |socket| {
582 let service = ServerHandler::new(
583 tetsy_jsonrpc_handler.downgrade(),
584 cors_domains.clone(),
585 cors_max_age,
586 allowed_headers.clone(),
587 allowed_hosts.clone(),
588 request_middleware.clone(),
589 rest_api,
590 health_api.clone(),
591 max_request_body_size,
592 keep_alive,
593 );
594
595 tokio::spawn(
596 http.serve_connection(socket, service)
597 .map_err(|e| error!("Error serving connection: {:?}", e))
598 .then(|_| Ok(())),
599 )
600 })
601 .for_each(|_| Ok(()))
602 .map_err(|e| {
603 warn!("Incoming streams error, closing sever: {:?}", e);
604 })
605 .select(shutdown_signal.map_err(|e| {
606 debug!("Shutdown signaller dropped, closing server: {:?}", e);
607 }))
608 .map_err(|_| ())
609 })
610 .and_then(|(_, server)| {
611 drop(server);
614 done_tx.send(())
615 })
616 });
617}
618
619#[cfg(unix)]
620fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
621 use net2::unix::*;
622
623 if reuse {
624 tcp.reuse_port(true)?;
625 }
626
627 Ok(())
628}
629
630#[cfg(not(unix))]
631fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
632 Ok(())
633}
634
635#[derive(Clone)]
639pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>);
640
641impl CloseHandle {
642 pub fn close(self) {
644 if let Some(executors) = self.0.lock().take() {
645 for (executor, closer) in executors {
646 executor.close();
647 let _ = closer.send(());
648 }
649 }
650 }
651}
652
653pub struct Server {
655 address: SocketAddr,
656 executors: Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>,
657 done: Option<Vec<oneshot::Receiver<()>>>,
658}
659
660impl Server {
661 pub fn address(&self) -> &SocketAddr {
663 &self.address
664 }
665
666 pub fn close(self) {
668 self.close_handle().close()
669 }
670
671 pub fn wait(mut self) {
673 self.wait_internal();
674 }
675
676 pub fn close_handle(&self) -> CloseHandle {
679 CloseHandle(self.executors.clone())
680 }
681
682 fn wait_internal(&mut self) {
683 if let Some(receivers) = self.done.take() {
684 for receiver in receivers {
685 let _ = receiver.wait();
686 }
687 }
688 }
689}
690
691impl Drop for Server {
692 fn drop(&mut self) {
693 self.close_handle().close();
694 self.wait_internal();
695 }
696}