async_rs/util/
block_on.rs1use std::{
2 future::Future,
3 sync::{
4 Arc,
5 atomic::{AtomicBool, Ordering},
6 },
7 task::{Context, Poll, Wake},
8 thread::{self, Thread},
9};
10
11pub fn simple_block_on<F: Future>(f: F) -> F::Output {
13 let _enter = enter();
14 let thread = ThreadWaker::new_arc();
15 let waker = thread.clone().into();
16 let mut cx = Context::from_waker(&waker);
17 let mut f = Box::pin(f);
18 loop {
19 match f.as_mut().poll(&mut cx) {
20 Poll::Ready(r) => return r,
21 Poll::Pending => thread.park(),
22 }
23 }
24}
25
26thread_local! {
27 static BUSY: AtomicBool = const { AtomicBool::new(false) };
28}
29
30struct EnterGuard;
31
32impl Drop for EnterGuard {
33 fn drop(&mut self) {
34 BUSY.with(|e| e.swap(false, Ordering::Acquire));
35 }
36}
37
38fn enter() -> EnterGuard {
39 if BUSY.with(|e| e.swap(true, Ordering::Release)) {
40 panic!("Cannot call simple_block_on recursively")
41 }
42
43 EnterGuard
44}
45
46struct ThreadWaker {
47 thread: Thread,
48 parked: AtomicBool,
49}
50
51impl ThreadWaker {
52 fn new_arc() -> Arc<Self> {
53 Arc::new(Self {
54 thread: thread::current(),
55 parked: AtomicBool::new(true),
56 })
57 }
58
59 fn park(&self) {
60 if !self.parked.swap(true, Ordering::Acquire) {
64 thread::park();
66 }
67 }
68
69 fn unpark(&self) {
70 if self.parked.swap(false, Ordering::Release) {
74 self.thread.unpark();
75 }
76 }
77}
78
79impl Wake for ThreadWaker {
80 fn wake(self: Arc<Self>) {
81 self.unpark()
82 }
83
84 fn wake_by_ref(self: &Arc<Self>) {
85 self.unpark()
86 }
87}
88
89#[cfg(test)]
90mod test {
91 use super::*;
92 use std::future;
93
94 #[test]
95 fn simple() {
96 assert_eq!(simple_block_on(future::ready(42)), 42);
97 }
98
99 #[test]
100 fn poll_fn() {
101 let mut a = 0;
102 let fut = future::poll_fn(move |cx| {
103 if a == 5 {
104 return Poll::Ready(10);
105 }
106 a += 1;
107 cx.waker().wake_by_ref();
108 Poll::Pending
109 });
110 assert_eq!(simple_block_on(fut), 10);
111 }
112}