rs_jsonrpc_http_server/
lib.rs

1//! jsonrpc http server.
2//!
3//! ```no_run
4//! extern crate rs_jsonrpc_core;
5//! extern crate rs_jsonrpc_http_server;
6//!
7//! use rs_jsonrpc_core::*;
8//! use rs_jsonrpc_http_server::*;
9//!
10//! fn main() {
11//! 	let mut io = IoHandler::new();
12//! 	io.add_method("say_hello", |_: Params| {
13//! 		Ok(Value::String("hello".to_string()))
14//! 	});
15//!
16//! 	let _server = ServerBuilder::new(io).start_http(&"127.0.0.1:3030".parse().unwrap());
17//! }
18//! ```
19
20#![warn(missing_docs)]
21
22extern crate unicase;
23extern crate rs_jsonrpc_core as jsonrpc;
24extern crate rs_jsonrpc_server_utils as server_utils;
25extern crate net2;
26
27pub extern crate hyper;
28
29#[macro_use]
30extern crate log;
31
32mod handler;
33mod response;
34mod utils;
35#[cfg(test)]
36mod tests;
37
38use std::io;
39use std::sync::{mpsc, Arc};
40use std::net::SocketAddr;
41
42use hyper::server;
43use jsonrpc::MetaIoHandler;
44use jsonrpc::futures::{self, Future, Stream};
45use jsonrpc::futures::sync::oneshot;
46use server_utils::reactor::{Remote, UninitializedRemote};
47
48pub use server_utils::hosts::{Host, DomainsValidation};
49pub use server_utils::cors::{AccessControlAllowOrigin, Origin};
50pub use server_utils::tokio_core;
51pub use handler::ServerHandler;
52pub use utils::{is_host_allowed, cors_header, CorsHeader};
53pub use response::Response;
54
/// Action undertaken by a middleware.
///
/// This is the value returned from `RequestMiddleware::on_request`: it either
/// hands the request on to the standard RPC handling or supplies a response
/// of its own.
pub enum RequestMiddlewareAction {
	/// Proceed with standard RPC handling
	Proceed {
		/// Should the request be processed even if invalid CORS headers are detected?
		/// This allows for side effects to take place.
		should_continue_on_invalid_cors: bool,
		/// The request object returned
		request: server::Request,
	},
	/// Intercept the request and respond differently.
	Respond {
		/// Should standard hosts validation be performed?
		should_validate_hosts: bool,
		/// a future for server response
		response: Box<Future<Item=server::Response, Error=hyper::Error> + Send>,
	}
}
73
74impl From<Response> for RequestMiddlewareAction {
75	fn from(o: Response) -> Self {
76		RequestMiddlewareAction::Respond {
77			should_validate_hosts: true,
78			response: Box::new(futures::future::ok(o.into())),
79		}
80	}
81}
82
83impl From<server::Response> for RequestMiddlewareAction {
84	fn from(response: server::Response) -> Self {
85		RequestMiddlewareAction::Respond {
86			should_validate_hosts: true,
87			response: Box::new(futures::future::ok(response)),
88		}
89	}
90}
91
92impl From<server::Request> for RequestMiddlewareAction {
93	fn from(request: server::Request) -> Self {
94		RequestMiddlewareAction::Proceed {
95			should_continue_on_invalid_cors: false,
96			request,
97		}
98	}
99}
100
/// Allows to intercept request and handle it differently.
///
/// Implementors must be `Send + Sync + 'static`; the builder stores the
/// middleware behind an `Arc` and hands a clone to every connection handler.
pub trait RequestMiddleware: Send + Sync + 'static {
	/// Takes a request and decides how to proceed with it.
	fn on_request(&self, request: server::Request) -> RequestMiddlewareAction;
}
106
107impl<F> RequestMiddleware for F where
108	F: Fn(server::Request) -> RequestMiddlewareAction + Sync + Send + 'static,
109{
110	fn on_request(&self, request: server::Request) -> RequestMiddlewareAction {
111		(*self)(request)
112	}
113}
114
115#[derive(Default)]
116struct NoopRequestMiddleware;
117impl RequestMiddleware for NoopRequestMiddleware {
118	fn on_request(&self, request: server::Request) -> RequestMiddlewareAction {
119		RequestMiddlewareAction::Proceed {
120			should_continue_on_invalid_cors: false,
121			request,
122		}
123	}
124}
125
126/// Extracts metadata from the HTTP request.
127pub trait MetaExtractor<M: jsonrpc::Metadata>: Sync + Send + 'static {
128	/// Read the metadata from the request
129	fn read_metadata(&self, _: &server::Request) -> M {
130		Default::default()
131	}
132}
133
134impl<M, F> MetaExtractor<M> for F where
135	M: jsonrpc::Metadata,
136	F: Fn(&server::Request) -> M + Sync + Send + 'static,
137{
138	fn read_metadata(&self, req: &server::Request) -> M {
139		(*self)(req)
140	}
141}
142
/// Extractor installed by default. It overrides nothing, so the provided
/// `MetaExtractor::read_metadata` (which returns the default metadata) is used.
#[derive(Default)]
struct NoopExtractor;
impl<M: jsonrpc::Metadata> MetaExtractor<M> for NoopExtractor {}
146
/// RPC Handler bundled with metadata extractor.
///
/// Both parts are reference-counted, so cloning an `Rpc` only bumps the
/// counters and every clone shares the same handler and extractor.
pub struct Rpc<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
	/// RPC Handler
	pub handler: Arc<MetaIoHandler<M, S>>,
	/// Metadata extractor
	pub extractor: Arc<MetaExtractor<M>>,
}
154
155impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> Clone for Rpc<M, S> {
156	fn clone(&self) -> Self {
157		Rpc {
158			handler: self.handler.clone(),
159			extractor: self.extractor.clone(),
160		}
161	}
162}
163
/// Optional list of accepted `Host` header values, as produced from
/// `DomainsValidation<Host>` by the builder.
/// NOTE(review): `None` presumably means that no `Host` restriction is applied — confirm in `server_utils::hosts`.
type AllowedHosts = Option<Vec<Host>>;
/// Optional list of CORS origins, as produced from
/// `DomainsValidation<AccessControlAllowOrigin>` by the builder.
/// NOTE(review): `None` presumably means that CORS handling is disabled — confirm in `server_utils::cors`.
type CorsDomains = Option<Vec<AccessControlAllowOrigin>>;
166
/// REST -> RPC converter state.
///
/// Selected with `ServerBuilder::rest_api`; a freshly created builder uses
/// `RestApi::Disabled`.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum RestApi {
	/// The REST -> RPC converter is enabled
	/// and requires `Content-Type: application/json` header
	/// (even though the body should be empty).
	/// This protects from submitting an RPC call
	/// from unwanted origins.
	Secure,
	/// The REST -> RPC converter is enabled
	/// and does not require any `Content-Type` headers.
	/// NOTE: This allows sending RPCs via HTTP forms
	/// from any website.
	Unsecure,
	/// The REST -> RPC converter is disabled.
	Disabled,
}
184
/// Convenient JSON-RPC HTTP Server builder.
///
/// Created with `ServerBuilder::new`, adjusted through the chained setter
/// methods and consumed by `start_http`.
pub struct ServerBuilder<M: jsonrpc::Metadata = (), S: jsonrpc::Middleware<M> = jsonrpc::NoopMiddleware> {
	/// RPC handler shared by all workers.
	handler: Arc<MetaIoHandler<M, S>>,
	/// Event loop for the first worker: either not spawned yet or a shared remote.
	remote: UninitializedRemote,
	/// Produces per-request metadata.
	meta_extractor: Arc<MetaExtractor<M>>,
	/// Sees every request before the RPC handler does.
	request_middleware: Arc<RequestMiddleware>,
	/// Configured CORS origins, if any.
	cors_domains: CorsDomains,
	/// Configured `Host` header values, if any.
	allowed_hosts: AllowedHosts,
	/// State of the REST -> RPC converter.
	rest_api: RestApi,
	/// Whether HTTP keep-alive is enabled.
	keep_alive: bool,
	/// Number of workers (event loops) to start.
	threads: usize,
}
197
/// States the invariant relied upon when a worker reports its bind result:
/// the code starting the server is expected to still be waiting for the local address.
const SENDER_PROOF: &'static str = "Server initialization awaits local address.";
199
200impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
201	/// Creates new `ServerBuilder` for given `IoHandler`.
202	///
203	/// If you want to re-use the same handler in couple places
204	/// see `with_remote` function.
205	///
206	/// By default:
207	/// 1. Server is not sending any CORS headers.
208	/// 2. Server is validating `Host` header.
209	pub fn new<T>(handler: T) -> Self where
210		T: Into<MetaIoHandler<M, S>>
211	{
212		ServerBuilder {
213			handler: Arc::new(handler.into()),
214			remote: UninitializedRemote::Unspawned,
215			meta_extractor: Arc::new(NoopExtractor::default()),
216			request_middleware: Arc::new(NoopRequestMiddleware::default()),
217			cors_domains: None,
218			allowed_hosts: None,
219			rest_api: RestApi::Disabled,
220			keep_alive: true,
221			threads: 1,
222		}
223	}
224
225	/// Utilize existing event loop remote to poll RPC results.
226	/// Applies only to 1 of the threads. Other threads will spawn their own Event Loops.
227	pub fn event_loop_remote(mut self, remote: tokio_core::reactor::Remote) -> Self {
228		self.remote = UninitializedRemote::Shared(remote);
229		self
230	}
231
232	/// Enable the REST -> RPC converter. Allows you to invoke RPCs
233	/// by sending `POST /<method>/<param1>/<param2>` requests
234	/// (with no body). Disabled by default.
235	pub fn rest_api(mut self, rest_api: RestApi) -> Self {
236		self.rest_api = rest_api;
237		self
238	}
239
240	/// Sets Enables or disables HTTP keep-alive.
241	/// Default is true.
242	pub fn keep_alive(mut self, val: bool) -> Self {
243		self.keep_alive  = val;
244		self
245	}
246
247	/// Sets number of threads of the server to run.
248	/// Panics when set to `0`.
249	#[cfg(not(unix))]
250	pub fn threads(mut self, _threads: usize) -> Self {
251		warn!("Multi-threaded server is not available on Windows. Falling back to single thread.");
252		self
253	}
254
255	/// Sets number of threads of the server to run.
256	/// Panics when set to `0`.
257	#[cfg(unix)]
258	pub fn threads(mut self, threads: usize) -> Self {
259		self.threads = threads;
260		self
261	}
262
263	/// Configures a list of allowed CORS origins.
264	pub fn cors(mut self, cors_domains: DomainsValidation<AccessControlAllowOrigin>) -> Self {
265		self.cors_domains = cors_domains.into();
266		self
267	}
268
269	/// Configures request middleware
270	pub fn request_middleware<T: RequestMiddleware>(mut self, middleware: T) -> Self {
271		self.request_middleware = Arc::new(middleware);
272		self
273	}
274
275	/// Configures metadata extractor
276	pub fn meta_extractor<T: MetaExtractor<M>>(mut self, extractor: T) -> Self {
277		self.meta_extractor = Arc::new(extractor);
278		self
279	}
280
281	/// Allow connections only with `Host` header set to binding address.
282	pub fn allow_only_bind_host(mut self) -> Self {
283		self.allowed_hosts = Some(Vec::new());
284		self
285	}
286
287	/// Specify a list of valid `Host` headers. Binding address is allowed automatically.
288	pub fn allowed_hosts(mut self, allowed_hosts: DomainsValidation<Host>) -> Self {
289		self.allowed_hosts = allowed_hosts.into();
290		self
291	}
292
293	/// Start this JSON-RPC HTTP server trying to bind to specified `SocketAddr`.
294	pub fn start_http(self, addr: &SocketAddr) -> io::Result<Server> {
295		let cors_domains = self.cors_domains;
296		let request_middleware = self.request_middleware;
297		let allowed_hosts = self.allowed_hosts;
298		let rs_jsonrpc_handler = Rpc {
299			handler: self.handler,
300			extractor: self.meta_extractor,
301		};
302		let rest_api = self.rest_api;
303		let keep_alive = self.keep_alive;
304		let reuse_port = self.threads > 1;
305
306		let (local_addr_tx, local_addr_rx) = mpsc::channel();
307		let (close, shutdown_signal) = oneshot::channel();
308		let eloop = self.remote.init_with_name("http.worker0")?;
309		serve(
310			(shutdown_signal, local_addr_tx),
311			eloop.remote(),
312			addr.to_owned(),
313			cors_domains.clone(),
314			request_middleware.clone(),
315			allowed_hosts.clone(),
316			rs_jsonrpc_handler.clone(),
317			rest_api,
318			keep_alive,
319			reuse_port,
320		);
321		let handles = (0..self.threads - 1).map(|i| {
322			let (local_addr_tx, local_addr_rx) = mpsc::channel();
323			let (close, shutdown_signal) = oneshot::channel();
324			let eloop = UninitializedRemote::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
325			serve(
326				(shutdown_signal, local_addr_tx),
327				eloop.remote(),
328				addr.to_owned(),
329				cors_domains.clone(),
330				request_middleware.clone(),
331				allowed_hosts.clone(),
332				rs_jsonrpc_handler.clone(),
333				rest_api,
334				keep_alive,
335				reuse_port,
336			);
337			Ok((eloop, close, local_addr_rx))
338		}).collect::<io::Result<Vec<_>>>()?;
339
340		// Wait for server initialization
341		let local_addr = recv_address(local_addr_rx);
342		// Wait for other threads as well.
343		let mut handles = handles.into_iter().map(|(eloop, close, local_addr_rx)| {
344			let _ = recv_address(local_addr_rx)?;
345			Ok((eloop, close))
346		}).collect::<io::Result<(Vec<_>)>>()?;
347		handles.push((eloop, close));
348		let (remotes, close) = handles.into_iter().unzip();
349
350		Ok(Server {
351			address: local_addr?,
352			remote: Some(remotes),
353			close: Some(close),
354		})
355	}
356}
357
/// Blocks until a worker reports the outcome of binding its listener.
///
/// Returns the reported value unchanged: the bound address on success or the
/// worker's bind error. When the sending side is dropped without reporting
/// anything, an `io::ErrorKind::Interrupted` error is returned.
fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Result<SocketAddr> {
	match local_addr_rx.recv() {
		Ok(bind_result) => bind_result,
		// `recv` fails only when the sender is gone and nothing was sent.
		Err(_) => Err(io::Error::new(
			io::ErrorKind::Interrupted,
			"Server worker went away before reporting its local address.",
		)),
	}
}
363
364fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
365	signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
366	remote: tokio_core::reactor::Remote,
367	addr: SocketAddr,
368	cors_domains: CorsDomains,
369	request_middleware: Arc<RequestMiddleware>,
370	allowed_hosts: AllowedHosts,
371	rs_jsonrpc_handler: Rpc<M, S>,
372	rest_api: RestApi,
373	keep_alive: bool,
374	reuse_port: bool,
375) {
376	let (shutdown_signal, local_addr_tx) = signals;
377	remote.spawn(move |handle| {
378		let handle1 = handle.clone();
379		let bind = move || {
380			let listener = match addr {
381				SocketAddr::V4(_) => net2::TcpBuilder::new_v4()?,
382				SocketAddr::V6(_) => net2::TcpBuilder::new_v6()?,
383			};
384			configure_port(reuse_port, &listener)?;
385			listener.reuse_address(true)?;
386			listener.bind(&addr)?;
387			let listener = listener.listen(1024)?;
388			let listener = tokio_core::net::TcpListener::from_listener(listener, &addr, &handle1)?;
389			// Add current host to allowed headers.
390			// NOTE: we need to use `l.local_addr()` instead of `addr`
391			// it might be different!
392			let local_addr = listener.local_addr()?;
393
394			Ok((listener, local_addr))
395		};
396
397		let bind_result = match bind() {
398			Ok((listener, local_addr)) => {
399				// Send local address
400				local_addr_tx.send(Ok(local_addr)).expect(SENDER_PROOF);
401
402				futures::future::ok((listener, local_addr))
403			},
404			Err(err) => {
405				// Send error
406				local_addr_tx.send(Err(err)).expect(SENDER_PROOF);
407
408				futures::future::err(())
409			}
410		};
411
412		let handle = handle.clone();
413		bind_result.and_then(move |(listener, local_addr)| {
414			let allowed_hosts = server_utils::hosts::update(allowed_hosts, &local_addr);
415
416			let http = {
417				let mut http = server::Http::new();
418				http.keep_alive(keep_alive);
419				http
420			};
421			listener.incoming()
422				.for_each(move |(socket, addr)| {
423					http.bind_connection(&handle, socket, addr, ServerHandler::new(
424						rs_jsonrpc_handler.clone(),
425						cors_domains.clone(),
426						allowed_hosts.clone(),
427						request_middleware.clone(),
428						rest_api,
429					));
430					Ok(())
431				})
432				.map_err(|e| {
433					warn!("Incoming streams error, closing sever: {:?}", e);
434				})
435				.select(shutdown_signal.map_err(|e| {
436					debug!("Shutdown signaller dropped, closing server: {:?}", e);
437				}))
438				.map(|_| ())
439				.map_err(|_| ())
440		})
441	});
442}
443
444#[cfg(unix)]
445fn configure_port(reuse: bool, tcp: &net2::TcpBuilder) -> io::Result<()> {
446	use net2::unix::*;
447
448	if reuse {
449		try!(tcp.reuse_port(true));
450	}
451
452	Ok(())
453}
454
/// Counterpart of the unix `configure_port` for other platforms:
/// both arguments are ignored and the call always succeeds.
#[cfg(not(unix))]
fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
    Ok(())
}
459
/// jsonrpc http server instance
///
/// Returned by `ServerBuilder::start_http`. Dropping the instance closes
/// the event loops it still holds.
pub struct Server {
	/// Local address reported by the first worker.
	address: SocketAddr,
	/// Event loops of all workers; `None` once taken by `close`, `wait` or `drop`.
	remote: Option<Vec<Remote>>,
	/// Shutdown triggers of all workers; `None` once taken by `close`.
	close: Option<Vec<oneshot::Sender<()>>>,
}
466
/// `expect` message for the `Option` fields of `Server`, which are only
/// emptied by methods that consume the server (or by `drop`).
const PROOF: &'static str = "Server is always Some until self is consumed.";
468impl Server {
469	/// Returns address of this server
470	pub fn address(&self) -> &SocketAddr {
471		&self.address
472	}
473
474	/// Closes the server.
475	pub fn close(mut self) {
476		for close in self.close.take().expect(PROOF) {
477			let _ = close.send(());
478		}
479
480		for remote in self.remote.take().expect(PROOF) {
481			remote.close();
482		}
483	}
484
485	/// Will block, waiting for the server to finish.
486	pub fn wait(mut self) {
487		for remote in self.remote.take().expect(PROOF) {
488			remote.wait();
489		}
490	}
491}
492
493impl Drop for Server {
494	fn drop(&mut self) {
495		self.remote.take().map(|remotes| {
496			for remote in remotes { remote.close(); }
497		});
498	}
499}
500