1use failure::Fail;
4use futures::{Async, Future, Poll, Stream};
5use http;
6use hyper;
7use hyper::server::{self, Http};
8use slog::{Drain, Level, Logger};
9use std::cell::RefCell;
10use std::net::{IpAddr, SocketAddr};
11use std::sync::Arc;
12use std::time::Instant;
13use structopt::StructOpt;
14use tokio;
15use tokio::net::TcpListener;
16use {slog_async, slog_term};
17
18use finchers_core::input::RequestBody;
19use service::{HttpService, NewHttpService, Payload};
20
/// Verbosity mode of the server.
///
/// `Config::logger` translates each mode into the least severe log level
/// that is still emitted.
#[allow(missing_docs)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Mode {
    /// Only records at `Error` severity or worse are emitted.
    Silent,
    /// Records at `Info` severity or worse are emitted (the default mode).
    Normal,
    /// Records at `Debug` severity or worse are emitted.
    Debug,
}
29
30#[derive(Debug, StructOpt)]
31#[structopt(name = "finchers")]
32struct Cli {
33 #[structopt(short = "h", long = "host", default_value = "127.0.0.1")]
35 host: IpAddr,
36
37 #[structopt(short = "p", long = "port", default_value = "5000")]
39 port: u16,
40
41 #[structopt(short = "s", long = "silent")]
43 silent: bool,
44
45 #[structopt(short = "d", long = "debug")]
47 debug: bool,
48}
49
50#[derive(Debug)]
52pub struct Config {
53 addr: SocketAddr,
54 mode: Mode,
55 cli: Option<Cli>,
56}
57
58impl Default for Config {
59 fn default() -> Config {
60 Config {
61 addr: ([127, 0, 0, 1], 5000).into(),
62 mode: Mode::Normal,
63 cli: None,
64 }
65 }
66}
67
68impl Config {
69 pub fn from_env() -> Config {
71 let mut config = Config::default();
72 config.overwite_cli(Cli::from_args());
73 config
74 }
75
76 fn overwite_cli(&mut self, cli: Cli) {
77 self.addr = (cli.host, cli.port).into();
78 if cli.silent {
79 self.mode = Mode::Silent;
80 }
81 if cli.debug {
82 self.mode = Mode::Debug;
83 }
84 self.cli = Some(cli);
85 }
86
87 #[allow(missing_docs)]
88 pub fn addr(&self) -> SocketAddr {
89 self.addr
90 }
91
92 #[allow(missing_docs)]
93 pub fn mode(&self) -> Mode {
94 self.mode
95 }
96
97 pub fn logger(&self) -> Logger {
99 let level = match self.mode {
100 Mode::Silent => Level::Error,
101 Mode::Normal => Level::Info,
102 Mode::Debug => Level::Debug,
103 };
104 let drain = slog_term::term_full()
105 .filter(move |record| record.level() <= level)
106 .fuse();
107 let async_drain = slog_async::Async::new(drain).build().fuse();
108
109 Logger::root(async_drain, o!())
110 }
111}
112
113#[derive(Debug)]
115pub struct Server<S> {
116 new_service: S,
117 config: Config,
118}
119
120impl<S> Server<S>
121where
122 S: NewHttpService<RequestBody = RequestBody> + Send + Sync + 'static,
123 S::ResponseBody: Payload + Send + 'static,
124 <S::ResponseBody as Payload>::Data: Send,
125 <S::ResponseBody as Payload>::Error: Into<hyper::Error>,
126 S::Service: Send + 'static,
127 S::Future: Send + 'static,
128 S::Error: Into<hyper::Error>,
129 <S::Service as HttpService>::Future: Send + 'static,
130 S::InitError: Fail,
131{
132 pub fn new(new_service: S, config: Config) -> Server<S> {
134 Server { new_service, config }
135 }
136
137 #[inline]
139 pub fn launch(self) {
140 let Server { new_service, config } = self;
141 let new_service = Arc::new(new_service);
142
143 let logger = config.logger();
144 info!(logger, "Listening on {}", config.addr());
145
146 let listener = match TcpListener::bind(&config.addr()) {
147 Ok(listener) => listener,
148 Err(err) => {
149 crit!(logger, "Failed to create TcpListener: {}", err);
150 ::std::process::exit(1);
151 }
152 };
153 let incoming = listener.incoming().map_err({
154 let logger = logger.clone();
155 move |err| trace!(logger, "failed to accept a TCP connection: {}", err)
156 });
157
158 let server = incoming.for_each(move |stream| {
159 let logger = logger.new(o! {
160 "ip_addr" => stream.peer_addr()
161 .map(|addr| addr.to_string())
162 .unwrap_or_else(|_| "<error>".into()),
163 });
164
165 new_service
166 .new_service()
167 .map_err({
168 let logger = logger.clone();
169 move |e| error!(logger, "failed to create a new service: {}", e)
170 })
171 .and_then(move |service| {
172 let wrapped_service = WrappedService {
173 service: RefCell::new(service),
174 logger: logger.clone(),
175 };
176 let protocol = Http::<<S::ResponseBody as Payload>::Data>::new();
177 let conn = protocol.serve_connection(stream, wrapped_service);
178
179 conn.map_err(move |e| error!(logger, "during serving a connection: {}", e))
180 })
181 });
182
183 tokio::run(server);
184 }
185}
186
187scoped_thread_local!(static LOGGER: Logger);
188
189pub fn with_logger<F, R>(f: F) -> R
191where
192 F: FnOnce(&Logger) -> R,
193{
194 LOGGER.with(|logger| f(logger))
195}
196
197#[derive(Debug)]
198struct WrappedService<S> {
199 service: RefCell<S>,
201 logger: Logger,
202}
203
204impl<S> server::Service for WrappedService<S>
205where
206 S: HttpService<RequestBody = RequestBody>,
207 S::Error: Into<hyper::Error> + 'static,
208 S::ResponseBody: Payload,
209 S::Future: Send + 'static,
210{
211 type Request = hyper::Request;
212 type Response = hyper::Response<WrappedBody<S::ResponseBody>>;
213 type Error = hyper::Error;
214 type Future = WrappedServiceFuture<S::Future>;
215
216 fn call(&self, request: Self::Request) -> Self::Future {
217 let request = http::Request::from(request).map(RequestBody::from_hyp);
218 let logger = self.logger.new(o!{
219 "method" => request.method().to_string(),
220 "path" => request.uri().path().to_owned(),
221 });
222
223 let start = Instant::now();
224 let future = LOGGER.set(&logger, || self.service.borrow_mut().call(request));
225 WrappedServiceFuture { future, logger, start }
226 }
227}
228
229#[allow(missing_debug_implementations)]
230struct WrappedServiceFuture<F> {
231 future: F,
232 logger: Logger,
233 start: Instant,
234}
235
236impl<F, Bd> Future for WrappedServiceFuture<F>
237where
238 F: Future<Item = http::Response<Bd>>,
239 Bd: Payload,
240 F::Error: Into<hyper::Error> + 'static,
241{
242 type Item = hyper::Response<WrappedBody<Bd>>;
243 type Error = hyper::Error;
244
245 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
246 let response = {
247 let future = &mut self.future;
248 try_ready!(LOGGER.set(&self.logger, || future.poll().map_err(Into::<hyper::Error>::into)))
249 };
250 let end = Instant::now();
251 let duration = end - self.start;
252 let duration_msec = duration.as_secs() * 10 + duration.subsec_nanos() as u64 / 1_000_000;
253 info!(self.logger, "{} ({} ms)", response.status(), duration_msec);
254 Ok(Async::Ready(hyper::Response::from(response.map(WrappedBody))))
255 }
256}
257
/// Newtype around a response payload; its `Stream` implementation is what
/// lets the payload be used as a hyper response body.
#[derive(Debug)]
struct WrappedBody<Bd>(Bd);
260
261impl<Bd: Payload> Stream for WrappedBody<Bd>
262where
263 Bd::Error: Into<hyper::Error>,
264{
265 type Item = Bd::Data;
266 type Error = hyper::Error;
267
268 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
269 self.0.poll_data().map_err(Into::into)
270 }
271}