bitconch_jsonrpc_http_server/
handler.rs

1use Rpc;
2
3use std::{fmt, mem, str};
4use std::sync::Arc;
5
6use hyper::{self, service::Service, Body, Method};
7use hyper::header::{self, HeaderMap, HeaderValue};
8
9use jsonrpc::{self as core, FutureResult, Metadata, Middleware, NoopMiddleware, FutureRpcResult};
10use jsonrpc::futures::{Future, Poll, Async, Stream, future};
11use jsonrpc::serde_json;
12use response::Response;
13use server_utils::cors;
14
15use {utils, RequestMiddleware, RequestMiddlewareAction, CorsDomains, AllowedHosts, RestApi};
16
/// jsonrpc http request handler.
///
/// Holds the server-wide configuration. For every incoming request
/// `Service::call` consults the request middleware and, when the request is
/// to be handled as RPC, copies or clones the relevant settings into a fresh
/// `RpcHandler`.
pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
	/// RPC handler bundle; cloned into every `RpcHandler`.
	jsonrpc_handler: Rpc<M, S>,
	/// Host configuration passed to `utils::is_host_allowed` for each request.
	allowed_hosts: AllowedHosts,
	/// CORS origin configuration passed to `utils::cors_allow_origin`.
	cors_domains: CorsDomains,
	/// When set, emitted as `Access-Control-Max-Age` on CORS responses.
	cors_max_age: Option<u32>,
	/// CORS header configuration passed to `utils::cors_allow_headers`.
	cors_allowed_headers: cors::AccessControlAllowHeaders,
	/// Middleware that sees every request first and may answer it itself.
	middleware: Arc<RequestMiddleware>,
	/// REST-style API mode (compared against `RestApi::Disabled` / `RestApi::Unsecure`).
	rest_api: RestApi,
	/// Optional health endpoint: `.0` is the `GET` path to match,
	/// `.1` the RPC method that is called for it.
	health_api: Option<(String, String)>,
	/// Upper bound, in bytes, for the accumulated request body.
	max_request_body_size: usize,
}
29
30impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
31	/// Create new request handler.
32	pub fn new(
33		jsonrpc_handler: Rpc<M, S>,
34		cors_domains: CorsDomains,
35		cors_max_age: Option<u32>,
36		cors_allowed_headers: cors::AccessControlAllowHeaders,
37		allowed_hosts: AllowedHosts,
38		middleware: Arc<RequestMiddleware>,
39		rest_api: RestApi,
40		health_api: Option<(String, String)>,
41		max_request_body_size: usize,
42	) -> Self {
43		ServerHandler {
44			jsonrpc_handler,
45			allowed_hosts,
46			cors_domains,
47			cors_max_age,
48			cors_allowed_headers,
49			middleware,
50			rest_api,
51			health_api,
52			max_request_body_size,
53		}
54	}
55}
56
57impl<M: Metadata, S: Middleware<M>> Service for ServerHandler<M, S> {
58	type ReqBody = Body;
59	type ResBody = Body;
60	type Error = hyper::Error;
61	type Future = Handler<M, S>;
62
63	fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
64		let is_host_allowed = utils::is_host_allowed(&request, &self.allowed_hosts);
65		let action = self.middleware.on_request(request);
66
67		let (should_validate_hosts, should_continue_on_invalid_cors, response) = match action {
68			RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors, request }=> (
69				true, should_continue_on_invalid_cors, Err(request)
70			),
71			RequestMiddlewareAction::Respond { should_validate_hosts, response } => (
72				should_validate_hosts, false, Ok(response)
73			),
74		};
75
76		// Validate host
77		if should_validate_hosts && !is_host_allowed {
78			return Handler::Error(Some(Response::host_not_allowed()));
79		}
80
81		// Replace response with the one returned by middleware.
82		match response {
83			Ok(response) => Handler::Middleware(response),
84			Err(request) => {
85				Handler::Rpc(RpcHandler {
86					jsonrpc_handler: self.jsonrpc_handler.clone(),
87					state: RpcHandlerState::ReadingHeaders {
88						request,
89						cors_domains: self.cors_domains.clone(),
90						cors_headers: self.cors_allowed_headers.clone(),
91						continue_on_invalid_cors: should_continue_on_invalid_cors,
92					},
93					is_options: false,
94					cors_max_age: self.cors_max_age,
95					cors_allow_origin: cors::AllowCors::NotRequired,
96					cors_allow_headers: cors::AllowCors::NotRequired,
97					rest_api: self.rest_api,
98					health_api: self.health_api.clone(),
99					max_request_body_size: self.max_request_body_size,
100				})
101			}
102		}
103	}
104}
105
/// Future returned by `ServerHandler::call`; resolves to the HTTP response.
pub enum Handler<M: Metadata, S: Middleware<M>> {
	/// The request is being processed by the RPC state machine.
	Rpc(RpcHandler<M, S>),
	/// A response that is already available (used by `call` when the host
	/// check fails). The `Option` is emptied by the first `poll`.
	Error(Option<Response>),
	/// Response future supplied by the request middleware.
	Middleware(Box<Future<Item = hyper::Response<Body>, Error = hyper::Error> + Send>),
}
111
112impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S> {
113	type Item = hyper::Response<Body>;
114	type Error = hyper::Error;
115
116	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
117		match *self {
118			Handler::Rpc(ref mut handler) => handler.poll(),
119			Handler::Middleware(ref mut middleware) => middleware.poll(),
120			Handler::Error(ref mut response) => Ok(Async::Ready(
121				response.take().expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed").into()
122			)),
123		}
124	}
125}
126
/// Outcome of one step of the `RpcHandler` state machine: the next state
/// plus whether it can be processed right away.
enum RpcPollState<M, F> where
	F: Future<Item = Option<core::Response>, Error = ()>,
{
	/// The contained state can be processed immediately.
	Ready(RpcHandlerState<M, F>),
	/// The contained state has to wait; `RpcHandler::poll` stores it and
	/// returns `Async::NotReady`.
	NotReady(RpcHandlerState<M, F>),
}
133
134impl<M, F> RpcPollState<M, F> where
135	F: Future<Item = Option<core::Response>, Error = ()>,
136{
137	fn decompose(self) -> (RpcHandlerState<M, F>, bool) {
138		use self::RpcPollState::*;
139		match self {
140			Ready(handler) => (handler, true),
141			NotReady(handler) => (handler, false),
142		}
143	}
144}
145
/// Future stored in `RpcHandlerState::WaitingForResponse`.
///
/// It wraps either an already available result or a pending RPC call and
/// converts the optional JSON-RPC response into an HTTP `Response`. The
/// mapping is a plain `fn` pointer, so closures used to build this type
/// must not capture anything.
type FutureResponse<F> = future::Map<
	future::Either<future::FutureResult<Option<core::Response>, ()>, FutureRpcResult<F>>,
	fn(Option<core::Response>) -> Response,
