Struct maitake_sync::WaitQueue

source ·
pub struct WaitQueue { /* private fields */ }
Expand description

A queue of waiting tasks which can be woken in first-in, first-out order, or all at once.

A WaitQueue allows any number of tasks to wait asynchronously and be woken when some event occurs, either individually in first-in, first-out order, or all at once. This makes it a vital building block of runtime services (such as timers or I/O resources), where it may be used to wake a set of tasks when a timer completes or when a resource becomes available. It can be equally useful for implementing higher-level synchronization primitives: for example, a WaitQueue plus an UnsafeCell is essentially an entire implementation of a fair asynchronous mutex. Finally, a WaitQueue can be a useful synchronization primitive on its own: sometimes, you just need to have a bunch of tasks wait for something and then wake them all up.

§Examples

Waking a single task at a time by calling wake:

use std::sync::Arc;
use maitake::scheduler::Scheduler;
use maitake_sync::WaitQueue;

const TASKS: usize = 10;

// In order to spawn tasks, we need a `Scheduler` instance.
let scheduler = Scheduler::new();

// Construct a new `WaitQueue`.
let q = Arc::new(WaitQueue::new());

// Spawn some tasks that will wait on the queue.
for _ in 0..TASKS {
    let q = q.clone();
    scheduler.spawn(async move {
        // Wait to be woken by the queue.
        q.wait().await.expect("queue is not closed");
    });
}

// Tick the scheduler once.
let tick = scheduler.tick();

// No tasks should complete on this tick, as they are all waiting
// to be woken by the queue.
assert_eq!(tick.completed, 0, "no tasks have been woken");

let mut completed = 0;
for i in 1..=TASKS {
    // Wake the next task from the queue.
    q.wake();

    // Tick the scheduler.
    let tick = scheduler.tick();

    // A single task should have completed on this tick.
    completed += tick.completed;
    assert_eq!(completed, i);
}

assert_eq!(completed, TASKS, "all tasks should have completed");

Waking all tasks using wake_all:

use std::sync::Arc;
use maitake::scheduler::Scheduler;
use maitake_sync::WaitQueue;

const TASKS: usize = 10;

// In order to spawn tasks, we need a `Scheduler` instance.
let scheduler = Scheduler::new();

// Construct a new `WaitQueue`.
let q = Arc::new(WaitQueue::new());

// Spawn some tasks that will wait on the queue.
for _ in 0..TASKS {
    let q = q.clone();
    scheduler.spawn(async move {
        // Wait to be woken by the queue.
        q.wait().await.expect("queue is not closed");
    });
}

// Tick the scheduler once.
let tick = scheduler.tick();

// No tasks should complete on this tick, as they are all waiting
// to be woken by the queue.
assert_eq!(tick.completed, 0, "no tasks have been woken");

// Wake all tasks waiting for the queue.
q.wake_all();

// Tick the scheduler again to run the woken tasks.
let tick = scheduler.tick();

// All tasks have now completed, since they were woken by the
// queue.
assert_eq!(tick.completed, TASKS, "all tasks should have completed");

§Implementation Notes

This type is implemented using an intrusive doubly-linked list.

The intrusive aspect of this list is important, as it means that it does not allocate memory. Instead, nodes in the linked list are stored in the futures of tasks trying to wait for capacity. This means that it is not necessary to allocate any heap memory for each task waiting to be woken.

However, the intrusive linked list introduces one new danger: because futures can be cancelled, and the linked list nodes live within the futures trying to wait on the queue, we must ensure that the node is unlinked from the list before dropping a cancelled future. Failure to do so would result in the list containing dangling pointers. Therefore, we must use a doubly-linked list, so that nodes can edit both the previous and next node when they have to remove themselves. This is kind of a bummer, as it means we can’t use something nice like this intrusive queue by Dmitry Vyukov, and there are not really practical designs for lock-free doubly-linked lists that don’t rely on some kind of deferred reclamation scheme such as hazard pointers or QSBR.

Instead, we just stick a Mutex around the linked list, which must be acquired to pop nodes from it, or for nodes to remove themselves when futures are cancelled. This is a bit sad, but the critical sections for this mutex are short enough that we still get pretty good performance despite it.

Implementations§

source§

impl WaitQueue

source

pub const fn new() -> Self

Returns a new WaitQueue.

source

pub fn wake(&self)

Wake the next task in the queue.

