async_std/sync/waker_set.rs
1//! A common utility for building synchronization primitives.
2//!
3//! When an async operation is blocked, it needs to register itself somewhere so that it can be
4//! notified later on. The `WakerSet` type helps with keeping track of such async operations and
5//! notifying them when they may make progress.
6
7use std::cell::UnsafeCell;
8use std::ops::{Deref, DerefMut};
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::task::{Context, Waker};
11
12use crossbeam_utils::Backoff;
13use slab::Slab;
14
15/// Set when the entry list is locked.
16#[allow(clippy::identity_op)]
17const LOCKED: usize = 1 << 0;
18
19/// Set when there is at least one entry that has already been notified.
20const NOTIFIED: usize = 1 << 1;
21
22/// Set when there is at least one notifiable entry.
23const NOTIFIABLE: usize = 1 << 2;
24
25/// Inner representation of `WakerSet`.
26struct Inner {
27    /// A list of entries in the set.
28    ///
29    /// Each entry has an optional waker associated with the task that is executing the operation.
30    /// If the waker is set to `None`, that means the task has been woken up but hasn't removed
31    /// itself from the `WakerSet` yet.
32    ///
33    /// The key of each entry is its index in the `Slab`.
34    entries: Slab<Option<Waker>>,
35
36    /// The number of notifiable entries.
37    notifiable: usize,
38}
39
40/// A set holding wakers.
41pub struct WakerSet {
42    /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`.
43    flag: AtomicUsize,
44
45    /// A set holding wakers.
46    inner: UnsafeCell<Inner>,
47}
48
49impl WakerSet {
50    /// Creates a new `WakerSet`.
51    #[inline]
52    pub fn new() -> WakerSet {
53        WakerSet {
54            flag: AtomicUsize::new(0),
55            inner: UnsafeCell::new(Inner {
56                entries: Slab::new(),
57                notifiable: 0,
58            }),
59        }
60    }
61
62    /// Inserts a waker for a blocked operation and returns a key associated with it.
63    #[cold]
64    pub fn insert(&self, cx: &Context<'_>) -> usize {
65        let w = cx.waker().clone();
66        let mut inner = self.lock();
67
68        let key = inner.entries.insert(Some(w));
69        inner.notifiable += 1;
70        key
71    }
72
73    /// If the waker for this key is still waiting for a notification, then update
74    /// the waker for the entry, and return false. If the waker has been notified,
75    /// treat the entry as completed and return true.
76    #[cfg(feature = "unstable")]
77    pub fn remove_if_notified(&self, key: usize, cx: &Context<'_>) -> bool {
78        let mut inner = self.lock();
79
80        match &mut inner.entries[key] {
81            None => {
82                inner.entries.remove(key);
83                true
84            }
85            Some(w) => {
86                // We were never woken, so update instead
87                if !w.will_wake(cx.waker()) {
88                    *w = cx.waker().clone();
89                }
90                false
91            }
92        }
93    }
94
95    /// Removes the waker of a cancelled operation.
96    ///
97    /// Returns `true` if another blocked operation from the set was notified.
98    #[cold]
99    pub fn cancel(&self, key: usize) -> bool {
100        let mut inner = self.lock();
101
102        match inner.entries.remove(key) {
103            Some(_) => inner.notifiable -= 1,
104            None => {
105                // The operation was cancelled and notified so notify another operation instead.
106                for (_, opt_waker) in inner.entries.iter_mut() {
107                    // If there is no waker in this entry, that means it was already woken.
108                    if let Some(w) = opt_waker.take() {
109                        w.wake();
110                        inner.notifiable -= 1;
111                        return true;
112                    }
113                }
114            }
115        }
116
117        false
118    }
119
120    /// Notifies one additional blocked operation.
121    ///
122    /// Returns `true` if an operation was notified.
123    #[inline]
124    #[cfg(feature = "unstable")]
125    pub fn notify_one(&self) -> bool {
126        // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
127        if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
128            self.notify(Notify::One)
129        } else {
130            false
131        }
132    }
133
134    /// Notifies all blocked operations.
135    ///
136    /// Returns `true` if at least one operation was notified.
137    #[inline]
138    pub fn notify_all(&self) -> bool {
139        // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
140        if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
141            self.notify(Notify::All)
142        } else {
143            false
144        }
145    }
146
147    /// Notifies blocked operations, either one or all of them.
148    ///
149    /// Returns `true` if at least one operation was notified.
150    #[cold]
151    fn notify(&self, n: Notify) -> bool {
152        let inner = &mut *self.lock();
153        let mut notified = false;
154
155        for (_, opt_waker) in inner.entries.iter_mut() {
156            // If there is no waker in this entry, that means it was already woken.
157            if let Some(w) = opt_waker.take() {
158                w.wake();
159                inner.notifiable -= 1;
160                notified = true;
161
162                if n == Notify::One {
163                    break;
164                }
165            }
166
167            if n == Notify::Any {
168                break;
169            }
170        }
171
172        notified
173    }
174
175    /// Locks the list of entries.
176    fn lock(&self) -> Lock<'_> {
177        let backoff = Backoff::new();
178        while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {
179            backoff.snooze();
180        }
181        Lock { waker_set: self }
182    }
183}
184
185/// A guard holding a `WakerSet` locked.
186struct Lock<'a> {
187    waker_set: &'a WakerSet,
188}
189
190impl Drop for Lock<'_> {
191    #[inline]
192    fn drop(&mut self) {
193        let mut flag = 0;
194
195        // Set the `NOTIFIED` flag if there is at least one notified entry.
196        if self.entries.len() - self.notifiable > 0 {
197            flag |= NOTIFIED;
198        }
199
200        // Set the `NOTIFIABLE` flag if there is at least one notifiable entry.
201        if self.notifiable > 0 {
202            flag |= NOTIFIABLE;
203        }
204
205        // Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`.
206        self.waker_set.flag.store(flag, Ordering::SeqCst);
207    }
208}
209
210impl Deref for Lock<'_> {
211    type Target = Inner;
212
213    #[inline]
214    fn deref(&self) -> &Inner {
215        unsafe { &*self.waker_set.inner.get() }
216    }
217}
218
219impl DerefMut for Lock<'_> {
220    #[inline]
221    fn deref_mut(&mut self) -> &mut Inner {
222        unsafe { &mut *self.waker_set.inner.get() }
223    }
224}
225
226/// Notification strategy.
227#[derive(Clone, Copy, Eq, PartialEq)]
228enum Notify {
229    /// Make sure at least one entry is notified.
230    Any,
231    /// Notify one additional entry.
232    One,
233    /// Notify all entries.
234    All,
235}