#![forbid(future_incompatible, rust_2018_idioms)]
#![deny(missing_debug_implementations, nonstandard_style)]
#![warn(missing_docs, missing_doc_code_examples)]
#![cfg_attr(test, deny(warnings))]
use futures::compat::Future01CompatExt;
#[cfg(feature = "runtime")]
use futures::compat::{Compat as Compat03As01, Compat01As03};
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream;
use futures::task::Spawn;
use http_service::{Body, HttpService};
use hyper::server::{Builder as HyperBuilder, Server as HyperServer};
use std::io;
#[cfg(feature = "runtime")]
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Context, Poll};
/// Adapter that presents an [`HttpService`] to hyper as a `MakeService`.
///
/// The service lives behind an [`Arc`] so that each connection wrapper
/// created by `make_service` can hold its own handle to the same instance.
struct WrapHttpService<H> {
    /// Shared handle to the user-supplied service.
    service: Arc<H>,
}
/// Per-connection adapter that implements hyper's `Service` on top of an
/// [`HttpService`].
struct WrapConnection<H>
where
    H: HttpService,
{
    /// Shared handle to the service that answers requests.
    service: Arc<H>,
    /// Connection value obtained from `HttpService::connect`; it is passed
    /// mutably to every `respond` call made through this wrapper.
    connection: H::Connection,
}
impl<H, Ctx> hyper::service::MakeService<Ctx> for WrapHttpService<H>
where
H: HttpService,
{
type ReqBody = hyper::Body;
type ResBody = hyper::Body;
type Error = std::io::Error;
type Service = WrapConnection<H>;
type Future = Compat03As01<BoxFuture<'static, Result<Self::Service, Self::Error>>>;
type MakeError = std::io::Error;
fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
let service = self.service.clone();
let error = std::io::Error::from(std::io::ErrorKind::Other);
async move {
let connection = service.connect().into_future().await.map_err(|_| error)?;
Ok(WrapConnection {
service,
connection,
})
}
.boxed()
.compat()
}
}
/// Routes each hyper request through the wrapped [`HttpService`].
impl<H> hyper::service::Service for WrapConnection<H>
where
    H: HttpService,
{
    type ReqBody = hyper::Body;
    type ResBody = hyper::Body;
    type Error = io::Error;
    type Future =
        Compat03As01<BoxFuture<'static, Result<http::Response<hyper::Body>, Self::Error>>>;

    /// Converts `req` into an `http_service` request, asks the service to
    /// respond, and converts the response back into a hyper response.
    ///
    /// Errors while reading the request body are wrapped in an `io::Error`
    /// of kind `Other`. A failure of the service's response future is
    /// reported as a bare `io::Error` of kind `Other`; its original error
    /// value is dropped.
    fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
        // Present hyper's chunked body as a byte reader backing an `http_service` body.
        let translated = req.map(|incoming| {
            let bytes = Compat01As03::new(incoming)
                .map_ok(|chunk| chunk.to_vec())
                .map_err(|cause| io::Error::new(io::ErrorKind::Other, cause));
            Body::from_reader(bytes.into_async_read())
        });

        // `respond` borrows `self`, so it has to be called before entering
        // the `'static` async block below.
        let responding = self.service.respond(&mut self.connection, translated);

        let answer = async move {
            let response: http::Response<_> = responding
                .into_future()
                .await
                .map_err(|_| io::Error::from(io::ErrorKind::Other))?;
            let (head, reader) = response.into_parts();
            // Stream the response body back to hyper chunk by chunk.
            let outgoing =
                hyper::Body::wrap_stream(Compat03As01::new(ChunkStream { body: reader }));
            Ok(hyper::Response::from_parts(head, outgoing))
        };
        answer.boxed().compat()
    }
}
/// An HTTP server that drives an [`HttpService`] through hyper.
///
/// Values of this type are produced by [`Builder::serve`]. The server is a
/// `Future` whose output is a `hyper::Result<()>`.
///
/// Type parameters: `I` is the stream of incoming connections, `S` the
/// service being served, and `Sp` the spawner handed to hyper as executor.
// The nested compatibility wrappers make the field type unavoidably long.
#[allow(clippy::type_complexity)]
pub struct Server<I, S, Sp>
where
    I: TryStream,
{
    /// The hyper server, wrapped so it can be polled as a std future.
    inner: Compat01As03<
        HyperServer<
            Compat03As01<stream::MapOk<I, fn(I::Ok) -> Compat03As01<I::Ok>>>,
            WrapHttpService<S>,
            Compat03As01<Sp>,
        >,
    >,
}
impl<I: TryStream, S, Sp> std::fmt::Debug for Server<I, S, Sp> {
    /// Formats the server as the bare type name `Server`; no fields are
    /// printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Server");
        repr.finish()
    }
}
/// Configures a [`Server`] before it starts serving.
///
/// Obtained from [`Server::builder`]. `I` is the stream of incoming
/// connections and `Sp` the spawner that will be handed to hyper as its
/// executor.
// The nested compatibility wrappers make the field type unavoidably long.
#[allow(clippy::type_complexity)]
pub struct Builder<I, Sp>
where
    I: TryStream,
{
    /// The underlying hyper builder.
    inner: HyperBuilder<
        Compat03As01<stream::MapOk<I, fn(I::Ok) -> Compat03As01<I::Ok>>>,
        Compat03As01<Sp>,
    >,
}
impl<I: TryStream, Sp> std::fmt::Debug for Builder<I, Sp> {
    /// Formats the builder as the bare type name `Builder`; no fields are
    /// printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("Builder");
        repr.finish()
    }
}
impl<I: TryStream> Server<I, (), ()> {
pub fn builder(incoming: I) -> Builder<I, ()> {
Builder {
inner: HyperServer::builder(Compat03As01::new(incoming.map_ok(Compat03As01::new as _)))
.executor(Compat03As01::new(())),
}
}
}
impl<I: TryStream, Sp> Builder<I, Sp> {
    /// Replaces the spawner that hyper will use as its executor.
    ///
    /// Consumes the builder and returns a new one typed for `new_spawner`.
    pub fn with_spawner<Sp2>(self, new_spawner: Sp2) -> Builder<I, Sp2> {
        let executor = Compat03As01::new(new_spawner);
        Builder {
            inner: self.inner.executor(executor),
        }
    }

