diatomic_waker/waker.rs
1use core::future::Future;
2use core::pin::Pin;
3use core::sync::atomic::Ordering;
4use core::task::{Context, Poll, Waker};
5
6use crate::loom_exports::cell::UnsafeCell;
7use crate::loom_exports::sync::atomic::AtomicUsize;
8use crate::WakeSinkRef;
9
10// The state of the waker is tracked by the following bit flags:
11//
12// * INDEX [I]: slot index of the current waker, if any (0 or 1),
13// * UPDATE [U]: an updated waker has been registered in the redundant slot at
14// index 1 - INDEX,
15// * REGISTERED [R]: a waker is registered and awaits a notification
16// * LOCKED [L]: a notifier has taken the notifier lock and is in the process of
17// sending a notification,
18// * NOTIFICATION [N]: a notifier has failed to take the lock when a waker was
19// registered and has requested the notifier holding the lock to send a
20// notification on its behalf (implies REGISTERED and LOCKED).
21//
22// The waker stored in the slot at INDEX ("current" waker) is shared between the
23// sink (entity which registers wakers) and the source that holds the notifier
24// lock (if any). For this reason, this waker may only be accessed by shared
25// reference. The waker at 1 - INDEX is exclusively owned by the sink, which is
26// free to mutate it.
27
28// Summary of valid states:
29//
30// | N L R U I |
31// |---------------------|
32// | 0 any any any any |
33// | 1 1 1 any any |
34
35// [I] Index of the current waker (0 or 1).
36const INDEX: usize = 0b00001;
37// [U] Indicates that an updated waker is available at 1 - INDEX.
38const UPDATE: usize = 0b00010;
39// [R] Indicates that a waker has been registered.
40const REGISTERED: usize = 0b00100;
41// [L] Indicates that a notifier holds the notifier lock to the waker at INDEX.
42const LOCKED: usize = 0b01000;
43// [N] Indicates that a notifier has failed to acquire the lock and has
44// requested the notifier holding the lock to notify on its behalf.
45const NOTIFICATION: usize = 0b10000;
46
47/// A primitive that can send or await notifications.
48///
49/// It is almost always preferable to use the [`WakeSink`](crate::WakeSink) and
50/// [`WakeSource`](crate::WakeSource) which offer more convenience at the cost
51/// of an allocation in an `Arc`.
52///
53/// If allocation is not possible or desirable, the
54/// [`sink_ref`](DiatomicWaker::sink_ref) method can be used to create a
55/// [`WakeSinkRef`] handle and one or more
56/// [`WakeSourceRef`](crate::borrowed_waker::WakeSourceRef)s, the non-owned
57/// counterparts to `WakeSink` and `WakeSource`.
58///
59/// Finally, `DiatomicWaker` exposes `unsafe` methods that can be used to create
60/// custom synchronization primitives.
61#[derive(Debug)]
62pub struct DiatomicWaker {
63 /// A bit field for `INDEX`, `UPDATE`, `REGISTERED`, `LOCKED` and `NOTIFICATION`.
64 state: AtomicUsize,
65 /// Redundant slots for the waker.
66 waker: [UnsafeCell<Option<Waker>>; 2],
67}
68
69impl DiatomicWaker {
70 /// Creates a new `DiatomicWaker`.
71 #[cfg(not(all(test, diatomic_waker_loom)))]
72 pub const fn new() -> Self {
73 Self {
74 state: AtomicUsize::new(0),
75 waker: [UnsafeCell::new(None), UnsafeCell::new(None)],
76 }
77 }
78
79 #[cfg(all(test, diatomic_waker_loom))]
80 pub fn new() -> Self {
81 Self {
82 state: AtomicUsize::new(0),
83 waker: [UnsafeCell::new(None), UnsafeCell::new(None)],
84 }
85 }
86
87 /// Returns a sink with a lifetime bound to this `DiatomicWaker`.
88 ///
89 /// This mutably borrows the waker, thus ensuring that at most one
90 /// associated sink can be active at a time.
91 pub fn sink_ref(&mut self) -> WakeSinkRef<'_> {
92 WakeSinkRef { inner: self }
93 }
94
95 /// Sends a notification if a waker is registered.
96 ///
97 /// This automatically unregisters any waker that may have been previously
98 /// registered.
99 pub fn notify(&self) {
100 // Transitions: see `try_lock` and `try_unlock`.
101
102 let mut state = if let Ok(s) = try_lock(&self.state) {
103 s
104 } else {
105 return;
106 };
107
108 loop {
109 let idx = state & INDEX;
110
111 // Safety: the notifier lock has been acquired, which guarantees
112 // exclusive access to the waker at `INDEX`.
113 unsafe {
114 self.wake_by_ref(idx);
115 }
116
117 if let Err(s) = try_unlock(&self.state, state) {
118 state = s;
119 } else {
120 return;
121 }
122
123 // One more loop iteration is necessary because the waker was
124 // registered again and another notifier has failed to send a
125 // notification while the notifier lock was taken.
126 }
127 }
128
129 /// Registers a new waker.
130 ///
131 /// Registration is lazy: the waker is cloned only if it differs from the
132 /// last registered waker (note that the last registered waker is cached
133 /// even if it was unregistered).
134 ///
135 /// # Safety
136 ///
137 /// The `register`, `unregister` and `wait_until` methods cannot be used
138 /// concurrently from multiple threads.
139 pub unsafe fn register(&self, waker: &Waker) {
140 // Transitions if the new waker is the same as the one currently stored.
141 //
142 // | N L R U I | N L R U I |
143 // |-----------------|-----------------|
144 // | n l r u i | n l 1 u i |
145 //
146 //
147 // Transitions if the new waker needs to be stored:
148 //
149 // Step 1 (only necessary if the state initially indicates R=U=1):
150 //
151 // | N L R U I | N L R U I |
152 // |-----------------|-----------------|
153 // | n l r u i | 0 l 0 u i |
154 //
155 // Step 2:
156 //
157 // | N L R U I | N L R U I |
158 // |-----------------|-----------------|
159 // | n l r u i | n l 1 1 i |
160
161 // Ordering: Acquire ordering is necessary to synchronize with the
162 // Release unlocking operation in `notify`, which ensures that all calls
163 // to the waker in the redundant slot have completed.
164 let state = self.state.load(Ordering::Acquire);
165
166 // Compute the index of the waker that was most recently updated. Note
167 // that the value of `recent_idx` as computed below remains correct even
168 // if the state is stale since only this thread can store new wakers.
169 let mut idx = state & INDEX;
170 let recent_idx = if state & UPDATE == 0 {
171 idx
172 } else {
173 INDEX - idx
174 };
175
176 // Safety: it is safe to call `will_wake` since the registering thread
177 // is the only one allowed to mutate the wakers so there can be no
178 // concurrent mutable access to the waker.
179 let is_up_to_date = self.will_wake(recent_idx, waker);
180
181 // Fast path in case the waker is up to date.
182 if is_up_to_date {
183 // Set the `REGISTERED` flag. Ideally, the `NOTIFICATION` flag would
184 // be cleared at the same time to avoid a spurious wake-up, but it
185 // probably isn't worth the overhead of a CAS loop because having
186 // this flag set when calling `register` is very unlikely: it would
187 // mean that since the last call to `register`:
188 // 1) a notifier has been holding the lock continuously,
189 // 2) another notifier has tried and failed to take the lock, and
190 // 3) `unregister` was never called.
191 //
192 // Ordering: Acquire ordering synchronizes with the Release and
193 // AcqRel RMWs in `try_lock` (called by `notify`) and ensures that
194 // either the predicate set before the call to `notify` will be
195 // visible after the call to `register`, or the registered waker
196 // will be visible during the call to `notify` (or both). Note that
197 // Release ordering is not necessary since the waker has not changed
198 // and this RMW takes part in a release sequence headed by the
199 // initial registration of the waker.
200 self.state.fetch_or(REGISTERED, Ordering::Acquire);
201
202 return;
203 }
204
205 // The waker needs to be stored in the redundant slot.
206 //
207 // It is necessary to make sure that either the `UPDATE` or the
208 // `REGISTERED` flag is cleared to prevent concurrent access by a notifier
209 // to the redundant waker slot while the waker is updated.
210 //
211 // Note that only the thread registering the waker can set `REGISTERED`
212 // and `UPDATE` so even if the state is stale, observing `REGISTERED` or
213 // `UPDATE` as cleared guarantees that such flag is and will remain
214 // cleared until this thread sets them.
215 if state & (UPDATE | REGISTERED) == (UPDATE | REGISTERED) {
216 // Clear the `REGISTERED` and `NOTIFICATION` flags.
217 //
218 // Ordering: Acquire ordering is necessary to synchronize with the
219 // Release unlocking operation in `notify`, which ensures that all
220 // calls to the waker in the redundant slot have completed.
221 let state = self
222 .state
223 .fetch_and(!(REGISTERED | NOTIFICATION), Ordering::Acquire);
224
225 // It is possible that `UPDATE` was cleared and `INDEX` was switched
226 // by a notifier after the initial load of the state, so the waker
227 // index needs to be updated.
228 idx = state & INDEX;
229 }
230
231 // Always store the new waker in the redundant slot to avoid racing with
232 // a notifier.
233 let redundant_idx = 1 - idx;
234
235 // Store the new waker.
236 //
237 // Safety: it is safe to store the new waker in the redundant slot
238 // because the `REGISTERED` flag and/or the `UPDATE` flag are/is cleared
239 // so the notifier will not attempt to switch the waker.
240 self.set_waker(redundant_idx, waker.clone());
241
242 // Make the waker visible.
243 //
244 // Ordering: Acquire ordering synchronizes with the Release and AcqRel
245 // RMWs in `try_lock` (called by `notify`) and ensures that either the
246 // predicate set before the call to `notify` will be visible after the
247 // call to `register`, or the registered waker will be visible during
248 // the call to `notify` (or both). Since the waker has been modified
249 // above, Release ordering is also necessary to synchronize with the
250 // AcqRel RMW in `try_lock` (success case) and ensure that the
251 // modification to the waker is fully visible when notifying.
252 self.state.fetch_or(UPDATE | REGISTERED, Ordering::AcqRel);
253 }
254
255 /// Unregisters the waker.
256 ///
257 /// After the waker is unregistered, subsequent calls to `notify` will be
258 /// ignored.
259 ///
260 /// Note that the previously-registered waker (if any) remains cached.
261 ///
262 /// # Safety
263 ///
264 /// The `register`, `unregister` and `wait_until` methods cannot be used
265 /// concurrently from multiple threads.
266 pub unsafe fn unregister(&self) {
267 // Transitions:
268 //
269 // | N L R U I | N L R U I |
270 // |-----------------|-----------------|
271 // | n l r u i | 0 l 0 u i |
272
273 // Modify the state. Note that the waker is not dropped: caching it can
274 // avoid a waker drop/cloning cycle (typically, 2 RMWs) in the frequent
275 // case when the next waker to be registered will be the same as the one
276 // being unregistered.
277 //
278 // Ordering: no waker was modified so Relaxed ordering is sufficient.
279 self.state
280 .fetch_and(!(REGISTERED | NOTIFICATION), Ordering::Relaxed);
281 }
282
283 /// Returns a future that can be `await`ed until the provided predicate
284 /// returns a value.
285 ///
286 /// The predicate is checked each time a notification is received.
287 ///
288 /// # Safety
289 ///
290 /// The `register`, `unregister` and `wait_until` methods cannot be used
291 /// concurrently from multiple threads.
292 pub unsafe fn wait_until<P, T>(&self, predicate: P) -> WaitUntil<'_, P, T>
293 where
294 P: FnMut() -> Option<T>,
295 {
296 WaitUntil::new(self, predicate)
297 }
298
299 /// Sets the waker at index `idx`.
300 ///
301 /// # Safety
302 ///
303 /// The caller must have exclusive access to the waker at index `idx`.
304 unsafe fn set_waker(&self, idx: usize, new: Waker) {
305 self.waker[idx].with_mut(|waker| (*waker) = Some(new));
306 }
307
308 /// Notify the waker at index `idx`.
309 ///
310 /// # Safety
311 ///
312 /// The waker at index `idx` cannot be modified concurrently.
313 unsafe fn wake_by_ref(&self, idx: usize) {
314 self.waker[idx].with(|waker| {
315 if let Some(waker) = &*waker {
316 waker.wake_by_ref();
317 }
318 });
319 }
320
321 /// Check whether the waker at index `idx` will wake the same task as the
322 /// provided waker.
323 ///
324 /// # Safety
325 ///
326 /// The waker at index `idx` cannot be modified concurrently.
327 unsafe fn will_wake(&self, idx: usize, other: &Waker) -> bool {
328 self.waker[idx].with(|waker| match &*waker {
329 Some(waker) => waker.will_wake(other),
330 None => false,
331 })
332 }
333}
334
335impl Default for DiatomicWaker {
336 fn default() -> Self {
337 Self::new()
338 }
339}
340
341unsafe impl Send for DiatomicWaker {}
342unsafe impl Sync for DiatomicWaker {}
343
344/// Attempts to acquire the notifier lock and returns the current state upon
345/// success.
346///
347/// Acquisition of the lock will fail in the following cases:
348///
349/// * the `REGISTERED` flag is cleared, meaning that there is no need to wake
350/// and therefore no need to lock,
351/// * the lock is already taken, in which case the `NOTIFICATION` flag will be
352/// set if the `REGISTERED` flag is set.
353///
354/// If acquisition of the lock succeeds, the `REGISTERED` flag is cleared. If
355/// additionally the `UPDATE` flag was set, it is cleared and `INDEX` is
356/// flipped.
357///
358/// Transition table:
359///
360/// | N L R U I | N L R U I |
361/// |-----------------|-----------------|
362/// | 0 0 0 u i | 0 0 0 u i | (failure)
363/// | 0 0 1 0 i | 0 1 0 0 i | (success)
364/// | 0 0 1 1 i | 0 1 0 0 !i | (success)
365/// | 0 1 0 u i | 0 1 0 u i | (failure)
366/// | n 1 1 u i | 1 1 1 u i | (failure)
367///
368fn try_lock(state: &AtomicUsize) -> Result<usize, ()> {
369 let mut old_state = state.load(Ordering::Relaxed);
370
371 loop {
372 if old_state & (LOCKED | REGISTERED) == REGISTERED {
373 // Success path.
374
375 // If `UPDATE` is set, clear `UPDATE` and flip `INDEX` with the xor
376 // mask.
377 let update_bit = old_state & UPDATE;
378 let xor_mask = update_bit | (update_bit >> 1);
379
380 // Set `LOCKED` and clear `REGISTERED` with the xor mask.
381 let xor_mask = xor_mask | LOCKED | REGISTERED;
382
383 let new_state = old_state ^ xor_mask;
384
385 // Ordering: Acquire is necessary to synchronize with the Release
386 // ordering in `register` so that the new waker, if any, is visible.
387 // Release ordering synchronizes with the Acquire and AcqRel RMWs in
388 // `register` and ensures that either the predicate set before the
389 // call to `notify` will be visible after the call to `register`, or
390 // the registered waker will be visible during the call to `notify`
391 // (or both).
392 match state.compare_exchange_weak(
393 old_state,
394 new_state,
395 Ordering::AcqRel,
396 Ordering::Relaxed,
397 ) {
398 Ok(_) => return Ok(new_state),
399 Err(s) => old_state = s,
400 }
401 } else {
402 // Failure path.
403
404 // Set the `NOTIFICATION` bit if `REGISTERED` was set.
405 let registered_bit = old_state & REGISTERED;
406 let new_state = old_state | (registered_bit << 2);
407
408 // Ordering: Release ordering synchronizes with the Acquire and
409 // AcqRel RMWs in `register` and ensures that either the predicate
410 // set before the call to `notify` will be visible after the call to
411 // `register`, or the registered waker will be visible during the
412 // call to `notify` (or both).
413 match state.compare_exchange_weak(
414 old_state,
415 new_state,
416 Ordering::Release,
417 Ordering::Relaxed,
418 ) {
419 Ok(_) => return Err(()),
420 Err(s) => old_state = s,
421 }
422 };
423 }
424}
425
426/// Attempts to release the notifier lock and returns the current state upon
427/// failure.
428///
429/// Release of the lock will fail if the `NOTIFICATION` flag is set because it
430/// means that, after the lock was taken, the registering thread has requested
431/// to be notified again and another notifier has subsequently requested that
432/// such notification be sent on its behalf; if additionally the `UPDATE` flag
433/// was set (i.e. a new waker is available), it is cleared and `INDEX` is
434/// flipped.
435///
436/// Transition table:
437///
438/// | N L R U I | N L R U I |
439/// |-----------------|-----------------|
440/// | 0 1 r u i | 0 0 r u i | (success)
441/// | 1 1 1 0 i | 0 1 0 0 i | (failure)
442/// | 1 1 1 1 i | 0 1 0 0 !i | (failure)
443///
444fn try_unlock(state: &AtomicUsize, mut old_state: usize) -> Result<(), usize> {
445 loop {
446 if old_state & NOTIFICATION == 0 {
447 // Success path.
448
449 let new_state = old_state & !LOCKED;
450
451 // Ordering: Release is necessary to synchronize with the Acquire
452 // ordering in `register` and ensure that the waker call has
453 // completed before a new waker is stored.
454 match state.compare_exchange_weak(
455 old_state,
456 new_state,
457 Ordering::Release,
458 Ordering::Relaxed,
459 ) {
460 Ok(_) => return Ok(()),
461 Err(s) => old_state = s,
462 }
463 } else {
464 // Failure path.
465
466 // If `UPDATE` is set, clear `UPDATE` and flip `INDEX` with the xor mask.
467 let update_bit = old_state & UPDATE;
468 let xor_mask = update_bit | (update_bit >> 1);
469
470 // Clear `NOTIFICATION` and `REGISTERED` with the xor mask.
471 let xor_mask = xor_mask | NOTIFICATION | REGISTERED;
472
473 let new_state = old_state ^ xor_mask;
474
475 // Ordering: Release is necessary to synchronize with the Acquire
476 // ordering in `register` and ensure that the call to
477 // `Waker::wake_by_ref` has completed before a new waker is stored.
478 // Acquire ordering is in turn necessary to ensure that any newly
479 // registered waker is visible.
480 match state.compare_exchange_weak(
481 old_state,
482 new_state,
483 Ordering::AcqRel,
484 Ordering::Relaxed,
485 ) {
486 Ok(_) => return Err(new_state),
487 Err(s) => old_state = s,
488 }
489 };
490 }
491}
492
493/// A future that can be `await`ed until a predicate is satisfied.
494#[derive(Debug)]
495pub struct WaitUntil<'a, P, T>
496where
497 P: FnMut() -> Option<T>,
498{
499 predicate: P,
500 wake: &'a DiatomicWaker,
501}
502
503impl<'a, P, T> WaitUntil<'a, P, T>
504where
505 P: FnMut() -> Option<T>,
506{
507 /// Creates a future associated to the specified wake that can be `await`ed
508 /// until the specified predicate is satisfied.
509 fn new(wake: &'a DiatomicWaker, predicate: P) -> Self {
510 Self { predicate, wake }
511 }
512}
513
514impl<P: FnMut() -> Option<T>, T> Unpin for WaitUntil<'_, P, T> {}
515
516impl<'a, P, T> Future for WaitUntil<'a, P, T>
517where
518 P: FnMut() -> Option<T>,
519{
520 type Output = T;
521
522 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
523 // Safety: the safety of this method is contingent on the safety of the
524 // `register` and `unregister` methods. Since a `WaitUntil` future can
525 // only be created from the unsafe `wait_until` method, however, the
526 // user must uphold the contract that `register`, `unregister` and
527 // `wait_until` cannot be used concurrently from multiple threads.
528 unsafe {
529 if let Some(value) = (self.predicate)() {
530 return Poll::Ready(value);
531 }
532 self.wake.register(cx.waker());
533
534 if let Some(value) = (self.predicate)() {
535 self.wake.unregister();
536 return Poll::Ready(value);
537 }
538 }
539
540 Poll::Pending
541 }
542}