Crate ring_channel
source ·Expand description
Bounded MPMC channel abstraction on top of a ring buffer.
Overview
This crate provides a flavor of message passing that favors throughput over lossless
communication. Under the hood, ring_channel
is just a thin abstraction layer on top of a
multi-producer multi-consumer lock-free ring-buffer. Sending messages never blocks, however
messages can be lost if the internal buffer overflows, as incoming messages gradually overwrite
older pending messages. This behavior is ideal for use-cases in which the consuming threads
only care about the most recent messages and applying back pressure to producer threads is
not desirable.
-
A classic example are video streamers that send frames across threads for display. If the consuming thread experiences lag, the producing threads are at risk of overflowing the communication buffer. Rather than panicking, it’s often acceptable to skip frames with little to no impact to the user experience.
-
Another example is a rendering GUI thread that runs at a fixed rate of frames per second and receives the current state of the application through a channel for display. Only the most up-to-date version of the application state matters at the point in time the GUI is refreshed, so there’s no use in keeping a backlog of all intermediary state transitions.
Hello, world!
use ring_channel::*;
use std::num::NonZeroUsize;
// Open the channel.
let (mut tx, mut rx) = ring_channel(NonZeroUsize::new(1).unwrap());
// Send a message through the inbound endpoint.
tx.send("Hello, world!").unwrap();
// Receive the message through the outbound endpoint.
assert_eq!(rx.recv(), Ok("Hello, world!"));
Overflowing the buffer
use ring_channel::*;
use std::num::NonZeroUsize;
// Open the channel.
let (mut tx, mut rx) = ring_channel(NonZeroUsize::new(1).unwrap());
// Send a message through the inbound endpoint.
tx.send("Hello, world!").unwrap();
// Since the buffer can hold at most one message in this case,
// sending a second message overwrites the former.
tx.send("Hello, universe!").unwrap();
// Receive the message through the outbound endpoint.
assert_eq!(rx.recv(), Ok("Hello, universe!"));
Communicating across threads
Endpoints are just handles that may be cloned and sent to other threads.
They come in two flavors that allow sending and receiving messages through the channel,
respectively RingSender
and RingReceiver
. Cloning an endpoint produces a new
handle of the same kind associated with the same channel.
The channel lives as long as there is an endpoint associated with it.
use ring_channel::*;
use std::{num::NonZeroUsize, thread};
// Open the channel.
let (mut tx1, mut rx1) = ring_channel(NonZeroUsize::new(1).unwrap());
let mut tx2 = tx1.clone();
let mut rx2 = rx1.clone();
// Spawn a thread that echoes any message it receives.
thread::spawn(move || {
while let Ok(msg) = rx2.recv() {
if let Err(SendError::Disconnected(_)) = tx2.send(msg) {
break;
}
}
});
tx1.send("Hello, world!")?;
if let Ok(msg) = rx1.recv() {
// Depending on which thread goes first,
// we might have received the direct message or the echo.
assert_eq!(msg, "Hello, world!");
}
Disconnection
When all endpoints of one type get dropped, the channel becomes disconnected. Attempting to send an message through a disconnected channel returns an error. Receiving messages through a disconnected channel succeeds as long as there are pending messages and an error is returned once all of them have been received.
use ring_channel::*;
use std::num::NonZeroUsize;
// Open the channel.
let (mut tx1, mut rx) = ring_channel(NonZeroUsize::new(3).unwrap());
let mut tx2 = tx1.clone();
let mut tx3 = tx2.clone();
tx1.send(1).unwrap();
tx2.send(2).unwrap();
tx3.send(3).unwrap();
// All senders are dropped and the channel becomes disconnected.
drop((tx1, tx2, tx3));
// Pending messages can still be received.
assert_eq!(rx.try_recv(), Ok(1));
assert_eq!(rx.try_recv(), Ok(2));
assert_eq!(rx.try_recv(), Ok(3));
// Finally, the channel reports itself as disconnected.
assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
Futures API
By default, RingSender
implements [futures::sink::Sink
] and
RingReceiver
implements [futures::stream::Stream
].
The cargo feature futures_api
can be disabled to opt out of the dependency on futures-rs.
use ring_channel::*;
use futures::{prelude::*, stream};
use std::num::NonZeroUsize;
// Open the channel.
let (tx, rx) = ring_channel(NonZeroUsize::try_from(13)?);
let message = &['H', 'e', 'l', 'l', 'o', ',', ' ', 'w', 'o', 'r', 'l', 'd', '!'];
// Send the stream of characters through the Sink.
stream::iter(message).map(Ok).forward(tx).await?;
// Collect the Stream into a String.
assert_eq!(&rx.collect::<String>().await, "Hello, world!");
Optional Features
-
std
(enabled by default)Controls whether crate
std
is linked. -
futures_api
(enabled by default)Enables integration with futures-rs, see § Futures API.
Structs
- The receiving end of a
ring_channel
. - The sending end of a
ring_channel
.
Enums
- An error that may be returned by
RingReceiver::recv
. - An error that may be returned by
RingSender::send
. - An error that may be returned by
RingReceiver::try_recv
.
Functions
- Opens a multi-producer multi-consumer channel backed by a ring buffer.