1use Rpc;
2
3use std::{fmt, mem};
4use std::sync::Arc;
5
6use hyper::{self, mime, server, Method};
7use hyper::header::{self, Headers};
8use unicase::Ascii;
9
10use jsonrpc::{self as core, FutureResult, Metadata, Middleware, NoopMiddleware};
11use jsonrpc::futures::{Future, Poll, Async, Stream, future};
12use jsonrpc::serde_json;
13use response::Response;
14use server_utils::cors;
15
16use {utils, RequestMiddleware, RequestMiddlewareAction, CorsDomains, AllowedHosts, RestApi};
17
18pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
20 rs_jsonrpc_handler: Rpc<M, S>,
21 allowed_hosts: AllowedHosts,
22 cors_domains: CorsDomains,
23 middleware: Arc<RequestMiddleware>,
24 rest_api: RestApi,
25}
26
27impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
28 pub fn new(
30 rs_jsonrpc_handler: Rpc<M, S>,
31 cors_domains: CorsDomains,
32 allowed_hosts: AllowedHosts,
33 middleware: Arc<RequestMiddleware>,
34 rest_api: RestApi,
35 ) -> Self {
36 ServerHandler {
37 rs_jsonrpc_handler,
38 allowed_hosts,
39 cors_domains,
40 middleware,
41 rest_api,
42 }
43 }
44}
45
46impl<M: Metadata, S: Middleware<M>> server::Service for ServerHandler<M, S> {
47 type Request = server::Request;
48 type Response = server::Response;
49 type Error = hyper::Error;
50 type Future = Handler<M, S>;
51
52 fn call(&self, request: Self::Request) -> Self::Future {
53 let is_host_allowed = utils::is_host_allowed(&request, &self.allowed_hosts);
54 let action = self.middleware.on_request(request);
55
56 let (should_validate_hosts, should_continue_on_invalid_cors, response) = match action {
57 RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors, request }=> (
58 true, should_continue_on_invalid_cors, Err(request)
59 ),
60 RequestMiddlewareAction::Respond { should_validate_hosts, response } => (
61 should_validate_hosts, false, Ok(response)
62 ),
63 };
64
65 if should_validate_hosts && !is_host_allowed {
67 return Handler::Error(Some(Response::host_not_allowed()));
68 }
69
70 match response {
72 Ok(response) => Handler::Middleware(response),
73 Err(request) => {
74 Handler::Rpc(RpcHandler {
75 rs_jsonrpc_handler: self.rs_jsonrpc_handler.clone(),
76 state: RpcHandlerState::ReadingHeaders {
77 request: request,
78 cors_domains: self.cors_domains.clone(),
79 continue_on_invalid_cors: should_continue_on_invalid_cors,
80 },
81 is_options: false,
82 cors_header: cors::CorsHeader::NotRequired,
83 rest_api: self.rest_api,
84 })
85 }
86 }
87 }
88}
89
90pub enum Handler<M: Metadata, S: Middleware<M>> {
91 Rpc(RpcHandler<M, S>),
92 Error(Option<Response>),
93 Middleware(Box<Future<Item=server::Response, Error=hyper::Error> + Send>),
94}
95
96impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S> {
97 type Item = server::Response;
98 type Error = hyper::Error;
99
100 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
101 match *self {
102 Handler::Rpc(ref mut handler) => handler.poll(),
103 Handler::Middleware(ref mut middleware) => middleware.poll(),
104 Handler::Error(ref mut response) => Ok(Async::Ready(
105 response.take().expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed").into()
106 )),
107 }
108 }
109}
110
111enum RpcPollState<M, F> where
112 F: Future<Item = Option<core::Response>, Error = ()>,
113{
114 Ready(RpcHandlerState<M, F>),
115 NotReady(RpcHandlerState<M, F>),
116}
117
118impl<M, F> RpcPollState<M, F> where
119 F: Future<Item = Option<core::Response>, Error = ()>,
120{
121 fn decompose(self) -> (RpcHandlerState<M, F>, bool) {
122 use self::RpcPollState::*;
123 match self {
124 Ready(handler) => (handler, true),
125 NotReady(handler) => (handler, false),
126 }
127 }
128}
129
130enum RpcHandlerState<M, F> where
131 F: Future<Item = Option<core::Response>, Error = ()>,
132{
133 ReadingHeaders {
134 request: server::Request,
135 cors_domains: CorsDomains,
136 continue_on_invalid_cors: bool,
137 },
138 ReadingBody {
139 body: hyper::Body,
140 uri: Option<hyper::Uri>,
141 request: Vec<u8>,
142 metadata: M,
143 },
144 ProcessRest {
145 uri: hyper::Uri,
146 metadata: M,
147 },
148 Writing(Response),
149 Waiting(FutureResult<F>),
150 Done,
151}
152
153impl<M, F> fmt::Debug for RpcHandlerState<M, F> where
154 F: Future<Item = Option<core::Response>, Error = ()>,
155{
156 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
157 use self::RpcHandlerState::*;
158
159 match *self {
160 ReadingHeaders {..} => write!(fmt, "ReadingHeaders"),
161 ReadingBody {..} => write!(fmt, "ReadingBody"),
162 ProcessRest {..} => write!(fmt, "ProcessRest"),
163 Writing(ref res) => write!(fmt, "Writing({:?})", res),
164 Waiting(_) => write!(fmt, "Waiting"),
165 Done => write!(fmt, "Done"),
166 }
167 }
168}
169
170pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
171 rs_jsonrpc_handler: Rpc<M, S>,
172 state: RpcHandlerState<M, S::Future>,
173 is_options: bool,
174 cors_header: cors::CorsHeader<header::AccessControlAllowOrigin>,
175 rest_api: RestApi,
176}
177
178impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {
179 type Item = server::Response;
180 type Error = hyper::Error;
181
182 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
183 let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
184 RpcHandlerState::ReadingHeaders { request, cors_domains, continue_on_invalid_cors, } => {
185 self.cors_header = utils::cors_header(&request, &cors_domains);
187 self.is_options = *request.method() == Method::Options;
188 RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
190 },
191 RpcHandlerState::ReadingBody { body, request, metadata, uri, } => {
192 self.process_body(body, request, uri, metadata)?
193 },
194 RpcHandlerState::ProcessRest { uri, metadata } => {
195 self.process_rest(uri, metadata)?
196 },
197 RpcHandlerState::Waiting(mut waiting) => {
198 match waiting.poll() {
199 Ok(Async::Ready(response)) => {
200 RpcPollState::Ready(RpcHandlerState::Writing(match response {
201 None => Response::ok(String::new()),
203 Some(result) => Response::ok(format!("{}\n", result)),
205 }.into()))
206 },
207 Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)),
208 Err(_) => RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error())),
209 }
210 },
211 state => RpcPollState::NotReady(state),
212 };
213
214 let (new_state, is_ready) = new_state.decompose();
215 match new_state {
216 RpcHandlerState::Writing(res) => {
217 let mut response: server::Response = res.into();
218 let cors_header = mem::replace(&mut self.cors_header, cors::CorsHeader::Invalid);
219 Self::set_response_headers(response.headers_mut(), self.is_options, cors_header.into());
220 Ok(Async::Ready(response))
221 },
222 state => {
223 self.state = state;
224 if is_ready {
225 self.poll()
226 } else {
227 Ok(Async::NotReady)
228 }
229 },
230 }
231 }
232}
233
234impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
235 fn read_headers(
236 &self,
237 request: server::Request,
238 continue_on_invalid_cors: bool,
239 ) -> RpcHandlerState<M, S::Future> {
240 if self.cors_header == cors::CorsHeader::Invalid && !continue_on_invalid_cors {
241 return RpcHandlerState::Writing(Response::invalid_cors());
242 }
243 let metadata = self.rs_jsonrpc_handler.extractor.read_metadata(&request);
245
246 match *request.method() {
248 Method::Post if Self::is_json(request.headers().get::<header::ContentType>()) => {
251 let uri = if self.rest_api != RestApi::Disabled { Some(request.uri().clone()) } else { None };
252 RpcHandlerState::ReadingBody {
253 metadata,
254 request: Default::default(),
255 uri,
256 body: request.body(),
257 }
258 },
259 Method::Post if self.rest_api == RestApi::Unsecure => {
260 RpcHandlerState::ProcessRest {
261 metadata,
262 uri: request.uri().clone(),
263 }
264 },
265 Method::Post => {
267 RpcHandlerState::Writing(Response::unsupported_content_type())
268 },
269 Method::Options => {
271 RpcHandlerState::Writing(Response::empty())
272 },
273 _ => {
275 RpcHandlerState::Writing(Response::method_not_allowed())
276 },
277 }
278 }
279
280 fn process_rest(
281 &self,
282 uri: hyper::Uri,
283 metadata: M,
284 ) -> Result<RpcPollState<M, S::Future>, hyper::Error> {
285 use self::core::types::{Call, MethodCall, Version, Params, Request, Id, Value};
286
287 let mut it = uri.path().split('/').skip(1);
289
290 let method = it.next().unwrap_or("");
292 let mut params = Vec::new();
293 for param in it {
294 let v = serde_json::from_str(param)
295 .or_else(|_| serde_json::from_str(&format!("\"{}\"", param)))
296 .unwrap_or(Value::Null);
297 params.push(v)
298 }
299
300 let call = Request::Single(Call::MethodCall(MethodCall {
302 jsonrpc: Some(Version::V2),
303 method: method.into(),
304 params: Some(Params::Array(params)),
305 id: Id::Num(1),
306 }));
307
308 return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
309 future::Either::B(self.rs_jsonrpc_handler.handler.handle_rpc_request(call, metadata))
310 .map(|res| res.map(|x| serde_json::to_string(&x)
311 .expect("Serialization of response is infallible;qed")
312 ))
313 )));
314 }
315
316 fn process_body(
317 &self,
318 mut body: hyper::Body,
319 mut request: Vec<u8>,
320 uri: Option<hyper::Uri>,
321 metadata: M,
322 ) -> Result<RpcPollState<M, S::Future>, hyper::Error> {
323 loop {
324 match body.poll()? {
325 Async::Ready(Some(chunk)) => {
327 request.extend_from_slice(&*chunk)
328 },
329 Async::Ready(None) => {
330 if let (Some(uri), true) = (uri, request.is_empty()) {
331 return Ok(RpcPollState::Ready(RpcHandlerState::ProcessRest {
332 uri,
333 metadata,
334 }));
335 }
336
337 let content = match ::std::str::from_utf8(&request) {
338 Ok(content) => content,
339 Err(err) => {
340 return Err(hyper::Error::Utf8(err));
342 },
343 };
344
345 return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
347 self.rs_jsonrpc_handler.handler.handle_request(content, metadata)
348 )));
349 },
350 Async::NotReady => {
351 return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody {
352 body,
353 request,
354 metadata,
355 uri,
356 }));
357 },
358 }
359 }
360 }
361
362 fn set_response_headers(headers: &mut Headers, is_options: bool, cors_header: Option<header::AccessControlAllowOrigin>) {
363 if is_options {
364 headers.set(header::Allow(vec![
365 Method::Options,
366 Method::Post,
367 ]));
368 headers.set(header::Accept(vec![
369 header::qitem(mime::APPLICATION_JSON)
370 ]));
371 }
372
373 if let Some(cors_domain) = cors_header {
374 headers.set(header::AccessControlAllowMethods(vec![
375 Method::Options,
376 Method::Post
377 ]));
378 headers.set(header::AccessControlAllowHeaders(vec![
379 Ascii::new("origin".to_owned()),
380 Ascii::new("content-type".to_owned()),
381 Ascii::new("accept".to_owned()),
382 ]));
383 headers.set(cors_domain);
384 headers.set(header::Vary::Items(vec![
385 Ascii::new("origin".to_owned())
386 ]));
387 }
388 }
389
390 fn is_json(content_type: Option<&header::ContentType>) -> bool {
391 const APPLICATION_JSON_UTF_8: &str = "application/json; charset=utf-8";
392
393 match content_type {
394 Some(&header::ContentType(ref mime))
395 if *mime == mime::APPLICATION_JSON || *mime == APPLICATION_JSON_UTF_8 => true,
396 _ => false
397 }
398 }
399}