finchers_runtime/
server.rs

1//! Components for managing HTTP server.
2
3use failure::Fail;
4use futures::{Async, Future, Poll, Stream};
5use http;
6use hyper;
7use hyper::server::{self, Http};
8use slog::{Drain, Level, Logger};
9use std::cell::RefCell;
10use std::net::{IpAddr, SocketAddr};
11use std::sync::Arc;
12use std::time::Instant;
13use structopt::StructOpt;
14use tokio;
15use tokio::net::TcpListener;
16use {slog_async, slog_term};
17
18use finchers_core::input::RequestBody;
19use service::{HttpService, NewHttpService, Payload};
20
21/// All kinds of logging mode of `Server`.
22#[allow(missing_docs)]
23#[derive(Debug, Clone, Copy, PartialEq)]
24pub enum Mode {
25    Silent,
26    Normal,
27    Debug,
28}
29
30#[derive(Debug, StructOpt)]
31#[structopt(name = "finchers")]
32struct Cli {
33    /// The host of listener address.
34    #[structopt(short = "h", long = "host", default_value = "127.0.0.1")]
35    host: IpAddr,
36
37    /// The port of listener address.
38    #[structopt(short = "p", long = "port", default_value = "5000")]
39    port: u16,
40
41    /// Set to silent mode.
42    #[structopt(short = "s", long = "silent")]
43    silent: bool,
44
45    /// Set to debug mode (implies silent=false).
46    #[structopt(short = "d", long = "debug")]
47    debug: bool,
48}
49
50/// A set of configurations used by the runtime.
51#[derive(Debug)]
52pub struct Config {
53    addr: SocketAddr,
54    mode: Mode,
55    cli: Option<Cli>,
56}
57
58impl Default for Config {
59    fn default() -> Config {
60        Config {
61            addr: ([127, 0, 0, 1], 5000).into(),
62            mode: Mode::Normal,
63            cli: None,
64        }
65    }
66}
67
68impl Config {
69    /// Create an instance of "Config" from the environment.
70    pub fn from_env() -> Config {
71        let mut config = Config::default();
72        config.overwite_cli(Cli::from_args());
73        config
74    }
75
76    fn overwite_cli(&mut self, cli: Cli) {
77        self.addr = (cli.host, cli.port).into();
78        if cli.silent {
79            self.mode = Mode::Silent;
80        }
81        if cli.debug {
82            self.mode = Mode::Debug;
83        }
84        self.cli = Some(cli);
85    }
86
87    #[allow(missing_docs)]
88    pub fn addr(&self) -> SocketAddr {
89        self.addr
90    }
91
92    #[allow(missing_docs)]
93    pub fn mode(&self) -> Mode {
94        self.mode
95    }
96
97    /// Create an instance of "Logger" from the current configuration.
98    pub fn logger(&self) -> Logger {
99        let level = match self.mode {
100            Mode::Silent => Level::Error,
101            Mode::Normal => Level::Info,
102            Mode::Debug => Level::Debug,
103        };
104        let drain = slog_term::term_full()
105            .filter(move |record| record.level() <= level)
106            .fuse();
107        let async_drain = slog_async::Async::new(drain).build().fuse();
108
109        Logger::root(async_drain, o!())
110    }
111}
112
113/// A builder for running the HTTP server based on given HTTP service and configuration.
114#[derive(Debug)]
115pub struct Server<S> {
116    new_service: S,
117    config: Config,
118}
119
120impl<S> Server<S>
121where
122    S: NewHttpService<RequestBody = RequestBody> + Send + Sync + 'static,
123    S::ResponseBody: Payload + Send + 'static,
124    <S::ResponseBody as Payload>::Data: Send,
125    <S::ResponseBody as Payload>::Error: Into<hyper::Error>,
126    S::Service: Send + 'static,
127    S::Future: Send + 'static,
128    S::Error: Into<hyper::Error>,
129    <S::Service as HttpService>::Future: Send + 'static,
130    S::InitError: Fail,
131{
132    /// Create a new launcher from given service.
133    pub fn new(new_service: S, config: Config) -> Server<S> {
134        Server { new_service, config }
135    }
136
137    /// Start the HTTP server with given configurations
138    #[inline]
139    pub fn launch(self) {
140        let Server { new_service, config } = self;
141        let new_service = Arc::new(new_service);
142
143        let logger = config.logger();
144        info!(logger, "Listening on {}", config.addr());
145
146        let listener = match TcpListener::bind(&config.addr()) {
147            Ok(listener) => listener,
148            Err(err) => {
149                crit!(logger, "Failed to create TcpListener: {}", err);
150                ::std::process::exit(1);
151            }
152        };
153        let incoming = listener.incoming().map_err({
154            let logger = logger.clone();
155            move |err| trace!(logger, "failed to accept a TCP connection: {}", err)
156        });
157
158        let server = incoming.for_each(move |stream| {
159            let logger = logger.new(o! {
160                "ip_addr" => stream.peer_addr()
161                    .map(|addr| addr.to_string())
162                    .unwrap_or_else(|_| "<error>".into()),
163            });
164
165            new_service
166                .new_service()
167                .map_err({
168                    let logger = logger.clone();
169                    move |e| error!(logger, "failed to create a new service: {}", e)
170                })
171                .and_then(move |service| {
172                    let wrapped_service = WrappedService {
173                        service: RefCell::new(service),
174                        logger: logger.clone(),
175                    };
176                    let protocol = Http::<<S::ResponseBody as Payload>::Data>::new();
177                    let conn = protocol.serve_connection(stream, wrapped_service);
178
179                    conn.map_err(move |e| error!(logger, "during serving a connection: {}", e))
180                })
181        });
182
183        tokio::run(server);
184    }
185}
186
187scoped_thread_local!(static LOGGER: Logger);
188
189/// Execute a closure with the reference to `Logger` associated with the current scope.
190pub fn with_logger<F, R>(f: F) -> R
191where
192    F: FnOnce(&Logger) -> R,
193{
194    LOGGER.with(|logger| f(logger))
195}
196
197#[derive(Debug)]
198struct WrappedService<S> {
199    // FIXME: remove RefCell
200    service: RefCell<S>,
201    logger: Logger,
202}
203
204impl<S> server::Service for WrappedService<S>
205where
206    S: HttpService<RequestBody = RequestBody>,
207    S::Error: Into<hyper::Error> + 'static,
208    S::ResponseBody: Payload,
209    S::Future: Send + 'static,
210{
211    type Request = hyper::Request;
212    type Response = hyper::Response<WrappedBody<S::ResponseBody>>;
213    type Error = hyper::Error;
214    type Future = WrappedServiceFuture<S::Future>;
215
216    fn call(&self, request: Self::Request) -> Self::Future {
217        let request = http::Request::from(request).map(RequestBody::from_hyp);
218        let logger = self.logger.new(o!{
219            "method" => request.method().to_string(),
220            "path" => request.uri().path().to_owned(),
221        });
222
223        let start = Instant::now();
224        let future = LOGGER.set(&logger, || self.service.borrow_mut().call(request));
225        WrappedServiceFuture { future, logger, start }
226    }
227}
228
229#[allow(missing_debug_implementations)]
230struct WrappedServiceFuture<F> {
231    future: F,
232    logger: Logger,
233    start: Instant,
234}
235
236impl<F, Bd> Future for WrappedServiceFuture<F>
237where
238    F: Future<Item = http::Response<Bd>>,
239    Bd: Payload,
240    F::Error: Into<hyper::Error> + 'static,
241{
242    type Item = hyper::Response<WrappedBody<Bd>>;
243    type Error = hyper::Error;
244
245    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
246        let response = {
247            let future = &mut self.future;
248            try_ready!(LOGGER.set(&self.logger, || future.poll().map_err(Into::<hyper::Error>::into)))
249        };
250        let end = Instant::now();
251        let duration = end - self.start;
252        let duration_msec = duration.as_secs() * 10 + duration.subsec_nanos() as u64 / 1_000_000;
253        info!(self.logger, "{} ({} ms)", response.status(), duration_msec);
254        Ok(Async::Ready(hyper::Response::from(response.map(WrappedBody))))
255    }
256}
257
258#[derive(Debug)]
259struct WrappedBody<Bd>(Bd);
260
261impl<Bd: Payload> Stream for WrappedBody<Bd>
262where
263    Bd::Error: Into<hyper::Error>,
264{
265    type Item = Bd::Data;
266    type Error = hyper::Error;
267
268    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
269        self.0.poll_data().map_err(Into::into)
270    }
271}