maniac_runtime/future/waker/waker.rs
1use core::future::Future;
2use core::pin::Pin;
3use core::sync::atomic::Ordering;
4use core::task::{Context, Poll, Waker};
5
6use crate::future::waker::WakeSinkRef;
7use crate::future::waker::loom_exports::cell::UnsafeCell;
8use crate::future::waker::loom_exports::sync::atomic::AtomicUsize;
9
10// The state of the waker is tracked by the following bit flags:
11//
12// * INDEX [I]: slot index of the current waker, if any (0 or 1),
13// * UPDATE [U]: an updated waker has been registered in the redundant slot at
14// index 1 - INDEX,
15// * REGISTERED [R]: a waker is registered and awaits a notification
16// * LOCKED [L]: a notifier has taken the notifier lock and is in the process of
17// sending a notification,
18// * NOTIFICATION [N]: a notifier has failed to take the lock when a waker was
19// registered and has requested the notifier holding the lock to send a
20// notification on its behalf (implies REGISTERED and LOCKED).
21//
22// The waker stored in the slot at INDEX ("current" waker) is shared between the
23// sink (entity which registers wakers) and the source that holds the notifier
24// lock (if any). For this reason, this waker may only be accessed by shared
25// reference. The waker at 1 - INDEX is exclusively owned by the sink, which is
26// free to mutate it.
27
28// Summary of valid states:
29//
30// | N L R U I |
31// |---------------------|
32// | 0 any any any any |
33// | 1 1 1 any any |
34
// Bit layout of the state word, from least to most significant bit:
// INDEX, UPDATE, REGISTERED, LOCKED, NOTIFICATION.

