subms-mpsc-queue 0.4.0

submillisecond.com cookbook recipe - concurrency: subms-mpsc-queue. Vyukov-style multi-producer single-consumer linked queue with dangling-tail handling.
Documentation
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Instant;

use subms_mpsc_queue::{MpscQueue, PopResult};

/// Polls `q` until it yields a definite answer.
///
/// Returns `Some(value)` when `try_pop` produces an item and `None` when it
/// reports the queue as empty. While `try_pop` reports `Inconsistent` the
/// call is retried with a spin-loop hint; once the elapsed whole seconds
/// exceed 5 the helper panics instead of spinning forever.
fn pop_eventually<T>(q: &mut MpscQueue<T>) -> Option<T> {
    let began = Instant::now();
    loop {
        match q.try_pop() {
            PopResult::Empty => break None,
            PopResult::Some(item) => break Some(item),
            // Fall through to the retry path below.
            PopResult::Inconsistent => {}
        }
        if began.elapsed().as_secs() > 5 {
            panic!("stuck in Inconsistent");
        }
        std::hint::spin_loop();
    }
}

/// A single pushed value is popped back, after which the queue is empty.
#[test]
fn push_and_pop_a_single_value() {
    let mut queue = MpscQueue::<u32>::new();
    queue.push(7);

    let first = pop_eventually(&mut queue);
    assert_eq!(first, Some(7));

    let second = pop_eventually(&mut queue);
    assert_eq!(second, None);
}

/// Popping from a freshly created queue reports `Empty`, and keeps doing so
/// on a second attempt.
#[test]
fn empty_pop_returns_empty() {
    let mut queue = MpscQueue::<u32>::new();
    for _ in 0..2 {
        assert!(matches!(queue.try_pop(), PopResult::Empty));
    }
}

/// Values pushed from one thread come back out in the order they went in.
#[test]
fn fifo_order_single_producer() {
    let mut queue = MpscQueue::<u32>::new();

    (0..100u32).for_each(|value| {
        queue.push(value);
    });

    for expected in 0..100u32 {
        let popped = pop_eventually(&mut queue);
        assert_eq!(popped, Some(expected));
    }

    // Nothing may remain once all hundred values were taken.
    let leftover = pop_eventually(&mut queue);
    assert_eq!(leftover, None);
}

/// Interleaves one push with one pop a thousand times; each pop must return
/// the value that was just pushed.
#[test]
fn alternating_push_pop() {
    let mut queue = MpscQueue::<u32>::new();
    for value in 0..1000u32 {
        queue.push(value);
        let popped = pop_eventually(&mut queue);
        assert_eq!(popped, Some(value));
    }
}

/// The queue stays usable after it has been fully drained: a second batch
/// pushed afterwards is popped in order as well.
#[test]
fn drain_then_refill() {
    /// Pushes every value of `values`, then pops them back expecting the same order.
    fn fill_and_drain(queue: &mut MpscQueue<u32>, values: std::ops::Range<u32>) {
        for value in values.clone() {
            queue.push(value);
        }
        for expected in values {
            assert_eq!(pop_eventually(queue), Some(expected));
        }
    }

    let mut queue = MpscQueue::<u32>::new();

    fill_and_drain(&mut queue, 0..50u32);
    // The first batch must have emptied the queue completely.
    assert_eq!(pop_eventually(&mut queue), None);
    fill_and_drain(&mut queue, 100..120u32);
}

/// Several producer threads push tagged values concurrently while a single
/// consumer thread drains the queue; every producer's items must all arrive.
///
/// Each value carries the producer id in its upper 32 bits and the per-producer
/// sequence number in its lower 32 bits, which lets the consumer tally arrivals
/// per producer. The consumer loop only exits once it has counted every
/// expected item, so a lost item shows up as a hang rather than a failed
/// assertion.
#[test]
fn multi_producer_no_lost_items() {
    let producers = 4usize;
    let per_producer = 100_000usize;
    let q: Arc<MpscQueue<u64>> = Arc::new(MpscQueue::new());
    let mut handles = Vec::with_capacity(producers);
    for tid in 0..producers as u64 {
        let q = Arc::clone(&q);
        handles.push(thread::spawn(move || {
            for i in 0..per_producer as u64 {
                q.push((tid << 32) | i);
            }
        }));
    }
    let consumer_q = Arc::clone(&q);
    let consumer = thread::spawn(move || {
        let q_ptr = Arc::as_ptr(&consumer_q) as *mut MpscQueue<u64>;
        // SAFETY / NOTE(review): this manufactures a `&mut MpscQueue` from a
        // pointer obtained through a shared `Arc` while the producer threads
        // still hold `&MpscQueue`. The test relies on this being the only
        // thread that ever calls `try_pop`; whether that is enough to make the
        // exclusive reference sound cannot be confirmed from this file —
        // verify against the crate's documented consumer contract.
        let q_mut = unsafe { &mut *q_ptr };
        // One tally slot per producer, sized from `producers` so the two can
        // never drift apart if the thread count is changed.
        let mut counts = vec![0usize; producers];
        let mut total = 0usize;
        while total < producers * per_producer {
            match q_mut.try_pop() {
                PopResult::Some(v) => {
                    // The producer id lives in the upper half of the value.
                    let tid = (v >> 32) as usize;
                    counts[tid] += 1;
                    total += 1;
                }
                // Nothing available right now; back off briefly and retry.
                PopResult::Inconsistent | PopResult::Empty => std::hint::spin_loop(),
            }
        }
        counts
    });
    for h in handles {
        h.join().unwrap();
    }
    let counts = consumer.join().unwrap();
    assert_eq!(counts.len(), producers);
    for c in counts {
        assert_eq!(c, per_producer);
    }
}

