# Architecture
Crossbar is a single Rust crate. It moves data between processes at O(1) cost by transferring only an 8-byte descriptor (block index + data length) through a seqlock ring. The block lives in a shared mmap region; the subscriber reads it in-place.
---
## Crate structure
```
src/
  lib.rs            #![no_std] crate root, feature gates, public re-exports
  pod.rs            Pod trait
  error.rs          Error
  wait.rs           WaitStrategy
  ffi.rs            C FFI bindings (#[cfg(feature = "ffi")])
  protocol/         no_std — pure atomics, raw pointer math, no OS calls
    layout.rs       All SHM offset constants and layout helper functions
    config.rs       Config
    region.rs       Region — Treiber stack, seqlock ring, refcount, commit_to_ring
  platform/         #[cfg(feature = "std")] — every OS call lives here
    mmap.rs         RawMmap
    notify.rs       futex / WaitOnAddress / WFE
    shm.rs          Publisher, Subscriber
    subscription.rs Stream, Sample, TypedSample
    loan.rs         Loan, TypedLoan, Topic
    channel.rs      Channel — bidirectional channel
    pod_bus.rs      PodBus, PodSubscriber — SPMC broadcast ring
include/
  crossbar.h        C/C++ header
```
The split is intentional: `protocol/` can be used on bare-metal (no OS) if you bring your own mmap. `platform/` is the opinionated std implementation that most users want.
---
## Memory layout
One mmap region — three logical areas:
```
+---------------------------+ offset 0
| version (u32) |
| max_topics (u32) |
| block_count (u32) |
| block_size (u32) |
| ring_depth (u32) |
| pool_head (AtomicU64) | Treiber stack head: pack(generation, index)
| heartbeat (AtomicU64) | µs since epoch
| pid (u64) |
| stale_timeout_us (u64) |
+---------------------------+ offset 128
| Topic Entry 0 (128 bytes) |
| 1st cache line (hot): |
| 0x00 ACTIVE (u32) | AtomicU32
| 0x04 (reserved, u32) | (was NOTIFY — now merged into WRITE_SEQ)
| 0x08 WRITE_SEQ(u64) | AtomicU64 — monotonic publish counter + futex wake address
| 0x10 WAITERS (u32) | AtomicU32 — blocked subscriber count
| remaining fields (cold): |
| 0x14 URI_LEN (u32) |
| 0x18 URI_HASH (u64) | FNV-1a
| 0x20 URI (64B) |
| 0x60 TYPE_SIZE(u32) | size_of::<T>() for typed topics; 0 = untyped
| 0x64 PINNED_BLOCK (u32) | block index for pinned publish; NO_BLOCK = none
| 0x68 PINNED_READERS | AtomicU32 — active pinned-read guard count
| 0x70 PINNED_SEQ (u64) | AtomicU64 — packed (seq:32 | data_len:32) seqlock
| 0x78 SUBSCRIBER_COUNT | AtomicU32 — live Stream count for this topic
+---------------------------+
| Topic Entry 1 … |
+---------------------------+
| Ring 0 (ring_depth × 16B) |
| per entry: |
| 0x00 seq (AtomicU64)| seqlock — SEQ_WRITING (u64::MAX) = being written
| 0x08 block_idx(AtomicU32)| Relaxed
| 0x0C data_len (AtomicU32)| Relaxed
+---------------------------+
| Ring 1 … |
+---------------------------+
| Block Pool |
| per block: |
| 0x00 next_free(AtomicU32)| Treiber free-list link
| 0x04 refcount (AtomicU32)|
| 0x08 data … | user payload starts here
+---------------------------+
```
Hot topic fields (`ACTIVE`, `WRITE_SEQ`, `WAITERS`) are packed into the first cache line of each topic entry to avoid cache misses on the subscriber polling path.
---
## Publish path
1. Pop block from per-publisher cache, or Treiber stack if cache empty (CAS on `pool_head`)
2. Write payload into block at `BLOCK_DATA_OFFSET` (offset 8)
3. Claim sequence number: `fetch_add(1)` on `WRITE_SEQ` (AcqRel)
4. Compute slot: `seq & (ring_depth - 1)` (bitmask, ring_depth must be power of 2)
5. Acquire ring slot — **single-publisher**: plain store of `SEQ_WRITING` (Release); **multi-publisher**: `compare_exchange_weak(current, SEQ_WRITING)` CAS loop with `yield_hint`
6. Set block refcount to 1 (Release)
7. Write `block_idx` and `data_len` (Relaxed) — safe, bracketed by seqlock
8. Seqlock close: store new `seq` (Release) — slot is now readable
9. Decrement old block's refcount (AcqRel); free if it reaches zero
10. Smart wake: call `futex_wake` on `WRITE_SEQ` low-32 bits only if `WAITERS > 0`
Step 10 means `publish()` costs zero extra when all subscribers use `try_recv()` (no blocked waiters). The futex syscall (~170 ns) fires only when a subscriber is blocked in `recv()`.
The per-publisher block cache (step 1) eliminates the Treiber stack CAS for ~87% of allocations. Each publisher caches up to 8 blocks locally.
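The commit sequence above can be sketched in-process. This is a simplified model, not the crate's code: block allocation (steps 1–2), refcounting (steps 6 and 9), and the wake (step 10) are omitted, and `SEQ_EMPTY` is a hypothetical initial stamp introduced here for illustration:

```rust
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

const SEQ_WRITING: u64 = u64::MAX; // sentinel: slot is mid-write
const SEQ_EMPTY: u64 = u64::MAX - 1; // hypothetical "never written" stamp (not a crate constant)
const RING_DEPTH: usize = 8; // must be a power of two

struct Slot {
    seq: AtomicU64,
    block_idx: AtomicU32,
    data_len: AtomicU32,
}

struct Ring {
    write_seq: AtomicU64,
    slots: Vec<Slot>,
}

impl Ring {
    fn new() -> Self {
        let slots = (0..RING_DEPTH)
            .map(|_| Slot {
                seq: AtomicU64::new(SEQ_EMPTY),
                block_idx: AtomicU32::new(0),
                data_len: AtomicU32::new(0),
            })
            .collect();
        Ring { write_seq: AtomicU64::new(0), slots }
    }

    /// Steps 3-8 of the publish path, single-publisher form of step 5.
    fn commit(&self, block_idx: u32, data_len: u32) -> u64 {
        let seq = self.write_seq.fetch_add(1, Ordering::AcqRel); // step 3: claim seq
        let slot = &self.slots[seq as usize & (RING_DEPTH - 1)]; // step 4: bitmask slot
        slot.seq.store(SEQ_WRITING, Ordering::Release); // step 5: lock the slot
        slot.block_idx.store(block_idx, Ordering::Relaxed); // step 7: 8-byte descriptor
        slot.data_len.store(data_len, Ordering::Relaxed);
        slot.seq.store(seq, Ordering::Release); // step 8: seqlock close, slot readable
        seq
    }
}

fn main() {
    let ring = Ring::new();
    assert_eq!(ring.commit(3, 64), 0); // seqs are claimed monotonically
    assert_eq!(ring.commit(7, 128), 1);
    assert_eq!(ring.slots[1].block_idx.load(Ordering::Relaxed), 7);
    assert_eq!(ring.slots[1].data_len.load(Ordering::Relaxed), 128);
}
```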
---
## Receive path
1. Load `WRITE_SEQ` (Acquire) — return `None` if unchanged
2. Compute scan window: `[last_seq+1 .. write_seq]`, clamped to `ring_depth`
3. For each seq in the window (first committed slot wins):
a. Compute slot: `seq & (ring_depth - 1)`
b. Seqlock check 1: load slot `seq` (Acquire) — skip if `SEQ_WRITING` or mismatch
c. Read `block_idx` and `data_len` (Relaxed) — safe inside seqlock bracket
d. CAS-increment block refcount (AcqRel) — acquire a reference
e. Seqlock check 2: verify slot `seq` again — undo refcount if overwritten
4. Advance `last_seq`, return `Sample`
5. On guard drop: decrement refcount (Release); if last reference, acquire fence then free block
In single-publisher mode, the scan window is always 1 slot — the loop executes once. Under multi-publisher, at most `ring_depth` slots are scanned (default: 8, each check is ~9 ns).
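The double-check around the Relaxed descriptor reads (steps 3b–3e) can be sketched for a single candidate slot. One simplification: the real path CAS-increments the refcount so a freed (zero-refcount) block cannot be resurrected; a plain `fetch_add` is used here for brevity:

```rust
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

const SEQ_WRITING: u64 = u64::MAX; // slot is mid-write

struct Slot {
    seq: AtomicU64,
    block_idx: AtomicU32,
    data_len: AtomicU32,
}

/// Steps 3b-3e for one candidate slot. Returns the (block_idx, data_len)
/// descriptor only if `want_seq` was stable across both seqlock checks.
fn try_read(slot: &Slot, want_seq: u64, refcounts: &[AtomicU32]) -> Option<(u32, u32)> {
    // 3b: seqlock check 1
    let s1 = slot.seq.load(Ordering::Acquire);
    if s1 == SEQ_WRITING || s1 != want_seq {
        return None;
    }
    // 3c: Relaxed is safe inside the seqlock bracket
    let idx = slot.block_idx.load(Ordering::Relaxed);
    let len = slot.data_len.load(Ordering::Relaxed);
    // 3d: acquire a reference before trusting the descriptor
    refcounts[idx as usize].fetch_add(1, Ordering::AcqRel);
    // 3e: seqlock check 2 - undo the refcount if the slot was overwritten
    if slot.seq.load(Ordering::Acquire) != want_seq {
        refcounts[idx as usize].fetch_sub(1, Ordering::Release);
        return None;
    }
    Some((idx, len))
}

fn main() {
    let refcounts: Vec<AtomicU32> = (0..4).map(|_| AtomicU32::new(1)).collect();
    let slot = Slot {
        seq: AtomicU64::new(5),
        block_idx: AtomicU32::new(2),
        data_len: AtomicU32::new(96),
    };
    assert_eq!(try_read(&slot, 5, &refcounts), Some((2, 96)));
    assert_eq!(refcounts[2].load(Ordering::Relaxed), 2); // reference taken
    assert_eq!(try_read(&slot, 4, &refcounts), None); // seq mismatch: slot skipped
}
```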
---
## Seqlock correctness
Ring data fields (`block_idx`, `data_len`) are read and written with `Relaxed` ordering. This is not a data race: `AtomicU32` operations are atomic by definition. Visibility comes from the seqlock bracket — the Release store at seqlock close synchronizes-with the Acquire load at seqlock check 1. The Relaxed data reads are safe because they sit between the two checks: if check 2 still observes the same sequence number, no writer touched the slot in the interim, so the values read belong to that sequence.
The seqlock uses `SEQ_WRITING` (`u64::MAX`) as the "slot locked" sentinel instead of 0. Publishers CAS the slot from its current seq to `SEQ_WRITING` before writing, then store the new seq on close. This allows multiple publishers to safely contend on the same ring slot — the CAS serializes writes, and subscribers skip slots where `entry_seq == SEQ_WRITING`.
---
## Treiber stack
The block pool is a lock-free stack with a 64-bit `(generation, index)` head pointer. The generation counter prevents ABA: even if the same block is freed and reallocated between a load and a CAS, the generation mismatch causes the CAS to fail.
- **Alloc**: CAS-weak the head to `(gen+1, next_free_of_head_block)`; on failure, reuse the returned `Err(current)` value as the fresh head instead of reloading
- **Free**: write the old head index into the freed block's link field, then CAS-weak the head to `(gen+1, freed_block)`
- **Per-publisher cache**: Each publisher caches up to 8 blocks locally, refilling from the global stack when empty. This amortizes the CAS over multiple allocations.
- **Block recycling**: When `commit_to_ring` frees the old block, it stashes the index in `Region.last_freed` instead of pushing to the stack. The next `alloc_cached` grabs this block first — it's still warm in L1/L2 from the previous write, eliminating RFO cache misses for large payloads.
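A minimal sketch of the `(generation, index)` head and the alloc/free CAS loops, assuming a flat `Vec` of link fields in place of the real in-block `next_free` words (per-publisher caching and `last_freed` recycling omitted):

```rust
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

const NO_BLOCK: u32 = u32::MAX; // end of the free list

fn pack(gen: u32, idx: u32) -> u64 {
    ((gen as u64) << 32) | idx as u64
}
fn unpack(head: u64) -> (u32, u32) {
    ((head >> 32) as u32, head as u32)
}

struct Pool {
    head: AtomicU64,       // pack(generation, index)
    links: Vec<AtomicU32>, // per-block next_free field
}

impl Pool {
    fn new(n: u32) -> Self {
        // chain every block onto the free list: 0 -> 1 -> ... -> NO_BLOCK
        let links = (0..n)
            .map(|i| AtomicU32::new(if i + 1 < n { i + 1 } else { NO_BLOCK }))
            .collect();
        Pool { head: AtomicU64::new(pack(0, 0)), links }
    }

    fn alloc(&self) -> Option<u32> {
        let mut cur = self.head.load(Ordering::Acquire);
        loop {
            let (gen, idx) = unpack(cur);
            if idx == NO_BLOCK {
                return None; // pool exhausted
            }
            let next = self.links[idx as usize].load(Ordering::Relaxed);
            // bumping the generation makes a free+realloc that happened
            // between the load and the CAS fail the CAS (ABA protection)
            match self.head.compare_exchange_weak(
                cur,
                pack(gen.wrapping_add(1), next),
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(idx),
                Err(seen) => cur = seen, // reuse Err(current), no reload
            }
        }
    }

    fn free(&self, idx: u32) {
        let mut cur = self.head.load(Ordering::Acquire);
        loop {
            let (gen, old_idx) = unpack(cur);
            // the link field must point at the old head before the CAS publishes us
            self.links[idx as usize].store(old_idx, Ordering::Relaxed);
            match self.head.compare_exchange_weak(
                cur,
                pack(gen.wrapping_add(1), idx),
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(seen) => cur = seen,
            }
        }
    }
}

fn main() {
    let pool = Pool::new(3);
    assert_eq!(pool.alloc(), Some(0));
    assert_eq!(pool.alloc(), Some(1));
    pool.free(0);
    assert_eq!(pool.alloc(), Some(0)); // LIFO: the freed block comes back first
    assert_eq!(pool.alloc(), Some(2));
    assert_eq!(pool.alloc(), None);
}
```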
---
## Pod trait and typed pub/sub
`Pod` is an unsafe marker trait:
```rust
pub unsafe trait Pod: Copy + Send + 'static {}
```
The implementor guarantees:
1. `Copy + 'static` — no heap, no lifetimes
2. Every bit pattern is a valid value — no padding-dependent invariants, no niche optimizations
3. `#[repr(C)]` — deterministic field layout
At runtime, `register_typed::<T>()` writes `size_of::<T>()` into the topic entry (`TYPE_SIZE`). `try_recv_typed::<T>()` panics if the stored size doesn't match — catching publisher/subscriber type mismatches across process boundaries.
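A sketch of what implementing `Pod` and crossing the byte boundary looks like. `Imu`, `to_bytes`, and `from_bytes` are illustrative names for this example, not crate API:

```rust
use std::mem::size_of;

/// The marker trait as declared above.
pub unsafe trait Pod: Copy + Send + 'static {}

// Hypothetical payload type for illustration.
#[repr(C)]
#[derive(Clone, Copy)]
struct Imu {
    timestamp_us: u64,
    gyro: [f32; 3],
    accel: [f32; 3],
}

// SAFETY: #[repr(C)], Copy, no references, no padding,
// and every bit pattern is a valid Imu.
unsafe impl Pod for Imu {}

/// What the typed path does conceptually: treat the value as raw bytes
/// on the publisher side ...
fn to_bytes<T: Pod>(v: &T) -> Vec<u8> {
    unsafe { std::slice::from_raw_parts(v as *const T as *const u8, size_of::<T>()) }.to_vec()
}

/// ... and reinterpret them on the subscriber side, after the same size
/// check that try_recv_typed performs against the stored TYPE_SIZE.
fn from_bytes<T: Pod>(bytes: &[u8]) -> T {
    assert_eq!(bytes.len(), size_of::<T>()); // type-mismatch guard
    unsafe { std::ptr::read_unaligned(bytes.as_ptr() as *const T) }
}

fn main() {
    let imu = Imu { timestamp_us: 42, gyro: [0.1, 0.2, 0.3], accel: [0.0; 3] };
    assert_eq!(size_of::<Imu>(), 32); // the value register_typed stores in TYPE_SIZE
    let back: Imu = from_bytes(&to_bytes(&imu));
    assert_eq!(back.timestamp_us, 42);
    assert_eq!(back.gyro, [0.1, 0.2, 0.3]);
}
```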
---
## Smart wake
Notification is merged into `WRITE_SEQ`: the publisher's `fetch_add` on `WRITE_SEQ` changes the value, and subscribers in `recv()` futex-wait on the low 32 bits of `WRITE_SEQ`. A subscriber registers itself in `TE_WAITERS` before sleeping; it unregisters on wakeup. The publisher reads `WAITERS` before calling `futex_wake` — if zero, it skips the syscall entirely.
Result: when all subscribers poll with `try_recv()`, publish has zero notification overhead — no separate counter to increment. The ~170 ns futex overhead only appears when a subscriber is genuinely blocked.
---
## Platform wake
| Platform | Wake mechanism |
|---|---|
| Linux | `futex(FUTEX_WAIT)` / `futex(FUTEX_WAKE)` via `libc::syscall` |
| Windows | `WaitOnAddress` / `WakeByAddressAll` from Win32 |
| macOS x86 | 1 ms sleep (no `futex` equivalent) |
| macOS aarch64 | `WFE` (Wait For Event) — wakes on cache-line invalidation, ~30 ns |
The spin phase uses `PAUSE` on x86 and `SEVL + WFE` on aarch64. WFE puts the core into a low-power state until the publisher's store invalidates the cache line — effectively hardware-assisted polling.
---
## Heartbeat and liveness
Publishers store a microsecond-resolution timestamp in the global header every `heartbeat_interval` (default 100 ms), amortized over 1024 loan calls to avoid `Instant::now()` overhead on the hot path. With multiple publishers, `fetch_max` ensures the heartbeat only advances — any live publisher keeps the region alive.
Subscribers check the heartbeat when blocking in `recv()`. If the timestamp is older than `stale_timeout` (default 5 s), `recv()` returns `Err(Error::PublisherDead)`.
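Both sides of the heartbeat protocol in miniature; `beat` and `is_stale` are illustrative names, not crate API:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

const STALE_TIMEOUT_US: u64 = 5_000_000; // 5 s default

/// Publisher side: fetch_max only ever advances the shared timestamp,
/// so a publisher with a lagging clock cannot roll back a fresher beat.
fn beat(heartbeat: &AtomicU64, now_us: u64) {
    heartbeat.fetch_max(now_us, Ordering::Relaxed);
}

/// Subscriber side: the region is stale when no publisher has beaten
/// within the timeout window.
fn is_stale(heartbeat: &AtomicU64, now_us: u64) -> bool {
    now_us.saturating_sub(heartbeat.load(Ordering::Relaxed)) > STALE_TIMEOUT_US
}

fn main() {
    let hb = AtomicU64::new(0);
    beat(&hb, 100);
    beat(&hb, 50); // a slower publisher cannot move the heartbeat backwards
    assert_eq!(hb.load(Ordering::Relaxed), 100);
    assert!(!is_stale(&hb, 100 + STALE_TIMEOUT_US));
    assert!(is_stale(&hb, 101 + STALE_TIMEOUT_US));
}
```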
---
## Multi-publisher
Multiple publishers can share the same SHM region via `Publisher::open()`. The protocol supports this through:
- **Atomic seq claiming**: `fetch_add(1)` on `WRITE_SEQ` gives each publisher a unique, monotonically increasing sequence number at commit time
- **CAS-based ring slot locking**: `compare_exchange(current, SEQ_WRITING)` serializes writes to the same ring slot when two publishers' seqs differ by exactly `ring_depth`
- **CAS-based topic registration**: Three-state `ACTIVE` field (`FREE=0 → INIT=2 → ACTIVE=1`) prevents concurrent register races
- **Shared flock**: The creating publisher downgrades its exclusive lock to shared after initialization. Secondary publishers acquire shared locks. All shared locks together prevent a new `create()` from truncating the region.
- **`fetch_max` heartbeat**: Any live publisher keeps the heartbeat fresh
The block pool (Treiber stack) is already lock-free and handles concurrent alloc/free from multiple publishers without modification.
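The three-state registration CAS can be sketched as follows; `try_register` and `fill_entry` are illustrative names for this example:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Three-state ACTIVE field, values as given above.
const FREE: u32 = 0;
const ACTIVE: u32 = 1;
const INIT: u32 = 2;

/// Exactly one publisher wins the FREE -> INIT CAS; it fills in the
/// entry (URI, hash, TYPE_SIZE) with exclusive access, then publishes
/// the entry with the INIT -> ACTIVE store. Losers see INIT or ACTIVE.
fn try_register(state: &AtomicU32, fill_entry: impl FnOnce()) -> bool {
    match state.compare_exchange(FREE, INIT, Ordering::AcqRel, Ordering::Acquire) {
        Ok(_) => {
            fill_entry();
            state.store(ACTIVE, Ordering::Release);
            true
        }
        Err(_) => false,
    }
}

fn main() {
    let state = AtomicU32::new(FREE);
    let mut filled = false;
    assert!(try_register(&state, || filled = true));
    assert!(filled);
    assert_eq!(state.load(Ordering::Relaxed), ACTIVE);
    assert!(!try_register(&state, || unreachable!())); // entry already taken
}
```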
---
## Bidirectional channel
`Channel` composes two pub/sub regions into a bidirectional pair. Each side publishes on its own region and subscribes to the other's:
```
Server Client
Publisher("rpc-srv") ──────────> Stream("rpc-srv")
Stream("rpc-cli") <────────── Publisher("rpc-cli")
```
- `listen()` creates `{name}-srv`, polls for `{name}-cli`
- `connect()` creates `{name}-cli`, connects to `{name}-srv`
- Both sides get a `Channel` with `send()` / `recv()` / `loan()`
- Subscriptions start from seq 0 (not latest) so early messages are not missed
This is built entirely on the existing pub/sub transport — no new protocol machinery. The channel adds essentially no overhead over raw pub/sub.
---
## C / C++ FFI
The `ffi` feature (`src/ffi.rs`) exposes `extern "C"` functions with opaque pointer types. The C header is `include/crossbar.h`.
Key design decisions:
- **No exposed loans in FFI** — `crossbar_publish()` copies data and publishes in one call. The "born-in-SHM" pattern requires Rust lifetimes that don't translate to C.
- **Self-owned samples** — `CrossbarSample` holds an `Arc<Region>` to keep the mmap alive, replacing the borrowed `Sample<'a>`. Refcount semantics are identical.
- **Value-type topic handle** — `crossbar_topic_t` is a small `#[repr(C)]` struct, safe to copy.
Build: `cargo rustc --release --features ffi --crate-type cdylib`
---
## PodBus — SPMC broadcast ring
`PodBus<T: Pod>` is a single-producer, multi-consumer broadcast ring for `Pod` types. It lives entirely in process memory (no SHM) and is designed for high fan-out scenarios where one producer feeds many consumers with the same data stream.
**Architecture:**
```
PodBus<T>
  ring: Box<[Slot<T>]>   power-of-two array of seqlock-stamped slots
  mask: u64              ring_size - 1, for bitmask indexing
  write_seq: AtomicU64   monotonic publish counter

Slot<T>
  stamp: AtomicU64       seqlock: 0 = never written, odd = writing, even = committed
  data: UnsafeCell<T>    payload (volatile read/write)

PodSubscriber<T>
  cursor: u64            independent read position per consumer
```
**Seqlock stamp protocol:**
- `0` — slot never written
- `seq * 2 + 1` (odd) — write in progress, readers must retry
- `(seq + 1) * 2` (even) — committed and readable
The publisher writes the data between the odd and even stamp transitions. Subscribers snapshot the stamp, read via `read_volatile`, then verify the stamp is unchanged. If the stamp changed mid-read, the data was torn and the read is discarded.
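A single-slot sketch of the stamp protocol; the real `PodBus` maps `seq & mask` to a slot and the publisher owns `write_seq`, both omitted here:

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU64, Ordering};

struct Slot<T> {
    stamp: AtomicU64, // 0 = never written, odd = writing, even = committed
    data: UnsafeCell<T>,
}

fn writing_stamp(seq: u64) -> u64 { seq * 2 + 1 }
fn committed_stamp(seq: u64) -> u64 { (seq + 1) * 2 }

/// Publisher: odd stamp, volatile data write, even stamp.
fn write_slot<T: Copy>(slot: &Slot<T>, seq: u64, value: T) {
    slot.stamp.store(writing_stamp(seq), Ordering::Release); // readers retry
    unsafe { std::ptr::write_volatile(slot.data.get(), value) };
    slot.stamp.store(committed_stamp(seq), Ordering::Release); // readable
}

/// Subscriber: snapshot stamp, volatile read, verify stamp unchanged.
fn read_slot<T: Copy>(slot: &Slot<T>, seq: u64) -> Option<T> {
    let s1 = slot.stamp.load(Ordering::Acquire);
    if s1 != committed_stamp(seq) {
        return None; // not committed at this seq: 0, mid-write, or overwritten
    }
    let value = unsafe { std::ptr::read_volatile(slot.data.get()) };
    if slot.stamp.load(Ordering::Acquire) == s1 {
        Some(value)
    } else {
        None // stamp moved mid-read: the value may be torn, discard it
    }
}

fn main() {
    let slot = Slot { stamp: AtomicU64::new(0), data: UnsafeCell::new(0u32) };
    assert_eq!(read_slot(&slot, 0), None); // stamp 0: never written
    write_slot(&slot, 0, 42);
    assert_eq!(read_slot(&slot, 0), Some(42));
    write_slot(&slot, 1, 7); // slot reused at the next ring wrap
    assert_eq!(read_slot(&slot, 0), None); // the old seq is gone
    assert_eq!(read_slot(&slot, 1), Some(7));
}
```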
**Key properties:**
- Publish is O(1) regardless of subscriber count — no per-subscriber bookkeeping
- Each `PodSubscriber` tracks its own cursor independently
- Subscribers that fall behind auto-skip to the oldest available data (lossy)
- `T: Pod` ensures all bit patterns are valid, so torn reads are harmless (just discarded)
- No heap allocation on publish or receive
---
## Subscriber count
Each topic entry contains an `AtomicU32` counter at offset `0x78` (`TE_SUBSCRIBER_COUNT`) that tracks the number of live `Stream` objects for that topic.
- **Increment**: When `Subscriber::subscribe()` creates a new `Stream`, the counter is incremented via `fetch_add(1, Relaxed)`.
- **Decrement**: `Stream` implements `Drop`, which decrements the counter via `fetch_sub(1, Relaxed)`.
- **Query**: `Publisher::subscriber_count(&self, handle) -> Result<u32, Error>` reads the counter. The FFI equivalent is `crossbar_topic_subscriber_count()`.
The counter uses `Relaxed` ordering because it is advisory — publishers use it for monitoring and graceful shutdown, not for correctness-critical synchronization. The `Channel` field order ensures `rx: Stream` is declared before `_rx_sub: Subscriber`, so the subscription drops (and decrements the counter) before the subscriber unmaps the region.