use crate::internal::cache_padded::CachePadded;
use parking_lot::Mutex;
use core::fmt;
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
/// Number of slots in every block of the queue.
const BLOCK_CAPACITY: usize = 1 << 5;
/// The slot's value has not been written (a producer may already have claimed
/// the index via `write_index.fetch_add` without having started to write).
const SLOT_EMPTY: u8 = 0;
/// A producer is in the middle of writing the slot's value.
const SLOT_WRITING: u8 = SLOT_EMPTY + 1;
/// The slot's value is fully written and may be read by the consumer.
const SLOT_READY: u8 = SLOT_WRITING + 1;
/// One cell of a block: a publication flag plus storage for a single value.
struct Slot<T> {
    /// One of `SLOT_EMPTY`, `SLOT_WRITING` or `SLOT_READY`.
    state: AtomicU8,
    /// Holds an initialized value once `state` is `SLOT_READY`. The consumer
    /// moves the value out without resetting `state`.
    value: UnsafeCell<MaybeUninit<T>>,
}

impl<T> Default for Slot<T> {
    /// An empty slot whose storage is left uninitialized.
    fn default() -> Self {
        let value = UnsafeCell::new(MaybeUninit::uninit());
        let state = AtomicU8::new(SLOT_EMPTY);
        Self { state, value }
    }
}
/// Fixed-size segment of the queue: `BLOCK_CAPACITY` slots plus a link to the
/// next (newer) block.
pub(crate) struct Block<T> {
    /// Successor block, or null. It is stored once, under the queue's `head` lock,
    /// by the producer that links a new head block. The pointer carries one
    /// strong count leaked with `Arc::into_raw`, which the consumer (or the
    /// queue's `Drop`) reclaims with `Arc::from_raw`.
    next: AtomicPtr<Block<T>>,
    /// Next slot index to hand out. Producers claim slots with `fetch_add`, so
    /// the counter can run past `BLOCK_CAPACITY` once the block is full.
    write_index: CachePadded<AtomicUsize>,
    /// The block's storage, published in index order by claim.
    slots: [Slot<T>; BLOCK_CAPACITY],
}
// SAFETY: each slot's value is written only by the producer that claimed its
// index through `write_index.fetch_add`. The consumer reads it only after
// observing `SLOT_READY` with `Acquire`, which pairs with the producer's
// `Release` store. Values of `T` move between threads, hence `T: Send`.
unsafe impl<T: Send> Send for Block<T> {}
unsafe impl<T: Send> Sync for Block<T> {}
impl<T> fmt::Debug for Block<T> {
    /// Reports only the raw claim counter (`write_index`, which may exceed
    /// `BLOCK_CAPACITY` once the block is full). Slot contents are omitted
    /// because `T` is not required to be `Debug`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let claimed = self.write_index.load(Ordering::Relaxed);
        let mut builder = f.debug_struct("Block");
        builder.field("write_index", &claimed);
        builder.finish_non_exhaustive()
    }
}
impl<T> Block<T> {
    /// Creates a block with every slot `SLOT_EMPTY`, no slots claimed yet, and
    /// no successor.
    fn new() -> Self {
        Self {
            next: AtomicPtr::new(ptr::null_mut()),
            write_index: CachePadded::new(AtomicUsize::new(0)),
            // `array::from_fn` builds the slot array directly. This avoids the
            // `MaybeUninit` array + `ptr::read` dance and needs no `unsafe`.
            slots: std::array::from_fn(|_| Slot::default()),
        }
    }
}
/// Unbounded queue built from a singly linked chain of fixed-size blocks.
///
/// Producers write into the block at `head`. They take the `head` lock only
/// when they have no cached block or their cached block is full. The consumer
/// reads from `tail` at `tail_cursor` without locking.
///
/// NOTE(review): `tail` and `tail_cursor` are plain `UnsafeCell`s mutated by
/// `pop` / `pop_batch` (and read by `is_empty`) through `&self`. Callers must
/// therefore ensure that at most one thread consumes at a time; the stress test
/// wraps the queue in a `Mutex` for this. Nothing in the type enforces it.
pub(crate) struct UnboundedBlockQueue<T> {
    /// Block new pushes go into; locked to reuse it or link a fresh block.
    head: CachePadded<Mutex<Arc<Block<T>>>>,
    /// Oldest block that still may hold unconsumed values (consumer-only).
    tail: CachePadded<UnsafeCell<Arc<Block<T>>>>,
    /// Index of the next slot to read in `tail`. `BLOCK_CAPACITY` means the
    /// block is exhausted and the consumer must follow `next` (consumer-only).
    tail_cursor: CachePadded<UnsafeCell<usize>>,
}
// SAFETY(review): producer-side state is protected by the `head` mutex and
// per-slot atomics. Consumer-side state (`tail`, `tail_cursor`) is only sound
// under the single-consumer discipline described above, which is not enforced
// by the type.
unsafe impl<T: Send> Send for UnboundedBlockQueue<T> {}
unsafe impl<T: Send> Sync for UnboundedBlockQueue<T> {}
impl<T> UnboundedBlockQueue<T> {
pub(crate) fn new() -> Self {
let block = Arc::new(Block::new());
Self {
head: CachePadded::new(Mutex::new(block.clone())),
tail: CachePadded::new(UnsafeCell::new(block)),
tail_cursor: CachePadded::new(UnsafeCell::new(0)),
}
}
pub(crate) fn push(&self, value: T, block_cache: &mut Option<Arc<Block<T>>>) {
loop {
if let Some(block) = block_cache {
let idx = block.write_index.fetch_add(1, Ordering::Relaxed);
if idx < BLOCK_CAPACITY {
let slot = &block.slots[idx];
slot.state.store(SLOT_WRITING, Ordering::Release);
unsafe {
(*slot.value.get()).write(value);
}
slot.state.store(SLOT_READY, Ordering::Release);
return;
} else {
*block_cache = None;
}
}
{
let mut head_guard = self.head.lock(); let current_head = head_guard.clone();
if current_head.write_index.load(Ordering::Relaxed) < BLOCK_CAPACITY {
*block_cache = Some(current_head);
continue;
}
let new_block = Arc::new(Block::new());
let new_block_ptr = Arc::into_raw(new_block.clone()) as *mut Block<T>;
current_head.next.store(new_block_ptr, Ordering::Release);
*head_guard = new_block.clone();
*block_cache = Some(new_block);
}
}
}
pub(crate) fn push_batch<I: Iterator<Item = T>>(
&self,
iter: &mut I,
count: usize,
block_cache: &mut Option<Arc<Block<T>>>,
) {
let mut remaining = count;
while remaining > 0 {
if let Some(block) = block_cache {
let start = block.write_index.fetch_add(remaining, Ordering::Relaxed);
if start < BLOCK_CAPACITY {
let chunk = remaining.min(BLOCK_CAPACITY - start);
for i in 0..chunk {
let slot = &block.slots[start + i];
slot.state.store(SLOT_WRITING, Ordering::Release);
unsafe {
(*slot.value.get())
.write(iter.next().expect("push_batch: iterator yielded fewer than `count` items"));
}
slot.state.store(SLOT_READY, Ordering::Release);
}
remaining -= chunk;
if remaining == 0 {
return;
}
}
*block_cache = None;
}
{
let mut head_guard = self.head.lock();
let current_head = head_guard.clone();
if current_head.write_index.load(Ordering::Relaxed) < BLOCK_CAPACITY {
*block_cache = Some(current_head);
continue;
}
let new_block = Arc::new(Block::new());
let new_block_ptr = Arc::into_raw(new_block.clone()) as *mut Block<T>;
current_head.next.store(new_block_ptr, Ordering::Release);
*head_guard = new_block.clone();
*block_cache = Some(new_block);
}
}
}
pub(crate) fn pop_batch(&self, out: &mut Vec<T>, max: usize) -> usize {
if max == 0 {
return 0;
}
let mut popped = 0;
unsafe {
let tail_arc_ptr = self.tail.get();
let cursor_ptr = self.tail_cursor.get();
while popped < max {
let cursor = *cursor_ptr;
if cursor == BLOCK_CAPACITY {
let next_ptr = {
let tail_block = &**tail_arc_ptr;
tail_block.next.load(Ordering::Acquire)
};
if next_ptr.is_null() {
break;
}
let next_block_arc = Arc::from_raw(next_ptr);
ptr::replace(tail_arc_ptr, next_block_arc);
*cursor_ptr = 0;
continue;
}
let tail_block = &**tail_arc_ptr;
let slot = &tail_block.slots[cursor];
let state = slot.state.load(Ordering::Acquire);
if state == SLOT_READY {
out.push(slot.value.get().read().assume_init());
*cursor_ptr = cursor + 1;
popped += 1;
} else if state == SLOT_WRITING {
if popped > 0 {
break;
}
let mut spin_count = 0;
while slot.state.load(Ordering::Acquire) != SLOT_READY {
if spin_count > 100 {
thread::yield_now();
} else {
std::hint::spin_loop();
}
spin_count += 1;
}
out.push(slot.value.get().read().assume_init());
*cursor_ptr = cursor + 1;
popped += 1;
} else {
break;
}
}
}
popped
}
pub(crate) fn pop(&self) -> Option<T> {
unsafe {
let tail_arc_ptr = self.tail.get(); let cursor_ptr = self.tail_cursor.get();
let cursor = *cursor_ptr;
if cursor == BLOCK_CAPACITY {
let next_ptr = {
let tail_block = &**tail_arc_ptr;
tail_block.next.load(Ordering::Acquire)
};
if !next_ptr.is_null() {
let next_block_arc = Arc::from_raw(next_ptr);
ptr::replace(tail_arc_ptr, next_block_arc);
*cursor_ptr = 0;
return self.pop();
} else {
return None;
}
}
let tail_block = &**tail_arc_ptr;
let slot = &tail_block.slots[cursor];
let state = slot.state.load(Ordering::Acquire);
if state == SLOT_READY {
let value = slot.value.get().read().assume_init();
*cursor_ptr = cursor + 1;
return Some(value);
} else if state == SLOT_WRITING {
let mut spin_count = 0;
while slot.state.load(Ordering::Acquire) != SLOT_READY {
if spin_count > 100 {
thread::yield_now();
} else {
std::hint::spin_loop();
}
spin_count += 1;
}
let value = slot.value.get().read().assume_init();
*cursor_ptr = cursor + 1;
return Some(value);
} else {
return None;
}
}
}
pub(crate) fn is_empty(&self) -> bool {
unsafe {
let tail_block = &**self.tail.get();
let cursor = *self.tail_cursor.get();
if cursor == BLOCK_CAPACITY {
let next = tail_block.next.load(Ordering::Acquire);
return next.is_null();
}
let state = tail_block.slots[cursor].state.load(Ordering::Acquire);
state == SLOT_EMPTY
}
}
}
impl<T> Drop for UnboundedBlockQueue<T> {
fn drop(&mut self) {
unsafe {
let tail_ptr = self.tail.get();
let tail_ref = &*tail_ptr;
Arc::increment_strong_count(Arc::as_ptr(tail_ref));
let mut current_arc = ptr::read(tail_ptr);
let mut cursor = *self.tail_cursor.get();
loop {
for i in cursor..BLOCK_CAPACITY {
let slot = ¤t_arc.slots[i];
if slot.state.load(Ordering::Acquire) == SLOT_READY {
(*slot.value.get()).assume_init_drop();
} else {
break; }
}
let next_ptr = current_arc.next.load(Ordering::Acquire);
if !next_ptr.is_null() {
let next_arc = Arc::from_raw(next_ptr);
current_arc = next_arc;
cursor = 0;
} else {
break;
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};
    use std::thread;
    // An explicit import takes precedence over the glob import from `super`,
    // so `Mutex` here is `std::sync::Mutex` (used with `.lock().unwrap()`).
    use std::sync::Mutex;
    use rand::Rng;

    /// Two pushes come back in FIFO order, then the queue reports nothing.
    #[test]
    fn test_simple_push_pop() {
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        q.push(1, &mut cache);
        q.push(2, &mut cache);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), None);
    }

    /// Filling a block and pushing one more forces a new block to be linked;
    /// the consumer must follow the link without losing order.
    #[test]
    fn test_block_rotation() {
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        for i in 0..BLOCK_CAPACITY {
            q.push(i, &mut cache);
        }
        q.push(999, &mut cache);
        for i in 0..BLOCK_CAPACITY {
            assert_eq!(q.pop(), Some(i));
        }
        assert_eq!(q.pop(), Some(999));
        assert_eq!(q.pop(), None);
    }

    /// One producer thread and one polling consumer see every item exactly once.
    #[test]
    fn test_concurrent_push_pop() {
        let q = Arc::new(UnboundedBlockQueue::new());
        let q_clone = q.clone();
        let t = thread::spawn(move || {
            let mut cache = None;
            for i in 0..1000 {
                q_clone.push(i, &mut cache);
            }
        });
        let mut received = 0;
        for _ in 0..1000 {
            loop {
                if q.pop().is_some() {
                    received += 1;
                    break;
                }
                thread::yield_now();
            }
        }
        t.join().unwrap();
        assert_eq!(received, 1000);
        assert_eq!(q.pop(), None);
    }

    /// Values still queued when the queue is dropped are dropped too.
    #[test]
    fn test_drop_cleanup() {
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
        struct Dropper(usize);
        impl Drop for Dropper {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::Relaxed);
            }
        }
        DROP_COUNT.store(0, Ordering::Relaxed);
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        q.push(Dropper(1), &mut cache);
        q.push(Dropper(2), &mut cache);
        q.push(Dropper(3), &mut cache);
        assert_eq!(q.pop().map(|d| d.0), Some(1));
        drop(q);
        assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 3);
    }

    /// Many producers racing to link new blocks must neither lose nor
    /// duplicate items.
    #[test]
    fn test_multi_producer_race_to_allocate_block() {
        const THREADS: usize = 8;
        const ITEMS_PER_THREAD: usize = BLOCK_CAPACITY * 4;
        let q = Arc::new(UnboundedBlockQueue::new());
        let barrier = Arc::new(Barrier::new(THREADS));
        let mut handles = vec![];
        for i in 0..THREADS {
            let q = q.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                let mut cache = None;
                b.wait();
                for j in 0..ITEMS_PER_THREAD {
                    q.push(i * ITEMS_PER_THREAD + j, &mut cache);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        let mut results = Vec::new();
        while let Some(val) = q.pop() {
            results.push(val);
        }
        assert_eq!(results.len(), THREADS * ITEMS_PER_THREAD);
        // Producers pushed disjoint ranges covering 0..THREADS*ITEMS_PER_THREAD,
        // so the sorted output must be exactly that range. This is one O(n log n)
        // check instead of O(n^2) `contains` calls.
        results.sort_unstable();
        assert_eq!(results, (0..THREADS * ITEMS_PER_THREAD).collect::<Vec<_>>());
    }

    /// Dropping a queue whose unconsumed values span several blocks drops all
    /// of them.
    #[test]
    fn test_drop_cleanup_across_multiple_blocks() {
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
        struct Dropper;
        impl Drop for Dropper {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::Relaxed);
            }
        }
        DROP_COUNT.store(0, Ordering::Relaxed);
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        let count = (BLOCK_CAPACITY * 2) + (BLOCK_CAPACITY / 2);
        for _ in 0..count {
            q.push(Dropper, &mut cache);
        }
        for _ in 0..(BLOCK_CAPACITY / 2) {
            q.pop();
        }
        assert_eq!(DROP_COUNT.load(Ordering::Relaxed), BLOCK_CAPACITY / 2);
        drop(q);
        assert_eq!(DROP_COUNT.load(Ordering::Relaxed), count);
    }

    /// `is_empty` tracks pushes and pops, including across a block boundary.
    #[test]
    fn test_is_empty_correctness() {
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        assert!(q.is_empty());
        q.push(1, &mut cache);
        assert!(!q.is_empty());
        q.pop();
        assert!(q.is_empty());
        for i in 0..BLOCK_CAPACITY {
            q.push(i, &mut cache);
        }
        for _ in 0..BLOCK_CAPACITY {
            q.pop();
        }
        assert!(q.is_empty());
        q.push(100, &mut cache);
        assert!(!q.is_empty());
        q.pop();
        assert!(q.is_empty());
    }

    /// `push_batch` / `pop_batch` preserve order for sizes around block
    /// boundaries.
    #[test]
    fn test_push_batch_pop_batch_block_boundaries() {
        for &n in &[1usize, BLOCK_CAPACITY - 1, BLOCK_CAPACITY, BLOCK_CAPACITY + 1, BLOCK_CAPACITY * 3 + 5] {
            let q = UnboundedBlockQueue::new();
            let mut cache = None;
            let items: Vec<usize> = (0..n).collect();
            let mut iter = items.into_iter();
            q.push_batch(&mut iter, n, &mut cache);
            assert!(iter.next().is_none());
            let mut out = Vec::new();
            let mut total = 0;
            while total < n {
                let k = q.pop_batch(&mut out, 7);
                assert!(k > 0, "queue ran dry early at {total}/{n}");
                total += k;
            }
            assert_eq!(out, (0..n).collect::<Vec<_>>());
            assert_eq!(q.pop_batch(&mut out, 7), 0);
            assert!(q.is_empty());
        }
    }

    /// Single pushes and batch pushes from one producer interleave in order.
    #[test]
    fn test_push_batch_interleaved_with_single_push() {
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        q.push(0usize, &mut cache);
        let mut iter = vec![1usize, 2, 3].into_iter();
        q.push_batch(&mut iter, 3, &mut cache);
        q.push(4, &mut cache);
        let mut out = Vec::new();
        assert_eq!(q.pop_batch(&mut out, 10), 5);
        assert_eq!(out, vec![0, 1, 2, 3, 4]);
    }

    /// Concurrent batch producers deliver every item exactly once.
    #[test]
    fn test_concurrent_push_batch_totals() {
        const THREADS: usize = 4;
        const BATCHES: usize = if cfg!(miri) { 5 } else { 200 };
        const BATCH_SIZE: usize = 13;
        let q = Arc::new(UnboundedBlockQueue::new());
        let barrier = Arc::new(Barrier::new(THREADS));
        let mut handles = vec![];
        for t in 0..THREADS {
            let q = q.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                let mut cache = None;
                b.wait();
                for batch in 0..BATCHES {
                    let base = (t * BATCHES + batch) * BATCH_SIZE;
                    let items: Vec<usize> = (base..base + BATCH_SIZE).collect();
                    let mut iter = items.into_iter();
                    q.push_batch(&mut iter, BATCH_SIZE, &mut cache);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        let mut out = Vec::new();
        loop {
            let k = q.pop_batch(&mut out, 64);
            if k == 0 {
                break;
            }
        }
        assert_eq!(out.len(), THREADS * BATCHES * BATCH_SIZE);
        out.sort();
        for (i, v) in out.iter().enumerate() {
            assert_eq!(*v, i);
        }
    }

    /// Randomized stress: many producers, one lock-serialized consumer, and
    /// exact per-producer counts at the end.
    #[test]
    fn test_stress_random_ops() {
        const NUM_PRODUCERS: usize = 8;
        const NUM_OPS_PER_PRODUCER: usize = if cfg!(miri) { 100 } else { 50_000 };
        let q = Arc::new(UnboundedBlockQueue::new());
        // The queue supports a single consumer; this lock enforces it.
        let consumer_lock = Arc::new(Mutex::new(q.clone()));
        let mut handles = vec![];
        for i in 0..NUM_PRODUCERS {
            let q = q.clone();
            handles.push(thread::spawn(move || {
                let mut rng = rand::rng();
                let mut cache = None;
                for _ in 0..NUM_OPS_PER_PRODUCER {
                    if rng.random_bool(0.01) {
                        thread::yield_now();
                    }
                    q.push(i, &mut cache);
                }
            }));
        }
        let received_counts = Arc::new(Mutex::new(vec![0; NUM_PRODUCERS]));
        let receiver_finished = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let c_lock = consumer_lock.clone();
        let counts = received_counts.clone();
        let finished = receiver_finished.clone();
        let consumer_handle = thread::spawn(move || loop {
            let q = c_lock.lock().unwrap();
            match q.pop() {
                Some(producer_id) => {
                    let mut c = counts.lock().unwrap();
                    c[producer_id] += 1;
                }
                None => {
                    if finished.load(Ordering::Acquire) {
                        if q.is_empty() {
                            break;
                        }
                    } else {
                        drop(q);
                        thread::yield_now();
                    }
                }
            }
        });
        for h in handles {
            h.join().unwrap();
        }
        receiver_finished.store(true, Ordering::Release);
        consumer_handle.join().unwrap();
        let final_counts = received_counts.lock().unwrap();
        for (i, count) in final_counts.iter().enumerate() {
            assert_eq!(
                *count, NUM_OPS_PER_PRODUCER,
                "Producer {} count mismatch",
                i
            );
        }
    }
}