// async_std/sync/waker_set.rs
//! A common utility for building synchronization primitives.
//!
//! When an async operation is blocked, it needs to register itself somewhere so that it can be
//! notified later on. The `WakerSet` type helps with keeping track of such async operations and
//! notifying them when they may make progress.
6
7use std::cell::UnsafeCell;
8use std::ops::{Deref, DerefMut};
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::task::{Context, Waker};
11
12use crossbeam_utils::Backoff;
13use slab::Slab;
14
/// Bit that is set while the entry list is locked.
// Writing the bit as `1 << 0` mirrors the two flag definitions below; the lint is silenced
// because clippy would otherwise report the shift by zero as an identity operation.
#[allow(clippy::identity_op)]
const LOCKED: usize = 1 << 0;

/// Bit that is set when there is at least one entry that has already been notified.
const NOTIFIED: usize = 1 << 1;

/// Bit that is set when there is at least one notifiable entry.
const NOTIFIABLE: usize = 1 << 2;
24
25/// Inner representation of `WakerSet`.
26struct Inner {
27 /// A list of entries in the set.
28 ///
29 /// Each entry has an optional waker associated with the task that is executing the operation.
30 /// If the waker is set to `None`, that means the task has been woken up but hasn't removed
31 /// itself from the `WakerSet` yet.
32 ///
33 /// The key of each entry is its index in the `Slab`.
34 entries: Slab<Option<Waker>>,
35
36 /// The number of notifiable entries.
37 notifiable: usize,
38}
39
40/// A set holding wakers.
41pub struct WakerSet {
42 /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`.
43 flag: AtomicUsize,
44
45 /// A set holding wakers.
46 inner: UnsafeCell<Inner>,
47}
48
49impl WakerSet {
50 /// Creates a new `WakerSet`.
51 #[inline]
52 pub fn new() -> WakerSet {
53 WakerSet {
54 flag: AtomicUsize::new(0),
55 inner: UnsafeCell::new(Inner {
56 entries: Slab::new(),
57 notifiable: 0,
58 }),
59 }
60 }
61
62 /// Inserts a waker for a blocked operation and returns a key associated with it.
63 #[cold]
64 pub fn insert(&self, cx: &Context<'_>) -> usize {
65 let w = cx.waker().clone();
66 let mut inner = self.lock();
67
68 let key = inner.entries.insert(Some(w));
69 inner.notifiable += 1;
70 key
71 }
72
73 /// Removes the waker of an operation.
74 #[cold]
75 pub fn remove(&self, key: usize) {
76 let mut inner = self.lock();
77
78 if inner.entries.remove(key).is_some() {
79 inner.notifiable -= 1;
80 }
81 }
82
83 /// Removes the waker of a cancelled operation.
84 ///
85 /// Returns `true` if another blocked operation from the set was notified.
86 #[cold]
87 pub fn cancel(&self, key: usize) -> bool {
88 let mut inner = self.lock();
89
90 match inner.entries.remove(key) {
91 Some(_) => inner.notifiable -= 1,
92 None => {
93 // The operation was cancelled and notified so notify another operation instead.
94 for (_, opt_waker) in inner.entries.iter_mut() {
95 // If there is no waker in this entry, that means it was already woken.
96 if let Some(w) = opt_waker.take() {
97 w.wake();
98 inner.notifiable -= 1;
99 return true;
100 }
101 }
102 }
103 }
104
105 false
106 }
107
108 /// Notifies a blocked operation if none have been notified already.
109 ///
110 /// Returns `true` if an operation was notified.
111 #[inline]
112 pub fn notify_any(&self) -> bool {
113 // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
114 let flag = self.flag.load(Ordering::SeqCst);
115
116 if flag & NOTIFIED == 0 && flag & NOTIFIABLE != 0 {
117 self.notify(Notify::Any)
118 } else {
119 false
120 }
121 }
122
123 /// Notifies one additional blocked operation.
124 ///
125 /// Returns `true` if an operation was notified.
126 #[inline]
127 #[cfg(feature = "unstable")]
128 pub fn notify_one(&self) -> bool {
129 // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
130 if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
131 self.notify(Notify::One)
132 } else {
133 false
134 }
135 }
136
137 /// Notifies all blocked operations.
138 ///
139 /// Returns `true` if at least one operation was notified.
140 #[inline]
141 pub fn notify_all(&self) -> bool {
142 // Use `SeqCst` ordering to synchronize with `Lock::drop()`.
143 if self.flag.load(Ordering::SeqCst) & NOTIFIABLE != 0 {
144 self.notify(Notify::All)
145 } else {
146 false
147 }
148 }
149
150 /// Notifies blocked operations, either one or all of them.
151 ///
152 /// Returns `true` if at least one operation was notified.
153 #[cold]
154 fn notify(&self, n: Notify) -> bool {
155 let mut inner = &mut *self.lock();
156 let mut notified = false;
157
158 for (_, opt_waker) in inner.entries.iter_mut() {
159 // If there is no waker in this entry, that means it was already woken.
160 if let Some(w) = opt_waker.take() {
161 w.wake();
162 inner.notifiable -= 1;
163 notified = true;
164
165 if n == Notify::One {
166 break;
167 }
168 }
169
170 if n == Notify::Any {
171 break;
172 }
173 }
174
175 notified
176 }
177
178 /// Locks the list of entries.
179 fn lock(&self) -> Lock<'_> {
180 let backoff = Backoff::new();
181 while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {
182 backoff.snooze();
183 }
184 Lock { waker_set: self }
185 }
186}
187
188/// A guard holding a `WakerSet` locked.
189struct Lock<'a> {
190 waker_set: &'a WakerSet,
191}
192
193impl Drop for Lock<'_> {
194 #[inline]
195 fn drop(&mut self) {
196 let mut flag = 0;
197
198 // Set the `NOTIFIED` flag if there is at least one notified entry.
199 if self.entries.len() - self.notifiable > 0 {
200 flag |= NOTIFIED;
201 }
202
203 // Set the `NOTIFIABLE` flag if there is at least one notifiable entry.
204 if self.notifiable > 0 {
205 flag |= NOTIFIABLE;
206 }
207
208 // Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`.
209 self.waker_set.flag.store(flag, Ordering::SeqCst);
210 }
211}
212
213impl Deref for Lock<'_> {
214 type Target = Inner;
215
216 #[inline]
217 fn deref(&self) -> &Inner {
218 unsafe { &*self.waker_set.inner.get() }
219 }
220}
221
222impl DerefMut for Lock<'_> {
223 #[inline]
224 fn deref_mut(&mut self) -> &mut Inner {
225 unsafe { &mut *self.waker_set.inner.get() }
226 }
227}
228
/// Notification strategy passed to `WakerSet::notify`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Notify {
    /// Make sure at least one entry is notified.
    ///
    /// Only the first entry is examined: it is woken unless it has been woken already.
    Any,
    /// Notify one additional entry: the first one that still holds a waker.
    One,
    /// Notify all entries that still hold a waker.
    All,
}