rsevents/
lib.rs

1//! `rsevents` is an implementation of WIN32's auto- and manual-reset events for the rust world.
2//! Events are synchronization primitives (i.e. not implemented atop of mutexes) used to either
3//! create other synchronization primitives with or for implementing signalling between threads.
4//!
5//! Events come in two different flavors: [`AutoResetEvent`] and [`ManualResetEvent`]. Internally,
6//! both are implemented with the unsafe `RawEvent` and use the `parking_lot_core` crate to take
7//! care of efficiently suspending (parking) threads while they wait for an event to become
8//! signalled, and take care of memory coherence issues between the signalling and signalled
9//! threads.
10//!
11//! An event is a synchronization primitive that is functionally the equivalent of an awaitable
12//! boolean that allows for synchronization between threads. Unlike mutexes and condition variables
13//! which are most often used to restrict access to a critical section, events are more appropriate
14//! for efficiently signalling remote threads or waiting on a remote thread to change state - or for
15//! building your own synchronization types on top of something both light and easy to use.
16
17#![forbid(missing_docs)]
18
19use parking_lot_core as plc;
20use parking_lot_core::ParkResult;
21use std::convert::Infallible;
22use std::sync::atomic::{AtomicU8, Ordering};
23use std::time::{Duration, Instant};
24
25#[cfg(test)]
26mod tests;
27
/// Bit flag that is set while the event is available and cleared while it is unavailable.
const AVAILABLE_BIT: u8 = 0b01;
/// Bit flag that is set while one or more threads are waiting to obtain the event.
const WAITING_BIT: u8 = 0b10;
32
33/// A wrapper around an atomic state that represents whether or not the event is available.
34/// This isn't pinned and it seems that pinning is unnecessary because the lock may be moved so long
35/// as it is not borrowed (for prior art, see rust's `src/sys/windows/locks/mutex.rs`, which is
36/// similarly directly exposed without pinning/boxing to make a movable mutex).
37///
38/// The lowest two bits of the u8 state are used: 0b10 represents the WAITING bit, which is set when
39/// a thread is parked or about to park, and 0b01 represents the AVAILABLE bit, set when the event
40/// is available and cleared otherwise.
41///
42/// The following combinations are possible:
43/// * 0b00: Unavailable
44///   The event is not available and no threads are waiting on it. It can be "fast set" without
45///   going through the plc lock.
46/// * 0b01: Available
47///   The event is available and there are no threads waiting on it. It can be "fast obtained"
48///   without going through the plc lock.
49/// * 0b10: Unavailable w/ Parked Waiters
50///   The event is unavailable and there are one or more threads parked or trying to park to wait
51///   for the event to become available. We must go through the plc lock to preferentially "give"
52///   the event to a waiting thread.
53/// * 0b11: Available w/ Parked Threads
54///   The event is available but there are parked threads waiting for it. This is not a valid state
55///   and no function should end with this being the quiescent state.
56///
57#[doc(hidden)]
58pub struct RawEvent(AtomicU8);
59
/// A representation of the state of an event, which can either be `Set` (i.e. signalled,
/// ready) or `Unset` (i.e. not ready).
///
/// This is a field-less enum, so it is cheap to copy and supports full equality and hashing.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum EventState {
    /// The event is available and call(s) to [`Awaitable::wait()`] will go through without
    /// blocking, i.e. the event is signalled.
    Set,
    /// The event is unavailable and calls to [`Awaitable::wait()`] will block until the event
    /// becomes set, i.e. the event is unsignalled.
    Unset,
}
72
73impl RawEvent {
74    const fn new(state: u8) -> RawEvent {
75        RawEvent(AtomicU8::new(state))
76    }
77
78    #[inline]
79    /// Attempts to exclusively obtain the event. Returns true upon success. Called internally by
80    /// [`suspend_one()`](Self::suspend_one) when determining if a thread should be parked/suspended
81    /// or if that's not necessary.
82    fn try_unlock_one(&self) -> bool {
83        // Obtains the event if it is both available and there are no threads waiting on it.
84        self.0.compare_exchange_weak(AVAILABLE_BIT, 0, Ordering::Acquire, Ordering::Relaxed)
85            .is_ok()
86    }
87
88    #[cfg(any(test, miri))]
89    /// This entry point is used to deterministically determine if the event could be obtained
90    /// without any spurious failures. We don't override the actual behavior of try_unlock_one() so
91    /// that any internal functions calling into it can still be tested against both normal and
92    /// spurious failure modes.
93    fn test_try_unlock_one(&self) -> bool {
94        self.0.compare_exchange(AVAILABLE_BIT, 0, Ordering::Acquire, Ordering::Relaxed)
95            .is_ok()
96    }
97
98    #[inline]
99    /// Attempts to obtain the event (without locking out future callers). Returns true upon success.
100    fn try_unlock_all(&self) -> bool {
101        // Obtains the event if it is available, with no preconditions.
102        (self.0.load(Ordering::Acquire) & AVAILABLE_BIT) != 0
103    }
104
105    /// Parks the calling thread until the underlying event has unlocked. If the event is set during
106    /// this call, the park aborts/returns early so that no event sets are missed. Consumes the
107    /// event's set state in case of early return.
108    ///
109    /// Returns `true` only if the thread has obtained the event, otherwise returns `false` (only
110    /// possible in the case of a timeout).
    ///
    /// `timeout` is relative to the moment of the call; it is converted to an absolute `Instant`
    /// once, up front, so retries of the park loop do not extend the total wait.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably `unsafe` only because it wraps the `plc::park()` call; no
    /// additional caller-side precondition is visible in this file - confirm.
111    unsafe fn suspend_one(&self, timeout: Option<Duration>) -> bool {
112        let timeout = timeout.map(|duration| Instant::now() + duration);
113        let mut state = self.0.load(Ordering::Relaxed);
114        loop {
115            // The only way a thread can obtain the event _before_ it is parked/put to sleep is to
116            // check on the state before setting the WAITING bit for itself, otherwise it can't know
117            // if there are any other threads waiting so it can't clear the WAITING bit, and if it
118            // can't clear the WAITING bit, it can't obtain the event.
119            if (state & AVAILABLE_BIT) != 0 {
120                // The lock is available; try to obtain it even if the WAITING bit is set by
121                // another thread.
122                match self.0.compare_exchange_weak(
123                    state, state & !AVAILABLE_BIT, Ordering::Acquire, Ordering::Relaxed,
124                ) {
125                    Ok(_) => {
126                        // The lock was obtained; there may or may not be other threads suspended.
127                        return true;
128                    }
129                    Err(s) => {
130                        // Another thread contended with this call, loop to try again.
131                        state = s;
132                        continue;
133                    }
134                }
135            } else if (state & WAITING_BIT) == 0 {
136                // There are no other threads waiting, so we need to set the WAITING bit ourselves
137                // before we try to park the thread.
138                match self.0.compare_exchange_weak(
139                    state, state | WAITING_BIT, Ordering::Relaxed, Ordering::Relaxed,
140                ) {
141                    Ok(_) => {
142                        // We set the WAITING bit and can continue with attempting to park this
143                        // thread.
144                    }
145                    Err(s) => {
146                        // Another thread contended with this call, loop to try again.
147                        state = s;
148                        continue;
149                    }
150                }
151            } else {
152                // The event isn't available and another thread has already marked it as pending, so
153                // we are good to go.
154            }
155
156            // This callback is run with the plc queue locked, before the thread is parked. If it
157            // returns false, the park is aborted. We can't opportunistically claim the event here
158            // even if it is available, because we don't know if there are other suspended threads,
159            // which means we can't clear the WAITING bit in case we were the last thread.
160            let before_suspend = || -> bool {
161                self.0.load(Ordering::Relaxed) == WAITING_BIT
162            };
163
164            // This callback is run with the plc queue locked, which is the only time we can safely
165            // clear the WAITING bit (because `before_suspend` checks the WAITING bit with the queue
166            // locked as well), making it possible to abort/retry the park() call if there's any
167            // contention.
168            let on_timeout = |_, last_thread| {
169                if last_thread {
170                    self.0.fetch_and(!WAITING_BIT, Ordering::Relaxed);
171                }
172            };
173
174            match plc::park(
175                self as *const RawEvent as usize, // key for keyed event
176                before_suspend,
177                || {}, // callback before parking, run after queue is unlocked
178                on_timeout,
179                plc::DEFAULT_PARK_TOKEN,
180                timeout,
181            ) {
182                // before_suspend() detected a change in the state that indicates the lock may have
183                // become available (or the WAITING bit could have been cleared because another
184                // thread, which was the last actually parked thread, was awoken).
185                // Loop to retry so we never miss a set() call.
186                ParkResult::Invalid => state = self.0.load(Ordering::Relaxed),
187                // The timeout was reached before the event could be obtained.
188                ParkResult::TimedOut => return false,
189                // The thread was awoken by another thread calling into set_one(), which hands the
                // event directly to the thread it unparks.
190                ParkResult::Unparked(_) => return true,
191            }
192        }
193    }
194
195    /// Parks the calling thread until the underlying event has unlocked. If the event is set during
196    /// this call, the park aborts/returns early so that no event sets are missed. Unlike
197    /// [`suspend_one()`](Self::suspend_one), does not consume the event's set state in case of
198    /// early return.
199    unsafe fn suspend_all(&self, timeout: Option<Duration>) -> bool {
200        let timeout = timeout.map(|duration| Instant::now() + duration);
201        let mut state = self.0.load(Ordering::Relaxed);
202        loop {
203            // The only way a thread can obtain the event _before_ it is parked/put to sleep is to
204            // check on the state before setting the WAITING bit for itself, otherwise it can't know
205            // if there are any other threads waiting so it can't clear the WAITING bit, and if it
206            // can't clear the WAITING bit, it can't directly obtain the event for itself.
207            if (state & AVAILABLE_BIT) != 0 {
208                // The event has become available. We can return right away; we don't care about
209                // anything else.
210                return true;
211            } else if (state & WAITING_BIT) == 0 {
212                // There are no other threads waiting, so we need to set the WAITING bit ourselves
213                // before we try to park the thread.
214                match self.0.compare_exchange_weak(
215                    state, state | WAITING_BIT, Ordering::Relaxed, Ordering::Relaxed,
216                ) {
217                    Ok(_) => {
218                        // We set the WAITING bit without contention and can move on to trying to
219                        // park this thread.
220                    }
221                    Err(s) => {
222                        // Another thread contended with this call, loop to try again.
223                        state = s;
224                        continue;
225                    }
226                }
227            } else {
228                // The event isn't available and another thread has already marked it as pending, so
229                // we are good to go.
230            }
231
232            // This callback is run with the plc queue locked, before the thread is parked. If it
233            // returns false, the park is aborted. We can't opportunistically claim the event here
234            // even if it is available, because we don't know if there are other suspended threads,
235            // which means we can't clear the WAITING bit in case we were the last thread.
236            let before_suspend = || -> bool {
237                self.0.load(Ordering::Relaxed) == WAITING_BIT
238            };
239
240            // This callback is run with the plc queue locked, which is the only time we can safely
241            // clear the WAITING bit (because `before_suspend` checks the WAITING bit with the queue
242            // locked as well), making it possible to abort/retry the park() call if there's any
243            // contention.
244            let mut timeout_result = false;
245            let on_timeout = |_, last_thread| {
246                if last_thread {
247                    // self.0.fetch_and(!WAITING_BIT, Ordering::Relaxed);
248                    if (self.0.swap(0, Ordering::Relaxed) & AVAILABLE_BIT) != 0 {
249                        timeout_result = true;
250                    }
251                }
252            };
253
254            match plc::park(
255                self as *const RawEvent as usize, // key for keyed event
256                before_suspend,
257                || {}, // callback before parking, run after queue is unlocked
258                on_timeout,
259                plc::DEFAULT_PARK_TOKEN,
260                timeout,
261            ) {
262                // before_suspend() detected a change in the state that indicates the lock may have
263                // become available (or the WAITING bit could have been cleared because another
264                // thread, which was the last actually parked thread, was awoken).
265                // Loop to retry so we never miss a set() call.
266                ParkResult::Invalid => state = self.0.load(Ordering::Relaxed),
267                // The timeout was reached before the thread could be obtained.
268                ParkResult::TimedOut => return timeout_result,
269                // The thread was awoken by another thread calling into unlock_all().
270                ParkResult::Unparked(_) => return true,
271            }
272        }
273    }
274
275    /// Trigger the event, releasing one waiter.
    ///
    /// If no thread is waiting (WAITING bit clear), the event is simply marked available via a
    /// compare-exchange, without touching the plc queue. Otherwise `plc::unpark_one()` is used:
    /// when a thread is actually unparked the event is handed to it and is *not* left available;
    /// when no thread turned out to be parked, the state is set to available instead.
276    fn set_one(&self) {
277        // Optimize for cases where the event wasn't available and isn't under any contention.
278        // NOTE: This makes calling set() on an already set event more expensive. This match block
279        // can be replaced with `self.0.load(Ordering::Relaxed)` to bypass this optimization.
280        let mut state = match self.0.compare_exchange(
281            0, AVAILABLE_BIT, Ordering::Release, Ordering::Relaxed,
282        ) {
283            Ok(_) => return,
284            Err(s) => s,
285        };
286
        // Fast-path loop: keeps retrying until the event was set without the plc lock (return),
        // or a state is observed that requires the plc queue lock (break).
287        loop {
288            match state {
289                0b00 => {
290                    // There are no parked/suspended threads so we are able to "fast set" the event
291                    // without worrying about synchronizing with threads parked or about to park.
292                    match self.0.compare_exchange_weak(
293                        0, AVAILABLE_BIT, Ordering::Release, Ordering::Relaxed,
294                    ) {
295                        Ok(_) => return,
296                        Err(s) => {
297                            // We raced with a thread trying to park or another call to set(). Loop
298                            // to figure out what happened.
299                            state = s;
300                            continue;
301                        }
302                    }
303                }
304                0b01 => {
305                    // This was a call to set_one() on an event that was already set; we don't need to
306                    // "do" anything but we need to touch the shared memory location to ensure
307                    // memory ordering.
308                    //
309                    // It may be possible to forego this, but I'm not sure if that's wise. It is true
310                    // that a thread awoken after two back-to-back set() calls is guaranteed to see at
311                    // least _something_ new without an explicit Release here, but there's no guarantee
312                    // that there will ever be any more set() calls afterwards, meaning whatever was
313                    // written by the thread making the second set() call may never wind up being
314                    // observed by a thread that fast-obtains the event in a wait() call.
315                    match self.0.compare_exchange_weak(
316                        state, state, Ordering::Release, Ordering::Relaxed,
317                    ) {
318                        Ok(_) => return,
319                        Err(s) => {
320                            state = s;
321                            continue;
322                        }
323                    }
324                }
325                0b10 => {
326                    // A thread is waiting to obtain this event, so we can't fast set it and must
327                    // instead go through the plc queue lock.
328                    break;
329                }
330                0b11 => {
331                    // This shouldn't happen but it's hard to guarantee because of the interplay
332                    // between concurrent `set_one()` calls at the same time as a `suspend_one()`
333                    // call attempts to park a thread.
                    // Under test/miri this state is treated as a hard failure; in other builds it
                    // is handled like 0b10 by falling through to the plc slow path.
334                    #[cfg(any(test, miri))]
335                    assert!(false, "AVAILABLE and WAITING bits set!");
336                    break;
337                }
338                _ => {
339                    // We only use the lowest two bits of the AtomicU8 state
340                    unsafe { core::hint::unreachable_unchecked() }
341                }
342            }
343        }
344
345        unsafe {
346            // The unpark callback happens with the plc queue locked, so we guarantee that the logic
347            // here happens either completely before or completely after the logic in the callback
348            // passed to plc::park() in suspend_one().
349            plc::unpark_one(self as *const RawEvent as usize, |unpark_result| {
350                // This has to be done here with the plc queue locked so that a simultaneous call
351                // into plc::park() will not suspend a thread after we've tried unfruitfully to
352                // awaken one but before we've had a chance to set the internal state, causing the
353                // set_one() call to be missed.
354
355                if unpark_result.unparked_threads == 0 {
356                    // This can happen if there were two simultaneous calls to set_one() with only
357                    // one thread parked or if there were no threads parked but a thread trying to
358                    // park (which then didn't happen when we changed the state above and it failed
359                    // the park validation callback).
360                    // It's not only safe but actually required to stomp the WAITING bit because we
361                    // have the plc queue locked - contending threads (about to park but not yet
362                    // parked) will retry. If we don't stomp the WAITING bit and there was only one
363                    // thread _about_ to park but not yet parked, it would loop after the park
364                    // validation callback failed, but the WAITING bit wouldn't have been cleared
365                    // and it won't be able to obtain the event.
366                    self.0.store(AVAILABLE_BIT, Ordering::Release);
367                } else if !unpark_result.have_more_threads {
368                    // Clear the WAITING bit. We can stomp the AVAILABLE bit because until we clear
369                    // the WAITING bit, the AVAILABLE bit can only be set in the plc::unpark_one()
370                    // callback w/ the queue locked.
371                    // Clearing the WAITING bit makes it possible for the next call to set_one() to
372                    // fast-set the event without going through the plc lock and triggers a retry in
373                    // any threads trying to park.
374                    self.0.store(0, Ordering::Release);
375                } else {
376                    // One thread was unparked but there are others still waiting. Subsequent
377                    // set_one() calls will still need to go through the plc lock.
378                    // We need to write to the shared memory address to guarantee Release semantics,
379                    // and we can stomp the AVAILABLE bit since it can only be set with the plc lock
380                    // held once the WAITING bit is asserted.
381                    self.0.store(WAITING_BIT, Ordering::Release);
382                }
383                plc::DEFAULT_UNPARK_TOKEN
384            })
385        };
386    }
387
388    /// Trigger the event, releasing all waiters
389    fn set_all(&self) {
390        // Stomp the WAITING bit (if set) so that no other thread wastes time trying to unpark
391        // threads since we're going to unlock them all.
392        let prev_state = self.0.swap(AVAILABLE_BIT, Ordering::Release);
393        if (prev_state & WAITING_BIT) == 0 {
394            // No threads were suspended/about to be suspended so we can just return. Or maybe there
395            // _are_ other threads suspended but we raced with a simultaneous call into set_all()
396            // and that other thread is going to handle waking them.
397            return;
398        }
399
400        let _unparked = unsafe {
401            plc::unpark_all(self as *const RawEvent as usize, plc::DEFAULT_UNPARK_TOKEN)
402        };
403
404        // NOTE: _unparked may equal zero if there were no threads fully parked but there was a
405        // thread _about_ to park until changing the state above caused its validation callback to
406        // fail and then on retry they just obtained the available lock and returned.
407    }
408
409    fn unlock_one(&self) {
410        if !self.try_unlock_one() {
411            unsafe {
412                self.suspend_one(None);
413            }
414        }
415    }
416
417    fn unlock_all(&self) {
418        if !self.try_unlock_all() {
419            unsafe {
420                self.suspend_all(None);
421            }
422        }
423    }
424
425    /// Put the event in a locked (reset) state.
426    fn reset(&self) {
427        // Clear the AVAILABLE bit without touching the WAITING bit.
428        // Calling reset() does not imply any strict ordering of code before or after a matching
429        // (try_)unlock() call, so we use Relaxed semantics.
430        self.0.fetch_and(!AVAILABLE_BIT, Ordering::Relaxed);
431    }
432
433    fn wait_one_for(&self, limit: Duration) -> bool {
434        if self.try_unlock_one() {
435            return true;
436        }
437
438        unsafe {self.suspend_one(Some(limit)) }
439    }
440
441    fn wait_all_for(&self, limit: Duration) -> bool {
442        if self.try_unlock_all() {
443            return true;
444        }
445
446        unsafe { self.suspend_all(Some(limit)) }
447    }
448}
449
450#[doc(hidden)]
451/// This is for backwards compatibility with earlier `rsevents` releases, which used the less
452/// specific (and much more likely to conflict) name `State` instead of `EventState`.
453pub type State = EventState;
454
/// The default error returned by types implementing [`Awaitable`], with the only possible error
/// being a timeout to a bounded `wait()` call.
///
/// When `Awaitable<Error = TimeoutError>`, a simpler `Awaitable` api bypassing error handling is
/// exposed.
///
/// The type carries no data, so all values compare equal; `PartialEq`/`Eq` are provided so that
/// results carrying this error can be compared directly (e.g. in assertions).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct TimeoutError;

