use std::fmt;
#[cfg(feature = "runtime")] use std::net::SocketAddr;
use std::sync::Arc;
#[cfg(feature = "runtime")] use std::time::Duration;
use super::rewind::Rewind;
use bytes::Bytes;
use futures::{Async, Future, Poll, Stream};
use futures::future::{Either, Executor};
use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "runtime")] use tokio_reactor::Handle;
use common::Exec;
use proto;
use body::{Body, Payload};
use service::{NewService, Service};
use error::{Kind, Parse};
#[cfg(feature = "runtime")] pub use super::tcp::AddrIncoming;
/// Protocol configuration used to turn IO objects into served connections.
///
/// A value is created with `Http::new`, adjusted through the `&mut self`
/// builder methods, and cloned into every `Serve`/`Connecting` it produces.
#[derive(Clone, Debug)]
pub struct Http {
/// Executor passed to the HTTP/2 server and used by `SpawnAll` to run
/// each accepted connection.
exec: Exec,
/// When `true`, `serve_connection` builds an HTTP/2 server instead of the
/// HTTP/1 dispatcher.
http2: bool,
/// When `false`, keep-alive is disabled on newly created HTTP/1 connections.
keep_alive: bool,
/// Optional buffer size limit forwarded to the HTTP/1 connection; `None`
/// leaves the connection's own setting untouched.
max_buf_size: Option<usize>,
/// Flag forwarded to the HTTP/1 connection's `set_flush_pipeline`.
pipeline_flush: bool,
}
/// A stream that pairs every IO object yielded by `incoming` with a freshly
/// requested service, producing one `Connecting` future per accepted item.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct Serve<I, S> {
/// Source of incoming IO objects.
incoming: I,
/// Factory asked for a new service each time `incoming` yields an item.
new_service: S,
/// Protocol settings cloned into each yielded `Connecting`.
protocol: Http,
}
/// A future that waits for a service to be constructed and then wraps the
/// accepted IO object and that service into a `Connection`.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Connecting<I, F> {
/// Future resolving to the service for this connection.
future: F,
/// The accepted IO object; taken (leaving `None`) once the future completes.
io: Option<I>,
/// Protocol settings used to build the `Connection`.
protocol: Http,
}
/// A future that drains a `Serve` stream and hands every resulting connection
/// to the executor configured on the stream's protocol.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub(super) struct SpawnAll<I, S> {
/// The stream of pending connections being drained.
serve: Serve<I, S>,
}
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, S>
where
S: Service,
{
pub(super) conn: Option<
Either<
proto::h1::Dispatcher<
proto::h1::dispatch::Server<S>,
S::ResBody,
T,
proto::ServerTransaction,
>,
proto::h2::Server<
Rewind<T>,
S,
S::ResBody,
>,
>>,
}
#[derive(Debug)]
pub struct Parts<T, S> {
pub io: T,
pub read_buf: Bytes,
pub service: S,
_inner: (),
}
impl Http {
pub fn new() -> Http {
Http {
exec: Exec::Default,
http2: false,
keep_alive: true,
max_buf_size: None,
pipeline_flush: false,
}
}
pub fn http2_only(&mut self, val: bool) -> &mut Self {
self.http2 = val;
self
}
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
self.keep_alive = val;
self
}
pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
assert!(
max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
"the max_buf_size cannot be smaller than the minimum that h1 specifies."
);
self.max_buf_size = Some(max);
self
}
pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self {
self.pipeline_flush = enabled;
self
}
pub fn executor<E>(&mut self, exec: E) -> &mut Self
where
E: Executor<Box<Future<Item=(), Error=()> + Send>> + Send + Sync + 'static
{
self.exec = Exec::Executor(Arc::new(exec));
self
}
pub fn serve_connection<S, I, Bd>(&self, io: I, service: S) -> Connection<I, S>
where
S: Service<ReqBody=Body, ResBody=Bd>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Future: Send + 'static,
Bd: Payload,
I: AsyncRead + AsyncWrite,
{
let either = if !self.http2 {
let mut conn = proto::Conn::new(io);
if !self.keep_alive {
conn.disable_keep_alive();
}
conn.set_flush_pipeline(self.pipeline_flush);
if let Some(max) = self.max_buf_size {
conn.set_max_buf_size(max);
}
let sd = proto::h1::dispatch::Server::new(service);
Either::A(proto::h1::Dispatcher::new(sd, conn))
} else {
let rewind_io = Rewind::new(io);
let h2 = proto::h2::Server::new(rewind_io, service, self.exec.clone());
Either::B(h2)
};
Connection {
conn: Some(either),
}
}
#[cfg(feature = "runtime")]
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
where
S: NewService<ReqBody=Body, ResBody=Bd>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
{
let mut incoming = AddrIncoming::new(addr, None)?;
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
Ok(self.serve_incoming(incoming, new_service))
}
#[cfg(feature = "runtime")]
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S>>
where
S: NewService<ReqBody=Body, ResBody=Bd>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
{
let mut incoming = AddrIncoming::new(addr, Some(handle))?;
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
Ok(self.serve_incoming(incoming, new_service))
}
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S>
where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite,
S: NewService<ReqBody=Body, ResBody=Bd>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
{
Serve {
incoming: incoming,
new_service: new_service,
protocol: self.clone(),
}
}
}
impl<I, B, S> Connection<I, S>
where
S: Service<ReqBody=Body, ResBody=B> + 'static,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Future: Send,
I: AsyncRead + AsyncWrite + 'static,
B: Payload + 'static,
{
pub fn graceful_shutdown(&mut self) {
match *self.conn.as_mut().unwrap() {
Either::A(ref mut h1) => {
h1.disable_keep_alive();
},
Either::B(ref mut h2) => {
h2.graceful_shutdown();
}
}
}
pub fn into_parts(self) -> Parts<I, S> {
let (io, read_buf, dispatch) = match self.conn.unwrap() {
Either::A(h1) => {
h1.into_inner()
},
Either::B(_h2) => {
panic!("h2 cannot into_inner");
}
};
Parts {
io: io,
read_buf: read_buf,
service: dispatch.into_service(),
_inner: (),
}
}
pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
match *self.conn.as_mut().unwrap() {
Either::A(ref mut h1) => {
try_ready!(h1.poll_without_shutdown());
Ok(().into())
},
Either::B(ref mut h2) => h2.poll(),
}
}
fn try_h2(&mut self) -> Poll<(), ::Error> {
trace!("Trying to upgrade connection to h2");
let conn = self.conn.take();
let (io, read_buf, dispatch) = match conn.unwrap() {
Either::A(h1) => {
h1.into_inner()
},
Either::B(_h2) => {
panic!("h2 cannot into_inner");
}
};
let mut rewind_io = Rewind::new(io);
rewind_io.rewind(read_buf);
let mut h2 = proto::h2::Server::new(rewind_io, dispatch.into_service(), Exec::Default);
let pr = h2.poll();
debug_assert!(self.conn.is_none());
self.conn = Some(Either::B(h2));
pr
}
}
impl<I, B, S> Future for Connection<I, S>
where
S: Service<ReqBody=Body, ResBody=B> + 'static,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Future: Send,
I: AsyncRead + AsyncWrite + 'static,
B: Payload + 'static,
{
type Item = ();
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.conn.poll() {
Ok(x) => Ok(x.map(|o| o.unwrap_or_else(|| ()))),
Err(e) => {
debug!("error polling connection protocol: {}", e);
match *e.kind() {
Kind::Parse(Parse::VersionH2) => self.try_h2(),
_ => Err(e),
}
}
}
}
}
impl<I, S> fmt::Debug for Connection<I, S>
where
    S: Service,
{
    /// Formats the connection as the bare type name; no fields are shown.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_struct("Connection");
        repr.finish()
    }
}
impl<I, S> Serve<I, S> {
pub(super) fn spawn_all(self) -> SpawnAll<I, S> {
SpawnAll {
serve: self,
}
}
#[inline]
pub fn incoming_ref(&self) -> &I {
&self.incoming
}
#[inline]
pub fn incoming_mut(&mut self) -> &mut I {
&mut self.incoming
}
}
impl<I, S, B> Stream for Serve<I, S>
where
I: Stream,
I::Item: AsyncRead + AsyncWrite,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
S: NewService<ReqBody=Body, ResBody=B>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
<S::Service as Service>::Future: Send + 'static,
B: Payload,
{
type Item = Connecting<I::Item, S::Future>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
let new_fut = self.new_service.new_service();
Ok(Async::Ready(Some(Connecting {
future: new_fut,
io: Some(io),
protocol: self.protocol.clone(),
})))
} else {
Ok(Async::Ready(None))
}
}
}
impl<I, F, S, B> Future for Connecting<I, F>
where
I: AsyncRead + AsyncWrite,
F: Future<Item=S>,
S: Service<ReqBody=Body, ResBody=B>,
S::Future: Send + 'static,
B: Payload,
{
type Item = Connection<I, S>;
type Error = F::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let service = try_ready!(self.future.poll());
let io = self.io.take().expect("polled after complete");
Ok(self.protocol.serve_connection(io, service).into())
}
}
#[cfg(feature = "runtime")]
impl<S> SpawnAll<AddrIncoming, S> {
    /// Returns the local address reported by the underlying `AddrIncoming`.
    pub(super) fn local_addr(&self) -> SocketAddr {
        let incoming = &self.serve.incoming;
        incoming.local_addr()
    }
}
impl<I, S> SpawnAll<I, S> {
    /// Borrows the incoming stream of the wrapped `Serve`.
    pub(super) fn incoming_ref(&self) -> &I {
        &self.serve.incoming
    }
}
impl<I, S, B> Future for SpawnAll<I, S>
where
    I: Stream,
    I::Error: Into<Box<::std::error::Error + Send + Sync>>,
    I::Item: AsyncRead + AsyncWrite + Send + 'static,
    S: NewService<ReqBody=Body, ResBody=B> + Send + 'static,
    S::Error: Into<Box<::std::error::Error + Send + Sync>>,
    S::Service: Send,
    S::Future: Send + 'static,
    <S::Service as Service>::Future: Send + 'static,
    B: Payload,
{
    type Item = ();
    type Error = ::Error;

    /// Repeatedly pulls pending connections from the `Serve` stream and
    /// submits each one to the protocol's executor. Completes when the
    /// stream ends; a stream error is returned to the caller.
    ///
    /// Errors from an individual connection are only logged at debug level.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        while let Some(connecting) = try_ready!(self.serve.poll()) {
            // Service construction -> connection future -> log-and-drop errors.
            let task = connecting
                .map_err(::Error::new_user_new_service)
                .and_then(|conn| conn)
                .map_err(|err| debug!("conn error: {}", err));
            self.serve.protocol.exec.execute(task);
        }

        Ok(Async::Ready(()))
    }
}