Expand description
Multi-producer single-consumer bounded queue.
A lock-free ring buffer optimized for multiple producer threads sending to one consumer thread. Uses CAS-based slot claiming with Vyukov-style turn counters for synchronization.
§Design
┌─────────────────────────────────────────────────────────────┐
│ Shared (Arc): │
│ tail: CachePadded<AtomicUsize> ← Producers CAS here │
│ head: CachePadded<AtomicUsize> ← Consumer writes │
│ slots: *mut Slot<T> ← Per-slot turn counters │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────┐ ┌─────────────────────┐
│ Producer (Clone): │ │ Consumer (!Clone): │
│ cached_head │ │ local_head │
│ shared: Arc │ │ shared: Arc │
└─────────────────────┘ └─────────────────────┘Producers compete via CAS on the tail index. After claiming a slot, the producer waits for the slot’s turn counter to indicate it’s writable, writes the data, then advances the turn to signal readiness.
The consumer checks the turn counter to know when data is ready, reads it, then advances the turn for the next producer lap.
§Turn Counter Protocol
For slot at index i on lap turn:
turn * 2: Slot is ready for producer to writeturn * 2 + 1: Slot contains data, ready for consumer
§Example
use nexus_queue::mpsc;
use std::thread;
let (mut tx, mut rx) = mpsc::bounded::<u64>(1024);
let mut tx2 = tx.clone();
let h1 = thread::spawn(move || {
for i in 0..100 {
while tx.push(i).is_err() { std::hint::spin_loop(); }
}
});
let h2 = thread::spawn(move || {
for i in 100..200 {
while tx2.push(i).is_err() { std::hint::spin_loop(); }
}
});
let mut received = 0;
while received < 200 {
if rx.pop().is_some() { received += 1; }
}
h1.join().unwrap();
h2.join().unwrap();Structs§
Functions§
- bounded
- Creates a bounded MPSC queue with the given capacity.