>;
150
151
/// States of the per-request state machine driven by `RpcHandler::poll`.
enum RpcHandlerState<M, F> where
	F: Future<Item = Option<core::Response>, Error = ()>,
{
	/// Initial state: the request has not been inspected yet.
	ReadingHeaders {
		/// The incoming request.
		request: hyper::Request<Body>,
		/// CORS origin configuration used to evaluate the request.
		cors_domains: CorsDomains,
		/// CORS header configuration used to evaluate the request.
		cors_headers: cors::AccessControlAllowHeaders,
		/// When `true`, an invalid CORS result does not reject the request.
		continue_on_invalid_cors: bool,
	},
	/// The request body is being collected.
	ReadingBody {
		/// Remaining body stream.
		body: hyper::Body,
		/// Request URI, kept only when the REST API is not disabled so an
		/// empty body can fall back to `ProcessRest`.
		uri: Option<hyper::Uri>,
		/// Bytes of the body received so far.
		request: Vec<u8>,
		/// Metadata extracted from the request.
		metadata: M,
	},
	/// A REST-style call has to be built from the URI path.
	ProcessRest {
		/// URI whose path segments encode the method and its parameters.
		uri: hyper::Uri,
		/// Metadata extracted from the request.
		metadata: M,
	},
	/// The health endpoint was hit.
	ProcessHealth {
		/// RPC method to call for the health check.
		method: String,
		/// Metadata extracted from the request.
		metadata: M,
	},
	/// The response is known and only needs its headers set.
	Writing(Response),
	/// Waiting for an RPC call whose result maps directly to a `Response`
	/// (created by `process_health`).
	WaitingForResponse(FutureResponse<F>),
	/// Waiting for an RPC call that yields the serialized response, if any
	/// (created by `process_rest` and `process_body`).
	Waiting(FutureResult<F>),
	/// Placeholder left in `RpcHandler::state` while a step is executed.
	Done,
}
180
181impl<M, F> fmt::Debug for RpcHandlerState<M, F> where
182	F: Future<Item = Option<core::Response>, Error = ()>,
183{
184	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
185		use self::RpcHandlerState::*;
186
187		match *self {
188			ReadingHeaders {..} => write!(fmt, "ReadingHeaders"),
189			ReadingBody {..} => write!(fmt, "ReadingBody"),
190			ProcessRest {..} => write!(fmt, "ProcessRest"),
191			ProcessHealth {..} => write!(fmt, "ProcessHealth"),
192			Writing(ref res) => write!(fmt, "Writing({:?})", res),
193			WaitingForResponse(_) => write!(fmt, "WaitingForResponse"),
194			Waiting(_) => write!(fmt, "Waiting"),
195			Done => write!(fmt, "Done"),
196		}
197	}
198}
199
/// Future processing a single HTTP request as JSON-RPC.
///
/// Created by `ServerHandler::call`; `poll` advances `state` until a
/// response can be written.
pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
	/// RPC handler bundle (metadata extractor and request handler).
	jsonrpc_handler: Rpc<M, S>,
	/// Current state of the request; `Done` while a step is being executed.
	state: RpcHandlerState<M, S::Future>,
	/// Whether the request method is `OPTIONS`; set while reading headers.
	is_options: bool,
	/// CORS origin evaluation result for this request.
	cors_allow_origin: cors::AllowCors<header::HeaderValue>,
	/// CORS headers evaluation result for this request.
	cors_allow_headers: cors::AllowCors<Vec<header::HeaderValue>>,
	/// When set, emitted as `Access-Control-Max-Age` on CORS responses.
	cors_max_age: Option<u32>,
	/// REST-style API mode.
	rest_api: RestApi,
	/// Optional health endpoint: (`GET` path, RPC method to call).
	health_api: Option<(String, String)>,
	/// Upper bound, in bytes, for the accumulated request body.
	max_request_body_size: usize,
}
211
212impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {
213	type Item = hyper::Response<Body>;
214	type Error = hyper::Error;
215
216	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
217		let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
218			RpcHandlerState::ReadingHeaders { request, cors_domains, cors_headers, continue_on_invalid_cors, } => {
219				// Read cors header
220				self.cors_allow_origin = utils::cors_allow_origin(&request, &cors_domains);
221				self.cors_allow_headers = utils::cors_allow_headers(&request, &cors_headers);
222				self.is_options = *request.method() == Method::OPTIONS;
223				// Read other headers
224				RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
225			},
226			RpcHandlerState::ReadingBody { body, request, metadata, uri, } => {
227				match self.process_body(body, request, uri, metadata) {
228					Err(BodyError::Utf8(ref e)) => {
229						let mesg = format!("utf-8 encoding error at byte {} in request body", e.valid_up_to());
230						let resp = Response::bad_request(mesg);
231						RpcPollState::Ready(RpcHandlerState::Writing(resp))
232					}
233					Err(BodyError::TooLarge) => {
234						let resp = Response::too_large("request body size exceeds allowed maximum");
235						RpcPollState::Ready(RpcHandlerState::Writing(resp))
236					}
237					Err(BodyError::Hyper(e)) => return Err(e),
238					Ok(state) => state,
239				}
240			},
241			RpcHandlerState::ProcessRest { uri, metadata } => {
242				self.process_rest(uri, metadata)?
243			},
244			RpcHandlerState::ProcessHealth { method, metadata } => {
245				self.process_health(method, metadata)?
246			},
247			RpcHandlerState::WaitingForResponse(mut waiting) => {
248				match waiting.poll() {
249					Ok(Async::Ready(response)) => RpcPollState::Ready(RpcHandlerState::Writing(response.into())),
250					Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::WaitingForResponse(waiting)),
251					Err(e) => RpcPollState::Ready(RpcHandlerState::Writing(
252						Response::internal_error(format!("{:?}", e))
253					)),
254				}
255			},
256			RpcHandlerState::Waiting(mut waiting) => {
257				match waiting.poll() {
258					Ok(Async::Ready(response)) => {
259						RpcPollState::Ready(RpcHandlerState::Writing(match response {
260							// Notification, just return empty response.
261							None => Response::ok(String::new()),
262							// Add new line to have nice output when using CLI clients (curl)
263							Some(result) => Response::ok(format!("{}\n", result)),
264						}.into()))
265					},
266					Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)),
267					Err(e) => RpcPollState::Ready(RpcHandlerState::Writing(
268						Response::internal_error(format!("{:?}", e))
269					)),
270				}
271			},
272			state => RpcPollState::NotReady(state),
273		};
274
275		let (new_state, is_ready) = new_state.decompose();
276		match new_state {
277			RpcHandlerState::Writing(res) => {
278				let mut response: hyper::Response<Body> = res.into();
279				let cors_allow_origin = mem::replace(&mut self.cors_allow_origin, cors::AllowCors::Invalid);
280				let cors_allow_headers = mem::replace(&mut self.cors_allow_headers, cors::AllowCors::Invalid);
281
282				Self::set_response_headers(
283					response.headers_mut(),
284					self.is_options,
285					self.cors_max_age,
286					cors_allow_origin.into(),
287					cors_allow_headers.into(),
288				);
289				Ok(Async::Ready(response))
290			},
291			state => {
292				self.state = state;
293				if is_ready {
294					self.poll()
295				} else {
296					Ok(Async::NotReady)
297				}
298			},
299		}
300	}
301}
302
// Intermediate and internal error type to better distinguish
// error cases occurring during request body processing.
enum BodyError {
	/// Reading the body stream failed; propagated to the caller of `poll`.
	Hyper(hyper::Error),
	/// The collected body is not valid UTF-8; answered with "bad request".
	Utf8(str::Utf8Error),
	/// The body exceeds `max_request_body_size`; answered with "too large".
	TooLarge,
}
310
311impl From<hyper::Error> for BodyError {
312	fn from(e: hyper::Error) -> BodyError {
313		BodyError::Hyper(e)
314	}
315}
316
317impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
318	fn read_headers(
319		&self,
320		request: hyper::Request<Body>,
321		continue_on_invalid_cors: bool,
322	) -> RpcHandlerState<M, S::Future> {
323		if self.cors_allow_origin == cors::AllowCors::Invalid && !continue_on_invalid_cors {
324			return RpcHandlerState::Writing(Response::invalid_allow_origin());
325		}
326		if self.cors_allow_headers == cors::AllowCors::Invalid && !continue_on_invalid_cors {
327			return RpcHandlerState::Writing(Response::invalid_allow_headers());
328		}
329
330		// Read metadata
331		let metadata = self.jsonrpc_handler.extractor.read_metadata(&request);
332
333		// Proceed
334		match *request.method() {
335			// Validate the ContentType header
336			// to prevent Cross-Origin XHRs with text/plain
337			Method::POST if Self::is_json(request.headers().get("content-type")) => {
338				let uri = if self.rest_api != RestApi::Disabled { Some(request.uri().clone()) } else { None };
339				RpcHandlerState::ReadingBody {
340					metadata,
341					request: Default::default(),
342					uri,
343					body: request.into_body(),
344				}
345			},
346			Method::POST if self.rest_api == RestApi::Unsecure && request.uri().path().split('/').count() > 2 => {
347				RpcHandlerState::ProcessRest {
348					metadata,
349					uri: request.uri().clone(),
350				}
351			},
352			// Just return error for unsupported content type
353			Method::POST => {
354				RpcHandlerState::Writing(Response::unsupported_content_type())
355			},
356			// Don't validate content type on options
357			Method::OPTIONS => {
358				RpcHandlerState::Writing(Response::empty())
359			},
360			// Respond to health API request if there is one configured.
361			Method::GET if self.health_api.as_ref().map(|x| &*x.0) == Some(request.uri().path()) => {
362				RpcHandlerState::ProcessHealth {
363					metadata,
364					method: self.health_api.as_ref()
365							.map(|x| x.1.clone())
366							.expect("Health api is defined since the URI matched."),
367				}
368			},
369			// Disallow other methods.
370			_ => {
371				RpcHandlerState::Writing(Response::method_not_allowed())
372			},
373		}
374	}
375
376	fn process_health(
377		&self,
378		method: String,
379		metadata: M,
380	) -> Result<RpcPollState<M, S::Future>, hyper::Error> {
381		use self::core::types::{Call, MethodCall, Version, Params, Request, Id, Output, Success, Failure};
382
383		// Create a request
384		let call = Request::Single(Call::MethodCall(MethodCall {
385			jsonrpc: Some(Version::V2),
386			method,
387			params: Params::None,
388			id: Id::Num(1),
389		}));
390
391		return Ok(RpcPollState::Ready(RpcHandlerState::WaitingForResponse(
392			future::Either::B(self.jsonrpc_handler.handler.handle_rpc_request(call, metadata))
393				.map(|res| match res {
394					Some(core::Response::Single(Output::Success(Success { result, .. }))) => {
395						let result = serde_json::to_string(&result)
396							.expect("Serialization of result is infallible;qed");
397
398						Response::ok(result)
399					},
400					Some(core::Response::Single(Output::Failure(Failure { error, .. }))) => {
401						let result = serde_json::to_string(&error)
402							.expect("Serialization of error is infallible;qed");
403
404						Response::service_unavailable(result)
405					},
406					e => Response::internal_error(format!("Invalid response for health request: {:?}", e)),
407				})
408		)));
409	}
410
411	fn process_rest(
412		&self,
413		uri: hyper::Uri,
414		metadata: M,
415	) -> Result<RpcPollState<M, S::Future>, hyper::Error> {
416		use self::core::types::{Call, MethodCall, Version, Params, Request, Id, Value};
417
418		// skip the initial /
419		let mut it = uri.path().split('/').skip(1);
420
421		// parse method & params
422		let method = it.next().unwrap_or("");
423		let mut params = Vec::new();
424		for param in it {
425			let v = serde_json::from_str(param)
426				.or_else(|_| serde_json::from_str(&format!("\"{}\"", param)))
427				.unwrap_or(Value::Null);
428			params.push(v)
429		}
430
431		// Create a request
432		let call = Request::Single(Call::MethodCall(MethodCall {
433			jsonrpc: Some(Version::V2),
434			method: method.into(),
435			params: Params::Array(params),
436			id: Id::Num(1),
437		}));
438
439		return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
440			future::Either::B(self.jsonrpc_handler.handler.handle_rpc_request(call, metadata))
441				.map(|res| res.map(|x| serde_json::to_string(&x)
442					.expect("Serialization of response is infallible;qed")
443				))
444		)));
445	}
446
447	fn process_body(
448		&self,
449		mut body: hyper::Body,
450		mut request: Vec<u8>,
451		uri: Option<hyper::Uri>,
452		metadata: M,
453	) -> Result<RpcPollState<M, S::Future>, BodyError> {
454		loop {
455			match body.poll()? {
456				Async::Ready(Some(chunk)) => {
457					if request.len().checked_add(chunk.len()).map(|n| n > self.max_request_body_size).unwrap_or(true) {
458						return Err(BodyError::TooLarge)
459					}
460					request.extend_from_slice(&*chunk)
461				},
462				Async::Ready(None) => {
463					if let (Some(uri), true) = (uri, request.is_empty()) {
464						return Ok(RpcPollState::Ready(RpcHandlerState::ProcessRest {
465							uri,
466							metadata,
467						}));
468					}
469
470					let content = match str::from_utf8(&request) {
471						Ok(content) => content,
472						Err(err) => {
473							// Return utf error.
474							return Err(BodyError::Utf8(err));
475						},
476					};
477
478					// Content is ready
479					return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
480						self.jsonrpc_handler.handler.handle_request(content, metadata)
481					)));
482				},
483				Async::NotReady => {
484					return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody {
485						body,
486						request,
487						metadata,
488						uri,
489					}));
490				},
491			}
492		}
493	}
494
495	fn set_response_headers(
496		headers: &mut HeaderMap,
497		is_options: bool,
498		cors_max_age: Option<u32>,
499		cors_allow_origin: Option<HeaderValue>,
500		cors_allow_headers: Option<Vec<HeaderValue>>,
501	) {
502		let as_header = |m: Method| m.as_str().parse().expect("`Method` will always parse; qed");
503		let concat = |headers: &[HeaderValue]| {
504			let separator = b", ";
505			let val = headers
506				.iter()
507				.flat_map(|h| h.as_bytes().iter().chain(separator.iter()))
508				.cloned()
509				.collect::<Vec<_>>();
510			let max_len = if val.is_empty() { 0 } else { val.len() - 2 };
511			HeaderValue::from_bytes(&val[..max_len]).expect("Concatenation of valid headers with `, ` is still valid; qed")
512		};
513
514		let allowed = concat(&[as_header(Method::OPTIONS), as_header(Method::POST)]);
515
516		if is_options {
517			headers.append(header::ALLOW, allowed.clone());
518			headers.append(header::ACCEPT, HeaderValue::from_static("application/json"));
519		}
520
521		if let Some(cors_allow_origin) = cors_allow_origin {
522			headers.append(header::VARY, HeaderValue::from_static("origin"));
523			headers.append(header::ACCESS_CONTROL_ALLOW_METHODS, allowed);
524			headers.append(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors_allow_origin);
525
526			if let Some(cma) = cors_max_age {
527				headers.append(
528					header::ACCESS_CONTROL_MAX_AGE,
529					HeaderValue::from_str(&cma.to_string()).expect("`u32` will always parse; qed")
530				);
531			}
532
533			if let Some(cors_allow_headers) = cors_allow_headers {
534				if !cors_allow_headers.is_empty() {
535					headers.append(header::ACCESS_CONTROL_ALLOW_HEADERS, concat(&cors_allow_headers));
536				}
537			}
538		}
539	}
540
541	/// Returns true if the `content_type` header indicates a valid JSON
542	/// message.
543	fn is_json(content_type: Option<&header::HeaderValue>) -> bool {
544		match content_type.and_then(|val| val.to_str().ok()) {
545			Some("application/json") => true,
546			Some("application/json; charset=utf-8") => true,
547			_ => false,
548		}
549	}
550}