1use crate::WeakRpc;
2
3use std::sync::Arc;
4use std::{fmt, mem, str};
5
6use hyper::header::{self, HeaderMap, HeaderValue};
7use hyper::{self, service::Service, Body, Method};
8
9use crate::jsonrpc::serde_json;
10use crate::jsonrpc::{self as core, middleware, Metadata, Middleware};
11use crate::response::Response;
12use crate::server_utils::cors;
13use futures01::{Async, Future, Poll, Stream};
14
15use crate::{utils, AllowedHosts, CorsDomains, RequestMiddleware, RequestMiddlewareAction, RestApi};
16
17pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = middleware::Noop> {
19 jsonrpc_handler: WeakRpc<M, S>,
20 allowed_hosts: AllowedHosts,
21 cors_domains: CorsDomains,
22 cors_max_age: Option<u32>,
23 cors_allowed_headers: cors::AccessControlAllowHeaders,
24 middleware: Arc<dyn RequestMiddleware>,
25 rest_api: RestApi,
26 health_api: Option<(String, String)>,
27 max_request_body_size: usize,
28 keep_alive: bool,
29}
30
31impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
32 pub fn new(
34 jsonrpc_handler: WeakRpc<M, S>,
35 cors_domains: CorsDomains,
36 cors_max_age: Option<u32>,
37 cors_allowed_headers: cors::AccessControlAllowHeaders,
38 allowed_hosts: AllowedHosts,
39 middleware: Arc<dyn RequestMiddleware>,
40 rest_api: RestApi,
41 health_api: Option<(String, String)>,
42 max_request_body_size: usize,
43 keep_alive: bool,
44 ) -> Self {
45 ServerHandler {
46 jsonrpc_handler,
47 allowed_hosts,
48 cors_domains,
49 cors_max_age,
50 cors_allowed_headers,
51 middleware,
52 rest_api,
53 health_api,
54 max_request_body_size,
55 keep_alive,
56 }
57 }
58}
59
60impl<M: Metadata, S: Middleware<M>> Service for ServerHandler<M, S>
61where
62 S::Future: Unpin,
63 S::CallFuture: Unpin,
64{
65 type ReqBody = Body;
66 type ResBody = Body;
67 type Error = hyper::Error;
68 type Future = Handler<M, S>;
69
70 fn call(&mut self, request: hyper::Request<Self::ReqBody>) -> Self::Future {
71 let is_host_allowed = utils::is_host_allowed(&request, &self.allowed_hosts);
72 let action = self.middleware.on_request(request);
73
74 let (should_validate_hosts, should_continue_on_invalid_cors, response) = match action {
75 RequestMiddlewareAction::Proceed {
76 should_continue_on_invalid_cors,
77 request,
78 } => (true, should_continue_on_invalid_cors, Err(request)),
79 RequestMiddlewareAction::Respond {
80 should_validate_hosts,
81 response,
82 } => (should_validate_hosts, false, Ok(response)),
83 };
84
85 if should_validate_hosts && !is_host_allowed {
87 return Handler::Err(Some(Response::host_not_allowed()));
88 }
89
90 match response {
92 Ok(response) => Handler::Middleware(response),
93 Err(request) => {
94 Handler::Rpc(RpcHandler {
95 jsonrpc_handler: self.jsonrpc_handler.clone(),
96 state: RpcHandlerState::ReadingHeaders {
97 request,
98 cors_domains: self.cors_domains.clone(),
99 cors_headers: self.cors_allowed_headers.clone(),
100 continue_on_invalid_cors: should_continue_on_invalid_cors,
101 keep_alive: self.keep_alive,
102 },
103 is_options: false,
104 cors_max_age: self.cors_max_age,
105 cors_allow_origin: cors::AllowCors::NotRequired,
106 cors_allow_headers: cors::AllowCors::NotRequired,
107 rest_api: self.rest_api,
108 health_api: self.health_api.clone(),
109 max_request_body_size: self.max_request_body_size,
110 keep_alive: true,
112 })
113 }
114 }
115 }
116}
117
118pub enum Handler<M: Metadata, S: Middleware<M>> {
119 Rpc(RpcHandler<M, S>),
120 Err(Option<Response>),
121 Middleware(Box<dyn Future<Item = hyper::Response<Body>, Error = hyper::Error> + Send>),
122}
123
124impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S>
125where
126 S::Future: Unpin,
127 S::CallFuture: Unpin,
128{
129 type Item = hyper::Response<Body>;
130 type Error = hyper::Error;
131
132 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
133 match *self {
134 Handler::Rpc(ref mut handler) => handler.poll(),
135 Handler::Middleware(ref mut middleware) => middleware.poll(),
136 Handler::Err(ref mut response) => Ok(Async::Ready(
137 response
138 .take()
139 .expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed")
140 .into(),
141 )),
142 }
143 }
144}
145
146enum RpcPollState<M> {
147 Ready(RpcHandlerState<M>),
148 NotReady(RpcHandlerState<M>),
149}
150
151impl<M> RpcPollState<M> {
152 fn decompose(self) -> (RpcHandlerState<M>, bool) {
153 use self::RpcPollState::*;
154 match self {
155 Ready(handler) => (handler, true),
156 NotReady(handler) => (handler, false),
157 }
158 }
159}
160
161enum RpcHandlerState<M> {
162 ReadingHeaders {
163 request: hyper::Request<Body>,
164 cors_domains: CorsDomains,
165 cors_headers: cors::AccessControlAllowHeaders,
166 continue_on_invalid_cors: bool,
167 keep_alive: bool,
168 },
169 ReadingBody {
170 body: hyper::Body,
171 uri: Option<hyper::Uri>,
172 request: Vec<u8>,
173 metadata: M,
174 },
175 ProcessRest {
176 uri: hyper::Uri,
177 metadata: M,
178 },
179 ProcessHealth {
180 method: String,
181 metadata: M,
182 },
183 Writing(Response),
184 Waiting(Box<dyn Future<Item = Option<String>, Error = ()> + Send>),
185 WaitingForResponse(Box<dyn Future<Item = Response, Error = ()> + Send>),
186 Done,
187}
188
189impl<M> fmt::Debug for RpcHandlerState<M> {
190 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
191 use self::RpcHandlerState::*;
192
193 match *self {
194 ReadingHeaders { .. } => write!(fmt, "ReadingHeaders"),
195 ReadingBody { .. } => write!(fmt, "ReadingBody"),
196 ProcessRest { .. } => write!(fmt, "ProcessRest"),
197 ProcessHealth { .. } => write!(fmt, "ProcessHealth"),
198 Writing(ref res) => write!(fmt, "Writing({:?})", res),
199 WaitingForResponse(_) => write!(fmt, "WaitingForResponse"),
200 Waiting(_) => write!(fmt, "Waiting"),
201 Done => write!(fmt, "Done"),
202 }
203 }
204}
205
206pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
207 jsonrpc_handler: WeakRpc<M, S>,
208 state: RpcHandlerState<M>,
209 is_options: bool,
210 cors_allow_origin: cors::AllowCors<header::HeaderValue>,
211 cors_allow_headers: cors::AllowCors<Vec<header::HeaderValue>>,
212 cors_max_age: Option<u32>,
213 rest_api: RestApi,
214 health_api: Option<(String, String)>,
215 max_request_body_size: usize,
216 keep_alive: bool,
217}
218
219impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S>
220where
221 S::Future: Unpin,
222 S::CallFuture: Unpin,
223{
224 type Item = hyper::Response<Body>;
225 type Error = hyper::Error;
226
227 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
228 let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
229 RpcHandlerState::ReadingHeaders {
230 request,
231 cors_domains,
232 cors_headers,
233 continue_on_invalid_cors,
234 keep_alive,
235 } => {
236 self.cors_allow_origin = utils::cors_allow_origin(&request, &cors_domains);
238 self.cors_allow_headers = utils::cors_allow_headers(&request, &cors_headers);
239 self.keep_alive = utils::keep_alive(&request, keep_alive);
240 self.is_options = *request.method() == Method::OPTIONS;
241 RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
243 }
244 RpcHandlerState::ReadingBody {
245 body,
246 request,
247 metadata,
248 uri,
249 } => match self.process_body(body, request, uri, metadata) {
250 Err(BodyError::Utf8(ref e)) => {
251 let mesg = format!("utf-8 encoding error at byte {} in request body", e.valid_up_to());
252 let resp = Response::bad_request(mesg);
253 RpcPollState::Ready(RpcHandlerState::Writing(resp))
254 }
255 Err(BodyError::TooLarge) => {
256 let resp = Response::too_large("request body size exceeds allowed maximum");
257 RpcPollState::Ready(RpcHandlerState::Writing(resp))
258 }
259 Err(BodyError::Hyper(e)) => return Err(e),
260 Ok(state) => state,
261 },
262 RpcHandlerState::ProcessRest { uri, metadata } => self.process_rest(uri, metadata)?,
263 RpcHandlerState::ProcessHealth { method, metadata } => self.process_health(method, metadata)?,
264 RpcHandlerState::WaitingForResponse(mut waiting) => match waiting.poll() {
265 Ok(Async::Ready(response)) => RpcPollState::Ready(RpcHandlerState::Writing(response)),
266 Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::WaitingForResponse(waiting)),
267 Err(e) => RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error(format!("{:?}", e)))),
268 },
269 RpcHandlerState::Waiting(mut waiting) => {
270 match waiting.poll() {
271 Ok(Async::Ready(response)) => {
272 RpcPollState::Ready(RpcHandlerState::Writing(match response {
273 None => Response::ok(String::new()),
275 Some(result) => Response::ok(format!("{}\n", result)),
277 }))
278 }
279 Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)),
280 Err(e) => {
281 RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error(format!("{:?}", e))))
282 }
283 }
284 }
285 state => RpcPollState::NotReady(state),
286 };
287
288 let (new_state, is_ready) = new_state.decompose();
289 match new_state {
290 RpcHandlerState::Writing(res) => {
291 let mut response: hyper::Response<Body> = res.into();
292 let cors_allow_origin = mem::replace(&mut self.cors_allow_origin, cors::AllowCors::Invalid);
293 let cors_allow_headers = mem::replace(&mut self.cors_allow_headers, cors::AllowCors::Invalid);
294
295 Self::set_response_headers(
296 response.headers_mut(),
297 self.is_options,
298 self.cors_max_age,
299 cors_allow_origin.into(),
300 cors_allow_headers.into(),
301 self.keep_alive,
302 );
303 Ok(Async::Ready(response))
304 }
305 state => {
306 self.state = state;
307 if is_ready {
308 self.poll()
309 } else {
310 Ok(Async::NotReady)
311 }
312 }
313 }
314 }
315}
316
317enum BodyError {
320 Hyper(hyper::Error),
321 Utf8(str::Utf8Error),
322 TooLarge,
323}
324
325impl From<hyper::Error> for BodyError {
326 fn from(e: hyper::Error) -> BodyError {
327 BodyError::Hyper(e)
328 }
329}
330
331impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S>
332where
333 S::Future: Unpin,
334 S::CallFuture: Unpin,
335{
336 fn read_headers(&self, request: hyper::Request<Body>, continue_on_invalid_cors: bool) -> RpcHandlerState<M> {
337 if self.cors_allow_origin == cors::AllowCors::Invalid && !continue_on_invalid_cors {
338 return RpcHandlerState::Writing(Response::invalid_allow_origin());
339 }
340
341 if self.cors_allow_headers == cors::AllowCors::Invalid && !continue_on_invalid_cors {
342 return RpcHandlerState::Writing(Response::invalid_allow_headers());
343 }
344
345 let handler = match self.jsonrpc_handler.upgrade() {
347 Some(handler) => handler,
348 None => return RpcHandlerState::Writing(Response::closing()),
349 };
350 let metadata = handler.extractor.read_metadata(&request);
351
352 match *request.method() {
354 Method::POST if Self::is_json(request.headers().get("content-type")) => {
357 let uri = if self.rest_api != RestApi::Disabled {
358 Some(request.uri().clone())
359 } else {
360 None
361 };
362 RpcHandlerState::ReadingBody {
363 metadata,
364 request: Default::default(),
365 uri,
366 body: request.into_body(),
367 }
368 }
369 Method::POST if self.rest_api == RestApi::Unsecure && request.uri().path().split('/').count() > 2 => {
370 RpcHandlerState::ProcessRest {
371 metadata,
372 uri: request.uri().clone(),
373 }
374 }
375 Method::POST => RpcHandlerState::Writing(Response::unsupported_content_type()),
377 Method::OPTIONS => RpcHandlerState::Writing(Response::empty()),
379 Method::GET if self.health_api.as_ref().map(|x| &*x.0) == Some(request.uri().path()) => {
381 RpcHandlerState::ProcessHealth {
382 metadata,
383 method: self
384 .health_api
385 .as_ref()
386 .map(|x| x.1.clone())
387 .expect("Health api is defined since the URI matched."),
388 }
389 }
390 _ => RpcHandlerState::Writing(Response::method_not_allowed()),
392 }
393 }
394
395 fn process_health(&self, method: String, metadata: M) -> Result<RpcPollState<M>, hyper::Error> {
396 use self::core::types::{Call, Failure, Id, MethodCall, Output, Params, Request, Success, Version};
397 use futures03::{FutureExt, TryFutureExt};
398
399 let call = Request::Single(Call::MethodCall(MethodCall {
401 jsonrpc: Some(Version::V2),
402 method,
403 params: Params::None,
404 id: Id::Num(1),
405 }));
406
407 let response = match self.jsonrpc_handler.upgrade() {
408 Some(h) => h.handler.handle_rpc_request(call, metadata),
409 None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))),
410 };
411 let response = response.map(Ok).compat();
412
413 Ok(RpcPollState::Ready(RpcHandlerState::WaitingForResponse(Box::new(
414 response.map(|res| match res {
415 Some(core::Response::Single(Output::Success(Success { result, .. }))) => {
416 let result = serde_json::to_string(&result).expect("Serialization of result is infallible;qed");
417
418 Response::ok(result)
419 }
420 Some(core::Response::Single(Output::Failure(Failure { error, .. }))) => {
421 let result = serde_json::to_string(&error).expect("Serialization of error is infallible;qed");
422
423 Response::service_unavailable(result)
424 }
425 e => Response::internal_error(format!("Invalid response for health request: {:?}", e)),
426 }),
427 ))))
428 }
429
430 fn process_rest(&self, uri: hyper::Uri, metadata: M) -> Result<RpcPollState<M>, hyper::Error> {
431 use self::core::types::{Call, Id, MethodCall, Params, Request, Value, Version};
432 use futures03::{FutureExt, TryFutureExt};
433
434 let mut it = uri.path().split('/').skip(1);
436
437 let method = it.next().unwrap_or("");
439 let mut params = Vec::new();
440 for param in it {
441 let v = serde_json::from_str(param)
442 .or_else(|_| serde_json::from_str(&format!("\"{}\"", param)))
443 .unwrap_or(Value::Null);
444 params.push(v)
445 }
446
447 let call = Request::Single(Call::MethodCall(MethodCall {
449 jsonrpc: Some(Version::V2),
450 method: method.into(),
451 params: Params::Array(params),
452 id: Id::Num(1),
453 }));
454
455 let response = match self.jsonrpc_handler.upgrade() {
456 Some(h) => h.handler.handle_rpc_request(call, metadata),
457 None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))),
458 };
459 let response = response.map(Ok).compat();
460
461 Ok(RpcPollState::Ready(RpcHandlerState::Waiting(Box::new(response.map(
462 |res| res.map(|x| serde_json::to_string(&x).expect("Serialization of response is infallible;qed")),
463 )))))
464 }
465
466 fn process_body(
467 &self,
468 mut body: hyper::Body,
469 mut request: Vec<u8>,
470 uri: Option<hyper::Uri>,
471 metadata: M,
472 ) -> Result<RpcPollState<M>, BodyError> {
473 loop {
474 match body.poll()? {
475 Async::Ready(Some(chunk)) => {
476 if request
477 .len()
478 .checked_add(chunk.len())
479 .map(|n| n > self.max_request_body_size)
480 .unwrap_or(true)
481 {
482 return Err(BodyError::TooLarge);
483 }
484 request.extend_from_slice(&*chunk)
485 }
486 Async::Ready(None) => {
487 use futures03::{FutureExt, TryFutureExt};
488 if let (Some(uri), true) = (uri, request.is_empty()) {
489 return Ok(RpcPollState::Ready(RpcHandlerState::ProcessRest { uri, metadata }));
490 }
491
492 let content = match str::from_utf8(&request) {
493 Ok(content) => content,
494 Err(err) => {
495 return Err(BodyError::Utf8(err));
497 }
498 };
499
500 let response = match self.jsonrpc_handler.upgrade() {
501 Some(h) => h.handler.handle_request(content, metadata),
502 None => return Ok(RpcPollState::Ready(RpcHandlerState::Writing(Response::closing()))),
503 };
504
505 let response = response.map(Ok).compat();
507 return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(Box::new(response))));
508 }
509 Async::NotReady => {
510 return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody {
511 body,
512 request,
513 metadata,
514 uri,
515 }));
516 }
517 }
518 }
519 }
520
521 fn set_response_headers(
522 headers: &mut HeaderMap,
523 is_options: bool,
524 cors_max_age: Option<u32>,
525 cors_allow_origin: Option<HeaderValue>,
526 cors_allow_headers: Option<Vec<HeaderValue>>,
527 keep_alive: bool,
528 ) {
529 let as_header = |m: Method| m.as_str().parse().expect("`Method` will always parse; qed");
530 let concat = |headers: &[HeaderValue]| {
531 let separator = b", ";
532 let val = headers
533 .iter()
534 .flat_map(|h| h.as_bytes().iter().chain(separator.iter()))
535 .cloned()
536 .collect::<Vec<_>>();
537 let max_len = if val.is_empty() { 0 } else { val.len() - 2 };
538 HeaderValue::from_bytes(&val[..max_len])
539 .expect("Concatenation of valid headers with `, ` is still valid; qed")
540 };
541
542 let allowed = concat(&[as_header(Method::OPTIONS), as_header(Method::POST)]);
543
544 if is_options {
545 headers.append(header::ALLOW, allowed.clone());
546 headers.append(header::ACCEPT, HeaderValue::from_static("application/json"));
547 headers.append(header::ACCEPT, HeaderValue::from_static("text/json"));
548 }
549
550 if let Some(cors_allow_origin) = cors_allow_origin {
551 headers.append(header::VARY, HeaderValue::from_static("origin"));
552 headers.append(header::ACCESS_CONTROL_ALLOW_METHODS, allowed);
553 headers.append(header::ACCESS_CONTROL_ALLOW_ORIGIN, cors_allow_origin);
554
555 if let Some(cma) = cors_max_age {
556 headers.append(
557 header::ACCESS_CONTROL_MAX_AGE,
558 HeaderValue::from_str(&cma.to_string()).expect("`u32` will always parse; qed"),
559 );
560 }
561
562 if let Some(cors_allow_headers) = cors_allow_headers {
563 if !cors_allow_headers.is_empty() {
564 headers.append(header::ACCESS_CONTROL_ALLOW_HEADERS, concat(&cors_allow_headers));
565 }
566 }
567 }
568
569 if !keep_alive {
570 headers.append(header::CONNECTION, HeaderValue::from_static("close"));
571 }
572 }
573
574 fn is_json(content_type: Option<&header::HeaderValue>) -> bool {
577 match content_type.and_then(|val| val.to_str().ok()) {
578 Some(ref content)
579 if content.eq_ignore_ascii_case("application/json")
580 || content.eq_ignore_ascii_case("application/json; charset=utf-8")
581 || content.eq_ignore_ascii_case("application/json;charset=utf-8")
582 || content.eq_ignore_ascii_case("text/json")
583 || content.eq_ignore_ascii_case("text/json; charset=utf-8")
584 || content.eq_ignore_ascii_case("text/json;charset=utf-8") =>
585 {
586 true
587 }
588 _ => false,
589 }
590 }
591}
592
593#[cfg(test)]
594mod test {
595 use super::{hyper, RpcHandler};
596 use jsonrpc_core::middleware::Noop;
597
598 #[test]
599 fn test_case_insensitive_content_type() {
600 let request = hyper::Request::builder()
601 .header("content-type", "Application/Json; charset=UTF-8")
602 .body(())
603 .unwrap();
604
605 let request2 = hyper::Request::builder()
606 .header("content-type", "Application/Json;charset=UTF-8")
607 .body(())
608 .unwrap();
609
610 assert_eq!(
611 request.headers().get("content-type").unwrap(),
612 &"Application/Json; charset=UTF-8"
613 );
614
615 assert_eq!(
616 RpcHandler::<(), Noop>::is_json(request.headers().get("content-type")),
617 true
618 );
619 assert_eq!(
620 RpcHandler::<(), Noop>::is_json(request2.headers().get("content-type")),
621 true
622 );
623 }
624}