traceback_error/
block_on.rs

1use core::{
2    cell::Cell,
3    fmt,
4    future::Future,
5    marker::PhantomData,
6    mem::ManuallyDrop,
7    ops::Deref,
8    sync::atomic::{AtomicBool, Ordering},
9    task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
10};
11use std::{
12    sync::Arc,
13    thread::{self, Thread},
14};
15
16#[macro_export]
17macro_rules! pin_mut {
18    ($($x:ident),* $(,)?) => { $(
19        let mut $x = $x;
20        #[allow(unused_mut)]
21        let mut $x = unsafe {
22            core::pin::Pin::new_unchecked(&mut $x)
23        };
24    )* }
25}
26
27trait ArcWake: Send + Sync {
28    fn wake(self: Arc<Self>) {
29        Self::wake_by_ref(&self)
30    }
31    fn wake_by_ref(arc_self: &Arc<Self>);
32}
33
34unsafe fn increase_refcount<T: ArcWake>(data: *const ()) {
35    let arc = core::mem::ManuallyDrop::new(Arc::<T>::from_raw(data.cast::<T>()));
36    let _arc_clone: core::mem::ManuallyDrop<_> = arc.clone();
37}
38
39unsafe fn clone_arc_raw<T: ArcWake>(data: *const ()) -> RawWaker {
40    increase_refcount::<T>(data);
41    RawWaker::new(data, waker_vtable::<T>())
42}
43
44unsafe fn wake_arc_raw<T: ArcWake>(data: *const ()) {
45    let arc: Arc<T> = Arc::from_raw(data.cast::<T>());
46    ArcWake::wake(arc);
47}
48
49unsafe fn wake_by_ref_arc_raw<T: ArcWake>(data: *const ()) {
50    let arc = core::mem::ManuallyDrop::new(Arc::<T>::from_raw(data.cast::<T>()));
51    ArcWake::wake_by_ref(&arc);
52}
53
54unsafe fn drop_arc_raw<T: ArcWake>(data: *const ()) {
55    drop(Arc::<T>::from_raw(data.cast::<T>()))
56}
57
58fn waker_vtable<W: ArcWake>() -> &'static RawWakerVTable {
59    &RawWakerVTable::new(
60        clone_arc_raw::<W>,
61        wake_arc_raw::<W>,
62        wake_by_ref_arc_raw::<W>,
63        drop_arc_raw::<W>,
64    )
65}
66
67struct ThreadNotify {
68    thread: Thread,
69    unparked: AtomicBool,
70}
71
72impl ArcWake for ThreadNotify {
73    fn wake_by_ref(arc_self: &Arc<Self>) {
74        let unparked = arc_self.unparked.swap(true, Ordering::Release);
75        if !unparked {
76            arc_self.thread.unpark();
77        }
78    }
79}
80
81thread_local! {
82    static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
83        thread: thread::current(),
84        unparked: AtomicBool::new(false),
85    });
86}
87
88thread_local!(static ENTERED: Cell<bool> = Cell::new(false));
89
90struct Enter {
91    _priv: (),
92}
93struct EnterError {
94    _priv: (),
95}
96
97impl fmt::Debug for EnterError {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        f.debug_struct("EnterError").finish()
100    }
101}
102
103impl fmt::Display for EnterError {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        write!(f, "an execution scope has already been entered")
106    }
107}
108
109impl std::error::Error for EnterError {}
110
111fn enter() -> Result<Enter, EnterError> {
112    ENTERED.with(|c| {
113        if c.get() {
114            Err(EnterError { _priv: () })
115        } else {
116            c.set(true);
117
118            Ok(Enter { _priv: () })
119        }
120    })
121}
122
123fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
124    let _enter = enter().expect(
125        "cannot execute `LocalPool` executor from within \
126         another executor",
127    );
128
129    CURRENT_THREAD_NOTIFY.with(|thread_notify| {
130        let waker = waker_ref(thread_notify);
131        let mut cx = Context::from_waker(&waker);
132        loop {
133            if let Poll::Ready(t) = f(&mut cx) {
134                return t;
135            }
136            while !thread_notify.unparked.swap(false, Ordering::Acquire) {
137                thread::park();
138            }
139        }
140    })
141}
142#[derive(Debug)]
143struct WakerRef<'a> {
144    waker: ManuallyDrop<Waker>,
145    _marker: PhantomData<&'a ()>,
146}
147impl<'a> WakerRef<'a> {
148    #[inline]
149    fn new_unowned(waker: ManuallyDrop<Waker>) -> Self {
150        Self {
151            waker,
152            _marker: PhantomData,
153        }
154    }
155}
156impl Deref for WakerRef<'_> {
157    type Target = Waker;
158
159    #[inline]
160    fn deref(&self) -> &Waker {
161        &self.waker
162    }
163}
164fn waker_ref<W>(wake: &Arc<W>) -> WakerRef<'_>
165where
166    W: ArcWake,
167{
168    let ptr = Arc::as_ptr(wake).cast::<()>();
169
170    let waker =
171        ManuallyDrop::new(unsafe { Waker::from_raw(RawWaker::new(ptr, waker_vtable::<W>())) });
172    WakerRef::new_unowned(waker)
173}
174
175pub(crate) fn block_on<F: Future>(f: F) -> F::Output {
176    pin_mut!(f);
177    run_executor(|cx| f.as_mut().poll(cx))
178}