tetsy_jsonrpc_http_server/
lib.rs

1//! jsonrpc http server.
2//!
3//! ```no_run
4//! use tetsy_jsonrpc_core::*;
5//! use tetsy_jsonrpc_http_server::*;
6//!
7//! fn main() {
8//! 	let mut io = IoHandler::new();
9//! 	io.add_method("say_hello", |_: Params| {
10//! 		Ok(Value::String("hello".to_string()))
11//! 	});
12//!
13//! 	let _server = ServerBuilder::new(io)
//! 		.start_http(&"127.0.0.1:3030".parse().unwrap())
//! 		.expect("Unable to start RPC server");
//!
//! 	_server.wait();
18//! }
19//! ```
20
21#![deny(missing_docs)]
22
23use tetsy_jsonrpc_server_utils as server_utils;
24use net2;
25
26pub use hyper;
27pub use tetsy_jsonrpc_core;
28
29#[macro_use]
30extern crate log;
31
32mod handler;
33mod response;
34#[cfg(test)]
35mod tests;
36mod utils;
37
38use std::io;
39use std::net::SocketAddr;
40use std::sync::{mpsc, Arc, Weak};
41use std::thread;
42
43use parking_lot::Mutex;
44
45use crate::jsonrpc::futures::sync::oneshot;
46use crate::jsonrpc::futures::{self, Future, Stream};
47use crate::jsonrpc::MetaIoHandler;
48use crate::server_utils::reactor::{Executor, UninitializedExecutor};
49use hyper::{server, Body};
50use tetsy_jsonrpc_core as jsonrpc;
51
52pub use crate::handler::ServerHandler;
53pub use crate::response::Response;
54pub use crate::server_utils::cors::{self, AccessControlAllowOrigin, AllowCors, Origin};
55pub use crate::server_utils::hosts::{DomainsValidation, Host};
56pub use crate::server_utils::{tokio, SuspendableStream};
57pub use crate::utils::{cors_allow_headers, cors_allow_origin, is_host_allowed};
58
/// Action undertaken by a middleware.
///
/// This is the value returned from `RequestMiddleware::on_request`: it either hands
/// the (possibly modified) request back for standard RPC handling, or supplies a
/// response future to be used instead.
pub enum RequestMiddlewareAction {
	/// Proceed with standard RPC handling
	Proceed {
		/// Should the request be processed even if invalid CORS headers are detected?
		/// This allows for side effects to take place.
		should_continue_on_invalid_cors: bool,
		/// The request object returned
		request: hyper::Request<Body>,
	},
	/// Intercept the request and respond differently.
	Respond {
		/// Should standard hosts validation be performed?
		should_validate_hosts: bool,
		/// A boxed future resolving to the HTTP response to send back.
		response: Box<dyn Future<Item = hyper::Response<Body>, Error = hyper::Error> + Send>,
	},
}
77
78impl From<Response> for RequestMiddlewareAction {
79	fn from(o: Response) -> Self {
80		RequestMiddlewareAction::Respond {
81			should_validate_hosts: true,
82			response: Box::new(futures::future::ok(o.into())),
83		}
84	}
85}
86
87impl From<hyper::Response<Body>> for RequestMiddlewareAction {
88	fn from(response: hyper::Response<Body>) -> Self {
89		RequestMiddlewareAction::Respond {
90			should_validate_hosts: true,
91			response: Box::new(futures::future::ok(response)),
92		}
93	}
94}
95
96impl From<hyper::Request<Body>> for RequestMiddlewareAction {
97	fn from(request: hyper::Request<Body>) -> Self {
98		RequestMiddlewareAction::Proceed {
99			should_continue_on_invalid_cors: false,
100			request,
101		}
102	}
103}
104
/// Allows to intercept request and handle it differently.
///
/// Implementors must be `Send + Sync + 'static`; the builder stores the middleware
/// behind an `Arc` and hands a clone to every worker it starts.
/// Any `Fn(hyper::Request<Body>) -> RequestMiddlewareAction` closure with the same
/// bounds implements this trait as well.
pub trait RequestMiddleware: Send + Sync + 'static {
	/// Takes a request and decides how to proceed with it.
	///
	/// Returns the action describing whether standard RPC handling continues
	/// or a custom response is sent.
	fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction;
}
110
111impl<F> RequestMiddleware for F
112where
113	F: Fn(hyper::Request<Body>) -> RequestMiddlewareAction + Sync + Send + 'static,
114{
115	fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
116		(*self)(request)
117	}
118}
119
120#[derive(Default)]
121struct NoopRequestMiddleware;
122impl RequestMiddleware for NoopRequestMiddleware {
123	fn on_request(&self, request: hyper::Request<Body>) -> RequestMiddlewareAction {
124		RequestMiddlewareAction::Proceed {
125			should_continue_on_invalid_cors: false,
126			request,
127		}
128	}
129}
130
/// Extracts metadata from the HTTP request.
///
/// Implementors must be `Sync + Send + 'static`; the builder stores the extractor
/// behind an `Arc` shared by all workers.
/// Any `Fn(&hyper::Request<Body>) -> M` closure with the same bounds implements
/// this trait as well.
pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
	/// Read the metadata from the request
	///
	/// The request is only borrowed; the produced metadata value is returned by value.
	fn read_metadata(&self, _: &hyper::Request<Body>) -> M;
}
136
137impl<M, F> MetaExtractor<M> for F
138where
139	M: jsonrpc::Metadata,
140	F: Fn(&hyper::Request<Body>) -> M + Sync + Send + 'static,
141{
142	fn read_metadata(&self, req: &hyper::Request<Body>) -> M {
143		(*self)(req)
144	}
145}
146
147#[derive(Default)]
148struct NoopExtractor;
149impl<M: jsonrpc::Metadata + Default> MetaExtractor<M> for NoopExtractor {
150	fn read_metadata(&self, _: &hyper::Request<Body>) -> M {
151		M::default()
152	}
153}
154//
/// RPC Handler bundled with metadata extractor.
///
/// Both parts are reference-counted, so cloning an `Rpc` only bumps the counters.
/// Use `downgrade` to obtain a `WeakRpc` that does not keep them alive.
pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
	/// RPC Handler
	pub handler: Arc<MetaIoHandler<M, S>>,
	/// Metadata extractor
	pub extractor: Arc<dyn MetaExtractor<M>>,
}
162
163impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
164	fn clone(&self) -> Self {
165		Rpc {
166			handler: self.handler.clone(),
167			extractor: self.extractor.clone(),
168		}
169	}
170}
171
172impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Rpc<M, S> {
173	/// Downgrade the `Rpc` to `WeakRpc`.
174	///
175	/// Downgrades internal `Arc`s to `Weak` references.
176	pub fn downgrade(&self) -> WeakRpc<M, S> {
177		WeakRpc {
178			handler: Arc::downgrade(&self.handler),
179			extractor: Arc::downgrade(&self.extractor),
180		}
181	}
182}
/// A weak handle to the RPC server.
///
/// Since request handling futures are spawned directly on the executor,
/// whenever the server is closed we want to make sure that existing
/// tasks are not blocking the server and are dropped as soon as the server stops.
///
/// Obtain one with `Rpc::downgrade` and turn it back into an `Rpc` with `upgrade`.
pub struct WeakRpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
	// Non-owning reference to the RPC handler.
	handler: Weak<MetaIoHandler<M, S>>,
	// Non-owning reference to the metadata extractor.
	extractor: Weak<dyn MetaExtractor<M>>,
}
192
193impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for WeakRpc<M, S> {
194	fn clone(&self) -> Self {
195		WeakRpc {
196			handler: self.handler.clone(),
197			extractor: self.extractor.clone(),
198		}
199	}
200}
201
202impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> WeakRpc<M, S> {
203	/// Upgrade the handle to a strong one (`Rpc`) if  possible.
204	pub fn upgrade(&self) -> Option<Rpc<M, S>> {
205		let handler = self.handler.upgrade()?;
206		let extractor = self.extractor.upgrade()?;
207
208		Some(Rpc { handler, extractor })
209	}
210}
211
// Optional list of `Host` values, as configured through
// `ServerBuilder::allowed_hosts` / `ServerBuilder::allow_only_bind_host`.
type AllowedHosts = Option<Vec<Host>>;
// Optional list of CORS origins, as configured through `ServerBuilder::cors`.
type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
214
/// REST -> RPC converter state.
///
/// Selected with `ServerBuilder::rest_api`; the builder starts with `Disabled`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum RestApi {
	/// The REST -> RPC converter is enabled
	/// and requires `Content-Type: application/json` header
	/// (even though the body should be empty).
	/// This protects from submitting an RPC call
	/// from unwanted origins.
	Secure,
	/// The REST -> RPC converter is enabled
	/// and does not require any `Content-Type` headers.
	/// NOTE: This allows sending RPCs via HTTP forms
	/// from any website.
	Unsecure,
	/// The REST -> RPC converter is disabled.
	Disabled,
}
232
/// Convenient JSON-RPC HTTP Server builder.
///
/// Create it with `new` or `with_meta_extractor`, adjust the configuration through
/// the chained setter methods, and finish with `start_http`.
pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::middleware::Noop> {
	// RPC handler shared by all workers.
	handler: Arc<MetaIoHandler<M, S>>,
	// Executor for the first worker; additional workers always create their own.
	executor: UninitializedExecutor,
	// Produces per-request metadata.
	meta_extractor: Arc<dyn MetaExtractor<M>>,
	// Hook invoked with each incoming request.
	request_middleware: Arc<dyn RequestMiddleware>,
	// Allowed CORS origins (`None` until `cors` is called).
	cors_domains: CorsDomains,
	// Value for the CORS max-age header, in seconds.
	cors_max_age: Option<u32>,
	// Request headers allowed by CORS.
	allowed_headers: cors::AccessControlAllowHeaders,
	// Accepted `Host` header values (`None` until configured).
	allowed_hosts: AllowedHosts,
	// State of the REST -> RPC converter.
	rest_api: RestApi,
	// Health endpoint as `(<path>, <rpc-method-name>)`, if enabled.
	health_api: Option<(String, String)>,
	// Whether HTTP keep-alive is enabled.
	keep_alive: bool,
	// Number of workers started by `start_http`.
	threads: usize,
	// Maximum accepted request body size, in bytes.
	max_request_body_size: usize,
}
249
250impl<M: jsonrpc::Metadata + Default, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
251	/// Creates new `ServerBuilder` for given `IoHandler`.
252	///
253	/// By default:
254	/// 1. Server is not sending any CORS headers.
255	/// 2. Server is validating `Host` header.
256	pub fn new<T>(handler: T) -> Self
257	where
258		T: Into<MetaIoHandler<M, S>>,
259	{
260		Self::with_meta_extractor(handler, NoopExtractor)
261	}
262}
263
264impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
265	/// Creates new `ServerBuilder` for given `IoHandler`.
266	///
267	/// By default:
268	/// 1. Server is not sending any CORS headers.
269	/// 2. Server is validating `Host` header.
270	pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self
271	where
272		T: Into<MetaIoHandler<M, S>>,
273		E: MetaExtractor<M>,
274	{
275		ServerBuilder {
276			handler: Arc::new(handler.into()),
277			executor: UninitializedExecutor::Unspawned,
278			meta_extractor: Arc::new(extractor),
279			request_middleware: Arc::new(NoopRequestMiddleware::default()),
280			cors_domains: None,
281			cors_max_age: None,
282			allowed_headers: cors::AccessControlAllowHeaders::Any,
283			allowed_hosts: None,
284			rest_api: RestApi::Disabled,
285			health_api: None,
286			keep_alive: true,
287			threads: 1,
288			max_request_body_size: 5 * 1024 * 1024,
289		}
290	}
291
292	/// Utilize existing event loop executor to poll RPC results.
293	///
294	/// Applies only to 1 of the threads. Other threads will spawn their own Event Loops.
295	pub fn event_loop_executor(mut self, executor: tokio::runtime::TaskExecutor) -> Self {
296		self.executor = UninitializedExecutor::Shared(executor);
297		self
298	}
299
300	/// Enable the REST -> RPC converter.
301	///
302	/// Allows you to invoke RPCs by sending `POST /<method>/<param1>/<param2>` requests
303	/// (with no body). Disabled by default.
304	pub fn rest_api(mut self, rest_api: RestApi) -> Self {
305		self.rest_api = rest_api;
306		self
307	}
308
309	/// Enable health endpoint.
310	///
311	/// Allows you to expose one of the methods under `GET /<path>`
312	/// The method will be invoked with no parameters.
313	/// Error returned from the method will be converted to status `500` response.
314	///
315	/// Expects a tuple with `(<path>, <rpc-method-name>)`.
316	pub fn health_api<A, B, T>(mut self, health_api: T) -> Self
317	where
318		T: Into<Option<(A, B)>>,
319		A: Into<String>,
320		B: Into<String>,
321	{
322		self.health_api = health_api.into().map(|(a, b)| (a.into(), b.into()));
323		self
324	}
325
326	/// Enables or disables HTTP keep-alive.
327	///
328	/// Default is true.
329	pub fn keep_alive(mut self, val: bool) -> Self {
330		self.keep_alive = val;
331		self
332	}
333
334	/// Sets number of threads of the server to run.
335	///
336	/// Panics when set to `0`.
337	#[cfg(not(unix))]
338	#[allow(unused_mut)]
339	pub fn threads(mut self, _threads: usize) -> Self {
340		warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
341		self
342	}
343
344	/// Sets number of threads of the server to run.
345	///
346	/// Panics when set to `0`.
347	/// The first thread will use provided `Executor` instance
348	/// and all other threads will use `UninitializedExecutor` to spawn
349	/// a new runtime for futures.
350	/// So it's also possible to run a multi-threaded server by
351	/// passing the default `tokio::runtime` executor to this builder
352	/// and setting `threads` to 1.
353	#[cfg(unix)]
354	pub fn threads(mut self, threads: usize) -> Self {
355		self.threads = threads;
356		self
357	}
358
359	/// Configures a list of allowed CORS origins.
360	pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
361		self.cors_domains = cors_domains.into();
362		self
363	}
364
365	/// Configure CORS `AccessControlMaxAge` header returned.
366	///
367	/// Informs the client that the CORS preflight request is not necessary for `cors_max_age` seconds.
368	/// Disabled by default.
369	pub fn cors_max_age<T: Into<Option<u32>>>(mut self, cors_max_age: T) -> Self {
370		self.cors_max_age = cors_max_age.into();
371		self
372	}
373
374	/// Configure the CORS `AccessControlAllowHeaders` header which are allowed.
375	pub fn cors_allow_headers(mut self, allowed_headers: cors::AccessControlAllowHeaders) -> Self {
376		self.allowed_headers = allowed_headers;
377		self
378	}
379
380	/// Configures request middleware
381	pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
382		self.request_middleware = Arc::new(middleware);
383		self
384	}
385
386	/// Configures metadata extractor
387	pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
388		self.meta_extractor = Arc::new(extractor);
389		self
390	}
391
392	/// Allow connections only with `Host` header set to binding address.
393	pub fn allow_only_bind_host(mut self) -> Self {
394		self.allowed_hosts = Some(Vec::new());
395		self
396	}
397
398	/// Specify a list of valid `Host` headers. Binding address is allowed automatically.
399	pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
400		self.allowed_hosts = allowed_hosts.into();
401		self
402	}
403
404	/// Sets the maximum size of a request body in bytes (default is 5 MiB).
405	pub fn max_request_body_size(mut self, val: usize) -> Self {
406		self.max_request_body_size = val;
407		self
408	}
409
	/// Start this JSON-RPC HTTP server trying to bind to specified `SocketAddr`.
	///
	/// One listener task is started per configured thread (see `serve`). The first
	/// one runs on the builder's executor; every additional one gets a freshly
	/// spawned executor. The call blocks until every listener has reported the
	/// outcome of its bind attempt.
	///
	/// # Errors
	///
	/// Returns an `io::Error` when an executor cannot be initialised, when any
	/// listener reports a bind failure, or when a listener goes away before
	/// reporting its address.
	pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
		let cors_domains = self.cors_domains;
		let cors_max_age = self.cors_max_age;
		let allowed_headers = self.allowed_headers;
		let request_middleware = self.request_middleware;
		let allowed_hosts = self.allowed_hosts;
		let tetsy_jsonrpc_handler = Rpc {
			handler: self.handler,
			extractor: self.meta_extractor,
		};
		let rest_api = self.rest_api;
		let health_api = self.health_api;
		let keep_alive = self.keep_alive;
		// All workers bind the same address, so port reuse is requested
		// as soon as there is more than one of them.
		let reuse_port = self.threads > 1;

		// Per-worker channels: bound address (or bind error) back to us, shutdown
		// request to the worker, and completion notification from the worker.
		let (local_addr_tx, local_addr_rx) = mpsc::channel();
		let (close, shutdown_signal) = oneshot::channel();
		let (done_tx, done_rx) = oneshot::channel();
		let eloop = self.executor.init_with_name("http.worker0")?;
		let req_max_size = self.max_request_body_size;
		// The first threads `Executor` is initialised differently from the others
		serve(
			(shutdown_signal, local_addr_tx, done_tx),
			eloop.executor(),
			addr.to_owned(),
			cors_domains.clone(),
			cors_max_age,
			allowed_headers.clone(),
			request_middleware.clone(),
			allowed_hosts.clone(),
			tetsy_jsonrpc_handler.clone(),
			rest_api,
			health_api.clone(),
			keep_alive,
			reuse_port,
			req_max_size,
		);
		// Remaining workers: same configuration, each with its own channels and
		// its own newly spawned executor. The first failing executor init aborts
		// the whole start-up through the `?` on `collect`.
		let handles = (0..self.threads - 1)
			.map(|i| {
				let (local_addr_tx, local_addr_rx) = mpsc::channel();
				let (close, shutdown_signal) = oneshot::channel();
				let (done_tx, done_rx) = oneshot::channel();
				let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
				serve(
					(shutdown_signal, local_addr_tx, done_tx),
					eloop.executor(),
					addr.to_owned(),
					cors_domains.clone(),
					cors_max_age,
					allowed_headers.clone(),
					request_middleware.clone(),
					allowed_hosts.clone(),
					tetsy_jsonrpc_handler.clone(),
					rest_api,
					health_api.clone(),
					keep_alive,
					reuse_port,
					req_max_size,
				);
				Ok((eloop, close, local_addr_rx, done_rx))
			})
			.collect::<io::Result<Vec<_>>>()?;

		// Wait for server initialization
		// (the result is only unwrapped further down, when building `Server`)
		let local_addr = recv_address(local_addr_rx);
		// Wait for other threads as well.
		let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles
			.into_iter()
			.map(|(eloop, close, local_addr_rx, done_rx)| {
				// Only success or failure matters here; the address itself is discarded.
				let _ = recv_address(local_addr_rx)?;
				Ok((eloop, close, done_rx))
			})
			.collect::<io::Result<Vec<_>>>()?;
		// The first worker's handles go last in the list.
		handles.push((eloop, close, done_rx));

		// Split into what is needed to close the server (executor + shutdown sender)
		// and what is needed to wait for it (completion receivers).
		let (executors, done_rxs) = handles
			.into_iter()
			.fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| {
				acc.0.push((eloop, closer));
				acc.1.push(done_rx);
				acc
			});

		Ok(Server {
			address: local_addr?,
			executors: Arc::new(Mutex::new(Some(executors))),
			done: Some(done_rxs),
		})
	}
