async_rs/util/
block_on.rs

1use std::{
2    future::Future,
3    sync::{
4        Arc,
5        atomic::{AtomicBool, Ordering},
6    },
7    task::{Context, Poll, Wake},
8    thread::{self, Thread},
9};
10
11/// Simple naive block_on implementation for noop runtime
12pub fn simple_block_on<F: Future>(f: F) -> F::Output {
13    let _enter = enter();
14    let thread = ThreadWaker::new_arc();
15    let waker = thread.clone().into();
16    let mut cx = Context::from_waker(&waker);
17    let mut f = Box::pin(f);
18    loop {
19        match f.as_mut().poll(&mut cx) {
20            Poll::Ready(r) => return r,
21            Poll::Pending => thread.park(),
22        }
23    }
24}
25
thread_local! {
    // Per-thread flag, initially `false`. `enter` swaps it to `true` (and panics if it
    // already was), and dropping the `EnterGuard` returned by `enter` swaps it back to
    // `false`.
    static BUSY: AtomicBool = const { AtomicBool::new(false) };
}
29
30struct EnterGuard;
31
32impl Drop for EnterGuard {
33    fn drop(&mut self) {
34        BUSY.with(|e| e.swap(false, Ordering::Acquire));
35    }
36}
37
38fn enter() -> EnterGuard {
39    if BUSY.with(|e| e.swap(true, Ordering::Release)) {
40        panic!("Cannot call simple_block_on recursively")
41    }
42
43    EnterGuard
44}
45
/// Shared state that lets the thread running `simple_block_on` sleep until the future
/// it is driving gets woken.
struct ThreadWaker {
    /// Handle of the thread that created this waker; this is the thread `unpark` wakes.
    thread: Thread,
    /// `true` while no wake-up is pending, i.e. the owning thread is (or is about to
    /// be) parked and needs an explicit unpark. `unpark` clears it to `false` to record
    /// a delivered wake-up; `park` sets it back to `true` when it consumes one.
    parked: AtomicBool,
}

impl ThreadWaker {
    /// Creates a waker bound to the calling thread, with no wake-up pending.
    fn new_arc() -> Arc<Self> {
        Arc::new(Self {
            thread: thread::current(),
            parked: AtomicBool::new(true),
        })
    }

    /// Blocks the calling thread until a wake-up has been delivered through `unpark`.
    ///
    /// Returns immediately if a wake-up arrived since the previous call (or since
    /// construction), so a wake that happens before `park` is never lost. Must be
    /// called on the thread that created this waker, because `unpark` targets that
    /// thread.
    fn park(&self) {
        // Each iteration re-arms the flag and inspects its previous value:
        // - `false`: a wake-up was delivered; consume it and return.
        // - `true`: nothing is pending, so really go to sleep.
        // A loop rather than a single check is required because `thread::park` may
        // return spuriously, or right away because of a token left by an earlier
        // unpark whose wake-up was already consumed.
        //
        // Acquire pairs with the Release in `unpark`, so whatever the waking side wrote
        // before calling `unpark` is visible once we return.
        while self.parked.swap(true, Ordering::Acquire) {
            // `Thread` has no public `park` method, but the current thread is the one
            // we want to park anyway.
            thread::park();
        }
    }

    /// Records a wake-up and, if none was pending yet, unparks the owning thread.
    fn unpark(&self) {
        // Better to unpark once too often than to miss a wake-up: the first waker to
        // flip the flag also unparks the thread. Later wakers see `false` and skip the
        // redundant call, since the pending wake-up will be observed by `park`.
        if self.parked.swap(false, Ordering::Release) {
            self.thread.unpark();
        }
    }
}
78
79impl Wake for ThreadWaker {
80    fn wake(self: Arc<Self>) {
81        self.unpark()
82    }
83
84    fn wake_by_ref(self: &Arc<Self>) {
85        self.unpark()
86    }
87}
88
#[cfg(test)]
mod test {
    use super::*;
    use std::future;

    /// A future that is ready on the first poll yields its value.
    #[test]
    fn simple() {
        assert_eq!(simple_block_on(future::ready(42)), 42);
    }

    /// A future that wakes itself during `poll` is polled again until it completes; a
    /// wake delivered before the executor parks must not be lost.
    #[test]
    fn poll_fn() {
        let mut polls = 0;
        let fut = future::poll_fn(move |cx| {
            if polls == 5 {
                return Poll::Ready(10);
            }
            polls += 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        });
        assert_eq!(simple_block_on(fut), 10);
    }

    /// A wake delivered from another thread resumes the blocked executor.
    #[test]
    fn woken_from_another_thread() {
        use std::sync::mpsc;

        let (tx, rx) = mpsc::channel();
        let mut tx = Some(tx);
        let fut = future::poll_fn(move |cx| {
            if let Ok(value) = rx.try_recv() {
                return Poll::Ready(value);
            }
            // First poll only: hand the waker to a helper thread that publishes the
            // value and then wakes the executor.
            if let Some(tx) = tx.take() {
                let waker = cx.waker().clone();
                std::thread::spawn(move || {
                    tx.send(7)
                        .expect("receiver lives until the future completes");
                    waker.wake();
                });
            }
            Poll::Pending
        });
        assert_eq!(simple_block_on(fut), 7);
    }

    /// Calling `simple_block_on` from inside a future it is driving panics.
    #[test]
    #[should_panic(expected = "Cannot call simple_block_on recursively")]
    fn recursive_call_panics() {
        simple_block_on(future::poll_fn(|_| {
            Poll::Ready(simple_block_on(future::ready(())))
        }));
    }
}