actix-web 0.3.3

Actix web framework
use std::{net, time};
use std::rc::Rc;
use futures::Future;
use futures::unsync::oneshot;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Handle;
use net2::TcpStreamExt;

use futures::future;
use native_tls::TlsAcceptor;
use tokio_tls::TlsAcceptorExt;

use futures::future;
use openssl::ssl::SslAcceptor;
use tokio_openssl::SslAcceptorExt;

use actix::*;
use actix::msgs::StopArbiter;

use helpers;
use server::HttpHandler;
use server::channel::HttpChannel;
use server::settings::WorkerSettings;

/// A connection handed to a worker: an I/O object plus per-connection metadata.
pub(crate) struct Conn<T> {
    /// The I/O object; the worker's handlers in this file receive `Conn<net::TcpStream>`.
    pub io: T,
    /// Socket address forwarded to `HttpChannel::new` (presumably the remote peer — confirm).
    pub peer: Option<net::SocketAddr>,
    /// Flag forwarded to `HttpChannel::new` by the `Normal` and `Tls` stream handlers.
    /// The `Alpn` stream handler ignores it and derives its own value from the
    /// protocol selected during the handshake.
    pub http2: bool,

/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections are still alive.
pub(crate) struct StopWorker {
    /// Grace period for the shutdown. With `Some(duration)` the worker keeps
    /// re-checking its open connections until the duration is used up; with `None`
    /// open connections are force-closed immediately.
    pub graceful: Option<time::Duration>,

/// Http worker
///
/// Worker accepts Socket objects via an unbounded channel and starts request processing.
pub(crate) struct Worker<H> where H: HttpHandler + 'static {
    /// Worker settings; an `Rc` clone is passed to the stream handler for every connection.
    settings: Rc<WorkerSettings<H>>,
    /// Reactor handle obtained from `Arbiter::handle()` when the worker is created.
    hnd: Handle,
    /// Strategy that turns an accepted TCP stream into an HTTP channel.
    handler: StreamHandlerType,

// NOTE(review): closing braces for this impl and its methods are absent in this
// excerpt; the comments below describe only the statements that are visible.
impl<H: HttpHandler + 'static> Worker<H> {

