crossfire 2.1.8

channels for async and threads

Crossfire


High-performance lockless spsc/mpsc/mpmc channels.

It supports async contexts, and communication between async and blocking contexts.

The low-level implementation is based on crossbeam-queue.

For the concept, please refer to the wiki.

Version history

  • V1.0: Released in 2022.12 and used in production.

  • V2.0: Released in 2025.6. Refactored the codebase and the API, removing generic type parameters from the ChannelShared type and making it easier to use.

  • V2.1: Released in 2025.9. Removed the dependency on crossbeam-channel and reimplemented the core with a modified version of crossbeam-queue, which brings performance improvements in both async and blocking contexts.

Performance

Being a lockless channel, crossfire outperforms other async-capable channels. Thanks to a lighter notification mechanism, in a blocking context it is even better than the original crossbeam-channel in some cases.

More benchmark data is posted on the wiki.

Also, being a lockless channel, the algorithm relies on spinning and yielding. Spinning works well on multi-core systems, but is not friendly to single-core systems (like small virtual machines). We therefore provide a function, detect_backoff_cfg(), that detects the running platform. Calling it in the initialization section of your code can yield a 2x performance boost on a VPS.
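
A minimal sketch of calling it at startup (assuming detect_backoff_cfg() is exported at the crate root and brought into scope by use crossfire::*):

use crossfire::*;

fn main() {
    // Detect the platform's backoff characteristics once, before any
    // channel is created, so the spin/yield strategy matches the host.
    detect_backoff_cfg();

    let (tx, rx) = mpmc::bounded_blocking::<u64>(64);
    std::thread::spawn(move || {
        for i in 0..1000u64 {
            let _ = tx.send(i);
        }
        // tx is dropped here, which closes the channel.
    });
    while let Ok(_v) = rx.recv() {
        // process _v
    }
}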

The benchmarks are written with the criterion framework. You can run them with:

cargo bench --bench crossfire

Test status

NOTE: Because v2.1 pushes throughput to a level not reached before, it puts heavy pressure on the async runtime. Hidden bugs (especially around atomic operations on weakly ordered platforms) might surface:

v2.0.26 (legacy):

Debugging deadlock issue

Debug locally:

Run the bench or test with --features trace_log until it hangs, then press Ctrl+C or send SIGINT; the latest log will be dumped to /tmp/crossfire_ring.log (refer to _setup_log() in tests/common.rs).
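
For example, reusing the benchmark target shown above:

cargo bench --bench crossfire --features trace_log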

Debug with github workflow: https://github.com/frostyplanet/crossfire-rs/issues/37

APIs

Modules and functions

There are 3 modules: spsc, mpsc, mpmc, providing functions to allocate different types of channels.

The SP or SC interface is only for non-concurrent operation. It's more memory-efficient than MP or MC implementations, and sometimes slightly faster.

The return types in these 3 modules are different:

  • mpmc::bounded_blocking() : (tx blocking, rx blocking)

  • mpmc::bounded_async() : (tx async, rx async)

  • mpmc::bounded_tx_async_rx_blocking() : (tx async, rx blocking)

  • mpmc::bounded_tx_blocking_rx_async() : (tx blocking, rx async)

  • mpmc::unbounded_blocking() : (tx non-blocking, rx blocking)

  • mpmc::unbounded_async() : (tx non-blocking, rx async)

NOTE: For a bounded channel, a capacity of 0 is not supported yet (it is temporarily rewritten as a capacity of 1).
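
As a rough sketch of how the mixed constructors are used (assuming tokio on the async side), a blocking producer thread can feed an async consumer:

use crossfire::*;

#[tokio::main]
async fn main() {
    // Blocking sender, async receiver.
    let (tx, rx) = mpmc::bounded_tx_blocking_rx_async::<u64>(16);
    std::thread::spawn(move || {
        for i in 0..100u64 {
            // Blocking send; fails only when all receivers are dropped.
            if tx.send(i).is_err() {
                break;
            }
        }
        // tx is dropped here, closing the channel.
    });
    while let Ok(v) = rx.recv().await {
        println!("got {}", v);
    }
}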

Types

For the SP / SC version, AsyncTx, AsyncRx, Tx, and Rx are neither Clone nor Sync. Although they can be moved to other threads, send/recv cannot be called on them while they are wrapped in an Arc. (Refer to the compile_fail examples in the type documentation.)

The benefit of using the SP / SC API is completely lockless waker registration, which brings a performance boost in exchange for these restrictions.

The sender/receiver can use the From trait to convert between blocking and async context counterparts.
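
A hedged sketch of such a conversion, assuming the spsc module mirrors the constructor names listed above and the From impls cover the SP/SC types:

use crossfire::*;

fn main() {
    // Start with an async spsc pair, then convert both ends to their
    // blocking counterparts via Into (backed by the From impls).
    let (tx, rx) = spsc::bounded_async::<i32>(8);
    let tx: Tx<i32> = tx.into();
    let rx: Rx<i32> = rx.into();
    tx.send(1).unwrap();
    assert_eq!(rx.recv().unwrap(), 1);
}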

Error types

Error types are the same as crossbeam-channel: TrySendError, SendError, TryRecvError, RecvError
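
For example, try_send() on a full bounded channel hands the rejected message back in TrySendError::Full, mirroring crossbeam-channel (a sketch, assuming try_send() is available on the blocking sender and the error types are re-exported at the crate root):

use crossfire::*;

fn main() {
    let (tx, rx) = mpmc::bounded_blocking::<u8>(1);
    tx.send(1).unwrap();
    // The channel is full, so the rejected message comes back inside the error.
    match tx.try_send(2) {
        Err(TrySendError::Full(v)) => println!("full, got back {}", v),
        other => println!("unexpected: {:?}", other),
    }
    assert_eq!(rx.recv().unwrap(), 1);
}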

Feature flags

  • tokio: Enables the send_timeout and recv_timeout APIs for async contexts, based on tokio. It also detects the right backoff strategy for the type of runtime (multi-threaded / current-thread).

  • async_std: Enables the send_timeout and recv_timeout APIs for async contexts, based on async-std.
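
For example, to get the timeout APIs on tokio, enable the feature in Cargo.toml:

[dependencies]
crossfire = { version = "2.1", features = ["tokio"] }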

Async compatibility

Tested on tokio-1.x and async-std-1.x, crossfire is runtime-agnostic.

The following scenarios are considered:

  • The AsyncTx::send() and AsyncRx::recv() operations are cancellation-safe in an async context. You can safely use the select! macro and timeout() function in tokio/futures in combination with recv(). On cancellation, [SendFuture] and [RecvFuture] trigger drop(), which cleans up the waker state and makes sure there is no memory leak or deadlock. However, you cannot know the true result from a SendFuture, since it is dropped upon cancellation; we therefore suggest using AsyncTx::send_timeout() instead.

  • When the "tokio" or "async_std" feature is enabled, we also provide two additional functions:

  • AsyncTx::send_timeout(), which will return the message that failed to be sent in [SendTimeoutError]. We guarantee the result is atomic. Alternatively, you can use AsyncTx::send_with_timer().

  • AsyncRx::recv_timeout(): we guarantee the result is atomic. Alternatively, you can use AsyncRx::recv_with_timer(). (A short sketch of both timeout APIs follows at the end of this section.)

  • Communication between blocking and async contexts, and between different async runtime instances.

  • The async waker footprint.

In a multi-producer / multi-consumer scenario, there is a small memory overhead for passing along Weak references to wakers. Because we aim to be lockless, when a sending/receiving future is cancelled (e.g. by tokio::time::timeout()), an immediate cleanup is triggered if the try-lock succeeds; otherwise we rely on lazy cleanup. (This is not an issue, because stale weak wakers are consumed by subsequent message send and recv operations.) In an idle-select scenario, such as waiting for a close notification, the waker is reused as much as possible when poll() returns Pending.
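
A small sketch of the timeout APIs mentioned above (requires the tokio feature; assuming both functions take a std Duration, analogous to crossbeam-channel):

use crossfire::*;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpmc::bounded_async::<String>(4);

    tokio::spawn(async move {
        // On timeout the unsent message is returned inside the error,
        // so it can be retried or logged.
        if let Err(e) = tx.send_timeout("hello".to_string(), Duration::from_millis(100)).await {
            println!("send timed out: {:?}", e);
        }
    });

    // recv_timeout() resolves atomically to either a message or an error.
    match rx.recv_timeout(Duration::from_millis(200)).await {
        Ok(msg) => println!("got {}", msg),
        Err(e) => println!("recv failed: {:?}", e),
    }
}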

Usage

Cargo.toml:

[dependencies]
crossfire = "2.1"

Example with tokio::select!

use crossfire::*;
use tokio::time::{interval, sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::bounded_async::<i32>(100);
    // Spawn 10 producers, each sending 10 messages.
    for _ in 0..10 {
        let _tx = tx.clone();
        tokio::spawn(async move {
            for i in 0i32..10 {
                let _ = _tx.send(i).await;
                sleep(Duration::from_millis(100)).await;
                println!("sent {}", i);
            }
        });
    }
    // Drop the original sender so rx.recv() returns Err once all producers are done.
    drop(tx);
    let mut inv = interval(Duration::from_millis(500));
    loop {
        tokio::select! {
            _ = inv.tick() => {
                println!("tick");
            }
            r = rx.recv() => {
                if let Ok(i) = r {
                    println!("recv {}", i);
                } else {
                    println!("rx closed");
                    break;
                }
            }
        }
    }
}