1use crate::Rpc;
2
3use std::sync::Arc;
4use std::{fmt, mem, str};
5
6use hyper::header::{self, HeaderMap, HeaderValue};
7use hyper::{self, service::Service, Body, Method};
8
9use crate::jsonrpc::futures::{future, Async, Future, Poll, Stream};
10use crate::jsonrpc::serde_json;
11use crate::jsonrpc::{self as core, middleware, FutureResult, Metadata, Middleware};
12use crate::response::Response;
13use crate::server_utils::cors;
14
15use crate::{utils, AllowedHosts, CorsDomains, RequestMiddleware, RequestMiddlewareAction, RestApi};
16
/// hyper `Service` state that turns each incoming HTTP request into a `Handler` future.
///
/// It bundles the JSON-RPC handler with the host, CORS, middleware, REST and health
/// settings; `call` copies or clones the relevant parts into every `RpcHandler`.
pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
    /// JSON-RPC handler, cloned into each `RpcHandler` created by `call`.
    susydev_jsonrpc_handler: Rpc<M, S>,
    /// Hosts configuration passed to `utils::is_host_allowed` for every request.
    allowed_hosts: AllowedHosts,
    /// CORS domains configuration, cloned into the initial handler state.
    cors_domains: CorsDomains,
    /// Optional value for the `Access-Control-Max-Age` response header.
    cors_max_age: Option<u32>,
    /// Allowed-headers configuration, cloned into the initial handler state.
    cors_allowed_headers: cors::AccessControlAllowHeaders,
    /// Request middleware consulted first for every request.
    middleware: Arc<RequestMiddleware>,
    /// REST API mode handed to each `RpcHandler`.
    rest_api: RestApi,
    /// Optional health endpoint as `(path, method)`: a `GET` on the path calls the method.
    health_api: Option<(String, String)>,
    /// Upper bound, in bytes, on the accumulated request body.
    max_request_body_size: usize,
    /// Server-side keep-alive setting, combined with the request by `utils::keep_alive`.
    keep_alive: bool,
}
30
31impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
32 pub fn new(
34 susydev_jsonrpc_handler: Rpc<M, S>,
35 cors_domains: CorsDomains,
36 cors_max_age: Option<u32>,
37 cors_allowed_headers: cors::AccessControlAllowHeaders,
38 allowed_hosts: AllowedHosts,
39 middleware: Arc<RequestMiddleware>,
40 rest_api: RestApi,
41 health_api: Option<(String, String)>,
42 max_request_body_size: usize,
43 keep_alive: bool,
44 ) -> Self {
45 ServerHandler {
46 susydev_jsonrpc_handler,
47 allowed_hosts,
48 cors_domains,
49 cors_max_age,
50 cors_allowed_headers,
51 middleware,
52 rest_api,
53 health_api,
54 max_request_body_size,
55 keep_alive,
56 }
57 }
58}
59
60impl<M: Metadata, S: Middleware<M>> Service for ServerHandler<M, S> {
61 type ReqBody = Body;
62 type ResBody = Body;
63 type Error = hyper::Error;
64 type Future = Handler<M, S>;
65
66 fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
67 let is_host_allowed = utils::is_host_allowed(&request, &self.allowed_hosts);
68 let action = self.middleware.on_request(request);
69
70 let (should_validate_hosts, should_continue_on_invalid_cors, response) = match action {
71 RequestMiddlewareAction::Proceed {
72 should_continue_on_invalid_cors,
73 request,
74 } => (true, should_continue_on_invalid_cors, Err(request)),
75 RequestMiddlewareAction::Respond {
76 should_validate_hosts,
77 response,
78 } => (should_validate_hosts, false, Ok(response)),
79 };
80
81 if should_validate_hosts && !is_host_allowed {
83 return Handler::Err(Some(Response::host_not_allowed()));
84 }
85
86 match response {
88 Ok(response) => Handler::Middleware(response),
89 Err(request) => {
90 Handler::Rpc(RpcHandler {
91 susydev_jsonrpc_handler: self.susydev_jsonrpc_handler.clone(),
92 state: RpcHandlerState::ReadingHeaders {
93 request,
94 cors_domains: self.cors_domains.clone(),
95 cors_headers: self.cors_allowed_headers.clone(),
96 continue_on_invalid_cors: should_continue_on_invalid_cors,
97 keep_alive: self.keep_alive,
98 },
99 is_options: false,
100 cors_max_age: self.cors_max_age,
101 cors_allow_origin: cors::AllowCors::NotRequired,
102 cors_allow_headers: cors::AllowCors::NotRequired,
103 rest_api: self.rest_api,
104 health_api: self.health_api.clone(),
105 max_request_body_size: self.max_request_body_size,
106 keep_alive: true,
108 })
109 }
110 }
111 }
112}
113
114pub enum Handler<M: Metadata, S: Middleware<M>> {
115 Rpc(RpcHandler<M, S>),
116 Err(Option<Response>),
117 Middleware(Box<Future<Item = hyper::Response<Body>, Error = hyper::Error> + Send>),
118}
119
120impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S> {
121 type Item = hyper::Response<Body>;
122 type Error = hyper::Error;
123
124 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
125 match *self {
126 Handler::Rpc(ref mut handler) => handler.poll(),
127 Handler::Middleware(ref mut middleware) => middleware.poll(),
128 Handler::Err(ref mut response) => Ok(Async::Ready(
129 response
130 .take()
131 .expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed")
132 .into(),
133 )),
134 }
135 }
136}
137
/// Result of one step of the `RpcHandler` state machine: the next state plus a flag
/// telling `RpcHandler::poll` whether it can keep going right away.
enum RpcPollState<M, F, G>
where
    F: Future<Item = Option<core::Response>, Error = ()>,
    G: Future<Item = Option<core::Output>, Error = ()>,
{
    /// The state advanced; `poll` processes the new state immediately.
    Ready(RpcHandlerState<M, F, G>),
    /// No progress is possible now; `poll` stores the state and returns `NotReady`.
    NotReady(RpcHandlerState<M, F, G>),
}
146
147impl<M, F, G> RpcPollState<M, F, G>
148where
149 F: Future<Item = Option<core::Response>, Error = ()>,
150 G: Future<Item = Option<core::Output>, Error = ()>,
151{
152 fn decompose(self) -> (RpcHandlerState<M, F, G>, bool) {
153 use self::RpcPollState::*;
154 match self {
155 Ready(handler) => (handler, true),
156 NotReady(handler) => (handler, false),
157 }
158 }
159}
160
/// Future that maps an optional JSON-RPC response into an HTTP `Response`.
///
/// The mapping is a plain `fn` pointer, so the type can be named inside
/// `RpcHandlerState::WaitingForResponse`; `process_health` builds values of this type.
type FutureResponse<F, G> = future::Map<
    future::Either<future::FutureResult<Option<core::Response>, ()>, core::FutureRpcResult<F, G>>,
    fn(Option<core::Response>) -> Response,
