Module thingbuf::mpsc

Multi-producer, single-consumer channels using ThingBuf.

A channel is a synchronization and communication primitive that combines a shared queue with the ability to wait. Channels provide a send operation, which enqueues a message if there is capacity in the queue, or waits for capacity to become available if there is none; and a receive operation, which dequeues a message from the queue if any are available, or waits for a message to be sent if the queue is empty. This module provides an implementation of multi-producer, single-consumer channels built on top of the lock-free queue implemented by ThingBuf.
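To make the "shared queue plus the ability to wait" idea concrete, here is a minimal sketch of a bounded channel built only from the standard library. It is not how ThingBuf works internally (ThingBuf's queue is lock-free, while this toy uses a `Mutex` and two `Condvar`s), and the names `ToyChannel` and `toy_channel_demo` are hypothetical, introduced purely for illustration.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical toy channel: a bounded queue plus two wait conditions.
/// (Illustration only; ThingBuf itself uses a lock-free queue.)
struct ToyChannel<T> {
    queue: Mutex<VecDeque<T>>,
    capacity: usize,
    not_full: Condvar,  // senders wait here for capacity
    not_empty: Condvar, // the receiver waits here for a message
}

impl<T> ToyChannel<T> {
    fn new(capacity: usize) -> Self {
        Self {
            queue: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
            not_full: Condvar::new(),
            not_empty: Condvar::new(),
        }
    }

    /// Enqueue `msg`, waiting while the queue is at capacity.
    fn send(&self, msg: T) {
        let mut queue = self.queue.lock().unwrap();
        while queue.len() == self.capacity {
            queue = self.not_full.wait(queue).unwrap();
        }
        queue.push_back(msg);
        self.not_empty.notify_one();
    }

    /// Dequeue a message, waiting while the queue is empty.
    fn recv(&self) -> T {
        let mut queue = self.queue.lock().unwrap();
        loop {
            if let Some(msg) = queue.pop_front() {
                self.not_full.notify_one();
                return msg;
            }
            queue = self.not_empty.wait(queue).unwrap();
        }
    }
}

/// Send 0..5 from another thread through a capacity-2 channel and
/// collect the messages in the order they arrive.
fn toy_channel_demo() -> Vec<u32> {
    let chan = Arc::new(ToyChannel::new(2));
    let producer = {
        let chan = Arc::clone(&chan);
        thread::spawn(move || {
            for i in 0..5 {
                chan.send(i); // waits whenever two messages are already queued
            }
        })
    };
    let received: Vec<u32> = (0..5).map(|_| chan.recv()).collect();
    producer.join().unwrap();
    received
}
```

Calling `toy_channel_demo()` from a `main` function yields the five messages in send order. The sender is forced to wait whenever the two-slot queue is full, which is the behavior the paragraph above describes.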

The channel API in this module is broadly similar to the one provided by other implementations, such as std::sync::mpsc, tokio::sync::mpsc, or crossbeam::channel. A given channel instance is represented by a pair of types:

  • a Sender handle, which represents the capacity to send messages to the channel
  • a Receiver handle, which represents the capacity to receive messages from the channel
// Constructing a channel returns a paired `Sender` and `Receiver`:
let (tx, rx) = thingbuf::mpsc::channel::<String>(10);

As these are multi-producer, single-consumer channels, the Sender type implements Clone; it may be cloned any number of times to create multiple Senders that send messages to the same channel. On the other hand, each channel instance has only a single Receiver.

Disconnection

When all Sender handles have been dropped, it is no longer possible to send values into the channel. When this occurs, the channel is considered to have been closed by the sender; calling Receiver::recv once a channel has closed will return None. Note that if messages sent before the last Sender was dropped are still in the queue, the Receiver will receive those messages first, and Receiver::recv returns None only once the queue is empty.

If the Receiver handle is dropped, then messages can no longer be read out of the channel. In this case, all further attempts to send will result in an error.
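The two disconnection rules can be sketched with the standard library's bounded channel, `std::sync::mpsc::sync_channel`, which the introduction names as broadly similar. This is an analogy chosen to keep the example dependency-free, not thingbuf's own API: std's `recv` reports a closed channel as `Err(RecvError)`, where this module's `Receiver::recv` returns `None`. The function names are hypothetical.

```rust
use std::sync::mpsc::sync_channel;

/// Buffer two messages, drop every sender, then drain the receiver.
fn drain_after_senders_dropped() -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(4);
    let tx2 = tx.clone();
    tx.send(1).unwrap();
    tx2.send(2).unwrap();
    drop(tx);
    drop(tx2); // last sender gone: the channel is now closed

    let mut drained = Vec::new();
    // Messages sent before the close are still delivered; `recv` only
    // reports the closure once the queue is empty.
    while let Ok(msg) = rx.recv() {
        drained.push(msg);
    }
    drained
}

/// Drop the receiver first, then try to send.
fn send_after_receiver_dropped() -> bool {
    let (tx, rx) = sync_channel::<u32>(4);
    drop(rx);
    // With no receiver left, sending fails and hands the value back.
    tx.send(1).is_err()
}
```

Here `drain_after_senders_dropped()` returns both buffered messages before the loop ends, and `send_after_receiver_dropped()` returns `true`.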

Channel Flavors

This module contains several different “flavors” of multi-producer, single-consumer channels. The primary differences between the different channel implementations are whether they wait asynchronously or by blocking the current thread, and whether the array that stores channel messages is allocated dynamically on the heap or statically at compile-time.

            Dynamic Allocation    Static Allocation
Async       channel               StaticChannel
Blocking    blocking::channel     blocking::StaticChannel

Asynchronous and Blocking Channels

The default channel implementations (those in this module) are asynchronous. With these channels, the send and receive operations are async fns: when it’s necessary to wait to send or receive a message, the Futures returned by these functions will yield, allowing other tasks to execute. These asynchronous channels are suitable for use in an async runtime like tokio, or in embedded or bare-metal systems that use async/await syntax. They do not require the Rust standard library.

If the “std” feature flag is enabled, this module also provides a blocking submodule, which implements blocking (or synchronous) channels. The blocking receiver will instead wait for new messages by blocking the current thread, and the blocking sender will wait for send capacity by blocking the current thread. A blocking channel can be constructed using the blocking::channel function. Naturally, blocking the current thread requires thread APIs, so these channels are only available when the Rust standard library is available.

An asynchronous channel is used with asynchronous tasks:

use thingbuf::mpsc;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);

    // Spawn some tasks that write to the channel:
    for i in 0..10 {
        let tx = tx.clone();
        tokio::spawn(async move {
            tx.send(i).await.unwrap();
        });
    }

    // Print out every message received from the channel:
    for _ in 0..10 {
        let j = rx.recv().await.unwrap();

        println!("received {}", j);
        assert!(0 <= j && j < 10);
    }
}

A blocking channel is used with threads:

use thingbuf::mpsc::blocking;
use std::thread;

let (tx, rx) = blocking::channel(8);

// Spawn some threads that write to the channel:
for i in 0..10 {
    let tx = tx.clone();
    thread::spawn(move || {
        tx.send(i).unwrap();
    });
}

// Print out every message received from the channel:
for _ in 0..10 {
    let j = rx.recv().unwrap();

    println!("received {}", j);
    assert!(0 <= j && j < 10);
}

Static and Dynamically Allocated Channels

The other difference between channel flavors is whether the array that backs the channel’s queue is allocated dynamically on the heap, or allocated statically.

The default channels returned by channel and blocking::channel are dynamically allocated: although they are fixed-size, the size of the channel can be determined at runtime when the channel is created, and any number of dynamically-allocated channels may be created and destroyed over the lifetime of the program. Because these channels dynamically allocate memory, they require the “alloc” feature flag.

In some use cases, though, dynamic memory allocation may not be available (such as in some embedded systems, or within a memory allocator implementation), or it may be desirable to avoid dynamic memory allocation (such as in very performance-sensitive applications). To support those use cases, thingbuf::mpsc also provides static channels. The size of these channels is determined at compile-time (by a const generic parameter) rather than at runtime, so they may be constructed in a static initializer. This allows allocating the array that backs the channel’s queue statically, so that dynamic allocation is not needed. The StaticChannel and blocking::StaticChannel types are used to construct static channels.
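The link between a const generic capacity and static initialization can be sketched as follows. `StaticRing`, `EVENTS`, and `static_ring_demo` are hypothetical names, and the buffer is guarded by `std::sync::Mutex` only so the sketch runs as ordinary Rust; thingbuf's static channels are lock-free and do not need the standard library.

```rust
use std::sync::Mutex;

/// Hypothetical fixed-capacity ring buffer whose storage is an inline
/// array of `N` slots, so no heap allocation is involved.
struct StaticRing<T, const N: usize> {
    slots: [Option<T>; N],
    head: usize,
    len: usize,
}

impl<T: Copy, const N: usize> StaticRing<T, N> {
    /// A `const fn`, so it can be evaluated in a `static` initializer.
    const fn new() -> Self {
        Self { slots: [None; N], head: 0, len: 0 }
    }

    /// Append a value, handing it back if the buffer is full.
    fn push(&mut self, value: T) -> Result<(), T> {
        if self.len == N {
            return Err(value);
        }
        self.slots[(self.head + self.len) % N] = Some(value);
        self.len += 1;
        Ok(())
    }

    /// Remove the oldest value, if any.
    fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        let value = self.slots[self.head].take();
        self.head = (self.head + 1) % N;
        self.len -= 1;
        value
    }
}

// The capacity (4) is part of the type, so the backing array is placed
// in static memory at compile time.
static EVENTS: Mutex<StaticRing<u32, 4>> = Mutex::new(StaticRing::new());

/// Fill the static buffer, try one push too many, then drain it.
fn static_ring_demo() -> (bool, Vec<u32>) {
    let mut ring = EVENTS.lock().unwrap();
    for i in 0..4 {
        ring.push(i).unwrap();
    }
    let fifth_rejected = ring.push(99).is_err();
    let mut drained = Vec::new();
    while let Some(value) = ring.pop() {
        drained.push(value);
    }
    (fifth_rejected, drained)
}
```

Because the capacity is fixed by the type, the fifth push is rejected rather than growing the buffer. That is the trade-off a static channel makes in exchange for avoiding the allocator.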

A dynamically allocated channel’s size can be determined at runtime:

use thingbuf::mpsc::blocking;
use std::env;

fn main() {
    // Determine the channel capacity from the first command-line
    // argument, if one is present.
    let channel_capacity = env::args()
        .nth(1)
        .and_then(|cap| cap.parse::<usize>().ok())
        .unwrap_or(16);

    // Construct a dynamically-sized blocking channel.
    let (tx, rx) = blocking::channel(channel_capacity);

    tx.send("hello world").unwrap();

    // ...
}

A statically allocated channel may be used without heap allocation:

// We are in a `no_std` context with no memory allocator!
#![no_std]
use thingbuf::mpsc;

// Create a channel backed by a static array with 256 entries.
static KERNEL_EVENTS: mpsc::StaticChannel<KernelEvent, 256> = mpsc::StaticChannel::new();

#[no_mangle]
pub fn kernel_main() {
    // Split the static channel into a sender/receiver pair.
    let (event_tx, event_rx) = KERNEL_EVENTS.split();
    let mut kernel_tasks = TaskExecutor::new();

    kernel_tasks.spawn(async move {
        // Process kernel events
        while let Some(event) = event_rx.recv().await {
            // ...
        }
    });

    // Some device driver that needs to emit events to the channel.
    kernel_tasks.spawn(some_device_driver(event_tx.clone()));

    loop {
        kernel_tasks.tick();
    }
}

async fn some_device_driver(event_tx: mpsc::StaticSender<KernelEvent>) {
    let mut device = SomeDevice::new(0x42);

    loop {
        // When the device has data, emit a kernel event.
        match device.poll_for_events().await {
            Ok(event) => event_tx.send(event).await.unwrap(),
            Err(err) => event_tx.send(KernelEvent::from(err)).await.unwrap(),
        }
    }
}

Modules

  • A synchronous multi-producer, single-consumer channel.
  • Errors returned by channels.

Structs

Functions

  • channel (requires the “alloc” feature)
    Returns a new asynchronous multi-producer, single-consumer (MPSC) channel with the provided capacity.
  • Returns a new asynchronous multi-producer, single-consumer (MPSC) channel with the provided capacity and recycling policy.