// [I] Slot index (0 or 1) of the current waker.
const INDEX: usize = 1 << 0;
// [U] Set when an updated waker is available in the slot at 1 - INDEX.
const UPDATE: usize = 1 << 1;
// [R] Set when a waker has been registered.
const REGISTERED: usize = 1 << 2;
// [L] Set while a notifier holds the notifier lock to the waker at INDEX.
const LOCKED: usize = 1 << 3;
// [N] Set when a notifier failed to acquire the lock and has asked the
// notifier holding the lock to notify on its behalf.
const NOTIFICATION: usize = 1 << 4;
46
/// A primitive that can send or await notifications.
///
/// In most cases the [`WakeSink`](crate::WakeSink) and
/// [`WakeSource`](crate::WakeSource) types are the better choice: they are
/// more convenient, at the cost of an allocation in an `Arc`.
///
/// When allocating is not possible or not desirable, the
/// [`sink_ref`](DiatomicWaker::sink_ref) method creates a [`WakeSinkRef`]
/// handle and one or more
/// [`WakeSourceRef`](crate::borrowed_waker::WakeSourceRef)s, which are the
/// non-owned counterparts to `WakeSink` and `WakeSource`.
///
/// Lastly, `DiatomicWaker` exposes `unsafe` methods meant as building blocks
/// for custom synchronization primitives.
#[derive(Debug)]
pub struct DiatomicWaker {
    /// Bit field holding the `INDEX`, `UPDATE`, `REGISTERED`, `LOCKED` and
    /// `NOTIFICATION` flags.
    state: AtomicUsize,
    /// Two redundant waker slots: the "current" one at `INDEX` and the spare
    /// one at `1 - INDEX`.
    waker: [UnsafeCell<Option<Waker>>; 2],
}
68
69impl DiatomicWaker {
70 /// Creates a new `DiatomicWaker`.
71 #[cfg(not(all(test, diatomic_waker_loom)))]
72 pub const fn new() -> Self {
73 Self {
74 state: AtomicUsize::new(0),
75 waker: [UnsafeCell::new(None), UnsafeCell::new(None)],
76 }
77 }
78
79 #[cfg(all(test, diatomic_waker_loom))]
80 pub fn new() -> Self {
81 Self {
82 state: AtomicUsize::new(0),
83 waker: [UnsafeCell::new(None), UnsafeCell::new(None)],
84 }
85 }
86
87 /// Returns a sink with a lifetime bound to this `DiatomicWaker`.
88 ///
89 /// This mutably borrows the waker, thus ensuring that at most one
90 /// associated sink can be active at a time.
91 pub fn sink_ref(&mut self) -> WakeSinkRef<'_> {
92 WakeSinkRef { inner: self }
93 }
94
95 /// Sends a notification if a waker is registered.
96 ///
97 /// This automatically unregisters any waker that may have been previously
98 /// registered.
99 pub fn notify(&self) {
100 // Fast path: check if anyone is actually waiting.
101 // If the REGISTERED bit is not set, there's no waker to notify.
102 // We can safely return without attempting to acquire the lock.
103 //
104 // Ordering: Acquire is sufficient here because:
105 // - We're only checking if work needs to be done
106 // - If REGISTERED is set, try_lock() will re-load with proper ordering
107 // - If we miss a concurrent registration due to reordering, the registering
108 // thread will either succeed (and we'll notify on next call) or will set
109 // NOTIFICATION bit for us to handle
110 let state = self.state.load(Ordering::Acquire);
111 if (state & REGISTERED) == 0 {
112 return;
113 }
114
115 // Slow path: someone is waiting, acquire the lock and notify.
116 // Transitions: see `try_lock` and `try_unlock`.
117
118 let mut state = if let Ok(s) = try_lock(&self.state) {
119 s
120 } else {
121 return;
122 };
123
124 loop {
125 let idx = state & INDEX;
126
127 // Safety: the notifier lock has been acquired, which guarantees
128 // exclusive access to the waker at `INDEX`.
129 unsafe {
130 self.wake_by_ref(idx);
131 }
132
133 if let Err(s) = try_unlock(&self.state, state) {
134 state = s;
135 } else {
136 return;
137 }
138
139 // One more loop iteration is necessary because the waker was
140 // registered again and another notifier has failed to send a
141 // notification while the notifier lock was taken.
142 }
143 }
144
145 /// Registers a new waker.
146 ///
147 /// Registration is lazy: the waker is cloned only if it differs from the
148 /// last registered waker (note that the last registered waker is cached
149 /// even if it was unregistered).
150 ///
151 /// # Safety
152 ///
153 /// The `register`, `unregister` and `wait_until` methods cannot be used
154 /// concurrently from multiple threads.
155 pub unsafe fn register(&self, waker: &Waker) {
156 // Transitions if the new waker is the same as the one currently stored.
157 //
158 // | N L R U I | N L R U I |
159 // |-----------------|-----------------|
160 // | n l r u i | n l 1 u i |
161 //
162 //
163 // Transitions if the new waker needs to be stored:
164 //
165 // Step 1 (only necessary if the state initially indicates R=U=1):
166 //
167 // | N L R U I | N L R U I |
168 // |-----------------|-----------------|
169 // | n l r u i | 0 l 0 u i |
170 //
171 // Step 2:
172 //
173 // | N L R U I | N L R U I |
174 // |-----------------|-----------------|
175 // | n l r u i | n l 1 1 i |
176
177 // Fast path: check if we're already registered with the same waker.
178 // This is a common case during steady-state polling where the same
179 // task repeatedly checks readiness with the same waker.
180 //
181 // Ordering: Acquire is sufficient for this check because:
182 // - If we see REGISTERED=1 and the waker is the same, we can skip the fetch_or
183 // - If state is stale, we'll fall through to the slow path which re-checks
184 // - The slow path uses proper Acquire ordering for synchronization
185 let fast_state = self.state.load(Ordering::Acquire);
186 if (fast_state & REGISTERED) != 0 {
187 // Already registered, check if it's the same waker
188 let idx = fast_state & INDEX;
189 let recent_idx = if fast_state & UPDATE == 0 {
190 idx
191 } else {
192 idx ^ INDEX // XOR is faster than subtraction
193 };
194
195 if unsafe { self.will_wake(recent_idx, waker) } {
196 // Same waker already registered, nothing to do.
197 // No need to fetch_or(REGISTERED) since it's already set.
198 return;
199 }
200 }
201
202 // Slow path: need to register a new waker or update registration.
203 // Ordering: Acquire ordering is necessary to synchronize with the
204 // Release unlocking operation in `notify`, which ensures that all calls
205 // to the waker in the redundant slot have completed.
206 let state = self.state.load(Ordering::Acquire);
207
208 // Compute the index of the waker that was most recently updated. Note
209 // that the value of `recent_idx` as computed below remains correct even
210 // if the state is stale since only this thread can store new wakers.
211 let mut idx = state & INDEX;
212 let recent_idx = if state & UPDATE == 0 {
213 idx
214 } else {
215 idx ^ INDEX // XOR is faster than subtraction
216 };
217
218 // Safety: it is safe to call `will_wake` since the registering thread
219 // is the only one allowed to mutate the wakers so there can be no
220 // concurrent mutable access to the waker.
221 let is_up_to_date = unsafe { self.will_wake(recent_idx, waker) };
222
223 // Fast path in case the waker is up to date.
224 if is_up_to_date {
225 // Set the `REGISTERED` flag. Ideally, the `NOTIFICATION` flag would
226 // be cleared at the same time to avoid a spurious wake-up, but it
227 // probably isn't worth the overhead of a CAS loop because having
228 // this flag set when calling `register` is very unlikely: it would
229 // mean that since the last call to `register`:
230 // 1) a notifier has been holding the lock continuously,
231 // 2) another notifier has tried and failed to take the lock, and
232 // 3) `unregister` was never called.
233 //
234 // Ordering: Acquire ordering synchronizes with the Release and
235 // AcqRel RMWs in `try_lock` (called by `notify`) and ensures that
236 // either the predicate set before the call to `notify` will be
237 // visible after the call to `register`, or the registered waker
238 // will be visible during the call to `notify` (or both). Note that
239 // Release ordering is not necessary since the waker has not changed
240 // and this RMW takes part in a release sequence headed by the
241 // initial registration of the waker.
242 self.state.fetch_or(REGISTERED, Ordering::Acquire);
243
244 return;
245 }
246
247 // The waker needs to be stored in the redundant slot.
248 //
249 // It is necessary to make sure that either the `UPDATE` or the
250 // `REGISTERED` flag is cleared to prevent concurrent access by a notifier
251 // to the redundant waker slot while the waker is updated.
252 //
253 // Note that only the thread registering the waker can set `REGISTERED`
254 // and `UPDATE` so even if the state is stale, observing `REGISTERED` or
255 // `UPDATE` as cleared guarantees that such flag is and will remain
256 // cleared until this thread sets them.
257 if state & (UPDATE | REGISTERED) == (UPDATE | REGISTERED) {
258 // Clear the `REGISTERED` and `NOTIFICATION` flags.
259 //
260 // Ordering: Acquire ordering is necessary to synchronize with the
261 // Release unlocking operation in `notify`, which ensures that all
262 // calls to the waker in the redundant slot have completed.
263 let state = self
264 .state
265 .fetch_and(!(REGISTERED | NOTIFICATION), Ordering::Acquire);
266
267 // It is possible that `UPDATE` was cleared and `INDEX` was switched
268 // by a notifier after the initial load of the state, so the waker
269 // index needs to be updated.
270 idx = state & INDEX;
271 }
272
273 // Always store the new waker in the redundant slot to avoid racing with
274 // a notifier.
275 let redundant_idx = idx ^ INDEX; // XOR is faster than subtraction (1 - idx)
276
277 // Store the new waker.
278 //
279 // Safety: it is safe to store the new waker in the redundant slot
280 // because the `REGISTERED` flag and/or the `UPDATE` flag are/is cleared
281 // so the notifier will not attempt to switch the waker.
282 unsafe { self.set_waker(redundant_idx, waker.clone()) };
283
284 // Make the waker visible.
285 //
286 // Ordering: Acquire ordering synchronizes with the Release and AcqRel
287 // RMWs in `try_lock` (called by `notify`) and ensures that either the
288 // predicate set before the call to `notify` will be visible after the
289 // call to `register`, or the registered waker will be visible during
290 // the call to `notify` (or both). Since the waker has been modified
291 // above, Release ordering is also necessary to synchronize with the
292 // AcqRel RMW in `try_lock` (success case) and ensure that the
293 // modification to the waker is fully visible when notifying.
294 self.state.fetch_or(UPDATE | REGISTERED, Ordering::AcqRel);
295 }
296
297 /// Unregisters the waker.
298 ///
299 /// After the waker is unregistered, subsequent calls to `notify` will be
300 /// ignored.
301 ///
302 /// Note that the previously-registered waker (if any) remains cached.
303 ///
304 /// # Safety
305 ///
306 /// The `register`, `unregister` and `wait_until` methods cannot be used
307 /// concurrently from multiple threads.
308 pub unsafe fn unregister(&self) {
309 // Transitions:
310 //
311 // | N L R U I | N L R U I |
312 // |-----------------|-----------------|
313 // | n l r u i | 0 l 0 u i |
314
315 // Modify the state. Note that the waker is not dropped: caching it can
316 // avoid a waker drop/cloning cycle (typically, 2 RMWs) in the frequent
317 // case when the next waker to be registered will be the same as the one
318 // being unregistered.
319 //
320 // Ordering: no waker was modified so Release ordering is sufficient.
321 self.state
322 .fetch_and(!(REGISTERED | NOTIFICATION), Ordering::Release);
323 }
324
325 /// Returns a future that can be `await`ed until the provided predicate
326 /// returns a value.
327 ///
328 /// The predicate is checked each time a notification is received.
329 ///
330 /// # Safety
331 ///
332 /// The `register`, `unregister` and `wait_until` methods cannot be used
333 /// concurrently from multiple threads.
334 pub unsafe fn wait_until<P, T>(&self, predicate: P) -> WaitUntil<'_, P, T>
335 where
336 P: FnMut() -> Option<T>,
337 {
338 WaitUntil::new(self, predicate)
339 }
340
341 /// Sets the waker at index `idx`.
342 ///
343 /// # Safety
344 ///
345 /// The caller must have exclusive access to the waker at index `idx`.
346 unsafe fn set_waker(&self, idx: usize, new: Waker) {
347 unsafe {
348 self.waker[idx].with_mut(|waker| *waker = Some(new));
349 }
350 }
351
352 /// Notify the waker at index `idx`.
353 ///
354 /// # Safety
355 ///
356 /// The waker at index `idx` cannot be modified concurrently.
357 unsafe fn wake_by_ref(&self, idx: usize) {
358 self.waker[idx].with(|waker| {
359 if let Some(waker) = unsafe { &*waker } {
360 waker.wake_by_ref();
361 }
362 });
363 }
364
365 /// Check whether the waker at index `idx` will wake the same task as the
366 /// provided waker.
367 ///
368 /// # Safety
369 ///
370 /// The waker at index `idx` cannot be modified concurrently.
371 unsafe fn will_wake(&self, idx: usize, other: &Waker) -> bool {
372 self.waker[idx].with(|waker| match unsafe { &*waker } {
373 Some(waker) => waker.will_wake(other),
374 None => false,
375 })
376 }
377}
378
379impl Default for DiatomicWaker {
380 fn default() -> Self {
381 Self::new()
382 }
383}
384
385unsafe impl Send for DiatomicWaker {}
386unsafe impl Sync for DiatomicWaker {}
387
388/// Attempts to acquire the notifier lock and returns the current state upon
389/// success.
390///
391/// Acquisition of the lock will fail in the following cases:
392///
393/// * the `REGISTERED` flag is cleared, meaning that there is no need to wake
394/// and therefore no need to lock,
395/// * the lock is already taken, in which case the `NOTIFICATION` flag will be
396/// set if the `REGISTERED` flag is set.
397///
398/// If acquisition of the lock succeeds, the `REGISTERED` flag is cleared. If
399/// additionally the `UPDATE` flag was set, it is cleared and `INDEX` is
400/// flipped.
401///
402/// Transition table:
403///
404/// | N L R U I | N L R U I |
405/// |-----------------|-----------------|
406/// | 0 0 0 u i | 0 0 0 u i | (failure)
407/// | 0 0 1 0 i | 0 1 0 0 i | (success)
408/// | 0 0 1 1 i | 0 1 0 0 !i | (success)
409/// | 0 1 0 u i | 0 1 0 u i | (failure)
410/// | n 1 1 u i | 1 1 1 u i | (failure)
411///
412fn try_lock(state: &AtomicUsize) -> Result<usize, ()> {
413 let mut old_state = state.load(Ordering::Acquire);
414
415 loop {
416 // Early exit: if no waker is registered, no need to acquire lock
417 if (old_state & REGISTERED) == 0 {
418 return Err(());
419 }
420
421 if old_state & (LOCKED | REGISTERED) == REGISTERED {
422 // Success path.
423
424 // If `UPDATE` is set, clear `UPDATE` and flip `INDEX` with the xor mask.
425 // Then set `LOCKED` and clear `REGISTERED`.
426 // Combine all operations into a single xor_mask calculation.
427 let update_bit = old_state & UPDATE;
428 let xor_mask = update_bit | (update_bit >> 1) | LOCKED | REGISTERED;
429
430 let new_state = old_state ^ xor_mask;
431
432 // Ordering: Acquire is necessary to synchronize with the Release
433 // ordering in `register` so that the new waker, if any, is visible.
434 // Release ordering synchronizes with the Acquire and AcqRel RMWs in
435 // `register` and ensures that either the predicate set before the
436 // call to `notify` will be visible after the call to `register`, or
437 // the registered waker will be visible during the call to `notify`
438 // (or both).
439 match state.compare_exchange_weak(
440 old_state,
441 new_state,
442 Ordering::AcqRel,
443 Ordering::Relaxed,
444 ) {
445 Ok(_) => return Ok(new_state),
446 Err(s) => old_state = s,
447 }
448 } else {
449 // Failure path.
450
451 // Set the `NOTIFICATION` bit if `REGISTERED` was set.
452 let registered_bit = old_state & REGISTERED;
453 let new_state = old_state | (registered_bit << 2);
454
455 // Ordering: Release ordering synchronizes with the Acquire and
456 // AcqRel RMWs in `register` and ensures that either the predicate
457 // set before the call to `notify` will be visible after the call to
458 // `register`, or the registered waker will be visible during the
459 // call to `notify` (or both).
460 match state.compare_exchange_weak(
461 old_state,
462 new_state,
463 Ordering::Release,
464 Ordering::Relaxed,
465 ) {
466 Ok(_) => return Err(()),
467 Err(s) => old_state = s,
468 }
469 };
470 }
471}
472
473/// Attempts to release the notifier lock and returns the current state upon
474/// failure.
475///
476/// Release of the lock will fail if the `NOTIFICATION` flag is set because it
477/// means that, after the lock was taken, the registering thread has requested
478/// to be notified again and another notifier has subsequently requested that
479/// such notification be sent on its behalf; if additionally the `UPDATE` flag
480/// was set (i.e. a new waker is available), it is cleared and `INDEX` is
481/// flipped.
482///
483/// Transition table:
484///
485/// | N L R U I | N L R U I |
486/// |-----------------|-----------------|
487/// | 0 1 r u i | 0 0 r u i | (success)
488/// | 1 1 1 0 i | 0 1 0 0 i | (failure)
489/// | 1 1 1 1 i | 0 1 0 0 !i | (failure)
490///
491fn try_unlock(state: &AtomicUsize, mut old_state: usize) -> Result<(), usize> {
492 loop {
493 if old_state & NOTIFICATION == 0 {
494 // Success path.
495
496 let new_state = old_state & !LOCKED;
497
498 // Ordering: Release is necessary to synchronize with the Acquire
499 // ordering in `register` and ensure that the waker call has
500 // completed before a new waker is stored.
501 match state.compare_exchange_weak(
502 old_state,
503 new_state,
504 Ordering::Release,
505 Ordering::Relaxed,
506 ) {
507 Ok(_) => return Ok(()),
508 Err(s) => old_state = s,
509 }
510 } else {
511 // Failure path.
512
513 // If `UPDATE` is set, clear `UPDATE` and flip `INDEX` with the xor mask.
514 // Then clear `NOTIFICATION` and `REGISTERED`.
515 // Combine all operations into a single xor_mask calculation.
516 let update_bit = old_state & UPDATE;
517 let xor_mask = update_bit | (update_bit >> 1) | NOTIFICATION | REGISTERED;
518
519 let new_state = old_state ^ xor_mask;
520
521 // Ordering: Release is necessary to synchronize with the Acquire
522 // ordering in `register` and ensure that the call to
523 // `Waker::wake_by_ref` has completed before a new waker is stored.
524 // Acquire ordering is in turn necessary to ensure that any newly
525 // registered waker is visible.
526 match state.compare_exchange_weak(
527 old_state,
528 new_state,
529 Ordering::AcqRel,
530 Ordering::Relaxed,
531 ) {
532 Ok(_) => return Err(new_state),
533 Err(s) => old_state = s,
534 }
535 };
536 }
537}
538
539/// A future that can be `await`ed until a predicate is satisfied.
540#[derive(Debug)]
541pub struct WaitUntil<'a, P, T>
542where
543 P: FnMut() -> Option<T>,
544{
545 predicate: P,
546 wake: &'a DiatomicWaker,
547}
548
549impl<'a, P, T> WaitUntil<'a, P, T>
550where
551 P: FnMut() -> Option<T>,
552{
553 /// Creates a future associated to the specified wake that can be `await`ed
554 /// until the specified predicate is satisfied.
555 fn new(wake: &'a DiatomicWaker, predicate: P) -> Self {
556 Self { predicate, wake }
557 }
558}
559
560impl<P: FnMut() -> Option<T>, T> Unpin for WaitUntil<'_, P, T> {}
561
562impl<'a, P, T> Future for WaitUntil<'a, P, T>
563where
564 P: FnMut() -> Option<T>,
565{
566 type Output = T;
567
568 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
569 // Safety: the safety of this method is contingent on the safety of the
570 // `register` and `unregister` methods. Since a `WaitUntil` future can
571 // only be created from the unsafe `wait_until` method, however, the
572 // user must uphold the contract that `register`, `unregister` and
573 // `wait_until` cannot be used concurrently from multiple threads.
574 unsafe {
575 if let Some(value) = (self.predicate)() {
576 return Poll::Ready(value);
577 }
578 self.wake.register(cx.waker());
579
580 if let Some(value) = (self.predicate)() {
581 self.wake.unregister();
582 return Poll::Ready(value);
583 }
584 }
585
586 Poll::Pending
587 }
588}