smol_axum/
lib.rs

1// MIT/Apache2 License
2
3//! Integrations between [`smol`] and [`axum`].
4//!
5//! By default, [`axum`] only supports the [`tokio`] runtime. This crate adds a `serve`
6//! function that can be used with [`smol`]'s networking types.
7//!
8//! ## Examples
9//!
10//! ```no_run
11//! use async_io::Async;
12//! use axum::{response::Html, routing::get, Router};
13//! use macro_rules_attribute::apply;
14//!
15//! use std::io;
16//! use std::net::TcpListener;
17//! use std::sync::Arc;
18//!
19//! #[apply(smol_macros::main!)]
20//! async fn main(ex: &Arc<smol_macros::Executor<'_>>) -> io::Result<()> {
21//!     // Build our application with a route.
22//!     let app = Router::new().route("/", get(handler));
23//!
24//!     // Create a `smol`-based TCP listener.
25//!     let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 3000)).unwrap();
26//!     println!("listening on {}", listener.get_ref().local_addr().unwrap());
27//!
28//!     // Run it using `smol_axum`
29//!     smol_axum::serve(ex.clone(), listener, app).await
30//! }
31//!
32//! async fn handler() -> Html<&'static str> {
33//!     Html("<h1>Hello, World!</h1>")
34//! }
35//! ```
36//!
37//! [`axum`]: https://crates.io/crates/axum
38//! [`smol`]: https://crates.io/crates/smol
39//! [`tokio`]: https://crates.io/crates/tokio
40
41#![forbid(unsafe_code)]
42
43use async_executor::Executor;
44use async_io::Async;
45use hyper::body::Incoming as HyperIncoming;
46use hyper_util::server::conn::auto::Builder;
47use pin_project_lite::pin_project;
48use smol_hyper::rt::{FuturesIo, SmolExecutor, SmolTimer};
49use tower_service::Service;
50
51use axum_core::body::Body;
52use axum_core::extract::Request;
53use axum_core::response::Response;
54
55use futures_lite::future::poll_fn;
56use futures_lite::io::{AsyncRead, AsyncWrite};
57use futures_lite::ready;
58
59use std::borrow::Borrow;
60use std::convert::Infallible;
61use std::future::Future;
62use std::io;
63use std::net::{SocketAddr, TcpListener, TcpStream};
64use std::pin::Pin;
65use std::task::{Context, Poll};
66
67/// Something that produces incoming connections.
68pub trait Incoming {
69    /// The resulting connections.
70    type Connection: AsyncRead + AsyncWrite;
71
72    /// Future for accepting a new connection.
73    type Accept<'a>: Future<Output = io::Result<Option<(Self::Connection, SocketAddr)>>> + 'a
74    where
75        Self: 'a;
76
77    /// Wait for a new connection to arrive.
78    fn accept(&self) -> Self::Accept<'_>;
79}
80
81impl<'this, T: Incoming + ?Sized> Incoming for &'this T {
82    type Accept<'a>
83        = T::Accept<'a>
84    where
85        'this: 'a;
86    type Connection = T::Connection;
87
88    #[inline]
89    fn accept(&self) -> Self::Accept<'_> {
90        (**self).accept()
91    }
92}
93
94impl<'this, T: Incoming + ?Sized> Incoming for &'this mut T {
95    type Accept<'a>
96        = T::Accept<'a>
97    where
98        'this: 'a;
99    type Connection = T::Connection;
100
101    #[inline]
102    fn accept(&self) -> Self::Accept<'_> {
103        (**self).accept()
104    }
105}
106
107impl<T: Incoming + ?Sized> Incoming for Box<T> {
108    type Accept<'a>
109        = T::Accept<'a>
110    where
111        T: 'a;
112    type Connection = T::Connection;
113
114    #[inline]
115    fn accept(&self) -> Self::Accept<'_> {
116        (**self).accept()
117    }
118}
119
120impl Incoming for Async<TcpListener> {
121    type Accept<'a> = Pin<
122        Box<dyn Future<Output = io::Result<Option<(Self::Connection, SocketAddr)>>> + Send + 'a>,
123    >;
124    type Connection = Async<TcpStream>;
125
126    #[inline]
127    fn accept(&self) -> Self::Accept<'_> {
128        Box::pin(async move { self.accept().await.map(Some) })
129    }
130}
131
132#[cfg(feature = "async-net")]
133impl Incoming for async_net::TcpListener {
134    type Accept<'a> = Pin<
135        Box<dyn Future<Output = io::Result<Option<(Self::Connection, SocketAddr)>>> + Send + 'a>,
136    >;
137    type Connection = async_net::TcpStream;
138
139    #[inline]
140    fn accept(&self) -> Self::Accept<'_> {
141        Box::pin(async move { self.accept().await.map(Some) })
142    }
143}
144
145/// Serve a future using [`smol`]'s TCP listener.
146pub async fn serve<'ex, I, S>(
147    executor: impl Borrow<Executor<'ex>> + Clone + Send + 'ex,
148    tcp_listener: I,
149    service: S,
150) -> io::Result<()>
151where
152    I: Incoming + 'static,
153    I::Connection: Send + Unpin,
154    S: Service<Request, Response = Response, Error = Infallible> + Clone + Send + 'static,
155    S::Future: Send,
156{
157    loop {
158        // Wait for a new connection.
159        let (tcp_stream, _remote_addr) = match tcp_listener.accept().await? {
160            Some(conn) => conn,
161            None => break,
162        };
163
164        // Wrap it in a `FuturesIo`.
165        let tcp_stream = FuturesIo::new(tcp_stream);
166
167        // Wait for the service to be ready.
168        let mut service = service.clone();
169        poll_fn(|cx| service.poll_ready(cx))
170            .await
171            .unwrap_or_else(|e| match e {});
172
173        // Create a service.
174        let service = { TowerToHyperService { service } };
175
176        // Spawn the service on our executor.
177        let task = executor.borrow().spawn({
178            let executor = executor.clone();
179            async move {
180                let mut builder = Builder::new(SmolExecutor::new(AsRefExecutor(executor.borrow())));
181                builder.http1().timer(SmolTimer::new());
182                builder.http2().timer(SmolTimer::new());
183
184                if let Err(err) = builder
185                    .serve_connection_with_upgrades(tcp_stream, service)
186                    .await
187                {
188                    tracing::error!("unintelligible hyper error: {err}");
189                }
190            }
191        });
192
193        // Detach the task and let it run forever.
194        task.detach();
195    }
196
197    Ok(())
198}
199
200/// Convert a Tower service to the Hyper service.
201#[derive(Debug, Copy, Clone)]
202struct TowerToHyperService<S> {
203    service: S,
204}
205
206impl<S> hyper::service::Service<Request<HyperIncoming>> for TowerToHyperService<S>
207where
208    S: tower_service::Service<Request> + Clone,
209{
210    type Response = S::Response;
211    type Error = S::Error;
212    type Future = Oneshot<S, Request>;
213
214    fn call(&self, req: Request<HyperIncoming>) -> Self::Future {
215        let req = req.map(Body::new);
216        Oneshot::NotReady {
217            svc: self.service.clone(),
218            req: Some(req),
219        }
220    }
221}
222
223// Poll a `tower` service with a request as a future to completion.
224pin_project! {
225    #[project = OneshotProj]
226    enum Oneshot<S, R>
227    where
228        S: tower_service::Service<R>,
229    {
230        // We are not yet ready.
231        NotReady {
232            svc: S,
233            req: Option<R>
234        },
235        // We have been called and are processing the request.
236        Called {
237            #[pin]
238            fut: S::Future,
239        },
240        // We are done.
241        Done
242    }
243}
244
245impl<S, R> Future for Oneshot<S, R>
246where
247    S: tower_service::Service<R>,
248{
249    type Output = Result<S::Response, S::Error>;
250
251    #[inline]
252    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253        loop {
254            match self.as_mut().project() {
255                OneshotProj::NotReady { svc, req } => {
256                    ready!(svc.poll_ready(cx))?;
257                    let fut = svc.call(req.take().expect("already called"));
258                    self.as_mut().set(Oneshot::Called { fut });
259                }
260
261                OneshotProj::Called { fut } => {
262                    let res = ready!(fut.poll(cx))?;
263                    self.as_mut().set(Oneshot::Done);
264                    return Poll::Ready(Ok(res));
265                }
266
267                OneshotProj::Done => panic!("future polled after completion"),
268            }
269        }
270    }
271}
272
273#[derive(Clone)]
274struct AsRefExecutor<'this, 'ex>(&'this Executor<'ex>);
275
276impl<'ex> AsRef<Executor<'ex>> for AsRefExecutor<'_, 'ex> {
277    #[inline]
278    fn as_ref(&self) -> &Executor<'ex> {
279        self.0
280    }
281}