1use Rpc;
2
3use std::{fmt, mem, str};
4use std::sync::Arc;
5
6use hyper::{self, service::Service, Body, Method};
7use hyper::header::{self, HeaderMap, HeaderValue};
8
9use jsonrpc::{self as core, FutureResult, Metadata, Middleware, NoopMiddleware, FutureRpcResult};
10use jsonrpc::futures::{Future, Poll, Async, Stream, future};
11use jsonrpc::serde_json;
12use response::Response;
13use server_utils::cors;
14
15use {utils, RequestMiddleware, RequestMiddlewareAction, CorsDomains, AllowedHosts, RestApi};
16
17pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
19 jsonrpc_handler: Rpc<M, S>,
20 allowed_hosts: AllowedHosts,
21 cors_domains: CorsDomains,
22 cors_max_age: Option<u32>,
23 cors_allowed_headers: cors::AccessControlAllowHeaders,
24 middleware: Arc<RequestMiddleware>,
25 rest_api: RestApi,
26 health_api: Option<(String, String)>,
27 max_request_body_size: usize,
28}
29
30impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
31 pub fn new(
33 jsonrpc_handler: Rpc<M, S>,
34 cors_domains: CorsDomains,
35 cors_max_age: Option<u32>,
36 cors_allowed_headers: cors::AccessControlAllowHeaders,
37 allowed_hosts: AllowedHosts,
38 middleware: Arc<RequestMiddleware>,
39 rest_api: RestApi,
40 health_api: Option<(String, String)>,
41 max_request_body_size: usize,
42 ) -> Self {
43 ServerHandler {
44 jsonrpc_handler,
45 allowed_hosts,
46 cors_domains,
47 cors_max_age,
48 cors_allowed_headers,
49 middleware,
50 rest_api,
51 health_api,
52 max_request_body_size,
53 }
54 }
55}
56
57impl<M: Metadata, S: Middleware<M>> Service for ServerHandler<M, S> {
58 type ReqBody = Body;
59 type ResBody = Body;
60 type Error = hyper::Error;
61 type Future = Handler<M, S>;
62
63 fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
64 let is_host_allowed = utils::is_host_allowed(&request, &self.allowed_hosts);
65 let action = self.middleware.on_request(request);
66
67 let (should_validate_hosts, should_continue_on_invalid_cors, response) = match action {
68 RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors, request }=> (
69 true, should_continue_on_invalid_cors, Err(request)
70 ),
71 RequestMiddlewareAction::Respond { should_validate_hosts, response } => (
72 should_validate_hosts, false, Ok(response)
73 ),
74 };
75
76 if should_validate_hosts && !is_host_allowed {
78 return Handler::Error(Some(Response::host_not_allowed()));
79 }
80
81 match response {
83 Ok(response) => Handler::Middleware(response),
84 Err(request) => {
85 Handler::Rpc(RpcHandler {
86 jsonrpc_handler: self.jsonrpc_handler.clone(),
87 state: RpcHandlerState::ReadingHeaders {
88 request,
89 cors_domains: self.cors_domains.clone(),
90 cors_headers: self.cors_allowed_headers.clone(),
91 continue_on_invalid_cors: should_continue_on_invalid_cors,
92 },
93 is_options: false,
94 cors_max_age: self.cors_max_age,
95 cors_allow_origin: cors::AllowCors::NotRequired,
96 cors_allow_headers: cors::AllowCors::NotRequired,
97 rest_api: self.rest_api,
98 health_api: self.health_api.clone(),
99 max_request_body_size: self.max_request_body_size,
100 })
101 }
102 }
103 }
104}
105
106pub enum Handler<M: Metadata, S: Middleware<M>> {
107 Rpc(RpcHandler<M, S>),
108 Error(Option<Response>),
109 Middleware(Box<Future<Item = hyper::Response<Body>, Error = hyper::Error> + Send>),
110}
111
112impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S> {
113 type Item = hyper::Response<Body>;
114 type Error = hyper::Error;
115
116 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
117 match *self {
118 Handler::Rpc(ref mut handler) => handler.poll(),
119 Handler::Middleware(ref mut middleware) => middleware.poll(),
120 Handler::Error(ref mut response) => Ok(Async::Ready(
121 response.take().expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed").into()
122 )),
123 }
124 }
125}
126
127enum RpcPollState<M, F> where
128 F: Future<Item = Option<core::Response>, Error = ()>,
129{
130 Ready(RpcHandlerState<M, F>),
131 NotReady(RpcHandlerState<M, F>),
132}
133
134impl<M, F> RpcPollState<M, F> where
135 F: Future<Item = Option<core::Response>, Error = ()>,
136{
137 fn decompose(self) -> (RpcHandlerState<M, F>, bool) {
138 use self::RpcPollState::*;
139 match self {
140 Ready(handler) => (handler, true),
141 NotReady(handler) => (handler, false),
142 }
143 }
144}
145
146type FutureResponse<F> = future::Map<
147 future::Either<future::FutureResult<Option<core::Response>, ()>, FutureRpcResult<F>>,
148 fn(Option<core::Response>) -> Response,
149>;
150
151
152enum RpcHandlerState<M, F> where
153 F: Future<Item = Option<core::Response>, Error = ()>,
154{
155 ReadingHeaders {
156 request: hyper::Request<Body>,
157 cors_domains: CorsDomains,
158 cors_headers: cors::AccessControlAllowHeaders,
159 continue_on_invalid_cors: bool,
160 },
161 ReadingBody {
162 body: hyper::Body,
163 uri: Option<hyper::Uri>,
164 request: Vec<u8>,
165 metadata: M,
166 },
167 ProcessRest {
168 uri: hyper::Uri,
169 metadata: M,
170 },
171 ProcessHealth {
172 method: String,
173 metadata: M,
174 },
175 Writing(Response),
176 WaitingForResponse(FutureResponse<F>),
177 Waiting(FutureResult<F>),
178 Done,
179}
180
181impl<M, F> fmt::Debug for RpcHandlerState<M, F> where
182 F: Future<Item = Option<core::Response>, Error = ()>,
183{
184 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
185 use self::RpcHandlerState::*;
186
187 match *self {
188 ReadingHeaders {..} => write!(fmt, "ReadingHeaders"),
189 ReadingBody {..} => write!(fmt, "ReadingBody"),
190 ProcessRest {..} => write!(fmt, "ProcessRest"),
191 ProcessHealth {..} => write!(fmt, "ProcessHealth"),
192 Writing(ref res) => write!(fmt, "Writing({:?})", res),
193 WaitingForResponse(_) => write!(fmt, "WaitingForResponse"),
194 Waiting(_) => write!(fmt, "Waiting"),
195 Done => write!(fmt, "Done"),
196 }
197 }
198}
199
200pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
201 jsonrpc_handler: Rpc<M, S>,
202 state: RpcHandlerState<M, S::Future>,
203 is_options: bool,
204 cors_allow_origin: cors::AllowCors<header::HeaderValue>,
205 cors_allow_headers: cors::AllowCors<Vec<header::HeaderValue>>,
206 cors_max_age: Option<u32>,
207 rest_api: RestApi,
208 health_api: Option<(String, String)>,
209 max_request_body_size: usize,
210}
211
212impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {
213 type Item = hyper::Response<Body>;
214 type Error = hyper::Error;
215
216 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
217 let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
218 RpcHandlerState::ReadingHeaders { request, cors_domains, cors_headers, continue_on_invalid_cors, } => {
219 self.cors_allow_origin = utils::cors_allow_origin(&request, &cors_domains);
221 self.cors_allow_headers = utils::cors_allow_headers(&request, &cors_headers);
222 self.is_options = *request.method() == Method::OPTIONS;
223 RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
225 },
226 RpcHandlerState::ReadingBody { body, request, metadata, uri, } => {
227 match self.process_body(body, request, uri, metadata) {
228 Err(BodyError::Utf8(ref e)) => {
229 let mesg = format!("utf-8 encoding error at byte {} in request body", e.valid_up_to());
230 let resp = Response::bad_request(mesg);
231 RpcPollState::Ready(RpcHandlerState::Writing(resp))
232 }
233 Err(BodyError::TooLarge) => {
234 let resp = Response::too_large("request body size exceeds allowed maximum");
235 RpcPollState::Ready(RpcHandlerState::Writing(resp))
236 }
237 Err(BodyError::Hyper(e)) => return Err(e),
238 Ok(state) => state,
239 }
240 },
241 RpcHandlerState::ProcessRest { uri, metadata } => {
242 self.process_rest(uri, metadata)?
243 },
244 RpcHandlerState::ProcessHealth { method, metadata } => {
245 self.process_health(method, metadata)?
246 },
247 RpcHandlerState::WaitingForResponse(mut waiting) => {
248 match waiting.poll() {
249 Ok(Async::Ready(response)) => RpcPollState::Ready(RpcHandlerState::Writing(response.into())),
250 Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::WaitingForResponse(waiting)),
251 Err(e) => RpcPollState::Ready(RpcHandlerState::Writing(
252 Response::internal_error(format!("{:?}", e))
253 )),
254 }
255 },
256 RpcHandlerState::Waiting(mut waiting) => {
257 match waiting.poll() {
258 Ok(Async::Ready(response)) => {
259 RpcPollState::Ready(RpcHandlerState::Writing(match response {
260 None => Response::ok(String::new()),
262 Some(result) => Response::ok(format!("{}\n", result)),
264 }.into()))
265 },
266 Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)),
267 Err(e) => RpcPollState::Ready(RpcHandlerState::Writing(
268 Response::internal_error(format!("{:?}", e))
269 )),
270 }
271 },
272 state => RpcPollState::NotReady(state),
273 };
274
275 let (new_state, is_ready) = new_state.decompose();
276 match new_state {
277 RpcHandlerState::Writing(res) => {
278 let mut response: hyper::Response<Body> = res.into();
279 let cors_allow_origin = mem::replace(&mut self.cors_allow_origin, cors::AllowCors::Invalid);
280 let cors_allow_headers = mem::replace(&mut self.cors_allow_headers, cors::AllowCors::Invalid);
281
282 Self::set_response_headers(
283 response.headers_mut(),
284 self.is_options,
285 self.cors_max_age,
286 cors_allow_origin.into(),
287 cors_allow_headers.into(),
288 );
289 Ok(Async::Ready(response))
290 },
291 state => {
292 self.state = state;
293 if is_ready {
294 self.poll()
295 } else {
296 Ok(Async::NotReady)
297 }
298 },
299 }
300 }
301}
302
303enum BodyError {
306 Hyper(hyper::Error),
307 Utf8(str::Utf8Error),
308 TooLarge,
309}
310
311impl From<hyper::Error> for BodyError {
312 fn from(e: hyper::Error) -> BodyError {
313 BodyError::Hyper(e)
314 }
315}
316
317impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
318 fn read_headers(
319 &self,
320 request: hyper::Request<Body>,
321 continue_on_invalid_cors: bool,
322 ) -> RpcHandlerState<M, S::Future> {
323 if self.cors_allow_origin == cors::AllowCors::Invalid && !continue_on_invalid_cors {
324 return RpcHandlerState::Writing(Response::invalid_allow_origin());
325 }
326 if self.cors_allow_headers == cors::AllowCors::Invalid && !continue_on_invalid_cors {
327 return RpcHandlerState::Writing(Response::invalid_allow_headers());
328 }
329
330 let metadata = self.jsonrpc_handler.extractor.read_metadata(&request);
332
333 match *request.method() {
335 Method::POST if Self::is_json(request.headers().get("content-type")) => {
338 let uri = if self.rest_api != RestApi::Disabled { Some(request.uri().clone()) } else { None };
339 RpcHandlerState::ReadingBody {
340 metadata,
341 request: Default::default(),
342 uri,
343 body: request.into_body(),
344 }
345 },
346 Method::POST if self.rest_api == RestApi::Unsecure && request.uri().path().split('/').count() > 2 => {
347 RpcHandlerState::ProcessRest {
348 metadata,
349 uri: request.uri().clone(),
350 }
351 },
352 Method::POST => {
354 RpcHandlerState::Writing(Response::unsupported_content_type())
355 },
356 Method::OPTIONS => {
358 RpcHandlerState::Writing(Response::empty())
359 },
360 Method::GET if self.health_api.as_ref().map(|x| &*x.0) == Some(request.uri().path()) => {
362 RpcHandlerState::ProcessHealth {
363 metadata,
364 method: self.health_api.as_ref()
365 .map(|x| x.1.clone())
366 .expect("Health api is defined since the URI matched."),
367 }
368 },
369 _ => {
371 RpcHandlerState::Writing(Response::method_not_allowed())
372 },
373 }
374 }
375
376 fn process_health(
377 &self,
378 method: String,
379 metadata: M,
380 ) -> Result<RpcPollState<M, S::Future>, hyper::Error> {
381 use self::core::types::{Call, MethodCall, Version, Params, Request, Id, Output, Success, Failure};
382
383 let call = Request::Single(Call::MethodCall(MethodCall {
385 jsonrpc: Some(Version::V2),
386 method,
387 params: Params::None,
388 id: Id::Num(1),
389 }));
390
391 return Ok(RpcPollState::Ready(RpcHandlerState::WaitingForResponse(
392 future::Either::B(self.jsonrpc_handler.handler.handle_rpc_request(call, metadata))
393 .map(|res| match res {
394 Some(core::Response::Single(Output::Success(Success { result, .. }))) => {
395 let result = serde_json::to_string(&result)
396 .expect("Serialization of result is infallible;qed");
397
398 Response::ok(result)
399 },
400 Some(core::Response::Single(Output::Failure(Failure { error, .. }))) => {
401 let result = serde_json::to_string(&error)
402 .expect("Serialization of error is infallible;qed");
403
404 Response::service_unavailable(result)
405 },
406 e => Response::internal_error(format!("Invalid response for health request: {:?}", e)),
407 })
408 )));
409 }
410
411 fn process_rest(
412 &self,
413 uri: hyper::Uri,
414 metadata: M,
415 ) -> Result<RpcPollState<M, S::Future>, hyper::Error> {
416 use self::core::types::{Call, MethodCall, Version, Params, Request, Id, Value};
417
418 let mut it = uri.path().split('/').skip(1);
420
421 let method = it.next().unwrap_or("");
423 let mut params = Vec::new();
424 for param in it {
425 let v = serde_json::from_str(param)
426 .or_else(|_| serde_json::from_str(&format!("\"{}\"", param)))
427 .unwrap_or(Value::Null);
428 params.push(v)
429 }
430
431 let call = Request::Single(Call::MethodCall(MethodCall {
433 jsonrpc: Some(Version::V2),
434 method: method.into(),
435 params: Params::Array(params),
436 id: Id::Num(1),
437 }));
438
439 return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
440 future::Either::B(self.jsonrpc_handler.handler.handle_rpc_request(call, metadata))
441 .map(|res| res.map(|x| serde_json::to_string(&x)
442 .expect("Serialization of response is infallible;qed")
443 ))
444 )));
445 }
446
447 fn process_body(
448 &self,
449 mut body: hyper::Body,
450 mut request: Vec<u8>,
451 uri: Option<hyper::Uri>,
452 metadata: M,
453 ) -> Result<RpcPollState<M, S::Future>, BodyError> {
454 loop {
455 match body.poll()? {
456 Async::Ready(Some(chunk)) => {
457 if request.len().checked_add(chunk.len()).map(|n| n > self.max_request_body_size).unwrap_or(true) {
458 return Err(BodyError::TooLarge)
459 }
460 request.extend_from_slice(&*chunk)
461 },
462 Async::Ready(None) => {
463 if let (Some(uri), true) = (uri, request.is_empty()) {
464 return Ok(RpcPollState::Ready(RpcHandlerState::ProcessRest {
465 uri,
466 metadata,
467 }));
468 }
469
470 let content = match str::from_utf8(&request) {
471 Ok(content) => content,
472 Err(err) => {
473 return Err(BodyError::Utf8(err));
475 },
476 };
477
478 return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
480 self.jsonrpc_handler.handler.handle_request(content, metadata)
481 )));
482 },
483 Async::NotReady => {
484 return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody {
485 body,
486 request,
487 metadata,
488 uri,
489 }));
490 },
491 }
492 }
493 }
494
495 fn set_response_headers(
496 headers: &mut HeaderMap,
497 is_options: bool,
498 cors_max_age: Option<u32>,
499 cors_allow_origin: Option<HeaderValue>,
500 cors_allow_headers: Option<Vec<HeaderValue>>,
501 ) {
502 let as_header = |m: Method| m.as_str().parse().expect("`Method` will always parse; qed");
503 let concat = |headers: &[HeaderValue]| {
504 let separator = b", ";
505 let val = headers
506 .iter()
507 .flat_map(|h| h.as_bytes().iter().chain(separator.iter()))
508 .cloned()
509 .collect::<Vec<_>>();
510 let max_len = if val.is_empty() { 0 } else { val.len() - 2 };
511 HeaderValue::from_bytes(&val[..max_len]).expect("Concatenation of valid headers with `, ` is still valid; qed")
512 };
513
514 let allowed = concat(&[as_header(Method::OPTIONS), as_header(Method::POST)]);
515
516 if is_options {
517 headers.append(header::ALLOW, allowed.clone());
518 headers.append(header::ACCEPT, HeaderValue::from_static("application/json"));
519 }
520
521 if let Some(cors_allow_origin) = cors_allow_origin {
522 headers.append(header::VARY, HeaderValue::from_static("origin"));
523 headers.append(header::ACCESS_CONTROL_ALLOW_METHODS, allowed);
524 headers.append(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors_allow_origin);
525
526 if let Some(cma) = cors_max_age {
527 headers.append(
528 header::ACCESS_CONTROL_MAX_AGE,
529 HeaderValue::from_str(&cma.to_string()).expect("`u32` will always parse; qed")
530 );
531 }
532
533 if let Some(cors_allow_headers) = cors_allow_headers {
534 if !cors_allow_headers.is_empty() {
535 headers.append(header::ACCESS_CONTROL_ALLOW_HEADERS, concat(&cors_allow_headers));
536 }
537 }
538 }
539 }
540
541 fn is_json(content_type: Option<&header::HeaderValue>) -> bool {
544 match content_type.and_then(|val| val.to_str().ok()) {
545 Some("application/json") => true,
546 Some("application/json; charset=utf-8") => true,
547 _ => false,
548 }
549 }
550}