crossfire/lib.rs
1#![cfg_attr(docsrs, feature(doc_cfg))]
2#![cfg_attr(docsrs, allow(unused_attributes))]
3
4//! # Crossfire
5//!
6//! High-performance lockless spsc/mpsc/mpmc channels, algorithm derives crossbeam with improvements.
7//!
8//! It supports async contexts and bridges the gap between async and blocking contexts.
9//!
10//! For the concept, please refer to the [wiki](https://github.com/frostyplanet/crossfire-rs/wiki).
11//!
12//! ## Version history
13//!
14//! * v1.0: Used in production since 2022.12.
15//!
16//! * v2.0: [2025.6] Refactored the codebase and API
17//! by removing generic types from the ChannelShared type, which made it easier to code with.
18//!
19//! * v2.1: [2025.9] Removed the dependency on crossbeam-channel
20//! and implemented with [a modified version of crossbeam-queue](https://github.com/frostyplanet/crossfire-rs/wiki/crossbeam-related),
21//! brings 2x performance improvements for both async and blocking contexts.
22//!
23//! * v3.0: [2026.1] Refactored API back to generic flavor interface, added [select].
24//! Dedicated optimization: Bounded SPSC +70%, MPSC +30%, one-size +20%.
25//! Eliminate enum dispatch cost, async performance improved for another 33%. Checkout [compat] for migiration from v2.x.
26//!
27//! ## Test status
28//!
29//! Refer to the [README](https://github.com/frostyplanet/crossfire-rs?tab=readme-ov-file#test-status) page for known issue on specified platform and runtime.
30//!
31//! ## Performance
32//!
33//! Being a lockless channel, crossfire outperforms other async-capable channels.
34//! And thanks to a lighter notification mechanism, most cases in blocking context are even
35//! better than the original crossbeam-channel,
36//!
37//! benchmark data is posted on [wiki](https://github.com/frostyplanet/crossfire-rs/wiki/benchmark-v3.0.0-2026%E2%80%9001%E2%80%9018).
38//!
39//! Also, being a lockless channel, the algorithm relies on spinning and yielding. Spinning is good on
40//! multi-core systems, but not friendly to single-core systems (like virtual machines).
41//! So we provide a function [detect_backoff_cfg()] to detect the running platform.
42//! Calling it within the initialization section of your code, will get a 2x performance boost on
43//! VPS.
44//!
45//! The benchmark is written in the criterion framework. You can run the benchmark by:
46//!
47//! ``` shell
48//! make bench crossfire
49//! make bench crossfire_select
50//! ```
51//!
52//! ## APIs
53//!
54//! ### Concurrency Modules
55//!
56//! - [spsc], [mpsc], [mpmc]. Each has different underlying implementation
57//! optimized to its concurrent model.
58//! The SP or SC interface is only for non-concurrent operation. It's more memory-efficient in waker registration,
59//! and has atomic ops cost reduced in the lockless algorithm.
60//!
61//! - [oneshot] has its special sender/receiver type because using `Tx` / `Rx` will be too heavy.
62//!
63//! - [select]:
64//! - [Select<'a>](crate::select::Select): crossbeam-channel style type erased API, borrows receiver address and select with "token"
65//! - [Multiplex](crate::select::Multiplex): Multiplex stream that owns multiple receiver, select from the same type of
66//! channel flavors, for the same type of message.
67//!
68//! - [waitgroup]: High performance WaitGroup which allows custom threshold
69//!
70//! ### Flavors
71//!
72//! The following lockless queues are expose in [flavor] module, and each one have type alias in spsc/mpsc/mpmc:
73//!
74//! - `List` (which use crossbeam `SegQueue`)
75//! - `Array` (which is an enum that wraps crossbeam `ArrayQueue`, and a `One` if init with size<=1)
76//! - For a bounded channel, a 0 size case is not supported yet. (rewrite as 1 size).
77//! - The implementation for spsc & mpsc is simplified from mpmc version.
78//! - `One` (which derives from `ArrayQueue` algorithm, but have better performance in size=1
79//! scenario, because it have two slots to allow reader and writer works concurrently)
80//! - `Null` (See the doc [crate::null]), for cancellation purpose channel, that only wakeup on
81//! closing.
82//!
83//! **NOTE** :
84//! Although the name [Array](crate::mpmc::Array), [List](crate::mpmc::List) are the same between spsc/mpsc/mpmc module,
85//! they are different type alias local to its parent module. We suggest distinguish by
86//! namespace when import for use.
87//!
88//! ### Channel builder function
89//!
90//! Aside from function `bounded_*`, `unbounded_*` which specify the sender / receiver type,
91//! each module has [build()](crate::mpmc::build()) and [new()](crate::mpmc::new()) function, which can apply to any channel flavors, and any async/blocking combinations.
92//!
93//!
94//! ### Types
95//!
96//! <table align="center" cellpadding="20">
97//! <tr> <th rowspan="2"> Context</th><th colspan="2" align="center">Sender (Producer)</th> <th colspan="2" align="center">Receiver (Consumer)</th> </tr>
98//! <tr> <td>Single</td> <td>Multiple</td><td>Single</td><td>Multiple</td></tr>
99//! <tr><td rowspan="2"><b>Blocking</b></td><td colspan="2" align="center"><a href="trait.BlockingTxTrait.html">BlockingTxTrait</a></td>
100//! <td colspan="2" align="center"><a href="trait.BlockingRxTrait.html">BlockingRxTrait</a></td></tr>
101//! <tr>
102//! <td align="center"><a href="struct.Tx.html">Tx</a></td>
103//! <td align="center"><a href="struct.MTx.html">MTx</a></td>
104//! <td align="center"><a href="struct.Rx.html">Rx</a></td>
105//! <td align="center"><a href="struct.MRx.html">MRx</a></td> </tr>
106//!
107//! <tr><td rowspan="2"><b>Async</b></td>
108//! <td colspan="2" align="center"><a href="trait.AsyncTxTrait.html">AsyncTxTrait</a></td>
109//! <td colspan="2" align="center"><a href="trait.AsyncRxTrait.html">AsyncRxTrait</a></td></tr>
110//! <tr><td><a href="struct.AsyncTx.html">AsyncTx</a></td>
111//! <td><a href="struct.MAsyncTx.html">MAsyncTx</a></td><td><a href="struct.AsyncRx.html">AsyncRx</a></td>
112//! <td><a href="struct.MAsyncRx.html">MAsyncRx</a></td></tr>
113//!
114//! </table>
115//!
116//! *Safety*: For the SP / SC version, [AsyncTx], [AsyncRx], [Tx], and [Rx] are not `Clone` and without `Sync`.
117//! Although can be moved to other threads, but not allowed to use send/recv while in an Arc. (Refer to the compile_fail
118//! examples in the type document).
119//!
120//! The benefit of using the SP / SC API is completely lockless waker registration, in exchange for a performance boost.
121//!
122//! The sender/receiver can use the **`From`** trait to convert between blocking and async context
123//! counterparts (refer to the [example](#example) below)
124//!
125//! ### Error types
126//!
127//! Error types are the same as crossbeam-channel:
128//!
129//! [TrySendError], [SendError], [SendTimeoutError], [TryRecvError], [RecvError], [RecvTimeoutError]
130//!
131//! ### Async compatibility
132//!
133//! Tested on tokio-1.x and async-std-1.x, crossfire is runtime-agnostic.
134//!
135//! The following scenarios are considered:
136//!
137//! * The [AsyncTx::send()] and [AsyncRx::recv()] operations are **cancellation-safe** in an async context.
138//! You can safely use the select! macro and timeout() function in tokio/futures in combination with recv().
139//! On cancellation, [SendFuture] and [RecvFuture] will trigger drop(), which will clean up the state of the waker,
140//! making sure there is no memory-leak and deadlock.
141//! But you cannot know the true result from SendFuture, since it's dropped
142//! upon cancellation. Thus, we suggest using [AsyncTx::send_timeout()] instead.
143//!
144//! * When the "tokio" or "async_std" feature is enabled, we also provide two additional functions:
145//!
146//! - [send_timeout()](crate::AsyncTx::send_timeout()), which will return the message that failed to be sent in
147//! [SendTimeoutError]. We guarantee the result is atomic. Alternatively, you can use
148//! [send_with_timer()](crate::AsyncTx::send_with_timer()).
149//!
150//! - [recv_timeout()](crate::AsyncRx::recv_timeout()), we guarantee the result is atomic.
151//! Alternatively, you can use [recv_with_timer()](crate::AsyncRx::recv_with_timer())
152//!
153//! * The waker footprint:
154//!
155//! When using a multi-producer and multi-consumer scenario, there's a small memory overhead to pass along a `Weak`
156//! reference of wakers.
157//! Because we aim to be lockless, when the sending/receiving futures are canceled (like tokio::time::timeout()),
158//! it might trigger an immediate cleanup if the try-lock is successful, otherwise will rely on lazy cleanup.
159//! (This won't be an issue because weak wakers will be consumed by actual message send and recv).
160//! On an idle-select scenario, like a notification for close, the waker will be reused as much as possible
161//! if poll() returns pending.
162//!
163//! * Handle written future:
164//!
165//! The future object created by [AsyncTx::send()], [AsyncTx::send_timeout()], [AsyncRx::recv()],
166//! [AsyncRx::recv_timeout()] is `Sized`. You don't need to put them in `Box`.
167//!
168//! If you like to use poll function directly for complex behavior, you can call
169//! [AsyncSink::poll_send()](crate::sink::AsyncSink::poll_send()) or [AsyncStream::poll_item()](crate::stream::AsyncStream::poll_item()) with Context.
170
171//!
172//! ## Usage
173//!
174//! Cargo.toml:
175//! ```toml
176//! [dependencies]
177//! crossfire = "3.0"
178//! ```
179//!
180//! ### Feature flags
181//!
182//! * `compat`: Enable the [compat] model, which has the same API namespace struct as V2.x
183//!
184//! * `tokio`: Enable [send_timeout](crate::AsyncTx::send_timeout()), [recv_timeout](crate::AsyncRx::recv_timeout()) with tokio sleep function. (conflict
185//! with `async_std` feature)
186//!
187//! * `async_std`: Enable send_timeout, recv_timeout with async-std sleep function. (conflict
188//! with `tokio` feature)
189//!
190//! * `trace_log`: Development mode, to enable internal log while testing or benchmark, to debug deadlock issues.
191//!
192//! ### Example
193//!
194//! blocking / async sender receiver mixed together
195//!
196//! ```rust
197//!
198//! extern crate crossfire;
199//! use crossfire::*;
200//! #[macro_use]
201//! extern crate tokio;
202//! use tokio::time::{sleep, interval, Duration};
203//!
204//! #[tokio::main]
205//! async fn main() {
206//! let (tx, rx) = mpmc::bounded_async::<usize>(100);
207//! let mut recv_counter = 0;
208//! let mut co_tx = Vec::new();
209//! let mut co_rx = Vec::new();
210//! const ROUND: usize = 1000;
211//!
212//! let _tx: MTx<mpmc::Array<usize>> = tx.clone().into_blocking();
213//! co_tx.push(tokio::task::spawn_blocking(move || {
214//! for i in 0..ROUND {
215//! _tx.send(i).expect("send ok");
216//! }
217//! }));
218//! co_tx.push(tokio::spawn(async move {
219//! for i in 0..ROUND {
220//! tx.send(i).await.expect("send ok");
221//! }
222//! }));
223//! let _rx: MRx<mpmc::Array<usize>> = rx.clone().into_blocking();
224//! co_rx.push(tokio::task::spawn_blocking(move || {
225//! let mut count: usize = 0;
226//! 'A: loop {
227//! match _rx.recv() {
228//! Ok(_i) => {
229//! count += 1;
230//! }
231//! Err(_) => break 'A,
232//! }
233//! }
234//! count
235//! }));
236//! co_rx.push(tokio::spawn(async move {
237//! let mut count: usize = 0;
238//! 'A: loop {
239//! match rx.recv().await {
240//! Ok(_i) => {
241//! count += 1;
242//! }
243//! Err(_) => break 'A,
244//! }
245//! }
246//! count
247//! }));
248//! for th in co_tx {
249//! let _ = th.await.unwrap();
250//! }
251//! for th in co_rx {
252//! recv_counter += th.await.unwrap();
253//! }
254//! assert_eq!(recv_counter, ROUND * 2);
255//! }
256//! ```
257
258#[allow(private_bounds)]
259/// lockless queue implementation and channel flavor traits
260pub mod flavor;
261mod shared;
262pub use shared::ChannelShared;
263
264mod backoff;
265pub use backoff::detect_backoff_cfg;
266
267#[allow(dead_code)]
268mod collections;
269#[allow(dead_code)]
270mod waker;
271#[allow(private_bounds)]
272mod waker_registry;
273
274pub mod mpmc;
275pub mod mpsc;
276pub mod oneshot;
277pub mod spsc;
278pub mod waitgroup;
279
280mod blocking_tx;
281pub use blocking_tx::*;
282#[allow(private_bounds)]
283mod blocking_rx;
284pub use blocking_rx::*;
285mod async_tx;
286pub use async_tx::*;
287#[allow(private_bounds)]
288mod async_rx;
289pub use async_rx::*;
290
291#[cfg(feature = "compat")]
292pub mod compat;
293pub mod null;
294pub mod sink;
295pub mod stream;
296
297mod crossbeam;
298pub use crossbeam::err::*;
299#[allow(private_bounds)]
300pub mod select;
301
302/// logging macro for development
303#[macro_export(local_inner_macros)]
304macro_rules! trace_log {
305 ($($arg:tt)+)=>{
306 #[cfg(feature="trace_log")]
307 {
308 log::debug!($($arg)+);
309 }
310 };
311}
312
313/// logging macro for development under tokio
314#[macro_export(local_inner_macros)]
315macro_rules! tokio_task_id {
316 () => {{
317 #[cfg(all(feature = "trace_log", feature = "tokio"))]
318 {
319 tokio::task::try_id()
320 }
321 #[cfg(not(all(feature = "trace_log", feature = "tokio")))]
322 {
323 ""
324 }
325 }};
326}
327
328use flavor::Flavor;
329use std::sync::Arc;
330
331/// type limiter for channel builder
332pub trait SenderType {
333 type Flavor: Flavor;
334 fn new(shared: Arc<ChannelShared<Self::Flavor>>) -> Self;
335}
336
337/// type limiter for channel builder
338pub trait ReceiverType: AsRef<ChannelShared<Self::Flavor>> {
339 type Flavor: Flavor;
340
341 fn new(shared: Arc<ChannelShared<Self::Flavor>>) -> Self;
342}
343
344pub trait NotCloneable {}