    /// Consumes the builder and returns a [`Server`] that answers requests
    /// with `service`.
    ///
    /// The service is placed behind an `Arc` and handed to the underlying
    /// hyper builder.
    pub fn serve<S>(self, service: S) -> Server<I, S, Sp>
    where
        S: HttpService,
        I: TryStream + Unpin,
        I::Ok: AsyncRead + AsyncWrite + Send + Unpin + 'static,
        I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        Sp: Clone + Send + 'static,
        for<'a> &'a Sp: Spawn,
    {
        let make_service = WrapHttpService {
            service: Arc::new(service),
        };
        let hyper_server = self.inner.serve(make_service);
        Server {
            inner: Compat01As03::new(hyper_server),
        }
    }
}
impl<I, S, Sp> Future for Server<I, S, Sp>
where
    I: TryStream + Unpin,
    I::Ok: AsyncRead + AsyncWrite + Send + Unpin + 'static,
    I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    S: HttpService,
    Sp: Clone + Send + 'static,
    for<'a> &'a Sp: Spawn,
{
    type Output = hyper::Result<()>;

    /// Polls the wrapped hyper server and forwards its result unchanged.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let hyper_server = &mut self.inner;
        Pin::new(hyper_server).poll(cx)
    }
}
/// Serves `s` on a hyper server bound to `addr`.
///
/// Returns a std future that yields the result of the underlying hyper
/// server. Only available when the `runtime` feature is enabled.
#[cfg(feature = "runtime")]
pub fn serve<S: HttpService>(
    s: S,
    addr: SocketAddr,
) -> impl Future<Output = Result<(), hyper::Error>> {
    let make_service = WrapHttpService {
        service: Arc::new(s),
    };
    let hyper_server = hyper::Server::bind(&addr).serve(make_service);
    // hyper's server is a futures 0.1 future; adapt it to a std future.
    hyper_server.compat()
}
/// Serves `s` on `addr` by handing the server future to `hyper::rt::run`.
///
/// The server's final result, success or error, is discarded. Only
/// available when the `runtime` feature is enabled.
#[cfg(feature = "runtime")]
pub fn run<S: HttpService>(s: S, addr: SocketAddr) {
    let serving = serve(s, addr);
    // `hyper::rt::run` is given a future with unit item and unit error, so
    // the server's own result is replaced with `Ok(())`.
    let outcome_discarded = serving.map(|_| Ok::<(), ()>(()));
    hyper::rt::run(outcome_discarded.compat());
}
/// Wraps an `AsyncRead` body so that it can be consumed as a stream of
/// `hyper::Chunk`s.
struct ChunkStream<R>
where
    R: AsyncRead,
{
    /// The reader the chunks are pulled from.
    body: R,
}
impl<R> futures::Stream for ChunkStream<R>
where
    R: AsyncRead + Unpin,
{
    type Item = Result<hyper::Chunk, Box<dyn std::error::Error + Send + Sync + 'static>>;

    /// Reads up to 1024 bytes from the body and yields them as one chunk.
    ///
    /// A read of zero bytes ends the stream. A read error is yielded as an
    /// `Err` item.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        /// Size of the buffer allocated for each read.
        const READ_SIZE: usize = 1024;

        let mut scratch = vec![0u8; READ_SIZE];
        let reader = Pin::new(&mut self.body);
        match futures::ready!(reader.poll_read(cx, &mut scratch))? {
            0 => Poll::Ready(None),
            filled => {
                // Keep only the bytes that were actually read.
                scratch.truncate(filled);
                Poll::Ready(Some(Ok(hyper::Chunk::from(scratch))))
            }
        }
    }
}