Skip to main content

enso_channel/
lib.rs

1#![allow(clippy::type_complexity)]
2//! enso_channel
3//!
4//! **Bounded. Lock-free. Batch-native.**
5//!
6//! `enso_channel` is a batch-first concurrency primitive: a family of bounded, lock-free,
7//! ring-buffer channels designed for bursty, latency-sensitive systems.
8//!
9//! The API is intentionally non-blocking: operations are exposed as `try_*` and surface
10//! backpressure/termination explicitly via errors (`Full`, `Empty`, `Disconnected`).
11//!
12//! ## Mental model
13//!
14//! Instead of sending items one-by-one, producers typically:
15//!
16//! 1. claim a contiguous range in the ring buffer,
17//! 2. write into it,
18//! 3. commit the range.
19//!
20//! Receivers observe items via RAII guards/iterators; dropping them commits consumption.
21//!
22//! ## Misuse prevention (compile-time)
23//!
24//! Batch receives return a guard that commits consumption on drop. To keep this sound,
25//! the guard is intentionally *not* an `Iterator<Item = &T>`.
26//!
27//! ```compile_fail
28//! use enso_channel::mpsc;
29//! # fn main() {
30//! let (mut tx, mut rx) = mpsc::channel::<u64>(4);
31//! tx.try_send(1).unwrap();
32//! let batch = rx.try_recv_many(1).unwrap();
33//! // `RecvIter` is not an iterator; use `batch.iter()` instead.
34//! for v in batch {
35//!     let _ = v;
36//! }
37//! # }
38//! ```
39//!
40//! References yielded by `batch.iter()` are tied to the borrow of the batch guard and
41//! cannot outlive it:
42//!
43//! ```compile_fail
44//! use enso_channel::mpsc;
45//! # fn main() {
46//! let (mut tx, mut rx) = mpsc::channel::<u64>(4);
47//! tx.try_send(1).unwrap();
48//!
49//! let r: &u64 = {
50//!     let batch = rx.try_recv_many(1).unwrap();
51//!     batch.iter().next().unwrap()
52//! };
53//! let _ = *r;
54//! # }
55//! ```
56//!
57//! And you can't commit (drop/`finish`) the guard while holding a reference from it:
58//!
59//! ```compile_fail
60//! use enso_channel::mpsc;
61//! # fn main() {
62//! let (mut tx, mut rx) = mpsc::channel::<u64>(4);
63//! tx.try_send(1).unwrap();
64//!
65//! let batch = rx.try_recv_many(1).unwrap();
66//! let first = batch.iter().next().unwrap();
67//! batch.finish();
68//! let _ = *first;
69//! # }
70//! ```
71//!
72//! ## Public API modules
73//!
74//! - [`mpsc`]: multi-producer, single-consumer
75//! - [`broadcast`]: lossless fixed-N fanout (each receiver sees every item)
76//! - [`mpmc`]: multi-producer, multi-consumer work distribution
77//!
78//! ## Non-goals
79//!
80//! - no blocking API
81//! - no async/await integration
82//! - no dynamic resizing
83//! - no built-in scheduling policy
84//!
85//! ## Lifecycle / shutdown (RAII)
86//!
87//! This crate intentionally does **not** expose an explicit `close()`/`terminate()` API.
88//! Shutdown is expressed through normal Rust endpoint lifecycle:
89//!
90//! - dropping the last sender initiates shutdown; receivers may drain already-committed items
91//!   and then observe `Disconnected`;
92//! - dropping the last receiver disconnects senders (subsequent sends return `Disconnected`).
93//!
94//! ### Concurrency caveat
95//!
96//! Disconnection is **eventual, not transactional**.
97//! In concurrent code, an operation may still succeed while the peer endpoint is being dropped,
98//! and already-committed items may never be observed by the application.
99
100#[macro_use]
101mod channel_api_macros;
102
103pub mod broadcast;
104pub mod mpmc;
105pub mod mpsc;
106
107mod ringbuffer;
108mod slot_states;
109
110mod consumers;
111mod publisher;
112
113mod sequencers;
114
115pub(crate) mod permit;
116
117pub mod errors;
118mod sequence;
119
120mod cursor;
121
122pub(crate) use cursor::Cursor;
123pub(crate) use sequence::Sequence;
124pub(crate) use sequencers::{ConsumerSeqGate, PublisherSeqGate};
125
126pub(crate) use ringbuffer::{RingBuffer, RingBufferMeta};
127
#[cfg(test)]
mod send_sync_tests {
    //! Static thread-safety checks for the channel endpoints.
    //!
    //! The crate's contract is that every `Sender` and `Receiver` type can be
    //! moved to another thread. Each test below instantiates a helper whose
    //! type parameter is bounded on `Send`; if an endpoint does not satisfy the
    //! bound, the test build fails to compile. The test bodies perform no
    //! work beyond those instantiations.

    use crate::{broadcast, mpmc, mpsc};

    /// Compiles only when `T: Send`; the body is intentionally empty.
    fn assert_send<T>()
    where
        T: Send,
    {
    }

    /// `Sync` counterpart of `assert_send`.
    ///
    /// No test in this module calls it, hence the `dead_code` allowance.
    #[allow(dead_code)]
    fn assert_sync<T>()
    where
        T: Sync,
    {
    }

    /// Both `mpsc` endpoints must be `Send`.
    #[test]
    fn mpsc_is_send() {
        assert_send::<mpsc::Sender<u32>>();
        assert_send::<mpsc::Receiver<u32>>();
    }

    /// Both `broadcast` endpoints must be `Send` (the sender is checked with `N = 2`).
    #[test]
    fn broadcast_is_send() {
        assert_send::<broadcast::Sender<u32, 2>>();
        assert_send::<broadcast::Receiver<u32>>();
    }

    /// Both `mpmc` endpoints must be `Send`.
    #[test]
    fn mpmc_is_send() {
        assert_send::<mpmc::Sender<u32>>();
        assert_send::<mpmc::Receiver<u32>>();
    }
}