Multi-producer, single-consumer channels using ThingBuf.
A channel is a synchronization and communication primitive that combines a
shared queue with the ability to wait. Channels provide a send
operation, which enqueues a message if there is capacity in the queue, or
waits for capacity to become available if there is none; and a receive
operation, which dequeues a message from the queue if any are available, or
waits for a message to be sent if the queue is empty. This module provides
an implementation of multi-producer, single-consumer channels built on top
of the lock-free queue implemented by ThingBuf.
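To make the "shared queue plus the ability to wait" idea concrete, here is a minimal sketch of a bounded queue whose send waits for capacity and whose receive waits for a message. It uses only the standard library (a Mutex and two Condvars) and is not how this module is implemented: the channels here are built on a lock-free queue. The ToyChannel type and the toy_round_trip helper are hypothetical names introduced for illustration.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// A toy bounded queue: a shared `VecDeque` plus two condition variables.
/// Hypothetical type for illustration only; not part of thingbuf.
struct ToyChannel<T> {
    queue: Mutex<VecDeque<T>>,
    capacity: usize,
    not_full: Condvar,
    not_empty: Condvar,
}

impl<T> ToyChannel<T> {
    /// `capacity` must be at least 1, or `send` would wait forever.
    fn new(capacity: usize) -> Self {
        ToyChannel {
            queue: Mutex::new(VecDeque::with_capacity(capacity)),
            capacity,
            not_full: Condvar::new(),
            not_empty: Condvar::new(),
        }
    }

    /// Enqueue a message, waiting while the queue is at capacity.
    fn send(&self, msg: T) {
        let mut queue = self.queue.lock().unwrap();
        while queue.len() == self.capacity {
            queue = self.not_full.wait(queue).unwrap();
        }
        queue.push_back(msg);
        self.not_empty.notify_one();
    }

    /// Dequeue a message, waiting while the queue is empty.
    fn recv(&self) -> T {
        let mut queue = self.queue.lock().unwrap();
        loop {
            if let Some(msg) = queue.pop_front() {
                self.not_full.notify_one();
                return msg;
            }
            queue = self.not_empty.wait(queue).unwrap();
        }
    }
}

/// Send `count` messages from one producer thread through a queue of the
/// given capacity and collect them on the calling thread.
fn toy_round_trip(capacity: usize, count: u32) -> Vec<u32> {
    let chan = Arc::new(ToyChannel::new(capacity));
    let producer = {
        let chan = Arc::clone(&chan);
        thread::spawn(move || {
            for i in 0..count {
                chan.send(i); // waits whenever the queue is full
            }
        })
    };
    let received: Vec<u32> = (0..count).map(|_| chan.recv()).collect();
    producer.join().unwrap();
    received
}
```

Calling toy_round_trip(2, 10) pushes ten messages through a two-slot queue, so the producer has to wait for the consumer several times. With a single producer the messages arrive in the order they were sent.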
The channel API in this module is broadly similar to the one provided by
other implementations, such as std::sync::mpsc, tokio::sync::mpsc, or
crossbeam::channel. A given channel instance is represented by a pair
of types:
- a Sender handle, which represents the capacity to send messages to the channel
- a Receiver handle, which represents the capacity to receive messages from the channel
// Constructing a channel returns a paired `Sender` and `Receiver`:
let (tx, rx) = thingbuf::mpsc::channel::<String>(10);
As these are multi-producer, single-consumer channels, the Sender type
implements Clone; it may be cloned any number of times to create multiple
Senders that send messages to the same channel. On the other hand, each
channel instance has only a single Receiver.
Disconnection
When all Sender handles have been dropped, it is no longer
possible to send values into the channel. When this occurs, the channel is
considered to have been closed by the sender; calling Receiver::recv
once a channel has closed will return None. Note that if messages sent
before the last Sender was dropped have not yet been received, those
messages will be returned before Receiver::recv returns None.
If the Receiver handle is dropped, then messages can no longer
be read out of the channel. In this case, all further attempts to send will
result in an error.
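The two disconnection rules above can be sketched with the standard library's bounded channel, which the module description names as broadly similar. This is a stand-in, not this module's API: std's recv reports disconnection with an Err, where Receiver::recv here returns None. The function names are hypothetical.

```rust
use std::sync::mpsc::sync_channel;

/// Returns the messages that could still be received after every sender
/// had been dropped, in the order they were sent.
fn drain_after_senders_dropped() -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(4);
    let tx2 = tx.clone();
    tx.send(1).unwrap();
    tx2.send(2).unwrap();
    // Drop every sender: the channel is now closed from the sending side.
    drop(tx);
    drop(tx2);

    let mut drained = Vec::new();
    // Buffered messages are still delivered; only then does `recv` report
    // disconnection (`Err` in std, `None` in the module documented here).
    while let Ok(msg) = rx.recv() {
        drained.push(msg);
    }
    drained
}

/// Returns `true` if sending fails once the receiver has been dropped.
fn send_fails_after_receiver_dropped() -> bool {
    let (tx, rx) = sync_channel::<u32>(4);
    drop(rx);
    tx.send(1).is_err()
}
```

A receive loop can therefore treat the closed signal as "no more messages will ever arrive", since everything sent before the last sender was dropped has already been handed out by that point.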
Channel Flavors
This module contains several different “flavors” of multi-producer, single-consumer channels. The primary differences between the different channel implementations are whether they wait asynchronously or by blocking the current thread, and whether the array that stores channel messages is allocated dynamically on the heap or statically at compile-time.
| | Dynamic Allocation | Static Allocation |
|---|---|---|
| Async | channel | StaticChannel |
| Blocking | blocking::channel | blocking::StaticChannel |
Asynchronous and Blocking Channels
The default channel implementations (in this module) are asynchronous.
With these channels, the send and receive operations are async fns:
when it’s necessary to wait to send or receive a message, the
Futures returned by these functions will yield, allowing other tasks
to execute. These asynchronous channels are suitable for use in an async
runtime like tokio, or in embedded or bare-metal systems that use
async/await syntax. They do not require the Rust standard library.
If the “std” feature flag is enabled, this module also provides a
blocking submodule, which implements blocking (or synchronous)
channels. The blocking receiver will instead wait for new messages by
blocking the current thread, and the blocking sender will wait for send
capacity by blocking the current thread. A blocking channel can be
constructed using the blocking::channel function. Naturally, blocking
the current thread requires thread APIs, so these channels are only
available when the Rust standard library is available.
An asynchronous channel is used with asynchronous tasks:
use thingbuf::mpsc;
#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    // Spawn some tasks that write to the channel:
    for i in 0..10 {
        let tx = tx.clone();
        tokio::spawn(async move {
            tx.send(i).await.unwrap();
        });
    }
    // Print out every message received from the channel:
    for _ in 0..10 {
        let j = rx.recv().await.unwrap();
        println!("received {}", j);
        assert!(0 <= j && j < 10);
    }
}
A blocking channel is used with threads:
use thingbuf::mpsc::blocking;
use std::thread;
let (tx, rx) = blocking::channel(8);
// Spawn some threads that write to the channel:
for i in 0..10 {
    let tx = tx.clone();
    thread::spawn(move || {
        tx.send(i).unwrap();
    });
}
// Print out every message received from the channel:
for _ in 0..10 {
    let j = rx.recv().unwrap();
    println!("received {}", j);
    assert!(0 <= j && j < 10);
}
Static and Dynamically Allocated Channels
The other difference between channel flavors is whether the array that backs the channel’s queue is allocated dynamically on the heap, or allocated statically.
The default channels returned by channel and blocking::channel are
dynamically allocated: although they are fixed-size, the size of the
channel can be determined at runtime when the channel is created, and any
number of dynamically-allocated channels may be created and destroyed over
the lifetime of the program. Because these channels dynamically allocate
memory, they require the “alloc” feature flag.
In some use cases, though, dynamic memory allocation may not be available
(such as in some embedded systems, or within a memory allocator
implementation), or it may be desirable to avoid dynamic memory allocation
(such as in very performance-sensitive applications). To support those use
cases, thingbuf::mpsc also provides static channels. The size of these
channels is determined at compile-time (by a const generic parameter) rather
than at runtime, so they may be constructed in a static initializer. This
allows allocating the array that backs the channel’s queue statically, so
that dynamic allocation is not needed. The StaticChannel and
blocking::StaticChannel types are used to construct static channels.
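As a rough illustration of why a const generic capacity permits construction in a static initializer, the sketch below defines a fixed-capacity queue whose storage is an inline array and whose constructor is a const fn. FixedQueue and EVENTS are hypothetical names, and this is not this module's implementation: the Mutex is there only to give the static safe mutability, whereas the static channels described here are lock-free and, in their async form, usable without the standard library.

```rust
use std::sync::Mutex;

/// A fixed-capacity FIFO whose storage is an inline array of `N` slots.
/// Hypothetical type for illustration only; not thingbuf's implementation.
struct FixedQueue<T, const N: usize> {
    slots: [Option<T>; N],
    head: usize, // index of the oldest element
    len: usize,  // number of occupied slots
}

impl<T, const N: usize> FixedQueue<T, N> {
    // An associated constant lets the array be filled in a `const fn`
    // without requiring `T: Copy`.
    const EMPTY: Option<T> = None;

    /// `const fn`, so the queue can be built in a `static` initializer.
    const fn new() -> Self {
        FixedQueue { slots: [Self::EMPTY; N], head: 0, len: 0 }
    }

    /// Appends a value, or hands it back to the caller if the queue is full.
    fn push(&mut self, value: T) -> Result<(), T> {
        if self.len == N {
            return Err(value);
        }
        self.slots[(self.head + self.len) % N] = Some(value);
        self.len += 1;
        Ok(())
    }

    /// Removes the oldest value, or returns `None` if the queue is empty.
    fn pop(&mut self) -> Option<T> {
        if self.len == 0 {
            return None;
        }
        let value = self.slots[self.head].take();
        self.head = (self.head + 1) % N;
        self.len -= 1;
        value
    }
}

// The capacity (4) is part of the type, so the storage lives in the
// program's static data and no allocator is involved.
static EVENTS: Mutex<FixedQueue<u32, 4>> = Mutex::new(FixedQueue::new());
```

Because the capacity is a type parameter rather than a runtime value, the compiler knows the exact size of the array up front. That is what allows the whole structure to be placed in a static.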
A dynamically allocated channel’s size can be determined at runtime:
use thingbuf::mpsc::blocking;
use std::env;
fn main() {
    // Determine the channel capacity from the first command-line
    // argument, if one is present.
    let channel_capacity = env::args()
        .nth(1)
        .and_then(|cap| cap.parse::<usize>().ok())
        .unwrap_or(16);
    // Construct a dynamically-sized blocking channel.
    let (tx, rx) = blocking::channel(channel_capacity);
    tx.send("hello world").unwrap();
    // ...
}
A statically allocated channel may be used without heap allocation:
// We are in a `no_std` context with no memory allocator!
#![no_std]
use thingbuf::mpsc;
// Create a channel backed by a static array with 256 entries.
static KERNEL_EVENTS: mpsc::StaticChannel<KernelEvent, 256> = mpsc::StaticChannel::new();
#[no_mangle]
pub fn kernel_main() {
    // Split the static channel into a sender/receiver pair.
    let (event_tx, event_rx) = KERNEL_EVENTS.split();
    let mut kernel_tasks = TaskExecutor::new();
    kernel_tasks.spawn(async move {
        // Process kernel events
        while let Some(event) = event_rx.recv().await {
            // ...
        }
    });
    // Some device driver that needs to emit events to the channel.
    kernel_tasks.spawn(some_device_driver(event_tx.clone()));
    loop {
        kernel_tasks.tick();
    }
}
async fn some_device_driver(event_tx: mpsc::StaticSender<KernelEvent>) {
    let mut device = SomeDevice::new(0x42);
    loop {
        // When the device has data, emit a kernel event.
        match device.poll_for_events().await {
            Ok(event) => event_tx.send(event).await.unwrap(),
            Err(err) => event_tx.send(KernelEvent::from(err)).await.unwrap(),
        }
    }
}
Modules
- blocking (std feature): A synchronous multi-producer, single-consumer channel.
- Errors returned by channels.
Structs
- Receiver (alloc feature): Asynchronously receives values from associated Senders.
- A reference to a message being received from an asynchronous channel.
- A reference to a message being sent to an asynchronous channel.
- Sender (alloc feature): Asynchronously sends values to an associated Receiver.
- StaticChannel (static feature): A statically-allocated, asynchronous bounded MPSC channel.
- StaticReceiver (static feature): Asynchronously receives values from associated StaticSenders.
- StaticSender (static feature): Asynchronously sends values to an associated StaticReceiver.
Functions
- channel (alloc feature): Returns a new asynchronous multi-producer, single-consumer (MPSC) channel with the provided capacity.
- with_recycle (alloc feature): Returns a new asynchronous multi-producer, single-consumer (MPSC) channel with the provided capacity and recycling policy.