use crate::internal::cache_padded::CachePadded;
use parking_lot::Mutex;
use core::fmt;
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
/// Number of slots held by a single `Block`.
const BLOCK_CAPACITY: usize = 32;
/// Slot state: nothing has been written yet (the state every slot starts in).
const SLOT_EMPTY: u8 = 0;
/// Slot state: a producer has claimed the slot and is in the middle of writing its value.
const SLOT_WRITING: u8 = 1;
/// Slot state: the value is fully written and may be read by the consumer.
const SLOT_READY: u8 = 2;
/// One cell of a block: storage for a value plus the state byte that publishes it.
struct Slot<T> {
/// One of `SLOT_EMPTY`, `SLOT_WRITING` or `SLOT_READY`.
state: AtomicU8,
/// Storage for the value; it has been initialized by the time `state` becomes `SLOT_READY`.
value: UnsafeCell<MaybeUninit<T>>,
}
impl<T> Default for Slot<T> {
fn default() -> Self {
Self {
state: AtomicU8::new(SLOT_EMPTY),
value: UnsafeCell::new(MaybeUninit::uninit()),
}
}
}
/// A fixed-size segment of the queue; segments are chained through `next`.
pub(crate) struct Block<T> {
/// Raw pointer to the following block, or null while no successor has been linked.
next: AtomicPtr<Block<T>>,
/// Next slot index to hand out. Producers claim indices with `fetch_add`, so the
/// stored value can grow past `BLOCK_CAPACITY` once the block is full.
write_index: CachePadded<AtomicUsize>,
/// Value storage, indexed by the indices claimed from `write_index`.
slots: [Slot<T>; BLOCK_CAPACITY],
}
// SAFETY (NOTE(review)): `Block` stores its values in `UnsafeCell`s, so these
// impls are written by hand. They rely on every slot value being accessed only
// under the `state` protocol (written before `SLOT_READY` is stored, read only
// after `SLOT_READY` is observed) — confirm that all users of `Block` follow it.
unsafe impl<T: Send> Send for Block<T> {}
unsafe impl<T: Send> Sync for Block<T> {}
impl<T> fmt::Debug for Block<T> {
    /// Prints only the current `write_index`; the remaining fields are elided
    /// and the output is marked as non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let claimed = self.write_index.load(Ordering::Relaxed);
        let mut out = f.debug_struct("Block");
        out.field("write_index", &claimed);
        out.finish_non_exhaustive()
    }
}
impl<T> Block<T> {
fn new() -> Self {
let slots: [Slot<T>; BLOCK_CAPACITY] = unsafe {
let mut slots_uninit: [MaybeUninit<Slot<T>>; BLOCK_CAPACITY] =
MaybeUninit::uninit().assume_init();
for i in 0..BLOCK_CAPACITY {
slots_uninit[i].write(Slot::default());
}
ptr::read(&slots_uninit as *const _ as *const [Slot<T>; BLOCK_CAPACITY])
};
Self {
next: AtomicPtr::new(ptr::null_mut()),
write_index: CachePadded::new(AtomicUsize::new(0)),
slots,
}
}
}
/// An unbounded queue built from a linked chain of fixed-size `Block`s.
pub(crate) struct UnboundedBlockQueue<T> {
/// Block that producers currently push into; the mutex is taken to read or replace it.
head: CachePadded<Mutex<Arc<Block<T>>>>,
/// Block the consumer is currently reading from (accessed without any lock).
tail: CachePadded<UnsafeCell<Arc<Block<T>>>>,
/// Index of the next slot to read in `tail` (accessed without any lock).
tail_cursor: CachePadded<UnsafeCell<usize>>,
}
// SAFETY (NOTE(review)): `tail` and `tail_cursor` are plain `UnsafeCell`s that
// `pop` and `is_empty` read and write without any lock, so sharing the queue
// across threads is only sound if at most one thread at a time acts as the
// consumer. Producers coordinate through the `head` mutex and the per-slot
// `state`. Confirm that every caller upholds the single-consumer rule.
unsafe impl<T: Send> Send for UnboundedBlockQueue<T> {}
unsafe impl<T: Send> Sync for UnboundedBlockQueue<T> {}
impl<T> UnboundedBlockQueue<T> {
pub(crate) fn new() -> Self {
let block = Arc::new(Block::new());
Self {
head: CachePadded::new(Mutex::new(block.clone())),
tail: CachePadded::new(UnsafeCell::new(block)),
tail_cursor: CachePadded::new(UnsafeCell::new(0)),
}
}
pub(crate) fn push(&self, value: T, block_cache: &mut Option<Arc<Block<T>>>) {
loop {
if let Some(block) = block_cache {
let idx = block.write_index.fetch_add(1, Ordering::Relaxed);
if idx < BLOCK_CAPACITY {
let slot = &block.slots[idx];
slot.state.store(SLOT_WRITING, Ordering::Release);
unsafe {
(*slot.value.get()).write(value);
}
slot.state.store(SLOT_READY, Ordering::Release);
return;
} else {
*block_cache = None;
}
}
{
let mut head_guard = self.head.lock(); let current_head = head_guard.clone();
if current_head.write_index.load(Ordering::Relaxed) < BLOCK_CAPACITY {
*block_cache = Some(current_head);
continue;
}
let new_block = Arc::new(Block::new());
let new_block_ptr = Arc::into_raw(new_block.clone()) as *mut Block<T>;
current_head.next.store(new_block_ptr, Ordering::Release);
*head_guard = new_block.clone();
*block_cache = Some(new_block);
}
}
}
pub(crate) fn pop(&self) -> Option<T> {
unsafe {
let tail_arc_ptr = self.tail.get(); let cursor_ptr = self.tail_cursor.get();
let cursor = *cursor_ptr;
if cursor == BLOCK_CAPACITY {
let next_ptr = {
let tail_block = &**tail_arc_ptr;
tail_block.next.load(Ordering::Acquire)
};
if !next_ptr.is_null() {
let next_block_arc = Arc::from_raw(next_ptr);
ptr::replace(tail_arc_ptr, next_block_arc);
*cursor_ptr = 0;
return self.pop();
} else {
return None;
}
}
let tail_block = &**tail_arc_ptr;
let slot = &tail_block.slots[cursor];
let state = slot.state.load(Ordering::Acquire);
if state == SLOT_READY {
let value = slot.value.get().read().assume_init();
*cursor_ptr = cursor + 1;
return Some(value);
} else if state == SLOT_WRITING {
let mut spin_count = 0;
while slot.state.load(Ordering::Acquire) != SLOT_READY {
if spin_count > 100 {
thread::yield_now();
} else {
std::hint::spin_loop();
}
spin_count += 1;
}
let value = slot.value.get().read().assume_init();
*cursor_ptr = cursor + 1;
return Some(value);
} else {
return None;
}
}
}
pub(crate) fn is_empty(&self) -> bool {
unsafe {
let tail_block = &**self.tail.get();
let cursor = *self.tail_cursor.get();
if cursor == BLOCK_CAPACITY {
let next = tail_block.next.load(Ordering::Acquire);
return next.is_null();
}
let state = tail_block.slots[cursor].state.load(Ordering::Acquire);
state == SLOT_EMPTY
}
}
}
impl<T> Drop for UnboundedBlockQueue<T> {
fn drop(&mut self) {
unsafe {
let tail_ptr = self.tail.get();
let tail_ref = &*tail_ptr;
Arc::increment_strong_count(Arc::as_ptr(tail_ref));
let mut current_arc = ptr::read(tail_ptr);
let mut cursor = *self.tail_cursor.get();
loop {
for i in cursor..BLOCK_CAPACITY {
let slot = ¤t_arc.slots[i];
if slot.state.load(Ordering::Acquire) == SLOT_READY {
(*slot.value.get()).assume_init_drop();
} else {
break; }
}
let next_ptr = current_arc.next.load(Ordering::Acquire);
if !next_ptr.is_null() {
let next_arc = Arc::from_raw(next_ptr);
current_arc = next_arc;
cursor = 0;
} else {
break;
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use rand::Rng;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;
    use std::sync::{Arc, Barrier};
    use std::thread;

    /// Values come back in FIFO order and the queue then reports nothing left.
    #[test]
    fn test_simple_push_pop() {
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        q.push(1, &mut cache);
        q.push(2, &mut cache);
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), None);
    }

    /// Pushing one more item than a block holds forces a second block to be linked.
    #[test]
    fn test_block_rotation() {
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        for i in 0..BLOCK_CAPACITY {
            q.push(i, &mut cache);
        }
        q.push(999, &mut cache);
        for i in 0..BLOCK_CAPACITY {
            assert_eq!(q.pop(), Some(i));
        }
        assert_eq!(q.pop(), Some(999));
        assert_eq!(q.pop(), None);
    }

    /// One producer thread and one consumer (the test thread) running concurrently.
    #[test]
    fn test_concurrent_push_pop() {
        let q = Arc::new(UnboundedBlockQueue::new());
        let q_clone = q.clone();
        let t = thread::spawn(move || {
            let mut cache = None;
            for i in 0..1000 {
                q_clone.push(i, &mut cache);
            }
        });
        let mut received = 0;
        for _ in 0..1000 {
            // Retry until the producer has published the next item.
            while q.pop().is_none() {
                thread::yield_now();
            }
            received += 1;
        }
        t.join().unwrap();
        assert_eq!(received, 1000);
        assert_eq!(q.pop(), None);
    }

    /// Dropping the queue drops the values that were never popped.
    #[test]
    fn test_drop_cleanup() {
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
        struct Dropper(usize);
        impl Drop for Dropper {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::Relaxed);
            }
        }
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        q.push(Dropper(1), &mut cache);
        q.push(Dropper(2), &mut cache);
        q.push(Dropper(3), &mut cache);
        assert_eq!(q.pop().map(|d| d.0), Some(1));
        drop(q);
        assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 3);
    }

    /// Several producers start together and repeatedly race to link new blocks.
    #[test]
    fn test_multi_producer_race_to_allocate_block() {
        const THREADS: usize = 8;
        const ITEMS_PER_THREAD: usize = BLOCK_CAPACITY * 4;
        let q = Arc::new(UnboundedBlockQueue::new());
        let barrier = Arc::new(Barrier::new(THREADS));
        let mut handles = vec![];
        for i in 0..THREADS {
            let q = q.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                let mut cache = None;
                b.wait();
                for j in 0..ITEMS_PER_THREAD {
                    q.push(i * ITEMS_PER_THREAD + j, &mut cache);
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        let mut results = Vec::new();
        while let Some(val) = q.pop() {
            results.push(val);
        }
        assert_eq!(results.len(), THREADS * ITEMS_PER_THREAD);
        // The pushed values are exactly 0..THREADS * ITEMS_PER_THREAD, so one
        // comparison of the sorted output checks that each arrived exactly once.
        results.sort_unstable();
        let expected: Vec<usize> = (0..THREADS * ITEMS_PER_THREAD).collect();
        assert_eq!(results, expected);
    }

    /// Dropping the queue releases unpopped values that span several blocks.
    #[test]
    fn test_drop_cleanup_across_multiple_blocks() {
        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
        struct Dropper;
        impl Drop for Dropper {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::Relaxed);
            }
        }
        DROP_COUNT.store(0, Ordering::Relaxed);
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        let count = (BLOCK_CAPACITY * 2) + (BLOCK_CAPACITY / 2);
        for _ in 0..count {
            q.push(Dropper, &mut cache);
        }
        for _ in 0..(BLOCK_CAPACITY / 2) {
            q.pop();
        }
        assert_eq!(DROP_COUNT.load(Ordering::Relaxed), BLOCK_CAPACITY / 2);
        drop(q);
        assert_eq!(DROP_COUNT.load(Ordering::Relaxed), count);
    }

    /// `is_empty` tracks pushes and pops, including across a block boundary.
    #[test]
    fn test_is_empty_correctness() {
        let q = UnboundedBlockQueue::new();
        let mut cache = None;
        assert!(q.is_empty());
        q.push(1, &mut cache);
        assert!(!q.is_empty());
        q.pop();
        assert!(q.is_empty());
        for i in 0..BLOCK_CAPACITY {
            q.push(i, &mut cache);
        }
        for _ in 0..BLOCK_CAPACITY {
            q.pop();
        }
        assert!(q.is_empty());
        q.push(100, &mut cache);
        assert!(!q.is_empty());
        q.pop();
        assert!(q.is_empty());
    }

    /// Many producers with random yields against one consumer; every item must
    /// be received exactly once.
    #[test]
    fn test_stress_random_ops() {
        const NUM_PRODUCERS: usize = 8;
        const NUM_OPS_PER_PRODUCER: usize = if cfg!(miri) { 100 } else { 50_000 };
        let q = Arc::new(UnboundedBlockQueue::new());
        let consumer_lock = Arc::new(Mutex::new(q.clone()));
        let mut handles = vec![];
        for i in 0..NUM_PRODUCERS {
            let q = q.clone();
            handles.push(thread::spawn(move || {
                let mut rng = rand::rng();
                let mut cache = None;
                for _ in 0..NUM_OPS_PER_PRODUCER {
                    if rng.random_bool(0.01) {
                        thread::yield_now();
                    }
                    q.push(i, &mut cache);
                }
            }));
        }
        let received_counts = Arc::new(Mutex::new(vec![0; NUM_PRODUCERS]));
        let receiver_finished = Arc::new(std::sync::atomic::AtomicBool::new(false));
        let c_lock = consumer_lock.clone();
        let counts = received_counts.clone();
        let finished = receiver_finished.clone();
        let consumer_handle = thread::spawn(move || loop {
            let q = c_lock.lock().unwrap();
            match q.pop() {
                Some(producer_id) => {
                    let mut c = counts.lock().unwrap();
                    c[producer_id] += 1;
                }
                None => {
                    if finished.load(Ordering::Acquire) {
                        // All producers are done; stop once nothing is left.
                        if q.is_empty() {
                            break;
                        }
                    } else {
                        // Release the consumer lock before yielding.
                        drop(q);
                        thread::yield_now();
                    }
                }
            }
        });
        for h in handles {
            h.join().unwrap();
        }
        receiver_finished.store(true, Ordering::Release);
        consumer_handle.join().unwrap();
        let final_counts = received_counts.lock().unwrap();
        for (i, count) in final_counts.iter().enumerate() {
            assert_eq!(
                *count, NUM_OPS_PER_PRODUCER,
                "Producer {} count mismatch",
                i
            );
        }
    }
}