jsonrpc_http_server/
lib.rs

1//! jsonrpc http server.
2//!
3//! ```no_run
4//! extern crate jsonrpc_core;
5//! extern crate jsonrpc_http_server;
6//!
7//! use jsonrpc_core::*;
8//! use jsonrpc_http_server::*;
9//!
10//! fn main() {
11//! 	let mut io = IoHandler::new();
12//! 	io.add_method("say_hello", |_: Params| {
13//! 		Ok(Value::String("hello".to_string()))
14//! 	});
15//!
16//! 	let _server = ServerBuilder::new(io).start_http(&"127.0.0.1:3030".parse().unwrap());
17//! }
18//! ```
19
20#![warn(missing_docs)]
21
22#[macro_use] extern crate log;
23extern crate unicase;
24extern crate jsonrpc_core as jsonrpc;
25extern crate jsonrpc_server_utils as server_utils;
26extern crate net2;
27
28pub extern crate hyper;
29
30mod response;
31mod handler;
32mod utils;
33#[cfg(test)]
34mod tests;
35
36use std::{fmt, io};
37use std::sync::{mpsc, Arc};
38use std::net::SocketAddr;
39
40use hyper::server;
41use jsonrpc::MetaIoHandler;
42use jsonrpc::futures::{self, Future, IntoFuture, BoxFuture, Stream};
43use jsonrpc::futures::sync::oneshot;
44use server_utils::reactor::{Remote, UninitializedRemote};
45
46pub use server_utils::hosts::{Host, DomainsValidation};
47pub use server_utils::cors::{AccessControlAllowOrigin, Origin};
48pub use server_utils::tokio_core;
49pub use handler::ServerHandler;
50pub use utils::{is_host_allowed, cors_header, CorsHeader};
51pub use response::Response;
52
53/// Result of starting the Server.
54pub type ServerResult = Result<Server, Error>;
55
56/// RPC Server startup error.
57#[derive(Debug)]
58pub enum Error {
59	/// IO Error
60	Io(std::io::Error),
61	/// Other Error (hyper)
62	Other(hyper::error::Error),
63}
64
65impl From<std::io::Error> for Error {
66	fn from(err: std::io::Error) -> Self {
67		Error::Io(err)
68	}
69}
70
71impl From<hyper::error::Error> for Error {
72	fn from(err: hyper::error::Error) -> Self {
73		match err {
74			hyper::error::Error::Io(e) => Error::Io(e),
75			e => Error::Other(e)
76		}
77	}
78}
79
80impl fmt::Display for Error {
81	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
82		match *self {
83			Error::Io(ref e) => e.fmt(f),
84			Error::Other(ref e) => e.fmt(f),
85		}
86	}
87}
88
89impl ::std::error::Error for Error {
90	fn description(&self) -> &str {
91		"Starting the JSON-RPC HTTP server failed"
92	}
93
94	fn cause(&self) -> Option<&::std::error::Error> {
95		Some(match *self {
96			Error::Io(ref e) => e,
97			Error::Other(ref e) => e,
98		})
99	}
100}
101
102/// Action undertaken by a middleware.
103pub enum RequestMiddlewareAction {
104	/// Proceed with standard RPC handling
105	Proceed {
106		/// Should the request be processed even if invalid CORS headers are detected?
107		/// This allows for side effects to take place.
108		should_continue_on_invalid_cors: bool,
109	},
110	/// Intercept the request and respond differently.
111	Respond {
112		/// Should standard hosts validation be performed?
113		should_validate_hosts: bool,
114		/// hyper handler used to process the request
115		handler: BoxFuture<server::Response, hyper::Error>,
116	}
117}
118
119impl From<Option<Response>> for RequestMiddlewareAction {
120	fn from(o: Option<Response>) -> Self {
121		o.map(Into::<server::Response>::into).map(futures::future::ok).into()
122	}
123}
124impl<T> From<Option<T>> for RequestMiddlewareAction where
125	T: IntoFuture<Item=server::Response, Error=hyper::Error>,
126	T::Future: Send + 'static,
127{
128	fn from(o: Option<T>) -> Self {
129		match o {
130			None => RequestMiddlewareAction::Proceed {
131				should_continue_on_invalid_cors: false,
132			},
133			Some(handler) => RequestMiddlewareAction::Respond {
134				should_validate_hosts: true,
135				handler: handler.into_future().boxed(),
136			},
137		}
138	}
139}
140
141/// Allows to intercept request and handle it differently.
142pub trait RequestMiddleware: Send + Sync + 'static {
143	/// Takes a request and decides how to proceed with it.
144	fn on_request(&self, request: &server::Request) -> RequestMiddlewareAction;
145}
146
147impl<F> RequestMiddleware for F where
148	F: Fn(&server::Request) -> RequestMiddlewareAction + Sync + Send + 'static,
149{
150	fn on_request(&self, request: &server::Request) -> RequestMiddlewareAction {
151		(*self)(request)
152	}
153}
154
155#[derive(Default)]
156struct NoopRequestMiddleware;
157impl RequestMiddleware for NoopRequestMiddleware {
158	fn on_request(&self, _request: &server::Request) -> RequestMiddlewareAction {
159		RequestMiddlewareAction::Proceed {
160			should_continue_on_invalid_cors: false,
161		}
162	}
163}
164
165/// Extracts metadata from the HTTP request.
166pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
167	/// Read the metadata from the request
168	fn read_metadata(&self, _: &server::Request) -> M {
169		Default::default()
170	}
171}
172
173impl<M, F> MetaExtractor<M> for F where
174	M: jsonrpc::Metadata,
175	F: Fn(&server::Request) -> M + Sync + Send + 'static,
176{
177	fn read_metadata(&self, req: &server::Request) -> M {
178		(*self)(req)
179	}
180}
181
182#[derive(Default)]
183struct NoopExtractor;
184impl<M: jsonrpc::Metadata> MetaExtractor<M> for NoopExtractor {}
185
186/// RPC Handler bundled with metadata extractor.
187pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
188	/// RPC Handler
189	pub handler: Arc<MetaIoHandler<M, S>>,
190	/// Metadata extractor
191	pub extractor: Arc<MetaExtractor<M>>,
192}
193
194impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
195	fn clone(&self) -> Self {
196		Rpc {
197			handler: self.handler.clone(),
198			extractor: self.extractor.clone(),
199		}
200	}
201}
202
203type AllowedHosts = Option<Vec<Host>>;
204type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
205
206/// Convenient JSON-RPC HTTP Server builder.
207pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
208	handler: Arc<MetaIoHandler<M, S>>,
209	remote: UninitializedRemote,
210	meta_extractor: Arc<MetaExtractor<M>>,
211	request_middleware: Arc<RequestMiddleware>,
212	cors_domains: CorsDomains,
213	allowed_hosts: AllowedHosts,
214	keep_alive: bool,
215	threads: usize,
216}
217
218const SENDER_PROOF: &'static str = "Server initialization awaits local address.";
219
220impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
221	/// Creates new `ServerBuilder` for given `IoHandler`.
222	///
223	/// If you want to re-use the same handler in couple places
224	/// see `with_remote` function.
225	///
226	/// By default:
227	/// 1. Server is not sending any CORS headers.
228	/// 2. Server is validating `Host` header.
229	pub fn new<T>(handler: T) -> Self where
230		T: Into<MetaIoHandler<M, S>>
231	{
232		ServerBuilder {
233			handler: Arc::new(handler.into()),
234			remote: UninitializedRemote::Unspawned,
235			meta_extractor: Arc::new(NoopExtractor::default()),
236			request_middleware: Arc::new(NoopRequestMiddleware::default()),
237			cors_domains: None,
238			allowed_hosts: None,
239			keep_alive: true,
240			threads: 1,
241		}
242	}
243
244	/// Utilize existing event loop remote to poll RPC results.
245	/// Applies only to 1 of the threads. Other threads will spawn their own Event Loops.
246	pub fn event_loop_remote(mut self, remote: tokio_core::reactor::Remote) -> Self {
247		self.remote = UninitializedRemote::Shared(remote);
248		self
249	}
250
251    /// Sets Enables or disables HTTP keep-alive.
252    /// Default is true.	
253	pub fn keep_alive(mut self, val: bool) -> Self {
254		self.keep_alive  = val;
255		self
256	}
257
258	/// Sets number of threads of the server to run.
259	/// Panics when set to `0`.
260	#[cfg(not(unix))]
261	pub fn threads(mut self, _threads: usize) -> Self {
262		warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
263		self
264	}
265
266	/// Sets number of threads of the server to run.
267	/// Panics when set to `0`.
268	#[cfg(unix)]
269	pub fn threads(mut self, threads: usize) -> Self {
270		self.threads = threads;
271		self
272	}
273
274	/// Configures a list of allowed CORS origins.
275	pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
276		self.cors_domains = cors_domains.into();
277		self
278	}
279
280	/// Configures request middleware
281	pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
282		self.request_middleware = Arc::new(middleware);
283		self
284	}
285
286	/// Configures metadata extractor
287	pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
288		self.meta_extractor = Arc::new(extractor);
289		self
290	}
291
292	/// Allow connections only with `Host` header set to binding address.
293	pub fn allow_only_bind_host(mut self) -> Self {
294		self.allowed_hosts = Some(Vec::new());
295		self
296	}
297
298	/// Specify a list of valid `Host` headers. Binding address is allowed automatically.
299	pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
300		self.allowed_hosts = allowed_hosts.into();
301		self
302	}
303
304	/// Start this JSON-RPC HTTP server trying to bind to specified `SocketAddr`.
305	pub fn start_http(self, addr: &SocketAddr) -> ServerResult {
306		let cors_domains = self.cors_domains;
307		let request_middleware = self.request_middleware;
308		let allowed_hosts = self.allowed_hosts;
309		let jsonrpc_handler = Rpc {
310			handler: self.handler,
311			extractor: self.meta_extractor,
312		};
313		let keep_alive = self.keep_alive;
314		let reuse_port = self.threads > 1;
315
316		let (local_addr_tx, local_addr_rx) = mpsc::channel();
317		let (close, shutdown_signal) = oneshot::channel();
318		let eloop = self.remote.init_with_name("http.worker0")?;
319		serve(
320			(shutdown_signal, local_addr_tx),
321			eloop.remote(),
322			addr.to_owned(),
323			cors_domains.clone(),
324			request_middleware.clone(),
325			allowed_hosts.clone(),
326			jsonrpc_handler.clone(),
327			keep_alive,
328			reuse_port,
329		);
330		let handles = (0..self.threads - 1).map(|i| {
331			let (local_addr_tx, local_addr_rx) = mpsc::channel();
332			let (close, shutdown_signal) = oneshot::channel();
333			let eloop = UninitializedRemote::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
334			serve(
335				(shutdown_signal, local_addr_tx),
336				eloop.remote(),
337				addr.to_owned(),
338				cors_domains.clone(),
339				request_middleware.clone(),
340				allowed_hosts.clone(),
341				jsonrpc_handler.clone(),
342				keep_alive,
343				reuse_port,
344			);
345			Ok((eloop, close, local_addr_rx))
346		}).collect::<io::Result<Vec<_>>>()?;
347
348		// Wait for server initialization
349		let local_addr = recv_address(local_addr_rx);
350		// Wait for other threads as well.
351		let mut handles = handles.into_iter().map(|(eloop, close, local_addr_rx)| {
352			let _ = recv_address(local_addr_rx)?;
353			Ok((eloop, close))
354		}).collect::<io::Result<(Vec<_>)>>()?;
355		handles.push((eloop, close));
356		let (remotes, close) = handles.into_iter().unzip();
357
358		Ok(Server {
359			address: local_addr?,
360			remote: Some(remotes),
361			close: Some(close),
362		})
363	}
364}
365
366fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
367	local_addr_rx.recv().map_err(|_| {
368		io::Error::new(io::ErrorKind::Interrupted, "")
369	})?
370}
371
372fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
373	signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
374	remote: tokio_core::reactor::Remote,
375	addr: SocketAddr,
376	cors_domains: CorsDomains,
377	request_middleware: Arc<RequestMiddleware>,
378	allowed_hosts: AllowedHosts,
379	jsonrpc_handler: Rpc<M, S>,
380	keep_alive: bool,
381	reuse_port: bool,
382) {
383	let (shutdown_signal, local_addr_tx) = signals;
384	remote.spawn(move |handle| {
385		let handle1 = handle.clone();
386		let bind = move || {
387			let listener = match addr {
388				SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
389				SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
390			};
391			configure_port(reuse_port, &listener)?;
392			listener.reuse_address(true)?;
393			listener.bind(&addr)?;
394			let listener = listener.listen(1024)?;
395			let listener = tokio_core::net::TcpListener::from_listener(listener, &addr, &handle1)?;
396			// Add current host to allowed headers.
397			// NOTE: we need to use `l.local_addr()` instead of `addr`
398			// it might be different!
399			let local_addr = listener.local_addr()?;
400
401			Ok((listener, local_addr))
402		};
403
404		let bind_result = match bind() {
405			Ok((listener, local_addr)) => {
406				// Send local address
407				local_addr_tx.send(Ok(local_addr)).expect(SENDER_PROOF);
408
409				futures::future::ok((listener, local_addr))
410			},
411			Err(err) => {
412				// Send error
413				local_addr_tx.send(Err(err)).expect(SENDER_PROOF);
414
415				futures::future::err(())
416			}
417		};
418
419		let handle = handle.clone();
420		bind_result.and_then(move |(listener, local_addr)| {
421			let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
422
423			let http = {
424				let mut http = server::Http::new();
425				http.keep_alive(keep_alive);
426				http
427			};
428			listener.incoming()
429				.for_each(move |(socket, addr)| {
430					http.bind_connection(&handle, socket, addr, ServerHandler::new(
431						jsonrpc_handler.clone(),
432						cors_domains.clone(),
433						allowed_hosts.clone(),
434						request_middleware.clone(),
435					));
436					Ok(())
437				})
438				.map_err(|e| {
439					warn!("Incoming streams error, closing sever: {:?}", e);
440				})
441				.select(shutdown_signal.map_err(|e| {
442					warn!("Shutdown signaller dropped, closing server: {:?}", e);
443				}))
444				.map(|_| ())
445				.map_err(|_| ())
446		})
447	});
448}
449
450#[cfg(unix)]
451fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
452	use net2::unix::*;
453
454	if reuse {
455		try!(tcp.reuse_port(true));
456	}
457
458	Ok(())
459}
460
461#[cfg(not(unix))]
462fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
463    Ok(())
464}
465
466/// jsonrpc http server instance
467pub struct Server {
468	address: SocketAddr,
469	remote: Option<Vec<Remote>>,
470	close: Option<Vec<oneshot::Sender<()>>>,
471}
472
473const PROOF: &'static str = "Server is always Some until self is consumed.";
474impl Server {
475	/// Returns address of this server
476	pub fn address(&self) -> &SocketAddr {
477		&self.address
478	}
479
480	/// Closes the server.
481	pub fn close(mut self) {
482		for close in self.close.take().expect(PROOF) {
483			let _ = close.send(());
484		}
485
486		for remote in self.remote.take().expect(PROOF) {
487			remote.close();
488		}
489	}
490
491	/// Will block, waiting for the server to finish.
492	pub fn wait(mut self) {
493		for remote in self.remote.take().expect(PROOF) {
494			remote.wait();
495		}
496	}
497}
498
499impl Drop for Server {
500	fn drop(&mut self) {
501		self.remote.take().map(|remotes| {
502			for remote in remotes { remote.close(); }
503		});
504	}
505}
506