1#![cfg_attr(not(test), no_std)]
100#![feature(const_waker)]
101#![cfg_attr(test, feature(future_join))]
102
103use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
104use core::{future::Future, ptr::null};
105use core::{hint, pin::pin};
106
107pub fn block_on<F: Future>(f: F) -> F::Output {
121 static WAKER: Waker = {
122 const RAW_WAKER: RawWaker = RawWaker::new(null(), &RawWakerVTable::new(|_| RAW_WAKER, |_| (), |_| (), |_| ()));
123 unsafe { Waker::from_raw(RAW_WAKER) }
124 };
125
126 let mut f = pin!(f);
127 loop {
128 match f.as_mut().poll(&mut Context::from_waker(&WAKER)) {
129 Poll::Ready(r) => break r,
130 Poll::Pending => hint::spin_loop(),
131 }
132 }
133}
134
135#[cfg(test)]
136mod tests {
137 use crate::block_on;
138 use core::future::join;
139 use core::{future, pin::pin, task::Poll};
140 use futures::{stream::FuturesUnordered, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
141
142 #[test]
143 fn test_block_on_trivial() {
144 assert_eq!(block_on(async { 42 }), 42);
145 }
146
147 #[test]
148 fn test_block_on_n_tries() {
149 for n in 0..420 {
150 let mut index = 0;
151 let future = future::poll_fn(|ctx| {
152 if index < n {
153 index += 1;
154 ctx.waker().wake_by_ref();
155 Poll::Pending
156 } else {
157 Poll::Ready(index)
158 }
159 });
160 let ret = block_on(async {
161 assert_eq!(future.await, n);
162 n
163 });
164 assert_eq!(ret, n);
165 }
166 }
167
168 async fn write_read_eq<W: AsyncWrite, R: AsyncRead>(writer: W, reader: R, data: &[u8]) {
169 let (mut writer, mut reader) = (pin!(writer), pin!(reader));
170 let write = async {
171 writer.write_all(data).await.unwrap();
172 writer.flush().await.unwrap();
173 };
174 let read = async {
175 let mut buf = vec![0u8; data.len()];
176 reader.read_exact(&mut buf).await.unwrap();
177 buf
178 };
179 let (_, buf) = future::join!(write, read).await;
180 assert_eq!(buf, data);
181 }
182
183 #[test]
184 fn test_async_fs() {
185 use async_fs::File;
186 let data = b"Hello, world! from my async executor";
187 let path = "./target/test_file";
188 let _ = block_on(async_fs::remove_file(path));
190 block_on(async {
191 {
192 let writer = File::create(path).await.unwrap();
193 let reader = File::open(path).await.unwrap();
194 write_read_eq(writer, reader, data).await;
195 }
196 async_fs::remove_file(path).await.unwrap();
198 });
199 }
200
201 #[cfg(unix)]
202 #[test]
203 fn test_async_unix_socket() {
204 use async_net::unix::{UnixListener, UnixStream};
205 let path = "./target/test_socket.sock";
206 let data = b"Hello, world! from my async executor in UNIX stream";
207 let _ = block_on(async_fs::remove_file(path));
209 let listener = UnixListener::bind(path).unwrap();
210
211 block_on(async {
212 let (sender, receiver) = future::join!(listener.accept(), UnixStream::connect(path)).await;
213 write_read_eq(sender.unwrap().0, receiver.unwrap(), data).await;
214 async_fs::remove_file(path).await.unwrap();
216 });
217 }
218
219 #[test]
220 fn test_async_tcp_socket() {
221 use async_net::{TcpListener, TcpStream};
222 let localhost = "127.0.0.1";
223 let data = b"Hello, world! from my async executor in TCP stream";
224
225 block_on(async {
226 let listener = TcpListener::bind((localhost, 0)).await.unwrap();
227 let port = listener.local_addr().unwrap().port();
228
229 let (sender, receiver) = future::join!(listener.accept(), TcpStream::connect((localhost, port))).await;
230 write_read_eq(sender.unwrap().0, receiver.unwrap(), data).await;
231 });
232 }
233
234 #[test]
235 fn test_async_udp_socket() {
236 use async_net::UdpSocket;
237 let localhost = "127.0.0.1";
238 let data = b"Hello, world! from my async executor in UDP stream";
239
240 block_on(async {
241 let sender = UdpSocket::bind((localhost, 0)).await.unwrap();
242 let receiver = UdpSocket::bind((localhost, 0)).await.unwrap();
243 let sender_port = sender.local_addr().unwrap().port();
244 let receiver_port = receiver.local_addr().unwrap().port();
245 receiver.connect((localhost, sender_port)).await.unwrap();
246 sender.connect((localhost, receiver_port)).await.unwrap();
247 let mut buf = vec![0u8; data.len()];
248 let (sender_res, receiver_res) = join!(sender.send(data), receiver.recv(&mut buf)).await;
249 sender_res.unwrap();
250 receiver_res.unwrap();
251 assert_eq!(buf, data);
252 });
253 }
254
255 #[test]
256 fn test_async_unbounded_channel() {
257 let data = b"Hello, world! from my async executor in a channel";
258 let (sender, receiver) = async_channel::unbounded();
259 block_on(async {
260 let (sender_res, ret) = join!(sender.send(data), receiver.recv()).await;
261 sender_res.unwrap();
262 assert_eq!(ret.unwrap(), data);
263 });
264 }
265
266 #[test]
267 fn test_async_bounded_channel() {
268 let (sender, receiver) = async_channel::bounded(1);
269 block_on(async {
270 let send_20_fut = async {
271 for i in 0..20 {
272 sender.send(i).await.unwrap();
273 }
274 };
275 let recv_20_fut = async {
276 for i in 0..20 {
277 assert_eq!(receiver.recv().await.unwrap(), i);
278 }
279 };
280 join!(send_20_fut, recv_20_fut).await;
281 });
282 }
283
284 #[test]
285 fn test_tcp_server() {
286 use async_net::{TcpListener, TcpStream};
287
288 async fn handle_connection(mut stream: TcpStream) -> Option<TcpStream> {
289 let mut buf = [0u8; 1024];
290 let n = match stream.read(&mut buf).await {
291 Ok(n) if n == 0 => return Some(stream),
292 Ok(n) => n,
293 Err(e) => {
294 eprintln!("failed to read from the socket {e:?}");
295 return None;
296 }
297 };
298 stream.write_all(&buf[0..n]).await.map_err(|e| eprintln!("failed to write to the socket {e:?}")).map(|()| stream).ok()
300 }
301 let localhost = "127.0.0.1";
302 let listener = crate::block_on(TcpListener::bind((localhost, 0))).unwrap();
303 let port = listener.local_addr().unwrap().port();
304
305 let res = block_on(async move {
306 let mut readers = futures::future::join_all((0..10).map(|i| async move {
307 let mut stream = TcpStream::connect((localhost, port)).await.unwrap();
308 let data = format!("Hello, world! from my async executor in TCP stream number: {i}");
309 stream.write_all(data.as_bytes()).await.unwrap();
310 let mut buf = vec![0u8; data.len()];
311 stream.read_exact(&mut buf).await.unwrap();
312 assert_eq!(buf, data.as_bytes());
313 i
314 }))
315 .fuse();
316 let mut connection_handlers = FuturesUnordered::new();
318 let mut listener = listener.incoming().fuse();
319 loop {
320 futures::select! {
321 new_connection = listener.select_next_some() => connection_handlers.push(handle_connection(new_connection.unwrap())),
322 socket = connection_handlers.select_next_some() =>
323 if let Some(socket) = socket {
324 connection_handlers.push(handle_connection(socket));
325 },
326 result = readers => break result,
327 }
328 }
329 });
330
331 assert_eq!(res, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
332 }
333}