Module mpsc

Multi-producer single-consumer bounded queue.

A lock-free ring buffer optimized for multiple producer threads sending to one consumer thread. Uses CAS-based slot claiming with Vyukov-style turn counters for synchronization.

§Design

┌─────────────────────────────────────────────────────────────┐
│ Shared (Arc):                                               │
│   tail: CachePadded<AtomicUsize>   ← Producers CAS here     │
│   head: CachePadded<AtomicUsize>   ← Consumer writes        │
│   slots: *mut Slot<T>              ← Per-slot turn counters │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────┐     ┌─────────────────────┐
│ Producer (Clone):   │     │ Consumer (!Clone):  │
│   cached_head       │     │   local_head        │
│   shared: Arc       │     │   shared: Arc       │
└─────────────────────┘     └─────────────────────┘

Producers compete via CAS on the tail index. After claiming a slot, the producer waits for the slot’s turn counter to indicate it’s writable, writes the data, then advances the turn to signal readiness.

The consumer checks the turn counter to know when data is ready, reads it, then advances the turn for the next producer lap.
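The claim, wait, write, publish sequence described above can be sketched with plain atomics. This is a simplified illustration, not the crate's implementation: `Ring`, `Slot`, `try_push`, and `try_pop` are hypothetical names, the payload is an `AtomicU64` so the sketch needs no `unsafe`, cache padding and the producer-side `cached_head` are omitted, and `try_pop` assumes it is only ever called from one thread.

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical types for illustration; not nexus_queue's internals.
struct Slot {
    turn: AtomicUsize, // lap * 2 = writable, lap * 2 + 1 = readable
    value: AtomicU64,  // payload kept atomic to avoid unsafe code
}

struct Ring {
    tail: AtomicUsize, // next position to claim; producers CAS here
    head: AtomicUsize, // next position to read; only the consumer writes
    slots: Vec<Slot>,
}

impl Ring {
    fn new(capacity: usize) -> Self {
        let slots = (0..capacity)
            .map(|_| Slot { turn: AtomicUsize::new(0), value: AtomicU64::new(0) })
            .collect();
        Ring { tail: AtomicUsize::new(0), head: AtomicUsize::new(0), slots }
    }

    /// Producer side: claim a position via CAS, wait for the turn, write, publish.
    fn try_push(&self, v: u64) -> Result<(), u64> {
        let cap = self.slots.len();
        let pos = loop {
            // Load head before tail so the head snapshot never exceeds the tail snapshot.
            let head = self.head.load(Ordering::Acquire);
            let tail = self.tail.load(Ordering::Relaxed);
            if tail - head >= cap {
                return Err(v); // queue is full
            }
            if self
                .tail
                .compare_exchange_weak(tail, tail + 1, Ordering::Relaxed, Ordering::Relaxed)
                .is_ok()
            {
                break tail; // this producer now owns position `tail`
            }
        };
        let (slot, lap) = (&self.slots[pos % cap], pos / cap);
        // Wait until the slot's turn says it is writable for this lap.
        while slot.turn.load(Ordering::Acquire) != lap * 2 {
            std::hint::spin_loop();
        }
        slot.value.store(v, Ordering::Relaxed);
        slot.turn.store(lap * 2 + 1, Ordering::Release); // signal readiness
        Ok(())
    }

    /// Consumer side: must only be called from a single thread.
    fn try_pop(&self) -> Option<u64> {
        let cap = self.slots.len();
        let pos = self.head.load(Ordering::Relaxed);
        let (slot, lap) = (&self.slots[pos % cap], pos / cap);
        if slot.turn.load(Ordering::Acquire) != lap * 2 + 1 {
            return None; // nothing published at the head position yet
        }
        let v = slot.value.load(Ordering::Relaxed);
        slot.turn.store((lap + 1) * 2, Ordering::Release); // writable on the next lap
        self.head.store(pos + 1, Ordering::Release);
        Some(v)
    }
}

/// Two producers push 0..200 in total; the consumer sums what it receives.
fn run_demo() -> u64 {
    let ring = Arc::new(Ring::new(8));
    let handles: Vec<_> = vec![0u64..100, 100..200]
        .into_iter()
        .map(|range| {
            let ring = Arc::clone(&ring);
            thread::spawn(move || {
                for i in range {
                    while ring.try_push(i).is_err() {
                        std::hint::spin_loop();
                    }
                }
            })
        })
        .collect();
    let (mut received, mut sum) = (0, 0u64);
    while received < 200 {
        if let Some(v) = ring.try_pop() {
            received += 1;
            sum += v;
        }
    }
    for h in handles {
        h.join().unwrap();
    }
    sum
}

fn main() {
    assert_eq!(run_demo(), 19_900); // 0 + 1 + ... + 199
}
```

In this sketch the consumer advances the turn before publishing the new head, so a producer that passes the full check should find its slot already writable; the spin on the turn counter is kept to mirror the description above.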

§Turn Counter Protocol

For the slot at index i on lap number turn, the counter takes one of two values:

  • turn * 2: Slot is ready for producer to write
  • turn * 2 + 1: Slot contains data, ready for consumer
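The arithmetic behind these two states can be made concrete with a few helper functions. The names below (`slot_index`, `lap`, `write_turn`, `read_turn`) are hypothetical and exist only for this illustration; the sketch also assumes a monotonically increasing position counter and laps numbered from zero, neither of which is confirmed by the text above.

```rust
// Hypothetical helpers for illustration; not part of nexus_queue's API.

/// Slot that a monotonically increasing position maps to.
fn slot_index(pos: usize, capacity: usize) -> usize {
    pos % capacity
}

/// How many times the ring has wrapped before reaching `pos`.
fn lap(pos: usize, capacity: usize) -> usize {
    pos / capacity
}

/// Turn value meaning "writable by a producer on this lap".
fn write_turn(lap: usize) -> usize {
    lap * 2
}

/// Turn value meaning "holds data for the consumer on this lap".
fn read_turn(lap: usize) -> usize {
    lap * 2 + 1
}

fn main() {
    let capacity = 4;
    // Position 9 in a 4-slot ring is slot 1 on lap 2.
    assert_eq!(slot_index(9, capacity), 1);
    assert_eq!(lap(9, capacity), 2);
    assert_eq!(write_turn(2), 4);
    assert_eq!(read_turn(2), 5);
    // After the consumer reads on lap 2, the slot becomes writable for lap 3.
    assert_eq!(write_turn(3), read_turn(2) + 1);
}
```

Because even values always mean "writable" and odd values always mean "readable", a single counter per slot encodes both the slot's state and which lap it belongs to.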

§Example

use nexus_queue::mpsc;
use std::thread;

let (mut tx, mut rx) = mpsc::bounded::<u64>(1024);

let mut tx2 = tx.clone();
let h1 = thread::spawn(move || {
    for i in 0..100 {
        while tx.push(i).is_err() { std::hint::spin_loop(); }
    }
});
let h2 = thread::spawn(move || {
    for i in 100..200 {
        while tx2.push(i).is_err() { std::hint::spin_loop(); }
    }
});

let mut received = 0;
while received < 200 {
    if rx.pop().is_some() { received += 1; }
}

h1.join().unwrap();
h2.join().unwrap();

Structs§

Consumer
The consumer endpoint of an MPSC queue.
Producer
The producer endpoint of an MPSC queue.

Functions§

bounded
Creates a bounded MPSC queue with the given capacity.