maniac_runtime/future/waker/
waker.rs

1use core::future::Future;
2use core::pin::Pin;
3use core::sync::atomic::Ordering;
4use core::task::{Context, Poll, Waker};
5
6use crate::future::waker::WakeSinkRef;
7use crate::future::waker::loom_exports::cell::UnsafeCell;
8use crate::future::waker::loom_exports::sync::atomic::AtomicUsize;
9
10// The state of the waker is tracked by the following bit flags:
11//
12// * INDEX [I]: slot index of the current waker, if any (0 or 1),
13// * UPDATE [U]: an updated waker has been registered in the redundant slot at
14//   index 1 - INDEX,
15// * REGISTERED [R]: a waker is registered and awaits a notification
16// * LOCKED [L]: a notifier has taken the notifier lock and is in the process of
17//   sending a notification,
18// * NOTIFICATION [N]: a notifier has failed to take the lock when a waker was
19//   registered and has requested the notifier holding the lock to send a
20//   notification on its behalf (implies REGISTERED and LOCKED).
21//
22// The waker stored in the slot at INDEX ("current" waker) is shared between the
23// sink (entity which registers wakers) and the source that holds the notifier
24// lock (if any). For this reason, this waker may only be accessed by shared
25// reference. The waker at 1 - INDEX is exclusively owned by the sink, which is
26// free to mutate it.
27
28// Summary of valid states:
29//
30// |  N   L   R   U   I  |
31// |---------------------|
32// |  0  any any any any |
33// |  1   1   1  any any |
34
// [I] Index (0 or 1) of the slot that holds the current waker.
const INDEX: usize = 1 << 0;
// [U] An updated waker has been stored in the redundant slot at `1 - INDEX`.
const UPDATE: usize = 1 << 1;
// [R] A waker is registered and awaits a notification.
const REGISTERED: usize = 1 << 2;
// [L] A notifier holds the notifier lock on the waker at `INDEX`.
const LOCKED: usize = 1 << 3;
// [N] A notifier failed to take the lock while a waker was registered and has
// asked the current lock holder to send a notification on its behalf.
const NOTIFICATION: usize = 1 << 4;
46
47/// A primitive that can send or await notifications.
48///
49/// It is almost always preferable to use the [`WakeSink`](crate::WakeSink) and
50/// [`WakeSource`](crate::WakeSource) which offer more convenience at the cost
51/// of an allocation in an `Arc`.
52///
53/// If allocation is not possible or desirable, the
54/// [`sink_ref`](DiatomicWaker::sink_ref) method can be used to create a
55/// [`WakeSinkRef`] handle and one or more
56/// [`WakeSourceRef`](crate::borrowed_waker::WakeSourceRef)s, the non-owned
57/// counterparts to `WakeSink` and `WakeSource`.
58///
59/// Finally, `DiatomicWaker` exposes `unsafe` methods that can be used to create
60/// custom synchronization primitives.
61#[derive(Debug)]
62pub struct DiatomicWaker {
63    /// A bit field for `INDEX`, `UPDATE`, `REGISTERED`, `LOCKED` and `NOTIFICATION`.
64    state: AtomicUsize,
65    /// Redundant slots for the waker.
66    waker: [UnsafeCell<Option<Waker>>; 2],
67}
68
69impl DiatomicWaker {
70    /// Creates a new `DiatomicWaker`.
71    #[cfg(not(all(test, diatomic_waker_loom)))]
72    pub const fn new() -> Self {
73        Self {
74            state: AtomicUsize::new(0),
75            waker: [UnsafeCell::new(None), UnsafeCell::new(None)],
76        }
77    }
78
79    #[cfg(all(test, diatomic_waker_loom))]
80    pub fn new() -> Self {
81        Self {
82            state: AtomicUsize::new(0),
83            waker: [UnsafeCell::new(None), UnsafeCell::new(None)],
84        }
85    }
86
87    /// Returns a sink with a lifetime bound to this `DiatomicWaker`.
88    ///
89    /// This mutably borrows the waker, thus ensuring that at most one
90    /// associated sink can be active at a time.
91    pub fn sink_ref(&mut self) -> WakeSinkRef<'_> {
92        WakeSinkRef { inner: self }
93    }
94
95    /// Sends a notification if a waker is registered.
96    ///
97    /// This automatically unregisters any waker that may have been previously
98    /// registered.
99    pub fn notify(&self) {
100        // Fast path: check if anyone is actually waiting.
101        // If the REGISTERED bit is not set, there's no waker to notify.
102        // We can safely return without attempting to acquire the lock.
103        //
104        // Ordering: Acquire is sufficient here because:
105        // - We're only checking if work needs to be done
106        // - If REGISTERED is set, try_lock() will re-load with proper ordering
107        // - If we miss a concurrent registration due to reordering, the registering
108        //   thread will either succeed (and we'll notify on next call) or will set
109        //   NOTIFICATION bit for us to handle
110        let state = self.state.load(Ordering::Acquire);
111        if (state & REGISTERED) == 0 {
112            return;
113        }
114
115        // Slow path: someone is waiting, acquire the lock and notify.
116        // Transitions: see `try_lock` and `try_unlock`.
117
118        let mut state = if let Ok(s) = try_lock(&self.state) {
119            s
120        } else {
121            return;
122        };
123
124        loop {
125            let idx = state & INDEX;
126
127            // Safety: the notifier lock has been acquired, which guarantees
128            // exclusive access to the waker at `INDEX`.
129            unsafe {
130                self.wake_by_ref(idx);
131            }
132
133            if let Err(s) = try_unlock(&self.state, state) {
134                state = s;
135            } else {
136                return;
137            }
138
139            // One more loop iteration is necessary because the waker was
140            // registered again and another notifier has failed to send a
141            // notification while the notifier lock was taken.
142        }
143    }
144
145    /// Registers a new waker.
146    ///
147    /// Registration is lazy: the waker is cloned only if it differs from the
148    /// last registered waker (note that the last registered waker is cached
149    /// even if it was unregistered).
150    ///
151    /// # Safety
152    ///
153    /// The `register`, `unregister` and `wait_until` methods cannot be used
154    /// concurrently from multiple threads.
155    pub unsafe fn register(&self, waker: &Waker) {
156        // Transitions if the new waker is the same as the one currently stored.
157        //
158        // |  N  L  R  U  I  |  N  L  R  U  I  |
159        // |-----------------|-----------------|
160        // |  n  l  r  u  i  |  n  l  1  u  i  |
161        //
162        //
163        // Transitions if the new waker needs to be stored:
164        //
165        // Step 1 (only necessary if the state initially indicates R=U=1):
166        //
167        // |  N  L  R  U  I  |  N  L  R  U  I  |
168        // |-----------------|-----------------|
169        // |  n  l  r  u  i  |  0  l  0  u  i  |
170        //
171        // Step 2:
172        //
173        // |  N  L  R  U  I  |  N  L  R  U  I  |
174        // |-----------------|-----------------|
175        // |  n  l  r  u  i  |  n  l  1  1  i  |
176
177        // Fast path: check if we're already registered with the same waker.
178        // This is a common case during steady-state polling where the same
179        // task repeatedly checks readiness with the same waker.
180        //
181        // Ordering: Acquire is sufficient for this check because:
182        // - If we see REGISTERED=1 and the waker is the same, we can skip the fetch_or
183        // - If state is stale, we'll fall through to the slow path which re-checks
184        // - The slow path uses proper Acquire ordering for synchronization
185        let fast_state = self.state.load(Ordering::Acquire);
186        if (fast_state & REGISTERED) != 0 {
187            // Already registered, check if it's the same waker
188            let idx = fast_state & INDEX;
189            let recent_idx = if fast_state & UPDATE == 0 {
190                idx
191            } else {
192                idx ^ INDEX // XOR is faster than subtraction
193            };
194
195            if unsafe { self.will_wake(recent_idx, waker) } {
196                // Same waker already registered, nothing to do.
197                // No need to fetch_or(REGISTERED) since it's already set.
198                return;
199            }
200        }
201
202        // Slow path: need to register a new waker or update registration.
203        // Ordering: Acquire ordering is necessary to synchronize with the
204        // Release unlocking operation in `notify`, which ensures that all calls
205        // to the waker in the redundant slot have completed.
206        let state = self.state.load(Ordering::Acquire);
207
208        // Compute the index of the waker that was most recently updated. Note
209        // that the value of `recent_idx` as computed below remains correct even
210        // if the state is stale since only this thread can store new wakers.
211        let mut idx = state & INDEX;
212        let recent_idx = if state & UPDATE == 0 {
213            idx
214        } else {
215            idx ^ INDEX // XOR is faster than subtraction
216        };
217
218        // Safety: it is safe to call `will_wake` since the registering thread
219        // is the only one allowed to mutate the wakers so there can be no
220        // concurrent mutable access to the waker.
221        let is_up_to_date = unsafe { self.will_wake(recent_idx, waker) };
222
223        // Fast path in case the waker is up to date.
224        if is_up_to_date {
225            // Set the `REGISTERED` flag. Ideally, the `NOTIFICATION` flag would
226            // be cleared at the same time to avoid a spurious wake-up, but it
227            // probably isn't worth the overhead of a CAS loop because having
228            // this flag set when calling `register` is very unlikely: it would
229            // mean that since the last call to `register`:
230            // 1) a notifier has been holding the lock continuously,
231            // 2) another notifier has tried and failed to take the lock, and
232            // 3) `unregister` was never called.
233            //
234            // Ordering: Acquire ordering synchronizes with the Release and
235            // AcqRel RMWs in `try_lock` (called by `notify`) and ensures that
236            // either the predicate set before the call to `notify` will be
237            // visible after the call to `register`, or the registered waker
238            // will be visible during the call to `notify` (or both). Note that
239            // Release ordering is not necessary since the waker has not changed
240            // and this RMW takes part in a release sequence headed by the
241            // initial registration of the waker.
242            self.state.fetch_or(REGISTERED, Ordering::Acquire);
243
244            return;
245        }
246
247        // The waker needs to be stored in the redundant slot.
248        //
249        // It is necessary to make sure that either the `UPDATE` or the
250        // `REGISTERED` flag is cleared to prevent concurrent access by a notifier
251        // to the redundant waker slot while the waker is updated.
252        //
253        // Note that only the thread registering the waker can set `REGISTERED`
254        // and `UPDATE` so even if the state is stale, observing `REGISTERED` or
255        // `UPDATE` as cleared guarantees that such flag is and will remain
256        // cleared until this thread sets them.
257        if state & (UPDATE | REGISTERED) == (UPDATE | REGISTERED) {
258            // Clear the `REGISTERED` and `NOTIFICATION` flags.
259            //
260            // Ordering: Acquire ordering is necessary to synchronize with the
261            // Release unlocking operation in `notify`, which ensures that all
262            // calls to the waker in the redundant slot have completed.
263            let state = self
264                .state
265                .fetch_and(!(REGISTERED | NOTIFICATION), Ordering::Acquire);
266
267            // It is possible that `UPDATE` was cleared and `INDEX` was switched
268            // by a notifier after the initial load of the state, so the waker
269            // index needs to be updated.
270            idx = state & INDEX;
271        }
272
273        // Always store the new waker in the redundant slot to avoid racing with
274        // a notifier.
275        let redundant_idx = idx ^ INDEX; // XOR is faster than subtraction (1 - idx)
276
277        // Store the new waker.
278        //
279        // Safety: it is safe to store the new waker in the redundant slot
280        // because the `REGISTERED` flag and/or the `UPDATE` flag are/is cleared
281        // so the notifier will not attempt to switch the waker.
282        unsafe { self.set_waker(redundant_idx, waker.clone()) };
283
284        // Make the waker visible.
285        //
286        // Ordering: Acquire ordering synchronizes with the Release and AcqRel
287        // RMWs in `try_lock` (called by `notify`) and ensures that either the
288        // predicate set before the call to `notify` will be visible after the
289        // call to `register`, or the registered waker will be visible during
290        // the call to `notify` (or both). Since the waker has been modified
291        // above, Release ordering is also necessary to synchronize with the
292        // AcqRel RMW in `try_lock` (success case) and ensure that the
293        // modification to the waker is fully visible when notifying.
294        self.state.fetch_or(UPDATE | REGISTERED, Ordering::AcqRel);
295    }
296
297    /// Unregisters the waker.
298    ///
299    /// After the waker is unregistered, subsequent calls to `notify` will be
300    /// ignored.
301    ///
302    /// Note that the previously-registered waker (if any) remains cached.
303    ///
304    /// # Safety
305    ///
306    /// The `register`, `unregister` and `wait_until` methods cannot be used
307    /// concurrently from multiple threads.
308    pub unsafe fn unregister(&self) {
309        // Transitions:
310        //
311        // |  N  L  R  U  I  |  N  L  R  U  I  |
312        // |-----------------|-----------------|
313        // |  n  l  r  u  i  |  0  l  0  u  i  |
314
315        // Modify the state. Note that the waker is not dropped: caching it can
316        // avoid a waker drop/cloning cycle (typically, 2 RMWs) in the frequent
317        // case when the next waker to be registered will be the same as the one
318        // being unregistered.
319        //
320        // Ordering: no waker was modified so Release ordering is sufficient.
321        self.state
322            .fetch_and(!(REGISTERED | NOTIFICATION), Ordering::Release);
323    }
324
325    /// Returns a future that can be `await`ed until the provided predicate
326    /// returns a value.
327    ///
328    /// The predicate is checked each time a notification is received.
329    ///
330    /// # Safety
331    ///
332    /// The `register`, `unregister` and `wait_until` methods cannot be used
333    /// concurrently from multiple threads.
334    pub unsafe fn wait_until<P, T>(&self, predicate: P) -> WaitUntil<'_, P, T>
335    where
336        P: FnMut() -> Option<T>,
337    {
338        WaitUntil::new(self, predicate)
339    }
340
341    /// Sets the waker at index `idx`.
342    ///
343    /// # Safety
344    ///
345    /// The caller must have exclusive access to the waker at index `idx`.
346    unsafe fn set_waker(&self, idx: usize, new: Waker) {
347        unsafe {
348            self.waker[idx].with_mut(|waker| *waker = Some(new));
349        }
350    }
351
352    /// Notify the waker at index `idx`.
353    ///
354    /// # Safety
355    ///
356    /// The waker at index `idx` cannot be modified concurrently.
357    unsafe fn wake_by_ref(&self, idx: usize) {
358        self.waker[idx].with(|waker| {
359            if let Some(waker) = unsafe { &*waker } {
360                waker.wake_by_ref();
361            }
362        });
363    }
364
365    /// Check whether the waker at index `idx` will wake the same task as the
366    /// provided waker.
367    ///
368    /// # Safety
369    ///
370    /// The waker at index `idx` cannot be modified concurrently.
371    unsafe fn will_wake(&self, idx: usize, other: &Waker) -> bool {
372        self.waker[idx].with(|waker| match unsafe { &*waker } {
373            Some(waker) => waker.will_wake(other),
374            None => false,
375        })
376    }
377}
378
379impl Default for DiatomicWaker {
380    fn default() -> Self {
381        Self::new()
382    }
383}
384
385unsafe impl Send for DiatomicWaker {}
386unsafe impl Sync for DiatomicWaker {}
387
388/// Attempts to acquire the notifier lock and returns the current state upon
389/// success.
390///
391/// Acquisition of the lock will fail in the following cases:
392///
393/// * the `REGISTERED` flag is cleared, meaning that there is no need to wake
394///   and therefore no need to lock,
395/// * the lock is already taken, in which case the `NOTIFICATION` flag will be
396///   set if the `REGISTERED` flag is set.
397///
398/// If acquisition of the lock succeeds, the `REGISTERED` flag is cleared. If
399/// additionally the `UPDATE` flag was set, it is cleared and `INDEX` is
400/// flipped.
401///
402///  Transition table:
403///
404/// |  N  L  R  U  I  |  N  L  R  U  I  |
405/// |-----------------|-----------------|
406/// |  0  0  0  u  i  |  0  0  0  u  i  | (failure)
407/// |  0  0  1  0  i  |  0  1  0  0  i  | (success)
408/// |  0  0  1  1  i  |  0  1  0  0 !i  | (success)
409/// |  0  1  0  u  i  |  0  1  0  u  i  | (failure)
410/// |  n  1  1  u  i  |  1  1  1  u  i  | (failure)
411///
412fn try_lock(state: &AtomicUsize) -> Result<usize, ()> {
413    let mut old_state = state.load(Ordering::Acquire);
414
415    loop {
416        // Early exit: if no waker is registered, no need to acquire lock
417        if (old_state & REGISTERED) == 0 {
418            return Err(());
419        }
420
421        if old_state & (LOCKED | REGISTERED) == REGISTERED {
422            // Success path.
423
424            // If `UPDATE` is set, clear `UPDATE` and flip `INDEX` with the xor mask.
425            // Then set `LOCKED` and clear `REGISTERED`.
426            // Combine all operations into a single xor_mask calculation.
427            let update_bit = old_state & UPDATE;
428            let xor_mask = update_bit | (update_bit >> 1) | LOCKED | REGISTERED;
429
430            let new_state = old_state ^ xor_mask;
431
432            // Ordering: Acquire is necessary to synchronize with the Release
433            // ordering in `register` so that the new waker, if any, is visible.
434            // Release ordering synchronizes with the Acquire and AcqRel RMWs in
435            // `register` and ensures that either the predicate set before the
436            // call to `notify` will be visible after the call to `register`, or
437            // the registered waker will be visible during the call to `notify`
438            // (or both).
439            match state.compare_exchange_weak(
440                old_state,
441                new_state,
442                Ordering::AcqRel,
443                Ordering::Relaxed,
444            ) {
445                Ok(_) => return Ok(new_state),
446                Err(s) => old_state = s,
447            }
448        } else {
449            // Failure path.
450
451            // Set the `NOTIFICATION` bit if `REGISTERED` was set.
452            let registered_bit = old_state & REGISTERED;
453            let new_state = old_state | (registered_bit << 2);
454
455            // Ordering: Release ordering synchronizes with the Acquire and
456            // AcqRel RMWs in `register` and ensures that either the predicate
457            // set before the call to `notify` will be visible after the call to
458            // `register`, or the registered waker will be visible during the
459            // call to `notify` (or both).
460            match state.compare_exchange_weak(
461                old_state,
462                new_state,
463                Ordering::Release,
464                Ordering::Relaxed,
465            ) {
466                Ok(_) => return Err(()),
467                Err(s) => old_state = s,
468            }
469        };
470    }
471}
472
473/// Attempts to release the notifier lock and returns the current state upon
474/// failure.
475///
476/// Release of the lock will fail if the `NOTIFICATION` flag is set because it
477/// means that, after the lock was taken, the registering thread has requested
478/// to be notified again and another notifier has subsequently requested that
479/// such notification be sent on its behalf; if additionally the `UPDATE` flag
480/// was set (i.e. a new waker is available), it is cleared and `INDEX` is
481/// flipped.
482///
483/// Transition table:
484///
485/// |  N  L  R  U  I  |  N  L  R  U  I  |
486/// |-----------------|-----------------|
487/// |  0  1  r  u  i  |  0  0  r  u  i  | (success)
488/// |  1  1  1  0  i  |  0  1  0  0  i  | (failure)
489/// |  1  1  1  1  i  |  0  1  0  0 !i  | (failure)
490///
491fn try_unlock(state: &AtomicUsize, mut old_state: usize) -> Result<(), usize> {
492    loop {
493        if old_state & NOTIFICATION == 0 {
494            // Success path.
495
496            let new_state = old_state & !LOCKED;
497
498            // Ordering: Release is necessary to synchronize with the Acquire
499            // ordering in `register` and ensure that the waker call has
500            // completed before a new waker is stored.
501            match state.compare_exchange_weak(
502                old_state,
503                new_state,
504                Ordering::Release,
505                Ordering::Relaxed,
506            ) {
507                Ok(_) => return Ok(()),
508                Err(s) => old_state = s,
509            }
510        } else {
511            // Failure path.
512
513            // If `UPDATE` is set, clear `UPDATE` and flip `INDEX` with the xor mask.
514            // Then clear `NOTIFICATION` and `REGISTERED`.
515            // Combine all operations into a single xor_mask calculation.
516            let update_bit = old_state & UPDATE;
517            let xor_mask = update_bit | (update_bit >> 1) | NOTIFICATION | REGISTERED;
518
519            let new_state = old_state ^ xor_mask;
520
521            // Ordering: Release is necessary to synchronize with the Acquire
522            // ordering in `register` and ensure that the call to
523            // `Waker::wake_by_ref` has completed before a new waker is stored.
524            // Acquire ordering is in turn necessary to ensure that any newly
525            // registered waker is visible.
526            match state.compare_exchange_weak(
527                old_state,
528                new_state,
529                Ordering::AcqRel,
530                Ordering::Relaxed,
531            ) {
532                Ok(_) => return Err(new_state),
533                Err(s) => old_state = s,
534            }
535        };
536    }
537}
538
539/// A future that can be `await`ed until a predicate is satisfied.
540#[derive(Debug)]
541pub struct WaitUntil<'a, P, T>
542where
543    P: FnMut() -> Option<T>,
544{
545    predicate: P,
546    wake: &'a DiatomicWaker,
547}
548
549impl<'a, P, T> WaitUntil<'a, P, T>
550where
551    P: FnMut() -> Option<T>,
552{
553    /// Creates a future associated to the specified wake that can be `await`ed
554    /// until the specified predicate is satisfied.
555    fn new(wake: &'a DiatomicWaker, predicate: P) -> Self {
556        Self { predicate, wake }
557    }
558}
559
560impl<P: FnMut() -> Option<T>, T> Unpin for WaitUntil<'_, P, T> {}
561
562impl<'a, P, T> Future for WaitUntil<'a, P, T>
563where
564    P: FnMut() -> Option<T>,
565{
566    type Output = T;
567
568    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
569        // Safety: the safety of this method is contingent on the safety of the
570        // `register` and `unregister` methods. Since a `WaitUntil` future can
571        // only be created from the unsafe `wait_until` method, however, the
572        // user must uphold the contract that `register`, `unregister` and
573        // `wait_until` cannot be used concurrently from multiple threads.
574        unsafe {
575            if let Some(value) = (self.predicate)() {
576                return Poll::Ready(value);
577            }
578            self.wake.register(cx.waker());
579
580            if let Some(value) = (self.predicate)() {
581                self.wake.unregister();
582                return Poll::Ready(value);
583            }
584        }
585
586        Poll::Pending
587    }
588}