1#![forbid(unsafe_code)]
42
43use async_executor::Executor;
44use async_io::Async;
45use hyper::body::Incoming as HyperIncoming;
46use hyper_util::server::conn::auto::Builder;
47use pin_project_lite::pin_project;
48use smol_hyper::rt::{FuturesIo, SmolExecutor, SmolTimer};
49use tower_service::Service;
50
51use axum_core::body::Body;
52use axum_core::extract::Request;
53use axum_core::response::Response;
54
55use futures_lite::future::poll_fn;
56use futures_lite::io::{AsyncRead, AsyncWrite};
57use futures_lite::ready;
58
59use std::borrow::Borrow;
60use std::convert::Infallible;
61use std::future::Future;
62use std::io;
63use std::net::{SocketAddr, TcpListener, TcpStream};
64use std::pin::Pin;
65use std::task::{Context, Poll};
66
67pub trait Incoming {
69 type Connection: AsyncRead + AsyncWrite;
71
72 type Accept<'a>: Future<Output = io::Result<Option<(Self::Connection, SocketAddr)>>> + 'a
74 where
75 Self: 'a;
76
77 fn accept(&self) -> Self::Accept<'_>;
79}
80
81impl<'this, T: Incoming + ?Sized> Incoming for &'this T {
82 type Accept<'a>
83 = T::Accept<'a>
84 where
85 'this: 'a;
86 type Connection = T::Connection;
87
88 #[inline]
89 fn accept(&self) -> Self::Accept<'_> {
90 (**self).accept()
91 }
92}
93
94impl<'this, T: Incoming + ?Sized> Incoming for &'this mut T {
95 type Accept<'a>
96 = T::Accept<'a>
97 where
98 'this: 'a;
99 type Connection = T::Connection;
100
101 #[inline]
102 fn accept(&self) -> Self::Accept<'_> {
103 (**self).accept()
104 }
105}
106
107impl<T: Incoming + ?Sized> Incoming for Box<T> {
108 type Accept<'a>
109 = T::Accept<'a>
110 where
111 T: 'a;
112 type Connection = T::Connection;
113
114 #[inline]
115 fn accept(&self) -> Self::Accept<'_> {
116 (**self).accept()
117 }
118}
119
120impl Incoming for Async<TcpListener> {
121 type Accept<'a> = Pin<
122 Box<dyn Future<Output = io::Result<Option<(Self::Connection, SocketAddr)>>> + Send + 'a>,
123 >;
124 type Connection = Async<TcpStream>;
125
126 #[inline]
127 fn accept(&self) -> Self::Accept<'_> {
128 Box::pin(async move { self.accept().await.map(Some) })
129 }
130}
131
/// Accepts connections from an `async-net` TCP listener.
#[cfg(feature = "async-net")]
impl Incoming for async_net::TcpListener {
    type Connection = async_net::TcpStream;
    type Accept<'a> = Pin<
        Box<dyn Future<Output = io::Result<Option<(Self::Connection, SocketAddr)>>> + Send + 'a>,
    >;

    /// Waits for the next TCP connection.
    ///
    /// The returned future is boxed and never resolves to `Ok(None)`: every
    /// successful accept is wrapped in `Some`.
    #[inline]
    fn accept(&self) -> Self::Accept<'_> {
        let pending = async move {
            // Name the listener's inherent `accept` explicitly so this does
            // not read as a recursive call to the trait method.
            let outcome = async_net::TcpListener::accept(self).await;
            outcome.map(Some)
        };
        Box::pin(pending)
    }
}
144
145pub async fn serve<'ex, I, S>(
147 executor: impl Borrow<Executor<'ex>> + Clone + Send + 'ex,
148 tcp_listener: I,
149 service: S,
150) -> io::Result<()>
151where
152 I: Incoming + 'static,
153 I::Connection: Send + Unpin,
154 S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
155 S::Future: Send,
156{
157 loop {
158 let (tcp_stream, _remote_addr) = match tcp_listener.accept().await? {
160 Some(conn) => conn,
161 None => break,
162 };
163
164 let tcp_stream = FuturesIo::new(tcp_stream);
166
167 let mut service = service.clone();
169 poll_fn(|cx| service.poll_ready(cx))
170 .await
171 .unwrap_or_else(|e| match e {});
172
173 let service = { TowerToHyperService { service } };
175
176 let task = executor.borrow().spawn({
178 let executor = executor.clone();
179 async move {
180 let mut builder = Builder::new(SmolExecutor::new(AsRefExecutor(executor.borrow())));
181 builder.http1().timer(SmolTimer::new());
182 builder.http2().timer(SmolTimer::new());
183
184 if let Err(err) = builder
185 .serve_connection_with_upgrades(tcp_stream, service)
186 .await
187 {
188 tracing::error!("unintelligible hyper error: {err}");
189 }
190 }
191 });
192
193 task.detach();
195 }
196
197 Ok(())
198}
199
200#[derive(Debug, Copy, Clone)]
202struct TowerToHyperService<S> {
203 service: S,
204}
205
206impl<S> hyper::service::Service<Request<HyperIncoming>> for TowerToHyperService<S>
207where
208 S: tower_service::Service<Request> + Clone,
209{
210 type Response = S::Response;
211 type Error = S::Error;
212 type Future = Oneshot<S, Request>;
213
214 fn call(&self, req: Request<HyperIncoming>) -> Self::Future {
215 let req = req.map(Body::new);
216 Oneshot::NotReady {
217 svc: self.service.clone(),
218 req: Some(req),
219 }
220 }
221}
222
223pin_project! {
225 #[project = OneshotProj]
226 enum Oneshot<S, R>
227 where
228 S: tower_service::Service<R>,
229 {
230 NotReady {
232 svc: S,
233 req: Option<R>
234 },
235 Called {
237 #[pin]
238 fut: S::Future,
239 },
240 Done
242 }
243}
244
245impl<S, R> Future for Oneshot<S, R>
246where
247 S: tower_service::Service<R>,
248{
249 type Output = Result<S::Response, S::Error>;
250
251 #[inline]
252 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253 loop {
254 match self.as_mut().project() {
255 OneshotProj::NotReady { svc, req } => {
256 ready!(svc.poll_ready(cx))?;
257 let fut = svc.call(req.take().expect("already called"));
258 self.as_mut().set(Oneshot::Called { fut });
259 }
260
261 OneshotProj::Called { fut } => {
262 let res = ready!(fut.poll(cx))?;
263 self.as_mut().set(Oneshot::Done);
264 return Poll::Ready(Ok(res));
265 }
266
267 OneshotProj::Done => panic!("future polled after completion"),
268 }
269 }
270 }
271}
272
273#[derive(Clone)]
274struct AsRefExecutor<'this, 'ex>(&'this Executor<'ex>);
275
276impl<'ex> AsRef<Executor<'ex>> for AsRefExecutor<'_, 'ex> {
277 #[inline]
278 fn as_ref(&self) -> &Executor<'ex> {
279 self.0
280 }
281}