enfipy_jsonrpc_http_server/
handler.rs

1use crate::WeakRpc;
2
3use std::sync::Arc;
4use std::{fmt, mem, str};
5
6use hyper::header::{self, HeaderMap, HeaderValue};
7use hyper::{self, service::Service, Body, Method};
8
9use crate::jsonrpc::serde_json;
10use crate::jsonrpc::{self as core, middleware, Metadata, Middleware};
11use crate::response::Response;
12use crate::server_utils::cors;
13use futures01::{Async, Future, Poll, Stream};
14
15use crate::{utils, AllowedHosts, CorsDomains, RequestMiddleware, RequestMiddlewareAction, RestApi};
16
17/// jsonrpc http request handler.
18pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
19	jsonrpc_handler: WeakRpc<M, S>,
20	allowed_hosts: AllowedHosts,
21	cors_domains: CorsDomains,
22	cors_max_age: Option<u32>,
23	cors_allowed_headers: cors::AccessControlAllowHeaders,
24	middleware: Arc<dyn RequestMiddleware>,
25	rest_api: RestApi,
26	health_api: Option<(String, String)>,
27	max_request_body_size: usize,
28	keep_alive: bool,
29}
30
31impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
32	/// Create new request handler.
33	pub fn new(
34		jsonrpc_handler: WeakRpc<M, S>,
35		cors_domains: CorsDomains,
36		cors_max_age: Option<u32>,
37		cors_allowed_headers: cors::AccessControlAllowHeaders,
38		allowed_hosts: AllowedHosts,
39		middleware: Arc<dyn RequestMiddleware>,
40		rest_api: RestApi,
41		health_api: Option<(String, String)>,
42		max_request_body_size: usize,
43		keep_alive: bool,
44	) -> Self {
45		ServerHandler {
46			jsonrpc_handler,
47			allowed_hosts,
48			cors_domains,
49			cors_max_age,
50			cors_allowed_headers,
51			middleware,
52			rest_api,
53			health_api,
54			max_request_body_size,
55			keep_alive,
56		}
57	}
58}
59
60impl<M: Metadata, S: Middleware<M>> Service for ServerHandler<M, S>
61where
62	S::Future: Unpin,
63	S::CallFuture: Unpin,
64{
65	type ReqBody = Body;
66	type ResBody = Body;
67	type Error = hyper::Error;
68	type Future = Handler<M, S>;
69
70	fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
71		let is_host_allowed = utils::is_host_allowed(&request, &self.allowed_hosts);
72		let action = self.middleware.on_request(request);
73
74		let (should_validate_hosts, should_continue_on_invalid_cors, response) = match action {
75			RequestMiddlewareAction::Proceed {
76				should_continue_on_invalid_cors,
77				request,
78			} => (true, should_continue_on_invalid_cors, Err(request)),
79			RequestMiddlewareAction::Respond {
80				should_validate_hosts,
81				response,
82			} => (should_validate_hosts, false, Ok(response)),
83		};
84
85		// Validate host
86		if should_validate_hosts && !is_host_allowed {
87			return Handler::Err(Some(Response::host_not_allowed()));
88		}
89
90		// Replace response with the one returned by middleware.
91		match response {
92			Ok(response) => Handler::Middleware(response),
93			Err(request) => {
94				Handler::Rpc(RpcHandler {
95					jsonrpc_handler: self.jsonrpc_handler.clone(),
96					state: RpcHandlerState::ReadingHeaders {
97						request,
98						cors_domains: self.cors_domains.clone(),
99						cors_headers: self.cors_allowed_headers.clone(),
100						continue_on_invalid_cors: should_continue_on_invalid_cors,
101						keep_alive: self.keep_alive,
102					},
103					is_options: false,
104					cors_max_age: self.cors_max_age,
105					cors_allow_origin: cors::AllowCors::NotRequired,
106					cors_allow_headers: cors::AllowCors::NotRequired,
107					rest_api: self.rest_api,
108					health_api: self.health_api.clone(),
109					max_request_body_size: self.max_request_body_size,
110					// initial value, overwritten when reading client headers
111					keep_alive: true,
112				})
113			}
114		}
115	}
116}
117
118pub enum Handler<M: Metadata, S: Middleware<M>> {
119	Rpc(RpcHandler<M, S>),
120	Err(Option<Response>),
121	Middleware(Box<dyn Future<Item = hyper::Response<Body>, Error = hyper::Error> + Send>),
122}
123
124impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S>
125where
126	S::Future: Unpin,
127	S::CallFuture: Unpin,
128{
129	type Item = hyper::Response<Body>;
130	type Error = hyper::Error;
131
132	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
133		match *self {
134			Handler::Rpc(ref mut handler) => handler.poll(),
135			Handler::Middleware(ref mut middleware) => middleware.poll(),
136			Handler::Err(ref mut response) => Ok(Async::Ready(
137				response
138					.take()
139					.expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed")
140					.into(),
141			)),
142		}
143	}
144}
145
146enum RpcPollState<M> {
147	Ready(RpcHandlerState<M>),
148	NotReady(RpcHandlerState<M>),
149}
150
151impl<M> RpcPollState<M> {
152	fn decompose(self) -> (RpcHandlerState<M>, bool) {
153		use self::RpcPollState::*;
154		match self {
155			Ready(handler) => (handler, true),
156			NotReady(handler) => (handler, false),
157		}
158	}
159}
160
161enum RpcHandlerState<M> {
162	ReadingHeaders {
163		request: hyper::Request<Body>,
164		cors_domains: CorsDomains,
165		cors_headers: cors::AccessControlAllowHeaders,
166		continue_on_invalid_cors: bool,
167		keep_alive: bool,
168	},
169	ReadingBody {
170		body: hyper::Body,
171		uri: Option<hyper::Uri>,
172		request: Vec<u8>,
173		metadata: M,
174	},
175	ProcessRest {
176		uri: hyper::Uri,
177		metadata: M,
178	},
179	ProcessHealth {
180		method: String,
181		metadata: M,
182	},
183	Writing(Response),
184	Waiting(Box<dyn Future<Item = Option<String>, Error = ()> + Send>),
185	WaitingForResponse(Box<dyn Future<Item = Response, Error = ()> + Send>),
186	Done,
187}
188
189impl<M> fmt::Debug for RpcHandlerState<M> {
190	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
191		use self::RpcHandlerState::*;
192
193		match *self {
194			ReadingHeaders { .. } => write!(fmt, "ReadingHeaders"),
195			ReadingBody { .. } => write!(fmt, "ReadingBody"),
196			ProcessRest { .. } => write!(fmt, "ProcessRest"),
197			ProcessHealth { .. } => write!(fmt, "ProcessHealth"),
198			Writing(ref res) => write!(fmt, "Writing({:?})", res),
199			WaitingForResponse(_) => write!(fmt, "WaitingForResponse"),
200			Waiting(_) => write!(fmt, "Waiting"),
201			Done => write!(fmt, "Done"),
202		}
203	}
204}
205
206pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
207	jsonrpc_handler: WeakRpc<M, S>,
208	state: RpcHandlerState<M>,
209	is_options: bool,
210	cors_allow_origin: cors::AllowCors<header::HeaderValue>,
211	cors_allow_headers: cors::AllowCors<Vec<header::HeaderValue>>,
212	cors_max_age: Option<u32>,
213	rest_api: RestApi,
214	health_api: Option<(String, String)>,
215	max_request_body_size: usize,
216	keep_alive: bool,
217}
218
219impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S>
220where
221	S::Future: Unpin,
222	S::CallFuture: Unpin,
223{
224	type Item = hyper::Response<Body>;
225	type Error = hyper::Error;
226
227	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
228		let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
229			RpcHandlerState::ReadingHeaders {
230				request,
231				cors_domains,
232				cors_headers,
233				continue_on_invalid_cors,
234				keep_alive,
235			} => {
236				// Read cors header
237				self.cors_allow_origin = utils::cors_allow_origin(&request, &cors_domains);
238				self.cors_allow_headers = utils::cors_allow_headers(&request, &cors_headers);
239				self.keep_alive = utils::keep_alive(&request, keep_alive);
240				self.is_options = *request.method() == Method::OPTIONS;
241				// Read other headers
242				RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
243			}
244			RpcHandlerState::ReadingBody {
245				body,
246				request,
247				metadata,
248				uri,
249			} => match self.process_body(body, request, uri, metadata) {
250				Err(BodyError::Utf8(ref e)) => {
251					let mesg = format!("utf-8 encoding error at byte {} in request body", e.valid_up_to());
252					let resp = Response::bad_request(mesg);
253					RpcPollState::Ready(RpcHandlerState::Writing(resp))
254				}
255				Err(BodyError::TooLarge) => {
256					let resp = Response::too_large("request body size exceeds allowed maximum");
257					RpcPollState::Ready(RpcHandlerState::Writing(resp))
258				}
259				Err(BodyError::Hyper(e)) => return Err(e),
260				Ok(state) => state,
261			},
262			RpcHandlerState::ProcessRest { uri, metadata } => self.process_rest(uri, metadata)?,
263			RpcHandlerState::ProcessHealth { method, metadata } => self.process_health(method, metadata)?,
264			RpcHandlerState::WaitingForResponse(mut waiting) => match waiting.poll() {
265				Ok(Async::Ready(response)) => RpcPollState::Ready(RpcHandlerState::Writing(response)),
266				Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::WaitingForResponse(waiting)),
267				Err(e) => RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error(format!("{:?}", e)))),
268			},
269			RpcHandlerState::Waiting(mut waiting) => {
270				match waiting.poll() {
271					Ok(Async::Ready(response)) => {
272						RpcPollState::Ready(RpcHandlerState::Writing(match response {
273							// Notification, just return empty response.
274							None => Response::ok(String::new()),
275							// Add new line to have nice output when using CLI clients (curl)
276							Some(result) => Response::ok(format!("{}\n", result)),
277						}))
278					}
279					Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)),
280					Err(e) => {
281						RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error(format!("{:?}", e))))
282					}
283				}
284			}
285			state => RpcPollState::NotReady(state),
286		};
287
288		let (new_state, is_ready) = new_state.decompose();
289		match new_state {
290			RpcHandlerState::Writing(res) => {
291				let mut response: hyper::Response<Body> = res.into();
292				let cors_allow_origin = mem::replace(&mut self.cors_allow_origin, cors::AllowCors::Invalid);
293				let cors_allow_headers = mem::replace(&mut self.cors_allow_headers, cors::AllowCors::Invalid);
294
295				Self::set_response_headers(
296					response.headers_mut(),
297					self.is_options,
298					self.cors_max_age,
299					cors_allow_origin.into(),
300					cors_allow_headers.into(),
301					self.keep_alive,
302				);
303				Ok(Async::Ready(response))
304			}
305			state => {
306				self.state = state;
307				if is_ready {
308					self.poll()
309				} else {
310					Ok(Async::NotReady)
311				}
312			}
313		}
314	}
315}
316
317// Intermediate and internal error type to better distinguish
318// error cases occurring during request body processing.
319enum BodyError {
320	Hyper(hyper::Error),
321	Utf8(str::Utf8Error),
322	TooLarge,
323}
324
325impl From<hyper::Error> for BodyError {
326	fn from(e: hyper::Error) -> BodyError {
327		BodyError::Hyper(e)
328	}
329}
330
331impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S>
332where
333	S::Future: Unpin,
334	S::CallFuture: Unpin,
335{
336	fn read_headers(&self, request: hyper::Request<Body>, continue_on_invalid_cors: bool) -> RpcHandlerState<M> {
337		if self.cors_allow_origin == cors::AllowCors::Invalid && !continue_on_invalid_cors {
338			return RpcHandlerState::Writing(Response::invalid_allow_origin());
339		}
340
341		if self.cors_allow_headers == cors::AllowCors::Invalid && !continue_on_invalid_cors {
342			return RpcHandlerState::Writing(Response::invalid_allow_headers());
343		}
344
345		// Read metadata
346		let handler = match self.jsonrpc_handler.upgrade() {
347			Some(handler) => handler,
348			None => return RpcHandlerState::Writing(Response::closing()),
349		};
350		let metadata = handler.extractor.read_metadata(&request);
351
352		// Proceed
353		match *request.method() {
354			// Validate the ContentType header
355			// to prevent Cross-Origin XHRs with text/plain
356			Method::POST if Self::is_json(request.headers().get("content-type")) => {
357				let uri = if self.rest_api != RestApi::Disabled {
358					Some(request.uri().clone())
359				} else {
360					None
361				};
362				RpcHandlerState::ReadingBody {
363					metadata,
364					request: Default::default(),
365					uri,
366					body: request.into_body(),
367				}
368			}
369			Method::POST if self.rest_api == RestApi::Unsecure && request.uri().path().split('/').count() > 2 => {
370				RpcHandlerState::ProcessRest {
371					metadata,
372					uri: request.uri().clone(),
373				}
374			}
375			// Just return error for unsupported content type
376			Method::POST => RpcHandlerState::Writing(Response::unsupported_content_type()),
377			// Don't validate content type on options
378			Method::OPTIONS => RpcHandlerState::Writing(Response::empty()),
379			// Respond to health API request if there is one configured.
380			Method::GET if self.health_api.as_ref().map(|x| &*x.0) == Some(request.uri().path()) => {
381				RpcHandlerState::ProcessHealth {
382					metadata,
383					method: self
384						.health_api
385						.as_ref()
386						.map(|x| x.1.clone())
387						.expect("Health api is defined since the URI matched."),
388				}
389			}
390			// Disallow other methods.
391			_ => RpcHandlerState::Writing(Response::method_not_allowed()),
392		}
393	}
394
395	fn process_health(&self, method: String, metadata: M) -> Result<RpcPollState<M>, hyper::Error> {
396		use self::core::types::{Call, Failure, Id, MethodCall, Output, Params, Request, Success, Version};
397		use futures03::{FutureExt, TryFutureExt};
398
399		// Create a request
400		let call = Request::Single(Call::MethodCall(MethodCall {
401			jsonrpc: Some(Version::V2),
402			method,
403			params: Params::None,
404			id: Id::Num(1),
405		}));
406
407		let response = match self.jsonrpc_handler.upgrade() {
408			Some(h) => h.handler.handle_rpc_request(call, metadata),
409			None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))),
410		};
411		let response = response.map(Ok).compat();
412
413		Ok(RpcPollState::Ready(RpcHandlerState::WaitingForResponse(Box::new(
414			response.map(|res| match res {
415				Some(core::Response::Single(Output::Success(Success { result, .. }))) => {
416					let result = serde_json::to_string(&result).expect("Serialization of result is infallible;qed");
417
418					Response::ok(result)
419				}
420				Some(core::Response::Single(Output::Failure(Failure { error, .. }))) => {
421					let result = serde_json::to_string(&error).expect("Serialization of error is infallible;qed");
422
423					Response::service_unavailable(result)
424				}
425				e => Response::internal_error(format!("Invalid response for health request: {:?}", e)),
426			}),
427		))))
428	}
429
430	fn process_rest(&self, uri: hyper::Uri, metadata: M) -> Result<RpcPollState<M>, hyper::Error> {
431		use self::core::types::{Call, Id, MethodCall, Params, Request, Value, Version};
432		use futures03::{FutureExt, TryFutureExt};
433
434		// skip the initial /
435		let mut it = uri.path().split('/').skip(1);
436
437		// parse method & params
438		let method = it.next().unwrap_or("");
439		let mut params = Vec::new();
440		for param in it {
441			let v = serde_json::from_str(param)
442				.or_else(|_| serde_json::from_str(&format!("\"{}\"", param)))
443				.unwrap_or(Value::Null);
444			params.push(v)
445		}
446
447		// Create a request
448		let call = Request::Single(Call::MethodCall(MethodCall {
449			jsonrpc: Some(Version::V2),
450			method: method.into(),
451			params: Params::Array(params),
452			id: Id::Num(1),
453		}));
454
455		let response = match self.jsonrpc_handler.upgrade() {
456			Some(h) => h.handler.handle_rpc_request(call, metadata),
457			None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))),
458		};
459		let response = response.map(Ok).compat();
460
461		Ok(RpcPollState::Ready(RpcHandlerState::Waiting(Box::new(response.map(
462			|res| res.map(|x| serde_json::to_string(&x).expect("Serialization of response is infallible;qed")),
463		)))))
464	}
465
466	fn process_body(
467		&self,
468		mut body: hyper::Body,
469		mut request: Vec<u8>,
470		uri: Option<hyper::Uri>,
471		metadata: M,
472	) -> Result<RpcPollState<M>, BodyError> {
473		loop {
474			match body.poll()? {
475				Async::Ready(Some(chunk)) => {
476					if request
477						.len()
478						.checked_add(chunk.len())
479						.map(|n| n > self.max_request_body_size)
480						.unwrap_or(true)
481					{
482						return Err(BodyError::TooLarge);
483					}
484					request.extend_from_slice(&*chunk)
485				}
486				Async::Ready(None) => {
487					use futures03::{FutureExt, TryFutureExt};
488					if let (Some(uri), true) = (uri, request.is_empty()) {
489						return Ok(RpcPollState::Ready(RpcHandlerState::ProcessRest { uri, metadata }));
490					}
491
492					let content = match str::from_utf8(&request) {
493						Ok(content) => content,
494						Err(err) => {
495							// Return utf error.
496							return Err(BodyError::Utf8(err));
497						}
498					};
499
500					let response = match self.jsonrpc_handler.upgrade() {
501						Some(h) => h.handler.handle_request(content, metadata),
502						None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))),
503					};
504
505					// Content is ready
506					let response = response.map(Ok).compat();
507					return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(Box::new(response))));
508				}
509				Async::NotReady => {
510					return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody {
511						body,
512						request,
513						metadata,
514						uri,
515					}));
516				}
517			}
518		}
519	}
520
521	fn set_response_headers(
522		headers: &mut HeaderMap,
523		is_options: bool,
524		cors_max_age: Option<u32>,
525		cors_allow_origin: Option<HeaderValue>,
526		cors_allow_headers: Option<Vec<HeaderValue>>,
527		keep_alive: bool,
528	) {
529		let as_header = |m: Method| m.as_str().parse().expect("`Method` will always parse; qed");
530		let concat = |headers: &[HeaderValue]| {
531			let separator = b", ";
532			let val = headers
533				.iter()
534				.flat_map(|h| h.as_bytes().iter().chain(separator.iter()))
535				.cloned()
536				.collect::<Vec<_>>();
537			let max_len = if val.is_empty() { 0 } else { val.len() - 2 };
538			HeaderValue::from_bytes(&val[..max_len])
539				.expect("Concatenation of valid headers with `, ` is still valid; qed")
540		};
541
542		let allowed = concat(&[as_header(Method::OPTIONS), as_header(Method::POST)]);
543
544		if is_options {
545			headers.append(header::ALLOW, allowed.clone());
546			headers.append(header::ACCEPT, HeaderValue::from_static("application/json"));
547			headers.append(header::ACCEPT, HeaderValue::from_static("text/json"));
548		}
549
550		if let Some(cors_allow_origin) = cors_allow_origin {
551			headers.append(header::VARY, HeaderValue::from_static("origin"));
552			headers.append(header::ACCESS_CONTROL_ALLOW_METHODS, allowed);
553			headers.append(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors_allow_origin);
554
555			if let Some(cma) = cors_max_age {
556				headers.append(
557					header::ACCESS_CONTROL_MAX_AGE,
558					HeaderValue::from_str(&cma.to_string()).expect("`u32` will always parse; qed"),
559				);
560			}
561
562			if let Some(cors_allow_headers) = cors_allow_headers {
563				if !cors_allow_headers.is_empty() {
564					headers.append(header::ACCESS_CONTROL_ALLOW_HEADERS, concat(&cors_allow_headers));
565				}
566			}
567		}
568
569		if !keep_alive {
570			headers.append(header::CONNECTION, HeaderValue::from_static("close"));
571		}
572	}
573
574	/// Returns true if the `content_type` header indicates a valid JSON
575	/// message.
576	fn is_json(content_type: Option<&header::HeaderValue>) -> bool {
577		match content_type.and_then(|val| val.to_str().ok()) {
578			Some(ref content)
579				if content.eq_ignore_ascii_case("application/json")
580					|| content.eq_ignore_ascii_case("application/json; charset=utf-8")
581					|| content.eq_ignore_ascii_case("application/json;charset=utf-8")
582					|| content.eq_ignore_ascii_case("text/json")
583					|| content.eq_ignore_ascii_case("text/json; charset=utf-8")
584					|| content.eq_ignore_ascii_case("text/json;charset=utf-8") =>
585			{
586				true
587			}
588			_ => false,
589		}
590	}
591}
592
593#[cfg(test)]
594mod test {
595	use super::{hyper, RpcHandler};
596	use jsonrpc_core::middleware::Noop;
597
598	#[test]
599	fn test_case_insensitive_content_type() {
600		let request = hyper::Request::builder()
601			.header("content-type", "Application/Json; charset=UTF-8")
602			.body(())
603			.unwrap();
604
605		let request2 = hyper::Request::builder()
606			.header("content-type", "Application/Json;charset=UTF-8")
607			.body(())
608			.unwrap();
609
610		assert_eq!(
611			request.headers().get("content-type").unwrap(),
612			&"Application/Json; charset=UTF-8"
613		);
614
615		assert_eq!(
616			RpcHandler::<(), Noop>::is_json(request.headers().get("content-type")),
617			true
618		);
619		assert_eq!(
620			RpcHandler::<(), Noop>::is_json(request2.headers().get("content-type")),
621			true
622		);
623	}
624}