500}
501
/// Blocks until a worker reports the outcome of its bind attempt.
///
/// Returns the reported address or bind error as-is. If the sending side is gone
/// before anything was reported, an `io::ErrorKind::Interrupted` error is returned.
fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
	match local_addr_rx.recv() {
		// The worker reported something: pass its result through untouched.
		Ok(bind_outcome) => bind_outcome,
		// The channel was closed without a message.
		Err(_) => Err(io::Error::new(io::ErrorKind::Interrupted, "")),
	}
}
507
508fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
509	signals: (
510		oneshot::Receiver<()>,
511		mpsc::Sender<io::Result<SocketAddr>>,
512		oneshot::Sender<()>,
513	),
514	executor: tokio::runtime::TaskExecutor,
515	addr: SocketAddr,
516	cors_domains: CorsDomains,
517	cors_max_age: Option<u32>,
518	allowed_headers: cors::AccessControlAllowHeaders,
519	request_middleware: Arc<dyn RequestMiddleware>,
520	allowed_hosts: AllowedHosts,
521	tetsy_jsonrpc_handler: Rpc<M, S>,
522	rest_api: RestApi,
523	health_api: Option<(String, String)>,
524	keep_alive: bool,
525	reuse_port: bool,
526	max_request_body_size: usize,
527) {
528	let (shutdown_signal, local_addr_tx, done_tx) = signals;
529	executor.spawn({
530		let handle = tokio::reactor::Handle::default();
531
532		let bind = move || {
533			let listener = match addr {
534				SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
535				SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
536			};
537			configure_port(reuse_port, &listener)?;
538			listener.reuse_address(true)?;
539			listener.bind(&addr)?;
540			let listener = listener.listen(1024)?;
541			let listener = tokio::net::TcpListener::from_std(listener, &handle)?;
542			// Add current host to allowed headers.
543			// NOTE: we need to use `l.local_addr()` instead of `addr`
544			// it might be different!
545			let local_addr = listener.local_addr()?;
546
547			Ok((listener, local_addr))
548		};
549
550		let bind_result = match bind() {
551			Ok((listener, local_addr)) => {
552				// Send local address
553				match local_addr_tx.send(Ok(local_addr)) {
554					Ok(_) => futures::future::ok((listener, local_addr)),
555					Err(_) => {
556						warn!(
557							"Thread {:?} unable to reach receiver, closing server",
558							thread::current().name()
559						);
560						futures::future::err(())
561					}
562				}
563			}
564			Err(err) => {
565				// Send error
566				let _send_result = local_addr_tx.send(Err(err));
567
568				futures::future::err(())
569			}
570		};
571
572		bind_result
573			.and_then(move |(listener, local_addr)| {
574				let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
575
576				let mut http = server::conn::Http::new();
577				http.keep_alive(keep_alive);
578				let tcp_stream = SuspendableStream::new(listener.incoming());
579
580				tcp_stream
581					.map(move |socket| {
582						let service = ServerHandler::new(
583							tetsy_jsonrpc_handler.downgrade(),
584							cors_domains.clone(),
585							cors_max_age,
586							allowed_headers.clone(),
587							allowed_hosts.clone(),
588							request_middleware.clone(),
589							rest_api,
590							health_api.clone(),
591							max_request_body_size,
592							keep_alive,
593						);
594
595						tokio::spawn(
596							http.serve_connection(socket, service)
597								.map_err(|e| error!("Error serving connection: {:?}", e))
598								.then(|_| Ok(())),
599						)
600					})
601					.for_each(|_| Ok(()))
602					.map_err(|e| {
603						warn!("Incoming streams error, closing sever: {:?}", e);
604					})
605					.select(shutdown_signal.map_err(|e| {
606						debug!("Shutdown signaller dropped, closing server: {:?}", e);
607					}))
608					.map_err(|_| ())
609			})
610			.and_then(|(_, server)| {
611				// We drop the server first to prevent a situation where main thread terminates
612				// before the server is properly dropped (see #504 for more details)
613				drop(server);
614				done_tx.send(())
615			})
616	});
617}
618
619#[cfg(unix)]
620fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
621	use net2::unix::*;
622
623	if reuse {
624		tcp.reuse_port(true)?;
625	}
626
627	Ok(())
628}
629
/// Non-unix counterpart of `configure_port`: both arguments are ignored and the
/// call always succeeds.
#[cfg(not(unix))]
fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
	Ok(())
}
634
/// Handle used to close the server. Can be cloned and passed around to different threads and be used
/// to close a server that is `wait()`ing.
///
/// It shares the list of executors and shutdown senders with the `Server` it was
/// obtained from; whichever handle closes first takes the list, so later calls
/// find nothing left to do.
#[derive(Clone)]
pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>);
640
641impl CloseHandle {
642	/// Shutdown a running server
643	pub fn close(self) {
644		if let Some(executors) = self.0.lock().take() {
645			for (executor, closer) in executors {
646				executor.close();
647				let _ = closer.send(());
648			}
649		}
650	}
651}
652
/// jsonrpc http server instance
///
/// Dropping the server closes it and blocks until its listeners have finished.
pub struct Server {
	// Address reported by the first listener after binding.
	address: SocketAddr,
	// Executors and shutdown senders, shared with every `CloseHandle`;
	// `None` once the server has been closed.
	executors: Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>,
	// Completion receivers, one per listener; `None` once they have been waited on.
	done: Option<Vec<oneshot::Receiver<()>>>,
}
659
660impl Server {
661	/// Returns address of this server
662	pub fn address(&self) -> &SocketAddr {
663		&self.address
664	}
665
666	/// Closes the server.
667	pub fn close(self) {
668		self.close_handle().close()
669	}
670
671	/// Will block, waiting for the server to finish.
672	pub fn wait(mut self) {
673		self.wait_internal();
674	}
675
676	/// Get a handle that allows us to close the server from a different thread and/or while the
677	/// server is `wait()`ing.
678	pub fn close_handle(&self) -> CloseHandle {
679		CloseHandle(self.executors.clone())
680	}
681
682	fn wait_internal(&mut self) {
683		if let Some(receivers) = self.done.take() {
684			for receiver in receivers {
685				let _ = receiver.wait();
686			}
687		}
688	}
689}
690
691impl Drop for Server {
692	fn drop(&mut self) {
693		self.close_handle().close();
694		self.wait_internal();
695	}
696}