rsevents/lib.rs
//! `rsevents` is an implementation of WIN32's auto- and manual-reset events for the rust world.
//! Events are synchronization primitives (i.e. not implemented atop of mutexes) used either to
//! build other synchronization primitives or to implement signalling between threads.
//!
//! Events come in two different flavors: [`AutoResetEvent`] and [`ManualResetEvent`]. Internally,
//! both are implemented with the unsafe `RawEvent` and use the `parking_lot_core` crate to
//! efficiently suspend (park) threads while they wait for an event to become signalled, and to
//! handle memory coherence between the signalling and signalled threads.
//!
//! An event is a synchronization primitive that is functionally the equivalent of an awaitable
//! boolean, allowing for synchronization between threads. Unlike mutexes and condition variables,
//! which are most often used to restrict access to a critical section, events are more appropriate
//! for efficiently signalling remote threads or waiting on a remote thread to change state - or
//! for building your own synchronization types on top of something both light and easy to use.
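//!
//! A minimal signalling sketch (the static's name here is illustrative, not from this crate):
//!
//! ```
//! use rsevents::{AutoResetEvent, Awaitable, EventState};
//!
//! // An event shared between threads, starting in the unset state.
//! static TASK_READY: AutoResetEvent = AutoResetEvent::new(EventState::Unset);
//!
//! let worker = std::thread::spawn(|| {
//!     // Parks efficiently until the main thread signals.
//!     TASK_READY.wait();
//! });
//!
//! TASK_READY.set(); // Wakes exactly one waiter (or lets one future wait() through).
//! worker.join().unwrap();
//! ```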

#![forbid(missing_docs)]

use parking_lot_core as plc;
use parking_lot_core::ParkResult;
use std::convert::Infallible;
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::{Duration, Instant};

#[cfg(test)]
mod tests;

/// The event is available when this bit is set, otherwise it is unavailable.
const AVAILABLE_BIT: u8 = 0x01;
/// There are one or more threads waiting to obtain the event.
const WAITING_BIT: u8 = 0x02;

/// A wrapper around an atomic state that represents whether or not the event is available.
/// This isn't pinned, and pinning appears to be unnecessary because the lock may be moved so long
/// as it is not borrowed (for prior art, see rust's `src/sys/windows/locks/mutex.rs`, which is
/// similarly exposed directly, without pinning/boxing, to make a movable mutex).
///
/// The lowest two bits of the u8 state are used: 0b010 represents the WAITING bit, which is set
/// when a thread is parked or about to park, and 0b001 represents the AVAILABLE bit, set when the
/// event is available and cleared otherwise.
///
/// The following combinations are possible:
/// * 0b00: Unavailable
///   The event is not available and no threads are waiting on it. It can be "fast set" without
///   going through the plc lock.
/// * 0b01: Available
///   The event is available and there are no threads waiting on it. It can be "fast obtained"
///   without going through the plc lock.
/// * 0b10: Unavailable w/ Parked Waiters
///   The event is unavailable and there are one or more threads parked or trying to park to wait
///   for the event to become available. We must go through the plc lock to preferentially "give"
///   the event to a waiting thread.
/// * 0b11: Available w/ Parked Threads
///   The event is available but there are parked threads waiting for it. This is not a valid
///   state, and no function should end with this as the quiescent state.
#[doc(hidden)]
pub struct RawEvent(AtomicU8);

/// A representation of the state of an event, which can either be `Set` (i.e. signalled,
/// ready) or `Unset` (i.e. not ready).
#[derive(Clone, Debug, PartialEq)]
#[repr(u8)]
pub enum EventState {
    /// The event is available and call(s) to [`Awaitable::wait()`] will go through without
    /// blocking, i.e. the event is signalled.
    Set,
    /// The event is unavailable and calls to [`Awaitable::wait()`] will block until the event
    /// becomes set, i.e. the event is unsignalled.
    Unset,
}

impl RawEvent {
    const fn new(state: u8) -> RawEvent {
        RawEvent(AtomicU8::new(state))
    }

    #[inline]
    /// Attempts to exclusively obtain the event. Returns true upon success. Called internally by
    /// [`unlock_one()`](Self::unlock_one) and [`wait_one_for()`](Self::wait_one_for) to determine
    /// whether a thread actually needs to be parked/suspended.
    fn try_unlock_one(&self) -> bool {
        // Obtains the event if it is both available and there are no threads waiting on it.
        self.0.compare_exchange_weak(AVAILABLE_BIT, 0, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    #[cfg(any(test, miri))]
    /// This entry point is used to deterministically determine if the event could be obtained
    /// without any spurious failures. We don't override the actual behavior of try_unlock_one() so
    /// that any internal functions calling into it can still be tested against both normal and
    /// spurious failure modes.
    fn test_try_unlock_one(&self) -> bool {
        self.0.compare_exchange(AVAILABLE_BIT, 0, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    #[inline]
    /// Attempts to obtain the event (without locking out future callers). Returns true upon success.
    fn try_unlock_all(&self) -> bool {
        // Obtains the event if it is available, with no preconditions.
        (self.0.load(Ordering::Acquire) & AVAILABLE_BIT) != 0
    }

    /// Parks the calling thread until the underlying event becomes set. If the event is set
    /// during this call, the park aborts/returns early so that no event sets are missed. Consumes
    /// the event's set state in case of early return.
    ///
    /// Returns `true` only if the thread has obtained the event; otherwise returns `false` (only
    /// possible in the case of a timeout).
    unsafe fn suspend_one(&self, timeout: Option<Duration>) -> bool {
        let timeout = timeout.map(|duration| Instant::now() + duration);
        let mut state = self.0.load(Ordering::Relaxed);
        loop {
            // The only way a thread can obtain the event _before_ it is parked/put to sleep is to
            // check on the state before setting the WAITING bit for itself, otherwise it can't know
            // if there are any other threads waiting so it can't clear the WAITING bit, and if it
            // can't clear the WAITING bit, it can't obtain the event.
            if (state & AVAILABLE_BIT) != 0 {
                // The lock is available; try to obtain it even if the WAITING bit is set by
                // another thread.
                match self.0.compare_exchange_weak(
                    state, state & !AVAILABLE_BIT, Ordering::Acquire, Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // The lock was obtained; there may or may not be other threads suspended.
                        return true;
                    }
                    Err(s) => {
                        // Another thread contended with this call, loop to try again.
                        state = s;
                        continue;
                    }
                }
            } else if (state & WAITING_BIT) == 0 {
                // There are no other threads waiting, so we need to set the WAITING bit ourselves
                // before we try to park the thread.
                match self.0.compare_exchange_weak(
                    state, state | WAITING_BIT, Ordering::Relaxed, Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // We set the WAITING bit and can continue with attempting to park this
                        // thread.
                    }
                    Err(s) => {
                        // Another thread contended with this call, loop to try again.
                        state = s;
                        continue;
                    }
                }
            } else {
                // The event isn't available and another thread has already marked it as pending,
                // so we are good to go.
            }

            // This callback is run with the plc queue locked, before the thread is parked. If it
            // returns false, the park is aborted. We can't opportunistically claim the event here
            // even if it is available, because we don't know if there are other suspended threads,
            // which means we can't clear the WAITING bit in case we were the last thread.
            let before_suspend = || -> bool {
                self.0.load(Ordering::Relaxed) == WAITING_BIT
            };

            // This callback is run with the plc queue locked, which is the only time we can safely
            // clear the WAITING bit (because `before_suspend` checks the WAITING bit with the queue
            // locked as well), making it possible to abort/retry the park() call if there's any
            // contention.
            let on_timeout = |_, last_thread| {
                if last_thread {
                    self.0.fetch_and(!WAITING_BIT, Ordering::Relaxed);
                }
            };

            match plc::park(
                self as *const RawEvent as usize, // key for the parking lot
                before_suspend,
                || {}, // callback before parking, run after the queue is unlocked
                on_timeout,
                plc::DEFAULT_PARK_TOKEN,
                timeout,
            ) {
                // before_suspend() detected a change in the state that indicates the lock may have
                // become available (or the WAITING bit could have been cleared because another
                // thread, which was the last actually parked thread, was awoken).
                // Loop to retry so we never miss a set() call.
                ParkResult::Invalid => state = self.0.load(Ordering::Relaxed),
                // The timeout was reached before the event could be obtained.
                ParkResult::TimedOut => return false,
                // The thread was awoken by another thread calling into set_one().
                ParkResult::Unparked(_) => return true,
            }
        }
    }

    /// Parks the calling thread until the underlying event becomes set. If the event is set
    /// during this call, the park aborts/returns early so that no event sets are missed. Unlike
    /// [`suspend_one()`](Self::suspend_one), does not consume the event's set state in case of
    /// early return.
    unsafe fn suspend_all(&self, timeout: Option<Duration>) -> bool {
        let timeout = timeout.map(|duration| Instant::now() + duration);
        let mut state = self.0.load(Ordering::Relaxed);
        loop {
            // The only way a thread can obtain the event _before_ it is parked/put to sleep is to
            // check on the state before setting the WAITING bit for itself, otherwise it can't know
            // if there are any other threads waiting so it can't clear the WAITING bit, and if it
            // can't clear the WAITING bit, it can't directly obtain the event for itself.
            if (state & AVAILABLE_BIT) != 0 {
                // The event has become available. We can return right away; we don't care about
                // anything else.
                return true;
            } else if (state & WAITING_BIT) == 0 {
                // There are no other threads waiting, so we need to set the WAITING bit ourselves
                // before we try to park the thread.
                match self.0.compare_exchange_weak(
                    state, state | WAITING_BIT, Ordering::Relaxed, Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        // We set the WAITING bit without contention and can move on to trying to
                        // park this thread.
                    }
                    Err(s) => {
                        // Another thread contended with this call, loop to try again.
                        state = s;
                        continue;
                    }
                }
            } else {
                // The event isn't available and another thread has already marked it as pending,
                // so we are good to go.
            }

            // This callback is run with the plc queue locked, before the thread is parked. If it
            // returns false, the park is aborted. We can't opportunistically claim the event here
            // even if it is available, because we don't know if there are other suspended threads,
            // which means we can't clear the WAITING bit in case we were the last thread.
            let before_suspend = || -> bool {
                self.0.load(Ordering::Relaxed) == WAITING_BIT
            };

            // This callback is run with the plc queue locked, which is the only time we can safely
            // clear the WAITING bit (because `before_suspend` checks the WAITING bit with the queue
            // locked as well), making it possible to abort/retry the park() call if there's any
            // contention.
            let mut timeout_result = false;
            let on_timeout = |_, last_thread| {
                if last_thread {
                    // Clear the entire state; if the event was set in the meantime, report the
                    // wait as successful rather than timed out.
                    if (self.0.swap(0, Ordering::Relaxed) & AVAILABLE_BIT) != 0 {
                        timeout_result = true;
                    }
                }
            };

            match plc::park(
                self as *const RawEvent as usize, // key for the parking lot
                before_suspend,
                || {}, // callback before parking, run after the queue is unlocked
                on_timeout,
                plc::DEFAULT_PARK_TOKEN,
                timeout,
            ) {
                // before_suspend() detected a change in the state that indicates the lock may have
                // become available (or the WAITING bit could have been cleared because another
                // thread, which was the last actually parked thread, was awoken).
                // Loop to retry so we never miss a set() call.
                ParkResult::Invalid => state = self.0.load(Ordering::Relaxed),
                // The timeout was reached before the event could be obtained.
                ParkResult::TimedOut => return timeout_result,
                // The thread was awoken by another thread calling into set_all().
                ParkResult::Unparked(_) => return true,
            }
        }
    }

    /// Trigger the event, releasing one waiter
    fn set_one(&self) {
        // Optimize for cases where the event wasn't available and isn't under any contention.
        // NOTE: This makes calling set() on an already set event more expensive. This match block
        // can be replaced with `self.0.load(Ordering::Relaxed)` to bypass this optimization.
        let mut state = match self.0.compare_exchange(
            0, AVAILABLE_BIT, Ordering::Release, Ordering::Relaxed,
        ) {
            Ok(_) => return,
            Err(s) => s,
        };

        loop {
            match state {
                0b00 => {
                    // There are no parked/suspended threads so we are able to "fast set" the event
                    // without worrying about synchronizing with threads parked or about to park.
                    match self.0.compare_exchange_weak(
                        0, AVAILABLE_BIT, Ordering::Release, Ordering::Relaxed,
                    ) {
                        Ok(_) => return,
                        Err(s) => {
                            // We raced with a thread trying to park or another call to set(). Loop
                            // to figure out what happened.
                            state = s;
                            continue;
                        }
                    }
                }
                0b01 => {
                    // This was a call to set_one() on an event that was already set; we don't need
                    // to "do" anything but we need to touch the shared memory location to ensure
                    // memory ordering.
                    //
                    // It may be possible to forego this, but I'm not sure if that's wise. It is true
                    // that a thread awoken after two back-to-back set() calls is guaranteed to see at
                    // least _something_ new without an explicit Release here, but there's no guarantee
                    // that there will ever be any more set() calls afterwards, meaning whatever was
                    // written by the thread making the second set() call may never wind up being
                    // observed by a thread that fast-obtains the event in a wait() call.
                    match self.0.compare_exchange_weak(
                        state, state, Ordering::Release, Ordering::Relaxed,
                    ) {
                        Ok(_) => return,
                        Err(s) => {
                            state = s;
                            continue;
                        }
                    }
                }
                0b10 => {
                    // A thread is waiting to obtain this event, so we can't fast set it and must
                    // instead go through the plc queue lock.
                    break;
                }
                0b11 => {
                    // This shouldn't happen, but it's hard to guarantee because of the interplay
                    // between concurrent `set_one()` calls at the same time as a `suspend_one()`
                    // call attempts to park a thread.
                    #[cfg(any(test, miri))]
                    assert!(false, "AVAILABLE and WAITING bits set!");
                    break;
                }
                _ => {
                    // We only use the lowest two bits of the AtomicU8 state
                    unsafe { core::hint::unreachable_unchecked() }
                }
            }
        }

        unsafe {
            // The unpark callback happens with the plc queue locked, so we guarantee that the logic
            // here happens either completely before or completely after the logic in the callback
            // passed to plc::park() in suspend_one().
            plc::unpark_one(self as *const RawEvent as usize, |unpark_result| {
                // This has to be done here with the plc queue locked so that a simultaneous call
                // into plc::park() will not suspend a thread after we've tried unfruitfully to
                // awaken one but before we've had a chance to set the internal state, causing the
                // set_one() call to be missed.

                if unpark_result.unparked_threads == 0 {
                    // This can happen if there were two simultaneous calls to set_one() with only
                    // one thread parked or if there were no threads parked but a thread trying to
                    // park (which then didn't happen when we changed the state above and it failed
                    // the park validation callback).
                    // It's not only safe but actually required to stomp the WAITING bit because we
                    // have the plc queue locked - contending threads (about to park but not yet
                    // parked) will retry. If we don't stomp the WAITING bit and there was only one
                    // thread _about_ to park but not yet parked, it would loop after the park
                    // validation callback failed, but the WAITING bit wouldn't have been cleared
                    // and it won't be able to obtain the event.
                    self.0.store(AVAILABLE_BIT, Ordering::Release);
                } else if !unpark_result.have_more_threads {
                    // Clear the WAITING bit. We can stomp the AVAILABLE bit because until we clear
                    // the WAITING bit, the AVAILABLE bit can only be set in the plc::unpark_one()
                    // callback w/ the queue locked.
                    // Clearing the WAITING bit makes it possible for the next call to set_one() to
                    // fast-set the event without going through the plc lock and triggers a retry in
                    // any threads trying to park.
                    self.0.store(0, Ordering::Release);
                } else {
                    // One thread was unparked but there are others still waiting. Subsequent
                    // set_one() calls will still need to go through the plc lock.
                    // We need to write to the shared memory address to guarantee Release semantics,
                    // and we can stomp the AVAILABLE bit since it can only be set with the plc lock
                    // held once the WAITING bit is asserted.
                    self.0.store(WAITING_BIT, Ordering::Release);
                }
                plc::DEFAULT_UNPARK_TOKEN
            })
        };
    }

    /// Trigger the event, releasing all waiters
    fn set_all(&self) {
        // Stomp the WAITING bit (if set) so that no other thread wastes time trying to unpark
        // threads since we're going to unlock them all.
        let prev_state = self.0.swap(AVAILABLE_BIT, Ordering::Release);
        if (prev_state & WAITING_BIT) == 0 {
            // No threads were suspended/about to be suspended so we can just return. Or maybe there
            // _are_ other threads suspended but we raced with a simultaneous call into set_all()
            // and that other thread is going to handle waking them.
            return;
        }

        let _unparked = unsafe {
            plc::unpark_all(self as *const RawEvent as usize, plc::DEFAULT_UNPARK_TOKEN)
        };

        // NOTE: _unparked may equal zero if there were no threads fully parked but there was a
        // thread _about_ to park, until changing the state above caused its validation callback to
        // fail, after which it simply obtained the now-available event on retry and returned.
    }

    fn unlock_one(&self) {
        if !self.try_unlock_one() {
            unsafe {
                self.suspend_one(None);
            }
        }
    }

    fn unlock_all(&self) {
        if !self.try_unlock_all() {
            unsafe {
                self.suspend_all(None);
            }
        }
    }

    /// Put the event in a locked (reset) state.
    fn reset(&self) {
        // Clear the AVAILABLE bit without touching the WAITING bit.
        // Calling reset() does not imply any strict ordering of code before or after a matching
        // (try_)unlock() call, so we use Relaxed semantics.
        self.0.fetch_and(!AVAILABLE_BIT, Ordering::Relaxed);
    }

    fn wait_one_for(&self, limit: Duration) -> bool {
        if self.try_unlock_one() {
            return true;
        }

        unsafe { self.suspend_one(Some(limit)) }
    }

    fn wait_all_for(&self, limit: Duration) -> bool {
        if self.try_unlock_all() {
            return true;
        }

        unsafe { self.suspend_all(Some(limit)) }
    }
}

#[doc(hidden)]
/// This is for backwards compatibility with earlier `rsevents` releases, which used the less
/// specific (and much more likely to conflict) name `State` instead of `EventState`.
pub type State = EventState;

/// The default error returned by types implementing [`Awaitable`], with the only possible error
/// being a timeout to a bounded `wait()` call.
///
/// When `Awaitable<Error = TimeoutError>`, a simpler `Awaitable` API bypassing error handling is
/// exposed.
#[derive(Debug, Copy, Clone)]
pub struct TimeoutError;

impl std::fmt::Display for TimeoutError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("The wait call timed out")
    }
}

impl std::error::Error for TimeoutError {}

impl AwaitableError for TimeoutError {
    type UnboundedError = std::convert::Infallible;
}

impl std::convert::From<Infallible> for TimeoutError {
    fn from(_: Infallible) -> Self {
        unsafe { core::hint::unreachable_unchecked() }
    }
}

mod sealed {
    use crate::{Awaitable, AwaitableError};

    pub trait InfallibleUnboundedWait {}
    impl<E> InfallibleUnboundedWait for E where
        E: AwaitableError<UnboundedError = std::convert::Infallible>
    {
    }

    pub trait VoidAwaitable {}
    impl<T, E> VoidAwaitable for T
    where
        T: for<'a> Awaitable<'a, T = (), Error = E>,
        E: std::error::Error,
    {
    }
}

/// Denotes the error returned by an [`Awaitable`] object for its various wait calls,
/// distinguishing between internal errors preventing the wait from succeeding (e.g. a poison
/// error) and errors due only to a timeout.
///
/// Types implementing `Awaitable<T = (), Error = TimeoutError>` unlock a much simpler `Awaitable`
/// API for end users that omits error handling and replaces timeout errors with boolean results.
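///
/// A sketch of a custom error implementing this trait; the `PoisonOrTimeout` type below is
/// hypothetical and not part of this crate:
///
/// ```
/// use rsevents::AwaitableError;
///
/// #[derive(Debug)]
/// enum PoisonOrTimeout {
///     Poisoned,
///     TimedOut,
/// }
///
/// impl std::fmt::Display for PoisonOrTimeout {
///     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
///         match self {
///             PoisonOrTimeout::Poisoned => f.write_str("the object was poisoned"),
///             PoisonOrTimeout::TimedOut => f.write_str("the wait timed out"),
///         }
///     }
/// }
///
/// impl std::error::Error for PoisonOrTimeout {}
///
/// impl AwaitableError for PoisonOrTimeout {
///     // An unbounded wait can still fail with a poison error, so `UnboundedError` is `Self`
///     // rather than `Infallible`, and the simplified wait() API is not exposed.
///     type UnboundedError = Self;
/// }
/// ```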
pub trait AwaitableError: std::error::Error {
    /// The error type that may result from an unbounded [`Awaitable::try_wait()`] call
    /// (i.e. excluding any timeout errors).
    ///
    /// Using `std::convert::Infallible` here will unlock a simpler `Awaitable` API for end users,
    /// with an infallible [`Awaitable::wait()`] becoming available. Otherwise, typically use
    /// `Self` to denote that `wait()` and `wait_for()` return the same error type.
    ///
    /// It is recommended - but not required - to implement or otherwise satisfy the constraint
    /// `From<E::UnboundedError>` for `E` where `E` implements `AwaitableError`.
    type UnboundedError: std::error::Error;
}

/// The basic interface for waiting on void awaitable types
///
/// This is a unified trait used by `rsevents` and by downstream crates implementing
/// synchronization primitives atop `rsevents` to expose a single interface for waiting on an
/// object either indefinitely or for a bounded length of time.
///
/// Types implementing `Awaitable<T = (), Error = TimeoutError>` unlock a much simpler `Awaitable`
/// API for end users that omits error handling and replaces timeout errors with boolean results.
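///
/// A quick sketch of that simplified API, using [`AutoResetEvent`] as the implementing type:
///
/// ```
/// use rsevents::{AutoResetEvent, Awaitable, EventState};
/// use std::time::Duration;
///
/// let event = AutoResetEvent::new(EventState::Set);
/// // wait() bypasses error handling entirely since the unbounded wait is infallible,
/// event.wait();
/// // while wait_for() and wait0() fold the timeout error into a plain bool.
/// assert_eq!(event.wait_for(Duration::from_millis(1)), false);
/// assert_eq!(event.wait0(), false);
/// ```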
pub trait Awaitable<'a> {
    /// The type yielded by the Awaitable type on a successful wait
    type T;
    /// The type yielded by the Awaitable type in case of an error, also specifying whether or not
    /// an unbounded `Awaitable::wait()` returns any error at all.
    type Error: AwaitableError;

    /// Waits on the `Awaitable` type, blocking efficiently until it becomes available. Returns the
    /// awaited type `T` (if it isn't `()`) or an error indicating a wait issue. Does not time out.
    fn try_wait(&'a self) -> Result<Self::T, <Self::Error as AwaitableError>::UnboundedError>;

    /// Waits on the `Awaitable` type until it becomes available or the timeout period described by
    /// `limit` elapses, in which case a timeout error is returned.
    fn try_wait_for(&'a self, limit: Duration) -> Result<Self::T, Self::Error>;

    /// Attempts to obtain the `Awaitable` type `T` in a potentially lock-free, wait-free manner,
    /// returning a timeout error if it is not available.
    /// **This call may have side effects beyond merely returning the current state and must
    /// not be considered the equivalent of a `test()` or `peek()` function.**
    ///
    /// This function should be overridden by `Awaitable` implementations that can offer a
    /// streamlined version of `try_wait_for()` for a hard-coded zero timeout.
    fn try_wait0(&'a self) -> Result<Self::T, Self::Error> {
        // The default implementation of this method just calls `try_wait_for()` with a zero wait.
        // It should be overridden if a better alternative is possible.
        self.try_wait_for(Duration::ZERO)
    }

    /// Blocks until the `Awaitable` type and its associated type `T` become available. Like
    /// [`try_wait()`](Self::try_wait) but bypasses error handling.
    ///
    /// Only available if the `Awaitable`'s error type implements `InfallibleUnboundedWait`, i.e.
    /// does not return any errors except on timeout.
    fn wait(&'a self) -> Self::T
    where
        Self::Error: sealed::InfallibleUnboundedWait,
    {
        self.try_wait()
            .expect("try_wait() is not allowed to return TimeoutError!")
    }

    /// Attempts a bounded wait on the `Awaitable` type. Like
    /// [`try_wait_for()`](Self::try_wait_for) but returns `true` if the `Awaitable` was originally
    /// available or if it became so within the specified duration and `false` otherwise.
    ///
    /// Only available if `Awaitable::Error` implements `InfallibleUnboundedWait` (i.e. does not
    /// return any errors except on timeout) and the return type `T` is void.
    fn wait_for(&'a self, limit: Duration) -> bool
    where
        Self: sealed::VoidAwaitable,
        Self::Error: sealed::InfallibleUnboundedWait,
    {
        match self.try_wait_for(limit) {
            Ok(_) => true,
            Err(_) => false,
        }
    }

    /// Attempts to obtain the `Awaitable` in a potentially lock-free, wait-free manner, returning a
    /// timeout error if it's currently unavailable.
    /// Like [`try_wait0()`](Self::try_wait0) but returns `true` if the `Awaitable` was
    /// available and obtained or `false` otherwise.
    ///
    /// **This call may have side effects beyond merely returning the current state and must
    /// not be considered the equivalent of a `test()` or `peek()` function.**
    ///
    /// Note that this may not be the same as calling [`Awaitable::wait_for()`] with a `Duration` of
    /// zero, as the implementing type may use a different approach to ensure that the calling
    /// thread does not block.
    ///
    /// Only available if `Awaitable::Error` implements `InfallibleUnboundedWait` (i.e. does not
    /// return any errors except on timeout) and the return type `T` is void.
    fn wait0(&'a self) -> bool
    where
        Self: sealed::VoidAwaitable,
        Self::Error: sealed::InfallibleUnboundedWait,
    {
        match self.try_wait0() {
            Ok(_) => true,
            Err(_) => false,
        }
    }
}

/// An `AutoResetEvent` is a synchronization primitive that is functionally equivalent to an
/// "awaitable boolean" and can be atomically waited upon and consumed to signal one and only one
/// waiter at a time, thereby guaranteeing exclusive signalling. This is not unlike a
/// multi-producer, multi-consumer non-broadcast `Channel<()>` with a buffer size of 1, except much
/// more efficient and lightweight.
///
/// `AutoResetEvent` can be used to implement other synchronization objects such as mutexes and
/// condition variables, but it is most appropriate for uses involving signalling between two or
/// more threads. Unlike a [`ManualResetEvent`], an `AutoResetEvent`'s `set` state is selectively
/// made visible to only one waiter at a time (including both past waiters currently in a
/// suspended/parked state and future waiters that haven't yet made a call to `Awaitable::wait()`
/// or similar).
///
/// When [`AutoResetEvent::set()`] is called, at most one thread blocked in a call to
/// [`Awaitable::wait()`] will be let through: if a previously parked thread was awakened, then the
/// event's state remains unset for all other past/future callers (until another call to
/// `AutoResetEvent::set()`), but if no threads were previously parked waiting for this event to be
/// signalled then only the next thread to call [`AutoResetEvent::wait()`] against this instance
/// will be let through without blocking. Regardless of whether or not there are any threads
/// currently waiting, the call to `set()` always returns immediately (i.e. it does not block until
/// another thread attempts to obtain the event).
///
/// Auto-reset events are thread-safe and may be wrapped in an [`Arc`](std::sync::Arc) or declared
/// as a static global to easily share access across threads.
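///
/// As a sketch of the mutex-building use case mentioned above (the statics here are illustrative,
/// not part of this crate), an `AutoResetEvent` that starts out set behaves like a binary
/// semaphore: `wait()` acquires it and `set()` releases it:
///
/// ```
/// use rsevents::{AutoResetEvent, Awaitable, EventState};
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// static LOCK: AutoResetEvent = AutoResetEvent::new(EventState::Set);
/// static COUNTER: AtomicUsize = AtomicUsize::new(0);
///
/// let threads: Vec<_> = (0..4)
///     .map(|_| {
///         std::thread::spawn(|| {
///             LOCK.wait(); // "acquire": consumes the set state, excluding all other threads
///             // Only one thread at a time can be inside this section.
///             COUNTER.fetch_add(1, Ordering::Relaxed);
///             LOCK.set(); // "release": lets exactly one other waiter through
///         })
///     })
///     .collect();
///
/// for thread in threads {
///     thread.join().unwrap();
/// }
/// assert_eq!(COUNTER.load(Ordering::Relaxed), 4);
/// ```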
pub struct AutoResetEvent {
    event: RawEvent,
}

impl AutoResetEvent {
    /// Create a new [`AutoResetEvent`] that can be used to atomically signal one waiter at a time.
    pub const fn new(state: EventState) -> AutoResetEvent {
        Self {
            event: RawEvent::new(match state {
                EventState::Set => AVAILABLE_BIT,
                EventState::Unset => 0,
            }),
        }
    }

    /// Triggers the underlying `RawEvent`, either releasing one suspended waiter or allowing one
    /// future caller to exclusively obtain the event.
    pub fn set(&self) {
        self.event.set_one()
    }

    /// Set the state of the internal event to [`EventState::Unset`], regardless of its current
    /// status.
    pub fn reset(&self) {
        self.event.reset()
    }
}

impl Awaitable<'_> for AutoResetEvent {
    type T = ();
    type Error = TimeoutError;

    /// Check if the event has been signalled, and if not, block waiting for it to be set. When the
    /// event becomes available to this thread, its state is atomically set to
    /// [`EventState::Unset`], allowing only this one waiter through until another call to
    /// [`AutoResetEvent::set()`] is made.
    fn try_wait(&self) -> Result<Self::T, Infallible> {
        Ok(self.event.unlock_one())
    }

    /// Check if the event has been signalled, and if not, block for `limit` waiting for it to be
    /// set. If and when the event becomes available, its state is atomically set to
    /// [`EventState::Unset`] before this method returns, allowing only this one waiter through.
    fn try_wait_for(&self, limit: Duration) -> Result<(), TimeoutError> {
        if self.event.wait_one_for(limit) {
            Ok(())
        } else {
            Err(TimeoutError)
        }
    }

    /// "Wait" on the `AutoResetEvent` event without blocking, immediately returning `Ok` if the
    /// event was signalled for this thread and `Err(TimeoutError)` if it wasn't set.
    /// **This is _not_ a `peek()` function:** if the event's state was [`EventState::Set`], it is
    /// atomically reset to [`EventState::Unset`], locking out all other waiters.
    ///
    /// Note that this is similar but not identical to calling [`AutoResetEvent::try_wait_for()`]
    /// with a `Duration` of zero, as the calling thread never blocks or yields.
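    ///
    /// A short sketch of the consuming behavior, assuming a freshly set event:
    ///
    /// ```
    /// use rsevents::{AutoResetEvent, Awaitable, EventState};
    ///
    /// let event = AutoResetEvent::new(EventState::Set);
    /// assert!(event.try_wait0().is_ok());  // consumes the set state...
    /// assert!(event.try_wait0().is_err()); // ...so a second call times out
    /// ```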
    fn try_wait0(&self) -> Result<(), TimeoutError> {
        // Under miri or when testing (e.g. on ARM, where compare_exchange_weak may fail
        // spuriously), make sure that a top-level wait0() call from outside the implementation
        // code returns a deterministic result.
        #[cfg(any(test, miri))]
        return match self.event.test_try_unlock_one() {
            true => Ok(()),
            false => Err(TimeoutError),
        };
        #[cfg(not(any(test, miri)))]
        return match self.event.try_unlock_one() {
            true => Ok(()),
            false => Err(TimeoutError),
        };
    }
}

/// A `ManualResetEvent` is an event type best understood as an "awaitable boolean" that efficiently
/// synchronizes thread access to a shared state, allowing one or more threads to wait for a signal
/// from one or more other threads, where the signal could have either occurred in the past or could
/// come at any time in the future.
///
/// Unlike an [`AutoResetEvent`] which atomically allows one and only one waiter through each time
/// the underlying `RawEvent` is set, a `ManualResetEvent` unparks all past waiters and allows
/// all future waiters calling [`Awaitable::wait()`] to continue without blocking (until
/// [`ManualResetEvent::reset()`] is called).
///
/// A `ManualResetEvent` is rarely appropriate for general purpose thread synchronization (à la
/// condition variables and mutexes), where exclusive access to a protected critical section is
/// usually desired: if multiple threads are suspended/parked waiting for the event to be signalled
/// and [`ManualResetEvent::set()`] is then called, _all_ of the suspended threads will be unparked
/// and will resume. However, a `ManualResetEvent` shines when it comes to setting persistent state
/// indicators, such as a globally-shared abort flag.
///
/// Manual-reset events are thread-safe and may be wrapped in an [`Arc`](std::sync::Arc) or declared
/// as a static global to easily share access across threads.
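///
/// A sketch of the abort-flag use case mentioned above (the static and the loop structure are
/// illustrative only):
///
/// ```
/// use rsevents::{Awaitable, EventState, ManualResetEvent};
/// use std::time::Duration;
///
/// static SHUTDOWN: ManualResetEvent = ManualResetEvent::new(EventState::Unset);
///
/// let workers: Vec<_> = (0..2)
///     .map(|_| {
///         std::thread::spawn(|| {
///             // Check the flag between units of work; wait_for() doubles as an
///             // interruptible sleep.
///             while !SHUTDOWN.wait_for(Duration::from_millis(10)) {
///                 // ... do some work ...
///             }
///         })
///     })
///     .collect();
///
/// // A single set() call releases all current and future waiters.
/// SHUTDOWN.set();
/// for worker in workers {
///     worker.join().unwrap();
/// }
/// ```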
pub struct ManualResetEvent {
    event: RawEvent,
}

impl ManualResetEvent {
    /// Create a new [`ManualResetEvent`] with the initial [`EventState`] set to `state`.
    pub const fn new(state: EventState) -> ManualResetEvent {
        Self {
            event: RawEvent::new(match state {
                EventState::Set => AVAILABLE_BIT,
                EventState::Unset => 0,
            }),
        }
    }

    /// Puts the [`ManualResetEvent`] into a set state, releasing all suspended waiters (if any)
    /// and leaving the event set for future callers to [`ManualResetEvent::wait()`] and co.
    pub fn set(&self) {
        self.event.set_all()
    }

    /// Set the state of the [`ManualResetEvent`] to [`EventState::Unset`], regardless of its
    /// current state. This will cause future calls to [`ManualResetEvent::wait()`] to block until
    /// the event is set (via [`ManualResetEvent::set()`]).
    pub fn reset(&self) {
        self.event.reset()
    }
}

impl Awaitable<'_> for ManualResetEvent {
    type T = ();
    type Error = TimeoutError;

    /// Check if the underlying event is in a set state or wait for its state to become
    /// [`EventState::Set`]. In contrast with [`AutoResetEvent::try_wait()`], the event's state is
    /// not affected by this operation, i.e. it remains set for future callers even after this
    /// function call returns (until a call to [`ManualResetEvent::reset()`] is made).
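    ///
    /// A short sketch of the non-consuming behavior:
    ///
    /// ```
    /// use rsevents::{Awaitable, EventState, ManualResetEvent};
    ///
    /// let event = ManualResetEvent::new(EventState::Set);
    /// event.wait(); // does not consume the set state,
    /// event.wait(); // so repeated waits sail through
    /// assert!(event.try_wait0().is_ok());
    /// ```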
    fn try_wait(&self) -> Result<(), Infallible> {
        Ok(self.event.unlock_all())
    }

    /// Check if the underlying event is in a set state (and return immediately) or wait for it to
    /// become set, up to the limit specified by the `Duration` parameter.
    ///
    /// Returns `Ok(())` if the event was initially set or if it became set within the timeout
    /// specified, otherwise returns `Err(TimeoutError)` if the timeout elapsed without the event
    /// becoming available.
    fn try_wait_for(&self, limit: Duration) -> Result<(), TimeoutError> {
        match self.event.wait_all_for(limit) {
            true => Ok(()),
            false => Err(TimeoutError),
        }
    }

    /// Test if an event is available without blocking, returning `Err(TimeoutError)` immediately
    /// if it is not set.
    ///
    /// Note that this is not the same as calling [`ManualResetEvent::try_wait_for()`] with a
    /// `Duration` of zero, as the calling thread never yields.
    fn try_wait0(&self) -> Result<(), TimeoutError> {
        match self.event.try_unlock_all() {
            true => Ok(()),
            false => Err(TimeoutError),
        }
    }
}