>;
165
/// States of the per-request state machine driven by `RpcHandler::poll`.
enum RpcHandlerState<M, F, G>
where
    F: Future<Item = Option<core::Response>, Error = ()>,
    G: Future<Item = Option<core::Output>, Error = ()>,
{
    /// Initial state: the untouched request plus the settings needed to evaluate
    /// CORS and keep-alive from its headers.
    ReadingHeaders {
        request: hyper::Request<Body>,
        cors_domains: CorsDomains,
        cors_headers: cors::AccessControlAllowHeaders,
        continue_on_invalid_cors: bool,
        keep_alive: bool,
    },
    /// The body is being accumulated into `request`.
    /// `uri` is `Some` only when the REST API is not disabled.
    ReadingBody {
        body: hyper::Body,
        uri: Option<hyper::Uri>,
        request: Vec<u8>,
        metadata: M,
    },
    /// A call is to be built from the path segments of `uri`.
    ProcessRest {
        uri: hyper::Uri,
        metadata: M,
    },
    /// The configured health `method` is to be called without parameters.
    ProcessHealth {
        method: String,
        metadata: M,
    },
    /// A response is ready; `poll` adds the response headers and returns it.
    Writing(Response),
    /// Waiting for an RPC result that is written out as the response body.
    Waiting(FutureResult<F, G>),
    /// Waiting for a future that yields a complete `Response`.
    WaitingForResponse(FutureResponse<F, G>),
    /// Placeholder left in `RpcHandler::state` while a step runs and after the response
    /// has been produced.
    Done,
}
197
198impl<M, F, G> fmt::Debug for RpcHandlerState<M, F, G>
199where
200 F: Future<Item = Option<core::Response>, Error = ()>,
201 G: Future<Item = Option<core::Output>, Error = ()>,
202{
203 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
204 use self::RpcHandlerState::*;
205
206 match *self {
207 ReadingHeaders { .. } => write!(fmt, "ReadingHeaders"),
208 ReadingBody { .. } => write!(fmt, "ReadingBody"),
209 ProcessRest { .. } => write!(fmt, "ProcessRest"),
210 ProcessHealth { .. } => write!(fmt, "ProcessHealth"),
211 Writing(ref res) => write!(fmt, "Writing({:?})", res),
212 WaitingForResponse(_) => write!(fmt, "WaitingForResponse"),
213 Waiting(_) => write!(fmt, "Waiting"),
214 Done => write!(fmt, "Done"),
215 }
216 }
217}
218
/// Future that processes a single HTTP request as a JSON-RPC, REST or health call.
pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
    /// Provides the metadata extractor and the handler that executes requests.
    susydev_jsonrpc_handler: Rpc<M, S>,
    /// Current state of the request state machine.
    state: RpcHandlerState<M, S::Future, S::CallFuture>,
    /// Set while reading headers when the request method is `OPTIONS`.
    is_options: bool,
    /// CORS origin decision computed from the request; reset to `Invalid` once the response is written.
    cors_allow_origin: cors::AllowCors<header::HeaderValue>,
    /// CORS headers decision computed from the request; reset to `Invalid` once the response is written.
    cors_allow_headers: cors::AllowCors<Vec<header::HeaderValue>>,
    /// Optional value for the `Access-Control-Max-Age` response header.
    cors_max_age: Option<u32>,
    /// REST API mode; decides whether the URI is kept and whether path-based calls are accepted.
    rest_api: RestApi,
    /// Optional health endpoint as `(path, method)`.
    health_api: Option<(String, String)>,
    /// Upper bound, in bytes, on the accumulated request body.
    max_request_body_size: usize,
    /// When `false`, a `Connection: close` header is added to the response.
    keep_alive: bool,
}
231
232impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {
233 type Item = hyper::Response<Body>;
234 type Error = hyper::Error;
235
236 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
237 let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
238 RpcHandlerState::ReadingHeaders {
239 request,
240 cors_domains,
241 cors_headers,
242 continue_on_invalid_cors,
243 keep_alive,
244 } => {
245 self.cors_allow_origin = utils::cors_allow_origin(&request, &cors_domains);
247 self.cors_allow_headers = utils::cors_allow_headers(&request, &cors_headers);
248 self.keep_alive = utils::keep_alive(&request, keep_alive);
249 self.is_options = *request.method() == Method::OPTIONS;
250 RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
252 }
253 RpcHandlerState::ReadingBody {
254 body,
255 request,
256 metadata,
257 uri,
258 } => match self.process_body(body, request, uri, metadata) {
259 Err(BodyError::Utf8(ref e)) => {
260 let mesg = format!("utf-8 encoding error at byte {} in request body", e.valid_up_to());
261 let resp = Response::bad_request(mesg);
262 RpcPollState::Ready(RpcHandlerState::Writing(resp))
263 }
264 Err(BodyError::TooLarge) => {
265 let resp = Response::too_large("request body size exceeds allowed maximum");
266 RpcPollState::Ready(RpcHandlerState::Writing(resp))
267 }
268 Err(BodyError::Hyper(e)) => return Err(e),
269 Ok(state) => state,
270 },
271 RpcHandlerState::ProcessRest { uri, metadata } => self.process_rest(uri, metadata)?,
272 RpcHandlerState::ProcessHealth { method, metadata } => self.process_health(method, metadata)?,
273 RpcHandlerState::WaitingForResponse(mut waiting) => match waiting.poll() {
274 Ok(Async::Ready(response)) => RpcPollState::Ready(RpcHandlerState::Writing(response)),
275 Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::WaitingForResponse(waiting)),
276 Err(e) => RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error(format!("{:?}", e)))),
277 },
278 RpcHandlerState::Waiting(mut waiting) => {
279 match waiting.poll() {
280 Ok(Async::Ready(response)) => {
281 RpcPollState::Ready(RpcHandlerState::Writing(match response {
282 None => Response::ok(String::new()),
284 Some(result) => Response::ok(format!("{}\n", result)),
286 }))
287 }
288 Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)),
289 Err(e) => {
290 RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error(format!("{:?}", e))))
291 }
292 }
293 }
294 state => RpcPollState::NotReady(state),
295 };
296
297 let (new_state, is_ready) = new_state.decompose();
298 match new_state {
299 RpcHandlerState::Writing(res) => {
300 let mut response: hyper::Response<Body> = res.into();
301 let cors_allow_origin = mem::replace(&mut self.cors_allow_origin, cors::AllowCors::Invalid);
302 let cors_allow_headers = mem::replace(&mut self.cors_allow_headers, cors::AllowCors::Invalid);
303
304 Self::set_response_headers(
305 response.headers_mut(),
306 self.is_options,
307 self.cors_max_age,
308 cors_allow_origin.into(),
309 cors_allow_headers.into(),
310 self.keep_alive,
311 );
312 Ok(Async::Ready(response))
313 }
314 state => {
315 self.state = state;
316 if is_ready {
317 self.poll()
318 } else {
319 Ok(Async::NotReady)
320 }
321 }
322 }
323 }
324}
325
/// Reasons why reading the request body in `process_body` can fail.
enum BodyError {
    /// Polling the hyper body stream returned an error.
    Hyper(hyper::Error),
    /// The complete body is not valid UTF-8.
    Utf8(str::Utf8Error),
    /// The accumulated body would exceed `max_request_body_size`.
    TooLarge,
}
333
334impl From<hyper::Error> for BodyError {
335 fn from(e: hyper::Error) -> BodyError {
336 BodyError::Hyper(e)
337 }
338}
339
340impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
341 fn read_headers(
342 &self,
343 request: hyper::Request<Body>,
344 continue_on_invalid_cors: bool,
345 ) -> RpcHandlerState<M, S::Future, S::CallFuture> {
346 if self.cors_allow_origin == cors::AllowCors::Invalid && !continue_on_invalid_cors {
347 return RpcHandlerState::Writing(Response::invalid_allow_origin());
348 }
349
350 if self.cors_allow_headers == cors::AllowCors::Invalid && !continue_on_invalid_cors {
351 return RpcHandlerState::Writing(Response::invalid_allow_headers());
352 }
353
354 let metadata = self.susydev_jsonrpc_handler.extractor.read_metadata(&request);
356
357 match *request.method() {
359 Method::POST if Self::is_json(request.headers().get("content-type")) => {
362 let uri = if self.rest_api != RestApi::Disabled {
363 Some(request.uri().clone())
364 } else {
365 None
366 };
367 RpcHandlerState::ReadingBody {
368 metadata,
369 request: Default::default(),
370 uri,
371 body: request.into_body(),
372 }
373 }
374 Method::POST if self.rest_api == RestApi::Unsecure && request.uri().path().split('/').count() > 2 => {
375 RpcHandlerState::ProcessRest {
376 metadata,
377 uri: request.uri().clone(),
378 }
379 }
380 Method::POST => RpcHandlerState::Writing(Response::unsupported_content_type()),
382 Method::OPTIONS => RpcHandlerState::Writing(Response::empty()),
384 Method::GET if self.health_api.as_ref().map(|x| &*x.0) == Some(request.uri().path()) => {
386 RpcHandlerState::ProcessHealth {
387 metadata,
388 method: self
389 .health_api
390 .as_ref()
391 .map(|x| x.1.clone())
392 .expect("Health api is defined since the URI matched."),
393 }
394 }
395 _ => RpcHandlerState::Writing(Response::method_not_allowed()),
397 }
398 }
399
400 fn process_health(
401 &self,
402 method: String,
403 metadata: M,
404 ) -> Result<RpcPollState<M, S::Future, S::CallFuture>, hyper::Error> {
405 use self::core::types::{Call, Failure, Id, MethodCall, Output, Params, Request, Success, Version};
406
407 let call = Request::Single(Call::MethodCall(MethodCall {
409 jsonrpc: Some(Version::V2),
410 method,
411 params: Params::None,
412 id: Id::Num(1),
413 }));
414
415 Ok(RpcPollState::Ready(RpcHandlerState::WaitingForResponse(
416 future::Either::B(self.susydev_jsonrpc_handler.handler.handle_rpc_request(call, metadata)).map(|res| match res {
417 Some(core::Response::Single(Output::Success(Success { result, .. }))) => {
418 let result = serde_json::to_string(&result).expect("Serialization of result is infallible;qed");
419
420 Response::ok(result)
421 }
422 Some(core::Response::Single(Output::Failure(Failure { error, .. }))) => {
423 let result = serde_json::to_string(&error).expect("Serialization of error is infallible;qed");
424
425 Response::service_unavailable(result)
426 }
427 e => Response::internal_error(format!("Invalid response for health request: {:?}", e)),
428 }),
429 )))
430 }
431
432 fn process_rest(
433 &self,
434 uri: hyper::Uri,
435 metadata: M,
436 ) -> Result<RpcPollState<M, S::Future, S::CallFuture>, hyper::Error> {
437 use self::core::types::{Call, Id, MethodCall, Params, Request, Value, Version};
438
439 let mut it = uri.path().split('/').skip(1);
441
442 let method = it.next().unwrap_or("");
444 let mut params = Vec::new();
445 for param in it {
446 let v = serde_json::from_str(param)
447 .or_else(|_| serde_json::from_str(&format!("\"{}\"", param)))
448 .unwrap_or(Value::Null);
449 params.push(v)
450 }
451
452 let call = Request::Single(Call::MethodCall(MethodCall {
454 jsonrpc: Some(Version::V2),
455 method: method.into(),
456 params: Params::Array(params),
457 id: Id::Num(1),
458 }));
459
460 Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
461 future::Either::B(self.susydev_jsonrpc_handler.handler.handle_rpc_request(call, metadata)).map(|res| {
462 res.map(|x| serde_json::to_string(&x).expect("Serialization of response is infallible;qed"))
463 }),
464 )))
465 }
466
467 fn process_body(
468 &self,
469 mut body: hyper::Body,
470 mut request: Vec<u8>,
471 uri: Option<hyper::Uri>,
472 metadata: M,
473 ) -> Result<RpcPollState<M, S::Future, S::CallFuture>, BodyError> {
474 loop {
475 match body.poll()? {
476 Async::Ready(Some(chunk)) => {
477 if request
478 .len()
479 .checked_add(chunk.len())
480 .map(|n| n > self.max_request_body_size)
481 .unwrap_or(true)
482 {
483 return Err(BodyError::TooLarge);
484 }
485 request.extend_from_slice(&*chunk)
486 }
487 Async::Ready(None) => {
488 if let (Some(uri), true) = (uri, request.is_empty()) {
489 return Ok(RpcPollState::Ready(RpcHandlerState::ProcessRest { uri, metadata }));
490 }
491
492 let content = match str::from_utf8(&request) {
493 Ok(content) => content,
494 Err(err) => {
495 return Err(BodyError::Utf8(err));
497 }
498 };
499
500 return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
502 self.susydev_jsonrpc_handler.handler.handle_request(content, metadata),
503 )));
504 }
505 Async::NotReady => {
506 return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody {
507 body,
508 request,
509 metadata,
510 uri,
511 }));
512 }
513 }
514 }
515 }
516
517 fn set_response_headers(
518 headers: &mut HeaderMap,
519 is_options: bool,
520 cors_max_age: Option<u32>,
521 cors_allow_origin: Option<HeaderValue>,
522 cors_allow_headers: Option<Vec<HeaderValue>>,
523 keep_alive: bool,
524 ) {
525 let as_header = |m: Method| m.as_str().parse().expect("`Method` will always parse; qed");
526 let concat = |headers: &[HeaderValue]| {
527 let separator = b", ";
528 let val = headers
529 .iter()
530 .flat_map(|h| h.as_bytes().iter().chain(separator.iter()))
531 .cloned()
532 .collect::<Vec<_>>();
533 let max_len = if val.is_empty() { 0 } else { val.len() - 2 };
534 HeaderValue::from_bytes(&val[..max_len])
535 .expect("Concatenation of valid headers with `, ` is still valid; qed")
536 };
537
538 let allowed = concat(&[as_header(Method::OPTIONS), as_header(Method::POST)]);
539
540 if is_options {
541 headers.append(header::ALLOW, allowed.clone());
542 headers.append(header::ACCEPT, HeaderValue::from_static("application/json"));
543 }
544
545 if let Some(cors_allow_origin) = cors_allow_origin {
546 headers.append(header::VARY, HeaderValue::from_static("origin"));
547 headers.append(header::ACCESS_CONTROL_ALLOW_METHODS, allowed);
548 headers.append(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors_allow_origin);
549
550 if let Some(cma) = cors_max_age {
551 headers.append(
552 header::ACCESS_CONTROL_MAX_AGE,
553 HeaderValue::from_str(&cma.to_string()).expect("`u32` will always parse; qed"),
554 );
555 }
556
557 if let Some(cors_allow_headers) = cors_allow_headers {
558 if !cors_allow_headers.is_empty() {
559 headers.append(header::ACCESS_CONTROL_ALLOW_HEADERS, concat(&cors_allow_headers));
560 }
561 }
562 }
563
564 if !keep_alive {
565 headers.append(header::CONNECTION, HeaderValue::from_static("close"));
566 }
567 }
568
569 fn is_json(content_type: Option<&header::HeaderValue>) -> bool {
572 match content_type.and_then(|val| val.to_str().ok()) {
573 Some(ref content)
574 if content.eq_ignore_ascii_case("application/json")
575 || content.eq_ignore_ascii_case("application/json; charset=utf-8")
576 || content.eq_ignore_ascii_case("application/json;charset=utf-8") =>
577 {
578 true
579 }
580 _ => false,
581 }
582 }
583}
584
#[cfg(test)]
mod test {
    use super::{hyper, RpcHandler};
    use susydev_jsonrpc_core::middleware::Noop;

    /// `is_json` must accept mixed-case content types, with and without a space
    /// before the charset parameter.
    #[test]
    fn test_case_insensitive_content_type() {
        let with_space = hyper::Request::builder()
            .header("content-type", "Application/Json; charset=UTF-8")
            .body(())
            .unwrap();
        let without_space = hyper::Request::builder()
            .header("content-type", "Application/Json;charset=UTF-8")
            .body(())
            .unwrap();

        // The header value is stored verbatim, so the case-insensitivity comes from `is_json`.
        assert_eq!(
            with_space.headers().get("content-type").unwrap(),
            &"Application/Json; charset=UTF-8"
        );

        let spaced_header = with_space.headers().get("content-type");
        assert!(RpcHandler::<(), Noop>::is_json(spaced_header));

        let compact_header = without_space.headers().get("content-type");
        assert!(RpcHandler::<(), Noop>::is_json(compact_header));
    }
}