susydev_jsonrpc_http_server/
handler.rs

1use crate::Rpc;
2
3use std::sync::Arc;
4use std::{fmt, mem, str};
5
6use hyper::header::{self, HeaderMap, HeaderValue};
7use hyper::{self, service::Service, Body, Method};
8
9use crate::jsonrpc::futures::{future, Async, Future, Poll, Stream};
10use crate::jsonrpc::serde_json;
11use crate::jsonrpc::{self as core, middleware, FutureResult, Metadata, Middleware};
12use crate::response::Response;
13use crate::server_utils::cors;
14
15use crate::{utils, AllowedHosts, CorsDomains, RequestMiddleware, RequestMiddlewareAction, RestApi};
16
17/// jsonrpc http request handler.
18pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
19	susydev_jsonrpc_handler: Rpc<M, S>,
20	allowed_hosts: AllowedHosts,
21	cors_domains: CorsDomains,
22	cors_max_age: Option<u32>,
23	cors_allowed_headers: cors::AccessControlAllowHeaders,
24	middleware: Arc<RequestMiddleware>,
25	rest_api: RestApi,
26	health_api: Option<(String, String)>,
27	max_request_body_size: usize,
28	keep_alive: bool,
29}
30
31impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
32	/// Create new request handler.
33	pub fn new(
34		susydev_jsonrpc_handler: Rpc<M, S>,
35		cors_domains: CorsDomains,
36		cors_max_age: Option<u32>,
37		cors_allowed_headers: cors::AccessControlAllowHeaders,
38		allowed_hosts: AllowedHosts,
39		middleware: Arc<RequestMiddleware>,
40		rest_api: RestApi,
41		health_api: Option<(String, String)>,
42		max_request_body_size: usize,
43		keep_alive: bool,
44	) -> Self {
45		ServerHandler {
46			susydev_jsonrpc_handler,
47			allowed_hosts,
48			cors_domains,
49			cors_max_age,
50			cors_allowed_headers,
51			middleware,
52			rest_api,
53			health_api,
54			max_request_body_size,
55			keep_alive,
56		}
57	}
58}
59
60impl<M: Metadata, S: Middleware<M>> Service for ServerHandler<M, S> {
61	type ReqBody = Body;
62	type ResBody = Body;
63	type Error = hyper::Error;
64	type Future = Handler<M, S>;
65
66	fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
67		let is_host_allowed = utils::is_host_allowed(&request, &self.allowed_hosts);
68		let action = self.middleware.on_request(request);
69
70		let (should_validate_hosts, should_continue_on_invalid_cors, response) = match action {
71			RequestMiddlewareAction::Proceed {
72				should_continue_on_invalid_cors,
73				request,
74			} => (true, should_continue_on_invalid_cors, Err(request)),
75			RequestMiddlewareAction::Respond {
76				should_validate_hosts,
77				response,
78			} => (should_validate_hosts, false, Ok(response)),
79		};
80
81		// Validate host
82		if should_validate_hosts && !is_host_allowed {
83			return Handler::Err(Some(Response::host_not_allowed()));
84		}
85
86		// Replace response with the one returned by middleware.
87		match response {
88			Ok(response) => Handler::Middleware(response),
89			Err(request) => {
90				Handler::Rpc(RpcHandler {
91					susydev_jsonrpc_handler: self.susydev_jsonrpc_handler.clone(),
92					state: RpcHandlerState::ReadingHeaders {
93						request,
94						cors_domains: self.cors_domains.clone(),
95						cors_headers: self.cors_allowed_headers.clone(),
96						continue_on_invalid_cors: should_continue_on_invalid_cors,
97						keep_alive: self.keep_alive,
98					},
99					is_options: false,
100					cors_max_age: self.cors_max_age,
101					cors_allow_origin: cors::AllowCors::NotRequired,
102					cors_allow_headers: cors::AllowCors::NotRequired,
103					rest_api: self.rest_api,
104					health_api: self.health_api.clone(),
105					max_request_body_size: self.max_request_body_size,
106					// initial value, overwritten when reading client headers
107					keep_alive: true,
108				})
109			}
110		}
111	}
112}
113
114pub enum Handler<M: Metadata, S: Middleware<M>> {
115	Rpc(RpcHandler<M, S>),
116	Err(Option<Response>),
117	Middleware(Box<Future<Item = hyper::Response<Body>, Error = hyper::Error> + Send>),
118}
119
120impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S> {
121	type Item = hyper::Response<Body>;
122	type Error = hyper::Error;
123
124	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
125		match *self {
126			Handler::Rpc(ref mut handler) => handler.poll(),
127			Handler::Middleware(ref mut middleware) => middleware.poll(),
128			Handler::Err(ref mut response) => Ok(Async::Ready(
129				response
130					.take()
131					.expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed")
132					.into(),
133			)),
134		}
135	}
136}
137
/// Result of a single step of the `RpcHandler` state machine: the state to
/// move to, tagged with whether it can be processed right away.
enum RpcPollState<M, F, G>
where
	F: Future<Item = Option<core::Response>, Error = ()>,
	G: Future<Item = Option<core::Output>, Error = ()>,
{
	/// The contained state can be polled again immediately.
	Ready(RpcHandlerState<M, F, G>),
	/// The contained state is waiting; the handler reports `NotReady`.
	NotReady(RpcHandlerState<M, F, G>),
}
146
147impl<M, F, G> RpcPollState<M, F, G>
148where
149	F: Future<Item = Option<core::Response>, Error = ()>,
150	G: Future<Item = Option<core::Output>, Error = ()>,
151{
152	fn decompose(self) -> (RpcHandlerState<M, F, G>, bool) {
153		use self::RpcPollState::*;
154		match self {
155			Ready(handler) => (handler, true),
156			NotReady(handler) => (handler, false),
157		}
158	}
159}
160
/// Future held by `RpcHandlerState::WaitingForResponse`: an RPC call (or an
/// already-resolved result) whose optional RPC response is converted into an
/// HTTP `Response` by a plain function pointer.
type FutureResponse<F, G> = future::Map<
	future::Either<future::FutureResult<Option<core::Response>, ()>, core::FutureRpcResult<F, G>>,
	fn(Option<core::Response>) -> Response,
