block_on/lib.rs
1mod enter;
2
3use crate::enter::enter;
4use core::future::Future;
5use core::task::{Context, Poll};
6use futures_task::{waker_ref, ArcWake};
7use pin_utils::pin_mut;
8use std::sync::{
9 atomic::{AtomicBool, Ordering},
10 Arc,
11};
12use std::thread::{self, Thread};
13
/// Wakeup state for the thread that drives the executor loop.
///
/// Pairs a handle to the executor thread with a flag recording that a
/// wakeup has been requested.
pub(crate) struct ThreadNotify {
    /// Handle to the one thread running the executor; this is the thread
    /// that gets `unpark()`ed on wakeup.
    thread: Thread,
    /// Set to `true` when a wakeup is requested and reset to `false` by the
    /// executor loop when it observes the wakeup.
    ///
    /// The flag exists so that a wakeup is not lost before the next
    /// `park()`: code executed while polling the future(s) may itself call
    /// park / unpark on the same `thread`, so the park token alone cannot
    /// be relied upon.
    unparked: AtomicBool,
}
24
25impl ArcWake for ThreadNotify {
26 fn wake_by_ref(arc_self: &Arc<Self>) {
27 // Make sure the wakeup is remembered until the next `park()`.
28 let unparked = arc_self.unparked.swap(true, Ordering::Release);
29 if !unparked {
30 // If the thread has not been unparked yet, it must be done
31 // now. If it was actually parked, it will run again,
32 // otherwise the token made available by `unpark`
33 // may be consumed before reaching `park()`, but `unparked`
34 // ensures it is not forgotten.
35 arc_self.thread.unpark();
36 }
37 }
38}
39
40thread_local! {
41 static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
42 thread: thread::current(),
43 unparked: AtomicBool::new(false),
44 });
45}
46
47// Set up and run a basic single-threaded spawner loop, invoking `f` on each
48// turn.
49fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
50 let _enter = enter().expect(
51 "cannot execute `LocalPool` executor from within \
52 another executor",
53 );
54
55 CURRENT_THREAD_NOTIFY.with(|thread_notify| {
56 let waker = waker_ref(thread_notify);
57 let mut cx = Context::from_waker(&waker);
58 loop {
59 if let Poll::Ready(t) = f(&mut cx) {
60 return t;
61 }
62
63 // Wait for a wakeup.
64 while !thread_notify.unparked.swap(false, Ordering::Acquire) {
65 // No wakeup occurred. It may occur now, right before parking,
66 // but in that case the token made available by `unpark()`
67 // is guaranteed to still be available and `park()` is a no-op.
68 thread::park();
69 }
70 }
71 })
72}
73
74pub fn block_on<F: Future>(f: F) -> F::Output {
75 pin_mut!(f);
76 run_executor(|cx| f.as_mut().poll(cx))
77}