qsdr_benchmarks/futures/
executor.rs

1// Part of this code is taken from futures-rs/futures-executor
2
3use futures::{executor::enter, pin_mut};
4use futures_task::{waker_ref, ArcWake};
5use std::{
6    future::Future,
7    sync::{
8        atomic::{AtomicBool, Ordering},
9        Arc,
10    },
11    task::{Context, Poll},
12    thread::{self, Thread},
13};
14
15struct ThreadNotify {
16    /// The (single) executor thread.
17    thread: Thread,
18    /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
19    /// before the next `park()`, which may otherwise happen if the code
20    /// being executed as part of the future(s) being polled makes use of
21    /// park / unpark calls of its own, i.e. we cannot assume that no other
22    /// code uses park / unpark on the executing `thread`.
23    unparked: AtomicBool,
24}
25
26std::thread_local! {
27    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
28        thread: thread::current(),
29        unparked: AtomicBool::new(false),
30    });
31}
32
33impl ArcWake for ThreadNotify {
34    fn wake_by_ref(arc_self: &Arc<Self>) {
35        // Make sure the wakeup is remembered until the next `park()`.
36        let unparked = arc_self.unparked.swap(true, Ordering::Release);
37        if !unparked {
38            // If the thread has not been unparked yet, it must be done
39            // now. If it was actually parked, it will run again,
40            // otherwise the token made available by `unpark`
41            // may be consumed before reaching `park()`, but `unparked`
42            // ensures it is not forgotten.
43            arc_self.thread.unpark();
44        }
45    }
46}
47
48// Set up and run a basic single-threaded spawner loop, invoking `f` on each
49// turn. `f` is invoked multiple times before parking if it returns
50// Poll::Pending, with the hope that it becomes ready after some busy waiting
51// and we don't need to sleep the thread.
52fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
53    let _enter = enter().expect("cannot execute executor from within another executor");
54
55    CURRENT_THREAD_NOTIFY.with(|thread_notify| {
56        let waker = waker_ref(thread_notify);
57        let mut cx = Context::from_waker(&waker);
58        loop {
59            const SPINS: usize = 1 << 10;
60            for _ in 0..SPINS {
61                if let Poll::Ready(t) = f(&mut cx) {
62                    return t;
63                }
64            }
65
66            // Wait for a wakeup.
67            while !thread_notify.unparked.swap(false, Ordering::Acquire) {
68                // No wakeup occurred. It may occur now, right before parking,
69                // but in that case the token made available by `unpark()`
70                // is guaranteed to still be available and `park()` is a no-op.
71                thread::park();
72            }
73        }
74    })
75}
76
77/// Run a future to completion on the current thread.
78///
79/// This function will block the caller until the given future has completed.
80pub fn block_on<F: Future>(f: F) -> F::Output {
81    pin_mut!(f);
82    run_executor(|cx| f.as_mut().poll(cx))
83}