jsonrpc_http_server/
handler.rs

1use Rpc;
2
3use std::{fmt, mem};
4use std::sync::Arc;
5
6use hyper::{self, mime, server, Method};
7use hyper::header::{self, Headers};
8use unicase::Ascii;
9
10use jsonrpc::{Metadata, Middleware, NoopMiddleware};
11use jsonrpc::futures::{Future, Poll, Async, BoxFuture, Stream};
12use response::Response;
13use server_utils::cors;
14
15use {utils, RequestMiddleware, RequestMiddlewareAction, CorsDomains, AllowedHosts};
16
/// Textual form of the JSON content type with an explicit UTF-8 charset.
/// `RpcHandler::is_json` compares the request's `Content-Type` against it.
const APPLICATION_JSON_UTF_8: &str = "application/json; charset=utf-8";
18
/// jsonrpc http request handler.
///
/// Implements `hyper::server::Service`; each call consults the request
/// middleware, optionally validates the request host and, unless the
/// middleware supplies its own handler, creates an `RpcHandler` for the request.
pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
	/// RPC handler; cloned into every `RpcHandler` created by `call`.
	jsonrpc_handler: Rpc<M, S>,
	/// Passed to `utils::is_host_allowed` when host validation is requested.
	allowed_hosts: AllowedHosts,
	/// Cloned into the initial `ReadingHeaders` state of every `RpcHandler`.
	cors_domains: CorsDomains,
	/// Middleware whose `on_request` is invoked first for every request.
	middleware: Arc<RequestMiddleware>,
}
26
27impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
28	/// Create new request handler.
29	pub fn new(
30		jsonrpc_handler: Rpc<M, S>,
31		cors_domains: CorsDomains,
32		allowed_hosts: AllowedHosts,
33		middleware: Arc<RequestMiddleware>,
34	) -> Self {
35		ServerHandler {
36			jsonrpc_handler: jsonrpc_handler,
37			allowed_hosts: allowed_hosts,
38			cors_domains: cors_domains,
39			middleware: middleware,
40		}
41	}
42}
43
44impl<M: Metadata, S: Middleware<M>> server::Service for ServerHandler<M, S> {
45	type Request = server::Request;
46	type Response = server::Response;
47	type Error = hyper::Error;
48	type Future = Handler<M, S>;
49
50	fn call(&self, request: Self::Request) -> Self::Future {
51		let action = self.middleware.on_request(&request);
52
53		let (should_validate_hosts, should_continue_on_invalid_cors, handler) = match action {
54			RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors }=> (
55				true, should_continue_on_invalid_cors, None
56			),
57			RequestMiddlewareAction::Respond { should_validate_hosts, handler } => (
58				should_validate_hosts, false, Some(handler)
59			),
60		};
61
62		// Validate host
63		if should_validate_hosts && !utils::is_host_allowed(&request, &self.allowed_hosts) {
64			return Handler::Error(Some(Response::host_not_allowed()));
65		}
66
67		// Replace handler with the one returned by middleware.
68		if let Some(handler) = handler {
69			return Handler::Middleware(handler);
70		}
71
72		Handler::Rpc(RpcHandler {
73			jsonrpc_handler: self.jsonrpc_handler.clone(),
74			state: RpcHandlerState::ReadingHeaders {
75				request: request,
76				cors_domains: self.cors_domains.clone(),
77				continue_on_invalid_cors: should_continue_on_invalid_cors,
78			},
79			is_options: false,
80			cors_header: cors::CorsHeader::NotRequired,
81		})
82	}
83}
84
/// Future returned by `ServerHandler::call`; resolves to the HTTP response.
pub enum Handler<M: Metadata, S: Middleware<M>> {
	/// Regular JSON-RPC processing, driven by `RpcHandler`.
	Rpc(RpcHandler<M, S>),
	/// A response that is returned on the first poll (used for the
	/// host-not-allowed case). Wrapped in `Option` so it can be taken out
	/// through `&mut self`.
	Error(Option<Response>),
	/// Future supplied by the request middleware; polled as-is.
	Middleware(BoxFuture<server::Response, hyper::Error>),
}
90
91impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S> {
92	type Item = server::Response;
93	type Error = hyper::Error;
94
95	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
96		match *self {
97			Handler::Rpc(ref mut handler) => handler.poll(),
98			Handler::Middleware(ref mut middleware) => middleware.poll(),
99			Handler::Error(ref mut response) => Ok(Async::Ready(
100				response.take().expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed").into()
101			)),
102		}
103	}
104}
105
/// Outcome of a single state transition inside `RpcHandler::poll`: the next
/// state plus whether the handler can be polled again right away.
enum RpcPollState<M> {
	/// Progress was made; `RpcHandler::poll` continues with the carried state immediately.
	Ready(RpcHandlerState<M>),
	/// No further progress is possible now; the carried state is stored and
	/// `RpcHandler::poll` returns `Async::NotReady`.
	NotReady(RpcHandlerState<M>),
}
110
111impl<M> RpcPollState<M> {
112	fn decompose(self) -> (RpcHandlerState<M>, bool) {
113		use self::RpcPollState::*;
114		match self {
115			Ready(handler) => (handler, true),
116			NotReady(handler) => (handler, false),
117		}
118	}
119}
120
/// Processing stage of a single request inside `RpcHandler`.
enum RpcHandlerState<M> {
	/// Initial state: the request has arrived but its headers have not been
	/// inspected yet.
	ReadingHeaders {
		/// The incoming HTTP request.
		request: server::Request,
		/// CORS domains used to compute the CORS header for this request.
		cors_domains: CorsDomains,
		/// When `true`, an invalid CORS header does not abort processing.
		continue_on_invalid_cors: bool,
	},
	/// The request body is being collected chunk by chunk.
	ReadingBody {
		/// Remaining body stream.
		body: hyper::Body,
		/// Bytes of the body received so far.
		request: Vec<u8>,
		/// Metadata extracted from the request before its body was taken.
		metadata: M,
	},
	/// A response is ready to be turned into the final HTTP response.
	Writing(Response),
	/// Waiting for the RPC handler; `None` means there is nothing to send
	/// back (a notification), `Some` carries the serialized response.
	Waiting(BoxFuture<Option<String>, ()>),
	/// Placeholder left in `RpcHandler::state` while a transition is in
	/// progress; also what remains after the handler finished or failed.
	Done,
}
136
137impl<M> fmt::Debug for RpcHandlerState<M> {
138	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
139		use self::RpcHandlerState::*;
140
141		match *self {
142			ReadingHeaders {..} => write!(fmt, "ReadingHeaders"),
143			ReadingBody {..} => write!(fmt, "ReadingBody"),
144			Writing(ref res) => write!(fmt, "Writing({:?})", res),
145			Waiting(_) => write!(fmt, "Waiting"),
146			Done => write!(fmt, "Done"),
147		}
148	}
149}
150
/// Future that drives a single JSON-RPC request from reading its headers to
/// producing the final HTTP response.
pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
	/// Provides the metadata extractor and the request handler.
	jsonrpc_handler: Rpc<M, S>,
	/// Current processing stage.
	state: RpcHandlerState<M>,
	/// Set while reading headers: `true` when the request method is `OPTIONS`.
	is_options: bool,
	/// CORS header computed while reading headers; consumed when the
	/// response headers are written.
	cors_header: cors::CorsHeader<header::AccessControlAllowOrigin>,
}
157
impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {
	type Item = server::Response;
	type Error = hyper::Error;

	/// Advances the request state machine.
	///
	/// The current state is moved out of `self.state` (leaving `Done` behind),
	/// a single transition is performed, and then either the final response is
	/// returned, or the new state is stored and the handler re-polls itself
	/// (when the transition reported `Ready`) or returns `Async::NotReady`.
	///
	/// Errors from reading the body are propagated via `?`; in that case
	/// `self.state` stays `Done`.
	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
		// Take ownership of the state so its fields can be moved into the
		// helpers; `Done` is only a placeholder until the new state is stored.
		let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
			RpcHandlerState::ReadingHeaders { request, cors_domains, continue_on_invalid_cors, } => {
				// Read cors header
				self.cors_header = utils::cors_header(&request, &cors_domains);
				// Remembered for later: OPTIONS responses get extra headers.
				self.is_options = *request.method() == Method::Options;
				// Read other headers
				RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
			},
			RpcHandlerState::ReadingBody { body, request, metadata, } => {
				// May return early with an error from the body stream or from UTF-8 validation.
				self.process_body(body, request, metadata)?
			},
			RpcHandlerState::Waiting(mut waiting) => {
				match waiting.poll() {
					Ok(Async::Ready(response)) => {
						RpcPollState::Ready(RpcHandlerState::Writing(match response {
							// Notification, just return empty response.
							None => Response::ok(String::new()),
							// Add new line to have nice output when using CLI clients (curl)
							Some(result) => Response::ok(format!("{}\n", result)),
						}.into()))
					},
					// Keep the same future around for the next poll.
					Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)),
					// The RPC future failed; answer with an internal error response.
					Err(_) => RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error())),
				}
			},
			// Remaining states (`Writing`, `Done`): nothing to drive here.
			// NOTE(review): this path returns `NotReady` without any inner
			// future having been polled in this call — confirm it is only
			// reachable when the handler is polled after completion or failure.
			state => RpcPollState::NotReady(state),
		};

		let (new_state, is_ready) = new_state.decompose();
		match new_state {
			RpcHandlerState::Writing(res) => {
				// Final step: convert to a hyper response and attach the headers.
				let mut response: server::Response = res.into();
				// Move the CORS header out of `self`, leaving `Invalid` in its place.
				let cors_header = mem::replace(&mut self.cors_header, cors::CorsHeader::Invalid);
				Self::set_response_headers(response.headers_mut(), self.is_options, cors_header.into());
				Ok(Async::Ready(response))
			},
			state => {
				self.state = state;
				if is_ready {
					// The transition made progress; run the next one immediately.
					self.poll()
				} else {
					Ok(Async::NotReady)
				}
			},
		}
	}
}
210
211impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
212	fn read_headers(
213		&self,
214		request: server::Request,
215		continue_on_invalid_cors: bool,
216	) -> RpcHandlerState<M> {
217		if self.cors_header == cors::CorsHeader::Invalid && !continue_on_invalid_cors {
218			return RpcHandlerState::Writing(Response::invalid_cors());
219		}
220		// Read metadata
221		let metadata = self.jsonrpc_handler.extractor.read_metadata(&request);
222
223		// Proceed
224		match *request.method() {
225			// Validate the ContentType header
226			// to prevent Cross-Origin XHRs with text/plain
227			Method::Post if Self::is_json(request.headers().get::<header::ContentType>()) => {
228				RpcHandlerState::ReadingBody {
229					metadata: metadata,
230					request: Default::default(),
231					body: request.body(),
232				}
233			},
234			// Just return error for unsupported content type
235			Method::Post => {
236				RpcHandlerState::Writing(Response::unsupported_content_type())
237			},
238			// Don't validate content type on options
239			Method::Options => {
240				RpcHandlerState::Writing(Response::empty())
241			},
242			// Disallow other methods.
243			_ => {
244				RpcHandlerState::Writing(Response::method_not_allowed())
245			},
246		}
247	}
248
249	fn process_body(
250		&self,
251		mut body: hyper::Body,
252		mut request: Vec<u8>,
253		metadata: M,
254	) -> Result<RpcPollState<M>, hyper::Error> {
255		loop {
256			match body.poll()? {
257				// TODO [ToDr] reject too large requests?
258				Async::Ready(Some(chunk)) => {
259					request.extend_from_slice(&*chunk)
260				},
261				Async::Ready(None) => {
262					let content = match ::std::str::from_utf8(&request) {
263						Ok(content) => content,
264						Err(err) => {
265							// Return utf error.
266							return Err(hyper::Error::Utf8(err));
267						},
268					};
269
270					// Content is ready
271					return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
272						self.jsonrpc_handler.handler.handle_request(content, metadata)
273					)));
274				},
275				Async::NotReady => {
276					return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody {
277						body: body,
278						request: request,
279						metadata: metadata
280					}));
281				},
282			}
283		}
284	}
285
286	fn set_response_headers(headers: &mut Headers, is_options: bool, cors_header: Option<header::AccessControlAllowOrigin>) {
287		if is_options {
288			headers.set(header::Allow(vec![
289				Method::Options,
290				Method::Post,
291			]));
292			headers.set(header::Accept(vec![
293				header::qitem(mime::APPLICATION_JSON)
294			]));
295		}
296
297		if let Some(cors_domain) = cors_header {
298			headers.set(header::AccessControlAllowMethods(vec![
299				Method::Options,
300				Method::Post
301			]));
302			headers.set(header::AccessControlAllowHeaders(vec![
303				Ascii::new("origin".to_owned()),
304				Ascii::new("content-type".to_owned()),
305				Ascii::new("accept".to_owned()),
306			]));
307			headers.set(cors_domain);
308			headers.set(header::Vary::Items(vec![
309				Ascii::new("origin".to_owned())
310			]));
311		}
312	}
313
314	fn is_json(content_type: Option<&header::ContentType>) -> bool {
315		match content_type {
316			Some(&header::ContentType(ref mime))
317                if *mime == mime::APPLICATION_JSON || *mime == APPLICATION_JSON_UTF_8 => true,
318			_ => false
319		}
320	}
321}