1#![forbid(future_incompatible, rust_2018_idioms)]
4#![deny(missing_debug_implementations, nonstandard_style)]
5#![warn(missing_docs, missing_doc_code_examples)]
6#![cfg_attr(test, deny(warnings))]
7
8use futures::compat::Future01CompatExt;
9#[cfg(feature = "runtime")]
10use futures::compat::{Compat as Compat03As01, Compat01As03};
11use futures::future::BoxFuture;
12use futures::prelude::*;
13use futures::stream;
14use futures::task::Spawn;
15use http_service::{Body, HttpService};
16use hyper::server::{Builder as HyperBuilder, Server as HyperServer};
17
18use std::io;
19#[cfg(feature = "runtime")]
20use std::net::SocketAddr;
21use std::pin::Pin;
22use std::sync::Arc;
23use std::task::{self, Context, Poll};
24
25struct WrapHttpService<H> {
27 service: Arc<H>,
28}
29
30struct WrapConnection<H: HttpService> {
32 service: Arc<H>,
33 connection: H::Connection,
34}
35
36impl<H, Ctx> hyper::service::MakeService<Ctx> for WrapHttpService<H>
37where
38 H: HttpService,
39{
40 type ReqBody = hyper::Body;
41 type ResBody = hyper::Body;
42 type Error = std::io::Error;
43 type Service = WrapConnection<H>;
44 type Future = Compat03As01<BoxFuture<'static, Result<Self::Service, Self::Error>>>;
45 type MakeError = std::io::Error;
46
47 fn make_service(&mut self, _ctx: Ctx) -> Self::Future {
48 let service = self.service.clone();
49 let error = std::io::Error::from(std::io::ErrorKind::Other);
50 async move {
51 let connection = service.connect().into_future().await.map_err(|_| error)?;
52 Ok(WrapConnection {
53 service,
54 connection,
55 })
56 }
57 .boxed()
58 .compat()
59 }
60}
61
62impl<H> hyper::service::Service for WrapConnection<H>
63where
64 H: HttpService,
65{
66 type ReqBody = hyper::Body;
67 type ResBody = hyper::Body;
68 type Error = std::io::Error;
69 type Future =
70 Compat03As01<BoxFuture<'static, Result<http::Response<hyper::Body>, Self::Error>>>;
71
72 fn call(&mut self, req: http::Request<hyper::Body>) -> Self::Future {
73 let error = std::io::Error::from(std::io::ErrorKind::Other);
75 let req = req.map(|body| {
76 let body_stream = Compat01As03::new(body)
77 .map(|chunk| chunk.map(|chunk| chunk.to_vec()))
78 .map_err(|e| io::Error::new(io::ErrorKind::Other, e));
79 let body_reader = body_stream.into_async_read();
80 Body::from_reader(body_reader)
81 });
82
83 let fut = self.service.respond(&mut self.connection, req);
84
85 async move {
87 let res: http::Response<_> = fut.into_future().await.map_err(|_| error)?;
88 let (parts, body) = res.into_parts();
89 let body = hyper::Body::wrap_stream(Compat03As01::new(ChunkStream { body }));
90 Ok(hyper::Response::from_parts(parts, body))
91 }
92 .boxed()
93 .compat()
94 }
95}
96
97#[allow(clippy::type_complexity)] pub struct Server<I: TryStream, S, Sp> {
104 inner: Compat01As03<
105 HyperServer<
106 Compat03As01<stream::MapOk<I, fn(I::Ok) -> Compat03As01<I::Ok>>>,
107 WrapHttpService<S>,
108 Compat03As01<Sp>,
109 >,
110 >,
111}
112
113impl<I: TryStream, S, Sp> std::fmt::Debug for Server<I, S, Sp> {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 f.debug_struct("Server").finish()
116 }
117}
118
119#[allow(clippy::type_complexity)] pub struct Builder<I: TryStream, Sp> {
122 inner: HyperBuilder<
123 Compat03As01<stream::MapOk<I, fn(I::Ok) -> Compat03As01<I::Ok>>>,
124 Compat03As01<Sp>,
125 >,
126}
127
128impl<I: TryStream, Sp> std::fmt::Debug for Builder<I, Sp> {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 f.debug_struct("Builder").finish()
131 }
132}
133
134impl<I: TryStream> Server<I, (), ()> {
135 pub fn builder(incoming: I) -> Builder<I, ()> {
137 Builder {
138 inner: HyperServer::builder(Compat03As01::new(incoming.map_ok(Compat03As01::new as _)))
139 .executor(Compat03As01::new(())),
140 }
141 }
142}
143
144impl<I: TryStream, Sp> Builder<I, Sp> {
145 pub fn with_spawner<Sp2>(self, new_spawner: Sp2) -> Builder<I, Sp2> {
147 Builder {
148 inner: self.inner.executor(Compat03As01::new(new_spawner)),
149 }
150 }
151
152 pub fn serve<S: HttpService>(self, service: S) -> Server<I, S, Sp>
180 where
181 I: TryStream + Unpin,
182 I::Ok: AsyncRead + AsyncWrite + Send + Unpin + 'static,
183 I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
184 Sp: Clone + Send + 'static,
185 for<'a> &'a Sp: Spawn,
186 {
187 Server {
188 inner: Compat01As03::new(self.inner.serve(WrapHttpService {
189 service: Arc::new(service),
190 })),
191 }
192 }
193}
194
195impl<I, S, Sp> Future for Server<I, S, Sp>
196where
197 I: TryStream + Unpin,
198 I::Ok: AsyncRead + AsyncWrite + Send + Unpin + 'static,
199 I::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
200 S: HttpService,
201 Sp: Clone + Send + 'static,
202 for<'a> &'a Sp: Spawn,
203{
204 type Output = hyper::Result<()>;
205
206 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<hyper::Result<()>> {
207 self.inner.poll_unpin(cx)
208 }
209}
210
211#[cfg(feature = "runtime")]
214pub fn serve<S: HttpService>(
215 s: S,
216 addr: SocketAddr,
217) -> impl Future<Output = Result<(), hyper::Error>> {
218 let service = WrapHttpService {
219 service: Arc::new(s),
220 };
221 hyper::Server::bind(&addr).serve(service).compat()
222}
223
224#[cfg(feature = "runtime")]
227pub fn run<S: HttpService>(s: S, addr: SocketAddr) {
228 let server = serve(s, addr).map(|_| Result::<_, ()>::Ok(())).compat();
229 hyper::rt::run(server);
230}
231
232struct ChunkStream<R: AsyncRead> {
235 body: R,
236}
237
238impl<R: AsyncRead + Unpin> futures::Stream for ChunkStream<R> {
239 type Item = Result<hyper::Chunk, Box<dyn std::error::Error + Send + Sync + 'static>>;
240
241 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
242 let mut buf = vec![0; 1024];
244 let read = futures::ready!(Pin::new(&mut self.body).poll_read(cx, &mut buf))?;
245 if read == 0 {
246 return Poll::Ready(None);
247 } else {
248 buf.truncate(read);
249 let chunk = hyper::Chunk::from(buf);
250 Poll::Ready(Some(Ok(chunk)))
251 }
252 }
253}