rs_jsonrpc_http_server/
handler.rs

1use Rpc;
2
3use std::{fmt, mem};
4use std::sync::Arc;
5
6use hyper::{self, mime, server, Method};
7use hyper::header::{self, Headers};
8use unicase::Ascii;
9
10use jsonrpc::{self as core, FutureResult, Metadata, Middleware, NoopMiddleware};
11use jsonrpc::futures::{Future, Poll, Async, Stream, future};
12use jsonrpc::serde_json;
13use response::Response;
14use server_utils::cors;
15
16use {utils, RequestMiddleware, RequestMiddlewareAction, CorsDomains, AllowedHosts, RestApi};
17
18/// jsonrpc http request handler.
19pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
20	rs_jsonrpc_handler: Rpc<M, S>,
21	allowed_hosts: AllowedHosts,
22	cors_domains: CorsDomains,
23	middleware: Arc<RequestMiddleware>,
24	rest_api: RestApi,
25}
26
27impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
28	/// Create new request handler.
29	pub fn new(
30		rs_jsonrpc_handler: Rpc<M, S>,
31		cors_domains: CorsDomains,
32		allowed_hosts: AllowedHosts,
33		middleware: Arc<RequestMiddleware>,
34		rest_api: RestApi,
35	) -> Self {
36		ServerHandler {
37			rs_jsonrpc_handler,
38			allowed_hosts,
39			cors_domains,
40			middleware,
41			rest_api,
42		}
43	}
44}
45
46impl<M: Metadata, S: Middleware<M>> server::Service for ServerHandler<M, S> {
47	type Request = server::Request;
48	type Response = server::Response;
49	type Error = hyper::Error;
50	type Future = Handler<M, S>;
51
52	fn call(&self, request: Self::Request) -> Self::Future {
53		let is_host_allowed = utils::is_host_allowed(&request, &self.allowed_hosts);
54		let action = self.middleware.on_request(request);
55
56		let (should_validate_hosts, should_continue_on_invalid_cors, response) = match action {
57			RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors, request }=> (
58				true, should_continue_on_invalid_cors, Err(request)
59			),
60			RequestMiddlewareAction::Respond { should_validate_hosts, response } => (
61				should_validate_hosts, false, Ok(response)
62			),
63		};
64
65		// Validate host
66		if should_validate_hosts && !is_host_allowed {
67			return Handler::Error(Some(Response::host_not_allowed()));
68		}
69
70		// Replace response with the one returned by middleware.
71		match response {
72			Ok(response) => Handler::Middleware(response),
73			Err(request) => {
74				Handler::Rpc(RpcHandler {
75					rs_jsonrpc_handler: self.rs_jsonrpc_handler.clone(),
76					state: RpcHandlerState::ReadingHeaders {
77						request: request,
78						cors_domains: self.cors_domains.clone(),
79						continue_on_invalid_cors: should_continue_on_invalid_cors,
80					},
81					is_options: false,
82					cors_header: cors::CorsHeader::NotRequired,
83					rest_api: self.rest_api,
84				})
85			}
86		}
87	}
88}
89
90pub enum Handler<M: Metadata, S: Middleware<M>> {
91	Rpc(RpcHandler<M, S>),
92	Error(Option<Response>),
93	Middleware(Box<Future<Item=server::Response, Error=hyper::Error> + Send>),
94}
95
96impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S> {
97	type Item = server::Response;
98	type Error = hyper::Error;
99
100	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
101		match *self {
102			Handler::Rpc(ref mut handler) => handler.poll(),
103			Handler::Middleware(ref mut middleware) => middleware.poll(),
104			Handler::Error(ref mut response) => Ok(Async::Ready(
105				response.take().expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed").into()
106			)),
107		}
108	}
109}
110
111enum RpcPollState<M, F> where
112	F: Future<Item = Option<core::Response>, Error = ()>,
113{
114	Ready(RpcHandlerState<M, F>),
115	NotReady(RpcHandlerState<M, F>),
116}
117
118impl<M, F> RpcPollState<M, F> where
119	F: Future<Item = Option<core::Response>, Error = ()>,
120{
121	fn decompose(self) -> (RpcHandlerState<M, F>, bool) {
122		use self::RpcPollState::*;
123		match self {
124			Ready(handler) => (handler, true),
125			NotReady(handler) => (handler, false),
126		}
127	}
128}
129
130enum RpcHandlerState<M, F> where
131	F: Future<Item = Option<core::Response>, Error = ()>,
132{
133	ReadingHeaders {
134		request: server::Request,
135		cors_domains: CorsDomains,
136		continue_on_invalid_cors: bool,
137	},
138	ReadingBody {
139		body: hyper::Body,
140		uri: Option<hyper::Uri>,
141		request: Vec<u8>,
142		metadata: M,
143	},
144	ProcessRest {
145		uri: hyper::Uri,
146		metadata: M,
147	},
148	Writing(Response),
149	Waiting(FutureResult<F>),
150	Done,
151}
152
153impl<M, F> fmt::Debug for RpcHandlerState<M, F> where
154	F: Future<Item = Option<core::Response>, Error = ()>,
155{
156	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
157		use self::RpcHandlerState::*;
158
159		match *self {
160			ReadingHeaders {..} => write!(fmt, "ReadingHeaders"),
161			ReadingBody {..} => write!(fmt, "ReadingBody"),
162			ProcessRest {..} => write!(fmt, "ProcessRest"),
163			Writing(ref res) => write!(fmt, "Writing({:?})", res),
164			Waiting(_) => write!(fmt, "Waiting"),
165			Done => write!(fmt, "Done"),
166		}
167	}
168}
169
170pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
171	rs_jsonrpc_handler: Rpc<M, S>,
172	state: RpcHandlerState<M, S::Future>,
173	is_options: bool,
174	cors_header: cors::CorsHeader<header::AccessControlAllowOrigin>,
175	rest_api: RestApi,
176}
177
178impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {
179	type Item = server::Response;
180	type Error = hyper::Error;
181
182	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
183		let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
184			RpcHandlerState::ReadingHeaders { request, cors_domains, continue_on_invalid_cors, } => {
185				// Read cors header
186				self.cors_header = utils::cors_header(&request, &cors_domains);
187				self.is_options = *request.method() == Method::Options;
188				// Read other headers
189				RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
190			},
191			RpcHandlerState::ReadingBody { body, request, metadata, uri, } => {
192				self.process_body(body, request, uri, metadata)?
193			},
194			RpcHandlerState::ProcessRest { uri, metadata } => {
195				self.process_rest(uri, metadata)?
196			},
197			RpcHandlerState::Waiting(mut waiting) => {
198				match waiting.poll() {
199					Ok(Async::Ready(response)) => {
200						RpcPollState::Ready(RpcHandlerState::Writing(match response {
201							// Notification, just return empty response.
202							None => Response::ok(String::new()),
203							// Add new line to have nice output when using CLI clients (curl)
204							Some(result) => Response::ok(format!("{}\n", result)),
205						}.into()))
206					},
207					Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)),
208					Err(_) => RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error())),
209				}
210			},
211			state => RpcPollState::NotReady(state),
212		};
213
214		let (new_state, is_ready) = new_state.decompose();
215		match new_state {
216			RpcHandlerState::Writing(res) => {
217				let mut response: server::Response = res.into();
218				let cors_header = mem::replace(&mut self.cors_header, cors::CorsHeader::Invalid);
219				Self::set_response_headers(response.headers_mut(), self.is_options, cors_header.into());
220				Ok(Async::Ready(response))
221			},
222			state => {
223				self.state = state;
224				if is_ready {
225					self.poll()
226				} else {
227					Ok(Async::NotReady)
228				}
229			},
230		}
231	}
232}
233
234impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
235	fn read_headers(
236		&self,
237		request: server::Request,
238		continue_on_invalid_cors: bool,
239	) -> RpcHandlerState<M, S::Future> {
240		if self.cors_header == cors::CorsHeader::Invalid && !continue_on_invalid_cors {
241			return RpcHandlerState::Writing(Response::invalid_cors());
242		}
243		// Read metadata
244		let metadata = self.rs_jsonrpc_handler.extractor.read_metadata(&request);
245
246		// Proceed
247		match *request.method() {
248			// Validate the ContentType header
249			// to prevent Cross-Origin XHRs with text/plain
250			Method::Post if Self::is_json(request.headers().get::<header::ContentType>()) => {
251				let uri = if self.rest_api != RestApi::Disabled { Some(request.uri().clone()) } else { None };
252				RpcHandlerState::ReadingBody {
253					metadata,
254					request: Default::default(),
255					uri,
256					body: request.body(),
257				}
258			},
259			Method::Post if self.rest_api == RestApi::Unsecure => {
260				RpcHandlerState::ProcessRest {
261					metadata,
262					uri: request.uri().clone(),
263				}
264			},
265			// Just return error for unsupported content type
266			Method::Post => {
267				RpcHandlerState::Writing(Response::unsupported_content_type())
268			},
269			// Don't validate content type on options
270			Method::Options => {
271				RpcHandlerState::Writing(Response::empty())
272			},
273			// Disallow other methods.
274			_ => {
275				RpcHandlerState::Writing(Response::method_not_allowed())
276			},
277		}
278	}
279
280	fn process_rest(
281		&self,
282		uri: hyper::Uri,
283		metadata: M,
284	) -> Result<RpcPollState<M, S::Future>, hyper::Error> {
285		use self::core::types::{Call, MethodCall, Version, Params, Request, Id, Value};
286
287		// skip the initial /
288		let mut it = uri.path().split('/').skip(1);
289
290		// parse method & params
291		let method = it.next().unwrap_or("");
292		let mut params = Vec::new();
293		for param in it {
294			let v = serde_json::from_str(param)
295				.or_else(|_| serde_json::from_str(&format!("\"{}\"", param)))
296				.unwrap_or(Value::Null);
297			params.push(v)
298		}
299
300		// Parse request
301		let call = Request::Single(Call::MethodCall(MethodCall {
302			jsonrpc: Some(Version::V2),
303			method: method.into(),
304			params: Some(Params::Array(params)),
305			id: Id::Num(1),
306		}));
307
308		return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
309			future::Either::B(self.rs_jsonrpc_handler.handler.handle_rpc_request(call, metadata))
310				.map(|res| res.map(|x| serde_json::to_string(&x)
311					.expect("Serialization of response is infallible;qed")
312				))
313		)));
314	}
315
316	fn process_body(
317		&self,
318		mut body: hyper::Body,
319		mut request: Vec<u8>,
320		uri: Option<hyper::Uri>,
321		metadata: M,
322	) -> Result<RpcPollState<M, S::Future>, hyper::Error> {
323		loop {
324			match body.poll()? {
325				// TODO [ToDr] reject too large requests?
326				Async::Ready(Some(chunk)) => {
327					request.extend_from_slice(&*chunk)
328				},
329				Async::Ready(None) => {
330					if let (Some(uri), true) = (uri, request.is_empty()) {
331						return Ok(RpcPollState::Ready(RpcHandlerState::ProcessRest {
332							uri,
333							metadata,
334						}));
335					}
336
337					let content = match ::std::str::from_utf8(&request) {
338						Ok(content) => content,
339						Err(err) => {
340							// Return utf error.
341							return Err(hyper::Error::Utf8(err));
342						},
343					};
344
345					// Content is ready
346					return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
347						self.rs_jsonrpc_handler.handler.handle_request(content, metadata)
348					)));
349				},
350				Async::NotReady => {
351					return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody {
352						body,
353						request,
354						metadata,
355						uri,
356					}));
357				},
358			}
359		}
360	}
361
362	fn set_response_headers(headers: &mut Headers, is_options: bool, cors_header: Option<header::AccessControlAllowOrigin>) {
363		if is_options {
364			headers.set(header::Allow(vec![
365				Method::Options,
366				Method::Post,
367			]));
368			headers.set(header::Accept(vec![
369				header::qitem(mime::APPLICATION_JSON)
370			]));
371		}
372
373		if let Some(cors_domain) = cors_header {
374			headers.set(header::AccessControlAllowMethods(vec![
375				Method::Options,
376				Method::Post
377			]));
378			headers.set(header::AccessControlAllowHeaders(vec![
379				Ascii::new("origin".to_owned()),
380				Ascii::new("content-type".to_owned()),
381				Ascii::new("accept".to_owned()),
382			]));
383			headers.set(cors_domain);
384			headers.set(header::Vary::Items(vec![
385				Ascii::new("origin".to_owned())
386			]));
387		}
388	}
389
390	fn is_json(content_type: Option<&header::ContentType>) -> bool {
391		const APPLICATION_JSON_UTF_8: &str = "application/json; charset=utf-8";
392
393		match content_type {
394			Some(&header::ContentType(ref mime))
395				if *mime == mime::APPLICATION_JSON || *mime == APPLICATION_JSON_UTF_8 => true,
396			_ => false
397		}
398	}
399}