If the queue is empty, a wakeup is stored in the WaitQueue, and the next call to wait().await will complete immediately. If one or more tasks are currently in the queue, the first task in the queue is woken.

At most one wakeup will be stored in the queue at any time. If wake() is called many times while there are no tasks in the queue, only a single wakeup is stored.

§Examples
§Examples
use std::sync::Arc;
use maitake_sync::WaitQueue;

let queue = Arc::new(WaitQueue::new());

let waiter = task::spawn({
    // clone the queue to move into the spawned task
    let queue = queue.clone();
    async move {
        queue.wait().await;
        println!("received wakeup!");
    }
});

println!("waking task...");
queue.wake();

waiter.await.unwrap();
source

pub fn wake_all(&self)

Wake all tasks currently in the queue.

All tasks currently waiting on the queue are woken. Unlike wake(), a wakeup is not stored in the queue to wake the next call to wait() if the queue is empty. Instead, this method only wakes all currently registered waiters. Registering a task to be woken is done by awaiting the Future returned by the wait() method on this queue.

§Examples
use maitake_sync::WaitQueue;
use std::sync::Arc;

let queue = Arc::new(WaitQueue::new());

// spawn multiple tasks to wait on the queue.
let task1 = task::spawn({
    let queue = queue.clone();
    async move {
        println!("task 1 waiting...");
        queue.wait().await;
        println!("task 1 woken")
    }
});

let task2 = task::spawn({
    let queue = queue.clone();
    async move {
        println!("task 2 waiting...");
        queue.wait().await;
        println!("task 2 woken")
    }
});

// yield to the scheduler so that both tasks register
// themselves to wait on the queue.
task::yield_now().await;

// neither task will have been woken.
assert!(!task1.is_finished());
assert!(!task2.is_finished());

// wake all tasks waiting on the queue.
queue.wake_all();

// yield to the scheduler again so that the tasks can execute.
task::yield_now().await;

assert!(task1.is_finished());
assert!(task2.is_finished());
source

pub fn close(&self)

Close the queue, indicating that it may no longer be used.

Once a queue is closed, all wait() calls (current or future) will return an error.

This method is generally used when implementing higher-level synchronization primitives or resources: when an event makes a resource permanently unavailable, the queue can be closed.

source

pub fn wait(&self) -> Wait<'_>

Wait to be woken up by this queue.

Equivalent to:

async fn wait(&self);

This returns a Wait Future that will complete when the task is woken by a call to wake() or wake_all(), or when the WaitQueue is dropped.

Each WaitQueue holds a single wakeup. If wake() was previously called while no tasks were waiting on the queue, then wait().await will complete immediately, consuming the stored wakeup. Otherwise, wait().await waits to be woken by the next call to wake() or wake_all().

The Wait future is not guaranteed to receive wakeups from calls to wake() if it has not yet been polled. See the documentation for the Wait::subscribe() method for details on receiving wakeups from the queue prior to polling the Wait future for the first time.

A Wait future is is guaranteed to recieve wakeups from calls to wake_all() as soon as it is created, even if it has not yet been polled.

§Returns

The Future returned by this method completes with one of the following outputs:

§Cancellation

A WaitQueue fairly distributes wakeups to waiting tasks in the order that they started to wait. If a Wait future is dropped, the task will forfeit its position in the queue.

§Examples
use std::sync::Arc;
use maitake_sync::WaitQueue;

let queue = Arc::new(WaitQueue::new());

let waiter = task::spawn({
    // clone the queue to move into the spawned task
    let queue = queue.clone();
    async move {
        queue.wait().await;
        println!("received wakeup!");
    }
});

println!("waking task...");
queue.wake();

waiter.await.unwrap();
source§

impl WaitQueue

source

pub fn wait_owned(self: &Arc<Self>) -> WaitOwned

Available on crate feature alloc only.

Wait to be woken up by this queue, returning a future that’s valid for the 'static lifetime.

This returns a WaitOwned future that will complete when the task is woken by a call to wake() or wake_all(), or when the WaitQueue is closed.

This is identical to the wait() method, except that it takes a Arc reference to the WaitQueue, allowing the returned future to live for the 'static lifetime. See the documentation for wait() for details on how to use the future returned by this method.

§Returns

The Future returned by this method completes with one of the following outputs:

§Cancellation

A WaitQueue fairly distributes wakeups to waiting tasks in the order that they started to wait. If a WaitOwned future is dropped, the task will forfeit its position in the queue.

Trait Implementations§

source§

impl Debug for WaitQueue

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more