fibers_http_server 0.3.0

A tiny HTTP/1.1 server framework
Documentation
use crate::dispatcher::Dispatcher;
use crate::handler::{BoxReply, HandleInput, RequestHandlerInstance};
use crate::metrics::ServerMetrics;
use crate::response::ResEncoder;
use crate::server::ServerOptions;
use crate::{Error, Req, Result, Status};
use bytecodec::combinator::MaybeEos;
use bytecodec::io::{BufferedIo, IoDecodeExt, IoEncodeExt};
use bytecodec::{Decode, DecodeExt, Encode};
use fibers::net::TcpStream;
use futures::{Async, Future, Poll};
use httpcodec::{NoBodyDecoder, RequestDecoder};
use slog::Logger;
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use url::Url;

/// State of a single client TCP connection handled by the server.
///
/// The connection is driven as a `Future`: every poll performs buffered I/O on
/// the stream and advances `phase` until the connection is considered closed.
#[derive(Debug)]
pub struct Connection {
    /// Logger used for the warnings and debug messages emitted by this connection.
    logger: Logger,
    /// Server metrics; connect/disconnect and per-phase error counters are incremented here.
    metrics: ServerMetrics,
    /// Buffered TCP stream: requests are decoded from its read buffer and
    /// responses are encoded into its write buffer.
    stream: BufferedIo<TcpStream>,
    /// Decoder for the head part of a request (built with `NoBodyDecoder`; the
    /// remaining input is passed to the request handler).
    req_head_decoder: MaybeEos<RequestDecoder<NoBodyDecoder>>,
    /// Dispatcher that selects the handler for a decoded request head.
    dispatcher: Dispatcher,
    /// Shared flag; once it reads `false` the connection reports itself as closed.
    is_server_alive: Arc<AtomicBool>,
    /// `http://<local address>/`, passed to `Req::new` together with the request head.
    base_url: Url,
    /// Current phase of request processing.
    phase: Phase,
    /// If `true`, the connection enters `Phase::Closed` after the current
    /// response has been fully encoded instead of reading another request.
    do_close: bool,
}
impl Connection {
    pub fn new(
        logger: Logger,
        metrics: ServerMetrics,
        stream: TcpStream,
        dispatcher: Dispatcher,
        is_server_alive: Arc<AtomicBool>,
        options: &ServerOptions,
    ) -> Result<Self> {
        let _ = stream.set_nodelay(true);
        let base_url = format!(
            "http://{}/",
            track!(stream.local_addr().map_err(Error::from))?
        );
        let base_url = track!(Url::parse(&base_url).map_err(Error::from))?;

        metrics.connected_tcp_clients.increment();
        let req_head_decoder =
            RequestDecoder::with_options(NoBodyDecoder, options.decode_options.clone());
        Ok(Connection {
            logger,
            metrics,
            stream: BufferedIo::new(stream, options.read_buffer_size, options.write_buffer_size),
            req_head_decoder: req_head_decoder.maybe_eos(),
            dispatcher,
            is_server_alive,
            base_url,
            phase: Phase::ReadRequestHead,
            do_close: false,
        })
    }

    /// Returns `true` if this connection should not be polled any further.
    ///
    /// That is the case when the stream has reached EOS, when the phase is
    /// `Closed` and the write buffer is empty, or when the server is no longer
    /// marked as alive.
    fn is_closed(&self) -> bool {
        if self.stream.is_eos() {
            return true;
        }
        if self.stream.write_buf_ref().is_empty() && self.phase.is_closed() {
            return true;
        }
        !self.is_server_alive.load(Ordering::SeqCst)
    }

    fn read_request_head(&mut self) -> Phase {
        let result = self
            .req_head_decoder
            .decode_from_read_buf(self.stream.read_buf_mut())
            .and_then(|()| {
                if self.req_head_decoder.is_idle() {
                    self.req_head_decoder.finish_decoding().map(Some)
                } else {
                    Ok(None)
                }
            });
        match result {
            Err(e) => {
                warn!(
                    self.logger,
                    "Cannot decode the head part of a HTTP request: {}", e
                );
                self.metrics.read_request_head_errors.increment();
                self.do_close = true;
                Phase::WriteResponse(ResEncoder::error(Status::BadRequest))
            }
            Ok(None) => Phase::ReadRequestHead,
            Ok(Some(head)) => match track!(Req::new(head, &self.base_url)) {
                Err(e) => {
                    warn!(
                        self.logger,
                        "Cannot parse the path of a HTTP request: {}", e
                    );
                    self.metrics.parse_request_path_errors.increment();
                    self.do_close = true;
                    Phase::WriteResponse(ResEncoder::error(Status::BadRequest))
                }
                Ok(head) => Phase::DispatchRequest(head),
            },
        }
    }

    fn dispatch_request(&mut self, head: Req<()>) -> Phase {
        match self.dispatcher.dispatch(&head) {
            Err(status) => {
                self.metrics.dispatch_request_errors.increment();
                self.do_close = true;
                Phase::WriteResponse(ResEncoder::error(status))
            }
            Ok(mut handler) => match track!(handler.init(head)) {
                Err(e) => {
                    warn!(self.logger, "Cannot initialize a request handler: {}", e);
                    self.metrics.initialize_handler_errors.increment();
                    self.do_close = true;
                    Phase::WriteResponse(ResEncoder::error(Status::InternalServerError))
                }
                Ok(()) => Phase::HandleRequest(handler),
            },
        }
    }

