Crate diatomic_waker


Async, fast synchronization primitives for task wakeup.

diatomic-waker is similar to atomic-waker in that it enables concurrent updates and notifications to a wrapped Waker. Unlike the latter, however, it does not use spinlocks[1] and is faster, especially when the consumer is notified periodically rather than just once. In particular, it can serve as a very fast, single-consumer eventcount that turns a non-blocking data structure into an asynchronous one (see the MPSC channel receiver example below).
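The eventcount pattern can be illustrated without the crate. The std-only sketch below is not diatomic-waker's algorithm: `LockedWakerSlot`, `poll_counter`, `produce` and `CountingWaker` are hypothetical names, and the slot uses a `Mutex`, which is precisely what diatomic-waker avoids. It only shows the ordering that makes the pattern race-free: the producer updates the data structure before notifying, and the consumer checks again after registering its waker.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker slot. It uses a Mutex for brevity, which is exactly
// what diatomic-waker avoids; only the notification protocol matters here.
#[derive(Default)]
struct LockedWakerSlot {
    waker: Mutex<Option<Waker>>,
}

impl LockedWakerSlot {
    fn register(&self, waker: &Waker) {
        *self.waker.lock().unwrap() = Some(waker.clone());
    }

    fn unregister(&self) {
        *self.waker.lock().unwrap() = None;
    }

    // Takes the registered waker (if any) and wakes it after releasing the lock.
    fn notify(&self) {
        let waker = self.waker.lock().unwrap().take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

// Consumer side: check, register, then check again before returning Pending.
fn poll_counter(counter: &AtomicUsize, slot: &LockedWakerSlot, cx: &mut Context<'_>) -> Poll<usize> {
    let n = counter.swap(0, Ordering::Acquire);
    if n != 0 {
        return Poll::Ready(n);
    }
    slot.register(cx.waker());
    let n = counter.swap(0, Ordering::Acquire);
    if n != 0 {
        slot.unregister(); // avoid a spurious wake-up
        return Poll::Ready(n);
    }
    Poll::Pending
}

// Producer side: update the non-blocking state first, then notify.
fn produce(counter: &AtomicUsize, slot: &LockedWakerSlot) {
    counter.fetch_add(1, Ordering::Release);
    slot.notify();
}

// Test waker that records how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```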

The API distinguishes between the entity that registers wakers (WakeSink or WakeSinkRef) and the possibly many entities that notify the waker (WakeSources or WakeSourceRefs).

Most users will prefer to use WakeSink and WakeSource, which readily store a shared DiatomicWaker within an Arc. You may otherwise elect to allocate a DiatomicWaker yourself, but will then need to use the lifetime-bounded WakeSinkRef and WakeSourceRef, or ensure by other means that waker registration is not performed concurrently.
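The lifetime on the borrowed handles is what lets the shared state live outside an Arc. The std-only sketch below makes this concrete: the caller owns the state and hands out references rather than cloning an `Arc`. `SharedSlot`, `SinkRef` and `SourceRef` are hypothetical types that merely mimic the roles of DiatomicWaker, WakeSinkRef and WakeSourceRef; their bodies use a `Mutex` and are not the crate's implementation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::task::Waker;
use std::thread;

// Hypothetical shared state that the caller allocates itself (e.g. on the
// stack) instead of placing it in an Arc.
#[derive(Default)]
struct SharedSlot {
    waker: Mutex<Option<Waker>>,
    notifications: AtomicUsize,
}

// Hypothetical borrowed handles: the lifetime 'a ties both to `SharedSlot`,
// so the compiler rejects any use after the slot goes out of scope.
struct SinkRef<'a>(&'a SharedSlot);

#[derive(Clone, Copy)]
struct SourceRef<'a>(&'a SharedSlot);

impl<'a> SinkRef<'a> {
    // Requiring `&mut` keeps the slot exclusively borrowed while the sink
    // lives, so a second sink cannot be created from it in the meantime.
    fn new(slot: &'a mut SharedSlot) -> Self {
        SinkRef(slot)
    }

    fn source_ref(&self) -> SourceRef<'a> {
        SourceRef(self.0)
    }

    // Only the (unique) sink registers a waker.
    fn register(&mut self, waker: &Waker) {
        *self.0.waker.lock().unwrap() = Some(waker.clone());
    }
}

impl SourceRef<'_> {
    // Any number of sources may notify concurrently.
    fn notify(&self) {
        self.0.notifications.fetch_add(1, Ordering::Relaxed);
        let waker = self.0.waker.lock().unwrap().take();
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}
```

Scoped threads (`std::thread::scope`) can then share the `Copy` source handle without any reference counting, because the compiler knows the threads finish before `SharedSlot` is dropped.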

§Feature flags

By default, this crate enables the alloc feature to provide the owned WakeSink and WakeSource. It can be made no-std-compatible by specifying default-features = false.

§Examples

A multi-producer, single-consumer channel of capacity 1 for sending NonZeroUsize values, with an asynchronous receiver:

use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use diatomic_waker::{WakeSink, WakeSource};

// The sending side of the channel.
#[derive(Clone)]
struct Sender {
    wake_src: WakeSource,
    value: Arc<AtomicUsize>,
}

// The receiving side of the channel.
struct Receiver {
    wake_sink: WakeSink,
    value: Arc<AtomicUsize>,
}

// Creates an empty channel.
fn channel() -> (Sender, Receiver) {
    let value = Arc::new(AtomicUsize::new(0));
    let wake_sink = WakeSink::new();
    let wake_src = wake_sink.source();

    (
        Sender {
            wake_src,
            value: value.clone(),
        },
        Receiver { wake_sink, value },
    )
}

impl Sender {
    // Sends a value if the channel is empty.
    fn try_send(&self, value: NonZeroUsize) -> bool {
        let success = self
            .value
            .compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed)
            .is_ok();
        if success {
            self.wake_src.notify();
        }

        success
    }
}

impl Receiver {
    // Receives a value asynchronously.
    async fn recv(&mut self) -> NonZeroUsize {
        // Wait until the predicate returns `Some(value)`, i.e. when the atomic
        // value becomes non-zero.
        self.wake_sink
            .wait_until(|| NonZeroUsize::new(self.value.swap(0, Ordering::Relaxed)))
            .await
    }
}
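The `recv` future still needs an executor to poll it. When no async runtime is at hand, a minimal thread-parking `block_on` such as the following sketch is enough; it uses only the standard library, and `block_on` and `ThreadWaker` are names assumed for this example, not part of diatomic-waker.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal single-future executor: poll, and park the thread until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // An unpark issued before `park` is not lost, and spurious
            // wake-ups are harmless: the future is simply polled again.
            Poll::Pending => thread::park(),
        }
    }
}
```

With the channel above, `block_on(receiver.recv())` should return once a `try_send` on some cloned `Sender` succeeds on another thread, since the resulting `notify` wakes the registered waker and thereby unparks the blocked thread.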

In some cases, it may be necessary to use the lower-level register and unregister methods rather than the wait_until convenience method.

This is how the behavior of the above recv method could be reproduced with a hand-coded future:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

struct Recv<'a> {
    receiver: &'a mut Receiver,
}

impl Future for Recv<'_> {
    type Output = NonZeroUsize;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<NonZeroUsize> {
        // Avoid waker registration if a value is readily available.
        let value = NonZeroUsize::new(self.receiver.value.swap(0, Ordering::Relaxed));
        if let Some(value) = value {
            return Poll::Ready(value);
        }

        // Register the waker to be polled again once a value is available.
        self.receiver.wake_sink.register(cx.waker());

        // Check again after registering the waker to prevent a race condition.
        let value = NonZeroUsize::new(self.receiver.value.swap(0, Ordering::Relaxed));
        if let Some(value) = value {
            // Avoid a spurious wake-up.
            self.receiver.wake_sink.unregister();

            return Poll::Ready(value);
        }

        Poll::Pending
    }
}
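The second check in `poll` guards against a lost wake-up. The std-only sketch below replays the problematic interleaving deterministically, with the producer running between the consumer's first check and its waker registration. `Slot`, `NoopWake` and `replay_race` are hypothetical names, and the `Mutex`-based slot merely stands in for WakeSink and WakeSource.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Poll, Wake, Waker};

// Hypothetical single-slot channel with a lock-based waker slot standing in
// for `WakeSink`/`WakeSource`.
#[derive(Default)]
struct Slot {
    value: AtomicUsize,
    waker: Mutex<Option<Waker>>,
}

impl Slot {
    // Producer: publish the value, then wake the registered waker, if any.
    // Returns whether a waker was actually woken.
    fn send(&self, value: usize) -> bool {
        self.value.store(value, Ordering::Release);
        let waker = self.waker.lock().unwrap().take();
        match waker {
            Some(waker) => {
                waker.wake();
                true
            }
            None => false, // no registered waker: this notification is lost
        }
    }
}

// Waker that does nothing; it only needs to exist for registration.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Replays, step by step, the interleaving in which the producer runs between
// the consumer's first check and its waker registration.
fn replay_race(slot: &Slot, waker: &Waker, recheck: bool) -> Poll<usize> {
    // 1. Consumer: the first check finds the slot empty.
    assert_eq!(slot.value.swap(0, Ordering::Acquire), 0);
    // 2. Producer: sends while no waker is registered, so nobody is woken.
    assert!(!slot.send(5));
    // 3. Consumer: registers its waker.
    *slot.waker.lock().unwrap() = Some(waker.clone());
    // 4. Consumer: optional second check after registration.
    if recheck {
        let value = slot.value.swap(0, Ordering::Acquire);
        if value != 0 {
            *slot.waker.lock().unwrap() = None; // unregister
            return Poll::Ready(value);
        }
    }
    Poll::Pending
}
```

Without the re-check, the consumer returns `Poll::Pending` even though a value is waiting and no further notification will arrive, so the task would never be polled again.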

[1] The implementation of AtomicWaker yields to the runtime on contention, which is in effect an executor-mediated spinlock.
