uchan/
event.rs

1use core::pin::Pin;
2
3/// An Event represents a single-producer, single-consumer notification synchronization primitive.
4/// A thread creates an Event using [`Event::with`], makes it available to the producer, and uses
5/// [`Event::wait`] to block the caller thread until [`Event::set`] is called from another thread
6/// or from the same consumer thread prior to [`Event::wait`].
7///
8/// The same thread which call [`Event::with`] will always be the same thread that waits.
9/// [`Event::set`] will also only be called by a single other "producer" thread, allowing for
10/// implementations to rely on any SPSC-style optimizations. Events are Pinned in case the
11/// implementation requires a stable address or utilizes intrusive memory.
12///
13/// ## Safety
14///
15/// [`Event::with`] **must** block until [`Event::set`] is called. Failure to uphold this invariant
16/// could result in undefined behavior at safe use sites.
17pub unsafe trait Event: Sync {
18    /// Construct a pinned Event, allowing the closure to call [`wait`] on the Event reference.
19    ///
20    /// [`wait`]: Self::wait
21    fn with(f: impl FnOnce(Pin<&Self>));
22
23    /// Blocks the caller until the another thread (or previously on the same thread)
24    /// invokes [`Event::set`].
25    fn wait(self: Pin<&Self>);
26
27    /// Marks the thread as active and unblocks the waiting thread if any.
28    /// Further attempts to call [`Event::wait`] should return immediately.
29    fn set(self: Pin<&Self>);
30}
31
32/// A TimedEvent is an Event which supports waiting for [`Event::set`] but with a timeout.
33/// Most conditions of [`Event::wait`] still apply, but the timeout allows the caller to
34/// stop blocking before [`Event::set`] is called from another producer thread.
35pub trait TimedEvent: Event {
36    /// A timeout value used to represent the maximum amount of time to block the caller when waiting.
37    type Duration;
38
39    /// Similar to [`Event::wait`], but takes in a maximum amount of time to block in case the Event is not set.
40    /// Returns true if the Event was set and returns false if the caller timed out trying to wait for the Event to be set.
41    fn try_wait_for(self: Pin<&Self>, timeout: Self::Duration) -> bool;
42}
43
44#[cfg(feature = "std")]
45pub use if_std::*;
46
47#[cfg(feature = "std")]
48mod if_std {
49    use std::{
50        cell::Cell,
51        fmt,
52        marker::PhantomPinned,
53        pin::Pin,
54        sync::atomic::{AtomicBool, Ordering},
55        thread,
56    };
57
58    /// An implementation of [`Event`] and [`TimedEvent`] using `std::thread`.
59    ///
60    /// [`Event`]: super::Event
61    /// [`TimedEvent`]: super::TimedEvent
62    pub struct StdEvent {
63        thread: Cell<Option<thread::Thread>>,
64        unparked: AtomicBool,
65        _pinned: PhantomPinned,
66    }
67
68    impl fmt::Debug for StdEvent {
69        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70            f.debug_struct("StdEvent").finish()
71        }
72    }
73
74    unsafe impl Sync for StdEvent {}
75
76    unsafe impl super::Event for StdEvent {
77        fn with(f: impl FnOnce(Pin<&Self>)) {
78            let event = Self {
79                thread: Cell::new(Some(thread::current())),
80                unparked: AtomicBool::new(false),
81                _pinned: PhantomPinned,
82            };
83
84            // SAFETY: the event is stack allocated and lives only for the closure.
85            f(unsafe { Pin::new_unchecked(&event) })
86        }
87
88        fn wait(self: Pin<&Self>) {
89            while !self.unparked.load(Ordering::Acquire) {
90                thread::park();
91            }
92        }
93
94        fn set(self: Pin<&Self>) {
95            let thread = self.thread.take().expect("StdEvent without a thread");
96            self.unparked.store(true, Ordering::Release);
97            thread.unpark();
98        }
99    }
100
101    impl super::TimedEvent for StdEvent {
102        type Duration = std::time::Duration;
103
104        fn try_wait_for(self: Pin<&Self>, timeout: Self::Duration) -> bool {
105            let mut started = None;
106            loop {
107                if self.unparked.load(Ordering::Acquire) {
108                    return true;
109                }
110
111                #[cfg(miri)]
112                {
113                    std::mem::drop::<Option<()>>(started);
114                    return false;
115                }
116
117                #[cfg(not(miri))]
118                {
119                    let now = std::time::Instant::now();
120                    let start = started.unwrap_or(now);
121                    started = Some(start);
122
123                    match timeout.checked_sub(now.duration_since(start)) {
124                        Some(until_timeout) => thread::park_timeout(until_timeout),
125                        None => return false,
126                    }
127                }
128            }
129        }
130    }
131}