Crate asyncnal

Source
Expand description

§Overview

This crate provides executor-agnostic async signalling primitives for building more complex asynchronous datastructures. They are designed to be lightweight and extremely easy to use. The documentation contains many examples on how to apply them to a variety of situations. There also exists local varieties for runtimes that are !Send and !Sync.

The inspiration from the crate is two-fold. First, the excellent synchronous event notification mechansism rsevents. Additionally, Tokio’s Notify primitive. The main difference being that this crate is very lightweight and can be brought in without bringing in the entire Tokio runtime.

It can be thought of as a Semaphore starting without any permits, although you can optionally create an event that is already set. Every event mechanism in this crate can be described in terms of the EventSetter interface. A task waits for an event via the wait method, and then is woken up through one of two calls:

  1. set_one wakes up a single event.
  2. set_all wakes up all the queued events. There is additionally a synchronous method, try_wait which tries to acquire the event along the fast path. This is composed of two simple atomic operations and thus is very fast and does not need to be asynchronous.

§Quickstart

For quickstart, you are most likely uniquely interested in the most basic event, which is also the most versatile, Event.

A quick guide to usage for this event is shown below:

use asyncnal::{Event, EventSetter};
 
// We create a new event in the unset state.
let event = Event::new();
 
// We set the event.
event.set_one();
 
// We will be able to immediately acquire this event.
event.wait().await;
 
 

§Fairness

The events are stored in a lock-free concurrent queue provided by lfqueue, and are thus woken in a FIFO order. This means that if events are queued up in order A, B, C, they will be woken up in that order. For the case of set_all, the events are guaranteed to be unloaded in FIFO order but events may make progress at different times depending on the nature of the executor.

§Cancel Safety

The events provided by the crate all fully cancel safe. This is achieved by special drop handling of the futures. Essentially, when a waiter is dropped, we check if we have modified the internal state of the event, and in this case, we roll it back. There is one catch though:

Due to the concurrent nature of the queues, we cannot simply just iterate over it and kick one event out of the queue. Therefore, the event has a way of communicating back to the queue where it signals that said queue entry is actually invalid, causing it to be skipped over. This is not, however, immediately evited from the queue.

§Examples

§Basic

The example below shows the creation of an event, along with setting an event, and then immediately acquiring that event.

use asyncnal::{Event, EventSetter};
 
let event = Event::new();
assert!(!event.has_waiters());
 
// We'll pre-set the event.
event.set_one();
 
// This will immediately return.
event.wait().await;

§Asynchronous Mutex

This signalling mechanism can be composed to create an asynchronous mutex to avoid system calls.

use asyncnal::*;
use core::{cell::UnsafeCell, ops::{Deref, DerefMut}};
 
struct AsyncMutex<T> {
    cell: UnsafeCell<T>,
    event: Event
}
 
struct AsyncMutexGuard<'a, T>(&'a AsyncMutex<T>);
 
impl<T> AsyncMutex<T> {
    pub fn new(item: T) -> Self {
        Self {
            cell: UnsafeCell::new(item),
            // Since the mutex should be in an available state initially,
            // we should initialize the event in a pre-set state.
            event: Event::new_set()
        }
    }
    pub fn try_lock(&self) -> Option<AsyncMutexGuard<'_, T>> {
        // Using the try_wait method we can implement a try_lock that
        // is synchronous!
        if self.event.try_wait() {
            Some(AsyncMutexGuard(self))
        } else {
            // We could not acquire the lock on the fast
            // path.
            None
        }
    }
    pub async fn lock(&self) -> AsyncMutexGuard<'_, T> {
        // Wait for the event to become available.
        self.event.wait().await;
        AsyncMutexGuard(self)
    }
}
 
impl<'a, T> Drop for AsyncMutexGuard<'a, T> {
    fn drop(&mut self) {
        // Now we need to set the event to indicate it is now available!
        self.0.event.set_one();
    }
}
 
impl<'a, T> Deref for AsyncMutexGuard<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        // SAFETY: The event guarantees we are the only
        // ones with this guard.
        unsafe { &*self.0.cell.get() }
    }
}
 
impl<'a, T> DerefMut for AsyncMutexGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: The event guarantees we are the only
        // ones with this guard.
        unsafe { &mut *self.0.cell.get() }
    }
 
}
 
let mutex = AsyncMutex::new(4);
assert_eq!(*mutex.lock().await, 4);
 
// Let's try to double acquire.
// We start by acquiring a handle.
let handle = mutex.lock().await;
// Now the try_lock method should fail.
assert!(mutex.try_lock().is_none());
drop(handle);
// Now that we have released the handle, we
// can acquire :)
assert!(mutex.try_lock().is_some());
 
 
*mutex.lock().await = 5;
 
assert_eq!(*mutex.lock().await, 5);
 

§Asynchronous Channels

We can use this abstraction to trivially build asynchronous channels for local runtimes.

use std::{collections::VecDeque, cell::RefCell, rc::Rc};
use asyncnal::{LocalEvent, EventSetter};
 
#[derive(Clone)]
struct Channel<T> {
    queue: Rc<RefCell<VecDeque<T>>>,
    event: Rc<LocalEvent>
}
 

 
impl<T> Channel<T> {
    pub fn new() -> Self {
        Self {
            queue: Rc::default(),
            event: Rc::default()
        }
    }
    pub fn send(&self, item: T) {
        self.queue.borrow_mut().push_back(item);
        self.event.set_one();
    }
    fn try_remove(&self) -> Option<T> {
        self.queue.borrow_mut().pop_front()
    }
    pub async fn recv(&self) -> T {
        let mut value = self.try_remove();
        while value.is_none() {
            // Wait for a notificaiton
            self.event.wait().await;
            value = self.try_remove();
        }
        value.unwrap()
    }
     
}
 
let channel = Channel::<usize>::new();
channel.send(4);
assert_eq!(channel.recv().await, 4);

Structs§

CountedAwaiter
CountedEvent
A CountedEvent wraps a type that implements the EventSetter trait. It can tell you how many waiters there are on that very event. The count will only go up once the waiter actually starts waiting on the event. If there is an immediate return on the first time the wait is polled, the count will not go up.
Event
An event for normal usage. It is Send and Sync and can be used within multithreaded environments.
EventAwait
LocalEvent
An event for usage within local runtimess. It is !Send and !Sync.
LocalEventAwait
Yield
An asynchronous Yield. This was largely just an implementation of Yield from the smol crate.

Traits§

EventSetter
Describes an event interface that can be created in a set or unset state and can be waited upon.