async_resource/util/
thread_waker.rs1use std::cell::UnsafeCell;
2use std::mem::MaybeUninit;
3use std::sync::{
4 atomic::{AtomicU8, Ordering},
5 Arc,
6};
7use std::task::Waker as TaskWaker;
8use std::thread;
9use std::time::Instant;
10
11use futures_util::task::{waker, ArcWake};
12
// Values held in `Inner::state`.

/// Stored by `Waiter::prepare_wait`: a wait has been prepared but the waiter
/// has not yet committed to parking.
const IDLE: u8 = 0;
/// Initial state from `pair()`, and the state every `wake` call swaps in.
/// The wait functions return without parking (or stop parking) once they
/// observe it.
const BUSY: u8 = 1;
/// Set by `Waiter::wait` / `Waiter::wait_until` just before parking. A `wake`
/// call that swaps this value out unparks the recorded thread.
const WAKE: u8 = 2;
16
17pub fn pair() -> (Waker, Waiter) {
18 let inner = Arc::new(Inner {
19 state: AtomicU8::new(BUSY),
20 thread: UnsafeCell::new(MaybeUninit::uninit()),
21 });
22 (
23 Waker {
24 inner: inner.clone(),
25 },
26 Waiter { inner },
27 )
28}
29
/// State shared between a `Waker` (and its clones) and the paired `Waiter`.
struct Inner {
    /// Current protocol state: one of `IDLE`, `BUSY` or `WAKE`.
    state: AtomicU8,
    /// Handle of the waiting thread. Written by `Waiter::prepare_wait` and
    /// moved out by `Inner::wake` when it swaps the state away from `WAKE`;
    /// uninitialized until the first `prepare_wait`.
    thread: UnsafeCell<MaybeUninit<thread::Thread>>,
}
34
35impl Inner {
36 pub fn wake(self: &Arc<Self>) {
37 if self.state.swap(BUSY, Ordering::Release) == WAKE {
38 unsafe { self.thread.get().read().assume_init() }.unpark()
39 }
40 }
41}
42
43impl ArcWake for Inner {
44 fn wake_by_ref(arc_self: &Arc<Self>) {
45 arc_self.wake()
46 }
47}
48
// SAFETY (as relied on by the code in this file): `state` is an atomic, and
// the `thread` cell is written only by `Waiter::prepare_wait` and read only by
// `Inner::wake` after it has swapped `state` away from `WAKE`.
// NOTE(review): nothing visible here stops two threads from calling
// `prepare_wait` through a shared `&Waiter` at the same time — confirm that
// callers keep each `Waiter` on a single thread.
unsafe impl Sync for Inner {}
50
/// Waking half of the pair returned by `pair()`; cloneable, with every clone
/// sharing the same `Inner`.
pub struct Waker {
    /// State shared with the paired `Waiter`.
    inner: Arc<Inner>,
}
54
55impl Waker {
56 pub fn task_waker(&self) -> TaskWaker {
57 waker(self.inner.clone())
58 }
59
60 pub fn wake(&self) {
61 (&self.inner).wake()
62 }
63}
64
65impl Clone for Waker {
66 fn clone(&self) -> Self {
67 Self {
68 inner: self.inner.clone(),
69 }
70 }
71}
72
/// Waiting half of the pair returned by `pair()`; parks the calling thread
/// until the paired `Waker` (or one of its clones) wakes it.
pub struct Waiter {
    /// State shared with the paired `Waker`s.
    inner: Arc<Inner>,
}
76
77impl Waiter {
78 pub fn prepare_wait(&self) {
79 self.inner.state.store(IDLE, Ordering::Release);
80 unsafe {
81 self.inner
82 .thread
83 .get()
84 .write(MaybeUninit::new(thread::current()))
85 }
86 }
87
88 pub fn wait(&self) {
89 if self
90 .inner
91 .state
92 .compare_and_swap(IDLE, WAKE, Ordering::AcqRel)
93 != IDLE
94 {
95 return;
96 }
97 loop {
98 thread::park();
99 if self.inner.state.load(Ordering::Acquire) == BUSY {
100 break;
101 }
102 }
103 }
104
105 pub fn wait_until(&self, expire: Instant) -> bool {
106 if self
107 .inner
108 .state
109 .compare_and_swap(IDLE, WAKE, Ordering::AcqRel)
110 != IDLE
111 {
112 return false;
113 }
114 while let Some(dur) = expire.checked_duration_since(Instant::now()) {
115 thread::park_timeout(dur);
116 if self.inner.state.load(Ordering::Acquire) == BUSY {
117 return false;
118 }
119 }
120 true
121 }
122}