async_condvar_fair/lib.rs
1// Copyright Ian Jackson and contributors to Rust async-condvar-fair
2// SPDX-License-Identifier: GPL-3.0-or-later
3// There is NO WARRANTY.
4
5#![forbid(unsafe_code)]
6#![allow(clippy::needless_lifetimes)] // these sometimes make things clearer
7#![allow(clippy::option_map_unit_fn)] // suggestion is in poor taste
8
9//! [`wait`]: Condvar::wait
10//! [`wait_baton`]: Condvar::wait_baton
11//! [`wait_no_relock`]: Condvar::wait_no_relock
12//! [`notify_one`]: Condvar::notify_one
13//! [`RelockMutexGuard`]: trait@RelockMutexGuard
14//! [`Arc`]: std::sync::Arc
15#![doc = include_str!("../README.md")]
16
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20use std::task::{Context, Waker, Poll, Poll::*};
21
22use pin_project_lite::pin_project;
23
24#[cfg(test)]
25mod test;
26
27// ---------- public structs ----------
28
29/// **Condition variable (for async)**
30///
31/// For background information about of the semantics of a condition
32/// variable, see [Wikipedia](https://en.wikipedia.org/wiki/Condition_variable#Condition_variables_2) and/or [`std::sync::Condvar`].
33///
34/// Unlike `std`'s condition variables, `async-condvar-fair`'s do not
35/// block the current thread, only the current async task. Also,
36/// multiple condition variables may be freely used with multiple
37/// different mutexes.
38///
39/// Like all condition variables, `async-condvar-wait`'s may generate
40/// spurious wakeups. After completing a wait, you must re-check the
41/// actual condition in your data structure, not simply assume that it
42/// must be ready for you.
43#[derive(Debug,Default)]
44pub struct Condvar(parking_lot::Mutex<CV>);
45
46pin_project!{
47/// Future for waiting, from [`wait_baton`]
48///
49/// See [`wait_baton`] for information about the results of awaiting this.
50///
51/// This type definition could be useful if a named type is wanted.
52///
53/// [`wait_baton`]: Condvar::wait_baton
54pub struct Waiter<'c,G> where G: RelockMutexGuard {
55 #[pin] waitstate: WaitState<'c,G>,
56}}
57
58/// Obligation to do work, or notify someone else to do it
59///
60/// # Task cancellation in async Rust
61///
62/// In async Rust, futures can be cancelled at any `await` point, and
63/// simply discarded, if whatever was waiting for them loses interest.
64/// From the point of view of a reader of the `async` block, this is
65/// as if the code simply stopped running at some `await` point, for
66/// reasons outside of its own control, and discarded all of its state.
67///
68/// When `notify_one` is being used, there is therefore a risk that
69/// the waiting task that `notify_one` chooses to wake up gets
70/// cancelled before it is able to do the work that the notifier
71/// intended.
72///
73/// (This risk only arises if the process of responding to the
74/// notification might `await`. In that case you will also want to be
75/// using an async mutex, since it is generally forbidden to
76/// `await` with a sync mutex held.)
77///
78/// `Baton` helps with this risk. `Option<Baton>` is returned by
79/// `wait_baton`, and should be kept until the work is completed, and
80/// then [`dispose`]d.
81///
82/// If the `Baton` is simply dropped (for example, due to task
83/// cancellation), the condvar will be re-notified.
84///
85/// # How to handle a `Baton`
86///
87/// Use [`wait_baton`] rather than plain [`wait`]. When `wait_baton`
88/// completes, keep the baton while you do whatever work there is to
89/// be done.
90///
91/// After having done the necessary work, as the caller of
92/// `notify_one` was expecting, call `Baton::dispose`.
93///
94/// # Infinite loop, or even livelock, risk
95///
96/// It is important to `dispose` of the baton even if your processing
97/// suffers a (possibly persistent) error. If you accidentally drop the
98/// baton (eg on an error path), another task will be woken up and
99/// perhaps perform the same failing actions, leading to the program
100/// looping uselessly, eating cpu.
101///
102/// Depending on your runtime's scheduler, that might even be a
103/// livelock.
104///
105/// [`dispose`]: Baton::dispose
106/// [`wait_baton`]: Condvar::wait_baton
107/// [`wait`]: Condvar::wait
108#[derive(Debug)]
109pub struct Baton<'c> {
110 condvar: Option<&'c Condvar>, // always Some other than in discard()
111}
112// In the implementation there are not just real `Baton` but
113// also other notional battons:
114//
115// * An `Entry` which is `Signaled`.
116// * A `Waiter` which is `Locking` and has `baton=true`.
117//
118// We arrange `Drop` impls, and the code handling these situations, to
119// always pass on the baton.
120
121impl Condvar {
122 pub fn new() -> Self { Default::default() }
123}
124
125// ---------- RelockMutexRef trait ----------
126
127/// Lock guards that can be unlocked and relocked
128///
129/// # Purpose
130///
131/// [`Condvar::wait_baton`] and [`wait`] need to unlock and then
132/// relock the mutex. So, they need to be able to recover `&Mutex`
133/// from the guard.
134///
135/// # Regaining the mutex ref
136///
137/// If the lock in use doesn't support this (e.g., at the time of
138/// writing, `std::sync::Mutex`, it is usually possible to implement
139/// this trait on a tuple struct of guard and lock reference (and,
140/// therefore, pass that tuple struct to `Condvar::wait`.
141///
142/// # Provided implementations and support
143///
144/// Implementations are provided for a handful of common `MutexGuard`
145/// types. Contributions of more (with the appropriate optional
146/// dependencies in `Cargo.toml` are welcome.
147///
148/// The [`RelockMutexGuard!`] macro can save much boilerplate in the
149/// common cases.
150///
151/// # Semantics
152///
153/// `async-condvar-fair` assumes that `RelockMutexGuard` impl's are
154/// sensible. If they aren't, malfunctions including deadlocks or
155/// livelocks or panics are possible. But memory safety won't be
156/// compromised.
157///
158/// [`wait`]: Condvar::wait
159pub trait RelockMutexGuard {
160 /// The reference to the mutex, recovered after unlocking.
161 type MutexRef: Clone + Send;
162
163 /// The actual guard type.
164 ///
165 /// Where `Self` (the argument to `wait_baton` or `wait` contains
166 /// both a guard and a separate mutex reference, it is typically most
167 /// convenient for the `wait` futures to produce just the mutex guard.
168 ///
169 /// That is what this type is.
170 type JustGuard;
171
172 /// The type of the relock future.
173 type LockFuture: Future<Output=Self::JustGuard> + Send;
174
175 /// Unlock the mutex and return a reference tt.
176 fn unlock_for_relock(self) -> Self::MutexRef;
177
178 /// Relock the mutex, given a reference.
179 ///
180 /// Poisoning (as found in `std::sync::Mutedx`) will have to
181 /// propagate poisoning as panics, or or ignore it, since
182 /// `notify_one` insists that someone must go and acquire the mutex
183 /// which they can't if that always fails due to poisoning.
184 fn lock(r: Self::MutexRef) -> Self::LockFuture;
185}
186
187/// Implements [`trait@RelockMutexGuard`] (needed for any mutexes used
188/// with a condvar)
189///
190/// # Summary
191///
192/// Helper macro to `impl` [`RelockMutexGuard`], for various mutex
193/// types. `RelockMutexGuard!` has five forms, for five use cases:
194///
195/// * Third-party impl, convenient mutex: mutex ref recoverable from guard
196/// * Third-party impl, inconveneint mutex ref passed separately
197/// * First or second party impl, convenient mutex
198/// * Within `async_condvar_fair`, inconvenient mutex
199/// * Explicitly specify type and lifetime parameters.
200///
201/// An alternative to implementing [`RelockMutexGuard`] (via this
202/// macro or otherwisse) is to use [`wait_no_relock`] everywhere.
203///
204/// # Trait coherence, first/second party vs third party
205///
206/// The firt or second party forms can only be used in the crate
207/// defining the mutex guard type (or within `async_condvar_fair`),
208/// because of Rust's trait coherence rules.
209///
210/// The third party forms define a helper struct, for pasing to
211/// `wait_baton` or `wait`.
212///
213/// # Convenient vs inconvenient mutexes
214///
215/// Ideally, mutex guards implement a method for recovering a
216/// reference to the original mutex. This saves users who need to
217/// unlock and relock a mutex (like anyone uusing a condition
218/// variable) from having to carry a separate copy of a reference to
219/// the mutex.
220///
221/// For convenient mutexes, you can just pass the guard to
222/// [`wait_baton`] or [`wait`]. For inconvenient mutexes, you must
223/// also pass a reference to the uneerlying mutex - typically, as
224/// element `.1` of a tuple or tuple struct.
225///
226/// # Examples
227///
228/// ## Third party impl, convenient mutex
229///
230/// ```
231/// # use async_condvar_fair::{Condvar, RelockMutexGuard};
232/// # use parking_lot as nice;
233/// RelockMutexGuard!{
234/// NiceGuardForCondvar(nice::MutexGuard) [nice::Mutex],
235/// l => async move { l.lock() },
236/// g => nice::MutexGuard::mutex(&g),
237/// }
238/// # let condvar = Condvar::new();
239/// # let mutex = nice::Mutex::new(());
240/// # let guard = mutex.lock();
241/// condvar.wait_baton(NiceGuardForCondvar(guard));
242/// ```
243/// Macro expansion:
244/// ```ignore
245/// struct NiceGuardForCondvar<'l,T>(nice::MutexGuard<'l,T>);
246/// impl<'l,T> RelockMutexGuard for NiceGuardForCondvar<'l,T> {/*...*/}
247/// async fn wait(&self, NiceGuardForCondvar<'l,T>) -> nice::MutexGuard<'l,T>; //roughly
248/// ```
249///
250/// The expression after `l =>` must be a a future, such as an async
251/// block or an un-`await`ed call to an async function. `l` will have
252/// type `&'l MutexMT>`. When awaited, the lock expression must yield
253/// `Guard<'l, T>`.
254///
255/// The expression after `g =>` must recover the mutex reference. `g`
256/// will have type `Guard<'l, T>`, and the expression must have type
257/// `&'l Mutex<T>`.
258///
259/// The mutex recovery expression is given ownership of the guard, but
260/// it should discard it. (The expression forms the core of the
261/// generated implementation of `unlock_for_relock`.)
262///
263/// The optional `$xbound`s are additional bounds on `T` (each with
264/// their own `where`, contrary to normal Rust syntax). (This is
265/// needed, for example, for `RwLockReadGuard`, which needs `T:
266/// Send`.)
267///
268/// ## Third party impl, inconvenient mutex
269///
270/// ```
271/// # use async_condvar_fair::{Condvar, RelockMutexGuard};
272/// # use parking_lot as awkward;
273/// RelockMutexGuard!{
274/// AwkwardGuardForCondvar(awkward::MutexGuard, awkward::Mutex),
275/// l => async move { l.lock() },
276/// }
277/// # let condvar = Condvar::new();
278/// # let mutex = awkward::Mutex::new(());
279/// # let guard = mutex.lock();
280/// condvar.wait_baton(AwkwardGuardForCondvar(guard, &mutex));
281/// ```
282/// Macro expansion:
283/// ```ignore
284/// struct AwkwardGuardForCondvar<'o,'i,T>(
285/// awkward::MutexGuard<'i,T>,
286/// &'o awkward::Mutex<T>,
287/// );
288/// impl<'o,'i,T> RelockMutexGuard for AwkwardGuardForCondvar<'o,'i,T> {/*...*/}
289/// async fn wait(&self, AwkwardGuardForCondvar<'o,'i,T>) -> awkward::MutexGuard<'o,T>; //roughly
290/// ```
291///
292/// ## First and second-party impl's
293///
294/// If you are invoking `RelockMutexGuard!` in the same crate as
295/// defines a convenient mutex, or within `async_condvar_fair`, omit
296/// the struct name.
297///
298/// This will implement the trait directly for the guard type, or the
299/// pair consisting of the guard and the mutex reference:
300///
301/// ### First or second party impl, convenient mutex
302/// ```
303/// # use async_condvar_fair::RelockMutexGuard;
304/// # struct Mutex<T>(parking_lot::Mutex<T>);
305/// # impl<T> Mutex<T> { fn lock(&self) -> MutexGuard<T> { todo!() } }
306/// # struct MutexGuard<'l,T>(parking_lot::MutexGuard<'l,T>);
307/// impl<'l,T> MutexGuard<'l,T> {
308/// fn mutex(self_: &Self) -> &'l Mutex<T> { todo!() }
309/// }
310/// RelockMutexGuard!{
311/// (MutexGuard) [Mutex],
312/// l => async move { l.lock() },
313/// g => MutexGuard::mutex(&g),
314/// }
315/// ```
316/// Generates:
317/// ```ignore
318/// impl<'l,T> RelockMutexGuard for MutexGuard<'l,T> {/*...*/}
319/// async fn wait(&self, MutexGuard<'l,T>) -> MutexGuard<'l,T>; //roughly
320/// ```
321/// ### Within `async_condvar_fair`, inconvenient mutex
322/// ```ignore
323/// RelockMutexGuard!{
324/// (MutexGuard, Mutex),
325/// l => async move { l.lock() },
326/// }
327/// ```
328/// Generates:
329/// ```ignore
330/// impl<'o,'i,T> RelockMutexGuard for (MutexGuard<'i,T>, &'o Mutex<T>) {/*...*/}
331/// async fn wait(&self, (MutexGuard<'i,T>, &'o Mutex<T>)) -> MutexGuard<'o,T>; //roughly
332/// ```
333///
334/// ### Explicit type and lifetime parameters
335///
336/// The other four forms all assume that the unlocked mutex is `&...`
337/// and that the mutex is a wrapper type directly around `T`. For other
338/// situations, for example `Arc`-based owned guards, the explicit form
339/// is needed.
340///
341/// The other four forms are convenience wrappers that all expand to
342/// the explicit form.
343///
344/// ```ignore
345/// RelockMutexGuard!{
346/// <T>
347/// (smol::lock::MutexGuardArc<T>)
348/// [std::sync::Arc<smol::lock::Mutex<T>>, smol::lock::MutexGuardArc<T>],
349/// l => async move { l.lock_arc().await },
350/// g => smol::lock::MutexGuardArc::source(&g).clone(),
351/// where T: 'static
352/// }
353/// ```
354/// `$guard_in` is the argument to [`wait`]. `$guard_out` is the output
355/// of the future. `$mutexref` is the intermediate, unlocked, form.
356///
357/// The first lifetime in the generic arguments (if there are any
358/// lifetimes) becomes a bound on `LockFuture`.
359///
360/// `$t: $bounds` is expanded as `$t : std::marker::Send + $bounds`.
361///
362/// [`wait`]: Condvar::wait
363/// [`wait_baton`]: Condvar::wait_baton
364/// [`wait_no_relock`]: Condvar::wait_no_relock
365/// [`notify_one`]: Condvar::notify_one
366/// [`RelockMutexGuard`]: trait@RelockMutexGuard
367#[macro_export]
368macro_rules! RelockMutexGuard {
369 { $struct:ident ($($guard:tt)+) $(,)? [$($mutex:tt)+] $(,)?
370 $l:pat => $lock:expr,
371 $g:pat => $get_mutex:expr
372 $( , where $xbound:path )* $(,)?
373 } => {
374 pub struct $struct<'l,T>(pub $($guard)+<'l,T>);
375 $crate::RelockMutexGuard! {
376 <'l,T>
377 ( $struct<'l,T> )
378 [ &'l $($mutex)+ <T>, $($guard)+ <'l,T> ],
379 $l => $lock,
380 $struct($g) => $get_mutex,
381 where T: $( $xbound + )*
382 }
383 };
384
385 { $struct:ident ($guard:ident $(:: $guardx:ident)*, $($mutex:tt)+) $(,)?
386 $l:pat => $lock:expr
387 $( , where $xbound:path )* $(,)?
388 } => {
389 pub struct $struct<'i,'o,T>(pub $guard $(:: $guardx)*<'i,T>,
390 pub &'o $($mutex)+<T>);
391
392 $crate::RelockMutexGuard! {
393 <'o,'i,T>
394 ( $struct<'i,'o,T> )
395 [ &'o $($mutex)+ <T>, $guard $(:: $guardx)* <'o,T> ],
396 $l => $lock,
397 g => g.1,
398 where T: $( $xbound + )*
399 }
400
401 };
402
403 { ($($guard:tt)+) $(,)? [$($mutex:tt)+] $(,)?
404 $l:ident => $lock:expr,
405 $g:ident => $get_mutex:expr
406 $( , where $xbound:path )* $(,)?
407 } => {
408 $crate::RelockMutexGuard! {
409 <'l,T>
410 ( $($guard)* <'l,T> )
411 [ &'l $($mutex) +<T>, $($guard)* <'l,T> ],
412 $l => $lock,
413 $g => $get_mutex,
414 where T: $( $xbound + )*
415 }
416 };
417
418 { ($guard:ident $(:: $guardx:ident)*, $($mutex:tt)+) $(,)?
419 $l:ident => $lock:expr
420 $( , where $xbound:path )* $(,)?
421 } => {
422 $crate::RelockMutexGuard! {
423 <'o,'i,T>
424 ( ($guard $(:: $guardx)* <'i,T>, &'o $($mutex)+ <T>) )
425 [ &'o $($mutex)+ <T>, $guard $(:: $guardx)* <'o,T> ],
426 $l => $lock,
427 g => g.1,
428 where T: $( $xbound + )*
429 }
430 };
431
432 { < $($gen_lf0:lifetime, $($gen_lf1:lifetime,)*)? $($gen_ty:ident),* > $(,)?
433 ( $guard_in:ty ) $(,)?
434 [ $mutexref:ty, $guard_out:path ] $(,)?
435 $l:pat => $lock:expr ,
436 $g:pat => $get_mutex:expr ,
437 where $t:ident : $($bound:tt)*
438 } => {
439 impl < $($gen_lf0, $($gen_lf1,)*)? $($gen_ty),* >
440 $crate::RelockMutexGuard for $guard_in
441 where $t : std::marker::Send + $($bound)*
442 {
443 type MutexRef = $mutexref;
444 type JustGuard = $guard_out;
445 type LockFuture = std::pin::Pin<std::boxed::Box<
446 dyn std::future::Future<Output=Self::JustGuard>
447 + std::marker::Send $(+ $gen_lf0)?
448 >>;
449 fn unlock_for_relock(self) -> Self::MutexRef {
450 let $g = self;
451 $get_mutex
452 }
453 fn lock($l: Self::MutexRef) -> Self::LockFuture {
454 std::boxed::Box::pin($lock)
455 }
456 }
457 }
458}
459
460RelockMutexGuard!{
461 (std::sync::MutexGuard, std::sync::Mutex),
462 l => async move { l.lock().unwrap() },
463}
464
465impl<G> RelockMutexGuard for NotRelockable<G> {
466 type MutexRef = ();
467 type JustGuard = ();
468 type LockFuture = std::future::Ready<()>;
469 fn unlock_for_relock(self) -> Self::MutexRef { }
470 fn lock(_l: ()) -> Self::LockFuture { std::future::ready(()) }
471}
472
473macro_rules! impl_parking_lot { {
474 $feat:literal, $parking_lot:ident,
475 $( $FairMutex:ident, )?
476} => {
477 #[cfg(feature=$feat)]
478 RelockMutexGuard!{
479 ($parking_lot::MutexGuard) [$parking_lot::Mutex],
480 l => async move { l.lock() },
481 g => $parking_lot::lock_api::MutexGuard::mutex(&g),
482 }
483 $(
484 #[cfg(feature=$feat)]
485 RelockMutexGuard!{
486 ($parking_lot::FairMutexGuard) [$parking_lot::$FairMutex],
487 l => async move { l.lock() },
488 g => $parking_lot::lock_api::MutexGuard::mutex(&g),
489 }
490 )?
491} }
492impl_parking_lot!{ "parking_lot_0_12", parking_lot , FairMutex, }
493impl_parking_lot!{ "parking_lot_0_11", parking_lot_0_11, FairMutex, }
494impl_parking_lot!{ "parking_lot_0_10", parking_lot_0_10, FairMutex, }
495impl_parking_lot!{ "parking_lot_0_9", parking_lot_0_9, }
496
497#[cfg(feature="tokio")]
498RelockMutexGuard!{
499 (tokio::sync::MutexGuard, tokio::sync::Mutex),
500 l => l.lock(),
501}
502#[cfg(feature="tokio")]
503RelockMutexGuard!{
504 (tokio::sync::RwLockReadGuard, tokio::sync::RwLock)
505 l => l.read(),
506 where Sync
507}
508#[cfg(feature="tokio")]
509RelockMutexGuard!{
510 (tokio::sync::RwLockWriteGuard, tokio::sync::RwLock),
511 l => l.write(),
512 where Sync
513}
514#[cfg(feature="tokio")]
515RelockMutexGuard!{
516 <T>
517 ( (tokio::sync::OwnedMutexGuard<T>,
518 std::sync::Arc<tokio::sync::Mutex<T>>) )
519 [ std::sync::Arc<tokio::sync::Mutex<T>>,
520 tokio::sync::OwnedMutexGuard<T> ],
521 l => async move { l.lock_owned().await },
522 g => g.1,
523 where T: 'static
524}
525#[cfg(feature="tokio")]
526RelockMutexGuard!{
527 <T>
528 ( (tokio::sync::OwnedRwLockReadGuard<T>,
529 std::sync::Arc<tokio::sync::RwLock<T>>) )
530 [ std::sync::Arc<tokio::sync::RwLock<T>>,
531 tokio::sync::OwnedRwLockReadGuard<T> ],
532 l => async move { l.read_owned().await },
533 g => g.1,
534 where T: Sync + 'static
535}
536#[cfg(feature="tokio")]
537RelockMutexGuard!{
538 <T>
539 ( (tokio::sync::OwnedRwLockWriteGuard<T>,
540 std::sync::Arc<tokio::sync::RwLock<T>>) )
541 [ std::sync::Arc<tokio::sync::RwLock<T>>,
542 tokio::sync::OwnedRwLockWriteGuard<T> ],
543 l => async move { l.write_owned().await },
544 g => g.1,
545 where T: Sync + 'static
546}
547
548#[cfg(feature="smol")]
549RelockMutexGuard!{
550 (smol::lock::MutexGuard) [smol::lock::Mutex],
551 l => l.lock(),
552 g => smol::lock::MutexGuard::source(&g),
553}
554#[cfg(feature="smol")]
555RelockMutexGuard!{
556 (smol::lock::RwLockReadGuard, smol::lock::RwLock)
557 l => l.read(),
558 where Sync
559}
560#[cfg(feature="smol")]
561RelockMutexGuard!{
562 <T>
563 (smol::lock::MutexGuardArc<T>)
564 [std::sync::Arc<smol::lock::Mutex<T>>, smol::lock::MutexGuardArc<T>],
565 l => async move { l.lock_arc().await },
566 g => smol::lock::MutexGuardArc::source(&g).clone(),
567 where T: 'static
568}
569
570// ---------- internal structs and types ----------
571
572// States a Waiter/WaitState can be in:
573//
574// WS::Waiting, Entry::Waiting
575//
576// A task is waiting on the mutex. (The future has been created and
577// may be being polled.)
578//
579// If the `Waiter` is dropped, the `Entry` is removed from the list.
580// See below (esp., `Entry::Signaled`) about the handling of any
581// notifications this might affect.
582//
583// WS::Waiting, Entry::Broadcasted
584//
585// The task was waiting and has been woken (waker.wake()). (Or
586// Perhaps the fututre has not been polled yet, in which case there
587// is no Waker, but then when the future is polled, we will see the
588// broadcast.
589//
590// WS::Waiting, Entry::Signaled
591//
592// As for Broadcasted, but, additionally: `notify_one` was called
593// and this WS/Entry represents the baton. If the `Waiter` is
594// dropped, the condvar must be re-signaled to notify another
595// waiter.
596//
597// WS::Locking
598//
599// The task is trying to reacquire the lock. We have a suitable
600// future working on that. `baton` tells us whether, if we drop
601// this `Waiter`, we need to pass on the baton by signalling another
602// task.
603//
604// WS::Ended
605//
606// The Waker is a future. We have already returned `Poll::Ready`.
607// We don't expect to be called again; if we are, we panic.
608
609#[derive(Debug,Default)]
610struct CV {
611 list: dlv_list::VecList<Entry>,
612}
613type I = dlv_list::Index<Entry>;
614
615#[derive(Debug)]
616enum Entry {
617 Waiting(Option<Waker>),
618 Signaled, // implied baton
619 Broadcasted,
620}
621use Entry::*;
622
623// This is a bit clunky because of a number of interlocking constraints:
624// * The public struct must be a struct, not an enum. Hence the
625// public wrapper `Waiter` vs the internal `WaitState`.
626//
627// * We want to impl Drop, somehow. pin_project struts cannot impl
628// Drop, so we impl Drop on sub-structs that are not structurally
629// pinned. So we have both pinned and non-pinned sub-fields.
630//
631// * The Drop impls must be on parts of the struct which contain
632// `&Condvar`. This plus the above means `&Condvar` must be
633// entangled with the enum, even though all the variants have it.
634//
635// * pin_project_lite supports only struct variants, not tuple variants.
636
637pin_project! {
638#[project=WSProj]
639enum WaitState<'c,G> where G: RelockMutexGuard {
640 Waiting {
641 ns: WS_Waiting<'c,G>
642 },
643 Locking {
644 ns: WS_Locking_NS<'c>, // "non-structural", ie not #[pin], but Drop
645 #[pin] locking: G::LockFuture,
646 },
647 Ended,
648}}
649type WS<'c,G> = WaitState<'c,G>;
650
651#[derive(Debug)]
652#[allow(non_camel_case_types)]
653struct WS_Waiting<'c,G> where G: RelockMutexGuard {
654 condvar: &'c Condvar,
655 ent: Option<I>, // always Some, except when dropping or overwriting
656 lock: G::MutexRef,
657}
658#[derive(Debug)]
659#[allow(non_camel_case_types)]
660struct WS_Locking_NS<'c> {
661 condvar: &'c Condvar,
662 baton: bool,
663}
664
665// ---------- waiting ----------
666
667impl Condvar {
668 /// Wait for someone to call [`notify_one`] or [`notify_all`]
669 ///
670 /// Atomically unlocks the mutex corresponding to `guard` and starts
671 /// waiting for notify.
672 ///
673 /// This is a future producing `(G::JustGuard, Option<` [`Baton`] `<'c>>)`.
674 ///
675 /// `G` is a fresh guard for the mutex, which has been unlocked and
676 /// relocked.
677 ///
678 /// `baton` is a token representing an possible obligation to either
679 /// perform the actions that the caller of `notify_one` is
680 /// expecting, or re-notify the condvar. See [`Baton`]. `baton`
681 /// will be `None` if the wakeup was the result of `notify_all`.
682 ///
683 /// [`notify_one`]: Condvar::notify_one
684 /// [`notify_all`]: Condvar::notify_all
685 pub fn wait_baton<'c,G>(&'c self, guard: G) -> Waiter<'c,G>
686 where G: RelockMutexGuard
687 {
688 let mut cv = self.0.lock();
689 let ent = cv.list.push_back(Waiting(None));
690 let lock = RelockMutexGuard::unlock_for_relock(guard);
691 Waiter { waitstate: WS::Waiting { ns: WS_Waiting {
692 condvar: self,
693 ent: Some(ent),
694 lock,
695 } } }
696 }
697
698 /// Wait for a notification; caller must worry about cancellation
699 ///
700 /// Like `wait_baton` but [`disposes`](Baton::dispose) of the baton
701 /// right away on return.
702 ///
703 /// # Beware task cancellation when using with `notify_one` and an async mutex
704 ///
705 /// When `wait`, `notify_one`, and an async mutex, are combined,
706 /// notifications can easily be lost: perhaps a task calling `wait`
707 /// could be cancelled after being woken up by `notify_one`, but
708 /// before doing the actual work. If that happens, probably no
709 /// other task will be signaled and the work would go undone.
710 ///
711 /// So when using `notify_one`, with an async mutex, it is probably
712 /// best to use `wait_baton`.
713 ///
714 /// `async-condvar-fair` does guarantee that `notify_one` will
715 /// ensure that at least one `wait` call returns to its caller, (In
716 /// particular, if the `wait` task is cancelled after being selected
717 /// by `notify_one`, but before it manages to acquire the mutex, the
718 /// condvar will be re-notified.)
719 ///
720 /// But, in async code it is difficult and error-prone to try to
721 /// avoid waiting. Any `await` might result in task cancellaton,
722 /// and then if you're not using `wait_baton`, the notification will
723 /// be lost.
724 ///
725 /// [`wait_baton`] avoids this problem. `notify_all` doesn't suffer
726 /// from it because everyone is woken up anyway. If you are using a
727 /// sync mutex, there is no problem either, because you won't be
728 /// `await`ing while processing (ie, while holding the mutex) anyway.
729 ///
730 /// [`notify_one`]: Condvar::notify_one
731 /// [`notify_all`]: Condvar::notify_all
732 /// [`wait_baton`]: Condvar::wait_baton
733 pub async fn wait<'c,G>(&'c self, guard: G) -> G::JustGuard
734 where G: RelockMutexGuard
735 {
736 let (guard, baton) = self.wait_baton(guard).await;
737 baton.dispose();
738 guard
739 }
740
741 /// Wait for notification; caller will have to relock the mutex
742 ///
743 /// Like [`wait_baton`] but does not relock the mutex.
744 ///
745 /// This can be used with any mutex guard type, even one for
746 /// which no impl of [`trait@RelockMutexGuard`] is available.
747 ///
748 /// `wait_no_relock` will first start waiting for notifications, and
749 /// then drop `guard` (which, with a mutex guard, will unlock the
750 /// mutex). When `wait_no_relock` completes, you will very probably
751 /// want to acquire the mutex again: with `wait_no_relock` this must
752 /// be done separately.
753 ///
754 /// Be sure to `dispose` of the `Option<`[`Baton`]`>` exactly iff
755 /// that is appropriate.
756 ///
757 /// # Deadlock hazard
758 ///
759 /// There is no type restricton on `guard`. It is important that
760 /// you pass the ownership of the actual mutex guard.
761 /// There is no way for the compiler to spot if you don't.
762 /// If (for example) you pass `&mut MutexGuard`,
763 /// you will fail to unlock the mutex, usually resulting in deadlock.
764 ///
765 /// [`wait_baton`]: Condvar::wait_baton
766 pub async fn wait_no_relock<'c,G>(&'c self, guard: G) -> Option<Baton<'c>> {
767 let (_guard, baton) = self.wait_baton(NotRelockable(guard)).await;
768 baton
769 }
770}
771struct NotRelockable<G>(G);
772
773impl<'c,G> Future for Waiter<'c,G> where G: RelockMutexGuard {
774 type Output = (G::JustGuard, Option<Baton<'c>>);
775 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
776 -> Poll<Self::Output>
777 {
778 loop { // to let us rerun the match if we cange the state
779
780 match self.as_mut().project().waitstate.project() {
781
782 WSProj::Ended => panic!(),
783
784 WSProj::Waiting { ns: WS_Waiting { condvar, ent, lock } } => {
785 let condvar = *condvar;
786 let mut cv = condvar.0.lock();
787 let entry = &mut cv.list.get_mut(ent.unwrap()).unwrap();
788 let baton = match entry {
789 Signaled => true,
790 Broadcasted => false,
791 Waiting(waker) => {
792 *waker = Some(cx.waker().clone());
793 return Pending;
794 }
795 };
796 // OK, condvar has been signaled, we need to start to
797 // reaquire the lock. We have converted the baton from
798 // Entry::Signaled to a a boolean.
799 cv.list.remove(ent.take().unwrap());
800 let locking = <G as RelockMutexGuard>::lock(lock.clone());
801 self.as_mut().set(Waiter { waitstate: { WS::Locking {
802 ns: WS_Locking_NS { condvar, baton },
803 locking,
804 } } } );
805 },
806
807 WSProj::Locking { ns: WS_Locking_NS { condvar, baton }, locking } => {
808 let guard = match locking.poll(cx) {
809 Pending => return Pending,
810 Ready(guard) => guard,
811 };
812 let rbaton = condvar.baton_from_bool(*baton);
813 *baton = false;
814 self.as_mut().set(Waiter { waitstate: WS::Ended });
815 return Ready((guard, rbaton))
816 }
817 }
818
819 }
820 }
821}
822
823// ---------- notification ----------
824
825impl Condvar {
826 /// Notify a waiting task (aka "signal")
827 ///
828 /// If there are any tasks in [`wait_baton`] (or
829 /// [`wait_no_relock`]), at least one of them will wake up and get
830 /// `Some(`[`Baton`]`)` from wait_baton.
831 ///
832 /// Likewise, if there are any tasks in [`wait`], at least one of
833 /// them will wake up and return. But if that task is cancelled
834 /// after `wait` completss, the notification may be lost.
835 /// See [`wait`] and [`Baton`] for a discussion of the interaction
836 /// between task cancellation and `notify_one`.
837 ///
838 /// Notifications do not "stack" or "count". Calling `notify_one`
839 /// several times might still wake up only one task.
840 ///
841 /// [`wait`]: Condvar::wait
842 /// [`wait_baton`]: Condvar::wait_baton
843 /// [`wait_no_relock`]: Condvar::wait_no_relock
844 pub fn notify_one(&self) {
845 self.0.lock().notify_one()
846 }
847}
848
849impl CV {
850 fn notify_one(&mut self) {
851 //dbg!(&self.list);
852 if let Some(entry) = self.list.front_mut() {
853 match entry {
854 Signaled | Broadcasted => { }, // notify_one is idempotent
855 Waiting(waker) => {
856 if let Some(waker) = waker.take() { waker.wake() }
857 *entry = Signaled;
858 },
859 };
860 }
861 }
862}
863
864impl Condvar {
865 /// Notify all waiting tasks (aka "broadcast")
866 ///
867 /// Wake up all tasks currently in [`wait`], [`wait_baton`],
868 /// and [`wait_no_relock`],
869 ///
870 /// Each the tasks in [`wait`] and [`wait_baton`] will start to try
871 /// to reacquire the mutex; they will then (in general) take turns
872 /// to return from `wait`/`wait_baton` with the mutex held.
873 ///
874 /// All tasks will get `None` rather than `Some(` [`Baton`] `)`,
875 /// from `wait_baton` or `wait_no_relock` - even possibly tasks
876 /// which are in the process of waking up because of a previous call
877 /// to `notify_one`.
878 ///
879 /// [`wait`]: Condvar::wait
880 /// [`wait_baton`]: Condvar::wait_baton
881 /// [`wait_no_relock`]: Condvar::wait_no_relock
882 pub fn notify_all(&self) {
883 let mut cv = self.0.lock();
884 for entry in cv.list.iter_mut() {
885 match entry {
886 Signaled | Broadcasted => {
887 *entry = Broadcasted; // no baton needed any more
888 },
889 Waiting(waker) => {
890 if let Some(waker) = waker.take() { waker.wake() }
891 *entry = Broadcasted;
892 },
893 };
894 }
895 }
896}
897
898impl Condvar {
899 fn baton_from_bool<'c>(&'c self, yes: bool) -> Option<Baton<'c>> {
900 if yes {
901 Some(self.make_baton())
902 } else {
903 None
904 }
905 }
906
907 /// Make a baton directly, without waiting
908 ///
909 /// This may be useful in unusual situations.
910 ///
911 /// If the returned `Baton` is simply dropped, this is the same as
912 /// [`notify_one`].
913 ///
914 /// [`notify_one`]: Condvar::notify_one
915 pub fn make_baton<'c>(&'c self) -> Baton<'c> {
916 Baton { condvar: Some(self) }
917 }
918}
919
920impl Baton<'_> {
921 /// Declare that responsibility has been discharged
922 ///
923 /// The baton will be consumed, without generating any notifications.
924 pub fn dispose(mut self) { let _ = self.condvar.take(); }
925
926 /// Pass on the baton to someone else, if anyone else is waiting
927 ///
928 /// This is equivalent to `mem::drop`.
929 pub fn pass(self) { /* drop impl will do the actual passing */ }
930}
931
932/// Extension trait for `Option<Baton>` to provide `dispose` and `pass`
933pub trait BatonExt: Sized {
934 /// Declare any responsibility has been discharged
935 fn dispose(self);
936 /// Pass on any responsibility to someone else
937 fn pass(self) { /* drop impl will do the actual passing */ }
938}
939impl BatonExt for Option<Baton<'_>> {
940 fn dispose(self) { self.map(Baton::dispose); }
941}
942
943impl<G> Drop for WS_Waiting<'_,G> where G: RelockMutexGuard {
944 fn drop(&mut self) {
945 if let Some(ent) = self.ent.take() {
946 let mut cv = self.condvar.0.lock();
947 cv.list.remove(ent);
948 }
949 }
950}
951impl Drop for WS_Locking_NS<'_> {
952 fn drop(&mut self) {
953 let _baton = self.condvar.baton_from_bool(self.baton);
954 // we pass the baton to Baton::dorp */
955 }
956}
957
958impl Drop for Baton<'_> {
959 fn drop(&mut self) {
960 if let Some(condvar) = self.condvar.take() {
961 condvar.notify_one();
962 }
963 }
964}