async_resource/util/
thread_waker.rs

1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::sync::{
4    atomic::{AtomicU8, Ordering},
5    Arc,
6};
7use std::task::Waker as TaskWaker;
8use std::thread;
9use std::time::Instant;
10
11use futures_util::task::{waker, ArcWake};
12
13const IDLE: u8 = 0;
14const BUSY: u8 = 1;
15const WAKE: u8 = 2;
16
17pub fn pair() -> (Waker, Waiter) {
18    let inner = Arc::new(Inner {
19        state: AtomicU8::new(BUSY),
20        thread: UnsafeCell::new(MaybeUninit::uninit()),
21    });
22    (
23        Waker {
24            inner: inner.clone(),
25        },
26        Waiter { inner },
27    )
28}
29
30struct Inner {
31    state: AtomicU8,
32    thread: UnsafeCell<MaybeUninit<thread::Thread>>,
33}
34
35impl Inner {
36    pub fn wake(self: &Arc<Self>) {
37        if self.state.swap(BUSY, Ordering::Release) == WAKE {
38            unsafe { self.thread.get().read().assume_init() }.unpark()
39        }
40    }
41}
42
43impl ArcWake for Inner {
44    fn wake_by_ref(arc_self: &Arc<Self>) {
45        arc_self.wake()
46    }
47}
48
49unsafe impl Sync for Inner {}
50
51pub struct Waker {
52    inner: Arc<Inner>,
53}
54
55impl Waker {
56    pub fn task_waker(&self) -> TaskWaker {
57        waker(self.inner.clone())
58    }
59
60    pub fn wake(&self) {
61        (&self.inner).wake()
62    }
63}
64
65impl Clone for Waker {
66    fn clone(&self) -> Self {
67        Self {
68            inner: self.inner.clone(),
69        }
70    }
71}
72
73pub struct Waiter {
74    inner: Arc<Inner>,
75}
76
77impl Waiter {
78    pub fn prepare_wait(&self) {
79        self.inner.state.store(IDLE, Ordering::Release);
80        unsafe {
81            self.inner
82                .thread
83                .get()
84                .write(MaybeUninit::new(thread::current()))
85        }
86    }
87
88    pub fn wait(&self) {
89        if self
90            .inner
91            .state
92            .compare_and_swap(IDLE, WAKE, Ordering::AcqRel)
93            != IDLE
94        {
95            return;
96        }
97        loop {
98            thread::park();
99            if self.inner.state.load(Ordering::Acquire) == BUSY {
100                break;
101            }
102        }
103    }
104
105    pub fn wait_until(&self, expire: Instant) -> bool {
106        if self
107            .inner
108            .state
109            .compare_and_swap(IDLE, WAKE, Ordering::AcqRel)
110            != IDLE
111        {
112            return false;
113        }
114        while let Some(dur) = expire.checked_duration_since(Instant::now()) {
115            thread::park_timeout(dur);
116            if self.inner.state.load(Ordering::Acquire) == BUSY {
117                return false;
118            }
119        }
120        true
121    }
122}