diatomic_waker/lib.rs
//! Async, fast synchronization primitives for task wakeup.
//!
//! `diatomic-waker` is similar to [`atomic-waker`][atomic-waker] in that it
//! enables concurrent updates and notifications to a wrapped `Waker`. Unlike
//! the latter, however, it does not use spinlocks[^spinlocks] and is faster, in
//! particular when the consumer is notified periodically rather than just once.
//! It can notably be used as a very fast, single-consumer [eventcount] to
//! turn a non-blocking data structure into an asynchronous one (see the MPSC
//! channel receiver example below).
//!
//! The API distinguishes between the entity that registers wakers ([`WakeSink`]
//! or [`WakeSinkRef`]) and the possibly many entities that notify the waker
//! ([`WakeSource`]s or [`WakeSourceRef`]s).
//!
//! Most users will prefer to use [`WakeSink`] and [`WakeSource`], which readily
//! store a shared [`DiatomicWaker`] within an `Arc`. You may otherwise elect to
//! allocate a [`DiatomicWaker`] yourself, but will then need to use the
//! lifetime-bounded [`WakeSinkRef`] and [`WakeSourceRef`], or ensure by other
//! means that waker registration is not performed concurrently.
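//!
//! A minimal sketch of the borrowed API (it mirrors the owned API used in the
//! examples below):
//!
//! ```
//! use diatomic_waker::DiatomicWaker;
//!
//! let mut waker = DiatomicWaker::new();
//! let mut sink = waker.sink_ref();
//! let source = sink.source_ref();
//!
//! // Without a registered waker, this notification is simply a no-op.
//! source.notify();
//! ```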
//!
//! [atomic-waker]: https://docs.rs/atomic-waker/latest/atomic_waker/
//! [eventcount]:
//! https://www.1024cores.net/home/lock-free-algorithms/eventcounts
//! [^spinlocks]: The implementation of [AtomicWaker][atomic-waker] yields to the
//! runtime on contention, which is in effect an executor-mediated spinlock.
//!
//! # Feature flags
//!
//! By default, this crate enables the `alloc` feature to provide the owned
//! [`WakeSink`] and [`WakeSource`]. It can be made `no-std`-compatible by
//! specifying `default-features = false`.
//!
//!
//! # Examples
//!
//! A multi-producer, single-consumer channel of capacity 1 for sending
//! `NonZeroUsize` values, with an asynchronous receiver:
//!
//! ```
//! use std::num::NonZeroUsize;
//! use std::sync::atomic::{AtomicUsize, Ordering};
//! use std::sync::Arc;
//!
//! use diatomic_waker::{WakeSink, WakeSource};
//!
//! // The sending side of the channel.
//! #[derive(Clone)]
//! struct Sender {
//!     wake_src: WakeSource,
//!     value: Arc<AtomicUsize>,
//! }
//!
//! // The receiving side of the channel.
//! struct Receiver {
//!     wake_sink: WakeSink,
//!     value: Arc<AtomicUsize>,
//! }
//!
//! // Creates an empty channel.
//! fn channel() -> (Sender, Receiver) {
//!     let value = Arc::new(AtomicUsize::new(0));
//!     let wake_sink = WakeSink::new();
//!     let wake_src = wake_sink.source();
//!
//!     (
//!         Sender {
//!             wake_src,
//!             value: value.clone(),
//!         },
//!         Receiver { wake_sink, value },
//!     )
//! }
//!
//! impl Sender {
//!     // Sends a value if the channel is empty.
//!     fn try_send(&self, value: NonZeroUsize) -> bool {
//!         let success = self
//!             .value
//!             .compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed)
//!             .is_ok();
//!         if success {
//!             self.wake_src.notify()
//!         };
//!
//!         success
//!     }
//! }
//!
//! impl Receiver {
//!     // Receives a value asynchronously.
//!     async fn recv(&mut self) -> NonZeroUsize {
//!         // Wait until the predicate returns `Some(value)`, i.e. when the atomic
//!         // value becomes non-zero.
//!         self.wake_sink
//!             .wait_until(|| NonZeroUsize::new(self.value.swap(0, Ordering::Relaxed)))
//!             .await
//!     }
//! }
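//!
//! // Quick usage sketch (added for illustration, not part of the original
//! // channel definition): the first send on the empty channel succeeds, while
//! // a second one fails because the single slot is still occupied.
//! let (tx, _rx) = channel();
//! let value = NonZeroUsize::new(42).unwrap();
//! assert!(tx.try_send(value));
//! assert!(!tx.try_send(value));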
//! ```
//!
//!
//! In some cases, it may be necessary to use the lower-level
//! [`register`](WakeSink::register) and [`unregister`](WakeSink::unregister)
//! methods rather than the [`wait_until`](WakeSink::wait_until) convenience
//! method.
//!
//! This is how the behavior of the above `recv` method could be
//! reproduced with a hand-coded future:
//!
//! ```
//! use std::future::Future;
//! # use std::num::NonZeroUsize;
//! use std::pin::Pin;
//! # use std::sync::atomic::{AtomicUsize, Ordering};
//! # use std::sync::Arc;
//! use std::task::{Context, Poll};
//! # use diatomic_waker::WakeSink;
//!
//! # struct Receiver {
//! #     wake_sink: WakeSink,
//! #     value: Arc<AtomicUsize>,
//! # }
//! struct Recv<'a> {
//!     receiver: &'a mut Receiver,
//! }
//!
//! impl Future for Recv<'_> {
//!     type Output = NonZeroUsize;
//!
//!     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<NonZeroUsize> {
//!         // Avoid waker registration if a value is readily available.
//!         let value = NonZeroUsize::new(self.receiver.value.swap(0, Ordering::Relaxed));
//!         if let Some(value) = value {
//!             return Poll::Ready(value);
//!         }
//!
//!         // Register the waker to be polled again once a value is available.
//!         self.receiver.wake_sink.register(cx.waker());
//!
//!         // Check again after registering the waker to prevent a race condition.
//!         let value = NonZeroUsize::new(self.receiver.value.swap(0, Ordering::Relaxed));
//!         if let Some(value) = value {
//!             // Avoid a spurious wake-up.
//!             self.receiver.wake_sink.unregister();
//!
//!             return Poll::Ready(value);
//!         }
//!
//!         Poll::Pending
//!     }
//! }
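//!
//! // Hypothetical convenience method (added for illustration, not part of the
//! // original example): it ties the hand-coded future to the receiver so that
//! // a value can be obtained with `receiver.recv().await`.
//! impl Receiver {
//!     fn recv(&mut self) -> Recv<'_> {
//!         Recv { receiver: self }
//!     }
//! }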
//! ```
#![warn(missing_docs, missing_debug_implementations, unreachable_pub)]
#![cfg_attr(not(test), no_std)]
#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg_hide))]

#[cfg(feature = "alloc")]
extern crate alloc;

#[cfg(feature = "alloc")]
mod arc_waker;
mod borrowed_waker;
mod loom_exports;
#[deprecated(
    since = "0.2.0",
    note = "items from this module are now available in the root module"
)]
pub mod primitives;
mod waker;

#[cfg(feature = "alloc")]
pub use arc_waker::{WakeSink, WakeSource};
pub use borrowed_waker::{WakeSinkRef, WakeSourceRef};
pub use waker::{DiatomicWaker, WaitUntil};

/// Tests.
#[cfg(all(test, not(diatomic_waker_loom)))]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::thread;
    use std::time::Duration;

    use pollster::block_on;

    use super::*;

    #[test]
    fn waker_wait_until() {
        let mut sink = WakeSink::new();
        let source = sink.source();
        static FLAG: AtomicBool = AtomicBool::new(false);

        let t1 = thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(10));
            source.notify(); // force a spurious notification
            std::thread::sleep(Duration::from_millis(10));
            FLAG.store(true, Ordering::Relaxed);
            source.notify();
        });

        let t2 = thread::spawn(move || {
            block_on(sink.wait_until(|| {
                if FLAG.load(Ordering::Relaxed) {
                    Some(())
                } else {
                    None
                }
            }));

            assert!(FLAG.load(Ordering::Relaxed));
        });

        t1.join().unwrap();
        t2.join().unwrap();
    }

    #[test]
    fn waker_ref_wait_until() {
        let mut w = DiatomicWaker::new();
        let mut sink = w.sink_ref();
        let source = sink.source_ref();
        static FLAG: AtomicBool = AtomicBool::new(false);

        thread::scope(|s| {
            s.spawn(move || {
                std::thread::sleep(Duration::from_millis(10));
                source.notify(); // force a spurious notification
                std::thread::sleep(Duration::from_millis(10));
                FLAG.store(true, Ordering::Relaxed);
                source.notify();
            });

            s.spawn(move || {
                block_on(sink.wait_until(|| {
                    if FLAG.load(Ordering::Relaxed) {
                        Some(())
                    } else {
                        None
                    }
                }));

                assert!(FLAG.load(Ordering::Relaxed));
            });
        });
    }
}

/// Loom tests.
#[cfg(all(test, diatomic_waker_loom))]
mod tests {
    use super::*;

    use core::task::Waker;
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::atomic::Ordering;
    use std::sync::Arc;
    use std::task::{Context, Poll};

    use loom::model::Builder;
    use loom::sync::atomic::{AtomicU32, AtomicUsize};
    use loom::thread;

    use waker_fn::waker_fn;

    /// A waker factory that registers notifications from the newest waker only.
    #[derive(Clone, Default)]
    struct MultiWaker {
        state: Arc<AtomicU32>,
    }
    impl MultiWaker {
        /// Clears the notification flag and returns the former notification
        /// status.
        ///
        /// This operation has Acquire semantics when a notification is indeed
        /// present, and Relaxed semantics otherwise. It is therefore appropriate
        /// for simulating a scheduler receiving a notification, as it ensures that
        /// all memory operations preceding the notification of a task are visible.
        fn take_notification(&self) -> bool {
            // Clear the notification flag.
            let mut state = self.state.load(Ordering::Relaxed);
            loop {
                // This is basically a `fetch_or` but with an atomic memory
                // ordering that depends on the LSB.
                let notified_state = state | 1;
                let unnotified_state = state & !1;
                match self.state.compare_exchange_weak(
                    notified_state,
                    unnotified_state,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(s) => {
                        state = s;
                        if state == unnotified_state {
                            return false;
                        }
                    }
                }
            }
        }

        /// Clears the notification flag and creates a new waker.
        fn new_waker(&self) -> Waker {
            // Increase the epoch and clear the notification flag.
            let mut state = self.state.load(Ordering::Relaxed);
            let mut epoch;
            loop {
                // Increase the epoch by 2.
                epoch = (state & !1) + 2;
                match self.state.compare_exchange_weak(
                    state,
                    epoch,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            }

            // Create a waker that only notifies if it is the newest waker.
            let waker_state = self.state.clone();
            waker_fn(move || {
                let mut state = waker_state.load(Ordering::Relaxed);
                loop {
                    let new_state = if state & !1 == epoch {
                        epoch | 1
                    } else {
                        break;
                    };
                    match waker_state.compare_exchange(
                        state,
                        new_state,
                        Ordering::Release,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => break,
                        Err(s) => state = s,
                    }
                }
            })
        }
    }

    // A simple counter that can be used to simulate the availability of a
    // certain number of tokens. In order to model the weakest possible
    // predicate from the viewpoint of atomic memory ordering, only Relaxed
    // atomic operations are used.
    #[derive(Clone, Default)]
    struct Counter {
        count: Arc<AtomicUsize>,
    }
    impl Counter {
        fn increment(&self) {
            self.count.fetch_add(1, Ordering::Relaxed);
        }
        fn try_decrement(&self) -> bool {
            let mut count = self.count.load(Ordering::Relaxed);
            loop {
                if count == 0 {
                    return false;
                }
                match self.count.compare_exchange(
                    count,
                    count - 1,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(c) => count = c,
                }
            }
        }
    }

    /// Tests whether notifications may be lost.
    ///
    /// Make a certain number of tokens available and notify the sink each time
    /// a token is made available. Optionally, it is possible to:
    /// - request that `max_spurious_wake` threads simulate a spurious
    ///   wake-up,
    /// - change the waker each time it is polled.
    ///
    /// A default preemption bound will be applied if none was specified through
    /// an environment variable.
    fn loom_notify(
        token_count: usize,
        max_spurious_wake: usize,
        change_waker: bool,
        preemption_bound: usize,
    ) {
        // Only set the preemption bound if it wasn't already specified via an
        // environment variable.
        let mut builder = Builder::new();
        if builder.preemption_bound.is_none() {
            builder.preemption_bound = Some(preemption_bound);
        }

        builder.check(move || {
            let token_counter = Counter::default();
            let mut wake_sink = WakeSink::new();

            for src_id in 0..token_count {
                thread::spawn({
                    let token_counter = token_counter.clone();
                    let wake_src = wake_sink.source();

                    move || {
                        if src_id < max_spurious_wake {
                            wake_src.notify();
                        }
                        token_counter.increment();
                        wake_src.notify();
                    }
                });
            }

            let multi_waker = MultiWaker::default();
            let mut waker = multi_waker.new_waker();
            let mut satisfied_predicates_count = 0;

            // Iterate until all tokens are "received".
            //
            // Note: the loop does not have any assertion. This is by design:
            // missed notifications will be caught by Loom with a `Model
            // exceeded maximum number of branches` error because the spin loop
            // will then spin forever.
            while satisfied_predicates_count < token_count {
                let mut wait_until = wake_sink.wait_until(|| {
                    if token_counter.try_decrement() {
                        Some(())
                    } else {
                        None
                    }
                });

                // Poll the predicate until it is satisfied.
                loop {
                    let mut cx = Context::from_waker(&waker);
                    let poll_state = Pin::new(&mut wait_until).poll(&mut cx);

                    if poll_state == Poll::Ready(()) {
                        satisfied_predicates_count += 1;
                        break;
                    }

                    // Simulate the scheduler by spinning until the next
                    // notification.
                    while !multi_waker.take_notification() {
                        thread::yield_now();
                    }

                    if change_waker {
                        waker = multi_waker.new_waker();
                    }
                }
            }
        });
    }

    #[test]
    fn loom_notify_two_tokens() {
        const DEFAULT_PREEMPTION_BOUND: usize = 4;

        loom_notify(2, 0, false, DEFAULT_PREEMPTION_BOUND);
    }

    #[test]
    fn loom_notify_two_tokens_one_spurious() {
        const DEFAULT_PREEMPTION_BOUND: usize = 4;

        loom_notify(2, 1, false, DEFAULT_PREEMPTION_BOUND);
    }

    #[test]
    fn loom_notify_two_tokens_change_waker() {
        const DEFAULT_PREEMPTION_BOUND: usize = 3;

        loom_notify(2, 0, true, DEFAULT_PREEMPTION_BOUND);
    }

    #[test]
    fn loom_notify_two_tokens_one_spurious_change_waker() {
        const DEFAULT_PREEMPTION_BOUND: usize = 3;

        loom_notify(2, 1, true, DEFAULT_PREEMPTION_BOUND);
    }

    #[test]
    fn loom_notify_three_tokens() {
        const DEFAULT_PREEMPTION_BOUND: usize = 2;

        loom_notify(3, 0, false, DEFAULT_PREEMPTION_BOUND);
    }

    /// Tests whether concurrent read and write access to the waker is possible.
    ///
    /// Three different wakers are registered to force a waker slot to be re-used.
    #[test]
    fn loom_waker_slot_reuse() {
        // This test requires a high preemption bound to catch typical atomic
        // memory ordering mistakes.
        const DEFAULT_PREEMPTION_BOUND: usize = 5;

        // Only set the preemption bound if it wasn't already specified via an
        // environment variable.
        let mut builder = Builder::new();
        if builder.preemption_bound.is_none() {
            builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND);
        }

        builder.check(move || {
            let mut wake_sink = WakeSink::new();

            thread::spawn({
                let wake_src = wake_sink.source();

                move || {
                    wake_src.notify();
                }
            });
            thread::spawn({
                let wake_src = wake_sink.source();

                move || {
                    wake_src.notify();
                    wake_src.notify();
                }
            });

            let multi_waker = MultiWaker::default();
            for _ in 0..3 {
                let waker = multi_waker.new_waker();
                wake_sink.register(&waker);
            }
        });
    }
}