Crate codas_flow

Source
Expand description

codas-flow on crates.io codas-flow on docs.rs MIT licensed

Low-latency, high-throughput bounded queues (“data flows”) for (a)synchronous and event-driven systems, inspired by the LMAX Disruptor and built for codas.

§What’s Here

This crate provides the flow data structure: A “ring” buffer which concurrent, (a)synchronous tasks can publish data to and receive data from.

flows work kind of like the broadcast channels in Tokio, with some key differences:

  1. Zero-Copy Multicast Reads: Every data published to a flow is immediately available to each subscriber, in parallel, with no copies or cloning.

  2. Lock-Free by default: No locks or mutexes are used when publishing or receiving data from the flow on supported targets.

  3. Broad Compatibility:

  • no-std by default.
  • async and synchronous APIs.
  • async functionality doesn’t depend on a specific runtime or framework (not even futures!).

flows work wherever channels or queues would work, but they’re built specifically for systems that need the same data processed concurrently (or in parallel) by multiple tasks.

§Examples

Flows are created with Flow::new, which returns a tuple of (flow, subscribers):

use codas_flow::*;

// Create a flow with a capacity of 32 strings,
// and one subscriber.
let (mut flow, [mut sub]) = Flow::<String>::new(32);

// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());

// Receive the next published data sequence from the flow.
let seq = sub.try_next().unwrap();
assert_eq!("Hello!", *seq);

Data is published into a flow via Flow::try_next (or await Flow::next), which returns an UnpublishedData reference. Once this reference is published (via UnpublishedData::publish), or dropped, it becomes receivable by every subscriber.

Data is received from a flow via FlowSubscriber::try_next (or await FlowSubscriber::next), which returns a PublishedData reference.

§Subscribers

Using slice patterns, any number of subscribers can be returned by Flow::new:

use codas_flow::*;

// Create a flow with a capacity of 32 strings,
// and 2 subscribers.
let (mut flow, [mut sub_a, mut sub_b]) = Flow::<String>::new(32);

New subscribers cannot be added to an active flow. To overcome this challenge, any subscriber can be wrapped in a Stage.

§Stages

A stage is a dynamic group of data processors that share a single subscriber:

use codas_flow::{*, stage::*};

// Create a flow.
let (mut flow, [mut sub]) = Flow::<String>::new(32);

// Wrap the subscriber in a processing stage.
let mut stage = Stage::from(sub);

// Add a data processor to the stage; an indefinite 
// number of processors can be added to a stage, even
// while the flow is active.
let calls = Arc::new(AtomicU64::new(0));
let closure_calls = calls.clone();
stage.add_proc(move |proc: &mut Proc, data: &String| {
   assert_eq!("Hello!", *data);
   closure_calls.add(1, Ordering::SeqCst);
});

// Publish "Hello!" to the next data sequence in the flow.
let seq = flow.try_next().unwrap();
seq.publish("Hello!".to_string());

// Run processors for a set of data in the flow.
stage.proc();
assert_eq!(1, calls.load(Ordering::SeqCst));

Stages only receive data from the flow when one of the Stage::proc* functions is invoked; refer to the Stage docs for more information.

§Lock-Free Targets

This crate uses AtomicU64 to coordinate flow access without locks. This type is lock-free where possible, but may use locks on some platforms or compile targets.

This section contains a list of the primary targets supported by this crate, along with their support for lock-free behavior.

TargetLock-Free?
aarch64-unknown-linux-gnu (64-Bit Linux ARM)Yes
aarch64-apple-darwin (64-Bit MacOS ARM)Yes
x86_64-unknown-linux-gnu (64-Bit Linux)Yes
x86_64-apple-darwin (64-Bit MacOS)Yes
x86_64-pc-windows-gnu (64-Bit Windows)Yes
wasm32-unknown-unknown (WASM)Yes1
armv7-unknown-linux-gnueabihf (ARM Cortex A7 and A8)No2
riscv32i-unknown-none-elf (ESP 32)No

1 WASM targets don’t technically support atomic instructions. However, because WASM code is executed in a single-thread, regular variables are simply substituted for their atomic counterparts, enabling full lock-free support.

2 Confirmation required; a safe assumption is that 32-bit targets don’t support atomic operations on 64-bit values.

§Relative Performance (“Benchmarks”)

Operationcodas (flow)codas (stage)codas (stage w/ tokio-yield)tokio (mpsc)tokio (broadcast)
Move -> Read55ns (18M/s)26ns (38M/s)19ns (53M/s)--
Move -> Take---70ns (14M/s)-
Move -> Clone----33ns (30M/s)

Comparitive performance of different scenarios we’ve written benchmarks for. Exact numbers will vary between platforms.

§License

Copyright © 2024—2025 With Caer, LLC and Alicorn Systems, LLC.

Licensed under the MIT license. Refer to the license file for more info.

Modules§

async_support
Runtime-agnostic async utilities.
stage
Unstable

Structs§

Flow
Bounded queue for publishing and receiving data from (a)synchronous tasks.
FlowSubscriber
Subscriber which receives data from a Flow.
UnpublishedData
Reference to mutable, unpublished data in a Flow.

Enums§

Error
Enumeration of non-retryable errors that may happen while using flows.

Traits§

Flows
Blanket trait for data in a Flow.