impl std::fmt::Display for TimeoutError {
    /// Writes the fixed, human-readable timeout message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("The wait call timed out")
    }
}

impl std::error::Error for TimeoutError {}
470
471impl AwaitableError for TimeoutError {
472    type UnboundedError = std::convert::Infallible;
473}
474
475impl std::convert::From<Infallible> for TimeoutError {
476    fn from(_: Infallible) -> Self {
477        unsafe { core::hint::unreachable_unchecked() }
478    }
479}
480
481mod sealed {
482    use crate::{Awaitable, AwaitableError};
483
484    pub trait InfallibleUnboundedWait {}
485    impl<E> InfallibleUnboundedWait for E where
486        E: AwaitableError<UnboundedError = std::convert::Infallible>
487    {
488    }
489
490    pub trait VoidAwaitable {}
491    impl<T, E> VoidAwaitable for T
492    where
493        T: for<'a> Awaitable<'a, T = (), Error = E>,
494        E: std::error::Error,
495    {
496    }
497}
498
/// The error type an [`Awaitable`] reports from its wait calls. It distinguishes failures that
/// stop the wait from succeeding for internal reasons (e.g. a poison error) from failures that
/// are caused purely by a timeout.
///
/// End users get a much simpler `Awaitable` api - no error handling, and boolean results in place
/// of timeout errors - for types implementing `Awaitable<T = (), Error = TimeoutError>`.
pub trait AwaitableError: std::error::Error {
    /// The error an unbounded [`Awaitable::try_wait()`] call can produce, i.e. every error other
    /// than a timeout.
    ///
    /// Set this to `std::convert::Infallible` to unlock the simpler `Awaitable` API for end
    /// users, which makes an infallible [`Awaitable::wait()`] available. Otherwise, `Self` is the
    /// typical choice, denoting that `wait()` and `wait_for()` share a single error type.
    ///
    /// Satisfying `From<E::UnboundedError>` for `E` (where `E` implements `AwaitableError`) is
    /// recommended, but it is not required.
    type UnboundedError: std::error::Error;
}
517
518/// The basic interface for waiting on void awaitable types
519///
520/// This is a unified trait that is used by `rsevents` and downstream dependent crates implementing
521/// synchronization primitives atop of `rsevents` to expose a single interface for waiting on an
522/// object either indefinitely or for a bounded length of time.
523///
524/// Types implementing `Awaitable<T = (), Error = TimeoutError>` unlock a much simpler `Awaitable`
525/// api for end users, that omits error handling and replaces timeout errors with boolean results.
526pub trait Awaitable<'a> {
527    /// The type yielded by the Awaitable type on a successful wait
528    type T;
529    /// The type yielded by the Awaitable type in case of an error, also specifying whether or not
530    /// an unbounded `Awaitable::wait()` returns any error at all.
531    type Error: AwaitableError;
532
533    /// Waits on the `Awaitable` type, blocking efficiently until it becomes available. Returns the
534    /// awaited type `T` (if it isn't `()`) or an error indicating a wait issue. Does not time out.
535    fn try_wait(&'a self) -> Result<Self::T, <Self::Error as AwaitableError>::UnboundedError>;
536
537    /// Waits on the `Awaitable` type until it becomes available or the timeout period described by
538    /// `limit` elapses, in which case a timeout error is returned.
539    fn try_wait_for(&'a self, limit: Duration) -> Result<Self::T, Self::Error>;
540
541    /// Attempt to obtain the `Awaitable` type `T` in a potentially lock-free, wait-free manor,
542    /// returning a timeout error if it is not available.
543    /// **This call may have side effects beyond merely returning the current state and must
544    /// not be considered the equivalent of a `test()` or `peek()` function.**
545    ///
546    /// This function should be overridden by `Awaitable` implementations that can offer a
547    /// streamlined version of `try_wait_for()` for hard-coded zero timeout.
548    fn try_wait0(&'a self) -> Result<Self::T, Self::Error> {
549        // The default implementation of this method is to just call `wait_for()` with a zero wait.
550        // The function should be overridden if a better alternative is possible.
551        self.try_wait_for(Duration::ZERO)
552    }
553
554    /// Blocks until the `Awaitable` type and its associated type `T` become available. Like
555    /// [`try_wait()`](Self::try_wait) but bypasses error handling.
556    ///
557    /// Only available if the `Awaitable` implementation implements `InfallibleUnboundedWait`, i.e.
558    /// does not return any errors except on timeout.
559    fn wait(&'a self) -> Self::T
560    where
561        Self::Error: sealed::InfallibleUnboundedWait,
562    {
563        self.try_wait()
564            .expect("try_wait() is not allowed to return TimeoutError!")
565    }
566
567    /// Attempts a bounded wait on the the `Awaitable` type. Like
568    /// [`try_wait_for()`](Self::try_wait_for) but returns `true` if the `Awaitable` was originally
569    /// available or if it became so within the specified duration and `false` otherwise.
570    ///
571    /// Only available if `Awaitable::Error` implements `InfallibleUnboundedWait` (i.e. does not
572    /// return any errors except on timeout) and has a void return type `T`.
573    fn wait_for(&'a self, limit: Duration) -> bool
574    where
575        Self: sealed::VoidAwaitable,
576        Self::Error: sealed::InfallibleUnboundedWait,
577    {
578        match self.try_wait_for(limit) {
579            Ok(_) => true,
580            Err(_) => false,
581        }
582    }
583
584    /// Attempts to obtain the `Awaitable` in a potentially lock-free, wait-free manner, returning a
585    /// timeout error if it's currently unavailable.
586    /// Like [`try_wait0()`](Self::try_wait0) but returns `true` if the `Awaitable` was
587    /// available and obtained or `false` otherwise.
588    ///
589    /// **This call may have side effects beyond merely returning the current state and must
590    /// not be considered the equivalent of a `test()` or `peek()` function.**
591    ///
592    /// Note that this may not be the same as calling [`Awaitable::wait_for()`] with a `Duration` of
593    /// zero, as the implementing type may use a different approach to ensure that the calling
594    /// thread does not block.
595    ///
596    /// Only available if `Awaitable:Error` implements `InfallibleUnboundedWait` (i.e. does not
597    /// return any errors except on timeout) and has a void return type `T`.
598    fn wait0(&'a self) -> bool
599    where
600        Self: sealed::VoidAwaitable,
601        Self::Error: sealed::InfallibleUnboundedWait,
602    {
603        match self.try_wait0() {
604            Ok(_) => true,
605            Err(_) => false,
606        }
607    }
608}
609
610/// An `AutoResetEvent` is a synchronization primitive that is functionally equivalent to an
611/// "awaitable boolean" and can be atomically waited upon and consumed to signal one and only one
612/// waiter at a time, thereby guaranteeing exclusive signalling. This is not unlike a
613/// multi-producer, multi-consumer non-broadcast `Channel<()>` with a buffer size of 1, except much
614/// more efficient and lightweight.
615///
616/// `AutoResetEvent` can be used to implement other synchronization objects such as mutexes and
617/// condition variables, but it is most appropriate for uses involving signalling between two or
618/// more threads. Unlike a [`ManualResetEvent`], an `AutoResetEvent`'s `set` state is selectively
619/// made visible to only one waiter at a time (including both past waiters currently in a
620/// suspended/parked state and future waiters that haven't yet made a call to `Awaitable::wait()` or
621/// similar).
622///
623/// When [`AutoResetEvent::set()`] is called, at most one thread blocked in a call to
624/// [`Awaitable::wait()`] will be let through: if a previously parked thread was awakened, then the
625/// event's state remains unset for all other past/future callers (until another call to
626/// `AutoResetEvent::set()`), but if no threads were previously parked waiting for this event to be
627/// signalled then only the next thread to call [`AutoResetEvent::wait()`] against this instance
628/// will be let through without blocking. Regardless of whether or not there are any threads
629/// currently waiting, the call to `set()` always returns immediately (i.e. it does not block until
630/// another thread attempts to obtain the event).
631///
632/// Auto-reset events are thread-safe and may be wrapped in an [`Arc`](std::sync::Arc) or declared
633/// as a static global to easily share access across threads.
634pub struct AutoResetEvent {
    // The raw event holding the state; `set()` forwards to its `set_one()`.
635    event: RawEvent,
636}
637
638impl AutoResetEvent {
639    /// Create a new [`AutoResetEvent`] that can be used to atomically signal one waiter at a time.
640    pub const fn new(state: EventState) -> AutoResetEvent {
641        Self {
642            event: RawEvent::new(match state {
643                EventState::Set => AVAILABLE_BIT,
644                EventState::Unset => 0,
645            }),
646        }
647    }
648
649    /// Triggers the underlying `RawEvent`, either releasing one suspended waiter or allowing one
650    /// future caller to exclusively obtain the event.
651    pub fn set(&self) {
652        self.event.set_one()
653    }
654
655    /// Set the state of the internal event to [`EventState::Unset`], regardless of its current
656    /// status.
657    pub fn reset(&self) {
658        self.event.reset()
659    }
660}
661
662impl Awaitable<'_> for AutoResetEvent {
663    type T = ();
664    type Error = TimeoutError;
665
666    /// Check if the event has been signalled, and if not, block waiting for it to be set. When the
667    /// event becomes available to this thread, its state is atomically set to
668    /// [`EventState::Unset`], allowing only this one waiter through until another call to
669    /// [`AutoResetEvent::set()`] is made.
670    fn try_wait(&self) -> Result<Self::T, Infallible> {
671        Ok(self.event.unlock_one())
672    }
673
674    /// Check if the event has been signalled, and if not, block for `limit` waiting for it to be set.
675    /// If and when the event becomes available, its state is atomically set to
676    /// [`EventState::Unset`] before this method returns, allowing only this one waiter through.
677    /*///
678    /// Returns `true` if the event was originally set or if it was signalled within the specified
679    /// duration, and `false` otherwise (i.e. the timeout elapsed without the event becoming set).*/
680    fn try_wait_for(&self, limit: Duration) -> Result<(), TimeoutError> {
681        if self.event.wait_one_for(limit) {
682            Ok(())
683        } else {
684            Err(TimeoutError)
685        }
686    }
687
688    /// "Wait" on the `AutoResetEvent` event without blocking, immediately returning `Ok` if the
689    /// event was signalled for this thread and `Err(TimeoutError)` if it wasn't set.
690    /// **This is _not_ a `peek()` function:** if the event's state was [`EventState::Set`], it is
691    /// atomically reset to [`EventState::Unset`], locking out all other waiters.
692    ///
693    /// Note that this is similar but not identical to calling [`AutoResetEvent::try_wait_for()`] with a
694    /// `Duration` of zero, as the calling thread never blocks or yields.
695    /*/// "Wait" on the `AutoResetEvent` event without blocking, immediately returning `true` if the
696    /// event was signalled for this thread and `false` if it wasn't set.
697    /// **This is _not_ a `peek()` function:** if the event's state was [`EventState::Set`], it is
698    /// atomically reset to [`EventState::Unset`], locking out all other callers.
699    ///
700    /// Note that this is similar but not identical to calling [`AutoResetEvent::wait_for()`] with a
701    /// `Duration` of zero, as the calling thread never blocks or yields.*/
702    fn try_wait0(&self) -> Result<(), TimeoutError> {
703        // In case of miri or if testing under ARM, make sure that a top-level wait0() call from
704        // outside the implementation code returns a deterministic result.
705        #[cfg(any(test, miri))]
706        return match self.event.test_try_unlock_one() {
707            true => Ok(()),
708            false => Err(TimeoutError),
709        };
710        #[cfg(not(any(test, miri)))]
711        return match self.event.try_unlock_one() {
712            true => Ok(()),
713            false => Err(TimeoutError),
714        };
715    }
716}
717
718/// A `ManualResetEvent` is an event type best understood as an "awaitable boolean" that efficiently
719/// synchronizes thread access to a shared state, allowing one or more threads to wait for a signal
720/// from one or more other threads, where the signal could have either occurred in the past or could
721/// come at any time in the future.
722///
723/// Unlike an [`AutoResetEvent`] which atomically allows one and only one waiter through each time
724/// the underlying `RawEvent` is set, a `ManualResetEvent` unparks all past waiters and allows
725/// all future waiters calling [`Awaitable::wait()`] to continue without blocking (until
726/// [`ManualResetEvent::reset()`] is called).
727///
728/// A `ManualResetEvent` is rarely appropriate for general purpose thread synchronization (à la
729/// condition variables and mutexes), where exclusive access to a protected critical section is
730/// usually desired, as if multiple threads are suspended/parked waiting for the event to be
731/// signalled and then [`ManualResetEvent::set()`] is called, _all_ of the suspended threads will be
732/// unparked and will resume. However, a `ManualResetEvent` shines when it comes to setting
733/// persistent state indicators, such as a globally-shared abort flag.
734///
735/// Manual-reset events are thread-safe and may be wrapped in an [`Arc`](std::sync::Arc) or declared
736/// as a static global to easily share access across threads.
737pub struct ManualResetEvent {
738    event: RawEvent,
739}
740
741impl ManualResetEvent {
742    /// Create a new [`ManualResetEvent`] with the initial [`EventState`] set to `state`.
743    pub const fn new(state: EventState) -> ManualResetEvent {
744        Self {
745            event: RawEvent::new(match state {
746                EventState::Set => AVAILABLE_BIT,
747                EventState::Unset => 0,
748            }),
749        }
750    }
751
752    /// Puts the [`ManualResetEvent`] into a set state, releasing all suspended waiters (if any)
753    /// and leaving the event set for future callers to [`ManualResetEvent::wait()`] and co.
754    pub fn set(&self) {
755        self.event.set_all()
756    }
757
758    /// Set the state of the [`ManualResetEvent`] to [`EventState::Unset`], regardless of its
759    /// current state. This will cause future calls to [`ManualResetEvent::wait()`] to block until
760    /// the event is set (via [`ManualResetEvent::set()`]).
761    pub fn reset(&self) {
762        self.event.reset()
763    }
764}
765
766impl Awaitable<'_> for ManualResetEvent {
767    type T = ();
768    type Error = TimeoutError;
769
770    /// Check if the underlying event is in a set state or wait for its state to become
771    /// [`EventState::Set`]. In contrast with [`AutoResetEvent::try_wait()`], the event's state is
772    /// not affected by this operation, i.e. it remains set for future callers even after this
773    /// function call returns (until a call to [`ManualResetEvent::reset()`] is made).
774    fn try_wait(&self) -> Result<(), Infallible> {
775        Ok(self.event.unlock_all())
776    }
777
778    /// Check if the underlying event is in a set state (and return immediately) or wait for it to
779    /// become set, up to the limit specified by the `Duration` parameter.
780    ///
781    /// Returns `Ok(())` if the event was initially set or if it became set within the timeout
782    /// specified, otherwise returns `Err(TimeoutError)` if the timeout elapsed with thet event
783    /// becoming available.
784    /*/// Returns `true` if the event was initially set or if it became set within the timeout
785    /// specified, otherwise returns `false` if the timeout elapsed without the event becoming
786    /// available.*/
787    fn try_wait_for(&self, limit: Duration) -> Result<(), TimeoutError> {
788        match self.event.wait_all_for(limit) {
789            true => Ok(()),
790            false => Err(TimeoutError),
791        }
792    }
793
794    /// Test if an event is available without blocking, returning `Err(TimeoutErr)` immediately if
795    /// it is not set.
796    ///
797    /// Note that this is not the same as calling [`ManualResetEvent::try_wait_for()`] with a
798    /// `Duration` of zero, as the calling thread never yields.
799    fn try_wait0(&self) -> Result<(), TimeoutError> {
800        match self.event.try_unlock_all() {
801            true => Ok(()),
802            false => Err(TimeoutError),
803        }
804    }
805}