traceback_error/
block_on.rs1use core::{
2 cell::Cell,
3 fmt,
4 future::Future,
5 marker::PhantomData,
6 mem::ManuallyDrop,
7 ops::Deref,
8 sync::atomic::{AtomicBool, Ordering},
9 task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
10};
11use std::{
12 sync::Arc,
13 thread::{self, Thread},
14};
15
16#[macro_export]
17macro_rules! pin_mut {
18 ($($x:ident),* $(,)?) => { $(
19 let mut $x = $x;
20 #[allow(unused_mut)]
21 let mut $x = unsafe {
22 core::pin::Pin::new_unchecked(&mut $x)
23 };
24 )* }
25}
26
27trait ArcWake: Send + Sync {
28 fn wake(self: Arc<Self>) {
29 Self::wake_by_ref(&self)
30 }
31 fn wake_by_ref(arc_self: &Arc<Self>);
32}
33
34unsafe fn increase_refcount<T: ArcWake>(data: *const ()) {
35 let arc = core::mem::ManuallyDrop::new(Arc::<T>::from_raw(data.cast::<T>()));
36 let _arc_clone: core::mem::ManuallyDrop<_> = arc.clone();
37}
38
39unsafe fn clone_arc_raw<T: ArcWake>(data: *const ()) -> RawWaker {
40 increase_refcount::<T>(data);
41 RawWaker::new(data, waker_vtable::<T>())
42}
43
44unsafe fn wake_arc_raw<T: ArcWake>(data: *const ()) {
45 let arc: Arc<T> = Arc::from_raw(data.cast::<T>());
46 ArcWake::wake(arc);
47}
48
49unsafe fn wake_by_ref_arc_raw<T: ArcWake>(data: *const ()) {
50 let arc = core::mem::ManuallyDrop::new(Arc::<T>::from_raw(data.cast::<T>()));
51 ArcWake::wake_by_ref(&arc);
52}
53
54unsafe fn drop_arc_raw<T: ArcWake>(data: *const ()) {
55 drop(Arc::<T>::from_raw(data.cast::<T>()))
56}
57
58fn waker_vtable<W: ArcWake>() -> &'static RawWakerVTable {
59 &RawWakerVTable::new(
60 clone_arc_raw::<W>,
61 wake_arc_raw::<W>,
62 wake_by_ref_arc_raw::<W>,
63 drop_arc_raw::<W>,
64 )
65}
66
67struct ThreadNotify {
68 thread: Thread,
69 unparked: AtomicBool,
70}
71
72impl ArcWake for ThreadNotify {
73 fn wake_by_ref(arc_self: &Arc<Self>) {
74 let unparked = arc_self.unparked.swap(true, Ordering::Release);
75 if !unparked {
76 arc_self.thread.unpark();
77 }
78 }
79}
80
81thread_local! {
82 static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
83 thread: thread::current(),
84 unparked: AtomicBool::new(false),
85 });
86}
87
88thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
89
90struct Enter {
91 _priv: (),
92}
93struct EnterError {
94 _priv: (),
95}
96
97impl fmt::Debug for EnterError {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 f.debug_struct("EnterError").finish()
100 }
101}
102
103impl fmt::Display for EnterError {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 write!(f, "an execution scope has already been entered")
106 }
107}
108
109impl std::error::Error for EnterError {}
110
111fn enter() -> Result<Enter, EnterError> {
112 ENTERED.with(|c| {
113 if c.get() {
114 Err(EnterError { _priv: () })
115 } else {
116 c.set(true);
117
118 Ok(Enter { _priv: () })
119 }
120 })
121}
122
123fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
124 let _enter = enter().expect(
125 "cannot execute `LocalPool` executor from within \
126 another executor",
127 );
128
129 CURRENT_THREAD_NOTIFY.with(|thread_notify| {
130 let waker = waker_ref(thread_notify);
131 let mut cx = Context::from_waker(&waker);
132 loop {
133 if let Poll::Ready(t) = f(&mut cx) {
134 return t;
135 }
136 while !thread_notify.unparked.swap(false, Ordering::Acquire) {
137 thread::park();
138 }
139 }
140 })
141}
142#[derive(Debug)]
143struct WakerRef<'a> {
144 waker: ManuallyDrop<Waker>,
145 _marker: PhantomData<&'a ()>,
146}
147impl<'a> WakerRef<'a> {
148 #[inline]
149 fn new_unowned(waker: ManuallyDrop<Waker>) -> Self {
150 Self {
151 waker,
152 _marker: PhantomData,
153 }
154 }
155}
156impl Deref for WakerRef<'_> {
157 type Target = Waker;
158
159 #[inline]
160 fn deref(&self) -> &Waker {
161 &self.waker
162 }
163}
164fn waker_ref<W>(wake: &Arc<W>) -> WakerRef<'_>
165where
166 W: ArcWake,
167{
168 let ptr = Arc::as_ptr(wake).cast::<()>();
169
170 let waker =
171 ManuallyDrop::new(unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>())) });
172 WakerRef::new_unowned(waker)
173}
174
175pub(crate) fn block_on<F: Future>(f: F) -> F::Output {
176 pin_mut!(f);
177 run_executor(|cx| f.as_mut().poll(cx))
178}