finchers_runtime/
endpoint.rs1use bytes::Bytes;
4use futures::Async::*;
5use futures::future::{self, FutureResult};
6use futures::{self, Future};
7use http::header::{self, HeaderValue};
8use http::{Request, Response};
9use std::sync::Arc;
10use std::{fmt, io};
11
12use finchers_core::endpoint::ApplyRequest;
13use finchers_core::input::RequestBody;
14use finchers_core::output::{Responder, ResponseBody};
15use finchers_core::{Endpoint, HttpError, Input, Poll, Task};
16
17use service::{HttpService, NewHttpService, Payload};
18
19impl Payload for ResponseBody {
21 type Data = Bytes;
22 type Error = io::Error;
23
24 fn poll_data(&mut self) -> futures::Poll<Option<Self::Data>, Self::Error> {
25 self.poll_data().into()
26 }
27}
28
29pub struct NewEndpointService<E> {
31 endpoint: Arc<E>,
32 error_handler: ErrorHandler,
33}
34
35impl<E: fmt::Debug> fmt::Debug for NewEndpointService<E> {
36 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
37 f.debug_struct("NewEndpointService")
38 .field("endpoint", &self.endpoint)
39 .finish()
40 }
41}
42
43impl<E> NewEndpointService<E>
44where
45 E: Endpoint,
46 E::Output: Responder,
47{
48 pub fn new(endpoint: E) -> NewEndpointService<E> {
50 NewEndpointService {
51 endpoint: Arc::new(endpoint),
52 error_handler: default_error_handler,
53 }
54 }
55
56 pub fn set_error_handler(&mut self, handler: ErrorHandler) {
58 self.error_handler = handler;
59 }
60}
61
62impl<E> NewHttpService for NewEndpointService<E>
63where
64 E: Endpoint,
65 E::Output: Responder,
66{
67 type RequestBody = RequestBody;
68 type ResponseBody = ResponseBody;
69 type Error = io::Error;
70 type Service = EndpointService<E>;
71 type InitError = io::Error;
72 type Future = FutureResult<Self::Service, Self::InitError>;
73
74 fn new_service(&self) -> Self::Future {
75 future::ok(EndpointService {
76 endpoint: self.endpoint.clone(),
77 error_handler: self.error_handler,
78 _priv: (),
79 })
80 }
81}
82
83pub struct EndpointService<E> {
87 endpoint: Arc<E>,
88 error_handler: ErrorHandler,
89 _priv: (),
90}
91
92impl<E: fmt::Debug> fmt::Debug for EndpointService<E> {
93 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
94 f.debug_struct("EndpointService")
95 .field("endpoint", &self.endpoint)
96 .finish()
97 }
98}
99
100impl<E> HttpService for EndpointService<E>
101where
102 E: Endpoint,
103 E::Output: Responder,
104{
105 type RequestBody = RequestBody;
106 type ResponseBody = ResponseBody;
107 type Error = io::Error;
108 type Future = EndpointServiceFuture<E::Task>;
109
110 fn call(&mut self, request: Request<Self::RequestBody>) -> Self::Future {
111 let (parts, body) = request.into_parts();
112 let input = Input::new(Request::from_parts(parts, ()));
113 let apply = self.endpoint.apply_request(&input, body);
114
115 EndpointServiceFuture {
116 apply,
117 input,
118 error_handler: self.error_handler,
119 }
120 }
121}
122
123#[allow(missing_docs)]
124#[allow(missing_debug_implementations)]
125pub struct EndpointServiceFuture<T> {
126 apply: ApplyRequest<T>,
127 input: Input,
128 error_handler: ErrorHandler,
129}
130
131impl<T> EndpointServiceFuture<T> {
132 fn handle_error(&self, err: &HttpError) -> Response<ResponseBody> {
133 (self.error_handler)(err, &self.input)
134 }
135}
136
137impl<T> Future for EndpointServiceFuture<T>
138where
139 T: Task,
140 T::Output: Responder,
141{
142 type Item = Response<ResponseBody>;
143 type Error = io::Error;
144
145 fn poll(&mut self) -> futures::Poll<Self::Item, Self::Error> {
146 let output = match self.apply.poll_ready(&self.input) {
147 Poll::Pending => return Ok(NotReady),
148 Poll::Ready(output) => output.and_then(|output| output.respond(&self.input)),
149 };
150
151 let mut response = output.unwrap_or_else(|err| self.handle_error(err.as_http_error()));
152
153 if !response.headers().contains_key(header::SERVER) {
154 response.headers_mut().insert(
155 header::SERVER,
156 HeaderValue::from_static(concat!("finchers-runtime/", env!("CARGO_PKG_VERSION"))),
157 );
158 }
159
160 Ok(Ready(response))
161 }
162}
163
164pub type ErrorHandler = fn(&HttpError, &Input) -> Response<ResponseBody>;
166
167fn default_error_handler(err: &HttpError, _: &Input) -> Response<ResponseBody> {
168 let body = err.to_string();
169 let body_len = body.len().to_string();
170
171 let mut response = Response::new(ResponseBody::once(body));
172 *response.status_mut() = err.status_code();
173 response.headers_mut().insert(
174 header::CONTENT_TYPE,
175 HeaderValue::from_static("text/plain; charset=utf-8"),
176 );
177 response.headers_mut().insert(header::CONTENT_LENGTH, unsafe {
178 HeaderValue::from_shared_unchecked(body_len.into())
179 });
180 err.append_headers(response.headers_mut());
181
182 response
183}