    /// Builds a worker from a set of HTTP handlers, a stream-handling strategy and
    /// a keep-alive setting.
    ///
    /// `h` and `keep_alive` are passed straight to `WorkerSettings::new`; the reactor
    /// handle is cloned from `Arbiter::handle()`.
    pub(crate) fn new(h: Vec<H>, handler: StreamHandlerType, keep_alive: Option<u64>)
                      -> Worker<H>
        Worker {
            settings: Rc::new(WorkerSettings::new(h, keep_alive)),
            hnd: Arbiter::handle().clone(),
            handler: handler,

    /// Schedules itself to run again after one second, forming a periodic tick.
    ///
    /// NOTE(review): only the re-scheduling call is visible here; whatever work the
    /// tick performs is not shown in this excerpt — confirm against the full file.
    fn update_time(&self, ctx: &mut Context<Self>) {
        ctx.run_later(time::Duration::new(1, 0), |slf, ctx| slf.update_time(ctx));

    /// Polls the number of open channels once per second during a graceful shutdown.
    ///
    /// * `tx`  - receives `true` once no channels remain, or `false` when the grace
    ///           period runs out and the shutdown is forced.
    /// * `dur` - remaining grace period; reduced by one second on every retry.
    fn shutdown_timeout(&self, ctx: &mut Context<Self>,
                        tx: oneshot::Sender<bool>, dur: time::Duration) {
        // sleep for 1 second and then check again
        ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
            let num = slf.settings.num_channels();
            if num == 0 {
                // All channels are gone: report a clean shutdown. A send error is ignored.
                let _ = tx.send(true);
            } else if let Some(d) = dur.checked_sub(time::Duration::new(1, 0)) {
                // At least one more second of grace period is left: check again later.
                slf.shutdown_timeout(ctx, tx, d);
            } else {
                // `checked_sub` returned `None`, i.e. less than one second remained.
                info!("Force shutdown http worker, {} connections", num);
                slf.settings.head().traverse::<TcpStream, H>();
                let _ = tx.send(false);

/// Actor implementation for the worker; it runs in a plain `Context<Self>`.
impl<H: 'static> Actor for Worker<H> where H: HttpHandler + 'static {
    type Context = Context<Self>;

    /// Called when the actor is started.
    ///
    /// NOTE(review): the body of this method is not present in this excerpt.
    fn started(&mut self, ctx: &mut Self::Context) {

/// Handler for incoming TCP connections: hands each one to the configured
/// stream handler together with the worker settings and reactor handle.
impl<H> Handler<Conn<net::TcpStream>> for Worker<H>
    where H: HttpHandler + 'static,
    type Result = ();

    /// Processes one accepted connection.
    fn handle(&mut self, msg: Conn<net::TcpStream>, _: &mut Context<Self>)
        // Evaluated only when keep-alive is NOT enabled in the worker settings; a
        // failure of the second operand is logged.
        // NOTE(review): the second operand of `&&` is truncated in this excerpt. Judging
        // by the log message it is a socket keep-alive setter call — confirm against the
        // full file before editing.
        if !self.settings.keep_alive_enabled() &&
  , 0))).is_err()
            error!("Can not set socket keep-alive option");
        self.handler.handle(Rc::clone(&self.settings), &self.hnd, msg);

/// `StopWorker` message handler
///
/// Chooses between three shutdown paths based on the number of open channels and
/// on whether the message carries a grace period.
impl<H> Handler<StopWorker> for Worker<H>
    where H: HttpHandler + 'static,
    type Result = Response<Self, StopWorker>;

    // NOTE(review): the reply expressions of the first and last branches are not
    // visible in this excerpt; only the graceful branch shows its reply.
    fn handle(&mut self, msg: StopWorker, ctx: &mut Context<Self>) -> Self::Result {
        let num = self.settings.num_channels();
        if num == 0 {
            // Nothing is open, so there is nothing to wait for.
            info!("Shutting down http worker, 0 connections");
        } else if let Some(dur) = msg.graceful {
            // Graceful path: `shutdown_timeout` reports the outcome through `tx`, and
            // the reply is resolved asynchronously from `rx`.
            info!("Graceful http worker shutdown, {} connections", num);
            let (tx, rx) = oneshot::channel();
            self.shutdown_timeout(ctx, tx, dur);
            Self::async_reply(rx.map_err(|_| ()).actfuture())
        } else {
            // No grace period requested: act on the open channels immediately.
            info!("Force shutdown http worker, {} connections", num);
            self.settings.head().traverse::<TcpStream, H>();

/// Selects how an accepted TCP stream is turned into an HTTP channel.
///
/// NOTE(review): the variant list is not visible in this excerpt; `handle` below
/// matches on `Normal`, `Tls(acceptor)` and `Alpn(acceptor)`.
pub(crate) enum StreamHandlerType {

// NOTE(review): this excerpt has lost closing braces and parts of several
// statements; the comments below describe only what is visible.
impl StreamHandlerType {

    /// Converts an accepted connection into an `HttpChannel` according to the
    /// selected variant.
    ///
    /// * `h`   - shared worker settings, moved into the created channel.
    /// * `hnd` - reactor handle the std TCP stream is registered with.
    /// * `msg` - the accepted connection.
    ///
    /// # Panics
    ///
    /// Panics with "failed to associate TCP stream" if `TcpStream::from_stream` fails.
    fn handle<H: HttpHandler>(&mut self,
                              h: Rc<WorkerSettings<H>>,
                              hnd: &Handle, msg: Conn<net::TcpStream>) {
        match *self {
            // Plain TCP: the channel is spawned directly on `hnd`.
            StreamHandlerType::Normal => {
                // NOTE(review): the next two statements are truncated in this excerpt
                // (missing right-hand side and missing first argument). The other arms
                // call `io.set_nodelay(true)` and pass `io` here — confirm.
                let _ =;
                let io = TcpStream::from_stream(, hnd)
                    .expect("failed to associate TCP stream");

                hnd.spawn(HttpChannel::new(h, io, msg.peer, msg.http2));
            // TLS: run the acceptor's handshake first, then spawn the channel on success.
            StreamHandlerType::Tls(ref acceptor) => {
                let Conn { io, peer, http2 } = msg;
                // The result of `set_nodelay` is deliberately discarded.
                let _ = io.set_nodelay(true);
                let io = TcpStream::from_stream(io, hnd)
                    .expect("failed to associate TCP stream");

                    // NOTE(review): the call that drives this handshake future is not
                    // visible in this excerpt.
                    TlsAcceptorExt::accept_async(acceptor, io).then(move |res| {
                        match res {
                            Ok(io) => Arbiter::handle().spawn(
                                HttpChannel::new(h, io, peer, http2)),
                            // A failed handshake is only traced.
                            Err(err) =>
                                trace!("Error during handling tls connection: {}", err),
            // TLS with ALPN: like `Tls`, but `http2` comes from the negotiated protocol
            // rather than from the incoming message (its `http2` field is dropped by `..`).
            StreamHandlerType::Alpn(ref acceptor) => {
                let Conn { io, peer, .. } = msg;
                // The result of `set_nodelay` is deliberately discarded.
                let _ = io.set_nodelay(true);
                let io = TcpStream::from_stream(io, hnd)
                    .expect("failed to associate TCP stream");

                    // NOTE(review): the call that drives this handshake future is not
                    // visible in this excerpt.
                    SslAcceptorExt::accept_async(acceptor, io).then(move |res| {
                        match res {
                            Ok(io) => {
                                // True when the selected ALPN protocol is exactly `h2`.
                                // NOTE(review): the value of the `else` branch is not
                                // visible in this excerpt.
                                let http2 = if let Some(p) = io.get_ref().ssl().selected_alpn_protocol()
                                    p.len() == 2 && &p == b"h2"
                                } else {
                                Arbiter::handle().spawn(HttpChannel::new(h, io, peer, http2));
                            // A failed handshake is only traced.
                            Err(err) =>
                                trace!("Error during handling tls connection: {}", err),