    fn handle_request(&mut self, mut handler: RequestHandlerInstance) -> Phase {
        match track!(handler.handle_input(self.stream.read_buf_mut())) {
            Err(e) => {
                warn!(
                    self.logger,
                    "Cannot decode the body of a HTTP request: {}", e
                );
                self.metrics.decode_request_body_errors.increment();
                self.do_close = true;
                Phase::WriteResponse(ResEncoder::error(Status::BadRequest))
            }
            Ok(None) => Phase::HandleRequest(handler),
            Ok(Some(reply)) => {
                self.do_close = handler.is_closed();
                Phase::PollReply(reply)
            }
        }
    }

    /// Polls the pending reply once.
    ///
    /// Moves to `Phase::WriteResponse` when the reply is ready; otherwise stays
    /// in `Phase::PollReply` with the same reply.
    ///
    /// # Panics
    ///
    /// Panics if polling the reply returns an error.
    fn poll_reply(&mut self, mut reply: BoxReply) -> Phase {
        match reply.poll().expect("Never fails") {
            Async::Ready(res_encoder) => Phase::WriteResponse(res_encoder),
            Async::NotReady => Phase::PollReply(reply),
        }
    }

    /// Encodes as much of the response as possible into the write buffer.
    ///
    /// While the encoder still has data left, the phase stays `WriteResponse`.
    /// Once the encoder is idle, the next phase is `Closed` if `do_close` is set
    /// and `ReadRequestHead` otherwise.
    ///
    /// # Errors
    ///
    /// Returns the encoding error after incrementing the
    /// `write_response_errors` metric.
    fn write_response(&mut self, mut encoder: ResEncoder) -> Result<Phase> {
        let encoded = track!(encoder.encode_to_write_buf(self.stream.write_buf_mut()));
        if encoded.is_err() {
            self.metrics.write_response_errors.increment();
        }
        encoded?;

        let next = if !encoder.is_idle() {
            Phase::WriteResponse(encoder)
        } else if self.do_close {
            Phase::Closed
        } else {
            Phase::ReadRequestHead
        };
        Ok(next)
    }

    /// Performs one round of stream I/O followed by one step of the current phase.
    ///
    /// Returns `Ok(true)` if the phase variant changed or the stream would not
    /// block, and `Ok(false)` otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error if the stream I/O or writing the response fails. When
    /// writing the response fails, `self.phase` is left as `Phase::Closed`.
    fn poll_once(&mut self) -> Result<bool> {
        track!(self.stream.execute_io())?;

        // `take` leaves `Phase::Closed` behind until the next phase is stored below.
        let current = self.phase.take();
        let before = mem::discriminant(&current);
        let next = match current {
            Phase::Closed => Phase::Closed,
            Phase::ReadRequestHead => self.read_request_head(),
            Phase::DispatchRequest(req) => self.dispatch_request(req),
            Phase::HandleRequest(handler) => self.handle_request(handler),
            Phase::PollReply(reply) => self.poll_reply(reply),
            Phase::WriteResponse(res) => track!(self.write_response(res))?,
        };

        let phase_changed = mem::discriminant(&next) != before;
        self.phase = next;
        Ok(phase_changed || !self.stream.would_block())
    }
}
impl Future for Connection {
    type Item = ();
    type Error = ();

    /// Drives the connection until it is closed, fails, or cannot make progress.
    ///
    /// - `Ok(Async::Ready(()))` once the connection is closed,
    /// - `Ok(Async::NotReady)` when a step reports no progress and the
    ///   connection is still open,
    /// - `Err(())` when a step fails; the error is logged as a warning.
    ///
    /// The `disconnected_tcp_clients` metric is incremented on both the
    /// `Ready` and the `Err` outcome.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        while !self.is_closed() {
            let made_progress = match track!(self.poll_once()) {
                Ok(made_progress) => made_progress,
                Err(e) => {
                    warn!(self.logger, "Connection aborted: {}", e);
                    self.metrics.disconnected_tcp_clients.increment();
                    return Err(());
                }
            };
            if made_progress {
                continue;
            }
            // The last step may have closed the connection even though it
            // reported no progress.
            if !self.is_closed() {
                return Ok(Async::NotReady);
            }
            break;
        }

        debug!(self.logger, "Connection closed");
        self.metrics.disconnected_tcp_clients.increment();
        Ok(Async::Ready(()))
    }
}

/// Processing phase of a connection.
#[derive(Debug)]
enum Phase {
    /// Decoding the head of the next request from the read buffer.
    ReadRequestHead,
    /// A request head has been decoded and is about to be dispatched to a handler.
    DispatchRequest(Req<()>),
    /// The selected handler is consuming input until it produces a reply.
    HandleRequest(RequestHandlerInstance),
    /// Waiting for the handler's reply to yield a response encoder.
    PollReply(BoxReply),
    /// Encoding the response into the write buffer.
    WriteResponse(ResEncoder),
    /// No further requests are processed; also the placeholder left by `Phase::take`.
    Closed,
}
impl Phase {
    /// Moves the current phase out, leaving `Phase::Closed` in its place.
    fn take(&mut self) -> Self {
        let mut taken = Phase::Closed;
        mem::swap(self, &mut taken);
        taken
    }

    /// Returns `true` if this is `Phase::Closed`.
    fn is_closed(&self) -> bool {
        match *self {
            Phase::Closed => true,
            _ => false,
        }
    }
}