finchers_runtime/
endpoint.rs

1//! The components to construct an asynchronous HTTP service from the `Endpoint`.
2
3use bytes::Bytes;
4use futures::Async::*;
5use futures::future::{self, FutureResult};
6use futures::{self, Future};
7use http::header::{self, HeaderValue};
8use http::{Request, Response};
9use std::sync::Arc;
10use std::{fmt, io};
11
12use finchers_core::endpoint::ApplyRequest;
13use finchers_core::input::RequestBody;
14use finchers_core::output::{Responder, ResponseBody};
15use finchers_core::{Endpoint, HttpError, Input, Poll, Task};
16
17use service::{HttpService, NewHttpService, Payload};
18
19// FIXME: move the implementation to finchers-core after replacing `Payload` with `hyper::Payload`.
20impl Payload for ResponseBody {
21    type Data = Bytes;
22    type Error = io::Error;
23
24    fn poll_data(&mut self) -> futures::Poll<Option<Self::Data>, Self::Error> {
25        self.poll_data().into()
26    }
27}
28
29/// A factory of HTTP service which wraps an `Endpoint`.
30pub struct NewEndpointService<E> {
31    endpoint: Arc<E>,
32    error_handler: ErrorHandler,
33}
34
35impl<E: fmt::Debug> fmt::Debug for NewEndpointService<E> {
36    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
37        f.debug_struct("NewEndpointService")
38            .field("endpoint", &self.endpoint)
39            .finish()
40    }
41}
42
43impl<E> NewEndpointService<E>
44where
45    E: Endpoint,
46    E::Output: Responder,
47{
48    /// Create a new `NewEndpointService` from an endpoint.
49    pub fn new(endpoint: E) -> NewEndpointService<E> {
50        NewEndpointService {
51            endpoint: Arc::new(endpoint),
52            error_handler: default_error_handler,
53        }
54    }
55
56    /// Set the error handler used in this service.
57    pub fn set_error_handler(&mut self, handler: ErrorHandler) {
58        self.error_handler = handler;
59    }
60}
61
62impl<E> NewHttpService for NewEndpointService<E>
63where
64    E: Endpoint,
65    E::Output: Responder,
66{
67    type RequestBody = RequestBody;
68    type ResponseBody = ResponseBody;
69    type Error = io::Error;
70    type Service = EndpointService<E>;
71    type InitError = io::Error;
72    type Future = FutureResult<Self::Service, Self::InitError>;
73
74    fn new_service(&self) -> Self::Future {
75        future::ok(EndpointService {
76            endpoint: self.endpoint.clone(),
77            error_handler: self.error_handler,
78            _priv: (),
79        })
80    }
81}
82
83/// An asynchronous HTTP service which holds an `Endpoint`.
84///
85/// The value of this type is generated by `NewEndpointService`.
86pub struct EndpointService<E> {
87    endpoint: Arc<E>,
88    error_handler: ErrorHandler,
89    _priv: (),
90}
91
92impl<E: fmt::Debug> fmt::Debug for EndpointService<E> {
93    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
94        f.debug_struct("EndpointService")
95            .field("endpoint", &self.endpoint)
96            .finish()
97    }
98}
99
100impl<E> HttpService for EndpointService<E>
101where
102    E: Endpoint,
103    E::Output: Responder,
104{
105    type RequestBody = RequestBody;
106    type ResponseBody = ResponseBody;
107    type Error = io::Error;
108    type Future = EndpointServiceFuture<E::Task>;
109
110    fn call(&mut self, request: Request<Self::RequestBody>) -> Self::Future {
111        let (parts, body) = request.into_parts();
112        let input = Input::new(Request::from_parts(parts, ()));
113        let apply = self.endpoint.apply_request(&input, body);
114
115        EndpointServiceFuture {
116            apply,
117            input,
118            error_handler: self.error_handler,
119        }
120    }
121}
122
123#[allow(missing_docs)]
124#[allow(missing_debug_implementations)]
125pub struct EndpointServiceFuture<T> {
126    apply: ApplyRequest<T>,
127    input: Input,
128    error_handler: ErrorHandler,
129}
130
131impl<T> EndpointServiceFuture<T> {
132    fn handle_error(&self, err: &HttpError) -> Response<ResponseBody> {
133        (self.error_handler)(err, &self.input)
134    }
135}
136
137impl<T> Future for EndpointServiceFuture<T>
138where
139    T: Task,
140    T::Output: Responder,
141{
142    type Item = Response<ResponseBody>;
143    type Error = io::Error;
144
145    fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
146        let output = match self.apply.poll_ready(&self.input) {
147            Poll::Pending => return Ok(NotReady),
148            Poll::Ready(output) => output.and_then(|output| output.respond(&self.input)),
149        };
150
151        let mut response = output.unwrap_or_else(|err| self.handle_error(err.as_http_error()));
152
153        if !response.headers().contains_key(header::SERVER) {
154            response.headers_mut().insert(
155                header::SERVER,
156                HeaderValue::from_static(concat!("finchers-runtime/", env!("CARGO_PKG_VERSION"))),
157            );
158        }
159
160        Ok(Ready(response))
161    }
162}
163
164/// A type alias of the error handler used by `EndpointService`.
165pub type ErrorHandler = fn(&HttpError, &Input) -> Response<ResponseBody>;
166
167fn default_error_handler(err: &HttpError, _: &Input) -> Response<ResponseBody> {
168    let body = err.to_string();
169    let body_len = body.len().to_string();
170
171    let mut response = Response::new(ResponseBody::once(body));
172    *response.status_mut() = err.status_code();
173    response.headers_mut().insert(
174        header::CONTENT_TYPE,
175        HeaderValue::from_static("text/plain; charset=utf-8"),
176    );
177    response.headers_mut().insert(header::CONTENT_LENGTH, unsafe {
178        HeaderValue::from_shared_unchecked(body_len.into())
179    });
180    err.append_headers(response.headers_mut());
181
182    response
183}