#[cfg(feature = "compat")]
pub mod compat;
mod service;
use std::cell::RefCell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::rc::{Rc, Weak};
use std::time::Duration;
use futures::task::{self, Task};
use futures::future::{self};
use futures::{Future, Stream, Poll, Async};
#[cfg(feature = "compat")]
use http;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::{Core, Handle, Timeout};
use tokio::net::TcpListener;
pub use tokio_service::{NewService, Service};
use proto;
#[cfg(feature = "compat")]
use proto::Body;
use self::hyper_service::HyperService;
pub use proto::response::Response;
pub use proto::request::Request;
feat_server_proto! {
mod server_proto;
pub use self::server_proto::{
__ProtoRequest,
__ProtoResponse,
__ProtoTransport,
__ProtoBindTransport,
};
}
pub use self::service::{const_service, service_fn};
pub struct Http<B = ::Chunk> {
keep_alive: bool,
pipeline: bool,
_marker: PhantomData<B>,
}
pub struct Server<S, B>
where B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
protocol: Http<B::Item>,
new_service: S,
reactor: Core,
listener: TcpListener,
shutdown_timeout: Duration,
}
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Serve<I, S> {
incoming: I,
new_service: S,
protocol: Http,
}
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct AddrIncoming {
addr: SocketAddr,
listener: TcpListener,
}
#[must_use = "futures do nothing unless polled"]
pub struct Connection<I, S>
where
S: HyperService,
S::ResponseBody: Stream<Error=::Error>,
<S::ResponseBody as Stream>::Item: AsRef<[u8]>,
{
conn: proto::dispatch::Dispatcher<
proto::dispatch::Server<S>,
S::ResponseBody,
I,
<S::ResponseBody as Stream>::Item,
proto::ServerTransaction,
proto::KA,
>,
}
impl<B: AsRef<[u8]> + 'static> Http<B> {
pub fn new() -> Http<B> {
Http {
keep_alive: true,
pipeline: false,
_marker: PhantomData,
}
}
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
self.keep_alive = val;
self
}
pub fn pipeline(&mut self, enabled: bool) -> &mut Self {
self.pipeline = enabled;
self
}
pub fn bind<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Server<S, Bd>>
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
Bd: Stream<Item=B, Error=::Error>,
{
let core = try!(Core::new());
let handle = core.handle();
let listener = try!(TcpListener::bind(addr, &handle));
Ok(Server {
new_service: new_service,
reactor: core,
listener: listener,
protocol: self.clone(),
shutdown_timeout: Duration::new(1, 0),
})
}
#[cfg(feature = "compat")]
pub fn bind_compat<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Server<compat::NewCompatService<S>, Bd>>
where S: NewService<Request = http::Request<Body>, Response = http::Response<Bd>, Error = ::Error> +
Send + Sync + 'static,
Bd: Stream<Item=B, Error=::Error>,
{
self.bind(addr, self::compat::new_service(new_service))
}
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error>,
Bd: Stream<Item=B, Error=::Error>,
{
let listener = TcpListener::bind(addr, &handle)?;
let incoming = AddrIncoming {
addr: listener.local_addr()?,
listener: listener,
};
Ok(self.serve_incoming(incoming, new_service))
}
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S>
where I: Stream<Error=::std::io::Error>,
I::Item: AsyncRead + AsyncWrite,
S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error>,
Bd: Stream<Item=B, Error=::Error>,
{
Serve {
incoming: incoming,
new_service: new_service,
protocol: Http {
keep_alive: self.keep_alive,
pipeline: self.pipeline,
_marker: PhantomData,
},
}
}
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S>
where S: Service<Request = Request, Response = Response<Bd>, Error = ::Error>,
Bd: Stream<Error=::Error>,
Bd::Item: AsRef<[u8]>,
I: AsyncRead + AsyncWrite,
{
let ka = if self.keep_alive {
proto::KA::Busy
} else {
proto::KA::Disabled
};
let mut conn = proto::Conn::new(io, ka);
conn.set_flush_pipeline(self.pipeline);
Connection {
conn: proto::dispatch::Dispatcher::new(proto::dispatch::Server::new(service), conn),
}
}
}
impl<B> Clone for Http<B> {
fn clone(&self) -> Http<B> {
Http {
..*self
}
}
}
impl<B> fmt::Debug for Http<B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Http")
.field("keep_alive", &self.keep_alive)
.field("pipeline", &self.pipeline)
.finish()
}
}
impl<S, B> Server<S, B>
where S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
pub fn local_addr(&self) -> ::Result<SocketAddr> {
Ok(try!(self.listener.local_addr()))
}
pub fn handle(&self) -> Handle {
self.reactor.handle()
}
pub fn shutdown_timeout(&mut self, timeout: Duration) -> &mut Self {
self.shutdown_timeout = timeout;
self
}
#[doc(hidden)]
#[deprecated(since="0.11.11", note="no_proto is always enabled")]
pub fn no_proto(&mut self) -> &mut Self {
self
}
pub fn run(self) -> ::Result<()> {
self.run_until(future::empty())
}
pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()>
where F: Future<Item = (), Error = ()>,
{
let Server { protocol, new_service, mut reactor, listener, shutdown_timeout } = self;
let handle = reactor.handle();
let info = Rc::new(RefCell::new(Info {
active: 0,
blocker: None,
}));
let srv = listener.incoming().for_each(|(socket, addr)| {
let addr_service = SocketAddrService::new(addr, new_service.new_service()?);
let s = NotifyService {
inner: addr_service,
info: Rc::downgrade(&info),
};
info.borrow_mut().active += 1;
let fut = protocol.serve_connection(socket, s)
.map(|_| ())
.map_err(move |err| error!("server connection error: ({}) {}", addr, err));
handle.spawn(fut);
Ok(())
});
let shutdown_signal = shutdown_signal.then(|_| Ok(()));
match reactor.run(shutdown_signal.select(srv)) {
Ok(((), _incoming)) => {}
Err((e, _other)) => return Err(e.into()),
}
let timeout = try!(Timeout::new(shutdown_timeout, &handle));
let wait = WaitUntilZero { info: info.clone() };
match reactor.run(wait.select(timeout)) {
Ok(_) => Ok(()),
Err((e, _)) => Err(e.into())
}
}
}
impl<S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for Server<S, B>
where B::Item: AsRef<[u8]>
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Server")
.field("reactor", &"...")
.field("listener", &self.listener)
.field("new_service", &self.new_service)
.field("protocol", &self.protocol)
.finish()
}
}
impl<I, S> Serve<I, S> {
#[inline]
pub fn incoming_ref(&self) -> &I {
&self.incoming
}
}
impl<I, S, B> Stream for Serve<I, S>
where
I: Stream<Error=io::Error>,
I::Item: AsyncRead + AsyncWrite,
S: NewService<Request=Request, Response=Response<B>, Error=::Error>,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
type Item = Connection<I::Item, S::Instance>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(io) = try_ready!(self.incoming.poll()) {
let service = self.new_service.new_service()?;
Ok(Async::Ready(Some(self.protocol.serve_connection(io, service))))
} else {
Ok(Async::Ready(None))
}
}
}
impl<I, B, S> Future for Connection<I, S>
where S: Service<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
I: AsyncRead + AsyncWrite + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
type Item = self::unnameable::Opaque;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
try_ready!(self.conn.poll());
Ok(self::unnameable::opaque().into())
}
}
impl<I, S> fmt::Debug for Connection<I, S>
where
S: HyperService,
S::ResponseBody: Stream<Error=::Error>,
<S::ResponseBody as Stream>::Item: AsRef<[u8]>,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Connection")
.finish()
}
}
impl<I, B, S> Connection<I, S>
where S: Service<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
I: AsyncRead + AsyncWrite + 'static,
B: Stream<Error=::Error> + 'static,
B::Item: AsRef<[u8]>,
{
pub fn disable_keep_alive(&mut self) {
self.conn.disable_keep_alive()
}
}
mod unnameable {
#[allow(missing_debug_implementations)]
pub struct Opaque {
_inner: (),
}
pub fn opaque() -> Opaque {
Opaque {
_inner: (),
}
}
}
impl AddrIncoming {
pub fn local_addr(&self) -> SocketAddr {
self.addr
}
}
impl Stream for AddrIncoming {
type Item = self::addr_stream::AddrStream;
type Error = ::std::io::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
match self.listener.accept() {
Ok((socket, _addr)) => {
return Ok(Async::Ready(Some(self::addr_stream::new(socket))));
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
Err(e) => debug!("internal error: {:?}", e),
}
}
}
}
mod addr_stream {
use std::io::{self, Read, Write};
use bytes::{Buf, BufMut};
use futures::Poll;
use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};
pub fn new(tcp: TcpStream) -> AddrStream {
AddrStream {
inner: tcp,
}
}
#[derive(Debug)]
pub struct AddrStream {
inner: TcpStream,
}
impl Read for AddrStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}
impl Write for AddrStream {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}
#[inline]
fn flush(&mut self ) -> io::Result<()> {
self.inner.flush()
}
}
impl AsyncRead for AddrStream {
#[inline]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}
#[inline]
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.read_buf(buf)
}
}
impl AsyncWrite for AddrStream {
#[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> {
AsyncWrite::shutdown(&mut self.inner)
}
#[inline]
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.write_buf(buf)
}
}
}
struct SocketAddrService<S> {
addr: SocketAddr,
inner: S,
}
impl<S> SocketAddrService<S> {
fn new(addr: SocketAddr, service: S) -> SocketAddrService<S> {
SocketAddrService {
addr: addr,
inner: service,
}
}
}
impl<S> Service for SocketAddrService<S>
where
S: Service<Request=Request>,
{
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn call(&self, mut req: Self::Request) -> Self::Future {
proto::request::addr(&mut req, self.addr);
self.inner.call(req)
}
}
struct NotifyService<S> {
inner: S,
info: Weak<RefCell<Info>>,
}
struct WaitUntilZero {
info: Rc<RefCell<Info>>,
}
struct Info {
active: usize,
blocker: Option<Task>,
}
impl<S: Service> Service for NotifyService<S> {
type Request = S::Request;
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn call(&self, message: Self::Request) -> Self::Future {
self.inner.call(message)
}
}
impl<S> Drop for NotifyService<S> {
fn drop(&mut self) {
let info = match self.info.upgrade() {
Some(info) => info,
None => return,
};
let mut info = info.borrow_mut();
info.active -= 1;
if info.active == 0 {
if let Some(task) = info.blocker.take() {
task.notify();
}
}
}
}
impl Future for WaitUntilZero {
type Item = ();
type Error = io::Error;
fn poll(&mut self) -> Poll<(), io::Error> {
let mut info = self.info.borrow_mut();
if info.active == 0 {
Ok(().into())
} else {
info.blocker = Some(task::current());
Ok(Async::NotReady)
}
}
}
mod hyper_service {
use super::{Request, Response, Service, Stream};
pub trait HyperService: Service + Sealed {
#[doc(hidden)]
type ResponseBody;
#[doc(hidden)]
type Sealed: Sealed2;
}
pub trait Sealed {}
pub trait Sealed2 {}
#[allow(missing_debug_implementations)]
pub struct Opaque {
_inner: (),
}
impl Sealed2 for Opaque {}
impl<S, B> Sealed for S
where
S: Service<
Request=Request,
Response=Response<B>,
Error=::Error,
>,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{}
impl<S, B> HyperService for S
where
S: Service<
Request=Request,
Response=Response<B>,
Error=::Error,
>,
S: Sealed,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
type ResponseBody = B;
type Sealed = Opaque;
}
}