Single-producer multi-consumer bounded queue.
A lock-free ring buffer optimized for one producer thread fanning out to multiple consumer threads. Uses Vyukov-style turn counters with CAS-based head claiming for consumers.
§Design
┌─────────────────────────────────────────────────────────────────┐
│ Shared (Arc):                                                   │
│   head: CachePadded<AtomicUsize>   ← Consumers CAS here         │
│   tail: CachePadded<AtomicUsize>   ← Producer publishes here    │
│   producer_alive: AtomicBool       ← Disconnection detection    │
│   slots: *mut Slot<T>              ← Per-slot turn counters     │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────┐     ┌─────────────────────┐
│ Producer (!Clone):  │     │ Consumer (Clone):   │
│   local_tail        │     │   shared: Arc       │
│   shared: Arc       │     └─────────────────────┘
└─────────────────────┘
The producer writes directly (no CAS) since it’s the sole writer. Consumers compete via CAS on the head index to claim slots. After claiming, the consumer reads the data and advances the turn counter for the next producer lap.
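To make the claim step concrete, here is a minimal sketch of the scheme described above. It is not the crate's implementation. `MiniSpmc`, its fields, and the caller-held `tail` argument are hypothetical names. The payload is a `u64` kept in an `AtomicU64` so the sketch needs no `unsafe`, and the shared `tail`, cache padding, and disconnection flag are left out. It assumes a slot's turn is `2 * lap` when writable and `2 * lap + 1` when readable.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

/// One ring-buffer cell (hypothetical layout, for illustration only).
struct Slot {
    turn: AtomicUsize,
    value: AtomicU64,
}

/// Illustrative stand-in for the shared state; not the crate's type.
pub struct MiniSpmc {
    head: AtomicUsize,
    slots: Vec<Slot>,
}

impl MiniSpmc {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        let slots = (0..capacity)
            .map(|_| Slot { turn: AtomicUsize::new(0), value: AtomicU64::new(0) })
            .collect();
        Self { head: AtomicUsize::new(0), slots }
    }

    /// Producer side. `tail` is the producer's private position, so no CAS
    /// is needed. Assumption: only one thread ever calls this.
    pub fn push(&self, tail: &mut usize, value: u64) -> Result<(), u64> {
        let cap = self.slots.len();
        let slot = &self.slots[*tail % cap];
        let lap = *tail / cap;
        if slot.turn.load(Ordering::Acquire) != lap * 2 {
            return Err(value); // slot not yet released by a consumer: full
        }
        slot.value.store(value, Ordering::Relaxed);
        slot.turn.store(lap * 2 + 1, Ordering::Release); // publish to consumers
        *tail += 1;
        Ok(())
    }

    /// Consumer side. Any number of threads may call this concurrently.
    pub fn pop(&self) -> Option<u64> {
        let cap = self.slots.len();
        let mut head = self.head.load(Ordering::Relaxed);
        loop {
            let slot = &self.slots[head % cap];
            let lap = head / cap;
            if slot.turn.load(Ordering::Acquire) == lap * 2 + 1 {
                // Data is ready: try to claim this position.
                match self.head.compare_exchange_weak(
                    head,
                    head + 1,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => {
                        let v = slot.value.load(Ordering::Relaxed);
                        // Hand the slot back to the producer for the next lap.
                        slot.turn.store((lap + 1) * 2, Ordering::Release);
                        return Some(v);
                    }
                    Err(current) => head = current, // lost the race: retry
                }
            } else {
                // Either the queue is empty or our view of head is stale.
                let current = self.head.load(Ordering::Relaxed);
                if current == head {
                    return None;
                }
                head = current;
            }
        }
    }
}
```

A consumer that wins the CAS owns that position exclusively: the producer cannot overwrite the slot until the consumer stores the next even turn value.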
§Turn Counter Protocol
For the slot at index i on lap turn, the slot's counter holds one of two values:
- turn * 2: Slot is ready for the producer to write
- turn * 2 + 1: Slot contains data and is ready for a consumer
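As a worked example of the arithmetic, the helpers below map a monotonically increasing position to a slot index and to the turn values each side waits for. The function names are hypothetical. The sketch assumes the lap is `pos / capacity` and the index is `pos % capacity`, which the description above implies but does not state.

```rust
/// Index of the slot that position `pos` maps to.
pub fn slot_index(pos: usize, capacity: usize) -> usize {
    pos % capacity
}

/// Lap number ("turn" in the text) for position `pos`.
pub fn lap(pos: usize, capacity: usize) -> usize {
    pos / capacity
}

/// Turn value the producer waits for before writing position `pos`.
pub fn writable_turn(pos: usize, capacity: usize) -> usize {
    lap(pos, capacity) * 2
}

/// Turn value a consumer waits for before reading position `pos`.
pub fn readable_turn(pos: usize, capacity: usize) -> usize {
    lap(pos, capacity) * 2 + 1
}

/// Turn value a consumer stores after reading position `pos`; this is the
/// writable turn of the same slot on the next lap.
pub fn released_turn(pos: usize, capacity: usize) -> usize {
    (lap(pos, capacity) + 1) * 2
}
```

With a capacity of 4, position 5 lands in slot 1 on lap 1: the producer waits for turn 2, publishes turn 3, and the consumer releases the slot by storing turn 4.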
§Disconnection
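Unlike MPSC, where Arc::strong_count == 1 detects disconnection on both
sides, SPMC consumers all hold Arc references to the same shared state, so
the strong count stays above 1 after the producer drops whenever more than
one consumer is alive. An AtomicBool therefore tracks whether the producer
is alive so consumers can detect disconnection.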
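One way such a flag could work is sketched below: the producer clears it in `Drop`, and every consumer clone reads it. The type layout and the `channel` constructor are hypothetical and carry no queue storage; only the `producer_alive` field name and `is_disconnected` come from this page, and the memory orderings are an assumption.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical shared state holding only the liveness flag.
struct Shared {
    producer_alive: AtomicBool,
}

pub struct Producer {
    shared: Arc<Shared>,
}

#[derive(Clone)]
pub struct Consumer {
    shared: Arc<Shared>,
}

/// Hypothetical constructor for the sketch.
pub fn channel() -> (Producer, Consumer) {
    let shared = Arc::new(Shared { producer_alive: AtomicBool::new(true) });
    (Producer { shared: Arc::clone(&shared) }, Consumer { shared })
}

impl Drop for Producer {
    fn drop(&mut self) {
        // Release pairs with the Acquire load below, so pushes made before
        // the drop are visible to a consumer that observes `false`.
        self.shared.producer_alive.store(false, Ordering::Release);
    }
}

impl Consumer {
    pub fn is_disconnected(&self) -> bool {
        !self.shared.producer_alive.load(Ordering::Acquire)
    }
}
```

Because items may still be queued when the flag flips, a consumer that sees `is_disconnected()` return true should drain with `pop()` before exiting, as the example below does.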
§Example
use nexus_queue::spmc;
use std::thread;
let (tx, rx) = spmc::bounded::<u64>(1024);
let rx2 = rx.clone();
let rx1 = rx;
let h1 = thread::spawn(move || {
    let mut received = Vec::new();
    loop {
        if let Some(v) = rx1.pop() {
            received.push(v);
        } else if rx1.is_disconnected() {
            while let Some(v) = rx1.pop() { received.push(v); }
            break;
        } else {
            std::hint::spin_loop();
        }
    }
    received
});
let h2 = thread::spawn(move || {
    let mut received = Vec::new();
    loop {
        if let Some(v) = rx2.pop() {
            received.push(v);
        } else if rx2.is_disconnected() {
            while let Some(v) = rx2.pop() { received.push(v); }
            break;
        } else {
            std::hint::spin_loop();
        }
    }
    received
});
for i in 0..200 {
    while tx.push(i).is_err() { std::hint::spin_loop(); }
}
drop(tx);
let mut all: Vec<_> = h1.join().unwrap();
all.extend(h2.join().unwrap());
all.sort();
assert_eq!(all, (0..200).collect::<Vec<_>>());
Structs§
Functions§
- bounded
- Creates a bounded SPMC queue with the given capacity.