Crate crossfire

Crate crossfire 

Source
Expand description

§Crossfire

High-performance spsc/mpsc/mpmc channels.

It supports async context, and bridge the gap between async and blocking context.

Implemented with lockless in mind, low level is based on crossbeam-channel. For the concept, please refer to wiki.

§Stability and versions

Crossfire v1.0 has been released and used in production since 2022.12. Heavily tested on X86_64 and ARM.

V2.0 has refactored the codebase and API at 2025.6. By removing generic types of ChannelShared object in sender and receiver, it’s easier to remember and code.

V2.0.x branch will remain in maintenance mode. Future optimization might be in v2.x_beta version until long run tests prove to be stable.

§Performance

We focus on optimization of async logic, outperforming other async capability channel implementations (flume, tokio::mpsc, etc).

Due to context switching between sleep and wake, there is a certain overhead over crossbeam-channel.

Benchmark is written in criterion framework. You can run benchmark by:

cargo bench --bench crossfire

Some benchmark data is posted on wiki.

mpsc bounded size 100 async context

§APIs

§modules and functions

There are 3 modules: spsc, mpsc, mpmc, providing functions to allocate different types of channels.

The SP or SC interface, only for non-concurrent operation, it’s more memory efficient than MP or MC implementations, and sometimes slightly faster.

The return types in these 3 modules are different:

NOTE : For bounded channel, 0 size case is not supported yet. (Temporary rewrite as 1 size).

§Types

ContextSender (Producer) Receiver (Consumer)
Single MultipleSingleMultiple
BlockingBlockingTxTrait BlockingRxTrait
Tx MTx Rx MRx
Async AsyncTxTrait AsyncRxTrait
AsyncTx MAsyncTxAsyncRx MAsyncRx

NOTE: For SP / SC version AsyncTx, AsyncRx, Tx, Rx, is not Clone, and without Sync, Although can be moved to other thread, but not allowed to use send/recv while in Arc. (Refer to the compile_fail examples in type document).

§Error types

Error types are re-exported from crossbeam-channel:

TrySendError, SendError, SendTimeoutError, TryRecvError, RecvError, RecvTimeoutError

§Feature flags

  • tokio: Enable send_timeout, recv_timeout API for async context, based on tokio.

  • async_std: Enable send_timeout, recv_timeout API for async context, base on async-std.

§Async compatibility

Tested on tokio-1.x and async-std-1.x, by default we do not depend on any async runtime.

In async context, tokio-select! or future-select! can be used. Cancellation is supported. You can combine recv() future with tokio::time::timeout.

When feature “tokio” or “async_std” enable, we also provide two additional functions:

Unlike the possibility in Kanal race condition on future cancellation, in crossfire, message will not be operated outside the context of poll() function, message only moved to, or from the queue in async context, we can guarantee that no message will be lost due to the cancellation of recv(), and no spuriously send failure that message actually copied to the receiver.

While using MAsyncTx or MAsyncRx, there’s memory overhead to pass along small size wakers for pending async producer or consumer. Because we aim to be lockless, when the sending/receiving futures are cancelled (like tokio::time::timeout()), might trigger immediate cleanup if non-conflict conditions are met. Otherwise will rely on lazy cleanup. (waker will be consumed by actual message send and recv).

§Usage

Cargo.toml:

[dependencies]
crossfire = "2.0"

§Example with tokio::select!


extern crate crossfire;
use crossfire::*;
#[macro_use]
extern crate tokio;
use tokio::time::{sleep, interval, Duration};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::bounded_async::<i32>(100);
    for _ in 0..10 {
        let _tx = tx.clone();
        tokio::spawn(async move {
            for i in 0i32..10 {
                let _ = _tx.send(i).await;
                sleep(Duration::from_millis(100)).await;
                println!("sent {}", i);
            }
        });
    }
    drop(tx);
    let mut inv = tokio::time::interval(Duration::from_millis(500));
    loop {
        tokio::select! {
            _ = inv.tick() =>{
                println!("tick");
            }
            r = rx.recv() => {
                if let Ok(_i) = r {
                    println!("recv {}", _i);
                } else {
                    println!("rx closed");
                    break;
                }
            }
        }
    }
}

§For Future customization

The future object created by AsyncTx::send(), AsyncTx::send_timeout(), AsyncRx::recv(), AsyncRx::recv_timeout() is Sized. You don’t need to put them in Box.

If you like to use poll function directly for complex behavior, you can call AsyncSink::poll_send() or AsyncStream::poll_item() with Context.

Modules§

mpmc
Multiple producers, multiple consumers.
mpsc
Multiple producers, single consumer.
sink
spsc
Single producer, single consumer.
stream

Structs§

AsyncRx
Single consumer (receiver) that works in async context.
AsyncTx
Single producer (sender) that works in async context.
ChannelShared
MAsyncRx
Multi-consumer (receiver) that works in async context.
MAsyncTx
Multi-producer (sender) that works in async context.
MRx
Multi-consumer (receiver) that works in blocking context.
MTx
Multi-producer (sender) that works in blocking context.
ReceiveFuture
A fixed-sized future object constructed by AsyncRx::recv()
ReceiveTimeoutFuture
A fixed-sized future object constructed by AsyncRx::recv_timeout()
RecvError
An error returned from the recv method.
Rx
Single consumer (receiver) that works in blocking context.
SendError
An error returned from the send method.
SendFuture
A fixed-sized future object constructed by AsyncTx::make_send_future()
SendTimeoutFuture
A fixed-sized future object constructed by AsyncTx::send_timeout()
Tx
Single producer (sender) that works in blocking context.

Enums§

RecvTimeoutError
An error returned from the recv_timeout method.
SendTimeoutError
An error returned from the send_timeout method.
TryRecvError
An error returned from the try_recv method.
TrySendError
An error returned from the try_send method.

Traits§

AsyncRxTrait
For writing generic code with MAsyncRx & AsyncRx
AsyncTxTrait
For writing generic code with MAsyncTx & AsyncTx
BlockingRxTrait
For writing generic code with MRx & Rx
BlockingTxTrait
For writing generic code with MTx & Tx