pub mod conn;
mod shutdown;
#[cfg(feature = "runtime")] mod tcp;
use std::fmt;
#[cfg(feature = "runtime")] use std::net::{SocketAddr, TcpListener as StdTcpListener};
#[cfg(feature = "runtime")] use std::time::Duration;
use futures::{Future, Stream, Poll};
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "runtime")] use tokio_reactor;
use body::{Body, Payload};
use common::exec::{Exec, H2Exec, NewSvcExec};
use service::{MakeServiceRef, Service};
use self::conn::{Http as Http_, NoopWatcher, SpawnAll};
use self::shutdown::{Graceful, GracefulWatcher};
#[cfg(feature = "runtime")] use self::tcp::AddrIncoming;
/// A listening HTTP server.
///
/// `Server` implements `Future`; polling it drives the inner `SpawnAll`
/// (see the `Future` impl in this module). Instances are produced by
/// `Builder::serve`.
///
/// Type parameters:
/// - `I`: the source of incoming connections.
/// - `S`: the service factory handed to `Builder::serve`.
/// - `E`: the executor type, defaulting to `Exec`.
pub struct Server<I, S, E = Exec> {
// The future that does the actual work; every method on `Server`
// delegates to it.
spawn_all: SpawnAll<I, S, E>,
}
/// A builder for a `Server`.
///
/// Holds the incoming connection source together with the protocol
/// configuration until `serve` turns both into a running `Server`.
#[derive(Debug)]
pub struct Builder<I, E = Exec> {
// Source of incoming connections, passed to `serve_incoming` by `serve`.
incoming: I,
// Protocol-level configuration mutated by the `http1_*` / `http2_*` setters.
protocol: Http_<E>,
}
impl<I> Server<I, ()> {
    /// Starts a `Builder` around the given source of incoming connections.
    ///
    /// The builder begins with a freshly constructed protocol configuration
    /// (`Http::new()`), which the builder's setters can then adjust.
    pub fn builder(incoming: I) -> Builder<I> {
        let protocol = Http_::new();
        Builder { incoming, protocol }
    }
}
#[cfg(feature = "runtime")]
impl Server<AddrIncoming, ()> {
    /// Binds to the provided address and returns a `Builder`.
    ///
    /// # Panics
    ///
    /// Panics with `"error binding to {addr}: {error}"` if the address
    /// cannot be bound. Use `try_bind` to handle the failure instead.
    pub fn bind(addr: &SocketAddr) -> Builder<AddrIncoming> {
        // Delegate to the fallible constructor so both entry points are
        // guaranteed to bind in exactly the same way.
        match Self::try_bind(addr) {
            Ok(builder) => builder,
            Err(e) => panic!("error binding to {}: {}", addr, e),
        }
    }

    /// Tries to bind to the provided address and returns a `Builder`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `AddrIncoming::new` reports for `addr`.
    pub fn try_bind(addr: &SocketAddr) -> ::Result<Builder<AddrIncoming>> {
        AddrIncoming::new(addr, None)
            .map(Server::builder)
    }

    /// Creates a `Builder` from an already-bound standard library
    /// `TcpListener`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `AddrIncoming::from_std` reports when
    /// adopting `listener`.
    pub fn from_tcp(listener: StdTcpListener) -> Result<Builder<AddrIncoming>, ::Error> {
        // `Handle::default()` is a lazily-bound handle: the reactor is chosen
        // when the listener is first used rather than at this call site.
        // `Handle::current()` is deprecated by tokio-reactor because its
        // eager lookup had surprising results when called outside a runtime.
        let handle = tokio_reactor::Handle::default();
        AddrIncoming::from_std(listener, &handle)
            .map(Server::builder)
    }
}
#[cfg(feature = "runtime")]
impl<S> Server<AddrIncoming, S> {
    /// Returns the local socket address reported by the underlying
    /// `AddrIncoming`-backed spawner.
    pub fn local_addr(&self) -> SocketAddr {
        let spawner = &self.spawn_all;
        spawner.local_addr()
    }
}
impl<I, S, E, B> Server<I, S, E>
where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Service: 'static,
B: Payload,
E: H2Exec<<S::Service as Service>::Future, B>,
E: NewSvcExec<I::Item, S::Future, S::Service, E, GracefulWatcher>,
{
pub fn with_graceful_shutdown<F>(self, signal: F) -> Graceful<I, S, F, E>
where
F: Future<Item=()>
{
Graceful::new(self.spawn_all, signal)
}
}
/// Running a `Server` means polling it as a future: each poll is forwarded
/// to the inner spawner with a watcher that does nothing extra
/// (`NoopWatcher`).
impl<I, S, B, E> Future for Server<I, S, E>
where
    I: Stream,
    I::Error: Into<Box<::std::error::Error + Send + Sync>>,
    I::Item: AsyncRead + AsyncWrite + Send + 'static,
    S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
    S::Error: Into<Box<::std::error::Error + Send + Sync>>,
    S::Service: 'static,
    B: Payload,
    E: H2Exec<<S::Service as Service>::Future, B>,
    E: NewSvcExec<I::Item, S::Future, S::Service, E, NoopWatcher>,
{
    type Item = ();
    type Error = ::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let watcher = NoopWatcher;
        self.spawn_all.poll_watch(&watcher)
    }
}
/// Debug output shows only the incoming connection source, under the
/// field name `listener`.
impl<I: fmt::Debug, S: fmt::Debug> fmt::Debug for Server<I, S> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Server");
        out.field("listener", &self.spawn_all.incoming_ref());
        out.finish()
    }
}
impl<I, E> Builder<I, E> {
    /// Starts a new builder from an incoming connection source and an
    /// existing protocol configuration.
    ///
    /// `Server::builder` is the shorthand that supplies a default
    /// protocol configuration.
    pub fn new(incoming: I, protocol: Http_<E>) -> Self {
        Self { incoming, protocol }
    }

