// rusturnate/queue/callback/thread.rs

1use core::{
2    cell::UnsafeCell,
3    panic::{RefUnwindSafe, UnwindSafe},
4    sync::atomic::{AtomicBool, Ordering},
5};
6use Ordering::{Acquire, Relaxed, Release};
7use std::{
8    thread::{self, Thread},
9    time::{Duration, Instant},
10};
11
12use super::{NotSendOrSync, PhantomData};
13
14pub struct ThreadImpl(AtomicBool, UnsafeCell<Option<Thread>>, NotSendOrSync);
15
16unsafe impl Sync for ThreadImpl {}
17
18impl RefUnwindSafe for ThreadImpl {}
19impl UnwindSafe for ThreadImpl {}
20
21impl Default for ThreadImpl {
22    #[inline]
23    fn default() -> Self {
24        let curr_thread: Thread = thread::current();
25        ThreadImpl(
26            AtomicBool::new(false),
27            UnsafeCell::new(Some(curr_thread)),
28            PhantomData,
29        )
30    }
31}
32
33impl ThreadImpl {
34    /// # Safety
35    /// Only call once. Don't use object anymore after calling `wake`.
36    #[inline]
37    pub unsafe fn wake(&self) {
38        let thread_cell: *mut Option<Thread> = self.1.get();
39        let thread: Thread = (*thread_cell).take().unwrap_unchecked();
40        debug_assert!(!self.0.load(Relaxed), "woken multiple times");
41        self.0.store(true, Release);
42        thread.unpark()
43    }
44
45    #[cfg(debug_assertions)]
46    #[inline]
47    pub fn check_signaled(&self) {
48        debug_assert!(self.0.load(Relaxed), "not signaled")
49    }
50
51    /// # Safety
52    /// Only call once from the thread where it was created. This is [`Sync`]
53    /// since it needs to be woken by another thread, but its `wait` method
54    /// must not be called from another thread.
55    ///
56    /// # Contract
57    /// This method will only return after `wake` was called.
58    ///
59    /// IMPORTANT: This method acts as a one-way barrier. Operations after
60    /// this function's return cannot be reordered in front of it due to an
61    /// `Acquire` at its end.
62    #[inline]
63    pub unsafe fn wait(&self) {
64        let ThreadImpl(signaled, _, _) = self;
65        while !signaled.load(Acquire) {
66            thread::park()
67        }
68    }
69
70    /// # Safety
71    /// Only call once from the thread where it was created. This is [`Sync`]
72    /// since it needs to be woken by another thread, but its `wait_timeout`
73    /// method must not be called from another thread.
74    ///
75    /// # Contract
76    /// This method will only return after `wake` was called or a timeout
77    /// occurred. If `wake` was not called in time, `true` MUST be returned.
78    ///
79    /// IMPORTANT: This method acts as a one-way barrier. Operations after
80    /// this function's return cannot be reordered in front of it due to an
81    /// `Acquire` at its end.
82    ///
83    /// This method returns `true` if a timeout occurred, `false` if otherwise.
84    #[inline]
85    pub unsafe fn wait_timeout(&self, start: Instant, dur: Duration) -> bool {
86        let ThreadImpl(signaled, _, _) = self;
87        while !signaled.load(Acquire) {
88            let elapsed: Duration = start.elapsed();
89            let remaining: Duration = match dur.checked_sub(elapsed) {
90                // Timed out:
91                None | Some(Duration::ZERO) => return true,
92                Some(rem) => rem,
93            };
94
95            let _: () = thread::park_timeout(remaining);
96        }
97        false
98    }
99}