worst_executor/
lib.rs

1//! The worst async executor you could think of.
2//!
3//! This crate provides a single function, `block_on`, which takes a future and
4//! blocks the current thread until the future is resolved.
5//! The way it works is by "spin-looping" over the `poll` method until it is ready.
6//!
7//! The nice thing about this is that it optimizes very well,
8//! for example `worst_executor::block_on(async { 42 })` compiles to a single `mov` instruction.
9//!
10//! The bad thing about this is that it does not actually do any scheduling, meaning that if you
11//! wait on a future that never resolves, your program will hang. which is why you should probably not use this.
12//!
13//! Note that because of its simplicity, the library only uses `core` and does not require `std` or `alloc`
14//! and is literally 14 lines of code.
15//!
16//! This can become more useful using the `core::future::join` macro
17//! which allows you to wait on multiple futures at once (each time polling a different future).
18//!
19//! Currently this library requires rust *nightly* for the `pin` macro, the `join` macro(used in tests) and the `const_waker` feature (required for complete optimization of `block_on`)
20//!
21//! # Examples
22//!
23//! ## Block on a simple future
24//! ```
25//! use worst_executor::block_on;
26//!
27//! let val = block_on(async { 42 });
28//! assert_eq!(val, 42);
29//! ```
30//!
31//! ## Receive and send messages through a channel
32//!
33//! This works in an event-loop style, each time checking if it can send or recieve a message.
34//! ```
35//! #![feature(future_join)]
36//! use core::future::join;
37//! use worst_executor::block_on;
38//!
39//! let (sender, receiver) = async_channel::bounded(1);
40//! block_on(async {
41//!     let send_20_fut = async {
42//!         for i in 0..20 {
43//!             sender.send(i).await.unwrap();
44//!         }
45//!     };
46//!     let recv_20_fut = async {
47//!         for i in 0..20 {
48//!             assert_eq!(receiver.recv().await.unwrap(), i);
49//!         }
50//!     };
51//!     join!(send_20_fut, recv_20_fut).await;
52//! });
53//! ```
54//!
55//! ## A single threaded TCP server
56//! ```no_run
57//! use async_net::{TcpListener, TcpStream};
58//! use futures::{stream::FuturesUnordered, AsyncReadExt, AsyncWriteExt, StreamExt};
59//! use worst_executor::block_on;
60//!
61//! # fn main() -> std::io::Result<()> {
62//! block_on(async {
63//!     let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
64//!     let mut connection_handlers = FuturesUnordered::new();
65//!     // This stream is infinite so it's OK to call fuse.
66//!     let mut listener = listener.incoming().fuse();
67//!     loop {
68//!         futures::select! {
69//!             new_connection = listener.select_next_some() => connection_handlers.push(handle_connection(new_connection?)),
70//!             socket = connection_handlers.select_next_some() =>
71//!                 if let Some(socket) = socket {
72//!                     connection_handlers.push(handle_connection(socket));
73//!                 },
74//!         }
75//!     }
76//! })
77//! # }
78//!
79//! async fn handle_connection(mut stream: TcpStream) -> Option<TcpStream> {
80//!    let mut buf = [0u8; 1024];
81//!    let n = match stream.read(&mut buf).await {
82//!        Ok(n) if n == 0 => return Some(stream),
83//!        Ok(n) => n,
84//!        Err(e) => {
85//!            eprintln!("failed to read from the socket {e:?}");
86//!            return None;
87//!        }
88//!    };
89//!    // Write the data back
90//!    stream
91//!        .write_all(&buf[0..n])
92//!        .await
93//!        .map_err(|e| eprintln!("failed to write to the socket {e:?}"))
94//!        .map(|()| stream)
95//!        .ok()
96//! }
97//! ```
98
99#![cfg_attr(not(test), no_std)]
100#![feature(const_waker)]
101#![cfg_attr(test, feature(future_join))]
102
103use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
104use core::{future::Future, ptr::null};
105use core::{hint, pin::pin};
106
107/// Runs the given future to completion on the current thread.
108/// This function will block the current thread until the future is resolved.
109/// The way it works is by "spin-looping" over the `poll` method until it is ready.
110///
111/// It will not do any scheduling, nor will it launch any threads.
112///
113/// # Examples
114/// ```
115/// use worst_executor::block_on;
116/// block_on(async {
117///     println!("Hello, world!");
118/// })
119/// ```
120pub fn block_on<F: Future>(f: F) -> F::Output {
121    static WAKER: Waker = {
122        const RAW_WAKER: RawWaker = RawWaker::new(null(), &RawWakerVTable::new(|_| RAW_WAKER, |_| (), |_| (), |_| ()));
123        unsafe { Waker::from_raw(RAW_WAKER) }
124    };
125
126    let mut f = pin!(f);
127    loop {
128        match f.as_mut().poll(&mut Context::from_waker(&WAKER)) {
129            Poll::Ready(r) => break r,
130            Poll::Pending => hint::spin_loop(),
131        }
132    }
133}
134
135#[cfg(test)]
136mod tests {
137    use crate::block_on;
138    use core::future::join;
139    use core::{future, pin::pin, task::Poll};
140    use futures::{stream::FuturesUnordered, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
141
142    #[test]
143    fn test_block_on_trivial() {
144        assert_eq!(block_on(async { 42 }), 42);
145    }
146
147    #[test]
148    fn test_block_on_n_tries() {
149        for n in 0..420 {
150            let mut index = 0;
151            let future = future::poll_fn(|ctx| {
152                if index < n {
153                    index += 1;
154                    ctx.waker().wake_by_ref();
155                    Poll::Pending
156                } else {
157                    Poll::Ready(index)
158                }
159            });
160            let ret = block_on(async {
161                assert_eq!(future.await, n);
162                n
163            });
164            assert_eq!(ret, n);
165        }
166    }
167
168    async fn write_read_eq<W: AsyncWrite, R: AsyncRead>(writer: W, reader: R, data: &[u8]) {
169        let (mut writer, mut reader) = (pin!(writer), pin!(reader));
170        let write = async {
171            writer.write_all(data).await.unwrap();
172            writer.flush().await.unwrap();
173        };
174        let read = async {
175            let mut buf = vec![0u8; data.len()];
176            reader.read_exact(&mut buf).await.unwrap();
177            buf
178        };
179        let (_, buf) = future::join!(write, read).await;
180        assert_eq!(buf, data);
181    }
182
183    #[test]
184    fn test_async_fs() {
185        use async_fs::File;
186        let data = b"Hello, world! from my async executor";
187        let path = "./target/test_file";
188        // Cleanup
189        let _ = block_on(async_fs::remove_file(path));
190        block_on(async {
191            {
192                let writer = File::create(path).await.unwrap();
193                let reader = File::open(path).await.unwrap();
194                write_read_eq(writer, reader, data).await;
195            }
196            // Cleanup
197            async_fs::remove_file(path).await.unwrap();
198        });
199    }
200
201    #[cfg(unix)]
202    #[test]
203    fn test_async_unix_socket() {
204        use async_net::unix::{UnixListener, UnixStream};
205        let path = "./target/test_socket.sock";
206        let data = b"Hello, world! from my async executor in UNIX stream";
207        // Cleanup
208        let _ = block_on(async_fs::remove_file(path));
209        let listener = UnixListener::bind(path).unwrap();
210
211        block_on(async {
212            let (sender, receiver) = future::join!(listener.accept(), UnixStream::connect(path)).await;
213            write_read_eq(sender.unwrap().0, receiver.unwrap(), data).await;
214            // Cleanup
215            async_fs::remove_file(path).await.unwrap();
216        });
217    }
218
219    #[test]
220    fn test_async_tcp_socket() {
221        use async_net::{TcpListener, TcpStream};
222        let localhost = "127.0.0.1";
223        let data = b"Hello, world! from my async executor in TCP stream";
224
225        block_on(async {
226            let listener = TcpListener::bind((localhost, 0)).await.unwrap();
227            let port = listener.local_addr().unwrap().port();
228
229            let (sender, receiver) = future::join!(listener.accept(), TcpStream::connect((localhost, port))).await;
230            write_read_eq(sender.unwrap().0, receiver.unwrap(), data).await;
231        });
232    }
233
234    #[test]
235    fn test_async_udp_socket() {
236        use async_net::UdpSocket;
237        let localhost = "127.0.0.1";
238        let data = b"Hello, world! from my async executor in UDP stream";
239
240        block_on(async {
241            let sender = UdpSocket::bind((localhost, 0)).await.unwrap();
242            let receiver = UdpSocket::bind((localhost, 0)).await.unwrap();
243            let sender_port = sender.local_addr().unwrap().port();
244            let receiver_port = receiver.local_addr().unwrap().port();
245            receiver.connect((localhost, sender_port)).await.unwrap();
246            sender.connect((localhost, receiver_port)).await.unwrap();
247            let mut buf = vec![0u8; data.len()];
248            let (sender_res, receiver_res) = join!(sender.send(data), receiver.recv(&mut buf)).await;
249            sender_res.unwrap();
250            receiver_res.unwrap();
251            assert_eq!(buf, data);
252        });
253    }
254
255    #[test]
256    fn test_async_unbounded_channel() {
257        let data = b"Hello, world! from my async executor in a channel";
258        let (sender, receiver) = async_channel::unbounded();
259        block_on(async {
260            let (sender_res, ret) = join!(sender.send(data), receiver.recv()).await;
261            sender_res.unwrap();
262            assert_eq!(ret.unwrap(), data);
263        });
264    }
265
266    #[test]
267    fn test_async_bounded_channel() {
268        let (sender, receiver) = async_channel::bounded(1);
269        block_on(async {
270            let send_20_fut = async {
271                for i in 0..20 {
272                    sender.send(i).await.unwrap();
273                }
274            };
275            let recv_20_fut = async {
276                for i in 0..20 {
277                    assert_eq!(receiver.recv().await.unwrap(), i);
278                }
279            };
280            join!(send_20_fut, recv_20_fut).await;
281        });
282    }
283
284    #[test]
285    fn test_tcp_server() {
286        use async_net::{TcpListener, TcpStream};
287
288        async fn handle_connection(mut stream: TcpStream) -> Option<TcpStream> {
289            let mut buf = [0u8; 1024];
290            let n = match stream.read(&mut buf).await {
291                Ok(n) if n == 0 => return Some(stream),
292                Ok(n) => n,
293                Err(e) => {
294                    eprintln!("failed to read from the socket {e:?}");
295                    return None;
296                }
297            };
298            // Write the data back
299            stream.write_all(&buf[0..n]).await.map_err(|e| eprintln!("failed to write to the socket {e:?}")).map(|()| stream).ok()
300        }
301        let localhost = "127.0.0.1";
302        let listener = crate::block_on(TcpListener::bind((localhost, 0))).unwrap();
303        let port = listener.local_addr().unwrap().port();
304
305        let res = block_on(async move {
306            let mut readers = futures::future::join_all((0..10).map(|i| async move {
307                let mut stream = TcpStream::connect((localhost, port)).await.unwrap();
308                let data = format!("Hello, world! from my async executor in TCP stream number: {i}");
309                stream.write_all(data.as_bytes()).await.unwrap();
310                let mut buf = vec![0u8; data.len()];
311                stream.read_exact(&mut buf).await.unwrap();
312                assert_eq!(buf, data.as_bytes());
313                i
314            }))
315            .fuse();
316            // This stream is inifinite so same to call fuse.
317            let mut connection_handlers = FuturesUnordered::new();
318            let mut listener = listener.incoming().fuse();
319            loop {
320                futures::select! {
321                    new_connection = listener.select_next_some() => connection_handlers.push(handle_connection(new_connection.unwrap())),
322                    socket = connection_handlers.select_next_some() =>
323                        if let Some(socket) = socket {
324                            connection_handlers.push(handle_connection(socket));
325                        },
326                    result = readers => break result,
327                }
328            }
329        });
330
331        assert_eq!(res, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
332    }
333}