>;
165
/// States of the per-request `RpcHandler` state machine.
enum RpcHandlerState<M, F, G>
where
	F: Future<Item = Option<core::Response>, Error = ()>,
	G: Future<Item = Option<core::Output>, Error = ()>,
{
	/// Initial state: the request headers have not been inspected yet.
	ReadingHeaders {
		request: hyper::Request<Body>,
		cors_domains: CorsDomains,
		cors_headers: cors::AccessControlAllowHeaders,
		continue_on_invalid_cors: bool,
		keep_alive: bool,
	},
	/// The request body is being accumulated into `request`.
	ReadingBody {
		body: hyper::Body,
		// Only set when the REST API is not disabled; an empty body is then
		// dispatched as a REST call for this URI.
		uri: Option<hyper::Uri>,
		request: Vec<u8>,
		metadata: M,
	},
	/// A REST-style call is to be built from the path of `uri`.
	ProcessRest {
		uri: hyper::Uri,
		metadata: M,
	},
	/// The health endpoint was hit; `method` is the RPC method to call.
	ProcessHealth {
		method: String,
		metadata: M,
	},
	/// A response is ready to be sent back.
	Writing(Response),
	/// Waiting for an RPC call that resolves to an optional serialized response.
	Waiting(FutureResult<F, G>),
	/// Waiting for a future that resolves directly to an HTTP `Response`.
	WaitingForResponse(FutureResponse<F, G>),
	/// Placeholder left in the handler while a state is being processed.
	Done,
}
197
198impl<M, F, G> fmt::Debug for RpcHandlerState<M, F, G>
199where
200	F: Future<Item = Option<core::Response>, Error = ()>,
201	G: Future<Item = Option<core::Output>, Error = ()>,
202{
203	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
204		use self::RpcHandlerState::*;
205
206		match *self {
207			ReadingHeaders { .. } => write!(fmt, "ReadingHeaders"),
208			ReadingBody { .. } => write!(fmt, "ReadingBody"),
209			ProcessRest { .. } => write!(fmt, "ProcessRest"),
210			ProcessHealth { .. } => write!(fmt, "ProcessHealth"),
211			Writing(ref res) => write!(fmt, "Writing({:?})", res),
212			WaitingForResponse(_) => write!(fmt, "WaitingForResponse"),
213			Waiting(_) => write!(fmt, "Waiting"),
214			Done => write!(fmt, "Done"),
215		}
216	}
217}
218
/// Future that processes one HTTP request as a JSON-RPC call.
pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
	// RPC handler used to read metadata and to execute requests.
	susydev_jsonrpc_handler: Rpc<M, S>,
	// Current position in the request-processing state machine.
	state: RpcHandlerState<M, S::Future, S::CallFuture>,
	// Set while reading headers: whether the request method is OPTIONS.
	is_options: bool,
	// CORS origin decision computed from the request headers.
	cors_allow_origin: cors::AllowCors<header::HeaderValue>,
	// CORS allowed-headers decision computed from the request headers.
	cors_allow_headers: cors::AllowCors<Vec<header::HeaderValue>>,
	// Value emitted as `Access-Control-Max-Age`, if any.
	cors_max_age: Option<u32>,
	// REST API mode.
	rest_api: RestApi,
	// Optional health endpoint as (uri path, rpc method).
	health_api: Option<(String, String)>,
	// Upper bound, in bytes, for the accumulated request body.
	max_request_body_size: usize,
	// When false, `Connection: close` is added to the response.
	keep_alive: bool,
}
231
232impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {
233	type Item = hyper::Response<Body>;
234	type Error = hyper::Error;
235
236	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
237		let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
238			RpcHandlerState::ReadingHeaders {
239				request,
240				cors_domains,
241				cors_headers,
242				continue_on_invalid_cors,
243				keep_alive,
244			} => {
245				// Read cors header
246				self.cors_allow_origin = utils::cors_allow_origin(&request, &cors_domains);
247				self.cors_allow_headers = utils::cors_allow_headers(&request, &cors_headers);
248				self.keep_alive = utils::keep_alive(&request, keep_alive);
249				self.is_options = *request.method() == Method::OPTIONS;
250				// Read other headers
251				RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
252			}
253			RpcHandlerState::ReadingBody {
254				body,
255				request,
256				metadata,
257				uri,
258			} => match self.process_body(body, request, uri, metadata) {
259				Err(BodyError::Utf8(ref e)) => {
260					let mesg = format!("utf-8 encoding error at byte {} in request body", e.valid_up_to());
261					let resp = Response::bad_request(mesg);
262					RpcPollState::Ready(RpcHandlerState::Writing(resp))
263				}
264				Err(BodyError::TooLarge) => {
265					let resp = Response::too_large("request body size exceeds allowed maximum");
266					RpcPollState::Ready(RpcHandlerState::Writing(resp))
267				}
268				Err(BodyError::Hyper(e)) => return Err(e),
269				Ok(state) => state,
270			},
271			RpcHandlerState::ProcessRest { uri, metadata } => self.process_rest(uri, metadata)?,
272			RpcHandlerState::ProcessHealth { method, metadata } => self.process_health(method, metadata)?,
273			RpcHandlerState::WaitingForResponse(mut waiting) => match waiting.poll() {
274				Ok(Async::Ready(response)) => RpcPollState::Ready(RpcHandlerState::Writing(response)),
275				Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::WaitingForResponse(waiting)),
276				Err(e) => RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error(format!("{:?}", e)))),
277			},
278			RpcHandlerState::Waiting(mut waiting) => {
279				match waiting.poll() {
280					Ok(Async::Ready(response)) => {
281						RpcPollState::Ready(RpcHandlerState::Writing(match response {
282							// Notification, just return empty response.
283							None => Response::ok(String::new()),
284							// Add new line to have nice output when using CLI clients (curl)
285							Some(result) => Response::ok(format!("{}\n", result)),
286						}))
287					}
288					Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)),
289					Err(e) => {
290						RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error(format!("{:?}", e))))
291					}
292				}
293			}
294			state => RpcPollState::NotReady(state),
295		};
296
297		let (new_state, is_ready) = new_state.decompose();
298		match new_state {
299			RpcHandlerState::Writing(res) => {
300				let mut response: hyper::Response<Body> = res.into();
301				let cors_allow_origin = mem::replace(&mut self.cors_allow_origin, cors::AllowCors::Invalid);
302				let cors_allow_headers = mem::replace(&mut self.cors_allow_headers, cors::AllowCors::Invalid);
303
304				Self::set_response_headers(
305					response.headers_mut(),
306					self.is_options,
307					self.cors_max_age,
308					cors_allow_origin.into(),
309					cors_allow_headers.into(),
310					self.keep_alive,
311				);
312				Ok(Async::Ready(response))
313			}
314			state => {
315				self.state = state;
316				if is_ready {
317					self.poll()
318				} else {
319					Ok(Async::NotReady)
320				}
321			}
322		}
323	}
324}
325
// Intermediate and internal error type to better distinguish
// error cases occurring during request body processing.
enum BodyError {
	// Error reported by hyper while polling the body; aborts the request.
	Hyper(hyper::Error),
	// The complete body is not valid UTF-8; answered with a "bad request" response.
	Utf8(str::Utf8Error),
	// The body exceeds `max_request_body_size`; answered with a "too large" response.
	TooLarge,
}
333
334impl From<hyper::Error> for BodyError {
335	fn from(e: hyper::Error) -> BodyError {
336		BodyError::Hyper(e)
337	}
338}
339
340impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
341	fn read_headers(
342		&self,
343		request: hyper::Request<Body>,
344		continue_on_invalid_cors: bool,
345	) -> RpcHandlerState<M, S::Future, S::CallFuture> {
346		if self.cors_allow_origin == cors::AllowCors::Invalid && !continue_on_invalid_cors {
347			return RpcHandlerState::Writing(Response::invalid_allow_origin());
348		}
349
350		if self.cors_allow_headers == cors::AllowCors::Invalid && !continue_on_invalid_cors {
351			return RpcHandlerState::Writing(Response::invalid_allow_headers());
352		}
353
354		// Read metadata
355		let metadata = self.susydev_jsonrpc_handler.extractor.read_metadata(&request);
356
357		// Proceed
358		match *request.method() {
359			// Validate the ContentType header
360			// to prevent Cross-Origin XHRs with text/plain
361			Method::POST if Self::is_json(request.headers().get("content-type")) => {
362				let uri = if self.rest_api != RestApi::Disabled {
363					Some(request.uri().clone())
364				} else {
365					None
366				};
367				RpcHandlerState::ReadingBody {
368					metadata,
369					request: Default::default(),
370					uri,
371					body: request.into_body(),
372				}
373			}
374			Method::POST if self.rest_api == RestApi::Unsecure && request.uri().path().split('/').count() > 2 => {
375				RpcHandlerState::ProcessRest {
376					metadata,
377					uri: request.uri().clone(),
378				}
379			}
380			// Just return error for unsupported content type
381			Method::POST => RpcHandlerState::Writing(Response::unsupported_content_type()),
382			// Don't validate content type on options
383			Method::OPTIONS => RpcHandlerState::Writing(Response::empty()),
384			// Respond to health API request if there is one configured.
385			Method::GET if self.health_api.as_ref().map(|x| &*x.0) == Some(request.uri().path()) => {
386				RpcHandlerState::ProcessHealth {
387					metadata,
388					method: self
389						.health_api
390						.as_ref()
391						.map(|x| x.1.clone())
392						.expect("Health api is defined since the URI matched."),
393				}
394			}
395			// Disallow other methods.
396			_ => RpcHandlerState::Writing(Response::method_not_allowed()),
397		}
398	}
399
400	fn process_health(
401		&self,
402		method: String,
403		metadata: M,
404	) -> Result<RpcPollState<M, S::Future, S::CallFuture>, hyper::Error> {
405		use self::core::types::{Call, Failure, Id, MethodCall, Output, Params, Request, Success, Version};
406
407		// Create a request
408		let call = Request::Single(Call::MethodCall(MethodCall {
409			jsonrpc: Some(Version::V2),
410			method,
411			params: Params::None,
412			id: Id::Num(1),
413		}));
414
415		Ok(RpcPollState::Ready(RpcHandlerState::WaitingForResponse(
416			future::Either::B(self.susydev_jsonrpc_handler.handler.handle_rpc_request(call, metadata)).map(|res| match res {
417				Some(core::Response::Single(Output::Success(Success { result, .. }))) => {
418					let result = serde_json::to_string(&result).expect("Serialization of result is infallible;qed");
419
420					Response::ok(result)
421				}
422				Some(core::Response::Single(Output::Failure(Failure { error, .. }))) => {
423					let result = serde_json::to_string(&error).expect("Serialization of error is infallible;qed");
424
425					Response::service_unavailable(result)
426				}
427				e => Response::internal_error(format!("Invalid response for health request: {:?}", e)),
428			}),
429		)))
430	}
431
432	fn process_rest(
433		&self,
434		uri: hyper::Uri,
435		metadata: M,
436	) -> Result<RpcPollState<M, S::Future, S::CallFuture>, hyper::Error> {
437		use self::core::types::{Call, Id, MethodCall, Params, Request, Value, Version};
438
439		// skip the initial /
440		let mut it = uri.path().split('/').skip(1);
441
442		// parse method & params
443		let method = it.next().unwrap_or("");
444		let mut params = Vec::new();
445		for param in it {
446			let v = serde_json::from_str(param)
447				.or_else(|_| serde_json::from_str(&format!("\"{}\"", param)))
448				.unwrap_or(Value::Null);
449			params.push(v)
450		}
451
452		// Create a request
453		let call = Request::Single(Call::MethodCall(MethodCall {
454			jsonrpc: Some(Version::V2),
455			method: method.into(),
456			params: Params::Array(params),
457			id: Id::Num(1),
458		}));
459
460		Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
461			future::Either::B(self.susydev_jsonrpc_handler.handler.handle_rpc_request(call, metadata)).map(|res| {
462				res.map(|x| serde_json::to_string(&x).expect("Serialization of response is infallible;qed"))
463			}),
464		)))
465	}
466
467	fn process_body(
468		&self,
469		mut body: hyper::Body,
470		mut request: Vec<u8>,
471		uri: Option<hyper::Uri>,
472		metadata: M,
473	) -> Result<RpcPollState<M, S::Future, S::CallFuture>, BodyError> {
474		loop {
475			match body.poll()? {
476				Async::Ready(Some(chunk)) => {
477					if request
478						.len()
479						.checked_add(chunk.len())
480						.map(|n| n > self.max_request_body_size)
481						.unwrap_or(true)
482					{
483						return Err(BodyError::TooLarge);
484					}
485					request.extend_from_slice(&*chunk)
486				}
487				Async::Ready(None) => {
488					if let (Some(uri), true) = (uri, request.is_empty()) {
489						return Ok(RpcPollState::Ready(RpcHandlerState::ProcessRest { uri, metadata }));
490					}
491
492					let content = match str::from_utf8(&request) {
493						Ok(content) => content,
494						Err(err) => {
495							// Return utf error.
496							return Err(BodyError::Utf8(err));
497						}
498					};
499
500					// Content is ready
501					return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
502						self.susydev_jsonrpc_handler.handler.handle_request(content, metadata),
503					)));
504				}
505				Async::NotReady => {
506					return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody {
507						body,
508						request,
509						metadata,
510						uri,
511					}));
512				}
513			}
514		}
515	}
516
517	fn set_response_headers(
518		headers: &mut HeaderMap,
519		is_options: bool,
520		cors_max_age: Option<u32>,
521		cors_allow_origin: Option<HeaderValue>,
522		cors_allow_headers: Option<Vec<HeaderValue>>,
523		keep_alive: bool,
524	) {
525		let as_header = |m: Method| m.as_str().parse().expect("`Method` will always parse; qed");
526		let concat = |headers: &[HeaderValue]| {
527			let separator = b", ";
528			let val = headers
529				.iter()
530				.flat_map(|h| h.as_bytes().iter().chain(separator.iter()))
531				.cloned()
532				.collect::<Vec<_>>();
533			let max_len = if val.is_empty() { 0 } else { val.len() - 2 };
534			HeaderValue::from_bytes(&val[..max_len])
535				.expect("Concatenation of valid headers with `, ` is still valid; qed")
536		};
537
538		let allowed = concat(&[as_header(Method::OPTIONS), as_header(Method::POST)]);
539
540		if is_options {
541			headers.append(header::ALLOW, allowed.clone());
542			headers.append(header::ACCEPT, HeaderValue::from_static("application/json"));
543		}
544
545		if let Some(cors_allow_origin) = cors_allow_origin {
546			headers.append(header::VARY, HeaderValue::from_static("origin"));
547			headers.append(header::ACCESS_CONTROL_ALLOW_METHODS, allowed);
548			headers.append(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors_allow_origin);
549
550			if let Some(cma) = cors_max_age {
551				headers.append(
552					header::ACCESS_CONTROL_MAX_AGE,
553					HeaderValue::from_str(&cma.to_string()).expect("`u32` will always parse; qed"),
554				);
555			}
556
557			if let Some(cors_allow_headers) = cors_allow_headers {
558				if !cors_allow_headers.is_empty() {
559					headers.append(header::ACCESS_CONTROL_ALLOW_HEADERS, concat(&cors_allow_headers));
560				}
561			}
562		}
563
564		if !keep_alive {
565			headers.append(header::CONNECTION, HeaderValue::from_static("close"));
566		}
567	}
568
569	/// Returns true if the `content_type` header indicates a valid JSON
570	/// message.
571	fn is_json(content_type: Option<&header::HeaderValue>) -> bool {
572		match content_type.and_then(|val| val.to_str().ok()) {
573			Some(ref content)
574				if content.eq_ignore_ascii_case("application/json")
575					|| content.eq_ignore_ascii_case("application/json; charset=utf-8")
576					|| content.eq_ignore_ascii_case("application/json;charset=utf-8") =>
577			{
578				true
579			}
580			_ => false,
581		}
582	}
583}
584
#[cfg(test)]
mod test {
	use super::{hyper, RpcHandler};
	use susydev_jsonrpc_core::middleware::Noop;

	/// Builds a body-less request that carries only the given `content-type`.
	fn request_with_content_type(value: &str) -> hyper::Request<()> {
		hyper::Request::builder()
			.header("content-type", value)
			.body(())
			.unwrap()
	}

	/// The content type check must ignore ASCII case, with and without a
	/// space after the `;`.
	#[test]
	fn test_case_insensitive_content_type() {
		let request = request_with_content_type("Application/Json; charset=UTF-8");
		let request2 = request_with_content_type("Application/Json;charset=UTF-8");

		// The header value itself is stored verbatim.
		assert_eq!(
			request.headers().get("content-type").unwrap(),
			&"Application/Json; charset=UTF-8"
		);

		assert!(RpcHandler::<(), Noop>::is_json(request.headers().get("content-type")));
		assert!(RpcHandler::<(), Noop>::is_json(request2.headers().get("content-type")));
	}
}