async_std/sync/
waker_set.rs

1//! A common utility for building synchronization primitives.
2//!
3//! When an async operation is blocked, it needs to register itself somewhere so that it can be
4//! notified later on. The `WakerSet` type helps with keeping track of such async operations and
5//! notifying them when they may make progress.
6
7use std::cell::UnsafeCell;
8use std::ops::{Deref, DerefMut};
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::task::{Context, Waker};
11
12use crossbeam_utils::Backoff;
13use slab::Slab;
14
15/// Set when the entry list is locked.
16#[allow(clippy::identity_op)]
17const LOCKED: usize = 1 << 0;
18
19/// Set when there is at least one entry that has already been notified.
20const NOTIFIED: usize = 1 << 1;
21
22/// Set when there is at least one notifiable entry.
23const NOTIFIABLE: usize = 1 << 2;
24
25/// Inner representation of `WakerSet`.
26struct Inner {
27    /// A list of entries in the set.
28    ///
29    /// Each entry has an optional waker associated with the task that is executing the operation.
30    /// If the waker is set to `None`, that means the task has been woken up but hasn't removed
31    /// itself from the `WakerSet` yet.
32    ///
33    /// The key of each entry is its index in the `Slab`.
34    entries: Slab<Option<Waker>>,
35
36    /// The number of notifiable entries.
37    notifiable: usize,
38}
39
40/// A set holding wakers.
41pub struct WakerSet {
42    /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`.
43    flag: AtomicUsize,
44
45    /// A set holding wakers.
46    inner: UnsafeCell<Inner>,
47}
48
49impl WakerSet {
50    /// Creates a new `WakerSet`.
51    #[inline]
52    pub fn new() -> WakerSet {
53        WakerSet {
54            flag: AtomicUsize::new(0),
55            inner: UnsafeCell::new(Inner {
56                entries: Slab::new(),
57                notifiable: 0,
58            }),
59        }
60    }
61
62    /// Inserts a waker for a blocked operation and returns a key associated with it.
63    #[cold]
64    pub fn insert(&self, cx: &Context<'_>) -> usize {
65        let w = cx.waker().clone();
66        let mut inner = self.lock();
67
68        let key = inner.entries.insert(Some(w));
69        inner.notifiable += 1;
70        key
71    }
72
73    /// Removes the waker of an operation.
74    #[cold]
75    pub fn remove(&self, key: usize) {
76        let mut inner = self.lock();
77
78        if inner.entries.remove(key).is_some() {
79            inner.notifiable -= 1;
80        }
81    }
82
83    /// Removes the waker of a cancelled operation.
84    ///
85    /// Returns `true` if another blocked operation from the set was notified.
86    #[cold]
87    pub fn cancel(&self, key: usize) -> bool {
88        let mut inner = self.lock();
89
90        match inner.entries.remove(key) {
91            Some(_) => inner.notifiable -= 1,
92            None => {
93                // The operation was cancelled and notified so notify another operation instead.
94                for (_, opt_waker) in inner.entries.iter_mut() {
95                    // If there is no waker in this entry, that means it was already woken.
96                    if let Some(w) = opt_waker.take() {
97                        w.wake();
98                        inner.notifiable -= 1;
99                        return true;
100                    }
101                }
102            }
103        }
104
105        false
106    }
107
108    /// Notifies a blocked operation if none have been notified already.
109    ///
110    /// Returns `true` if an operation was notified.
111    #[inline]
112    pub fn notify_any(&self) -> bool {
113        // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
114        let flag = self.flag.load(Ordering::SeqCst);
115
116        if flag & NOTIFIED == 0 && flag & NOTIFIABLE != 0 {
117            self.notify(Notify::Any)
118        } else {
119            false
120        }
121    }
122
123    /// Notifies one additional blocked operation.
124    ///
125    /// Returns `true` if an operation was notified.
126    #[inline]
127    #[cfg(feature = "unstable")]
128    pub fn notify_one(&self) -> bool {
129        // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
130        if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
131            self.notify(Notify::One)
132        } else {
133            false
134        }
135    }
136
137    /// Notifies all blocked operations.
138    ///
139    /// Returns `true` if at least one operation was notified.
140    #[inline]
141    pub fn notify_all(&self) -> bool {
142        // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
143        if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
144            self.notify(Notify::All)
145        } else {
146            false
147        }
148    }
149
150    /// Notifies blocked operations, either one or all of them.
151    ///
152    /// Returns `true` if at least one operation was notified.
153    #[cold]
154    fn notify(&self, n: Notify) -> bool {
155        let mut inner = &mut *self.lock();
156        let mut notified = false;
157
158        for (_, opt_waker) in inner.entries.iter_mut() {
159            // If there is no waker in this entry, that means it was already woken.
160            if let Some(w) = opt_waker.take() {
161                w.wake();
162                inner.notifiable -= 1;
163                notified = true;
164
165                if n == Notify::One {
166                    break;
167                }
168            }
169
170            if n == Notify::Any {
171                break;
172            }
173        }
174
175        notified
176    }
177
178    /// Locks the list of entries.
179    fn lock(&self) -> Lock<'_> {
180        let backoff = Backoff::new();
181        while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {
182            backoff.snooze();
183        }
184        Lock { waker_set: self }
185    }
186}
187
188/// A guard holding a `WakerSet` locked.
189struct Lock<'a> {
190    waker_set: &'a WakerSet,
191}
192
193impl Drop for Lock<'_> {
194    #[inline]
195    fn drop(&mut self) {
196        let mut flag = 0;
197
198        // Set the `NOTIFIED` flag if there is at least one notified entry.
199        if self.entries.len() - self.notifiable > 0 {
200            flag |= NOTIFIED;
201        }
202
203        // Set the `NOTIFIABLE` flag if there is at least one notifiable entry.
204        if self.notifiable > 0 {
205            flag |= NOTIFIABLE;
206        }
207
208        // Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`.
209        self.waker_set.flag.store(flag, Ordering::SeqCst);
210    }
211}
212
213impl Deref for Lock<'_> {
214    type Target = Inner;
215
216    #[inline]
217    fn deref(&self) -> &Inner {
218        unsafe { &*self.waker_set.inner.get() }
219    }
220}
221
222impl DerefMut for Lock<'_> {
223    #[inline]
224    fn deref_mut(&mut self) -> &mut Inner {
225        unsafe { &mut *self.waker_set.inner.get() }
226    }
227}
228
229/// Notification strategy.
230#[derive(Clone, Copy, Eq, PartialEq)]
231enum Notify {
232    /// Make sure at least one entry is notified.
233    Any,
234    /// Notify one additional entry.
235    One,
236    /// Notify all entries.
237    All,
238}