crossbar 0.5.0

Zero-copy pub/sub over shared memory. URI-addressed. O(1) transfer at any payload size.

crossbar


crossbar transfers an 8-byte descriptor through a lock-free ring, so a publish is O(1) regardless of payload size. Subscribers read directly from shared memory: no copy, no serialization, no service discovery layer.
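
To make the descriptor idea concrete, here is a minimal sketch of passing a fixed 8-byte value through a power-of-two ring. The `Descriptor` layout (block index in the high 32 bits, length in the low 32 bits) and the `publish` / `read` helpers are assumptions for illustration only; crossbar's actual descriptor format and ring protocol may differ.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical descriptor: which pool block holds the payload, and how long it is.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Descriptor {
    pub block: u32,
    pub len: u32,
}

impl Descriptor {
    /// Pack into 8 bytes: block index in the high half, length in the low half.
    pub fn pack(self) -> u64 {
        ((self.block as u64) << 32) | self.len as u64
    }

    pub fn unpack(raw: u64) -> Self {
        Descriptor { block: (raw >> 32) as u32, len: raw as u32 }
    }
}

/// Publishing stores only the packed descriptor. The payload itself never moves,
/// so the cost does not depend on `len`.
pub fn publish(ring: &[AtomicU64], seq: u64, d: Descriptor) {
    let slot = (seq as usize) & (ring.len() - 1); // ring length must be a power of two
    ring[slot].store(d.pack(), Ordering::Release);
}

/// Read back the descriptor stored for sequence number `seq`.
pub fn read(ring: &[AtomicU64], seq: u64) -> Descriptor {
    let slot = (seq as usize) & (ring.len() - 1);
    Descriptor::unpack(ring[slot].load(Ordering::Acquire))
}
```

The sketch omits everything that makes a real ring safe under contention (commit stamps, overwrite detection, refcounts); it only shows why the transfer cost is constant.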


When to use crossbar

  • High-frequency small messages: market data ticks, sensor readings, telemetry, game state
  • Rust-native multi-process pipelines where latency compounds (10 000+ msg/s)
  • Topics that need to be discovered at runtime by URI, not wired at compile time
  • You want one crate with no heavy dependencies

When not to use crossbar

  • Payload > 64 KB and you're copying into the block: at that size crossbar and iceoryx2 are both memcpy-bound and their latencies are equal (see Performance)

Installation

[dependencies]
crossbar = "0.5"

Quick start

Byte-oriented

Publisher — write any bytes into shared memory:

use crossbar::*;

let mut pub_ = ShmPublisher::create("market", PubSubConfig::default())?;
let topic = pub_.register("/prices/AAPL")?;

let mut loan = pub_.loan(&topic).unwrap();
loan.set_data(b"42.50").unwrap();
loan.publish(); // O(1) — writes 8 bytes to ring

Subscriber — read in-place, zero copies, no unsafe:

use crossbar::*;

let sub = ShmSubscriber::connect("market")?;
let stream = sub.subscribe("/prices/AAPL")?;

if let Some(guard) = stream.try_recv() {
    println!("{}", std::str::from_utf8(&guard).unwrap());
} // guard drops → block freed back to pool

Typed

Any Copy + 'static struct where every bit pattern is valid can implement Pod for direct zero-copy reads:

use crossbar::*;

#[derive(Clone, Copy)]
#[repr(C)]
struct Tick { price: f64, volume: u64 }

unsafe impl Pod for Tick {}

// Publisher
let mut pub_ = ShmPublisher::create("market", PubSubConfig::default())?;
let topic = pub_.register_typed::<Tick>("/prices/AAPL")?;
let mut loan = pub_.loan_typed::<Tick>(&topic).unwrap();
*loan.as_mut() = Tick { price: 42.50, volume: 1000 };
loan.publish();

// Subscriber
let sub = ShmSubscriber::connect("market")?;
let stream = sub.subscribe("/prices/AAPL")?;

if let Some(guard) = stream.try_recv_typed::<Tick>() {
    println!("${:.2} × {}", guard.price, guard.volume);
}
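
As a rough illustration of why the "every bit pattern is valid" rule matters, the sketch below reinterprets raw bytes as a typed reference. The local `Pod` trait, `bytes_of`, and `view` are stand-ins written for this example, not crossbar's own definitions. A struct containing a `bool` or an enum would not qualify, because some byte values are not valid for those types.

```rust
use std::mem::{align_of, size_of};

/// Local stand-in for a Pod-style marker trait (assumed shape).
/// Safety contract: every bit pattern is a valid value and the type has no padding.
pub unsafe trait Pod: Copy + 'static {}

#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(C)]
pub struct Tick {
    pub price: f64,
    pub volume: u64,
}

// f64 + u64 under repr(C): 16 bytes, no padding, any bits are a valid value.
unsafe impl Pod for Tick {}

/// View a value as its raw bytes without copying.
pub fn bytes_of<T: Pod>(value: &T) -> &[u8] {
    // SAFETY: Pod types are padding-free here, so every byte is initialized.
    unsafe { std::slice::from_raw_parts(value as *const T as *const u8, size_of::<T>()) }
}

/// Reinterpret bytes as `&T` in place. Returns None if the slice is too short
/// or not aligned for `T`.
pub fn view<T: Pod>(bytes: &[u8]) -> Option<&T> {
    let aligned = (bytes.as_ptr() as usize) % align_of::<T>() == 0;
    if bytes.len() < size_of::<T>() || !aligned {
        return None;
    }
    // SAFETY: size and alignment were checked, and T: Pod accepts any bit pattern.
    Some(unsafe { &*(bytes.as_ptr() as *const T) })
}
```

The size and alignment checks are the part a typed receive has to get right before handing out a reference into shared memory.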

Blocking receive

// Default: three-phase spin → yield → futex/WFE
let guard = stream.recv()?;

// Or pick a strategy
let guard = stream.recv_with(WaitStrategy::BusySpin)?;
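
The three-phase idea can be sketched with the standard library alone. This is not crossbar's implementation: the function name, the phase thresholds, and the timeout parameter are invented for the example, and a short `thread::sleep` stands in for the futex/WFE phase.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Wait until `seq` differs from `last_seen`, escalating from spinning to
/// yielding to sleeping. Returns the new value, or None on timeout.
pub fn wait_for_change(seq: &AtomicU64, last_seen: u64, timeout: Duration) -> Option<u64> {
    const SPINS: u32 = 200; // illustrative thresholds, not tuned
    const YIELDS: u32 = 50;

    let deadline = Instant::now() + timeout;
    let mut attempts = 0u32;
    loop {
        let now = seq.load(Ordering::Acquire);
        if now != last_seen {
            return Some(now);
        }
        if Instant::now() >= deadline {
            return None;
        }
        if attempts < SPINS {
            std::hint::spin_loop(); // phase 1: lowest latency, burns a core
        } else if attempts < SPINS + YIELDS {
            thread::yield_now(); // phase 2: let other threads run
        } else {
            thread::sleep(Duration::from_micros(50)); // phase 3: stand-in for futex/WFE
        }
        attempts = attempts.saturating_add(1);
    }
}
```

A busy-spin strategy corresponds to never leaving phase 1: best latency, at the cost of a fully occupied core.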

Multi-publisher

Multiple publishers can share the same region — one creates, others join:

use crossbar::*;

// Process A — creates the region
let mut pub_a = ShmPublisher::create("market", PubSubConfig::default())?;
let topic_a = pub_a.register("/prices/AAPL")?;

// Process B — joins the existing region
let mut pub_b = ShmPublisher::open("market")?;
let topic_b = pub_b.register("/prices/GOOG")?;
// Or publish to the same topic as pub_a:
let topic_b2 = pub_b.register("/prices/AAPL")?;

Sequence numbers are claimed atomically via fetch_add. CAS-based ring slot locking prevents corruption when two publishers write to the same slot. Subscribers scan the ring window to handle out-of-order commits.
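
A simplified sketch of the scheme described above: a `fetch_add` claims the sequence number, a CAS on a per-slot stamp locks the slot, and a reader only accepts a slot whose stamp matches the sequence it is looking for. The `Ring` and `Slot` types, the `LOCKED` sentinel, and the stamp protocol are assumptions for illustration. The sketch does not handle a slow publisher being lapped by a full ring.

```rust
use std::sync::atomic::{fence, AtomicU64, Ordering};

const LOCKED: u64 = u64::MAX; // hypothetical "slot is being written" sentinel

pub struct Slot {
    stamp: AtomicU64, // sequence number of the committed sample, 0 = empty
    desc: AtomicU64,  // the 8-byte descriptor
}

pub struct Ring {
    next_seq: AtomicU64,
    slots: Vec<Slot>,
}

impl Ring {
    pub fn new(depth: usize) -> Self {
        assert!(depth.is_power_of_two(), "ring depth must be a power of two");
        let slots = (0..depth)
            .map(|_| Slot { stamp: AtomicU64::new(0), desc: AtomicU64::new(0) })
            .collect();
        Ring { next_seq: AtomicU64::new(1), slots }
    }

    fn slot(&self, seq: u64) -> &Slot {
        &self.slots[(seq as usize) & (self.slots.len() - 1)]
    }

    /// Any number of publishers may call this concurrently.
    pub fn publish(&self, desc: u64) -> u64 {
        // 1. Claim a unique sequence number.
        let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
        let slot = self.slot(seq);
        // 2. Lock the slot with a CAS so two writers never interleave.
        loop {
            let cur = slot.stamp.load(Ordering::Relaxed);
            if cur != LOCKED
                && slot
                    .stamp
                    .compare_exchange_weak(cur, LOCKED, Ordering::Acquire, Ordering::Relaxed)
                    .is_ok()
            {
                break;
            }
            std::hint::spin_loop();
        }
        fence(Ordering::Release); // keep the descriptor store after the lock
        // 3. Write the descriptor, then commit by publishing the stamp.
        slot.desc.store(desc, Ordering::Relaxed);
        slot.stamp.store(seq, Ordering::Release);
        seq
    }

    /// Returns the descriptor for `seq` if that sample is committed and intact.
    pub fn read(&self, seq: u64) -> Option<u64> {
        let slot = self.slot(seq);
        if slot.stamp.load(Ordering::Acquire) != seq {
            return None; // not committed yet, locked, or already overwritten
        }
        let desc = slot.desc.load(Ordering::Relaxed);
        fence(Ordering::Acquire);
        // Re-check: a writer may have locked the slot while we were reading.
        (slot.stamp.load(Ordering::Relaxed) == seq).then_some(desc)
    }
}
```

Because commits can land out of order, a subscriber that sees `None` for sequence `n` may still find `n + 1` committed, which is why a reader scans a window rather than stopping at the first gap.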

Bidirectional channel

ShmChannel wraps two pub/sub regions into a TCP-like pair — one side listens, the other connects:

use crossbar::*;
use std::time::Duration;

// Process A (server)
let mut srv = ShmChannel::listen("rpc", PubSubConfig::default(),
    Duration::from_secs(30))?;

// Process B (client)
let mut cli = ShmChannel::connect("rpc", PubSubConfig::default(),
    Duration::from_secs(5))?;

cli.send(b"request")?;
let msg = srv.recv()?;
// ... process and respond
srv.send(b"response")?;
let reply = cli.recv()?;
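
The "two one-way regions make one two-way channel" structure can be shown in-process with standard-library channels. This is only an architectural sketch: `Duplex` and `pair` are made-up names, `std::sync::mpsc` replaces shared memory, and messages are copied into a `Vec<u8>`, which the real shared-memory path avoids.

```rust
use std::sync::mpsc::{channel, Receiver, RecvError, SendError, Sender};

/// One endpoint of a bidirectional channel: it owns the sending half of one
/// direction and the receiving half of the other.
pub struct Duplex {
    tx: Sender<Vec<u8>>,
    rx: Receiver<Vec<u8>>,
}

/// Build two connected endpoints from two one-way channels.
pub fn pair() -> (Duplex, Duplex) {
    let (a_tx, b_rx) = channel(); // direction A -> B
    let (b_tx, a_rx) = channel(); // direction B -> A
    (Duplex { tx: a_tx, rx: a_rx }, Duplex { tx: b_tx, rx: b_rx })
}

impl Duplex {
    pub fn send(&self, msg: &[u8]) -> Result<(), SendError<Vec<u8>>> {
        self.tx.send(msg.to_vec())
    }

    /// Blocks until the peer sends a message or disconnects.
    pub fn recv(&self) -> Result<Vec<u8>, RecvError> {
        self.rx.recv()
    }
}
```

Each side publishes on one direction and subscribes on the other, which is what gives the pair its request/response shape.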

Born-in-SHM (zero-copy publish)

Write directly into the pool block — no intermediate buffer, no copy at any payload size:

// `encode_frame` and `frame_len` stand for your own encoder and its output size
let mut loan = pub_.loan(&topic).unwrap();
let buf = loan.as_mut_slice();
// write directly into shared memory
encode_frame(&mut buf[..frame_len]);
loan.set_len(frame_len).unwrap();
loan.publish();

Performance

All measurements: Criterion, same-process publisher + subscriber, try_recv (no futex).

Intel i7-10700KF · Linux 6.8 · rustc 1.87

Payload                    crossbar   iceoryx2   speedup
8 B (transport overhead)   54 ns      232 ns     4.3×
1 KB                       66 ns      243 ns     3.7×
64 KB                      1.35 µs    1.38 µs    ~1×
1 MB                       31 µs      30 µs      ~1×

Apple M1 Pro · macOS · rustc 1.92 (v0.3.0, pre-optimization)

Payload                    crossbar   iceoryx2   speedup
8 B (transport overhead)   52 ns      189 ns     3.6×
1 KB                       77 ns      210 ns     2.7×
64 KB                      1.27 µs    1.35 µs    1.1×
1 MB                       23.9 µs    23.5 µs    ~1×

The win is in the overhead. At small payloads crossbar's lighter path (no service discovery, no POSIX config layer) is roughly 3× to 4× faster on the machines above. At 64 KB and beyond, both frameworks are memcpy-bound and converge. Passing the 8-byte descriptor is always O(1); end-to-end latency scales with how long you take to write the payload into the block.

Pinned mode (latest-value, same buffer every iteration)

Payload   crossbar   iceoryx2   speedup
8 B       35 ns      229 ns     6.5×
1 KB      45 ns      238 ns     5.3×
64 KB     1.07 µs    1.30 µs    1.2×
1 MB      18.1 µs    18.4 µs    ~1×

Pinned mode (loan_pinned / try_recv_pinned) reuses the same block every iteration — no allocation, no refcount, no ring. Safe API with CAS-based reader/writer exclusion. Best for market data, sensors, telemetry, game state. The 8B cost increased from 26 ns to 35 ns in v0.3.1 due to the CAS sentinel added for safety (prevents data races between publisher writes and subscriber reads).
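
One way to picture CAS-based reader/writer exclusion on a single reused block is a state word that is either a reader count or a "writing" sentinel. The sketch below is an assumption about the general technique, not a description of crossbar's internals; `PinnedState`, `WRITING`, and the method names are invented here.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

const WRITING: u32 = u32::MAX; // hypothetical sentinel: the writer owns the block

/// State word guarding one pinned block: 0 = idle, n = n active readers,
/// WRITING = the publisher is writing.
pub struct PinnedState(AtomicU32);

impl PinnedState {
    pub const fn new() -> Self {
        PinnedState(AtomicU32::new(0))
    }

    /// Writer enters only when no reader is active.
    pub fn try_write(&self) -> bool {
        self.0
            .compare_exchange(0, WRITING, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
    }

    pub fn end_write(&self) {
        self.0.store(0, Ordering::Release);
    }

    /// Reader enters unless the writer holds the sentinel.
    pub fn try_read(&self) -> bool {
        let mut cur = self.0.load(Ordering::Relaxed);
        loop {
            if cur == WRITING {
                return false;
            }
            match self.0.compare_exchange_weak(cur, cur + 1, Ordering::Acquire, Ordering::Relaxed) {
                Ok(_) => return true,
                Err(actual) => cur = actual,
            }
        }
    }

    pub fn end_read(&self) {
        self.0.fetch_sub(1, Ordering::Release);
    }
}
```

The extra CAS on each side is the kind of cost that shows up as a few nanoseconds at the 8 B payload size, in exchange for never reading a half-written block.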

Reproduce: cargo bench -- head_to_head (requires iceoryx2 dev-dep, Unix only).


C / C++ FFI

Build the shared library and link against it from C or C++:

cargo rustc --release --lib --features ffi --crate-type cdylib
# produces target/release/libcrossbar.so (Linux), libcrossbar.dylib (macOS), crossbar.dll (Windows)

Include include/crossbar.h:

#include "crossbar.h"

// Publisher
crossbar_publisher_t* pub = crossbar_publisher_create("market", NULL);
crossbar_topic_t topic = crossbar_publisher_register(pub, "/prices/AAPL");
crossbar_publish(pub, topic, data, data_len);

// Subscriber
crossbar_subscriber_t* sub = crossbar_subscriber_connect("market");
crossbar_subscription_t* stream = crossbar_subscriber_subscribe(sub, "/prices/AAPL");
crossbar_sample_t* sample = crossbar_try_recv(stream);  // allocates
if (sample) {
    const uint8_t* data = crossbar_sample_data(sample);
    size_t len = crossbar_sample_len(sample);
    // zero-copy read — data points directly into SHM
    crossbar_sample_free(sample);
}

// Or zero-allocation hot path (caller-owned sample, no heap allocation):
crossbar_sample_t stack_sample;
if (crossbar_try_recv_into(stream, &stack_sample)) {
    const uint8_t* data = crossbar_sample_data(&stack_sample);
    crossbar_sample_free(&stack_sample);
}

// Bidirectional channel
crossbar_channel_t* ch = crossbar_channel_connect("rpc", NULL, 5000);
crossbar_channel_send(ch, msg, msg_len);
crossbar_sample_t* reply = crossbar_channel_recv(ch);

Configuration

PubSubConfig {
    max_topics:          16,    // concurrent topics
    block_count:        256,    // pool blocks
    block_size:       65536,    // bytes per block (usable: block_size - 8)
    ring_depth:           8,    // samples before overwrite (must be power of 2)
    heartbeat_interval: 100ms,  // liveness signal period
    stale_timeout:        5s,   // publisher dead after this
}
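
For a feel of what these defaults imply, the sketch below does the sizing arithmetic on a local mirror of three of the fields. `PoolSizing` is a made-up type for this example, not crossbar's `PubSubConfig`, and the total covers the pool blocks only (ring and header overhead are not included).

```rust
/// Local mirror of the sizing-related fields shown above.
pub struct PoolSizing {
    pub block_count: usize,
    pub block_size: usize,
    pub ring_depth: usize,
}

impl PoolSizing {
    /// Per-block overhead, taken from "usable: block_size - 8".
    pub const HEADER_BYTES: usize = 8;

    /// Largest payload that fits in one block.
    pub fn usable_per_block(&self) -> usize {
        self.block_size - Self::HEADER_BYTES
    }

    /// Memory taken by the pool blocks alone.
    pub fn pool_bytes(&self) -> usize {
        self.block_count * self.block_size
    }

    /// With a power-of-two depth, the ring slot is a single mask operation.
    pub fn ring_slot(&self, seq: u64) -> usize {
        (seq as usize) & (self.ring_depth - 1)
    }

    pub fn is_valid(&self) -> bool {
        self.ring_depth.is_power_of_two() && self.block_size > Self::HEADER_BYTES
    }
}
```

With the defaults, that is 65 528 usable bytes per block and 16 MiB of pool, and a subscriber that falls more than 8 samples behind on a topic starts losing the oldest ones.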

The pool is a Treiber stack — lock-free allocation at any payload size. Blocks are refcounted; a subscriber holding a SampleGuard keeps the block alive.
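
A Treiber stack used as a block free list can be sketched with indices instead of pointers. The version below packs a 32-bit tag and a 32-bit block index into one 64-bit word so that a single CAS updates both, which is a common way to avoid the ABA problem. `BlockPool` and its layout are assumptions for illustration; refcounting is left out.

```rust
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

const NIL: u32 = u32::MAX; // end-of-list marker

/// Lock-free free list of block indices.
pub struct BlockPool {
    head: AtomicU64,      // (tag << 32) | index of the first free block
    next: Vec<AtomicU32>, // next[i] = free block that follows block i
}

fn pack(tag: u32, idx: u32) -> u64 {
    ((tag as u64) << 32) | idx as u64
}

fn unpack(raw: u64) -> (u32, u32) {
    ((raw >> 32) as u32, raw as u32)
}

impl BlockPool {
    /// All blocks start free, chained 0 -> 1 -> ... -> count-1 -> NIL.
    pub fn new(count: u32) -> Self {
        let next = (0..count)
            .map(|i| AtomicU32::new(if i + 1 < count { i + 1 } else { NIL }))
            .collect();
        let first = if count == 0 { NIL } else { 0 };
        BlockPool { head: AtomicU64::new(pack(0, first)), next }
    }

    /// Pop a free block index, or None if the pool is exhausted.
    pub fn alloc(&self) -> Option<u32> {
        let mut cur = self.head.load(Ordering::Acquire);
        loop {
            let (tag, idx) = unpack(cur);
            if idx == NIL {
                return None;
            }
            let next = self.next[idx as usize].load(Ordering::Relaxed);
            // Bumping the tag makes a stale `cur` fail even if `idx` reappears.
            let new = pack(tag.wrapping_add(1), next);
            match self.head.compare_exchange_weak(cur, new, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return Some(idx),
                Err(actual) => cur = actual,
            }
        }
    }

    /// Push a block index back onto the free list.
    pub fn free(&self, idx: u32) {
        let mut cur = self.head.load(Ordering::Acquire);
        loop {
            let (tag, head_idx) = unpack(cur);
            self.next[idx as usize].store(head_idx, Ordering::Relaxed);
            let new = pack(tag.wrapping_add(1), idx);
            match self.head.compare_exchange_weak(cur, new, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return,
                Err(actual) => cur = actual,
            }
        }
    }
}
```

Both operations are a short CAS loop on one word, so allocation cost is independent of the block size.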


Project layout

src/
  lib.rs                 Crate root (#![no_std], feature gates)
  pod.rs                 Pod trait — marker for safe zero-copy SHM reads
  error.rs               IpcError
  wait.rs                WaitStrategy (BusySpin / YieldSpin / BackoffSpin / Adaptive)
  ffi.rs                 C FFI bindings (behind "ffi" feature)

  protocol/              no_std core — pure atomics, no OS calls
    layout.rs            SHM layout constants and offset helpers
    config.rs            PubSubConfig
    region.rs            Region — Treiber stack, seqlock, refcount

  platform/              std only — mmap, futex, file I/O
    mmap.rs              RawMmap (MADV_HUGEPAGE on Linux)
    notify.rs            futex (Linux) / WaitOnAddress (Windows) / WFE (aarch64)
    shm.rs               ShmPublisher, ShmSubscriber
    subscription.rs      Subscription, SampleGuard, TypedSampleGuard
    loan.rs              ShmLoan, TypedShmLoan, TopicHandle
    channel.rs           ShmChannel — bidirectional channel

include/
  crossbar.h             C/C++ header for FFI consumers
tests/
  pubsub.rs              Integration tests
  typed_pubsub.rs        Typed pub/sub integration tests
  channel.rs             Bidirectional channel tests
  multi_publisher.rs     Multi-publisher tests
benches/pubsub.rs        Criterion benchmarks (+ iceoryx2 head-to-head, Unix)
examples/
  publisher.rs           Cross-process latency benchmark — publisher side
  subscriber.rs          Cross-process latency benchmark — subscriber side

no_std

The protocol core (src/protocol/, src/pod.rs, src/wait.rs, src/error.rs) is no_std + alloc. The platform layer (mmap, futex, file I/O) requires std and is gated behind features = ["std"] (the default).

Requirement: target_has_atomic = "64" — the ABA-safe Treiber stack uses 64-bit CAS.

# no_std + alloc only (protocol core, no ShmPublisher/ShmSubscriber)
crossbar = { version = "0.5", default-features = false }

# std (default: includes everything)
crossbar = "0.5"
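
The 64-bit atomics requirement above can be checked at compile time in downstream code. The following is a small sketch of that pattern using the standard `target_has_atomic` cfg; the constant and function names are made up for the example.

```rust
use core::sync::atomic::{AtomicU64, Ordering};

// Fail the build early on targets without native 64-bit atomics.
#[cfg(not(target_has_atomic = "64"))]
compile_error!("this code needs native 64-bit atomics (target_has_atomic = \"64\")");

/// Compile-time flag, usable in ordinary `if` conditions.
pub const HAS_ATOMIC_64: bool = cfg!(target_has_atomic = "64");

/// Only compiled where 64-bit atomic operations are available.
#[cfg(target_has_atomic = "64")]
pub fn bump(counter: &AtomicU64) -> u64 {
    // Returns the previous value.
    counter.fetch_add(1, Ordering::Relaxed)
}
```

Most 64-bit desktop and server targets satisfy this; some 32-bit embedded targets do not, which is where the check is useful.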

License

Apache-2.0.