    /// Sets the HTTP/1 keep-alive option by forwarding `val` to the
    /// protocol configuration's `keep_alive` setter.
    pub fn http1_keepalive(mut self, val: bool) -> Self {
        self.protocol.keep_alive(val);
        self
    }

    /// Sets the HTTP/1 half-close option by forwarding `val` to the
    /// protocol configuration's `http1_half_close` setter.
    pub fn http1_half_close(mut self, val: bool) -> Self {
        self.protocol.http1_half_close(val);
        self
    }

    /// Sets the HTTP/1-only option by forwarding `val` to the protocol
    /// configuration's `http1_only` setter.
    pub fn http1_only(mut self, val: bool) -> Self {
        self.protocol.http1_only(val);
        self
    }

    // Hidden from the rendered docs: forwards `val` to the protocol
    // configuration's `pipeline_flush` setter.
    #[doc(hidden)]
    pub fn http1_pipeline_flush(mut self, val: bool) -> Self {
        self.protocol.pipeline_flush(val);
        self
    }

    /// Sets the HTTP/1 vectored-write option by forwarding `val` to the
    /// protocol configuration's `http1_writev` setter.
    pub fn http1_writev(mut self, val: bool) -> Self {
        self.protocol.http1_writev(val);
        self
    }

    /// Sets the HTTP/2-only option by forwarding `val` to the protocol
    /// configuration's `http2_only` setter.
    pub fn http2_only(mut self, val: bool) -> Self {
        self.protocol.http2_only(val);
        self
    }

    /// Replaces the executor, producing a builder typed over the new
    /// executor `E2`.
    ///
    /// The incoming source is carried over unchanged; the protocol
    /// configuration is converted with `with_executor`.
    pub fn executor<E2>(self, executor: E2) -> Builder<I, E2> {
        let Builder { incoming, protocol } = self;
        Builder {
            incoming,
            protocol: protocol.with_executor(executor),
        }
    }

    /// Consumes the builder and creates a `Server`.
    ///
    /// The incoming source and `new_service` are passed to the protocol
    /// configuration's `serve_incoming`, and the result is converted via
    /// `spawn_all` into the future stored inside the returned `Server`.
    /// Nothing runs until that `Server` is polled.
    pub fn serve<S, B>(self, new_service: S) -> Server<I, S, E>
    where
        I: Stream,
        I::Error: Into<Box<::std::error::Error + Send + Sync>>,
        I::Item: AsyncRead + AsyncWrite + Send + 'static,
        S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
        S::Error: Into<Box<::std::error::Error + Send + Sync>>,
        S::Service: 'static,
        B: Payload,
        E: NewSvcExec<I::Item, S::Future, S::Service, E, NoopWatcher>,
        E: H2Exec<<S::Service as Service>::Future, B>,
    {
        let Builder { incoming, protocol } = self;
        Server {
            spawn_all: protocol.serve_incoming(incoming, new_service).spawn_all(),
        }
    }
}
#[cfg(feature = "runtime")]
impl<E> Builder<AddrIncoming, E> {
    /// Sets the TCP keep-alive option by forwarding `keepalive` to
    /// `AddrIncoming::set_keepalive`.
    ///
    /// `None` is passed through as-is; its meaning is defined by
    /// `AddrIncoming` (presumably "keep-alive disabled" — confirm there).
    pub fn tcp_keepalive(mut self, keepalive: Option<Duration>) -> Self {
        self.incoming.set_keepalive(keepalive);
        self
    }

    /// Sets the `TCP_NODELAY` preference by forwarding `enabled` to
    /// `AddrIncoming::set_nodelay`.
    pub fn tcp_nodelay(mut self, enabled: bool) -> Self {
        self.incoming.set_nodelay(enabled);
        self
    }

    /// Sets the sleep-on-accept-errors option by forwarding `val` to
    /// `AddrIncoming::set_sleep_on_errors`.
    pub fn tcp_sleep_on_accept_errors(mut self, val: bool) -> Self {
        self.incoming.set_sleep_on_errors(val);
        self
    }
}