crossbar 0.7.0

Crates.io docs.rs License no_std CI MSRV

Zero-copy pub/sub over shared memory. URI-addressed. O(1) transfer at any payload size.

Transfers an 8-byte descriptor through a lock-free ring — O(1) regardless of payload. Subscribers read directly from shared memory. No copy, no serialization, no service discovery layer.
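The idea behind the 8-byte descriptor can be sketched in a few lines: pack a pool block index and a payload length into a single u64 that fits one atomic ring slot. The field widths below are illustrative assumptions, not crossbar's actual wire format:

```rust
// Illustrative only: pack a pool block index and payload length into one
// 8-byte descriptor, the unit that travels through the ring. Field widths
// are assumptions, not crossbar's actual layout.
fn pack(block_index: u32, len: u32) -> u64 {
    ((block_index as u64) << 32) | len as u64
}

fn unpack(desc: u64) -> (u32, u32) {
    ((desc >> 32) as u32, desc as u32)
}

fn main() {
    let desc = pack(7, 1024);
    assert_eq!(unpack(desc), (7, 1024));
    // Publishing is one 8-byte store, independent of payload size.
    println!("{desc:#018x}");
}
```

Because only this fixed-size descriptor moves through the ring, publish cost does not grow with the payload; subscribers follow the index to read the payload in place.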


When to use crossbar

  • High-frequency small messages: market data ticks, sensor readings, telemetry, game state
  • Rust-native multi-process pipelines where latency compounds (10 000+ msg/s)
  • Topics that need to be discovered at runtime by URI, not wired at compile time
  • You want one crate with no heavy dependencies

When not to use crossbar

  • Payload > 64 KB where you copy into the block: both crossbar and iceoryx2 are memcpy-bound at that size, so latency is roughly equal

Installation

[dependencies]
crossbar = "0.7"

Quick start

Byte-oriented

Publisher — write any bytes into shared memory:

use crossbar::*;

let mut pub_ = Publisher::create("market", Config::default())?;
let topic = pub_.register("/prices/AAPL")?;

let mut loan = pub_.loan(&topic).unwrap();
loan.set_data(b"42.50").unwrap();
loan.publish(); // O(1) — writes 8 bytes to ring

Subscriber — read in-place, zero copies, no unsafe:

use crossbar::*;

let sub = Subscriber::connect("market")?;
let stream = sub.subscribe("/prices/AAPL")?;

if let Some(guard) = stream.try_recv() {
    println!("{}", std::str::from_utf8(&guard).unwrap());
} // guard drops → block freed back to pool

Typed

Any Copy + 'static struct where every bit pattern is valid can implement Pod for direct zero-copy reads:

use crossbar::*;

#[derive(Clone, Copy)]
#[repr(C)]
struct Tick { price: f64, volume: u64 }

unsafe impl Pod for Tick {}

// Publisher
let mut pub_ = Publisher::create("market", Config::default())?;
let topic = pub_.register_typed::<Tick>("/prices/AAPL")?;
let mut loan = pub_.loan_typed::<Tick>(&topic).unwrap();
*loan.as_mut() = Tick { price: 42.50, volume: 1000 };
loan.publish();

// Subscriber
let sub = Subscriber::connect("market")?;
let stream = sub.subscribe("/prices/AAPL")?;

if let Some(guard) = stream.try_recv_typed::<Tick>() {
    println!("${:.2} × {}", guard.price, guard.volume);
}

Blocking receive

// Default: three-phase spin → yield → futex/WFE
let guard = stream.recv()?;

// Or pick a strategy
let guard = stream.recv_with(WaitStrategy::BusySpin)?;
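The shape of the default three-phase strategy can be sketched with std primitives. This is a simplified stand-in: the phase lengths are made up, and crossbar's real implementation blocks on futex/WaitOnAddress/WFE rather than sleeping:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Simplified three-phase wait: busy spin, then yield, then block.
// Stand-in for crossbar's spin -> yield -> futex/WFE strategy; the
// iteration counts and the sleep-based "block" phase are illustrative.
fn wait_for(flag: &AtomicBool) {
    for _ in 0..1_000 {                      // phase 1: busy spin
        if flag.load(Ordering::Acquire) { return; }
        std::hint::spin_loop();
    }
    for _ in 0..100 {                        // phase 2: yield to the scheduler
        if flag.load(Ordering::Acquire) { return; }
        thread::yield_now();
    }
    while !flag.load(Ordering::Acquire) {    // phase 3: sleep (futex stand-in)
        thread::sleep(Duration::from_micros(50));
    }
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    let f2 = Arc::clone(&flag);
    let t = thread::spawn(move || {
        thread::sleep(Duration::from_millis(5));
        f2.store(true, Ordering::Release);
    });
    wait_for(&flag);
    assert!(flag.load(Ordering::Acquire));
    t.join().unwrap();
    println!("signaled");
}
```

The trade-off is the usual one: spinning minimizes wake-up latency while messages are flowing, yielding and blocking give the CPU back when they are not.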

Multi-publisher

Multiple publishers can share the same region — one creates, others join:

use crossbar::*;

// Process A — creates the region
let mut pub_a = Publisher::create("market", Config::default())?;
let topic_a = pub_a.register("/prices/AAPL")?;

// Process B — joins the existing region
let mut pub_b = Publisher::open("market")?;
let topic_b = pub_b.register("/prices/GOOG")?;
// Or publish to the same topic as pub_a:
let topic_b2 = pub_b.register("/prices/AAPL")?;

Sequence numbers are claimed atomically via fetch_add. CAS-based ring slot locking prevents corruption when two publishers write to the same slot. Subscribers scan the ring window to handle out-of-order commits.
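The claim step can be illustrated with std atomics. This is a toy model, not crossbar's ring code: each publisher reserves the next sequence number with fetch_add, so concurrent publishers can never claim the same one:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Toy model of multi-publisher sequence claiming (not crossbar's code):
    // fetch_add returns the previous value, so every call hands out a
    // unique sequence number even under contention.
    let next_seq = Arc::new(AtomicU64::new(0));
    let mut handles = Vec::new();

    for _ in 0..4 {
        let seq = Arc::clone(&next_seq);
        handles.push(thread::spawn(move || {
            let mut claimed = Vec::new();
            for _ in 0..1_000 {
                claimed.push(seq.fetch_add(1, Ordering::Relaxed));
            }
            claimed
        }));
    }

    let mut all: Vec<u64> = handles.into_iter()
        .flat_map(|h| h.join().unwrap())
        .collect();
    all.sort_unstable();
    all.dedup();
    assert_eq!(all.len(), 4_000); // no duplicates: every claim was unique
    println!("4000 unique sequence numbers");
}
```

Claims are unique but may commit out of order, which is why subscribers scan the ring window rather than assuming the newest slot is the highest sequence.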

Bidirectional channel

Channel wraps two pub/sub regions into a TCP-like pair — one side listens, the other connects:

use crossbar::*;
use std::time::Duration;

// Process A (server)
let mut srv = Channel::listen("rpc", Config::default(),
    Duration::from_secs(30))?;

// Process B (client)
let mut cli = Channel::connect("rpc", Config::default(),
    Duration::from_secs(5))?;

cli.send(b"request")?;
let msg = srv.recv()?;
// ... process and respond
srv.send(b"response")?;
let reply = cli.recv()?;

Born-in-SHM (zero-copy publish)

Write directly into the pool block — no intermediate buffer, no copy at any payload size:

let mut loan = pub_.loan(&topic).unwrap();
let buf = loan.as_mut_slice();
// write directly into shared memory
encode_frame(&mut buf[..frame_len]);
loan.set_len(frame_len).unwrap();
loan.publish();

Performance (v0.6.0)

All measurements: Criterion, same-process publisher + subscriber, try_recv (no futex). Cross-process latency is typically 2-5x higher.

Intel i7-10700KF · Linux 6.8 · rustc 1.87

Payload                      crossbar    iceoryx2    speedup
8 B (transport overhead)     55 ns       230 ns      4.2×
1 KB                         67 ns       239 ns      3.6×
64 KB                        1.47 µs     1.32 µs     0.9×
1 MB                         30.7 µs     29.8 µs     ~1×

Apple M1 Pro · macOS · rustc 1.92 (v0.3.0, pre-optimization)

Payload                      crossbar    iceoryx2    speedup
8 B (transport overhead)     52 ns       189 ns      3.6×
1 KB                         77 ns       210 ns      2.7×
64 KB                        1.27 µs     1.35 µs     1.1×
1 MB                         23.9 µs     23.5 µs     ~1×

The win is in the overhead. At small payloads crossbar's lighter path (no service discovery, no POSIX config layer) is 4.2× faster. At 64 KB+ both frameworks are memcpy-bound and converge — iceoryx2 is slightly faster at large payloads. The 8-byte descriptor is always O(1) — payload latency scales with how long you take to write into the block.

Pinned mode (latest-value, same buffer every iteration)

Payload    crossbar    iceoryx2    speedup
8 B        35 ns       229 ns      6.5×
1 KB       45 ns       237 ns      5.3×
64 KB      1.10 µs     1.32 µs     1.2×
1 MB       18.4 µs     18.3 µs     ~1×

Pinned mode (loan_pinned / try_recv_pinned) reuses the same block every iteration — no allocation, no refcount, no ring. Safe API with CAS-based reader/writer exclusion. Best for market data, sensors, telemetry, game state.
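The reader/writer exclusion idea can be sketched as a single-slot cell guarded by one CAS flag. This is a simplified model: crossbar's pinned path operates on a shared-memory block, and its real state machine is richer than a single boolean:

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};

// Toy single-slot "latest value" cell with CAS-based exclusion.
// Illustrates the pinned-mode concept only; not crossbar's implementation.
struct Pinned<T> {
    busy: AtomicBool,
    slot: UnsafeCell<T>,
}

// Safe to share across threads: all slot access is guarded by `busy`.
unsafe impl<T: Send> Sync for Pinned<T> {}

impl<T: Copy> Pinned<T> {
    fn new(v: T) -> Self {
        Self { busy: AtomicBool::new(false), slot: UnsafeCell::new(v) }
    }

    // Returns false if a reader or writer currently holds the slot.
    fn try_store(&self, v: T) -> bool {
        if self.busy.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
            return false;
        }
        unsafe { *self.slot.get() = v; }
        self.busy.store(false, Ordering::Release);
        true
    }

    fn try_load(&self) -> Option<T> {
        if self.busy.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
            return None;
        }
        let v = unsafe { *self.slot.get() };
        self.busy.store(false, Ordering::Release);
        Some(v)
    }
}

fn main() {
    let cell = Pinned::new(0u64);
    assert!(cell.try_store(42));
    assert_eq!(cell.try_load(), Some(42));
}
```

Because the writer always overwrites the same block, a slow reader simply sees the latest value rather than a backlog — the right semantics for latest-value feeds, the wrong ones for anything that must see every message.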

Reproduce: cargo bench -- head_to_head (requires iceoryx2 dev-dep, Unix only).

See BENCHMARKS.md for full numbers, PodBus results, and methodology caveats.


C / C++ FFI

Build the shared library and link against it from C or C++:

cargo rustc --release --features ffi --crate-type cdylib
# produces target/release/libcrossbar.so (Linux), .dylib (macOS), .dll (Windows)

Include include/crossbar.h:

#include "crossbar.h"

// Publisher
crossbar_publisher_t* pub = crossbar_publisher_create("market", NULL);
crossbar_topic_t topic = crossbar_publisher_register(pub, "/prices/AAPL");
crossbar_publish(pub, topic, data, data_len);

// Subscriber
crossbar_subscriber_t* sub = crossbar_subscriber_connect("market");
crossbar_subscription_t* stream = crossbar_subscriber_subscribe(sub, "/prices/AAPL");
crossbar_sample_t* sample = crossbar_try_recv(stream);  // allocates
if (sample) {
    const uint8_t* data = crossbar_sample_data(sample);
    size_t len = crossbar_sample_len(sample);
    // zero-copy read — data points directly into SHM
    crossbar_sample_free(sample);
}

// Or zero-allocation hot path:
crossbar_sample_t sample;
if (crossbar_try_recv_into(stream, &sample)) {
    const uint8_t* data = crossbar_sample_data(&sample);
    crossbar_sample_free(&sample);
}

// Bidirectional channel
crossbar_channel_t* ch = crossbar_channel_connect("rpc", NULL, 5000);
crossbar_channel_send(ch, msg, msg_len);
crossbar_sample_t* reply = crossbar_channel_recv(ch);

Configuration

Config {
    max_topics:          16,    // concurrent topics
    block_count:        256,    // pool blocks
    block_size:       65536,    // bytes per block (usable: block_size - 8)
    ring_depth:           8,    // samples before overwrite (must be power of 2)
    heartbeat_interval: 100ms,  // liveness signal period
    stale_timeout:        5s,   // publisher dead after this
}

The pool is a Treiber stack — lock-free allocation at any payload size. Blocks are refcounted; a subscriber holding a Sample keeps the block alive.
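The free list can be modeled as a Treiber stack of block indices whose head packs an index and an ABA tag into one u64 — the reason crossbar needs 64-bit CAS (see the no_std section). The layout below is a simplified sketch, not crossbar's actual SHM structure:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Toy Treiber stack of block indices. The head packs (tag << 32 | index)
// into one u64 so a single 64-bit CAS is ABA-safe: the tag is bumped on
// every successful pop/push. Field layout is illustrative only.
const NIL: u32 = u32::MAX;

struct FreeList {
    head: AtomicU64,      // tag << 32 | top index
    next: Vec<AtomicU64>, // per-block next index (only low 32 bits used)
}

impl FreeList {
    // Assumes blocks > 0; all blocks start on the free list.
    fn new(blocks: u32) -> Self {
        let next = (0..blocks)
            .map(|i| AtomicU64::new(if i + 1 < blocks { (i + 1) as u64 } else { NIL as u64 }))
            .collect();
        Self { head: AtomicU64::new(0), next } // top = block 0, tag = 0
    }

    fn pop(&self) -> Option<u32> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            let (tag, top) = (head >> 32, head as u32);
            if top == NIL { return None; }
            let next = self.next[top as usize].load(Ordering::Relaxed) as u32;
            let new = (tag + 1) << 32 | next as u64; // bump tag: ABA-safe
            if self.head.compare_exchange(head, new, Ordering::AcqRel, Ordering::Acquire).is_ok() {
                return Some(top);
            }
        }
    }

    fn push(&self, block: u32) {
        loop {
            let head = self.head.load(Ordering::Acquire);
            let (tag, top) = (head >> 32, head as u32);
            self.next[block as usize].store(top as u64, Ordering::Relaxed);
            let new = (tag + 1) << 32 | block as u64;
            if self.head.compare_exchange(head, new, Ordering::AcqRel, Ordering::Acquire).is_ok() {
                return;
            }
        }
    }
}

fn main() {
    let pool = FreeList::new(4);
    let a = pool.pop().unwrap();
    assert_eq!(a, 0);
    pool.push(a);
    assert_eq!(pool.pop(), Some(0));
}
```

A plain pointer-CAS stack would be vulnerable to ABA (a block popped and re-pushed between a thread's load and its CAS); the tag makes every head value unique, so the stale CAS fails and the loop retries.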


Project layout

src/
  lib.rs                 Crate root (#![no_std], feature gates)
  pod.rs                 Pod trait — marker for safe zero-copy SHM reads
  error.rs               Error
  wait.rs                WaitStrategy (BusySpin / YieldSpin / BackoffSpin / Adaptive)
  ffi.rs                 C FFI bindings (behind "ffi" feature)

  protocol/              no_std core — pure atomics, no OS calls
    layout.rs            SHM layout constants and offset helpers
    config.rs            Config
    region.rs            Region — Treiber stack, seqlock, refcount

  platform/              std only — mmap, futex, file I/O
    mmap.rs              RawMmap (MADV_HUGEPAGE on Linux)
    notify.rs            futex (Linux) / WaitOnAddress (Windows) / WFE (aarch64)
    shm.rs               Publisher, Subscriber
    subscription.rs      Stream, Sample, TypedSample
    loan.rs              Loan, TypedLoan, Topic
    channel.rs           Channel — bidirectional channel

include/
  crossbar.h             C/C++ header for FFI consumers
tests/
  pubsub.rs              Integration tests
  typed_pubsub.rs        Typed pub/sub integration tests
  channel.rs             Bidirectional channel tests
  multi_publisher.rs     Multi-publisher tests
benches/pubsub.rs        Criterion benchmarks (+ iceoryx2 head-to-head, Unix)
examples/
  publisher.rs           Cross-process latency benchmark — publisher side
  subscriber.rs          Cross-process latency benchmark — subscriber side

no_std

The protocol core (src/protocol/, src/pod.rs, src/wait.rs, src/error.rs) is no_std + alloc. The platform layer (mmap, futex, file I/O) requires std and is gated behind features = ["std"] (the default).

Requirement: target_has_atomic = "64" — the ABA-safe Treiber stack uses 64-bit CAS.

# no_std + alloc only (protocol core, no Publisher/Subscriber)
crossbar = { version = "0.7", default-features = false }

# std (default — includes everything)
crossbar = "0.7"

License

Apache-2.0.