Module spmc

Single-producer multi-consumer bounded queue.

A lock-free ring buffer optimized for one producer thread fanning out to multiple consumer threads. It uses Vyukov-style per-slot turn counters, and consumers claim slots with a CAS on the head index.

§Design

┌─────────────────────────────────────────────────────────────────┐
│ Shared (Arc):                                                   │
│   head: CachePadded<AtomicUsize>   ← Consumers CAS here         │
│   tail: CachePadded<AtomicUsize>   ← Producer publishes here    │
│   producer_alive: AtomicBool       ← Disconnection detection    │
│   slots: *mut Slot<T>              ← Per-slot turn counters     │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────┐     ┌─────────────────────┐
│ Producer (!Clone):  │     │ Consumer (Clone):   │
│   local_tail        │     │   shared: Arc       │
│   shared: Arc       │     └─────────────────────┘
└─────────────────────┘

The producer writes directly (no CAS) since it’s the sole writer. Consumers compete via CAS on the head index to claim slots. After claiming, the consumer reads the data and advances the turn counter for the next producer lap.
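The asymmetry between the two sides can be sketched in a few lines. This is a minimal illustration, not the crate's implementation: `try_claim` and `publish` are hypothetical helper names, the memory orderings are one conservative choice, and the sketch decides emptiness by comparing `head` with `tail`, whereas the real queue may consult the per-slot turn counter instead.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical consumer-side helper: claim the next position by
/// advancing `head` with a CAS. Returns `None` when `head` has caught
/// up with `tail`, i.e. nothing is published.
pub fn try_claim(head: &AtomicUsize, tail: &AtomicUsize) -> Option<usize> {
    let mut h = head.load(Ordering::Acquire);
    loop {
        if h == tail.load(Ordering::Acquire) {
            return None; // nothing left to claim
        }
        match head.compare_exchange_weak(
            h,
            h.wrapping_add(1),
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            // This consumer now owns position `h`.
            Ok(_) => return Some(h),
            // Another consumer won the race (or the weak CAS failed
            // spuriously): retry from the freshly observed head.
            Err(current) => h = current,
        }
    }
}

/// Hypothetical producer-side helper: the sole writer bumps its local
/// tail and publishes it with a plain store, no CAS required.
pub fn publish(tail: &AtomicUsize, local_tail: &mut usize) {
    *local_tail = local_tail.wrapping_add(1);
    tail.store(*local_tail, Ordering::Release);
}
```

Only the consumer path loops on contention. The producer never retries because no other thread writes `tail`.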

§Turn Counter Protocol

For the slot at index i on lap turn, the slot's turn counter takes one of two values:

  • turn * 2: Slot is ready for producer to write
  • turn * 2 + 1: Slot contains data, ready for consumer
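The arithmetic behind the two states above can be written out as pure functions. This is a sketch under assumptions: the helper names `turns_for` and `turn_after_read` are hypothetical, and the mapping from a monotonically increasing position to `(index, lap)` by modulo and division is one common choice that this page does not confirm.

```rust
/// Hypothetical helper: map a monotonically increasing position to
/// `(slot index, turn value the producer waits for, turn value a
/// consumer waits for)`.
pub fn turns_for(pos: usize, capacity: usize) -> (usize, usize, usize) {
    assert!(capacity > 0, "capacity must be non-zero");
    let index = pos % capacity; // which slot in the ring
    let lap = pos / capacity;   // how many times the ring has wrapped
    (index, lap * 2, lap * 2 + 1)
}

/// Hypothetical helper: the value a consumer stores after reading the
/// slot at `pos`, which is the producer's write turn for the next lap.
pub fn turn_after_read(pos: usize, capacity: usize) -> usize {
    (pos / capacity + 1) * 2
}
```

With a capacity of 4, position 5 lands in slot 1 on lap 1: the producer waits for turn 2 and a consumer waits for turn 3. The value a consumer stored after reading position 1 (lap 0) is also 2, which is what hands the slot back to the producer.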

§Disconnection

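Unlike MPSC, where Arc::strong_count == 1 detects disconnection on both sides, an SPMC queue cannot rely on the reference count: every Consumer clone holds its own Arc to the shared state, so the count stays above one for as long as any other consumer exists, whether or not the producer is alive. An AtomicBool (producer_alive) therefore tracks whether the producer is alive so consumers can detect disconnection.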
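A stripped-down sketch of the flag-based approach follows. The types `SketchProducer` and `SketchConsumer`, the `channel` constructor, and the `handles` accessor are hypothetical stand-ins without any queue storage. Clearing the flag in `Drop` is an assumption about how such a flag would typically be maintained, not a statement about the crate's internals.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

struct Shared {
    producer_alive: AtomicBool,
}

/// Hypothetical producer handle (deliberately not `Clone`).
pub struct SketchProducer {
    shared: Arc<Shared>,
}

/// Hypothetical consumer handle; each clone shares the same state.
#[derive(Clone)]
pub struct SketchConsumer {
    shared: Arc<Shared>,
}

impl Drop for SketchProducer {
    fn drop(&mut self) {
        // Assumed mechanism: clear the flag when the producer goes away.
        self.shared.producer_alive.store(false, Ordering::Release);
    }
}

impl SketchConsumer {
    pub fn is_disconnected(&self) -> bool {
        !self.shared.producer_alive.load(Ordering::Acquire)
    }

    /// Number of live handles (producer plus consumers) to the state.
    pub fn handles(&self) -> usize {
        Arc::strong_count(&self.shared)
    }
}

pub fn channel() -> (SketchProducer, SketchConsumer) {
    let shared = Arc::new(Shared {
        producer_alive: AtomicBool::new(true),
    });
    let producer = SketchProducer { shared: Arc::clone(&shared) };
    (producer, SketchConsumer { shared })
}
```

With one producer and two consumers the strong count is 3. After the producer is dropped it is 2, the same value a single consumer would observe next to a live producer, which is why the count alone is ambiguous and the flag is needed.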

§Example

use nexus_queue::spmc;
use std::thread;

let (tx, rx) = spmc::bounded::<u64>(1024);

let rx2 = rx.clone();
let rx1 = rx;
let h1 = thread::spawn(move || {
    let mut received = Vec::new();
    loop {
        if let Some(v) = rx1.pop() {
            received.push(v);
        } else if rx1.is_disconnected() {
            // Producer is gone: drain anything published before it
            // dropped, then exit.
            while let Some(v) = rx1.pop() { received.push(v); }
            break;
        } else {
            std::hint::spin_loop();
        }
    }
    received
});
let h2 = thread::spawn(move || {
    let mut received = Vec::new();
    loop {
        if let Some(v) = rx2.pop() {
            received.push(v);
        } else if rx2.is_disconnected() {
            while let Some(v) = rx2.pop() { received.push(v); }
            break;
        } else {
            std::hint::spin_loop();
        }
    }
    received
});

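for i in 0..200 {
    // Retry until the push succeeds (it fails while the queue is full).
    while tx.push(i).is_err() { std::hint::spin_loop(); }
}
// Dropping the producer lets both consumers observe disconnection.
drop(tx);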

let mut all: Vec<_> = h1.join().unwrap();
all.extend(h2.join().unwrap());
all.sort();
assert_eq!(all, (0..200).collect::<Vec<_>>());

Structs§

Consumer
The consumer endpoint of an SPMC queue.
Producer
The producer endpoint of an SPMC queue.

Functions§

bounded
Creates a bounded SPMC queue with the given capacity.