//! HTTP server glue: binds a TCP listener and serves every accepted
//! connection through hyper, dispatching each request to the router.

use std::io::Error as IoError;
use std::net::SocketAddr;

use tokio_core::reactor::{Handle, Core};
use tokio_core::net::TcpListener;

use futures::{Future, Stream, IntoFuture};

use hyper::server::{Http, Request, Response, Service};
use hyper;

use state::Container;

use responder::Responder;
use router::Router;

/// An HTTP server: a bound TCP listener together with the router and the
/// state container used to answer requests.
pub struct Server {
    /// Listening socket that incoming connections are accepted from.
    listener: TcpListener,
    /// Router that every request is dispatched through.
    router: Router,
    /// State container passed to the router alongside each request.
    state: Container,
}

/// The hyper service bound to a single connection. It is built with its own
/// clones of the server's router and state container.
struct S {
    /// Router used to handle the requests arriving on this connection.
    router: Router,
    /// State container cloned again for every request that is served.
    state: Container,
}

impl Service for S {
    type Request = Request;
    type Response = Response;
    type Error = hyper::Error;
    type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

    fn call(&self, req: Request) -> Self::Future {
        self.serve(req)
    }
}


impl Server {
    pub fn bind(addr: SocketAddr, router: Router, handle: &Handle) -> Server {
        let listener = TcpListener::bind(&addr, handle).expect("unable to listen");
        let state = Container::new();
        Server {
            listener,
            router,
            state,
        }
    }

    pub fn manage_state<T: 'static>(mut self, state: T) -> Server {
        if !self.state.set(state) {
            panic!("double state set");
        }

        self
    }

    pub fn run(self, handle: Handle) -> impl Future<Item = (), Error = IoError> {
        let http = Http::new();
        let router = self.router;
        let state = self.state;

        let service_factory = move || {
            S {
                router: router.clone(),
                state: state.clone(),
            }
        };


        self.listener.incoming().for_each(move |(socket, addr)| {
            http.bind_connection(&handle, socket, addr, service_factory());
            Ok(())
        })
    }

    pub fn start_sync(addr: SocketAddr, router: Router) {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let s = Server::bind(addr, router, &handle);

        core.run(s.run(handle)).unwrap();
    }

    pub fn start_sync_with_state<T: 'static>(addr: SocketAddr, router: Router, state: T) {
        let mut core = Core::new().unwrap();
        let handle = core.handle();

        let s = Server::bind(addr, router, &handle).manage_state(state);
        core.run(s.run(handle)).unwrap();
    }
}

impl S {
    /// Routes a single hyper request and returns the boxed response future.
    ///
    /// The hyper request is wrapped in the parent module's `Request`, passed
    /// to the router together with a clone of the state container, and the
    /// router's result is converted into a future and flattened. A failure
    /// is turned into a response through its `respond()` method, so the
    /// returned future always resolves to `Ok`.
    fn serve(&self, hreq: Request) -> Box<Future<Item = Response, Error = hyper::Error>> {
        let response = self.router
            .run(super::Request::new(hreq), self.state.clone())
            .into_future()
            .flatten()
            // Success passes through untouched; an error becomes a response.
            .then(|outcome| Ok(outcome.unwrap_or_else(|err| err.respond())));

        Box::new(response)
    }
}