diatomic_waker/
waker.rs

1use core::future::Future;
2use core::pin::Pin;
3use core::sync::atomic::Ordering;
4use core::task::{Context, Poll, Waker};
5
6use crate::loom_exports::cell::UnsafeCell;
7use crate::loom_exports::sync::atomic::AtomicUsize;
8use crate::WakeSinkRef;
9
10// The state of the waker is tracked by the following bit flags:
11//
12// * INDEX [I]: slot index of the current waker, if any (0 or 1),
13// * UPDATE [U]: an updated waker has been registered in the redundant slot at
14//   index 1 - INDEX,
// * REGISTERED [R]: a waker is registered and awaits a notification,
16// * LOCKED [L]: a notifier has taken the notifier lock and is in the process of
17//   sending a notification,
18// * NOTIFICATION [N]: a notifier has failed to take the lock when a waker was
19//   registered and has requested the notifier holding the lock to send a
20//   notification on its behalf (implies REGISTERED and LOCKED).
21//
22// The waker stored in the slot at INDEX ("current" waker) is shared between the
23// sink (entity which registers wakers) and the source that holds the notifier
24// lock (if any). For this reason, this waker may only be accessed by shared
25// reference. The waker at 1 - INDEX is exclusively owned by the sink, which is
26// free to mutate it.
27
28// Summary of valid states:
29//
30// |  N   L   R   U   I  |
31// |---------------------|
32// |  0  any any any any |
33// |  1   1   1  any any |
34
35// [I] Index of the current waker (0 or 1).
36const INDEX: usize = 0b00001;
37// [U] Indicates that an updated waker is available at 1 - INDEX.
38const UPDATE: usize = 0b00010;
39// [R] Indicates that a waker has been registered.
40const REGISTERED: usize = 0b00100;
41// [L] Indicates that a notifier holds the notifier lock to the waker at INDEX.
42const LOCKED: usize = 0b01000;
43// [N] Indicates that a notifier has failed to acquire the lock and has
44// requested the notifier holding the lock to notify on its behalf.
45const NOTIFICATION: usize = 0b10000;
46
/// A primitive that can send or await notifications.
///
/// It is almost always preferable to use the [`WakeSink`](crate::WakeSink) and
/// [`WakeSource`](crate::WakeSource) which offer more convenience at the cost
/// of an allocation in an `Arc`.
///
/// If allocation is not possible or desirable, the
/// [`sink_ref`](DiatomicWaker::sink_ref) method can be used to create a
/// [`WakeSinkRef`] handle and one or more
/// [`WakeSourceRef`](crate::borrowed_waker::WakeSourceRef)s, the non-owned
/// counterparts to `WakeSink` and `WakeSource`.
///
/// Finally, `DiatomicWaker` exposes `unsafe` methods that can be used to create
/// custom synchronization primitives.
#[derive(Debug)]
pub struct DiatomicWaker {
    /// A bit field for `INDEX`, `UPDATE`, `REGISTERED`, `LOCKED` and
    /// `NOTIFICATION`; a value of 0 means that no waker is registered and
    /// that the notifier lock is free.
    state: AtomicUsize,
    /// Redundant slots for the waker.
    ///
    /// The slot at `INDEX` holds the current waker, which notifiers may call
    /// while they hold the notifier lock; the slot at `1 - INDEX` is where
    /// the registering side stores an updated waker.
    waker: [UnsafeCell<Option<Waker>>; 2],
}
68
69impl DiatomicWaker {
70    /// Creates a new `DiatomicWaker`.
71    #[cfg(not(all(test, diatomic_waker_loom)))]
72    pub const fn new() -> Self {
73        Self {
74            state: AtomicUsize::new(0),
75            waker: [UnsafeCell::new(None), UnsafeCell::new(None)],
76        }
77    }
78
79    #[cfg(all(test, diatomic_waker_loom))]
80    pub fn new() -> Self {
81        Self {
82            state: AtomicUsize::new(0),
83            waker: [UnsafeCell::new(None), UnsafeCell::new(None)],
84        }
85    }
86
87    /// Returns a sink with a lifetime bound to this `DiatomicWaker`.
88    ///
89    /// This mutably borrows the waker, thus ensuring that at most one
90    /// associated sink can be active at a time.
91    pub fn sink_ref(&mut self) -> WakeSinkRef<'_> {
92        WakeSinkRef { inner: self }
93    }
94
95    /// Sends a notification if a waker is registered.
96    ///
97    /// This automatically unregisters any waker that may have been previously
98    /// registered.
99    pub fn notify(&self) {
100        // Transitions: see `try_lock` and `try_unlock`.
101
102        let mut state = if let Ok(s) = try_lock(&self.state) {
103            s
104        } else {
105            return;
106        };
107
108        loop {
109            let idx = state & INDEX;
110
111            // Safety: the notifier lock has been acquired, which guarantees
112            // exclusive access to the waker at `INDEX`.
113            unsafe {
114                self.wake_by_ref(idx);
115            }
116
117            if let Err(s) = try_unlock(&self.state, state) {
118                state = s;
119            } else {
120                return;
121            }
122
123            // One more loop iteration is necessary because the waker was
124            // registered again and another notifier has failed to send a
125            // notification while the notifier lock was taken.
126        }
127    }
128
129    /// Registers a new waker.
130    ///
131    /// Registration is lazy: the waker is cloned only if it differs from the
132    /// last registered waker (note that the last registered waker is cached
133    /// even if it was unregistered).
134    ///
135    /// # Safety
136    ///
137    /// The `register`, `unregister` and `wait_until` methods cannot be used
138    /// concurrently from multiple threads.
139    pub unsafe fn register(&self, waker: &Waker) {
140        // Transitions if the new waker is the same as the one currently stored.
141        //
142        // |  N  L  R  U  I  |  N  L  R  U  I  |
143        // |-----------------|-----------------|
144        // |  n  l  r  u  i  |  n  l  1  u  i  |
145        //
146        //
147        // Transitions if the new waker needs to be stored:
148        //
149        // Step 1 (only necessary if the state initially indicates R=U=1):
150        //
151        // |  N  L  R  U  I  |  N  L  R  U  I  |
152        // |-----------------|-----------------|
153        // |  n  l  r  u  i  |  0  l  0  u  i  |
154        //
155        // Step 2:
156        //
157        // |  N  L  R  U  I  |  N  L  R  U  I  |
158        // |-----------------|-----------------|
159        // |  n  l  r  u  i  |  n  l  1  1  i  |
160
161        // Ordering: Acquire ordering is necessary to synchronize with the
162        // Release unlocking operation in `notify`, which ensures that all calls
163        // to the waker in the redundant slot have completed.
164        let state = self.state.load(Ordering::Acquire);
165
166        // Compute the index of the waker that was most recently updated. Note
167        // that the value of `recent_idx` as computed below remains correct even
168        // if the state is stale since only this thread can store new wakers.
169        let mut idx = state & INDEX;
170        let recent_idx = if state & UPDATE == 0 {
171            idx
172        } else {
173            INDEX - idx
174        };
175
176        // Safety: it is safe to call `will_wake` since the registering thread
177        // is the only one allowed to mutate the wakers so there can be no
178        // concurrent mutable access to the waker.
179        let is_up_to_date = self.will_wake(recent_idx, waker);
180
181        // Fast path in case the waker is up to date.
182        if is_up_to_date {
183            // Set the `REGISTERED` flag. Ideally, the `NOTIFICATION` flag would
184            // be cleared at the same time to avoid a spurious wake-up, but it
185            // probably isn't worth the overhead of a CAS loop because having
186            // this flag set when calling `register` is very unlikely: it would
187            // mean that since the last call to `register`:
188            // 1) a notifier has been holding the lock continuously,
189            // 2) another notifier has tried and failed to take the lock, and
190            // 3) `unregister` was never called.
191            //
192            // Ordering: Acquire ordering synchronizes with the Release and
193            // AcqRel RMWs in `try_lock` (called by `notify`) and ensures that
194            // either the predicate set before the call to `notify` will be
195            // visible after the call to `register`, or the registered waker
196            // will be visible during the call to `notify` (or both). Note that
197            // Release ordering is not necessary since the waker has not changed
198            // and this RMW takes part in a release sequence headed by the
199            // initial registration of the waker.
200            self.state.fetch_or(REGISTERED, Ordering::Acquire);
201
202            return;
203        }
204
205        // The waker needs to be stored in the redundant slot.
206        //
207        // It is necessary to make sure that either the `UPDATE` or the
208        // `REGISTERED` flag is cleared to prevent concurrent access by a notifier
209        // to the redundant waker slot while the waker is updated.
210        //
211        // Note that only the thread registering the waker can set `REGISTERED`
212        // and `UPDATE` so even if the state is stale, observing `REGISTERED` or
213        // `UPDATE` as cleared guarantees that such flag is and will remain
214        // cleared until this thread sets them.
215        if state & (UPDATE | REGISTERED) == (UPDATE | REGISTERED) {
216            // Clear the `REGISTERED` and `NOTIFICATION` flags.
217            //
218            // Ordering: Acquire ordering is necessary to synchronize with the
219            // Release unlocking operation in `notify`, which ensures that all
220            // calls to the waker in the redundant slot have completed.
221            let state = self
222                .state
223                .fetch_and(!(REGISTERED | NOTIFICATION), Ordering::Acquire);
224
225            // It is possible that `UPDATE` was cleared and `INDEX` was switched
226            // by a notifier after the initial load of the state, so the waker
227            // index needs to be updated.
228            idx = state & INDEX;
229        }
230
231        // Always store the new waker in the redundant slot to avoid racing with
232        // a notifier.
233        let redundant_idx = 1 - idx;
234
235        // Store the new waker.
236        //
237        // Safety: it is safe to store the new waker in the redundant slot
238        // because the `REGISTERED` flag and/or the `UPDATE` flag are/is cleared
239        // so the notifier will not attempt to switch the waker.
240        self.set_waker(redundant_idx, waker.clone());
241
242        // Make the waker visible.
243        //
244        // Ordering: Acquire ordering synchronizes with the Release and AcqRel
245        // RMWs in `try_lock` (called by `notify`) and ensures that either the
246        // predicate set before the call to `notify` will be visible after the
247        // call to `register`, or the registered waker will be visible during
248        // the call to `notify` (or both). Since the waker has been modified
249        // above, Release ordering is also necessary to synchronize with the
250        // AcqRel RMW in `try_lock` (success case) and ensure that the
251        // modification to the waker is fully visible when notifying.
252        self.state.fetch_or(UPDATE | REGISTERED, Ordering::AcqRel);
253    }
254
255    /// Unregisters the waker.
256    ///
257    /// After the waker is unregistered, subsequent calls to `notify` will be
258    /// ignored.
259    ///
260    /// Note that the previously-registered waker (if any) remains cached.
261    ///
262    /// # Safety
263    ///
264    /// The `register`, `unregister` and `wait_until` methods cannot be used
265    /// concurrently from multiple threads.
266    pub unsafe fn unregister(&self) {
267        // Transitions:
268        //
269        // |  N  L  R  U  I  |  N  L  R  U  I  |
270        // |-----------------|-----------------|
271        // |  n  l  r  u  i  |  0  l  0  u  i  |
272
273        // Modify the state. Note that the waker is not dropped: caching it can
274        // avoid a waker drop/cloning cycle (typically, 2 RMWs) in the frequent
275        // case when the next waker to be registered will be the same as the one
276        // being unregistered.
277        //
278        // Ordering: no waker was modified so Relaxed ordering is sufficient.
279        self.state
280            .fetch_and(!(REGISTERED | NOTIFICATION), Ordering::Relaxed);
281    }
282
283    /// Returns a future that can be `await`ed until the provided predicate
284    /// returns a value.
285    ///
286    /// The predicate is checked each time a notification is received.
287    ///
288    /// # Safety
289    ///
290    /// The `register`, `unregister` and `wait_until` methods cannot be used
291    /// concurrently from multiple threads.
292    pub unsafe fn wait_until<P, T>(&self, predicate: P) -> WaitUntil<'_, P, T>
293    where
294        P: FnMut() -> Option<T>,
295    {
296        WaitUntil::new(self, predicate)
297    }
298
299    /// Sets the waker at index `idx`.
300    ///
301    /// # Safety
302    ///
303    /// The caller must have exclusive access to the waker at index `idx`.
304    unsafe fn set_waker(&self, idx: usize, new: Waker) {
305        self.waker[idx].with_mut(|waker| (*waker) = Some(new));
306    }
307
308    /// Notify the waker at index `idx`.
309    ///
310    /// # Safety
311    ///
312    /// The waker at index `idx` cannot be modified concurrently.
313    unsafe fn wake_by_ref(&self, idx: usize) {
314        self.waker[idx].with(|waker| {
315            if let Some(waker) = &*waker {
316                waker.wake_by_ref();
317            }
318        });
319    }
320
321    /// Check whether the waker at index `idx` will wake the same task as the
322    /// provided waker.
323    ///
324    /// # Safety
325    ///
326    /// The waker at index `idx` cannot be modified concurrently.
327    unsafe fn will_wake(&self, idx: usize, other: &Waker) -> bool {
328        self.waker[idx].with(|waker| match &*waker {
329            Some(waker) => waker.will_wake(other),
330            None => false,
331        })
332    }
333}
334
335impl Default for DiatomicWaker {
336    fn default() -> Self {
337        Self::new()
338    }
339}
340
// Safety: the waker slots live in `UnsafeCell`s, so thread-safety rests on the
// protocol encoded in `state` rather than on the type system. Notifiers only
// read the slot at `INDEX`, and only while holding the notifier lock. The slot
// at `1 - INDEX` is written solely by the registering side, whose `unsafe`
// methods require callers not to use them concurrently from multiple threads.
unsafe impl Send for DiatomicWaker {}
unsafe impl Sync for DiatomicWaker {}
343
344/// Attempts to acquire the notifier lock and returns the current state upon
345/// success.
346///
347/// Acquisition of the lock will fail in the following cases:
348///
349/// * the `REGISTERED` flag is cleared, meaning that there is no need to wake
350///   and therefore no need to lock,
351/// * the lock is already taken, in which case the `NOTIFICATION` flag will be
352///   set if the `REGISTERED` flag is set.
353///
354/// If acquisition of the lock succeeds, the `REGISTERED` flag is cleared. If
355/// additionally the `UPDATE` flag was set, it is cleared and `INDEX` is
356/// flipped.
357///
358///  Transition table:
359///
360/// |  N  L  R  U  I  |  N  L  R  U  I  |
361/// |-----------------|-----------------|
362/// |  0  0  0  u  i  |  0  0  0  u  i  | (failure)
363/// |  0  0  1  0  i  |  0  1  0  0  i  | (success)
364/// |  0  0  1  1  i  |  0  1  0  0 !i  | (success)
365/// |  0  1  0  u  i  |  0  1  0  u  i  | (failure)
366/// |  n  1  1  u  i  |  1  1  1  u  i  | (failure)
367///
368fn try_lock(state: &AtomicUsize) -> Result<usize, ()> {
369    let mut old_state = state.load(Ordering::Relaxed);
370
371    loop {
372        if old_state & (LOCKED | REGISTERED) == REGISTERED {
373            // Success path.
374
375            // If `UPDATE` is set, clear `UPDATE` and flip `INDEX` with the xor
376            // mask.
377            let update_bit = old_state & UPDATE;
378            let xor_mask = update_bit | (update_bit >> 1);
379
380            // Set `LOCKED` and clear `REGISTERED` with the xor mask.
381            let xor_mask = xor_mask | LOCKED | REGISTERED;
382
383            let new_state = old_state ^ xor_mask;
384
385            // Ordering: Acquire is necessary to synchronize with the Release
386            // ordering in `register` so that the new waker, if any, is visible.
387            // Release ordering synchronizes with the Acquire and AcqRel RMWs in
388            // `register` and ensures that either the predicate set before the
389            // call to `notify` will be visible after the call to `register`, or
390            // the registered waker will be visible during the call to `notify`
391            // (or both).
392            match state.compare_exchange_weak(
393                old_state,
394                new_state,
395                Ordering::AcqRel,
396                Ordering::Relaxed,
397            ) {
398                Ok(_) => return Ok(new_state),
399                Err(s) => old_state = s,
400            }
401        } else {
402            // Failure path.
403
404            // Set the `NOTIFICATION` bit if `REGISTERED` was set.
405            let registered_bit = old_state & REGISTERED;
406            let new_state = old_state | (registered_bit << 2);
407
408            // Ordering: Release ordering synchronizes with the Acquire and
409            // AcqRel RMWs in `register` and ensures that either the predicate
410            // set before the call to `notify` will be visible after the call to
411            // `register`, or the registered waker will be visible during the
412            // call to `notify` (or both).
413            match state.compare_exchange_weak(
414                old_state,
415                new_state,
416                Ordering::Release,
417                Ordering::Relaxed,
418            ) {
419                Ok(_) => return Err(()),
420                Err(s) => old_state = s,
421            }
422        };
423    }
424}
425
426/// Attempts to release the notifier lock and returns the current state upon
427/// failure.
428///
429/// Release of the lock will fail if the `NOTIFICATION` flag is set because it
430/// means that, after the lock was taken, the registering thread has requested
431/// to be notified again and another notifier has subsequently requested that
432/// such notification be sent on its behalf; if additionally the `UPDATE` flag
433/// was set (i.e. a new waker is available), it is cleared and `INDEX` is
434/// flipped.
435///
436/// Transition table:
437///
438/// |  N  L  R  U  I  |  N  L  R  U  I  |
439/// |-----------------|-----------------|
440/// |  0  1  r  u  i  |  0  0  r  u  i  | (success)
441/// |  1  1  1  0  i  |  0  1  0  0  i  | (failure)
442/// |  1  1  1  1  i  |  0  1  0  0 !i  | (failure)
443///
444fn try_unlock(state: &AtomicUsize, mut old_state: usize) -> Result<(), usize> {
445    loop {
446        if old_state & NOTIFICATION == 0 {
447            // Success path.
448
449            let new_state = old_state & !LOCKED;
450
451            // Ordering: Release is necessary to synchronize with the Acquire
452            // ordering in `register` and ensure that the waker call has
453            // completed before a new waker is stored.
454            match state.compare_exchange_weak(
455                old_state,
456                new_state,
457                Ordering::Release,
458                Ordering::Relaxed,
459            ) {
460                Ok(_) => return Ok(()),
461                Err(s) => old_state = s,
462            }
463        } else {
464            // Failure path.
465
466            // If `UPDATE` is set, clear `UPDATE` and flip `INDEX` with the xor mask.
467            let update_bit = old_state & UPDATE;
468            let xor_mask = update_bit | (update_bit >> 1);
469
470            // Clear `NOTIFICATION` and `REGISTERED` with the xor mask.
471            let xor_mask = xor_mask | NOTIFICATION | REGISTERED;
472
473            let new_state = old_state ^ xor_mask;
474
475            // Ordering: Release is necessary to synchronize with the Acquire
476            // ordering in `register` and ensure that the call to
477            // `Waker::wake_by_ref` has completed before a new waker is stored.
478            // Acquire ordering is in turn necessary to ensure that any newly
479            // registered waker is visible.
480            match state.compare_exchange_weak(
481                old_state,
482                new_state,
483                Ordering::AcqRel,
484                Ordering::Relaxed,
485            ) {
486                Ok(_) => return Err(new_state),
487                Err(s) => old_state = s,
488            }
489        };
490    }
491}
492
493/// A future that can be `await`ed until a predicate is satisfied.
494#[derive(Debug)]
495pub struct WaitUntil<'a, P, T>
496where
497    P: FnMut() -> Option<T>,
498{
499    predicate: P,
500    wake: &'a DiatomicWaker,
501}
502
503impl<'a, P, T> WaitUntil<'a, P, T>
504where
505    P: FnMut() -> Option<T>,
506{
507    /// Creates a future associated to the specified wake that can be `await`ed
508    /// until the specified predicate is satisfied.
509    fn new(wake: &'a DiatomicWaker, predicate: P) -> Self {
510        Self { predicate, wake }
511    }
512}
513
514impl<P: FnMut() -> Option<T>, T> Unpin for WaitUntil<'_, P, T> {}
515
516impl<'a, P, T> Future for WaitUntil<'a, P, T>
517where
518    P: FnMut() -> Option<T>,
519{
520    type Output = T;
521
522    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
523        // Safety: the safety of this method is contingent on the safety of the
524        // `register` and `unregister` methods. Since a `WaitUntil` future can
525        // only be created from the unsafe `wait_until` method, however, the
526        // user must uphold the contract that `register`, `unregister` and
527        // `wait_until` cannot be used concurrently from multiple threads.
528        unsafe {
529            if let Some(value) = (self.predicate)() {
530                return Poll::Ready(value);
531            }
532            self.wake.register(cx.waker());
533
534            if let Some(value) = (self.predicate)() {
535                self.wake.unregister();
536                return Poll::Ready(value);
537            }
538        }
539
540        Poll::Pending
541    }
542}