//! Bounded MPMC channel abstraction on top of a ring buffer.
//!
//! # Overview
//!
//! This crate provides a flavor of message passing that favors throughput over lossless
//! communication. Under the hood, [`ring_channel`] is just a thin abstraction layer on top of a
//! multi-producer multi-consumer lock-free ring buffer. Sending a message never blocks; however,
//! messages can be lost if the internal buffer overflows, because incoming messages overwrite
//! the oldest pending messages. This behavior is ideal for use cases in which the consuming
//! threads only care about the most recent messages and applying backpressure to producer
//! threads is not desirable.
//!
//! * A classic example is a video streamer that sends frames across threads for display.
//! If the consuming thread experiences lag, the producing threads are at risk of overflowing the
//! communication buffer. Rather than panicking, it's often acceptable to skip frames with little
//! to no impact on the user experience.
//!
//! * Another example is a rendering GUI thread that runs at a fixed rate of frames per second and
//! receives the current state of the application through a channel for display. Only the most
//! up-to-date version of the application state matters at the point in time the GUI is refreshed,
//! so there's no use in keeping a backlog of all intermediary state transitions.
//!
//! # Hello, world!
//!
//! ```rust
//! use ring_channel::*;
//! use std::num::NonZeroUsize;
//!
//! // Open the channel.
//! let (mut tx, mut rx) = ring_channel(NonZeroUsize::new(1).unwrap());
//!
//! // Send a message through the inbound endpoint.
//! tx.send("Hello, world!").unwrap();
//!
//! // Receive the message through the outbound endpoint.
//! # #[cfg(all(feature = "std", feature = "futures_api"))]
//! assert_eq!(rx.recv(), Ok("Hello, world!"));
//! ```
//!
//! # Overflowing the buffer
//!
//! ```rust
//! use ring_channel::*;
//! use std::num::NonZeroUsize;
//!
//! // Open the channel.
//! let (mut tx, mut rx) = ring_channel(NonZeroUsize::new(1).unwrap());
//!
//! // Send a message through the inbound endpoint.
//! tx.send("Hello, world!").unwrap();
//!
//! // Since the buffer can hold at most one message in this case,
//! // sending a second message overwrites the former.
//! tx.send("Hello, universe!").unwrap();
//!
//! // Receive the message through the outbound endpoint.
//! # #[cfg(all(feature = "std", feature = "futures_api"))]
//! assert_eq!(rx.recv(), Ok("Hello, universe!"));
//! ```
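//!
//! The example above uses a capacity of one. With a larger capacity, the overview suggests that
//! an overflow evicts the oldest pending message first. The following is a minimal
//! single-threaded sketch of that policy built on `std::collections::VecDeque`. The names
//! `OverwritingQueue`, `push` and `pop` are hypothetical and used for illustration only; they
//! are not part of this crate, whose actual buffer is lock-free and shared across threads.
//!
//! ```rust
//! use std::collections::VecDeque;
//!
//! // Hypothetical model of the overwrite policy; not this crate's implementation.
//! struct OverwritingQueue<T> {
//!     capacity: usize,
//!     pending: VecDeque<T>,
//! }
//!
//! impl<T> OverwritingQueue<T> {
//!     fn new(capacity: usize) -> Self {
//!         assert!(capacity > 0, "capacity must be non-zero");
//!         Self { capacity, pending: VecDeque::with_capacity(capacity) }
//!     }
//!
//!     // Enqueues `msg` and returns the oldest pending message if it had to be evicted.
//!     fn push(&mut self, msg: T) -> Option<T> {
//!         let evicted = if self.pending.len() == self.capacity {
//!             self.pending.pop_front()
//!         } else {
//!             None
//!         };
//!         self.pending.push_back(msg);
//!         evicted
//!     }
//!
//!     // Dequeues the oldest pending message, if any.
//!     fn pop(&mut self) -> Option<T> {
//!         self.pending.pop_front()
//!     }
//! }
//! ```
//!
//! Under this model, pushing `1`, `2` and `3` into a queue of capacity 2 evicts `1`,
//! and the remaining pops yield `2` and then `3`.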
//!
//! # Communicating across threads
//!
//! Endpoints are just handles that may be cloned and sent to other threads.
//! They come in two flavors, [`RingSender`] and [`RingReceiver`], which allow sending and
//! receiving messages through the channel, respectively. Cloning an endpoint produces a new
//! handle of the same kind associated with the same channel.
//! The channel lives as long as at least one endpoint is associated with it.
//!
//! ```rust
//! # #[cfg(all(feature = "std", feature = "futures_api"))]
//! # fn doctest() -> Result<(), Box<dyn std::error::Error>> {
//! use ring_channel::*;
//! use std::{num::NonZeroUsize, thread};
//!
//! // Open the channel.
//! let (mut tx1, mut rx1) = ring_channel(NonZeroUsize::new(1).unwrap());
//!
//! let mut tx2 = tx1.clone();
//! let mut rx2 = rx1.clone();
//!
//! // Spawn a thread that echoes any message it receives.
//! thread::spawn(move || {
//!     while let Ok(msg) = rx2.recv() {
//!         if let Err(SendError::Disconnected(_)) = tx2.send(msg) {
//!             break;
//!         }
//!     }
//! });
//!
//! tx1.send("Hello, world!")?;
//!
//! if let Ok(msg) = rx1.recv() {
//!     // Depending on which thread goes first,
//!     // we might have received the direct message or the echo.
//!     assert_eq!(msg, "Hello, world!");
//! }
//! # Ok(())
//! # }
//! ```
//!
//! # Disconnection
//!
//! When all endpoints of one type get dropped, the channel becomes disconnected.
//! Attempting to send a message through a disconnected channel returns an error.
//! Receiving messages through a disconnected channel succeeds as long as there are
//! pending messages; once all of them have been received, an error is returned.
//!
//! ```rust
//! use ring_channel::*;
//! use std::num::NonZeroUsize;
//!
//! // Open the channel.
//! let (mut tx1, mut rx) = ring_channel(NonZeroUsize::new(3).unwrap());
//!
//! let mut tx2 = tx1.clone();
//! let mut tx3 = tx2.clone();
//!
//! tx1.send(1).unwrap();
//! tx2.send(2).unwrap();
//! tx3.send(3).unwrap();
//!
//! // All senders are dropped and the channel becomes disconnected.
//! drop((tx1, tx2, tx3));
//!
//! // Pending messages can still be received.
//! assert_eq!(rx.try_recv(), Ok(1));
//! assert_eq!(rx.try_recv(), Ok(2));
//! assert_eq!(rx.try_recv(), Ok(3));
//!
//! // Finally, the channel reports itself as disconnected.
//! assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
//! ```
//!
//! # Futures API
//!
//! By default, [`RingSender`] implements [`futures::sink::Sink`] and
//! [`RingReceiver`] implements [`futures::stream::Stream`].
//!
//! The cargo feature `futures_api` can be disabled to opt out of the dependency on [futures-rs].
//!
//! ```rust
//! # #[cfg(all(feature = "std", feature = "futures_api"))]
//! # #[tokio::main]
//! # async fn doctest() -> Result<(), Box<dyn std::error::Error>> {
//! use ring_channel::*;
//! use futures::{prelude::*, stream};
//! use std::num::NonZeroUsize;
//!
//! // Open the channel.
//! let (tx, rx) = ring_channel(NonZeroUsize::try_from(13)?);
//!
//! let message = &['H', 'e', 'l', 'l', 'o', ',', ' ', 'w', 'o', 'r', 'l', 'd', '!'];
//!
//! // Send the stream of characters through the Sink.
//! stream::iter(message).map(Ok).forward(tx).await?;
//!
//! // Collect the Stream into a String.
//! assert_eq!(&rx.collect::<String>().await, "Hello, world!");
//! # Ok(())
//! # }
//! ```
//!
//! # Optional Features
//!
//! * `std` (enabled by default)
//!
//!   Controls whether [crate `std`] is linked.
//!
//! * `futures_api` (enabled by default)
//!
//!   Enables integration with [futures-rs](https://crates.io/crates/futures),
//!   see [§ Futures API](index.html#futures-api).
//!
//! [crate `std`]: https://doc.rust-lang.org/std/
//! [futures-rs]: https://crates.io/crates/futures
#![no_std]
extern crate alloc;
#[cfg(any(feature = "std", test))]
#[cfg_attr(test, macro_use)]
extern crate std;
mod atomic;
mod buffer;
mod channel;
mod control;
mod error;
#[cfg(feature = "futures_api")]
mod waitlist;
pub use channel::*;
pub use error::*;
#[cfg(test)]
enum Void {}