1use Rpc;
2
3use std::{fmt, mem};
4use std::sync::Arc;
5
6use hyper::{self, mime, server, Method};
7use hyper::header::{self, Headers};
8use unicase::Ascii;
9
10use jsonrpc::{Metadata, Middleware, NoopMiddleware};
11use jsonrpc::futures::{Future, Poll, Async, BoxFuture, Stream};
12use response::Response;
13use server_utils::cors;
14
15use {utils, RequestMiddleware, RequestMiddlewareAction, CorsDomains, AllowedHosts};
16
17const APPLICATION_JSON_UTF_8: &str = "application/json; charset=utf-8";
18
19pub struct ServerHandler<M: Metadata = (), S: Middleware<M> = NoopMiddleware> {
21 jsonrpc_handler: Rpc<M, S>,
22 allowed_hosts: AllowedHosts,
23 cors_domains: CorsDomains,
24 middleware: Arc<RequestMiddleware>,
25}
26
27impl<M: Metadata, S: Middleware<M>> ServerHandler<M, S> {
28 pub fn new(
30 jsonrpc_handler: Rpc<M, S>,
31 cors_domains: CorsDomains,
32 allowed_hosts: AllowedHosts,
33 middleware: Arc<RequestMiddleware>,
34 ) -> Self {
35 ServerHandler {
36 jsonrpc_handler: jsonrpc_handler,
37 allowed_hosts: allowed_hosts,
38 cors_domains: cors_domains,
39 middleware: middleware,
40 }
41 }
42}
43
44impl<M: Metadata, S: Middleware<M>> server::Service for ServerHandler<M, S> {
45 type Request = server::Request;
46 type Response = server::Response;
47 type Error = hyper::Error;
48 type Future = Handler<M, S>;
49
50 fn call(&self, request: Self::Request) -> Self::Future {
51 let action = self.middleware.on_request(&request);
52
53 let (should_validate_hosts, should_continue_on_invalid_cors, handler) = match action {
54 RequestMiddlewareAction::Proceed { should_continue_on_invalid_cors }=> (
55 true, should_continue_on_invalid_cors, None
56 ),
57 RequestMiddlewareAction::Respond { should_validate_hosts, handler } => (
58 should_validate_hosts, false, Some(handler)
59 ),
60 };
61
62 if should_validate_hosts && !utils::is_host_allowed(&request, &self.allowed_hosts) {
64 return Handler::Error(Some(Response::host_not_allowed()));
65 }
66
67 if let Some(handler) = handler {
69 return Handler::Middleware(handler);
70 }
71
72 Handler::Rpc(RpcHandler {
73 jsonrpc_handler: self.jsonrpc_handler.clone(),
74 state: RpcHandlerState::ReadingHeaders {
75 request: request,
76 cors_domains: self.cors_domains.clone(),
77 continue_on_invalid_cors: should_continue_on_invalid_cors,
78 },
79 is_options: false,
80 cors_header: cors::CorsHeader::NotRequired,
81 })
82 }
83}
84
85pub enum Handler<M: Metadata, S: Middleware<M>> {
86 Rpc(RpcHandler<M, S>),
87 Error(Option<Response>),
88 Middleware(BoxFuture<server::Response, hyper::Error>),
89}
90
91impl<M: Metadata, S: Middleware<M>> Future for Handler<M, S> {
92 type Item = server::Response;
93 type Error = hyper::Error;
94
95 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
96 match *self {
97 Handler::Rpc(ref mut handler) => handler.poll(),
98 Handler::Middleware(ref mut middleware) => middleware.poll(),
99 Handler::Error(ref mut response) => Ok(Async::Ready(
100 response.take().expect("Response always Some initialy. Returning `Ready` so will never be polled again; qed").into()
101 )),
102 }
103 }
104}
105
106enum RpcPollState<M> {
107 Ready(RpcHandlerState<M>),
108 NotReady(RpcHandlerState<M>),
109}
110
111impl<M> RpcPollState<M> {
112 fn decompose(self) -> (RpcHandlerState<M>, bool) {
113 use self::RpcPollState::*;
114 match self {
115 Ready(handler) => (handler, true),
116 NotReady(handler) => (handler, false),
117 }
118 }
119}
120
121enum RpcHandlerState<M> {
122 ReadingHeaders {
123 request: server::Request,
124 cors_domains: CorsDomains,
125 continue_on_invalid_cors: bool,
126 },
127 ReadingBody {
128 body: hyper::Body,
129 request: Vec<u8>,
130 metadata: M,
131 },
132 Writing(Response),
133 Waiting(BoxFuture<Option<String>, ()>),
134 Done,
135}
136
137impl<M> fmt::Debug for RpcHandlerState<M> {
138 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
139 use self::RpcHandlerState::*;
140
141 match *self {
142 ReadingHeaders {..} => write!(fmt, "ReadingHeaders"),
143 ReadingBody {..} => write!(fmt, "ReadingBody"),
144 Writing(ref res) => write!(fmt, "Writing({:?})", res),
145 Waiting(_) => write!(fmt, "Waiting"),
146 Done => write!(fmt, "Done"),
147 }
148 }
149}
150
151pub struct RpcHandler<M: Metadata, S: Middleware<M>> {
152 jsonrpc_handler: Rpc<M, S>,
153 state: RpcHandlerState<M>,
154 is_options: bool,
155 cors_header: cors::CorsHeader<header::AccessControlAllowOrigin>,
156}
157
158impl<M: Metadata, S: Middleware<M>> Future for RpcHandler<M, S> {
159 type Item = server::Response;
160 type Error = hyper::Error;
161
162 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
163 let new_state = match mem::replace(&mut self.state, RpcHandlerState::Done) {
164 RpcHandlerState::ReadingHeaders { request, cors_domains, continue_on_invalid_cors, } => {
165 self.cors_header = utils::cors_header(&request, &cors_domains);
167 self.is_options = *request.method() == Method::Options;
168 RpcPollState::Ready(self.read_headers(request, continue_on_invalid_cors))
170 },
171 RpcHandlerState::ReadingBody { body, request, metadata, } => {
172 self.process_body(body, request, metadata)?
173 },
174 RpcHandlerState::Waiting(mut waiting) => {
175 match waiting.poll() {
176 Ok(Async::Ready(response)) => {
177 RpcPollState::Ready(RpcHandlerState::Writing(match response {
178 None => Response::ok(String::new()),
180 Some(result) => Response::ok(format!("{}\n", result)),
182 }.into()))
183 },
184 Ok(Async::NotReady) => RpcPollState::NotReady(RpcHandlerState::Waiting(waiting)),
185 Err(_) => RpcPollState::Ready(RpcHandlerState::Writing(Response::internal_error())),
186 }
187 },
188 state => RpcPollState::NotReady(state),
189 };
190
191 let (new_state, is_ready) = new_state.decompose();
192 match new_state {
193 RpcHandlerState::Writing(res) => {
194 let mut response: server::Response = res.into();
195 let cors_header = mem::replace(&mut self.cors_header, cors::CorsHeader::Invalid);
196 Self::set_response_headers(response.headers_mut(), self.is_options, cors_header.into());
197 Ok(Async::Ready(response))
198 },
199 state => {
200 self.state = state;
201 if is_ready {
202 self.poll()
203 } else {
204 Ok(Async::NotReady)
205 }
206 },
207 }
208 }
209}
210
211impl<M: Metadata, S: Middleware<M>> RpcHandler<M, S> {
212 fn read_headers(
213 &self,
214 request: server::Request,
215 continue_on_invalid_cors: bool,
216 ) -> RpcHandlerState<M> {
217 if self.cors_header == cors::CorsHeader::Invalid && !continue_on_invalid_cors {
218 return RpcHandlerState::Writing(Response::invalid_cors());
219 }
220 let metadata = self.jsonrpc_handler.extractor.read_metadata(&request);
222
223 match *request.method() {
225 Method::Post if Self::is_json(request.headers().get::<header::ContentType>()) => {
228 RpcHandlerState::ReadingBody {
229 metadata: metadata,
230 request: Default::default(),
231 body: request.body(),
232 }
233 },
234 Method::Post => {
236 RpcHandlerState::Writing(Response::unsupported_content_type())
237 },
238 Method::Options => {
240 RpcHandlerState::Writing(Response::empty())
241 },
242 _ => {
244 RpcHandlerState::Writing(Response::method_not_allowed())
245 },
246 }
247 }
248
249 fn process_body(
250 &self,
251 mut body: hyper::Body,
252 mut request: Vec<u8>,
253 metadata: M,
254 ) -> Result<RpcPollState<M>, hyper::Error> {
255 loop {
256 match body.poll()? {
257 Async::Ready(Some(chunk)) => {
259 request.extend_from_slice(&*chunk)
260 },
261 Async::Ready(None) => {
262 let content = match ::std::str::from_utf8(&request) {
263 Ok(content) => content,
264 Err(err) => {
265 return Err(hyper::Error::Utf8(err));
267 },
268 };
269
270 return Ok(RpcPollState::Ready(RpcHandlerState::Waiting(
272 self.jsonrpc_handler.handler.handle_request(content, metadata)
273 )));
274 },
275 Async::NotReady => {
276 return Ok(RpcPollState::NotReady(RpcHandlerState::ReadingBody {
277 body: body,
278 request: request,
279 metadata: metadata
280 }));
281 },
282 }
283 }
284 }
285
286 fn set_response_headers(headers: &mut Headers, is_options: bool, cors_header: Option<header::AccessControlAllowOrigin>) {
287 if is_options {
288 headers.set(header::Allow(vec![
289 Method::Options,
290 Method::Post,
291 ]));
292 headers.set(header::Accept(vec![
293 header::qitem(mime::APPLICATION_JSON)
294 ]));
295 }
296
297 if let Some(cors_domain) = cors_header {
298 headers.set(header::AccessControlAllowMethods(vec![
299 Method::Options,
300 Method::Post
301 ]));
302 headers.set(header::AccessControlAllowHeaders(vec![
303 Ascii::new("origin".to_owned()),
304 Ascii::new("content-type".to_owned()),
305 Ascii::new("accept".to_owned()),
306 ]));
307 headers.set(cors_domain);
308 headers.set(header::Vary::Items(vec![
309 Ascii::new("origin".to_owned())
310 ]));
311 }
312 }
313
314 fn is_json(content_type: Option<&header::ContentType>) -> bool {
315 match content_type {
316 Some(&header::ContentType(ref mime))
317 if *mime == mime::APPLICATION_JSON || *mime == APPLICATION_JSON_UTF_8 => true,
318 _ => false
319 }
320 }
321}