waker_set/lib.rs
1//! A common utility for building synchronization primitives.
2//!
3//! When an async operation is blocked, it needs to register itself somewhere so that it can be
4//! notified later on. The `WakerSet` type helps with keeping track of such async operations and
5//! notifying them when they may make progress.
6
7use std::cell::UnsafeCell;
8use std::ops::{Deref, DerefMut};
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::task::{Context, Waker};
11
12use crossbeam_utils::Backoff;
13use slab::Slab;
14
15/// Set when the entry list is locked.
16#[allow(clippy::identity_op)]
17const LOCKED: usize = 1 << 0;
18
19/// Set when there is at least one entry that has already been notified.
20const NOTIFIED: usize = 1 << 1;
21
22/// Set when there is at least one notifiable entry.
23const NOTIFIABLE: usize = 1 << 2;
24
25/// Inner representation of `WakerSet`.
26struct Inner {
27 /// A list of entries in the set.
28 ///
29 /// Each entry has an optional waker associated with the task that is executing the operation.
30 /// If the waker is set to `None`, that means the task has been woken up but hasn't removed
31 /// itself from the `WakerSet` yet.
32 ///
33 /// The key of each entry is its index in the `Slab`.
34 entries: Slab<Option<Waker>>,
35
36 /// The number of notifiable entries.
37 notifiable: usize,
38}
39
40/// A set holding wakers.
41pub struct WakerSet {
42 /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`.
43 flag: AtomicUsize,
44
45 /// A set holding wakers.
46 inner: UnsafeCell<Inner>,
47}
48
49// safety(jbr): WakerSet implements its own atomic locking mechanism
50// around the inner UnsafeCell, so I believe the public interface is
51// Send + Sync
52unsafe impl Send for WakerSet {}
53unsafe impl Sync for WakerSet {}
54
55impl WakerSet {
56 /// Creates a new `WakerSet`.
57 #[inline]
58 pub fn new() -> WakerSet {
59 WakerSet {
60 flag: AtomicUsize::new(0),
61 inner: UnsafeCell::new(Inner {
62 entries: Slab::new(),
63 notifiable: 0,
64 }),
65 }
66 }
67
68 /// Inserts a waker for a blocked operation and returns a key associated with it.
69 #[cold]
70 pub fn insert(&self, cx: &Context<'_>) -> usize {
71 let w = cx.waker().clone();
72 let mut inner = self.lock();
73
74 let key = inner.entries.insert(Some(w));
75 inner.notifiable += 1;
76 key
77 }
78
79 /// If the waker for this key is still waiting for a notification, then update
80 /// the waker for the entry, and return false. If the waker has been notified,
81 /// treat the entry as completed and return true.
82 #[cfg(feature = "unstable")]
83 pub fn remove_if_notified(&self, key: usize, cx: &Context<'_>) -> bool {
84 let mut inner = self.lock();
85
86 match &mut inner.entries[key] {
87 None => {
88 inner.entries.remove(key);
89 true
90 }
91 Some(w) => {
92 // We were never woken, so update instead
93 if !w.will_wake(cx.waker()) {
94 *w = cx.waker().clone();
95 }
96 false
97 }
98 }
99 }
100
101 /// Removes the waker of a cancelled operation.
102 ///
103 /// Returns `true` if another blocked operation from the set was notified.
104 #[cold]
105 pub fn cancel(&self, key: usize) -> bool {
106 let mut inner = self.lock();
107
108 match inner.entries.remove(key) {
109 Some(_) => inner.notifiable -= 1,
110 None => {
111 // The operation was cancelled and notified so notify another operation instead.
112 for (_, opt_waker) in inner.entries.iter_mut() {
113 // If there is no waker in this entry, that means it was already woken.
114 if let Some(w) = opt_waker.take() {
115 w.wake();
116 inner.notifiable -= 1;
117 return true;
118 }
119 }
120 }
121 }
122
123 false
124 }
125
126 /// Notifies one additional blocked operation.
127 ///
128 /// Returns `true` if an operation was notified.
129 #[inline]
130 #[cfg(feature = "unstable")]
131 pub fn notify_one(&self) -> bool {
132 // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
133 if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
134 self.notify(Notify::One)
135 } else {
136 false
137 }
138 }
139
140 /// Notifies all blocked operations.
141 ///
142 /// Returns `true` if at least one operation was notified.
143 #[inline]
144 pub fn notify_all(&self) -> bool {
145 // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
146 if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
147 self.notify(Notify::All)
148 } else {
149 false
150 }
151 }
152
153 /// Notifies blocked operations, either one or all of them.
154 ///
155 /// Returns `true` if at least one operation was notified.
156 #[cold]
157 fn notify(&self, n: Notify) -> bool {
158 let mut inner = &mut *self.lock();
159 let mut notified = false;
160
161 for (_, opt_waker) in inner.entries.iter_mut() {
162 // If there is no waker in this entry, that means it was already woken.
163 if let Some(w) = opt_waker.take() {
164 w.wake();
165 inner.notifiable -= 1;
166 notified = true;
167
168 if n == Notify::One {
169 break;
170 }
171 }
172
173 if n == Notify::Any {
174 break;
175 }
176 }
177
178 notified
179 }
180
181 /// Locks the list of entries.
182 fn lock(&self) -> Lock<'_> {
183 let backoff = Backoff::new();
184 while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {
185 backoff.snooze();
186 }
187 Lock { waker_set: self }
188 }
189}
190
191/// A guard holding a `WakerSet` locked.
192struct Lock<'a> {
193 waker_set: &'a WakerSet,
194}
195
196impl Drop for Lock<'_> {
197 #[inline]
198 fn drop(&mut self) {
199 let mut flag = 0;
200
201 // Set the `NOTIFIED` flag if there is at least one notified entry.
202 if self.entries.len() - self.notifiable > 0 {
203 flag |= NOTIFIED;
204 }
205
206 // Set the `NOTIFIABLE` flag if there is at least one notifiable entry.
207 if self.notifiable > 0 {
208 flag |= NOTIFIABLE;
209 }
210
211 // Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`.
212 self.waker_set.flag.store(flag, Ordering::SeqCst);
213 }
214}
215
216impl Deref for Lock<'_> {
217 type Target = Inner;
218
219 #[inline]
220 fn deref(&self) -> &Inner {
221 unsafe { &*self.waker_set.inner.get() }
222 }
223}
224
225impl DerefMut for Lock<'_> {
226 #[inline]
227 fn deref_mut(&mut self) -> &mut Inner {
228 unsafe { &mut *self.waker_set.inner.get() }
229 }
230}
231
232/// Notification strategy.
233#[derive(Clone, Copy, Eq, PartialEq)]
234enum Notify {
235 /// Make sure at least one entry is notified.
236 Any,
237 /// Notify one additional entry.
238 One,
239 /// Notify all entries.
240 All,
241}