use std::cell::RefCell;
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::rc::{Rc, Weak};
use std::time::Duration;
use futures::future;
use futures::task::{self, Task};
use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
use futures::future::Map;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::{Core, Handle, Timeout};
use tokio::net::TcpListener;
use tokio_proto::BindServer;
use tokio_proto::streaming::Message;
use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto};
pub use tokio_service::{NewService, Service};
use http;
use http::response;
use http::request;
pub use http::response::Response;
pub use http::request::Request;
/// HTTP protocol configuration used to bind servers and individual connections.
///
/// `B` is the chunk type of response bodies. It is only tracked at the type
/// level (through `PhantomData`) and defaults to `::Chunk`.
pub struct Http<B = ::Chunk> {
/// When `true`, transports are created with `http::KA::Busy`; when `false`,
/// with `http::KA::Disabled`.
keep_alive: bool,
/// Zero-sized marker that ties the chunk type `B` to this value.
_marker: PhantomData<B>,
}
/// A bound HTTP server: a listener, the event loop that drives it, and the
/// factory used to create one service per accepted connection.
///
/// `S` is the service factory and `B` is the response body stream whose items
/// are byte chunks.
pub struct Server<S, B>
where B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
/// Protocol configuration used to bind each accepted connection.
protocol: Http<B::Item>,
/// Factory invoked once per accepted connection to create its service.
new_service: S,
/// Event loop that runs the accept loop and the shutdown wait.
core: Core,
/// Listening socket that produces incoming connections.
listener: TcpListener,
/// Upper bound on how long `run_until` waits for active connections to
/// finish once the accept loop has stopped.
shutdown_timeout: Duration,
}
impl<B: AsRef<[u8]> + 'static> Http<B> {
pub fn new() -> Http<B> {
Http {
keep_alive: true,
_marker: PhantomData,
}
}
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
self.keep_alive = val;
self
}
pub fn bind<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Server<S, Bd>>
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error> +
Send + Sync + 'static,
Bd: Stream<Item=B, Error=::Error>,
{
let core = try!(Core::new());
let handle = core.handle();
let listener = try!(TcpListener::bind(addr, &handle));
Ok(Server {
new_service: new_service,
core: core,
listener: listener,
protocol: self.clone(),
shutdown_timeout: Duration::new(1, 0),
})
}
pub fn bind_connection<S, I, Bd>(&self,
handle: &Handle,
io: I,
remote_addr: SocketAddr,
service: S)
where S: Service<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
Bd: Stream<Item=B, Error=::Error> + 'static,
I: AsyncRead + AsyncWrite + 'static,
{
self.bind_server(handle, io, HttpService {
inner: service,
remote_addr: remote_addr,
})
}
}
impl<B> Clone for Http<B> {
    /// Copies the configuration field by field.
    ///
    /// Implemented by hand so that cloning does not require `B: Clone`.
    fn clone(&self) -> Http<B> {
        Http {
            keep_alive: self.keep_alive,
            _marker: self._marker,
        }
    }
}
impl<B> fmt::Debug for Http<B> {
    /// Formats the configuration as `Http { keep_alive: .. }`.
    ///
    /// The marker field is left out, so no `Debug` bound on `B` is needed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut out = f.debug_struct("Http");
        out.field("keep_alive", &self.keep_alive);
        out.finish()
    }
}
/// Newtype around `http::RequestHead`, used as the request head type of the
/// `ServerProto` implementation for `Http`.
///
/// Hidden from the documentation although it is `pub`.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct __ProtoRequest(http::RequestHead);
/// Newtype around `ResponseHead`, used as the response head type of the
/// `ServerProto` implementation for `Http`.
///
/// Hidden from the documentation although it is `pub`.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct __ProtoResponse(ResponseHead);
/// Newtype around an `http::Conn` specialized to `http::ServerTransaction`.
///
/// Its `Sink`, `Stream` and `Transport` implementations delegate to the
/// wrapped connection, converting between the wrapped head types and the
/// `__ProtoRequest` / `__ProtoResponse` newtypes.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct __ProtoTransport<T, B>(http::Conn<T, B, http::ServerTransaction>);
/// Future returned by `bind_transport`; resolves to a `__ProtoTransport`.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct __ProtoBindTransport<T, B> {
/// Already-computed result holding the connection (or an I/O error).
inner: future::FutureResult<http::Conn<T, B, http::ServerTransaction>, io::Error>,
}
impl<T, B> ServerProto<T> for Http<B>
    where T: AsyncRead + AsyncWrite + 'static,
          B: AsRef<[u8]> + 'static,
{
    type Request = __ProtoRequest;
    type RequestBody = http::Chunk;
    type Response = __ProtoResponse;
    type ResponseBody = B;
    type Error = ::Error;
    type Transport = __ProtoTransport<T, B>;
    type BindTransport = __ProtoBindTransport<T, B>;

    /// Wraps `io` in an HTTP connection whose keep-alive state is chosen from
    /// this configuration, and returns it as an immediately-ready future.
    #[inline]
    fn bind_transport(&self, io: T) -> Self::BindTransport {
        let keep_alive_state = match self.keep_alive {
            true => http::KA::Busy,
            false => http::KA::Disabled,
        };
        let conn = http::Conn::new(io, keep_alive_state);
        __ProtoBindTransport {
            inner: future::ok(conn),
        }
    }
}
impl<T, B> Sink for __ProtoTransport<T, B>
where T: AsyncRead + AsyncWrite + 'static,
B: AsRef<[u8]> + 'static,
{
type SinkItem = Frame<__ProtoResponse, B, ::Error>;
type SinkError = io::Error;
#[inline]
fn start_send(&mut self, item: Self::SinkItem)
-> StartSend<Self::SinkItem, io::Error> {
let item = match item {
Frame::Message { message, body } => {
Frame::Message { message: message.0, body: body }
}
Frame::Body { chunk } => Frame::Body { chunk: chunk },
Frame::Error { error } => Frame::Error { error: error },
};
match try!(self.0.start_send(item)) {
AsyncSink::Ready => Ok(AsyncSink::Ready),
AsyncSink::NotReady(Frame::Message { message, body }) => {
Ok(AsyncSink::NotReady(Frame::Message {
message: __ProtoResponse(message),
body: body,
}))
}
AsyncSink::NotReady(Frame::Body { chunk }) => {
Ok(AsyncSink::NotReady(Frame::Body { chunk: chunk }))
}
AsyncSink::NotReady(Frame::Error { error }) => {
Ok(AsyncSink::NotReady(Frame::Error { error: error }))
}
}
}
#[inline]
fn poll_complete(&mut self) -> Poll<(), io::Error> {
self.0.poll_complete()
}
#[inline]
fn close(&mut self) -> Poll<(), io::Error> {
self.0.close()
}
}
impl<T, B> Stream for __ProtoTransport<T, B>
where T: AsyncRead + AsyncWrite + 'static,
B: AsRef<[u8]> + 'static,
{
type Item = Frame<__ProtoRequest, http::Chunk, ::Error>;
type Error = io::Error;
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
let item = match try_ready!(self.0.poll()) {
Some(item) => item,
None => return Ok(None.into()),
};
let item = match item {
Frame::Message { message, body } => {
Frame::Message { message: __ProtoRequest(message), body: body }
}
Frame::Body { chunk } => Frame::Body { chunk: chunk },
Frame::Error { error } => Frame::Error { error: error },
};
Ok(Some(item).into())
}
}
/// `Transport` implementation that forwards both hooks to the wrapped
/// connection without adding any behavior of its own.
impl<T, B> Transport for __ProtoTransport<T, B>
where T: AsyncRead + AsyncWrite + 'static,
B: AsRef<[u8]> + 'static,
{
/// Forwards the tick to the wrapped connection.
#[inline]
fn tick(&mut self) {
self.0.tick()
}
/// Forwards the cancellation to the wrapped connection and returns its result.
#[inline]
fn cancel(&mut self) -> io::Result<()> {
self.0.cancel()
}
}
impl<T, B> Future for __ProtoBindTransport<T, B>
    where T: AsyncRead + AsyncWrite + 'static,
{
    type Item = __ProtoTransport<T, B>;
    type Error = io::Error;

    /// Polls the stored result and wraps a ready connection in
    /// `__ProtoTransport`; errors are passed through unchanged.
    #[inline]
    fn poll(&mut self) -> Poll<__ProtoTransport<T, B>, io::Error> {
        match self.inner.poll() {
            Ok(state) => Ok(state.map(__ProtoTransport)),
            Err(err) => Err(err),
        }
    }
}
impl From<Message<__ProtoRequest, http::TokioBody>> for Request {
#[inline]
fn from(message: Message<__ProtoRequest, http::TokioBody>) -> Request {
let (head, body) = match message {
Message::WithoutBody(head) => (head.0, http::Body::empty()),
Message::WithBody(head, body) => (head.0, body.into()),
};
request::from_wire(None, head, body)
}
}
impl<B> Into<Message<__ProtoResponse, B>> for Response<B> {
    /// Splits the response into head and optional body and packs them into
    /// the matching `Message` variant.
    #[inline]
    fn into(self) -> Message<__ProtoResponse, B> {
        let (head, maybe_body) = response::split(self);
        match maybe_body {
            Some(body) => Message::WithBody(__ProtoResponse(head), body.into()),
            None => Message::WithoutBody(__ProtoResponse(head)),
        }
    }
}
/// Adapter that lets a user service working with `Request`/`Response` be
/// driven with protocol-level `Message` values.
struct HttpService<T> {
/// The wrapped user service.
inner: T,
/// Peer address attached to every request built for `inner`.
remote_addr: SocketAddr,
}
/// Message head specialized with `::StatusCode`; this is the type wrapped by
/// `__ProtoResponse`.
type ResponseHead = http::MessageHead<::StatusCode>;
impl<T, B> Service for HttpService<T>
where T: Service<Request=Request, Response=Response<B>, Error=::Error>,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
type Request = Message<__ProtoRequest, http::TokioBody>;
type Response = Message<__ProtoResponse, B>;
type Error = ::Error;
type Future = Map<T::Future, fn(Response<B>) -> Message<__ProtoResponse, B>>;
#[inline]
fn call(&self, message: Self::Request) -> Self::Future {
let (head, body) = match message {
Message::WithoutBody(head) => (head.0, http::Body::empty()),
Message::WithBody(head, body) => (head.0, body.into()),
};
let req = request::from_wire(Some(self.remote_addr), head, body);
self.inner.call(req).map(Into::into)
}
}
impl<S, B> Server<S, B>
    where S: NewService<Request = Request, Response = Response<B>, Error = ::Error>
                + Send + Sync + 'static,
          B: Stream<Error=::Error> + 'static,
          B::Item: AsRef<[u8]>,
{
    /// Returns the address the listener is bound to.
    ///
    /// # Errors
    ///
    /// Fails if the listener cannot report its local address.
    pub fn local_addr(&self) -> ::Result<SocketAddr> {
        let bound = try!(self.listener.local_addr());
        Ok(bound)
    }

    /// Returns a handle to the event loop owned by this server.
    pub fn handle(&self) -> Handle {
        self.core.handle()
    }

    /// Sets how long `run_until` waits for active connections to finish after
    /// the accept loop has stopped. Returns `self` for chaining.
    pub fn shutdown_timeout(&mut self, timeout: Duration) -> &mut Self {
        self.shutdown_timeout = timeout;
        self
    }

    /// Runs the server, passing `future::empty()` as the shutdown signal.
    pub fn run(self) -> ::Result<()> {
        let no_signal = future::empty();
        self.run_until(no_signal)
    }

    /// Runs the server until `shutdown_signal` completes, then waits for the
    /// remaining connections.
    ///
    /// Phase one accepts connections: each one gets a fresh service from the
    /// factory, is counted as active, and is bound to the event loop. The
    /// phase ends when `shutdown_signal` completes (its outcome, success or
    /// failure, is ignored) or when accepting fails.
    ///
    /// Phase two waits until the active count reaches zero or the shutdown
    /// timeout elapses, whichever happens first.
    ///
    /// # Errors
    ///
    /// Returns an error if accepting a connection or creating a service
    /// fails, if the timeout cannot be created, or if the final wait fails.
    pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()>
        where F: Future<Item = (), Error = ()>,
    {
        let Server { protocol, new_service, mut core, listener, shutdown_timeout } = self;
        let handle = core.handle();

        // Shared bookkeeping: connection services hold weak references and
        // decrement the count when they are dropped.
        let info = Rc::new(RefCell::new(Info {
            active: 0,
            blocker: None,
        }));

        let accept_loop = listener.incoming().for_each(|(socket, addr)| {
            let service = try!(new_service.new_service());
            let tracked = NotifyService {
                inner: service,
                info: Rc::downgrade(&info),
            };
            info.borrow_mut().active += 1;
            protocol.bind_connection(&handle, socket, addr, tracked);
            Ok(())
        });

        // Whatever the signal resolves to, treat it as "stop accepting".
        let stop = shutdown_signal.then(|_| Ok(()));

        if let Err((e, _other)) = core.run(stop.select(accept_loop)) {
            return Err(e.into());
        }

        let deadline = try!(Timeout::new(shutdown_timeout, &handle));
        let drained = WaitUntilZero { info: info.clone() };
        match core.run(drained.select(deadline)) {
            Err((e, _)) => Err(e.into()),
            Ok(_) => Ok(()),
        }
    }
}
impl<S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for Server<S, B>
where B::Item: AsRef<[u8]>
{
    /// Formats the server for diagnostics.
    ///
    /// The event loop is shown as the placeholder `"..."`. Every other field,
    /// including the configured shutdown timeout, is printed with its own
    /// `Debug` output.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Server")
            // The reactor is rendered as a placeholder rather than formatted.
            .field("core", &"...")
            .field("listener", &self.listener)
            .field("new_service", &self.new_service)
            .field("protocol", &self.protocol)
            // Previously omitted: user-configurable state belongs in the output.
            .field("shutdown_timeout", &self.shutdown_timeout)
            .finish()
    }
}
/// Service wrapper that reports its own destruction: when dropped, it
/// decrements the shared active-connection count (see its `Drop` impl).
struct NotifyService<S> {
/// The wrapped service that actually handles calls.
inner: S,
/// Weak link to the shared bookkeeping; may already be gone at drop time.
info: Weak<RefCell<Info>>,
}
/// Future that becomes ready once the shared active count is zero.
struct WaitUntilZero {
/// Shared bookkeeping that is inspected on every poll.
info: Rc<RefCell<Info>>,
}
/// Bookkeeping shared between the accept loop, the per-connection services,
/// and the shutdown wait.
struct Info {
/// Number of `NotifyService` instances created and not yet dropped.
active: usize,
/// Task to notify when `active` reaches zero; set by `WaitUntilZero::poll`.
blocker: Option<Task>,
}
impl<S: Service> Service for NotifyService<S> {
    type Request = S::Request;
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    /// Forwards the call to the wrapped service unchanged.
    fn call(&self, message: Self::Request) -> Self::Future {
        S::call(&self.inner, message)
    }
}
impl<S> Drop for NotifyService<S> {
    /// Decrements the shared active count and, when it reaches zero, notifies
    /// the task recorded as waiting (if any).
    ///
    /// Does nothing when the shared bookkeeping has already been dropped.
    fn drop(&mut self) {
        if let Some(shared) = self.info.upgrade() {
            let mut state = shared.borrow_mut();
            state.active -= 1;
            if state.active == 0 {
                // The waiter is taken out so it is notified at most once.
                if let Some(task) = state.blocker.take() {
                    task.notify();
                }
            }
        }
    }
}
impl Future for WaitUntilZero {
    type Item = ();
    type Error = io::Error;

    /// Ready as soon as the active count is zero.
    ///
    /// Otherwise the current task is recorded as the one to notify and the
    /// future reports not ready. This implementation never returns an error.
    fn poll(&mut self) -> Poll<(), io::Error> {
        let mut state = self.info.borrow_mut();
        if state.active != 0 {
            // Replace any previously stored task with the one polling now.
            state.blocker = Some(task::current());
            return Ok(Async::NotReady);
        }
        Ok(().into())
    }
}