waker_set/
lib.rs

1//! A common utility for building synchronization primitives.
2//!
3//! When an async operation is blocked, it needs to register itself somewhere so that it can be
4//! notified later on. The `WakerSet` type helps with keeping track of such async operations and
5//! notifying them when they may make progress.
6
7use std::cell::UnsafeCell;
8use std::ops::{Deref, DerefMut};
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::task::{Context, Waker};
11
12use crossbeam_utils::Backoff;
13use slab::Slab;
14
15/// Set when the entry list is locked.
16#[allow(clippy::identity_op)]
17const LOCKED: usize = 1 << 0;
18
19/// Set when there is at least one entry that has already been notified.
20const NOTIFIED: usize = 1 << 1;
21
22/// Set when there is at least one notifiable entry.
23const NOTIFIABLE: usize = 1 << 2;
24
25/// Inner representation of `WakerSet`.
26struct Inner {
27    /// A list of entries in the set.
28    ///
29    /// Each entry has an optional waker associated with the task that is executing the operation.
30    /// If the waker is set to `None`, that means the task has been woken up but hasn't removed
31    /// itself from the `WakerSet` yet.
32    ///
33    /// The key of each entry is its index in the `Slab`.
34    entries: Slab<Option<Waker>>,
35
36    /// The number of notifiable entries.
37    notifiable: usize,
38}
39
40/// A set holding wakers.
41pub struct WakerSet {
42    /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`.
43    flag: AtomicUsize,
44
45    /// A set holding wakers.
46    inner: UnsafeCell<Inner>,
47}
48
49// safety(jbr): WakerSet implements its own atomic locking mechanism
50// around the inner UnsafeCell, so I believe the public interface is
51// Send + Sync
52unsafe impl Send for WakerSet {}
53unsafe impl Sync for WakerSet {}
54
55impl WakerSet {
56    /// Creates a new `WakerSet`.
57    #[inline]
58    pub fn new() -> WakerSet {
59        WakerSet {
60            flag: AtomicUsize::new(0),
61            inner: UnsafeCell::new(Inner {
62                entries: Slab::new(),
63                notifiable: 0,
64            }),
65        }
66    }
67
68    /// Inserts a waker for a blocked operation and returns a key associated with it.
69    #[cold]
70    pub fn insert(&self, cx: &Context<'_>) -> usize {
71        let w = cx.waker().clone();
72        let mut inner = self.lock();
73
74        let key = inner.entries.insert(Some(w));
75        inner.notifiable += 1;
76        key
77    }
78
79    /// If the waker for this key is still waiting for a notification, then update
80    /// the waker for the entry, and return false. If the waker has been notified,
81    /// treat the entry as completed and return true.
82    #[cfg(feature = "unstable")]
83    pub fn remove_if_notified(&self, key: usize, cx: &Context<'_>) -> bool {
84        let mut inner = self.lock();
85
86        match &mut inner.entries[key] {
87            None => {
88                inner.entries.remove(key);
89                true
90            }
91            Some(w) => {
92                // We were never woken, so update instead
93                if !w.will_wake(cx.waker()) {
94                    *w = cx.waker().clone();
95                }
96                false
97            }
98        }
99    }
100
101    /// Removes the waker of a cancelled operation.
102    ///
103    /// Returns `true` if another blocked operation from the set was notified.
104    #[cold]
105    pub fn cancel(&self, key: usize) -> bool {
106        let mut inner = self.lock();
107
108        match inner.entries.remove(key) {
109            Some(_) => inner.notifiable -= 1,
110            None => {
111                // The operation was cancelled and notified so notify another operation instead.
112                for (_, opt_waker) in inner.entries.iter_mut() {
113                    // If there is no waker in this entry, that means it was already woken.
114                    if let Some(w) = opt_waker.take() {
115                        w.wake();
116                        inner.notifiable -= 1;
117                        return true;
118                    }
119                }
120            }
121        }
122
123        false
124    }
125
126    /// Notifies one additional blocked operation.
127    ///
128    /// Returns `true` if an operation was notified.
129    #[inline]
130    #[cfg(feature = "unstable")]
131    pub fn notify_one(&self) -> bool {
132        // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
133        if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
134            self.notify(Notify::One)
135        } else {
136            false
137        }
138    }
139
140    /// Notifies all blocked operations.
141    ///
142    /// Returns `true` if at least one operation was notified.
143    #[inline]
144    pub fn notify_all(&self) -> bool {
145        // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
146        if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
147            self.notify(Notify::All)
148        } else {
149            false
150        }
151    }
152
153    /// Notifies blocked operations, either one or all of them.
154    ///
155    /// Returns `true` if at least one operation was notified.
156    #[cold]
157    fn notify(&self, n: Notify) -> bool {
158        let mut inner = &mut *self.lock();
159        let mut notified = false;
160
161        for (_, opt_waker) in inner.entries.iter_mut() {
162            // If there is no waker in this entry, that means it was already woken.
163            if let Some(w) = opt_waker.take() {
164                w.wake();
165                inner.notifiable -= 1;
166                notified = true;
167
168                if n == Notify::One {
169                    break;
170                }
171            }
172
173            if n == Notify::Any {
174                break;
175            }
176        }
177
178        notified
179    }
180
181    /// Locks the list of entries.
182    fn lock(&self) -> Lock<'_> {
183        let backoff = Backoff::new();
184        while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {
185            backoff.snooze();
186        }
187        Lock { waker_set: self }
188    }
189}
190
191/// A guard holding a `WakerSet` locked.
192struct Lock<'a> {
193    waker_set: &'a WakerSet,
194}
195
196impl Drop for Lock<'_> {
197    #[inline]
198    fn drop(&mut self) {
199        let mut flag = 0;
200
201        // Set the `NOTIFIED` flag if there is at least one notified entry.
202        if self.entries.len() - self.notifiable > 0 {
203            flag |= NOTIFIED;
204        }
205
206        // Set the `NOTIFIABLE` flag if there is at least one notifiable entry.
207        if self.notifiable > 0 {
208            flag |= NOTIFIABLE;
209        }
210
211        // Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`.
212        self.waker_set.flag.store(flag, Ordering::SeqCst);
213    }
214}
215
216impl Deref for Lock<'_> {
217    type Target = Inner;
218
219    #[inline]
220    fn deref(&self) -> &Inner {
221        unsafe { &*self.waker_set.inner.get() }
222    }
223}
224
225impl DerefMut for Lock<'_> {
226    #[inline]
227    fn deref_mut(&mut self) -> &mut Inner {
228        unsafe { &mut *self.waker_set.inner.get() }
229    }
230}
231
232/// Notification strategy.
233#[derive(Clone, Copy, Eq, PartialEq)]
234enum Notify {
235    /// Make sure at least one entry is notified.
236    Any,
237    /// Notify one additional entry.
238    One,
239    /// Notify all entries.
240    All,
241}