uchan/event.rs
1use core::pin::Pin;
2
/// A single-producer, single-consumer notification primitive.
///
/// A consumer thread creates an Event with [`Event::with`], makes it available to the
/// producer, and calls [`Event::wait`] to block until [`Event::set`] is called, either from
/// another thread or from the consumer thread itself before it starts waiting.
///
/// The thread that calls [`Event::with`] is always the thread that waits. [`Event::set`] is
/// likewise only called by a single "producer" thread, so implementations may rely on
/// SPSC-style optimizations. Events are pinned in case the implementation requires a stable
/// address or uses intrusive memory.
///
/// # Safety
///
/// [`Event::wait`] **must** block until [`Event::set`] is called. Failure to uphold this
/// invariant could result in undefined behavior at safe use sites.
pub unsafe trait Event: Sync {
    /// Constructs a pinned Event and hands it to `f`, which may call [`wait`] on it.
    ///
    /// [`wait`]: Self::wait
    fn with(f: impl FnOnce(Pin<&Self>));

    /// Blocks the caller until [`Event::set`] is invoked by another thread, or returns
    /// right away if it was already invoked (including earlier on this same thread).
    fn wait(self: Pin<&Self>);

    /// Marks the Event as set and unblocks the waiting thread, if any.
    ///
    /// Subsequent calls to [`Event::wait`] should return immediately.
    fn set(self: Pin<&Self>);
}
31
32/// A TimedEvent is an Event which supports waiting for [`Event::set`] but with a timeout.
33/// Most conditions of [`Event::wait`] still apply, but the timeout allows the caller to
34/// stop blocking before [`Event::set`] is called from another producer thread.
35pub trait TimedEvent: Event {
36 /// A timeout value used to represent the maximum amount of time to block the caller when waiting.
37 type Duration;
38
39 /// Similar to [`Event::wait`], but takes in a maximum amount of time to block in case the Event is not set.
40 /// Returns true if the Event was set and returns false if the caller timed out trying to wait for the Event to be set.
41 fn try_wait_for(self: Pin<&Self>, timeout: Self::Duration) -> bool;
42}
43
44#[cfg(feature = "std")]
45pub use if_std::*;
46
47#[cfg(feature = "std")]
48mod if_std {
49 use std::{
50 cell::Cell,
51 fmt,
52 marker::PhantomPinned,
53 pin::Pin,
54 sync::atomic::{AtomicBool, Ordering},
55 thread,
56 };
57
58 /// An implementation of [`Event`] and [`TimedEvent`] using `std::thread`.
59 ///
60 /// [`Event`]: super::Event
61 /// [`TimedEvent`]: super::TimedEvent
62 pub struct StdEvent {
63 thread: Cell<Option<thread::Thread>>,
64 unparked: AtomicBool,
65 _pinned: PhantomPinned,
66 }
67
68 impl fmt::Debug for StdEvent {
69 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70 f.debug_struct("StdEvent").finish()
71 }
72 }
73
74 unsafe impl Sync for StdEvent {}
75
76 unsafe impl super::Event for StdEvent {
77 fn with(f: impl FnOnce(Pin<&Self>)) {
78 let event = Self {
79 thread: Cell::new(Some(thread::current())),
80 unparked: AtomicBool::new(false),
81 _pinned: PhantomPinned,
82 };
83
84 // SAFETY: the event is stack allocated and lives only for the closure.
85 f(unsafe { Pin::new_unchecked(&event) })
86 }
87
88 fn wait(self: Pin<&Self>) {
89 while !self.unparked.load(Ordering::Acquire) {
90 thread::park();
91 }
92 }
93
94 fn set(self: Pin<&Self>) {
95 let thread = self.thread.take().expect("StdEvent without a thread");
96 self.unparked.store(true, Ordering::Release);
97 thread.unpark();
98 }
99 }
100
101 impl super::TimedEvent for StdEvent {
102 type Duration = std::time::Duration;
103
104 fn try_wait_for(self: Pin<&Self>, timeout: Self::Duration) -> bool {
105 let mut started = None;
106 loop {
107 if self.unparked.load(Ordering::Acquire) {
108 return true;
109 }
110
111 #[cfg(miri)]
112 {
113 std::mem::drop::<Option<()>>(started);
114 return false;
115 }
116
117 #[cfg(not(miri))]
118 {
119 let now = std::time::Instant::now();
120 let start = started.unwrap_or(now);
121 started = Some(start);
122
123 match timeout.checked_sub(now.duration_since(start)) {
124 Some(until_timeout) => thread::park_timeout(until_timeout),
125 None => return false,
126 }
127 }
128 }
129 }
130 }
131}