wait_list/
lib.rs

1//! This crate has been deprecated in favour of [`pin-list`](https://docs.rs/pin-list)!
2//! If you want to get the crate name for something else, please do not hesitate to [contact me] or
3//! email <help@crates.io>.
4//!
5//! [contact me]: https://sabrinajewson.org/
6//!
7//! Original readme below:
8//!
9//! This crate provides `WaitList`, the most fundamental type for async synchronization. `WaitList`
10//! is implemented as an intrusive linked list of futures.
11//!
12//! # Feature flags
13//!
14//! - `std`: Implements the `Lock` traits on locks from the standard library.
15//! - `lock_api_04`: Implements the `Lock` traits on locks from [`lock_api`] v0.4. This enables
16//! integration of crates like [`parking_lot`], [`spin`] and [`usync`].
17//! - `loom_05`: Implements the `Lock` traits on locks from [`loom`] v0.5.
18//!
19//! # Example
20//!
21//! A thread-safe unfair async mutex.
22//!
23//! ```
24//! use pin_project_lite::pin_project;
25//! use std::cell::UnsafeCell;
26//! use std::future::Future;
27//! use std::ops::Deref;
28//! use std::ops::DerefMut;
29//! use std::pin::Pin;
30//! use std::task;
31//! use std::task::Poll;
32//! use wait_list::WaitList;
33//!
34//! pub struct Mutex<T> {
35//!     data: UnsafeCell<T>,
36//!     waiters: WaitList<std::sync::Mutex<bool>, (), ()>,
37//! }
38//!
39//! unsafe impl<T> Sync for Mutex<T> {}
40//!
41//! impl<T> Mutex<T> {
42//!     pub fn new(data: T) -> Self {
43//!         Self {
44//!             data: UnsafeCell::new(data),
45//!             waiters: WaitList::new(std::sync::Mutex::new(false)),
46//!         }
47//!     }
48//!     pub fn lock(&self) -> Lock<'_, T> {
49//!         Lock {
50//!             mutex: self,
51//!             inner: wait_list::Wait::new(),
52//!         }
53//!     }
54//! }
55//!
56//! pin_project! {
57//!     pub struct Lock<'mutex, T> {
58//!         mutex: &'mutex Mutex<T>,
59//!         #[pin]
60//!         inner: wait_list::Wait<'mutex, std::sync::Mutex<bool>, (), (), TryForward>,
61//!     }
62//! }
63//!
64//! impl<'mutex, T> Future for Lock<'mutex, T> {
65//!     type Output = Guard<'mutex, T>;
66//!     fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
67//!         let mut this = self.project();
68//!
69//!         let mut waiters = if this.inner.as_ref().is_completed() {
70//!             // If we haven't initialized the future yet, lock the mutex for the first time
71//!             this.mutex.waiters.lock_exclusive()
72//!         } else {
73//!             // Otherwise, wait for us to be woken
74//!             match this.inner.as_mut().poll(cx) {
75//!                 Poll::Ready((waiters, ())) => waiters,
76//!                 Poll::Pending => return Poll::Pending,
77//!             }
78//!         };
79//!
80//!         // If the mutex is unlocked, mark it as locked and return the guard
81//!         if !*waiters.guard {
82//!             *waiters.guard = true;
83//!             return Poll::Ready(Guard { mutex: this.mutex });
84//!         }
85//!
86//!         // Otherwise, re-register ourselves to be woken when the mutex is unlocked again
87//!         this.inner.init(cx.waker().clone(), &mut waiters, (), TryForward);
88//!         Poll::Pending
89//!     }
90//! }
91//!
92//! /// When the future is cancelled before the mutex guard can be taken, wake up the next waiter.
93//! struct TryForward;
94//! impl<'wait_list> wait_list::CancelCallback<'wait_list, std::sync::Mutex<bool>, (), ()>
95//!     for TryForward
96//! {
97//!     fn on_cancel(
98//!         self,
99//!         mut list: wait_list::LockedExclusive<'wait_list, std::sync::Mutex<bool>, (), ()>,
100//!         output: (),
101//!     ) {
102//!         let _ = list.wake_one(());
103//!     }
104//! }
105//!
106//! pub struct Guard<'mutex, T> {
107//!     mutex: &'mutex Mutex<T>,
108//! }
109//!
110//! impl<T> Deref for Guard<'_, T> {
111//!     type Target = T;
112//!     fn deref(&self) -> &Self::Target {
113//!         unsafe { &*self.mutex.data.get() }
114//!     }
115//! }
116//! impl<T> DerefMut for Guard<'_, T> {
117//!     fn deref_mut(&mut self) -> &mut Self::Target {
118//!         unsafe { &mut *self.mutex.data.get() }
119//!     }
120//! }
121//!
122//! impl<T> Drop for Guard<'_, T> {
123//!     fn drop(&mut self) {
124//!         let mut waiters = self.mutex.waiters.lock_exclusive();
125//!         *waiters.guard = false;
126//!         let _ = waiters.wake_one(());
127//!     }
128//! }
129//! #
130//! # fn assert_send<T: Send>(_: T) {}
131//! # let mutex = Mutex::new(());
132//! # assert_send(mutex.lock());
133//! ```
134//!
135//! [`lock_api`]: https://docs.rs/lock_api
136//! [`parking_lot`]: https://docs.rs/parking_lot
137//! [`spin`]: https://docs.rs/spin
138//! [`usync`]: https://docs.rs/usync
139//! [`loom`]: https://docs.rs/loom
140#![warn(
141    clippy::pedantic,
142    missing_debug_implementations,
143    missing_docs,
144    noop_method_call,
145    trivial_casts,
146    trivial_numeric_casts,
147    unsafe_op_in_unsafe_fn,
148    unused_lifetimes,
149    unused_qualifications
150)]
151#![allow(
152    clippy::items_after_statements,
153    // `ǃ` (latin letter retroflex click) is used in the tests for a never type
154    uncommon_codepoints,
155)]
156#![no_std]
157#![cfg_attr(doc_nightly, feature(doc_cfg))]
158
159#[cfg(feature = "alloc")]
160extern crate alloc;
161
162#[cfg(feature = "std")]
163extern crate std;
164
165#[cfg(feature = "lock_api_04")]
166pub extern crate lock_api_04_crate as lock_api_04;
167
168#[cfg(feature = "loom_05")]
169pub extern crate loom_05_crate as loom_05;
170
171use core::cell::UnsafeCell;
172use core::fmt;
173use core::fmt::Debug;
174use core::fmt::Formatter;
175use core::future::Future;
176use core::mem;
177use core::ops::Deref;
178use core::pin::Pin;
179use core::ptr;
180use core::ptr::NonNull;
181use core::task;
182use core::task::Poll;
183use pin_project_lite::pin_project;
184use pinned_aliasable::Aliasable;
185
186pub mod lock;
187#[doc(no_inline)]
188pub use lock::Lock;
189
190/// An intrusive linked list of futures.
191pub struct WaitList<L: Lock, I, O> {
192    /// The lock used by the `WaitList`.
193    ///
194    /// This is used for internal synchronization, but you can also protect arbitrary state useful
195    /// to you in here.
196    pub lock: L,
197
198    /// Inner state of the wait list, protected by the above lock.
199    inner: UnsafeCell<Inner<I, O>>,
200}
201
202unsafe impl<L: Lock, I, O> Send for WaitList<L, I, O>
203where
204    // - `L` is required to be `Send` because we provide access to it and run its destructor
205    // regardless of caller thread.
206    // - `L` is not required to be `Sync` because this type holds complete ownership over all `L`s.
207    L: Send,
208    // - `I` is required to be `Send` because we allow an `&mut Self -> &mut I` conversion, and
209    // it's possible to send an `&mut Self` with values inside if a future is leaked.
210    // - `I` is not required to be `Sync` because there is no shared access of it between objects.
211    I: Send,
212    // - `O` is not required to be `Send` because the only situation in which an `O` can be
213    // obtained or dropped through this type is in `wait`, which needs multiple shared references
214    // to this type to exist.
215    // - `O` is not required to be `Sync` because we never deal in `&O`.
216    O:,
217{
218}
219
220unsafe impl<L: Lock, I, O> Sync for WaitList<L, I, O>
221where
222    // - `L` is not required to be `Send` because we don't allow moving out our `L` from a shared
223    // reference.
224    // - `L` is required to be `Sync` because we support an `&Self -> &L` conversion: `&self.lock`.
225    L: Sync,
226    // - `I` is required to be `Send` because its ownership can be transferred between two threads
227    // via a `WaitList`, if one thread waits on input and another wakes the first.
228    // - `I` is required to be `Sync` because we support an `&Self -> &I` conversion which can be
229    // called from multiple threads concurrently (via the shared locks).
230    I: Send + Sync,
231    // - `O` is required to be `Send` because its ownership can be transferred between threads
232    // using an `&Self`.
233    // - `O` is not required to be `Sync` because we never deal in `&O`.
234    O: Send,
235{
236}
237
238struct Inner<I, O> {
239    /// The head of the queue; the oldest waiter.
240    ///
241    /// If this is `None`, the list is empty.
242    head: Option<NonNull<UnsafeCell<Waiter<I, O>>>>,
243
244    /// The tail of the queue; the newest waiter.
245    ///
246    /// Whether this is `None` must remain in sync with whether `head` is `None`.
247    tail: Option<NonNull<UnsafeCell<Waiter<I, O>>>>,
248}
249
250/// A waiter in the above list.
251///
252/// Each waiter in the list is wrapped in an `UnsafeCell` because there are several places that may
253/// hold a reference two it (the linked list and the waiting future). The `UnsafeCell` is guarded
254/// by the `WaitList`'s lock.
255///
256/// Each `Waiter` is stored by its waiting future, and will be automatically removed from the
257/// linked list by `dequeue` when the future completes or is cancelled.
258struct Waiter<I, O> {
259    /// The next waiter in the linked list.
260    next: Option<NonNull<UnsafeCell<Waiter<I, O>>>>,
261
262    /// The previous waiter in the linked list.
263    prev: Option<NonNull<UnsafeCell<Waiter<I, O>>>>,
264
265    /// The state the waiter is in.
266    state: WaiterState<I, O>,
267}
268
269enum WaiterState<I, O> {
270    /// The waiter has not been woken.
271    Waiting { input: I, waker: task::Waker },
272
273    /// The waiter has been woken.
274    Woken { output: O },
275}
276
277impl<I, O> Inner<I, O> {
278    /// Add a waiter node to the end of this linked list.
279    ///
280    /// # Safety
281    ///
282    /// - `waiter` must be the only reference to that object.
283    /// - `waiter` must be a valid pointer until it is removed.
284    unsafe fn enqueue(&mut self, waiter: &UnsafeCell<Waiter<I, O>>) {
285        // Set the previous waiter to the current tail of the queue, if there was one.
286        unsafe { &mut *waiter.get() }.prev = self.tail;
287
288        let waiter_ptr = NonNull::from(waiter);
289
290        // Update the old tail's next pointer
291        if let Some(prev) = self.tail {
292            let prev = unsafe { &mut *prev.as_ref().get() };
293            debug_assert_eq!(prev.next, None);
294            prev.next = Some(waiter_ptr);
295        }
296
297        // Set the waiter as the new tail of the linked list
298        self.tail = Some(waiter_ptr);
299
300        // Also set it as the head if there isn't currently a head.
301        self.head.get_or_insert(waiter_ptr);
302    }
303
304    /// Remove a waiter node from an arbitrary position in the linked list.
305    ///
306    /// # Safety
307    ///
308    /// - `waiter` must be a waiter in this queue.
309    /// - No other unique references to `waiter` may exist.
310    unsafe fn dequeue(&mut self, waiter: &UnsafeCell<Waiter<I, O>>) {
311        let waiter_ptr = Some(NonNull::from(waiter));
312        let waiter = unsafe { &mut *waiter.get() };
313
314        let prev = waiter.prev;
315        let next = waiter.next;
316
317        // Update the pointer of the previous node, or the queue head
318        let prev_next_pointer = match waiter.prev {
319            Some(prev) => &mut unsafe { &mut *prev.as_ref().get() }.next,
320            None => &mut self.head,
321        };
322        debug_assert_eq!(*prev_next_pointer, waiter_ptr);
323        *prev_next_pointer = next;
324
325        // Update the pointer of the next node, or the queue tail
326        let next_prev_pointer = match waiter.next {
327            Some(next) => &mut unsafe { &mut *next.as_ref().get() }.prev,
328            None => &mut self.tail,
329        };
330        debug_assert_eq!(*next_prev_pointer, waiter_ptr);
331        *next_prev_pointer = prev;
332    }
333}
334
335impl<L, I, O> WaitList<L, I, O>
336where
337    // workaround for no trait bounds in `const fn`
338    <core::iter::Empty<L> as Iterator>::Item: Lock,
339{
340    /// Create a new empty `WaitList`.
341    #[must_use]
342    pub const fn new(lock: L) -> Self {
343        Self {
344            lock,
345            inner: UnsafeCell::new(Inner {
346                head: None,
347                tail: None,
348            }),
349        }
350    }
351
352    /// Take an exclusive lock on the contents of this list.
353    ///
354    /// This function should not be called recursively as it may deadlock, panic or abort.
355    #[must_use]
356    pub fn lock_exclusive(&self) -> LockedExclusive<'_, L, I, O> {
357        LockedExclusive {
358            guard: self.lock.lock_exclusive(),
359            common: LockedCommon { wait_list: self },
360        }
361    }
362
363    /// Take a shared lock on the contents of this list.
364    ///
365    /// If your chosen lock type only supports exclusive locks, this will take an exclusive lock
366    /// instead.
367    #[must_use]
368    pub fn lock_shared(&self) -> LockedShared<'_, L, I, O> {
369        LockedShared {
370            guard: self.lock.lock_shared(),
371            common: LockedCommon { wait_list: self },
372        }
373    }
374}
375
376impl<L: Lock + Default, I, O> Default for WaitList<L, I, O> {
377    fn default() -> Self {
378        Self::new(L::default())
379    }
380}
381
382impl<L: Lock + Debug, I, O> Debug for WaitList<L, I, O> {
383    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
384        f.debug_struct("WaitList")
385            .field("lock", &self.lock)
386            .finish()
387    }
388}
389
390/// An exclusive lock on a [`WaitList`], created by [`WaitList::lock_exclusive`].
391pub struct LockedExclusive<'wait_list, L: Lock, I, O> {
392    /// The lock guard holding the lock on the `WaitList`.
393    ///
394    /// You can use this field to access whatever auxiliary locked state you have associated with
395    /// the `WaitList`.
396    pub guard: <L as lock::Lifetime<'wait_list>>::ExclusiveGuard,
397
398    common: LockedCommon<'wait_list, L, I, O>,
399}
400
401impl<'wait_list, L: Lock, I, O> Deref for LockedExclusive<'wait_list, L, I, O> {
402    type Target = LockedCommon<'wait_list, L, I, O>;
403    fn deref(&self) -> &Self::Target {
404        &self.common
405    }
406}
407
408impl<'wait_list, L: Lock, I, O> LockedExclusive<'wait_list, L, I, O> {
409    fn inner_mut(&mut self) -> &mut Inner<I, O> {
410        // SAFETY: We have exclusive locked access to the `WaitList`
411        unsafe { &mut *self.wait_list.inner.get() }
412    }
413
414    /// Retrieve a unique reference to the input given by the head entry in the list, if there is
415    /// one.
416    #[must_use]
417    pub fn head_input_mut(&mut self) -> Option<&mut I> {
418        // SAFETY: We have exclusive access, so we can access any entry in the list.
419        Some(match unsafe { &mut (*self.head()?.get()).state } {
420            WaiterState::Waiting { input, waker: _ } => input,
421            WaiterState::Woken { .. } => unreachable!(),
422        })
423    }
424
425    /// Wait on the list for someone to call [`Self::wake_one`].
426    ///
427    /// This method takes ownership of `self` so that the lock can be released while the future is
428    /// suspended. At the end, ownership of the lock guard is transferred back to the caller.
429    ///
430    /// A callback must be supplied to call in the event that the future has been woken but was
431    /// cancelled before it could complete. You will often want to re-call [`Self::wake_one`] in
432    /// this case to pass on the notification to someone else.
433    ///
434    /// Note that the returned future will not be `Send` if your guard types are `!Send`. To avoid
435    /// this problem, use the lower-level [`Wait`] API instead.
436    pub fn init_and_wait<OnCancel>(
437        self,
438        input: I,
439        on_cancel: OnCancel,
440    ) -> InitAndWait<'wait_list, L, I, O, OnCancel>
441    where
442        OnCancel: CancelCallback<'wait_list, L, I, O>,
443    {
444        InitAndWait {
445            input: Some(InitAndWaitInput {
446                lock: self,
447                input,
448                on_cancel,
449            }),
450            inner: Wait::new(),
451        }
452    }
453
454    /// Pop the first waiter from the front of the queue, if there is one.
455    ///
456    /// Returns ownership of that waiter's input value and the waker that can be used to wake it.
457    ///
458    /// It is recommended to only wake the waker when the lock guard is _not_ held, because waking
459    /// the waker may attempt to drop the future (if for example the runtime is shutting down)
460    /// which would deadlock if the future is registered in `WaitList`.
461    ///
462    /// # Errors
463    ///
464    /// Returns an error and gives back the given output when there are no wakers in the list.
465    pub fn pop(&mut self, output: O) -> Result<(I, task::Waker), O> {
466        let head = match self.inner_mut().head {
467            Some(head) => head,
468            None => return Err(output),
469        };
470
471        let (input, waker) = {
472            // SAFETY: We hold an exclusive lock to the list.
473            let head_waiter = unsafe { &mut *head.as_ref().get() };
474
475            // Mark the head node's state as done.
476            let new_state = WaiterState::Woken { output };
477            match mem::replace(&mut head_waiter.state, new_state) {
478                WaiterState::Waiting { input, waker } => (input, waker),
479                WaiterState::Woken { .. } => unreachable!(),
480            }
481        };
482
483        // Dequeue the first waiter now that it's not necessary to keep it in the queue.
484        unsafe { self.inner_mut().dequeue(head.as_ref()) };
485
486        Ok((input, waker))
487    }
488
489    /// Wake and dequeue the first waiter in the queue, if there is one.
490    ///
491    /// Returns ownership of that waiter's input value.
492    ///
493    /// This method consumes `self` so we can ensure that the lock guard is freed before calling
494    /// `wake` on the waker, to prevent deadlocks.
495    ///
496    /// # Errors
497    ///
498    /// Returns an error and gives back the given output when there are no wakers in the list.
499    pub fn wake_one(mut self, output: O) -> Result<I, O> {
500        let (input, waker) = self.pop(output)?;
501        drop(self);
502        waker.wake();
503        Ok(input)
504    }
505}
506
507impl<'wait_list, L: Lock + Debug, I, O> Debug for LockedExclusive<'wait_list, L, I, O>
508where
509    <L as lock::Lifetime<'wait_list>>::ExclusiveGuard: Debug,
510{
511    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
512        f.debug_struct("LockedExclusive")
513            .field("guard", &self.guard)
514            .field("common", &self.common)
515            .finish()
516    }
517}
518
519/// A shared lock on a [`WaitList`], created by [`WaitList::lock_shared`].
520pub struct LockedShared<'wait_list, L: Lock, I, O> {
521    /// The lock guard holding the lock on the `WaitList`.
522    pub guard: <L as lock::Lifetime<'wait_list>>::SharedGuard,
523
524    common: LockedCommon<'wait_list, L, I, O>,
525}
526
527impl<'wait_list, L: Lock, I, O> Deref for LockedShared<'wait_list, L, I, O> {
528    type Target = LockedCommon<'wait_list, L, I, O>;
529    fn deref(&self) -> &Self::Target {
530        &self.common
531    }
532}
533
534impl<'wait_list, L: Lock + Debug, I, O> Debug for LockedShared<'wait_list, L, I, O>
535where
536    <L as lock::Lifetime<'wait_list>>::SharedGuard: Debug,
537{
538    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
539        f.debug_struct("LockedShared")
540            .field("guard", &self.guard)
541            .field("common", &self.common)
542            .finish()
543    }
544}
545
546/// Common functions that can work on both exclusive and shared locks.
547///
548/// You can never create nor hold an instance of this type — it is accessed solely through the
549/// `Deref` implementations of [`LockedShared`] and [`LockedExclusive`].
550#[non_exhaustive]
551pub struct LockedCommon<'wait_list, L: Lock, I, O> {
552    /// The list this type locks.
553    pub wait_list: &'wait_list WaitList<L, I, O>,
554}
555
556impl<'wait_list, L: Lock, I, O> LockedCommon<'wait_list, L, I, O> {
557    fn inner(&self) -> &Inner<I, O> {
558        // SAFETY: We have at least shared locked access to the `WaitList`
559        unsafe { &*self.wait_list.inner.get() }
560    }
561
562    fn head(&self) -> Option<&UnsafeCell<Waiter<I, O>>> {
563        // SAFETY: The head pointer of the linked list must always be valid.
564        Some(unsafe { self.inner().head?.as_ref() })
565    }
566
567    /// Check whether there are any futures waiting in this list.
568    #[must_use]
569    pub fn is_empty(&self) -> bool {
570        self.inner().head.is_none()
571    }
572
573    /// Retrieve a shared reference to the input given by the head entry in the list, if there is
574    /// one.
575    #[must_use]
576    pub fn head_input(&self) -> Option<&I> {
577        // SAFETY: We have at least shared locked access, so we can access any entry in the list.
578        Some(match unsafe { &(*self.head()?.get()).state } {
579            WaiterState::Waiting { input, waker: _ } => input,
580            WaiterState::Woken { .. } => unreachable!(),
581        })
582    }
583}
584
585impl<'wait_list, L: Lock + Debug, I, O> Debug for LockedCommon<'wait_list, L, I, O> {
586    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
587        f.debug_struct("LockedCommon")
588            .field("wait_list", &self.wait_list)
589            .finish()
590    }
591}
592
593/// The future of a waiting operation.
594///
595/// This type provides a lower-level API than [`LockedExclusive::init_and_wait`], but is useful
596/// if your guard types are `!Send` but you still want the outer future to remain `Send`.
597///
598/// Awaiting and polling this future will panic if you have not called [`init`] yet.
599///
600/// [`init`]: Self::init
601#[must_use = "futures do nothing unless you `.await` or poll them"]
602pub struct Wait<'wait_list, L: Lock, I, O, OnCancel>
603where
604    OnCancel: CancelCallback<'wait_list, L, I, O>,
605{
606    inner: Option<WaitInner<'wait_list, L, I, O, OnCancel>>,
607}
608
609pin_project! {
610    /// Intentionally avoids bounding `OnCancel` so the `Send` bound doesn't need it. This is
611    /// used to work around <https://github.com/rust-lang/rust/issues/96865>.
612    struct WaitInner<'wait_list, L: Lock, I, O, OnCancel> {
613        // The list this future is a part of.
614        wait_list: &'wait_list WaitList<L, I, O>,
615
616        // The actual aliased node in the `WaitList`'s linked list.
617        #[pin]
618        waiter: Aliasable<UnsafeCell<Waiter<I, O>>>,
619
620        // The callback to be called when the future has been woken but it is cancelled before
621        // it could return `Ready`.
622        on_cancel: OnCancel,
623    }
624}
625
626unsafe impl<'wait_list, L: Lock, I, O, OnCancel> Send for WaitInner<'wait_list, L, I, O, OnCancel>
627where
628    // We hold and expose a shared reference to the `WaitList`.
629    WaitList<L, I, O>: Sync,
630    // - `OnCancel` is required to be `Send` because we always own an instance of it.
631    // - `OnCancel` is not required to be `Sync` because we don't deal in `&OnCancel`.
632    OnCancel: Send,
633{
634}
635
636unsafe impl<'wait_list, L: Lock, I, O, OnCancel> Sync for WaitInner<'wait_list, L, I, O, OnCancel>
637where
638    // We hold and expose a shared reference to the `WaitList`.
639    WaitList<L, I, O>: Sync,
640    // This type supports accessing `&O` from a shared reference to `self`.
641    O: Sync,
642    // `OnCancel` is not required to be `Sync` because we don't deal in `&OnCancel`.
643{
644}
645
646impl<'wait_list, L: Lock, I, O, OnCancel> Wait<'wait_list, L, I, O, OnCancel>
647where
648    OnCancel: CancelCallback<'wait_list, L, I, O>,
649{
650    /// Manual pin-projection because I need `Drop` and don't want to bring in the full
651    /// `pin-project`.
652    fn project(self: Pin<&mut Self>) -> Pin<&mut Option<WaitInner<'wait_list, L, I, O, OnCancel>>> {
653        let this = unsafe { Pin::into_inner_unchecked(self) };
654        unsafe { Pin::new_unchecked(&mut this.inner) }
655    }
656}
657
658impl<'wait_list, L: Lock, I, O, OnCancel> Wait<'wait_list, L, I, O, OnCancel>
659where
660    OnCancel: CancelCallback<'wait_list, L, I, O>,
661{
662    /// Create a new `Wait` future.
663    ///
664    /// The returned future will be in its "completed" state, so attempting to `.await` it will
665    /// panic unless [`init`] is called.
666    ///
667    /// [`init`]: Self::init
668    pub fn new() -> Self {
669        Self { inner: None }
670    }
671
672    /// Check whether this future is in its completed state or not.
673    #[must_use]
674    pub fn is_completed(&self) -> bool {
675        self.inner.is_none()
676    }
677
678    /// Initialize the future, moving it from a completed to waiting state.
679    ///
680    /// This function is mostly only useful inside a `poll` function (when you have a `cx`
681    /// variable to hand). After calling this, you should return [`Poll::Pending`] as the given
682    /// waker has been successfully registered in the wait list.
683    ///
684    /// A callback must be supplied to call in the event that the future has been woken but was
685    /// cancelled before it could complete. You will often want to re-call
686    /// [`LockedExclusive::wake_one`] in this case to pass on the notification to someone else.
687    ///
688    /// # Panics
689    ///
690    /// Panics if called on a non-completed future.
691    pub fn init(
692        self: Pin<&mut Self>,
693        waker: task::Waker,
694        guard: &mut LockedExclusive<'wait_list, L, I, O>,
695        input: I,
696        on_cancel: OnCancel,
697    ) {
698        assert!(
699            self.as_ref().is_completed(),
700            "called `Wait::init` on an incomplete future"
701        );
702
703        let mut inner = self.project();
704
705        // Construct and set the new `Waiting` state
706        let waiter = Aliasable::new(UnsafeCell::new(Waiter {
707            next: None,
708            prev: None,
709            state: WaiterState::Waiting { input, waker },
710        }));
711        inner.set(Some(WaitInner {
712            wait_list: guard.wait_list,
713            waiter,
714            on_cancel,
715        }));
716
717        // Take a reference to the waiter and enqueue it in the linked list.
718        let inner = inner.as_ref().as_pin_ref().unwrap();
719        let waiter = inner.project_ref().waiter.get();
720        unsafe { guard.inner_mut().enqueue(waiter) };
721    }
722
723    /// The same as [`init`] but not requiring a [`task::Waker`], instead substituting in a
724    /// temporary no-op waker.
725    ///
726    /// Using this API is always less efficient than writing a `poll` function manually that calls
727    /// [`init`], but it can be useful if you (a) need `Send` futures but have `!Send` mutex guards
728    /// and (b) want to stay in an `async` context.
729    ///
730    /// [`init`]: Self::init
731    pub fn init_without_waker(
732        self: Pin<&mut Self>,
733        guard: &mut LockedExclusive<'wait_list, L, I, O>,
734        input: I,
735        on_cancel: OnCancel,
736    ) {
737        self.init(noop_waker(), guard, input, on_cancel);
738    }
739
740    fn inner(&self) -> Pin<&WaitInner<'wait_list, L, I, O, OnCancel>> {
741        let inner = self.inner.as_ref().expect("`Wait` is in completed state");
742        // SAFETY: In order for this state to be set, we must already be pinned.
743        unsafe { Pin::new_unchecked(inner) }
744    }
745
746    /// Retrieve a shared reference to the [`WaitList`] this [`Wait`] is currently associated with.
747    ///
748    /// # Panics
749    ///
750    /// Panics if the wait is currently in the "completed" state.
751    #[must_use]
752    pub fn wait_list(&self) -> &'wait_list WaitList<L, I, O> {
753        self.inner().wait_list
754    }
755}
756
757impl<'wait_list, L: Lock, I, O, OnCancel> Future for Wait<'wait_list, L, I, O, OnCancel>
758where
759    OnCancel: CancelCallback<'wait_list, L, I, O>,
760{
761    type Output = (LockedExclusive<'wait_list, L, I, O>, O);
762
763    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
764        let inner = self.inner().project_ref();
765
766        let lock = inner.wait_list.lock_exclusive();
767
768        let waiter = inner.waiter.get();
769
770        // SAFETY: We hold the exclusive lock to the linked list.
771        let waiter = unsafe { &mut *waiter.get() };
772
773        // Check whether we've been woken or not
774        match &mut waiter.state {
775            // Still waiting, just refresh our waker and pend
776            WaiterState::Waiting { waker, .. } => {
777                // If necessary, update the waker to the new one.
778                if !waker.will_wake(cx.waker()) {
779                    *waker = cx.waker().clone();
780                }
781                Poll::Pending
782            }
783            // We have been woken! Take the output, set ourselves to the Done state and
784            // report that we are ready. Dequeuing has already been managed by the waker.
785            WaiterState::Woken { .. } => {
786                // SAFETY: We are no longer queued in the list, so we don't need to be `!Unpin`
787                // anymore.
788                let inner = unsafe { Pin::into_inner_unchecked(self.project()) };
789                let old_inner = inner.take().unwrap();
790                let output = match old_inner.waiter.into_inner().into_inner().state {
791                    WaiterState::Woken { output } => output,
792                    WaiterState::Waiting { .. } => unreachable!(),
793                };
794                Poll::Ready((lock, output))
795            }
796        }
797    }
798}
799
800impl<'wait_list, L: Lock, I, O, OnCancel> Drop for Wait<'wait_list, L, I, O, OnCancel>
801where
802    OnCancel: CancelCallback<'wait_list, L, I, O>,
803{
804    fn drop(&mut self) {
805        // This is necessary for soundness since we were pinned before in our `poll` function
806        let this = unsafe { Pin::new_unchecked(self) };
807
808        // No need to do anything if we're already completed or haven't been started.
809        if this.is_completed() {
810            return;
811        }
812        let inner = this.inner().project_ref();
813
814        // Set up a guard that panics on drop, in order to cause an abort should
815        // `lock_exclusive` panic. This is necessary because we absolutely must remove the
816        // waiter from the linked list before returning here otherwise we can cause
817        // use-after-frees.
818        let abort_on_panic = PanicOnDrop;
819
820        let mut list = inner.wait_list.lock_exclusive();
821
822        let waiter = inner.waiter.as_ref().get();
823
824        // If we were still waiting before we were cancelled, remove ourselves from the list.
825        // SAFETY: We hold an exclusive lock to the linked list.
826        if let WaiterState::Waiting { .. } = unsafe { &(*waiter.get()).state } {
827            unsafe { list.inner_mut().dequeue(waiter) };
828        }
829
830        // Disarm the guard, we no longer need to abort on a panic.
831        mem::forget(abort_on_panic);
832
833        // Call the `on_cancel` callback if necessary.
834        // SAFETY: We are no longer queued in the list, so we don't need to be `!Unpin` anymore.
835        let inner = unsafe { Pin::into_inner_unchecked(this.project()) };
836        let old_inner = inner.take().unwrap();
837        let waiter = old_inner.waiter.into_inner().into_inner();
838        if let WaiterState::Woken { output } = waiter.state {
839            old_inner.on_cancel.on_cancel(list, output);
840        }
841    }
842}
843
844impl<'wait_list, L: Lock, I, O, OnCancel> Default for Wait<'wait_list, L, I, O, OnCancel>
845where
846    OnCancel: CancelCallback<'wait_list, L, I, O>,
847{
848    fn default() -> Self {
849        Self::new()
850    }
851}
852
853impl<'wait_list, L: Lock, I: Debug, O, OnCancel> Debug for Wait<'wait_list, L, I, O, OnCancel>
854where
855    OnCancel: CancelCallback<'wait_list, L, I, O>,
856    L: Debug,
857    <L as lock::Lifetime<'wait_list>>::ExclusiveGuard: Debug,
858{
859    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
860        match &self.inner {
861            Some(inner) => f
862                .debug_struct("Wait::Waiting")
863                .field("wait_list", inner.wait_list)
864                .finish(),
865            None => f.pad("Wait::Done"),
866        }
867    }
868}
869
870pin_project! {
871    /// A future that both initializes and waits on a [`WaitList`], created by
872    /// [`LockedExclusive::init_and_wait`].
873    #[must_use = "futures do nothing unless you `.await` or poll them"]
874    pub struct InitAndWait<'wait_list, L: Lock, I, O, OnCancel>
875    where
876        OnCancel: CancelCallback<'wait_list, L, I, O>,
877    {
878        input: Option<InitAndWaitInput<'wait_list, L, I, O, OnCancel>>,
879        #[pin]
880        inner: Wait<'wait_list, L, I, O, OnCancel>,
881    }
882}
883
884impl<'wait_list, L: Lock, I, O, OnCancel> Future for InitAndWait<'wait_list, L, I, O, OnCancel>
885where
886    OnCancel: CancelCallback<'wait_list, L, I, O>,
887{
888    type Output = (LockedExclusive<'wait_list, L, I, O>, O);
889    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
890        let this = self.project();
891
892        if let Some(InitAndWaitInput {
893            mut lock,
894            input,
895            on_cancel,
896        }) = this.input.take()
897        {
898            this.inner
899                .init(cx.waker().clone(), &mut lock, input, on_cancel);
900            Poll::Pending
901        } else {
902            this.inner.poll(cx)
903        }
904    }
905}
906
907impl<'wait_list, L: Lock, I, O, OnCancel> Debug for InitAndWait<'wait_list, L, I, O, OnCancel>
908where
909    OnCancel: CancelCallback<'wait_list, L, I, O>,
910    <L as lock::Lifetime<'wait_list>>::ExclusiveGuard: Debug,
911    I: Debug,
912    L: Debug,
913{
914    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
915        if let Some(input) = &self.input {
916            f.debug_struct("InitAndWait::Initial")
917                .field("lock", &input.lock)
918                .field("input", &input.input)
919                .finish()
920        } else {
921            f.debug_struct("InitAndWait::Waiting")
922                .field("inner", &self.inner)
923                .finish()
924        }
925    }
926}
927
928struct InitAndWaitInput<'wait_list, L: Lock, I, O, OnCancel> {
929    lock: LockedExclusive<'wait_list, L, I, O>,
930    input: I,
931    on_cancel: OnCancel,
932}
933
934/// A callback that is called in the event that the future has been woken but was cancelled before
935/// it could complete.
936///
937/// This trait is implemented for all functions and closures that accept a
938/// `LockedExclusive<'wait_list, L, I, O>` and an `O`, but is also available as a separate trait so
939/// you can implement it on concrete types.
940pub trait CancelCallback<'wait_list, L: Lock, I, O>: Sized {
941    /// Called when the future has been woken but was cancelled before it could complete.
942    ///
943    /// It is given an exclusive lock to the associated [`WaitList`] as well as the output value
944    /// that was not yielded by the future.
945    fn on_cancel(self, list: LockedExclusive<'wait_list, L, I, O>, output: O);
946}
947
948impl<'wait_list, L: Lock, I, O, F> CancelCallback<'wait_list, L, I, O> for F
949where
950    L: 'wait_list,
951    I: 'wait_list,
952    O: 'wait_list,
953    F: FnOnce(LockedExclusive<'wait_list, L, I, O>, O),
954{
955    fn on_cancel(self, list: LockedExclusive<'wait_list, L, I, O>, output: O) {
956        self(list, output);
957    }
958}
959
960struct PanicOnDrop;
961impl Drop for PanicOnDrop {
962    fn drop(&mut self) {
963        panic!();
964    }
965}
966
967const fn noop_waker() -> task::Waker {
968    const VTABLE: task::RawWakerVTable = task::RawWakerVTable::new(
969        // clone
970        |_| RAW,
971        // wake
972        |_| {},
973        // wake_by_ref
974        |_| {},
975        // drop
976        |_| {},
977    );
978    const RAW: task::RawWaker = task::RawWaker::new(ptr::null(), &VTABLE);
979
980    // SAFETY: `Waker` is `#[repr(transparent)]` over `RawWaker`
981    unsafe { mem::transmute::<task::RawWaker, task::Waker>(RAW) }
982}
983
984#[cfg(all(test, feature = "std"))]
985mod tests {
986    use super::WaitList;
987    use crate::lock;
988    use crate::lock::Lock;
989    use alloc::boxed::Box;
990    use core::future::Future;
991    use core::task;
992    use core::task::Poll;
993
994    // Never type, but it's actually latin letter retroflex click
995    #[derive(Debug, PartialEq)]
996    enum ǃ {}
997
998    #[test]
999    fn wake_empty() {
1000        let list = <WaitList<lock::Local<()>, ǃ, Box<u32>>>::default();
1001        assert_eq!(*list.lock_exclusive().wake_one(Box::new(1)).unwrap_err(), 1);
1002        assert_eq!(*list.lock_exclusive().wake_one(Box::new(2)).unwrap_err(), 2);
1003        assert_eq!(list.lock_exclusive().head_input(), None);
1004        assert_eq!(list.lock_exclusive().head_input_mut(), None);
1005        assert!(list.lock_shared().is_empty());
1006    }
1007
1008    #[test]
1009    fn cancel() {
1010        let cx = &mut noop_cx();
1011
1012        let list = <WaitList<lock::Local<()>, Box<u32>, ǃ>>::default();
1013        let mut future = Box::pin(list.lock_exclusive().init_and_wait(Box::new(5), no_cancel));
1014        for _ in 0..10 {
1015            assert!(future.as_mut().poll(cx).is_pending());
1016        }
1017        assert_eq!(**list.lock_exclusive().head_input().unwrap(), 5);
1018        assert!(!list.lock_shared().is_empty());
1019        drop(future);
1020        assert_eq!(list.lock_exclusive().head_input(), None);
1021        assert!(list.lock_shared().is_empty());
1022    }
1023
1024    #[test]
1025    fn wake_single() {
1026        let cx = &mut noop_cx();
1027
1028        let list = <WaitList<lock::Local<()>, Box<u8>, Box<u32>>>::default();
1029
1030        let mut future = Box::pin(list.lock_exclusive().init_and_wait(Box::new(5), no_cancel));
1031        assert!(future.as_mut().poll(cx).is_pending());
1032
1033        assert_eq!(*list.lock_exclusive().wake_one(Box::new(6)).unwrap(), 5);
1034        assert_eq!(
1035            future.as_mut().poll(cx).map(|(_, output)| output),
1036            Poll::Ready(Box::new(6))
1037        );
1038        assert!(list.lock_shared().is_empty());
1039    }
1040
1041    #[test]
1042    fn wake_multiple() {
1043        let cx = &mut noop_cx();
1044
1045        let list = <WaitList<lock::Local<()>, Box<u8>, Box<u32>>>::default();
1046
1047        let mut f1 = Box::pin(list.lock_exclusive().init_and_wait(Box::new(1), no_cancel));
1048        assert!(f1.as_mut().poll(cx).is_pending());
1049
1050        let mut f2 = Box::pin(list.lock_exclusive().init_and_wait(Box::new(2), no_cancel));
1051        assert!(f2.as_mut().poll(cx).is_pending());
1052
1053        assert_eq!(*list.lock_exclusive().wake_one(Box::new(11)).unwrap(), 1);
1054
1055        let mut f3_out = None;
1056        let mut f3 = Box::pin(
1057            list.lock_exclusive()
1058                .init_and_wait(Box::new(3), |_, out| f3_out = Some(out)),
1059        );
1060        assert!(f3.as_mut().poll(cx).is_pending());
1061
1062        assert_eq!(*list.lock_exclusive().wake_one(Box::new(12)).unwrap(), 2);
1063        assert_eq!(*list.lock_exclusive().wake_one(Box::new(13)).unwrap(), 3);
1064        assert_eq!(*list.lock_exclusive().wake_one(Box::new(9)).unwrap_err(), 9);
1065
1066        assert_eq!(
1067            f2.as_mut().poll(cx).map(|(_, output)| output),
1068            Poll::Ready(Box::new(12))
1069        );
1070        assert_eq!(
1071            f1.as_mut().poll(cx).map(|(_, output)| output),
1072            Poll::Ready(Box::new(11))
1073        );
1074        drop(f3);
1075        assert_eq!(f3_out, Some(Box::new(13)));
1076    }
1077
1078    #[test]
1079    fn drop_in_middle() {
1080        let cx = &mut noop_cx();
1081
1082        let list = <WaitList<lock::Local<()>, Box<u32>, ǃ>>::default();
1083
1084        let mut f1 = Box::pin(list.lock_exclusive().init_and_wait(Box::new(1), no_cancel));
1085        assert!(f1.as_mut().poll(cx).is_pending());
1086
1087        let mut f2 = Box::pin(list.lock_exclusive().init_and_wait(Box::new(2), no_cancel));
1088        assert!(f2.as_mut().poll(cx).is_pending());
1089
1090        let mut f3 = Box::pin(list.lock_exclusive().init_and_wait(Box::new(3), no_cancel));
1091        assert!(f3.as_mut().poll(cx).is_pending());
1092
1093        drop(f2);
1094        drop(f3);
1095        drop(f1);
1096
1097        assert!(list.lock_shared().is_empty());
1098    }
1099
1100    #[test]
1101    fn cancellation_waking_chain() {
1102        let cx = &mut noop_cx();
1103
1104        let list = <WaitList<lock::Local<()>, Box<u8>, Box<u32>>>::default();
1105
1106        let mut f1 = Box::pin(list.lock_exclusive().init_and_wait(
1107            Box::new(1),
1108            |list: crate::LockedExclusive<_, Box<u8>, _>, mut output: Box<u32>| {
1109                *output += 1;
1110                assert_eq!(*list.wake_one(output).unwrap(), 2);
1111            },
1112        ));
1113        assert!(f1.as_mut().poll(cx).is_pending());
1114
1115        let mut f2 = Box::pin(list.lock_exclusive().init_and_wait(
1116            Box::new(2),
1117            |list: crate::LockedExclusive<_, Box<u8>, _>, mut output: Box<u32>| {
1118                *output += 1;
1119                assert_eq!(*list.wake_one(output).unwrap(), 3);
1120            },
1121        ));
1122        assert!(f2.as_mut().poll(cx).is_pending());
1123
1124        let mut final_output = None;
1125
1126        let mut f3 = Box::pin(list.lock_exclusive().init_and_wait(
1127            Box::new(3),
1128            |list: crate::LockedExclusive<_, Box<u8>, _>, output| {
1129                assert!(list.is_empty());
1130                final_output = Some(output);
1131            },
1132        ));
1133        assert!(f3.as_mut().poll(cx).is_pending());
1134
1135        assert_eq!(*list.lock_exclusive().wake_one(Box::new(12)).unwrap(), 1);
1136
1137        drop(f1);
1138        drop(f2);
1139        drop(f3);
1140
1141        assert_eq!(final_output, Some(Box::new(14)));
1142    }
1143
1144    fn no_cancel<L: Lock, I, O>(_: crate::LockedExclusive<'_, L, I, O>, _: O) {
1145        panic!("did not expect cancellation")
1146    }
1147
1148    fn noop_cx() -> task::Context<'static> {
1149        static WAKER: task::Waker = crate::noop_waker();
1150        task::Context::from_waker(&WAKER)
1151    }
1152}
1153
1154#[cfg(test)]
1155mod test_util {
1156    pub(crate) trait AssertSend {
1157        fn assert_send(&self) {}
1158    }
1159    impl<T: ?Sized + Send> AssertSend for T {}
1160
1161    pub(crate) trait AssertNotSend<A> {
1162        fn assert_not_send(&self) {}
1163    }
1164    impl<T: ?Sized> AssertNotSend<()> for T {}
1165    impl<T: ?Sized + Send> AssertNotSend<u8> for T {}
1166}