Expand description
§Crossfire
High-performance lockless spsc/mpsc/mpmc channels.
It supports async contexts and bridges the gap between async and blocking contexts.
The low level is based on crossbeam-queue. For the concept, please refer to the wiki.
§Version history
-
v1.0: Released in 2022.12 and used in production.
-
v2.0: Released in 2025.6. Refactored the codebase and API by removing generic types from the ChannelShared type, which made it easier to code with.
-
v2.1: Released in 2025.9. Removed the dependency on crossbeam-channel and implemented with a modified version of crossbeam-queue, which brings performance improvements for both async and blocking contexts.
§Test status
WARNING: Because v2.1 has push the speed to a level no one has gone before, it can put a pure pressure to the async runtime. Some hidden bug (especially atomic ops on weaker ordering platform) might occur.
Refer to the README page for known issue on specified platform and runtime.
§Performance
Being a lockless channel, crossfire outperforms other async-capable channels. And thanks to a lighter notification mechanism, some cases in blocking context are even better than the original crossbeam-channel,


More benchmark data is posted on wiki.
Also, being a lockless channel, the algorithm relies on spinning and yielding. Spinning is good on multi-core systems, but not friendly to single-core systems (like virtual machines). So we provide a function detect_backoff_cfg() to detect the running platform. Calling it within the initialization section of your code, will get a 2x performance boost on VPS.
The benchmark is written in the criterion framework. You can run the benchmark by:
cargo bench --bench crossfire
§APIs
§Modules and functions
There are 3 modules: spsc, mpsc, mpmc, providing functions to allocate different types of channels.
The SP or SC interface is only for non-concurrent operation. It’s more memory-efficient than MP or MC implementations, and sometimes slightly faster.
The return types in these 3 modules are different:
-
mpmc::bounded_blocking(): tx blocking, rx blocking
-
mpmc::bounded_async(): tx async, rx async
-
mpmc::bounded_tx_async_rx_blocking(): tx async, rx blocking
-
mpmc::bounded_tx_blocking_rx_async(): tx blocking, rx async
-
mpmc::unbounded_blocking(): tx non-blocking, rx blocking
-
mpmc::unbounded_async(): tx non-blocking, rx async
NOTE : For a bounded channel, a 0 size case is not supported yet. (Temporary rewrite as 1 size).
§Types
Context | Sender (Producer) | Receiver (Consumer) | ||
---|---|---|---|---|
Single | Multiple | Single | Multiple | |
Blocking | BlockingTxTrait | BlockingRxTrait | ||
Tx | MTx | Rx | MRx | |
Async | AsyncTxTrait | AsyncRxTrait | ||
AsyncTx | MAsyncTx | AsyncRx | MAsyncRx |
For the SP / SC version, AsyncTx, AsyncRx, Tx, and Rx are not Clone
and without Sync
.
Although can be moved to other threads, but not allowed to use send/recv while in an Arc. (Refer to the compile_fail
examples in the type document).
The benefit of using the SP / SC API is completely lockless waker registration, in exchange for a performance boost.
The sender/receiver can use the From
trait to convert between blocking and async context
counterparts.
§Error types
Error types are the same as crossbeam-channel:
TrySendError, SendError, SendTimeoutError, TryRecvError, RecvError, RecvTimeoutError
§Feature flags
-
tokio
: Enable send_timeout, recv_timeout API for async context, based ontokio
. And will detect the right backoff strategy for the type of runtime (multi-threaded / current-thread). -
async_std
: Enable send_timeout, recv_timeout API for async context, based onasync-std
.
§Async compatibility
Tested on tokio-1.x and async-std-1.x, crossfire is runtime-agnostic.
The following scenarios are considered:
-
The AsyncTx::send() and AsyncRx::recv() operations are cancellation-safe in an async context. You can safely use the select! macro and timeout() function in tokio/futures in combination with recv(). On cancellation, SendFuture and RecvFuture will trigger drop(), which will clean up the state of the waker, making sure there is no mem-leak and deadlock. But you cannot know the true result from SendFuture, since it’s dropped upon cancellation. Thus, we suggest using AsyncTx::send_timeout() instead.
-
When the “tokio” or “async_std” feature is enabled, we also provide two additional functions:
-
send_timeout(), which will return the message that failed to be sent in SendTimeoutError. We guarantee the result is atomic. Alternatively, you can use send_with_timer().
-
recv_timeout(), we guarantee the result is atomic. Alternatively, you can use recv_with_timer()
-
Between blocking context and async context, and between different async runtime instances.
-
The async waker footprint.
When using a multi-producer and multi-consumer scenario, there’s a small memory overhead to pass along a Weak
reference of wakers.
Because we aim to be lockless, when the sending/receiving futures are canceled (like tokio::time::timeout()),
it might trigger an immediate cleanup if the try-lock is successful, otherwise will rely on lazy cleanup.
(This won’t be an issue because weak wakers will be consumed by actual message send and recv).
On an idle-select scenario, like a notification for close, the waker will be reused as much as possible
if poll() returns pending.//!
§Usage
Cargo.toml:
[dependencies]
crossfire = "2.1"
§Example with tokio::select!
extern crate crossfire;
use crossfire::*;
#[macro_use]
extern crate tokio;
use tokio::time::{sleep, interval, Duration};
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::bounded_async::<i32>(100);
for _ in 0..10 {
let _tx = tx.clone();
tokio::spawn(async move {
for i in 0i32..10 {
let _ = _tx.send(i).await;
sleep(Duration::from_millis(100)).await;
println!("sent {}", i);
}
});
}
drop(tx);
let mut inv = tokio::time::interval(Duration::from_millis(500));
loop {
tokio::select! {
_ = inv.tick() =>{
println!("tick");
}
r = rx.recv() => {
if let Ok(_i) = r {
println!("recv {}", _i);
} else {
println!("rx closed");
break;
}
}
}
}
}
§For Future customization
The future object created by AsyncTx::send(), AsyncTx::send_timeout(), AsyncRx::recv(),
AsyncRx::recv_timeout() is Sized
. You don’t need to put them in Box
.
If you like to use poll function directly for complex behavior, you can call AsyncSink::poll_send() or AsyncStream::poll_item() with Context.
Modules§
- mpmc
- Multiple producers, multiple consumers.
- mpsc
- Multiple producers, single consumer.
- sink
- spsc
- Single producer, single consumer.
- stream
Macros§
Structs§
- AsyncRx
- A single consumer (receiver) that works in an async context.
- AsyncTx
- A single producer (sender) that works in an async context.
- Channel
Shared - MAsync
Rx - A multi-consumer (receiver) that works in an async context.
- MAsync
Tx - A multi-producer (sender) that works in an async context.
- MRx
- A multi-consumer (receiver) that works in a blocking context.
- MTx
- A multi-producer (sender) that works in a blocking context.
- Ready
Timeout Error - An error returned from the
ready_timeout
method. - Recv
Error - An error returned from the
recv
method. - Recv
Future - A fixed-sized future object constructed by AsyncRx::recv()
- Recv
Timeout Future - A fixed-sized future object constructed by AsyncRx::recv_timeout()
- Rx
- A single consumer (receiver) that works in a blocking context.
- Select
Timeout Error - An error returned from the
select_timeout
method. - Send
Error - An error returned from the
send
method. - Send
Future - A fixed-sized future object constructed by [AsyncTx::make_send_future()]
- Send
Timeout Future - A fixed-sized future object constructed by AsyncTx::send_timeout()
- TryReady
Error - An error returned from the
try_ready
method. - TrySelect
Error - An error returned from the
try_select
method. - Tx
- A single producer (sender) that works in a blocking context.
Enums§
- Recv
Timeout Error - An error returned from the
recv_timeout
method. - Send
Timeout Error - An error returned from the
send_timeout
method. - TryRecv
Error - An error returned from the
try_recv
method. - TrySend
Error - An error returned from the
try_send
method.
Traits§
- Async
RxTrait - For writing generic code with MAsyncRx & AsyncRx
- Async
TxTrait - For writing generic code with MAsyncTx & AsyncTx
- Blocking
RxTrait - For writing generic code with MRx & Rx
- Blocking
TxTrait - For writing generic code with MTx & Tx
Functions§
- detect_
backoff_ cfg - Detect cpu number and auto setting backoff config.