use std::net::SocketAddr;
use std::sync::Arc;
use futures::{Async, Future, Poll, Stream};
use hyper::{rt, Server as HyperServer};
use hyper::service::{service_fn};
use tokio_io::{AsyncRead, AsyncWrite};
use ::never::Never;
use ::reject::Reject;
use ::reply::{ReplySealed, Reply};
use ::Request;
pub fn serve<S>(service: S) -> Server<S>
where
S: IntoWarpService + 'static,
{
Server {
pipeline: false,
service,
}
}
#[derive(Debug)]
pub struct Server<S> {
pipeline: bool,
service: S,
}
macro_rules! into_service {
($this:ident) => ({
let inner = Arc::new($this.service.into_warp_service());
move || {
let inner = inner.clone();
service_fn(move |req| {
ReplyFuture {
inner: inner.call(req)
}
})
}
});
}
macro_rules! bind_inner {
($this:ident, $addr:expr) => ({
let service = into_service!($this);
let srv = HyperServer::bind(&$addr.into())
.http1_pipeline_flush($this.pipeline)
.serve(service);
let addr = srv.local_addr();
(addr, srv)
});
}
impl<S> Server<S>
where
S: IntoWarpService + 'static,
<<S::Service as WarpService>::Reply as Future>::Item: Reply + Send,
<<S::Service as WarpService>::Reply as Future>::Error: Reject + Send,
{
pub fn run(self, addr: impl Into<SocketAddr> + 'static) {
let (addr, fut) = self.bind_ephemeral(addr);
info!("warp drive engaged: listening on {}", addr);
rt::run(fut);
}
pub fn run_incoming<I>(self, incoming: I)
where
I: Stream + Send + 'static,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
{
let fut = self.serve_incoming(incoming);
info!("warp drive engaged: listening with custom incoming");
rt::run(fut);
}
pub fn bind(self, addr: impl Into<SocketAddr> + 'static) -> impl Future<Item=(), Error=()> + 'static {
let (_, fut) = self.bind_ephemeral(addr);
fut
}
pub fn bind_ephemeral(self, addr: impl Into<SocketAddr> + 'static) -> (SocketAddr, impl Future<Item=(), Error=()> + 'static) {
let (addr, srv) = bind_inner!(self, addr);
(addr, srv.map_err(|e| error!("server error: {}", e)))
}
pub fn bind_with_graceful_shutdown(
self,
addr: impl Into<SocketAddr> + 'static,
signal: impl Future<Item=()> + Send + 'static,
) -> (SocketAddr, impl Future<Item=(), Error=()> + 'static) {
let (addr, srv) = bind_inner!(self, addr);
let fut = srv
.with_graceful_shutdown(signal)
.map_err(|e| error!("server error: {}", e));
(addr, fut)
}
pub fn serve_incoming<I>(self, incoming: I) -> impl Future<Item=(), Error=()> + 'static
where
I: Stream + Send + 'static,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
{
let service = into_service!(self);
HyperServer::builder(incoming)
.http1_pipeline_flush(self.pipeline)
.serve(service)
.map_err(|e| error!("server error: {}", e))
}
#[doc(hidden)]
pub fn unstable_pipeline(mut self) -> Self {
self.pipeline = true;
self
}
}
pub trait IntoWarpService {
type Service: WarpService + Send + Sync + 'static;
fn into_warp_service(self) -> Self::Service;
}
pub trait WarpService {
type Reply: Future + Send;
fn call(&self, req: Request) -> Self::Reply;
}
#[derive(Debug)]
struct ReplyFuture<F> {
inner: F,
}
impl<F> Future for ReplyFuture<F>
where
F: Future,
F::Item: Reply,
F::Error: Reject,
{
type Item = ::reply::Response;
type Error = Never;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
Ok(Async::Ready(ok)) => Ok(Async::Ready(ok.into_response())),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Ok(Async::Ready(err.into_response())),
}
}
}