bitconch_jsonrpc_http_server/
lib.rs

1//! jsonrpc http server.
2//!
3//! ```no_run
4//! extern crate bitconch_jsonrpc_core as jsonrpc_core;
5//! extern crate bitconch_jsonrpc_http_server as jsonrpc_http_server;
6//!
7//! use jsonrpc_core::*;
8//! use jsonrpc_http_server::*;
9//!
10//! fn main() {
11//! 	let mut io = IoHandler::new();
12//! 	io.add_method("say_hello", |_: Params| {
13//! 		Ok(Value::String("hello".to_string()))
14//! 	});
15//!
16//! 	let _server = ServerBuilder::new(io)
17//!		.start_http(&"127.0.0.1:3030".parse().unwrap())
18//!		.expect("Unable to start RPC server");
19//!
20//!	_server.wait();
21//! }
22//! ```
23
24#![warn(missing_docs)]
25
26extern crate unicase;
27extern crate bitconch_jsonrpc_server_utils as server_utils;
28extern crate net2;
29
30pub extern crate bitconch_jsonrpc_core as jsonrpc_core;
31pub extern crate hyper;
32
33#[macro_use]
34extern crate log;
35
36mod handler;
37mod response;
38mod utils;
39#[cfg(test)]
40mod tests;
41
42use std::io;
43use std::sync::{mpsc, Arc};
44use std::net::SocketAddr;
45use std::thread;
46
47use hyper::{server, Body};
48use jsonrpc_core as jsonrpc;
49use jsonrpc::MetaIoHandler;
50use jsonrpc::futures::{self, Future, Stream, future};
51use jsonrpc::futures::sync::oneshot;
52use server_utils::reactor::{Executor, UninitializedExecutor};
53
54pub use server_utils::hosts::{Host, DomainsValidation};
55pub use server_utils::cors::{self, AccessControlAllowOrigin, Origin, AllowCors};
56pub use server_utils::tokio;
57pub use handler::ServerHandler;
58pub use utils::{is_host_allowed, cors_allow_origin, cors_allow_headers};
59pub use response::Response;
60
61/// Action undertaken by a middleware.
62pub enum RequestMiddlewareAction {
63	/// Proceed with standard RPC handling
64	Proceed {
65		/// Should the request be processed even if invalid CORS headers are detected?
66		/// This allows for side effects to take place.
67		should_continue_on_invalid_cors: bool,
68		/// The request object returned
69		request: hyper::Request<Body>,
70	},
71	/// Intercept the request and respond differently.
72	Respond {
73		/// Should standard hosts validation be performed?
74		should_validate_hosts: bool,
75		/// a future for server response
76		response: Box<Future<Item=hyper::Response<Body>, Error=hyper::Error> + Send>,
77	}
78}
79
80impl From<Response> for RequestMiddlewareAction {
81	fn from(o: Response) -> Self {
82		RequestMiddlewareAction::Respond {
83			should_validate_hosts: true,
84			response: Box::new(futures::future::ok(o.into())),
85		}
86	}
87}
88
89impl From<hyper::Response<Body>> for RequestMiddlewareAction {
90	fn from(response: hyper::Response<Body>) -> Self {
91		RequestMiddlewareAction::Respond {
92			should_validate_hosts: true,
93			response: Box::new(futures::future::ok(response)),
94		}
95	}
96}
97
98impl From<hyper::Request<Body>> for RequestMiddlewareAction {
99	fn from(request: hyper::Request<Body>) -> Self {
100		RequestMiddlewareAction::Proceed {
101			should_continue_on_invalid_cors: false,
102			request,
103		}
104	}
105}
106
107/// Allows to intercept request and handle it differently.
108pub trait RequestMiddleware: Send + Sync + 'static {
109	/// Takes a request and decides how to proceed with it.
110	fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction;
111}
112
113impl<F> RequestMiddleware for F where
114	F: Fn(hyper::Request<Body>) -> RequestMiddlewareAction + Sync + Send + 'static,
115{
116	fn on_request(&self, request: hyper::Request<hyper::Body>) -> RequestMiddlewareAction {
117		(*self)(request)
118	}
119}
120
121#[derive(Default)]
122struct NoopRequestMiddleware;
123impl RequestMiddleware for NoopRequestMiddleware {
124	fn on_request(&self, request: hyper::Request<Body>) -> RequestMiddlewareAction {
125		RequestMiddlewareAction::Proceed {
126			should_continue_on_invalid_cors: false,
127			request,
128		}
129	}
130}
131
132/// Extracts metadata from the HTTP request.
133pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
134	/// Read the metadata from the request
135	fn read_metadata(&self, _: &hyper::Request<Body>) -> M;
136}
137
138impl<M, F> MetaExtractor<M> for F where
139	M: jsonrpc::Metadata,
140	F: Fn(&hyper::Request<Body>) -> M + Sync + Send + 'static,
141{
142	fn read_metadata(&self, req: &hyper::Request<Body>) -> M {
143		(*self)(req)
144	}
145}
146
147#[derive(Default)]
148struct NoopExtractor;
149impl<M: jsonrpc::Metadata + Default> MetaExtractor<M> for NoopExtractor {
150	fn read_metadata(&self, _: &hyper::Request<Body>) -> M {
151		M::default()
152	}
153}
154//
155/// RPC Handler bundled with metadata extractor.
156pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
157	/// RPC Handler
158	pub handler: Arc<MetaIoHandler<M, S>>,
159	/// Metadata extractor
160	pub extractor: Arc<MetaExtractor<M>>,
161}
162
163impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
164	fn clone(&self) -> Self {
165		Rpc {
166			handler: self.handler.clone(),
167			extractor: self.extractor.clone(),
168		}
169	}
170}
171
172type AllowedHosts = Option<Vec<Host>>;
173type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
174
175/// REST -> RPC converter state.
176#[derive(Debug, PartialEq, Clone, Copy)]
177pub enum RestApi {
178	/// The REST -> RPC converter is enabled
179	/// and requires `Content-Type: application/json` header
180	/// (even though the body should be empty).
181	/// This protects from submitting an RPC call
182	/// from unwanted origins.
183	Secure,
184	/// The REST -> RPC converter is enabled
185	/// and does not require any `Content-Type` headers.
186	/// NOTE: This allows sending RPCs via HTTP forms
187	/// from any website.
188	Unsecure,
189	/// The REST -> RPC converter is disabled.
190	Disabled,
191}
192
193/// Convenient JSON-RPC HTTP Server builder.
194pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
195	handler: Arc<MetaIoHandler<M, S>>,
196	executor: UninitializedExecutor,
197	meta_extractor: Arc<MetaExtractor<M>>,
198	request_middleware: Arc<RequestMiddleware>,
199	cors_domains: CorsDomains,
200	cors_max_age: Option<u32>,
201	allowed_headers: cors::AccessControlAllowHeaders,
202	allowed_hosts: AllowedHosts,
203	rest_api: RestApi,
204	health_api: Option<(String, String)>,
205	keep_alive: bool,
206	threads: usize,
207	max_request_body_size: usize,
208}
209
210impl<M: jsonrpc::Metadata + Default, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
211	/// Creates new `ServerBuilder` for given `IoHandler`.
212	///
213	/// By default:
214	/// 1. Server is not sending any CORS headers.
215	/// 2. Server is validating `Host` header.
216	pub fn new<T>(handler: T) -> Self where
217		T: Into<MetaIoHandler<M, S>>
218	{
219		Self::with_meta_extractor(handler, NoopExtractor)
220	}
221}
222
223impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
224	/// Creates new `ServerBuilder` for given `IoHandler`.
225	///
226	/// By default:
227	/// 1. Server is not sending any CORS headers.
228	/// 2. Server is validating `Host` header.
229	pub fn with_meta_extractor<T, E>(handler: T, extractor: E) -> Self where
230		T: Into<MetaIoHandler<M, S>>,
231		E: MetaExtractor<M>,
232	{
233		ServerBuilder {
234			handler: Arc::new(handler.into()),
235			executor: UninitializedExecutor::Unspawned,
236			meta_extractor: Arc::new(extractor),
237			request_middleware: Arc::new(NoopRequestMiddleware::default()),
238			cors_domains: None,
239			cors_max_age: None,
240			allowed_headers: cors::AccessControlAllowHeaders::Any,
241			allowed_hosts: None,
242			rest_api: RestApi::Disabled,
243			health_api: None,
244			keep_alive: true,
245			threads: 1,
246			max_request_body_size: 5 * 1024 * 1024,
247		}
248	}
249
250	/// Utilize existing event loop executor to poll RPC results.
251	///
252	/// Applies only to 1 of the threads. Other threads will spawn their own Event Loops.
253	pub fn event_loop_executor(mut self, executor: tokio::runtime::TaskExecutor) -> Self {
254		self.executor = UninitializedExecutor::Shared(executor);
255		self
256	}
257
258	/// Enable the REST -> RPC converter.
259	///
260	/// Allows you to invoke RPCs by sending `POST /<method>/<param1>/<param2>` requests
261	/// (with no body). Disabled by default.
262	pub fn rest_api(mut self, rest_api: RestApi) -> Self {
263		self.rest_api = rest_api;
264		self
265	}
266
267	/// Enable health endpoint.
268	///
269	/// Allows you to expose one of the methods under `GET /<path>`
270	/// The method will be invoked with no parameters.
271	/// Error returned from the method will be converted to status `500` response.
272	///
273	/// Expects a tuple with `(<path>, <rpc-method-name>)`.
274	pub fn health_api<A, B, T>(mut self, health_api: T) -> Self where
275		T: Into<Option<(A, B)>>,
276		A: Into<String>,
277		B: Into<String>,
278	{
279		self.health_api = health_api.into().map(|(a, b)| (a.into(), b.into()));
280		self
281	}
282
283	/// Enables or disables HTTP keep-alive.
284	///
285	/// Default is true.
286	pub fn keep_alive(mut self, val: bool) -> Self {
287		self.keep_alive  = val;
288		self
289	}
290
291	/// Sets number of threads of the server to run.
292	///
293	/// Panics when set to `0`.
294	#[cfg(not(unix))]
295	pub fn threads(mut self, _threads: usize) -> Self {
296		warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
297		self
298	}
299
300	/// Sets number of threads of the server to run.
301	///
302	/// Panics when set to `0`.
303	#[cfg(unix)]
304	pub fn threads(mut self, threads: usize) -> Self {
305		self.threads = threads;
306		self
307	}
308
309	/// Configures a list of allowed CORS origins.
310	pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
311		self.cors_domains = cors_domains.into();
312		self
313	}
314
315	/// Configure CORS `AccessControlMaxAge` header returned.
316	///
317	/// Passing `Some(millis)` informs the client that the CORS preflight request is not necessary
318	/// for at least `millis` ms.
319	/// Disabled by default.
320	pub fn cors_max_age<T: Into<Option<u32>>>(mut self, cors_max_age: T) -> Self {
321		self.cors_max_age = cors_max_age.into();
322		self
323	}
324
325	/// Configure the CORS `AccessControlAllowHeaders` header which are allowed.
326	pub fn cors_allow_headers(mut self, allowed_headers: cors::AccessControlAllowHeaders) -> Self {
327		self.allowed_headers = allowed_headers.into();
328		self
329	}
330
331	/// Configures request middleware
332	pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
333		self.request_middleware = Arc::new(middleware);
334		self
335	}
336
337	/// Configures metadata extractor
338	pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
339		self.meta_extractor = Arc::new(extractor);
340		self
341	}
342
343	/// Allow connections only with `Host` header set to binding address.
344	pub fn allow_only_bind_host(mut self) -> Self {
345		self.allowed_hosts = Some(Vec::new());
346		self
347	}
348
349	/// Specify a list of valid `Host` headers. Binding address is allowed automatically.
350	pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
351		self.allowed_hosts = allowed_hosts.into();
352		self
353	}
354
355	/// Sets the maximum size of a request body in bytes (default is 5 MiB).
356	pub fn max_request_body_size(mut self, val: usize) -> Self {
357		self.max_request_body_size = val;
358		self
359	}
360
361	/// Start this JSON-RPC HTTP server trying to bind to specified `SocketAddr`.
362	pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
363		let cors_domains = self.cors_domains;
364		let cors_max_age = self.cors_max_age;
365		let allowed_headers = self.allowed_headers;
366		let request_middleware = self.request_middleware;
367		let allowed_hosts = self.allowed_hosts;
368		let jsonrpc_handler = Rpc {
369			handler: self.handler,
370			extractor: self.meta_extractor,
371		};
372		let rest_api = self.rest_api;
373		let health_api = self.health_api;
374		let keep_alive = self.keep_alive;
375		let reuse_port = self.threads > 1;
376
377		let (local_addr_tx, local_addr_rx) = mpsc::channel();
378		let (close, shutdown_signal) = oneshot::channel();
379		let eloop = self.executor.init_with_name("http.worker0")?;
380		let req_max_size = self.max_request_body_size;
381		serve(
382			(shutdown_signal, local_addr_tx),
383			eloop.executor(),
384			addr.to_owned(),
385			cors_domains.clone(),
386			cors_max_age,
387			allowed_headers.clone(),
388			request_middleware.clone(),
389			allowed_hosts.clone(),
390			jsonrpc_handler.clone(),
391			rest_api,
392			health_api.clone(),
393			keep_alive,
394			reuse_port,
395			req_max_size,
396		);
397		let handles = (0..self.threads - 1).map(|i| {
398			let (local_addr_tx, local_addr_rx) = mpsc::channel();
399			let (close, shutdown_signal) = oneshot::channel();
400			let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
401			serve(
402				(shutdown_signal, local_addr_tx),
403				eloop.executor(),
404				addr.to_owned(),
405				cors_domains.clone(),
406				cors_max_age,
407				allowed_headers.clone(),
408				request_middleware.clone(),
409				allowed_hosts.clone(),
410				jsonrpc_handler.clone(),
411				rest_api,
412				health_api.clone(),
413				keep_alive,
414				reuse_port,
415				req_max_size,
416			);
417			Ok((eloop, close, local_addr_rx))
418		}).collect::<io::Result<Vec<_>>>()?;
419
420		// Wait for server initialization
421		let local_addr = recv_address(local_addr_rx);
422		// Wait for other threads as well.
423		let mut handles = handles.into_iter().map(|(eloop, close, local_addr_rx)| {
424			let _ = recv_address(local_addr_rx)?;
425			Ok((eloop, close))
426		}).collect::<io::Result<(Vec<_>)>>()?;
427		handles.push((eloop, close));
428		let (executors, close) = handles.into_iter().unzip();
429
430		Ok(Server {
431			address: local_addr?,
432			executor: Some(executors),
433			close: Some(close),
434		})
435	}
436}
437
438fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
439	local_addr_rx.recv().map_err(|_| {
440		io::Error::new(io::ErrorKind::Interrupted, "")
441	})?
442}
443
444fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
445	signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
446	executor: tokio::runtime::TaskExecutor,
447	addr: SocketAddr,
448	cors_domains: CorsDomains,
449	cors_max_age: Option<u32>,
450	allowed_headers: cors::AccessControlAllowHeaders,
451	request_middleware: Arc<RequestMiddleware>,
452	allowed_hosts: AllowedHosts,
453	jsonrpc_handler: Rpc<M, S>,
454	rest_api: RestApi,
455	health_api: Option<(String, String)>,
456	keep_alive: bool,
457	reuse_port: bool,
458	max_request_body_size: usize,
459) {
460	let (shutdown_signal, local_addr_tx) = signals;
461	executor.spawn(future::lazy(move || {
462		let handle = tokio::reactor::Handle::current();
463
464		let bind = move || {
465			let listener = match addr {
466				SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
467				SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
468			};
469			configure_port(reuse_port, &listener)?;
470			listener.reuse_address(true)?;
471			listener.bind(&addr)?;
472			let listener = listener.listen(1024)?;
473			let listener = tokio::net::TcpListener::from_std(listener, &handle)?;
474			// Add current host to allowed headers.
475			// NOTE: we need to use `l.local_addr()` instead of `addr`
476			// it might be different!
477			let local_addr = listener.local_addr()?;
478
479			Ok((listener, local_addr))
480		};
481
482		let bind_result = match bind() {
483			Ok((listener, local_addr)) => {
484				// Send local address
485				match local_addr_tx.send(Ok(local_addr)) {
486					Ok(_) => futures::future::ok((listener, local_addr)),
487					Err(_) => {
488						warn!("Thread {:?} unable to reach receiver, closing server", thread::current().name());
489						futures::future::err(())
490					},
491				}
492			},
493			Err(err) => {
494				// Send error
495				let _send_result = local_addr_tx.send(Err(err));
496
497				futures::future::err(())
498			}
499		};
500
501		bind_result.and_then(move |(listener, local_addr)| {
502			let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
503
504			let mut http = server::conn::Http::new();
505			http.keep_alive(keep_alive);
506
507			listener.incoming()
508				.for_each(move |socket| {
509					let service = ServerHandler::new(
510						jsonrpc_handler.clone(),
511						cors_domains.clone(),
512						cors_max_age,
513						allowed_headers.clone(),
514						allowed_hosts.clone(),
515						request_middleware.clone(),
516						rest_api,
517						health_api.clone(),
518						max_request_body_size,
519					);
520					tokio::spawn(http.serve_connection(socket, service)
521						.map_err(|e| error!("Error serving connection: {:?}", e)));
522					Ok(())
523				})
524				.map_err(|e| {
525					warn!("Incoming streams error, closing sever: {:?}", e);
526				})
527				.select(shutdown_signal.map_err(|e| {
528					debug!("Shutdown signaller dropped, closing server: {:?}", e);
529				}))
530				.map(|_| ())
531				.map_err(|_| ())
532		})
533	}));
534}
535
536#[cfg(unix)]
537fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
538	use net2::unix::*;
539
540	if reuse {
541		try!(tcp.reuse_port(true));
542	}
543
544	Ok(())
545}
546
547#[cfg(not(unix))]
548fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
549    Ok(())
550}
551
552/// jsonrpc http server instance
553pub struct Server {
554	address: SocketAddr,
555	executor: Option<Vec<Executor>>,
556	close: Option<Vec<oneshot::Sender<()>>>,
557}
558
559const PROOF: &'static str = "Server is always Some until self is consumed.";
560impl Server {
561	/// Returns address of this server
562	pub fn address(&self) -> &SocketAddr {
563		&self.address
564	}
565
566	/// Closes the server.
567	pub fn close(mut self) {
568		for close in self.close.take().expect(PROOF) {
569			let _ = close.send(());
570		}
571
572		for executor in self.executor.take().expect(PROOF) {
573			executor.close();
574		}
575	}
576
577	/// Will block, waiting for the server to finish.
578	pub fn wait(mut self) {
579		for executor in self.executor.take().expect(PROOF) {
580			executor.wait();
581		}
582	}
583}
584
585impl Drop for Server {
586	fn drop(&mut self) {
587		self.executor.take().map(|executors| {
588			for executor in executors { executor.close(); }
589		});
590	}
591}