use std::net::SocketAddr;
use fibers::Spawn;
use fibers::sync::{mpsc, oneshot};
use fibers::net::{TcpListener, TcpStream};
use fibers::net::streams::Incoming;
use futures::{self, Async, Future, Poll, Stream};
use futures::future::Either;
use trackable::error::ErrorKindExt;
pub use self::request::{ReadRequest, Request};
pub use self::response::{Response, ResponseBuilder};
use {Error, Result, Status, TransportStream, Version};
use status::RawStatus;
use connection;
mod request;
mod response;
pub trait Server {
type Transport: TransportStream;
type SocketHandler: HandleSocket<Transport = Self::Transport>;
type ConnectionHandler: HandleConnection<Transport = Self::Transport>;
#[allow(unused_variables)]
fn before_listen(&mut self, listener: &mut TcpListener) -> Result<()> {
Ok(())
}
fn create_handlers(&mut self) -> (Self::SocketHandler, Self::ConnectionHandler);
fn start<S>(self, bind_addr: SocketAddr, spawner: S) -> ServerHandle
where
Self: Sized + Send + 'static,
S: Spawn + Clone + Send + 'static,
{
ServerHandle::start(self, bind_addr, spawner)
}
}
pub trait HandleSocket: Sized + Send + 'static {
type Transport: TransportStream;
type Future: Future<Item = Connection<Self::Transport>, Error = Error> + Send + 'static;
fn handle(self, socket: TcpStream) -> Self::Future;
}
pub trait HandleConnection: Sized + Send + 'static {
type Transport: TransportStream;
type Future: Future<Item = (), Error = ()> + Send + 'static;
fn handle(self, connection: Connection<Self::Transport>) -> Self::Future;
#[allow(unused_variables)]
fn on_error(self, client: SocketAddr, error: Error) {}
}
#[derive(Debug)]
pub struct Connection<T> {
inner: connection::Connection<T>,
version: Version,
}
impl<T: TransportStream> Connection<T> {
pub fn new(
stream: T,
min_buffer_size: usize,
max_buffer_size: usize,
max_header_count: usize,
) -> Self {
let inner =
connection::Connection::new(stream, min_buffer_size, max_buffer_size, max_header_count);
Connection {
inner: inner,
version: Version::default(),
}
}
pub fn read_request(self) -> ReadRequest<T> {
ReadRequest::new(self)
}
pub fn build_response<'a, S>(self, status: S) -> ResponseBuilder<T>
where
S: Into<RawStatus<'a>>,
{
response::builder(self, status.into())
}
pub fn into_raw_stream(self) -> T {
self.inner.stream
}
}
impl<T> AsMut<connection::Connection<T>> for Connection<T> {
fn as_mut(&mut self) -> &mut connection::Connection<T> {
&mut self.inner
}
}
#[derive(Debug)]
enum Command {
Stop,
}
#[derive(Debug)]
pub struct ServerHandle {
command_tx: mpsc::Sender<Command>,
monitor: oneshot::Monitor<(), Error>,
}
impl ServerHandle {
fn start<S, T>(mut server: S, bind_addr: SocketAddr, spawner: T) -> ServerHandle
where
S: Server + Send + 'static,
T: Spawn + Clone + Send + 'static,
{
let (command_tx, command_rx) = mpsc::channel();
let future = {
let spawner = spawner.clone();
TcpListener::bind(bind_addr)
.map_err(|e| track!(Error::from(e)))
.and_then(move |mut listener| {
if let Err(e) = server.before_listen(&mut listener) {
Either::A(futures::failed(e))
} else {
let server_loop = ServerLoop {
server: server,
spawner: spawner,
incoming: listener.incoming(),
command_rx: command_rx,
};
Either::B(server_loop)
}
})
};
let monitor = spawner.spawn_monitor(future);
ServerHandle {
monitor: monitor,
command_tx: command_tx,
}
}
pub fn stop(self) -> JoinServer {
let _ = self.command_tx.send(Command::Stop);
JoinServer(self)
}
pub fn join(self) -> JoinServer {
JoinServer(self)
}
}
#[derive(Debug)]
pub struct JoinServer(ServerHandle);
impl Future for JoinServer {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.monitor.poll().map_err(|e| {
e.unwrap_or(
Status::InternalServerError
.cause("HTTP server aborted")
.into(),
)
})
}
}
#[derive(Debug)]
struct ServerLoop<S, T> {
server: S,
spawner: T,
incoming: Incoming,
command_rx: mpsc::Receiver<Command>,
}
impl<S, T> ServerLoop<S, T> {
fn handle_command(&mut self, command: Command) -> Option<Result<()>> {
match command {
Command::Stop => Some(Ok(())),
}
}
}
impl<S, T> Future for ServerLoop<S, T>
where
S: Server,
T: Spawn,
{
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.command_rx.poll().expect("unreachable") {
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some(command)) => {
if let Some(shutdown) = self.handle_command(command) {
return shutdown.map(|()| Async::Ready(()));
} else {
continue;
}
}
Async::NotReady => {}
}
match self.incoming.poll().map_err(|e| track!(Error::from(e)))? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => unreachable!(),
Async::Ready(Some((socket, address))) => {
let (socket_handler, connection_handler) = self.server.create_handlers();
self.spawner.spawn(
socket
.map_err(|e| track!(Error::from(e)))
.and_then(move |socket| socket_handler.handle(socket))
.then(move |result| match result {
Err(e) => {
connection_handler.on_error(address, e);
Either::A(futures::failed(()))
}
Ok(connection) => Either::B(connection_handler.handle(connection)),
}),
);
}
}
}
}
}