/// Eight producer threads push concurrently while one consumer thread counts
/// popped items until the expected total has been reached.
///
/// The consumer loop only exits once `total` equals the expected item count,
/// so the final assertion cannot fail on its own: a lost item shows up as a
/// hang instead.
#[test]
fn higher_producer_contention() {
    let producers = 8usize;
    let per_producer = 25_000usize;
    let q: Arc<MpscQueue<u64>> = Arc::new(MpscQueue::new());
    let mut handles = Vec::with_capacity(producers);
    for tid in 0..producers as u64 {
        let q = Arc::clone(&q);
        handles.push(thread::spawn(move || {
            for i in 0..per_producer as u64 {
                q.push((tid << 32) | i);
            }
        }));
    }
    let consumer_q = Arc::clone(&q);
    let consumer = thread::spawn(move || {
        let q_ptr = Arc::as_ptr(&consumer_q) as *mut MpscQueue<u64>;
        // SAFETY / NOTE(review): this manufactures a `&mut MpscQueue` from a
        // pointer obtained through a shared `Arc` while the producer threads
        // still hold `&MpscQueue`. The test relies on this being the only
        // thread that ever calls `try_pop`; whether that is enough to make the
        // exclusive reference sound cannot be confirmed from this file —
        // verify against the crate's documented consumer contract.
        let q_mut = unsafe { &mut *q_ptr };
        let mut total = 0usize;
        while total < producers * per_producer {
            match q_mut.try_pop() {
                PopResult::Some(_) => total += 1,
                // Nothing available right now: emit a spin-loop hint instead
                // of hammering the queue in a tight loop.
                PopResult::Inconsistent | PopResult::Empty => std::hint::spin_loop(),
            }
        }
        total
    });
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(consumer.join().unwrap(), producers * per_producer);
}

/// Test payload that bumps a shared counter by one each time it is dropped.
struct DropCounted(Arc<AtomicUsize>);

impl Drop for DropCounted {
    /// Records this drop in the shared counter.
    fn drop(&mut self) {
        let DropCounted(hits) = self;
        hits.fetch_add(1, Ordering::Relaxed);
    }
}

/// Items still sitting in the queue when it is destroyed are each dropped.
#[test]
fn drops_pending_items_on_destruction() {
    let counter = Arc::new(AtomicUsize::new(0));

    let queue: MpscQueue<DropCounted> = MpscQueue::new();
    for _ in 0..3 {
        queue.push(DropCounted(Arc::clone(&counter)));
    }
    // Destroy the queue while all three items are still pending.
    drop(queue);

    let dropped = counter.load(Ordering::Relaxed);
    assert_eq!(dropped, 3);
}

/// A popped item is owned by the caller: it is not dropped while held and is
/// dropped exactly once when released.
#[test]
fn popped_items_drop_only_once() {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut queue = MpscQueue::<DropCounted>::new();
    queue.push(DropCounted(Arc::clone(&counter)));

    let item = pop_eventually(&mut queue).unwrap();
    let while_owned = counter.load(Ordering::Relaxed);
    assert_eq!(while_owned, 0, "not dropped while owned");

    drop(item);
    let after_release = counter.load(Ordering::Relaxed);
    assert_eq!(after_release, 1, "dropped exactly once after release");
}

/// Pushes 100,000 sequential values from one thread and checks that they are
/// popped back in order with none missing.
#[test]
fn large_single_thread_workload() {
    let mut queue = MpscQueue::<u64>::new();
    let total = 100_000u64;
    (0..total).for_each(|value| {
        queue.push(value);
    });

    // `expected` doubles as the count of items popped so far.
    let mut expected = 0u64;
    loop {
        match queue.try_pop() {
            PopResult::Empty => break,
            // Retry immediately until the queue gives a definite answer.
            PopResult::Inconsistent => {}
            PopResult::Some(value) => {
                assert_eq!(value, expected);
                expected += 1;
            }
        }
    }
    assert_eq!(expected, total);
}

/// A queue built through `Default` accepts a push and yields it back.
#[test]
fn default_constructor_works() {
    let mut queue = MpscQueue::<u32>::default();
    queue.push(1);
    let popped = pop_eventually(&mut queue);
    assert_eq!(popped, Some(1));
}