qsdr_benchmarks/futures/executor.rs
1// Part of this code is taken from futures-rs/futures-executor
2
3use futures::{executor::enter, pin_mut};
4use futures_task::{waker_ref, ArcWake};
5use std::{
6 future::Future,
7 sync::{
8 atomic::{AtomicBool, Ordering},
9 Arc,
10 },
11 task::{Context, Poll},
12 thread::{self, Thread},
13};
14
15struct ThreadNotify {
16 /// The (single) executor thread.
17 thread: Thread,
18 /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
19 /// before the next `park()`, which may otherwise happen if the code
20 /// being executed as part of the future(s) being polled makes use of
21 /// park / unpark calls of its own, i.e. we cannot assume that no other
22 /// code uses park / unpark on the executing `thread`.
23 unparked: AtomicBool,
24}
25
26std::thread_local! {
27 static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
28 thread: thread::current(),
29 unparked: AtomicBool::new(false),
30 });
31}
32
33impl ArcWake for ThreadNotify {
34 fn wake_by_ref(arc_self: &Arc<Self>) {
35 // Make sure the wakeup is remembered until the next `park()`.
36 let unparked = arc_self.unparked.swap(true, Ordering::Release);
37 if !unparked {
38 // If the thread has not been unparked yet, it must be done
39 // now. If it was actually parked, it will run again,
40 // otherwise the token made available by `unpark`
41 // may be consumed before reaching `park()`, but `unparked`
42 // ensures it is not forgotten.
43 arc_self.thread.unpark();
44 }
45 }
46}
47
48// Set up and run a basic single-threaded spawner loop, invoking `f` on each
49// turn. `f` is invoked multiple times before parking if it returns
50// Poll::Pending, with the hope that it becomes ready after some busy waiting
51// and we don't need to sleep the thread.
52fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
53 let _enter = enter().expect("cannot execute executor from within another executor");
54
55 CURRENT_THREAD_NOTIFY.with(|thread_notify| {
56 let waker = waker_ref(thread_notify);
57 let mut cx = Context::from_waker(&waker);
58 loop {
59 const SPINS: usize = 1 << 10;
60 for _ in 0..SPINS {
61 if let Poll::Ready(t) = f(&mut cx) {
62 return t;
63 }
64 }
65
66 // Wait for a wakeup.
67 while !thread_notify.unparked.swap(false, Ordering::Acquire) {
68 // No wakeup occurred. It may occur now, right before parking,
69 // but in that case the token made available by `unpark()`
70 // is guaranteed to still be available and `park()` is a no-op.
71 thread::park();
72 }
73 }
74 })
75}
76
77/// Run a future to completion on the current thread.
78///
79/// This function will block the caller until the given future has completed.
80pub fn block_on<F: Future>(f: F) -> F::Output {
81 pin_mut!(f);
82 run_executor(|cx| f.as_mut().poll(cx))
83}