async_std/sync/waker_set.rs
1//! A common utility for building synchronization primitives.
2//!
3//! When an async operation is blocked, it needs to register itself somewhere so that it can be
4//! notified later on. The `WakerSet` type helps with keeping track of such async operations and
5//! notifying them when they may make progress.
6
7use std::cell::UnsafeCell;
8use std::ops::{Deref, DerefMut};
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::task::{Context, Waker};
11
12use crossbeam_utils::Backoff;
13use slab::Slab;
14
15/// Set when the entry list is locked.
16#[allow(clippy::identity_op)]
17const LOCKED: usize = 1 << 0;
18
19/// Set when there is at least one entry that has already been notified.
20const NOTIFIED: usize = 1 << 1;
21
22/// Set when there is at least one notifiable entry.
23const NOTIFIABLE: usize = 1 << 2;
24
25/// Inner representation of `WakerSet`.
26struct Inner {
27 /// A list of entries in the set.
28 ///
29 /// Each entry has an optional waker associated with the task that is executing the operation.
30 /// If the waker is set to `None`, that means the task has been woken up but hasn't removed
31 /// itself from the `WakerSet` yet.
32 ///
33 /// The key of each entry is its index in the `Slab`.
34 entries: Slab<Option<Waker>>,
35
36 /// The number of notifiable entries.
37 notifiable: usize,
38}
39
40/// A set holding wakers.
41pub struct WakerSet {
42 /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`.
43 flag: AtomicUsize,
44
45 /// A set holding wakers.
46 inner: UnsafeCell<Inner>,
47}
48
49impl WakerSet {
50 /// Creates a new `WakerSet`.
51 #[inline]
52 pub fn new() -> WakerSet {
53 WakerSet {
54 flag: AtomicUsize::new(0),
55 inner: UnsafeCell::new(Inner {
56 entries: Slab::new(),
57 notifiable: 0,
58 }),
59 }
60 }
61
62 /// Inserts a waker for a blocked operation and returns a key associated with it.
63 #[cold]
64 pub fn insert(&self, cx: &Context<'_>) -> usize {
65 let w = cx.waker().clone();
66 let mut inner = self.lock();
67
68 let key = inner.entries.insert(Some(w));
69 inner.notifiable += 1;
70 key
71 }
72
73 /// If the waker for this key is still waiting for a notification, then update
74 /// the waker for the entry, and return false. If the waker has been notified,
75 /// treat the entry as completed and return true.
76 #[cfg(feature = "unstable")]
77 pub fn remove_if_notified(&self, key: usize, cx: &Context<'_>) -> bool {
78 let mut inner = self.lock();
79
80 match &mut inner.entries[key] {
81 None => {
82 inner.entries.remove(key);
83 true
84 }
85 Some(w) => {
86 // We were never woken, so update instead
87 if !w.will_wake(cx.waker()) {
88 *w = cx.waker().clone();
89 }
90 false
91 }
92 }
93 }
94
95 /// Removes the waker of a cancelled operation.
96 ///
97 /// Returns `true` if another blocked operation from the set was notified.
98 #[cold]
99 pub fn cancel(&self, key: usize) -> bool {
100 let mut inner = self.lock();
101
102 match inner.entries.remove(key) {
103 Some(_) => inner.notifiable -= 1,
104 None => {
105 // The operation was cancelled and notified so notify another operation instead.
106 for (_, opt_waker) in inner.entries.iter_mut() {
107 // If there is no waker in this entry, that means it was already woken.
108 if let Some(w) = opt_waker.take() {
109 w.wake();
110 inner.notifiable -= 1;
111 return true;
112 }
113 }
114 }
115 }
116
117 false
118 }
119
120 /// Notifies one additional blocked operation.
121 ///
122 /// Returns `true` if an operation was notified.
123 #[inline]
124 #[cfg(feature = "unstable")]
125 pub fn notify_one(&self) -> bool {
126 // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
127 if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
128 self.notify(Notify::One)
129 } else {
130 false
131 }
132 }
133
134 /// Notifies all blocked operations.
135 ///
136 /// Returns `true` if at least one operation was notified.
137 #[inline]
138 pub fn notify_all(&self) -> bool {
139 // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
140 if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
141 self.notify(Notify::All)
142 } else {
143 false
144 }
145 }
146
147 /// Notifies blocked operations, either one or all of them.
148 ///
149 /// Returns `true` if at least one operation was notified.
150 #[cold]
151 fn notify(&self, n: Notify) -> bool {
152 let inner = &mut *self.lock();
153 let mut notified = false;
154
155 for (_, opt_waker) in inner.entries.iter_mut() {
156 // If there is no waker in this entry, that means it was already woken.
157 if let Some(w) = opt_waker.take() {
158 w.wake();
159 inner.notifiable -= 1;
160 notified = true;
161
162 if n == Notify::One {
163 break;
164 }
165 }
166
167 if n == Notify::Any {
168 break;
169 }
170 }
171
172 notified
173 }
174
175 /// Locks the list of entries.
176 fn lock(&self) -> Lock<'_> {
177 let backoff = Backoff::new();
178 while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {
179 backoff.snooze();
180 }
181 Lock { waker_set: self }
182 }
183}
184
185/// A guard holding a `WakerSet` locked.
186struct Lock<'a> {
187 waker_set: &'a WakerSet,
188}
189
190impl Drop for Lock<'_> {
191 #[inline]
192 fn drop(&mut self) {
193 let mut flag = 0;
194
195 // Set the `NOTIFIED` flag if there is at least one notified entry.
196 if self.entries.len() - self.notifiable > 0 {
197 flag |= NOTIFIED;
198 }
199
200 // Set the `NOTIFIABLE` flag if there is at least one notifiable entry.
201 if self.notifiable > 0 {
202 flag |= NOTIFIABLE;
203 }
204
205 // Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`.
206 self.waker_set.flag.store(flag, Ordering::SeqCst);
207 }
208}
209
210impl Deref for Lock<'_> {
211 type Target = Inner;
212
213 #[inline]
214 fn deref(&self) -> &Inner {
215 unsafe { &*self.waker_set.inner.get() }
216 }
217}
218
219impl DerefMut for Lock<'_> {
220 #[inline]
221 fn deref_mut(&mut self) -> &mut Inner {
222 unsafe { &mut *self.waker_set.inner.get() }
223 }
224}
225
226/// Notification strategy.
227#[derive(Clone, Copy, Eq, PartialEq)]
228enum Notify {
229 /// Make sure at least one entry is notified.
230 Any,
231 /// Notify one additional entry.
232 One,
233 /// Notify all entries.
234 All,
235}