http_service_hyper/
lib.rs

1//! `HttpService` server that uses Hyper as backend.
2
3#![forbid(future_incompatible, rust_2018_idioms)]
4#![deny(missing_debug_implementations, nonstandard_style)]
5#![warn(missing_docs, missing_doc_code_examples)]
6#![cfg_attr(test, deny(warnings))]
7
8use futures::compat::Future01CompatExt;
9#[cfg(feature = "runtime")]
10use futures::compat::{Compat as Compat03As01, Compat01As03};
11use futures::future::BoxFuture;
12use futures::prelude::*;
13use futures::stream;
14use futures::task::Spawn;
15use http_service::{Body, HttpService};
16use hyper::server::{Builder as HyperBuilder, Server as HyperServer};
17
18use std::io;
19#[cfg(feature = "runtime")]
20use std::net::SocketAddr;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{self, Context, Poll};
24
25// Wrapper type to allow us to provide a blanket `MakeService` impl
26struct WrapHttpService<H> {
27    service: Arc<H>,
28}
29
30// Wrapper type to allow us to provide a blanket `Service` impl
31struct WrapConnection<H: HttpService> {
32    service: Arc<H>,
33    connection: H::Connection,
34}
35
36impl<H, Ctx> hyper::service::MakeService<Ctx> for WrapHttpService<H>
37where
38    H: HttpService,
39{
40    type ReqBody = hyper::Body;
41    type ResBody = hyper::Body;
42    type Error = std::io::Error;
43    type Service = WrapConnection<H>;
44    type Future = Compat03As01<BoxFuture<'static, Result<Self::Service, Self::Error>>>;
45    type MakeError = std::io::Error;
46
47    fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
48        let service = self.service.clone();
49        let error = std::io::Error::from(std::io::ErrorKind::Other);
50        async move {
51            let connection = service.connect().into_future().await.map_err(|_| error)?;
52            Ok(WrapConnection {
53                service,
54                connection,
55            })
56        }
57        .boxed()
58        .compat()
59    }
60}
61
62impl<H> hyper::service::Service for WrapConnection<H>
63where
64    H: HttpService,
65{
66    type ReqBody = hyper::Body;
67    type ResBody = hyper::Body;
68    type Error = std::io::Error;
69    type Future =
70        Compat03As01<BoxFuture<'static, Result<http::Response<hyper::Body>, Self::Error>>>;
71
72    fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
73        // Convert Request
74        let error = std::io::Error::from(std::io::ErrorKind::Other);
75        let req = req.map(|body| {
76            let body_stream = Compat01As03::new(body)
77                .map(|chunk| chunk.map(|chunk| chunk.to_vec()))
78                .map_err(|e| io::Error::new(io::ErrorKind::Other, e));
79            let body_reader = body_stream.into_async_read();
80            Body::from_reader(body_reader)
81        });
82
83        let fut = self.service.respond(&mut self.connection, req);
84
85        // Convert Request
86        async move {
87            let res: http::Response<_> = fut.into_future().await.map_err(|_| error)?;
88            let (parts, body) = res.into_parts();
89            let body = hyper::Body::wrap_stream(Compat03As01::new(ChunkStream { body }));
90            Ok(hyper::Response::from_parts(parts, body))
91        }
92        .boxed()
93        .compat()
94    }
95}
96
97/// A listening HTTP server that accepts connections in both HTTP1 and HTTP2 by default.
98///
99/// [`Server`] is a [`Future`] mapping a bound listener with a set of service handlers. It is built
100/// using the [`Builder`], and the future completes when the server has been shutdown. It should be
101/// run by an executor.
102#[allow(clippy::type_complexity)] // single-use type with many compat layers
103pub struct Server<I: TryStream, S, Sp> {
104    inner: Compat01As03<
105        HyperServer<
106            Compat03As01<stream::MapOk<I, fn(I::Ok) -> Compat03As01<I::Ok>>>,
107            WrapHttpService<S>,
108            Compat03As01<Sp>,
109        >,
110    >,
111}
112
113impl<I: TryStream, S, Sp> std::fmt::Debug for Server<I, S, Sp> {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        f.debug_struct("Server").finish()
116    }
117}
118
119/// A builder for a [`Server`].
120#[allow(clippy::type_complexity)] // single-use type with many compat layers
121pub struct Builder<I: TryStream, Sp> {
122    inner: HyperBuilder<
123        Compat03As01<stream::MapOk<I, fn(I::Ok) -> Compat03As01<I::Ok>>>,
124        Compat03As01<Sp>,
125    >,
126}
127
128impl<I: TryStream, Sp> std::fmt::Debug for Builder<I, Sp> {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        f.debug_struct("Builder").finish()
131    }
132}
133
134impl<I: TryStream> Server<I, (), ()> {
135    /// Starts a [`Builder`] with the provided incoming stream.
136    pub fn builder(incoming: I) -> Builder<I, ()> {
137        Builder {
138            inner: HyperServer::builder(Compat03As01::new(incoming.map_ok(Compat03As01::new as _)))
139                .executor(Compat03As01::new(())),
140        }
141    }
142}
143
144impl<I: TryStream, Sp> Builder<I, Sp> {
145    /// Sets the [`Spawn`] to deal with starting connection tasks.
146    pub fn with_spawner<Sp2>(self, new_spawner: Sp2) -> Builder<I, Sp2> {
147        Builder {
148            inner: self.inner.executor(Compat03As01::new(new_spawner)),
149        }
150    }
151
152    /// Consume this [`Builder`], creating a [`Server`].
153    ///
154    /// # Examples
155    ///
156    /// ```no_run
157    /// use http_service::{Response, Body};
158    /// use http_service_hyper::Server;
159    /// use romio::TcpListener;
160    ///
161    /// // Construct an executor to run our tasks on
162    /// let mut pool = futures::executor::ThreadPool::new()?;
163    ///
164    /// // And an HttpService to handle each connection...
165    /// let service = |req| {
166    ///     futures::future::ok::<_, ()>(Response::new(Body::from("Hello World")))
167    /// };
168    ///
169    /// // Then bind, configure the spawner to our pool, and serve...
170    /// let addr = "127.0.0.1:3000".parse()?;
171    /// let mut listener = TcpListener::bind(&addr)?;
172    /// let server = Server::builder(listener.incoming())
173    ///     .with_spawner(pool.clone())
174    ///     .serve(service);
175    ///
176    /// // Finally, spawn `server` onto our executor...
177    /// pool.run(server)?;
178    /// # Ok::<(), Box<dyn std::error::Error>>(())
179    pub fn serve<S: HttpService>(self, service: S) -> Server<I, S, Sp>
180    where
181        I: TryStream + Unpin,
182        I::Ok: AsyncRead + AsyncWrite + Send + Unpin + 'static,
183        I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
184        Sp: Clone + Send + 'static,
185        for<'a> &'a Sp: Spawn,
186    {
187        Server {
188            inner: Compat01As03::new(self.inner.serve(WrapHttpService {
189                service: Arc::new(service),
190            })),
191        }
192    }
193}
194
195impl<I, S, Sp> Future for Server<I, S, Sp>
196where
197    I: TryStream + Unpin,
198    I::Ok: AsyncRead + AsyncWrite + Send + Unpin + 'static,
199    I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
200    S: HttpService,
201    Sp: Clone + Send + 'static,
202    for<'a> &'a Sp: Spawn,
203{
204    type Output = hyper::Result<()>;
205
206    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<hyper::Result<()>> {
207        self.inner.poll_unpin(cx)
208    }
209}
210
211/// Serve the given `HttpService` at the given address, using `hyper` as backend, and return a
212/// `Future` that can be `await`ed on.
213#[cfg(feature = "runtime")]
214pub fn serve<S: HttpService>(
215    s: S,
216    addr: SocketAddr,
217) -> impl Future<Output = Result<(), hyper::Error>> {
218    let service = WrapHttpService {
219        service: Arc::new(s),
220    };
221    hyper::Server::bind(&addr).serve(service).compat()
222}
223
224/// Run the given `HttpService` at the given address on the default runtime, using `hyper` as
225/// backend.
226#[cfg(feature = "runtime")]
227pub fn run<S: HttpService>(s: S, addr: SocketAddr) {
228    let server = serve(s, addr).map(|_| Result::<_, ()>::Ok(())).compat();
229    hyper::rt::run(server);
230}
231
232/// A type that wraps an `AsyncRead` into a `Stream` of `hyper::Chunk`. Used for writing data to a
233/// Hyper response.
234struct ChunkStream<R: AsyncRead> {
235    body: R,
236}
237
238impl<R: AsyncRead + Unpin> futures::Stream for ChunkStream<R> {
239    type Item = Result<hyper::Chunk, Box<dyn std::error::Error + Send + Sync + 'static>>;
240
241    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
242        // This is not at all efficient, but that's okay for now.
243        let mut buf = vec![0; 1024];
244        let read = futures::ready!(Pin::new(&mut self.body).poll_read(cx, &mut buf))?;
245        if read == 0 {
246            return Poll::Ready(None);
247        } else {
248            buf.truncate(read);
249            let chunk = hyper::Chunk::from(buf);
250            Poll::Ready(Some(Ok(chunk)))
251        }
252    }
253}