use std::cmp;
use std::mem::MaybeUninit;
use std::sync::atomic::Ordering;
use crate::loom_exports::cell::UnsafeCell;
use crate::loom_exports::debug_or_loom_assert_eq;
use crate::loom_exports::sync::atomic::AtomicUsize;
use crossbeam_utils::CachePadded;
/// A single queue slot: storage for one value together with the stamp that
/// tells producers and the consumer whether the slot can be written or read.
struct Slot<T> {
/// Stamp compared against queue positions.
///
/// It is initialized to the slot index, set to `stamp + 1` by `push` once a
/// value has been written, and advanced by `right_mask` by `pop` once the
/// value has been read.
stamp: AtomicUsize,
/// Storage for the value; only initialized between a completed `push` and the
/// matching `pop`.
value: UnsafeCell<MaybeUninit<T>>,
}
/// A bounded queue with lock-free `push` (usable from several threads through
/// `&self`) and an `unsafe` `pop` that must not run concurrently with itself.
///
/// The enqueue position, the dequeue position and the slot stamps are `usize`
/// values sharing the following layout, as set up by `new`:
///
/// ```text
/// | <- MSB                                LSB -> |
/// | sequence count | flag (1 bit) | buffer index |
/// ```
///
/// In the enqueue position the flag bit signals that the queue was closed.
pub(super) struct Queue<T> {
/// Position at which the next value will be written; updated by producers
/// with a compare-and-swap and also carrying the closed flag.
enqueue_pos: CachePadded<AtomicUsize>,
/// Position from which the next value will be read; accessed without
/// synchronization, and only from `pop`.
dequeue_pos: CachePadded<UnsafeCell<usize>>,
/// The slots holding the values and their stamps.
buffer: Box<[Slot<T>]>,
/// Bit mask covering the buffer index bits and the flag bit.
right_mask: usize,
/// Bit mask selecting the flag bit (the capacity rounded up to a power of two).
closed_channel_mask: usize,
}
impl<T> Queue<T> {
/// Creates an open, empty queue backed by `capacity` slots.
///
/// # Panics
///
/// Panics if `capacity` is 0 or exceeds `1 << (usize::BITS - 1)`.
pub(super) fn new(capacity: usize) -> Queue<T> {
assert!(capacity >= 1, "the capacity must be 1 or greater");
assert!(
capacity <= (1 << (usize::BITS - 1)),
"the capacity may not exceed {}",
1usize << (usize::BITS - 1)
);
// Slot `i` starts with stamp `i`, which is the first enqueue position that
// maps to it: every slot is therefore initially writable.
let mut buffer = Vec::with_capacity(capacity);
for i in 0..capacity {
buffer.push(Slot {
stamp: AtomicUsize::new(i),
value: UnsafeCell::new(MaybeUninit::uninit()),
});
}
// The flag bit sits just above the bits needed for the indices `0..capacity`.
let closed_channel_mask = capacity.next_power_of_two();
// Covers the index bits and the flag bit. When the flag is the most
// significant bit the shift yields 0 and the wrapping subtraction produces
// `usize::MAX`.
let right_mask = (closed_channel_mask << 1).wrapping_sub(1);
Queue {
enqueue_pos: CachePadded::new(AtomicUsize::new(0)),
dequeue_pos: CachePadded::new(UnsafeCell::new(0)),
buffer: buffer.into(),
right_mask,
closed_channel_mask,
}
}
/// Attempts to append `value` to the queue.
///
/// # Errors
///
/// Returns the value back wrapped in `PushError::Closed` if the closed flag
/// is set in the enqueue position, or in `PushError::Full` if the slot at the
/// enqueue position has not been recycled by `pop` yet.
pub(super) fn push(&self, value: T) -> Result<(), PushError<T>> {
let mut enqueue_pos = self.enqueue_pos.load(Ordering::Relaxed);
loop {
// The flag is re-checked on every iteration since `enqueue_pos` is
// refreshed after a failed attempt.
if enqueue_pos & self.closed_channel_mask != 0 {
return Err(PushError::Closed(value));
}
// The flag bit is clear here, so the masked position is the buffer index.
let slot = &self.buffer[enqueue_pos & self.right_mask];
// Acquire: matches the Release store of the stamp made by `pop` after it
// has read the previous value out of the slot.
let stamp = slot.stamp.load(Ordering::Acquire);
// Signed distance between the stamp and the position, computed with
// wrapping arithmetic so that it stays meaningful when positions wrap.
let stamp_delta = stamp.wrapping_sub(enqueue_pos) as isize;
match stamp_delta.cmp(&0) {
// The slot is writable for this position: try to reserve it by
// advancing the enqueue position.
cmp::Ordering::Equal => {
match self.enqueue_pos.compare_exchange_weak(
enqueue_pos,
self.next_queue_pos(enqueue_pos),
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => {
// The position is reserved: write the value, then publish it
// by bumping the stamp with Release ordering.
unsafe {
slot.value.with_mut(|v| *v = MaybeUninit::new(value));
}
slot.stamp.store(stamp.wrapping_add(1), Ordering::Release);
return Ok(());
}
// The exchange failed (possibly spuriously, since it is the weak
// variant): retry with the position that was observed.
Err(pos) => {
enqueue_pos = pos;
}
}
}
// The stamp lags behind the position: the slot has not been recycled
// by `pop` for this position yet.
cmp::Ordering::Less => {
return Err(PushError::Full(value));
}
// The stamp is ahead of the position: the local `enqueue_pos` is stale,
// so reload it and retry.
cmp::Ordering::Greater => {
enqueue_pos = self.enqueue_pos.load(Ordering::Relaxed);
}
}
}
}
/// Attempts to remove the value at the dequeue position.
///
/// Returns `PopError::Empty` if no value is currently readable and
/// `PopError::Closed` if, in addition, the enqueue position equals the
/// dequeue position with the closed flag set.
///
/// # Safety
///
/// The dequeue position is read and written through an `UnsafeCell` without
/// synchronization, so the caller must ensure that no other call to `pop` on
/// the same queue runs concurrently.
pub(super) unsafe fn pop(&self) -> Result<T, PopError> {
let dequeue_pos = self.dequeue_pos.with(|p| *p);
let slot = &self.buffer[dequeue_pos & self.right_mask];
// Acquire: matches the Release store made by `push` after writing the value.
let stamp = slot.stamp.load(Ordering::Acquire);
if dequeue_pos != stamp {
// A completed push leaves the stamp exactly one past the position.
// NOTE(review): judging by its name the macro presumably only checks in
// debug or loom builds — confirm against `loom_exports`.
debug_or_loom_assert_eq!(stamp, dequeue_pos + 1);
self.dequeue_pos
.with_mut(|p| *p = self.next_queue_pos(dequeue_pos));
let value = slot.value.with(|v| v.read().assume_init());
// `stamp + right_mask` equals `dequeue_pos + right_mask + 1`, i.e. the
// position one sequence increment later, which is the next enqueue
// position mapping to this slot. Release publishes the completed read.
slot.stamp
.store(stamp.wrapping_add(self.right_mask), Ordering::Release);
Ok(value)
} else {
// Nothing is readable. The queue is only reported as closed when the
// enqueue position has the flag set and is otherwise equal to the dequeue
// position: a producer that has already advanced the enqueue position but
// not yet stored its stamp makes this comparison fail, yielding `Empty`.
if self.enqueue_pos.load(Ordering::Relaxed) == (dequeue_pos | self.closed_channel_mask)
{
Err(PopError::Closed)
} else {
Err(PopError::Empty)
}
}
}
/// Closes the queue by setting the closed flag in the enqueue position.
pub(super) fn close(&self) {
self.enqueue_pos
.fetch_or(self.closed_channel_mask, Ordering::Relaxed);
}
/// Returns `true` if the closed flag is set in the enqueue position.
pub(super) fn is_closed(&self) -> bool {
self.enqueue_pos.load(Ordering::Relaxed) & self.closed_channel_mask != 0
}
/// Returns the position following `queue_pos`, which must have its flag bit
/// clear.
///
/// The buffer index is incremented; once it would reach the buffer length it
/// is reset to 0 and the sequence count is incremented instead.
#[inline]
fn next_queue_pos(&self, queue_pos: usize) -> usize {
debug_or_loom_assert_eq!(queue_pos & self.closed_channel_mask, 0);
// With the flag bit clear, the increment can at worst carry into the flag
// bit and therefore cannot overflow.
let new_queue_pos = queue_pos + 1;
let new_index = new_queue_pos & self.right_mask;
if new_index < self.buffer.len() {
new_queue_pos
} else {
// Drop the index and flag bits and move on to the next sequence count,
// which may wrap around.
let sequence_increment = self.right_mask + 1;
let sequence_count = queue_pos & !self.right_mask;
sequence_count.wrapping_add(sequence_increment)
}
}
}
impl<T> Drop for Queue<T> {
    /// Drains the queue so that every value still stored in it is dropped.
    fn drop(&mut self) {
        loop {
            // SAFETY: `&mut self` guarantees exclusive access to the queue, so
            // no other call to `pop` can run concurrently.
            let popped = unsafe { self.pop() };
            if popped.is_err() {
                break;
            }
        }
    }
}
// SAFETY (review note): values of type `T` are handed from the pushing thread
// to the popping thread, hence the `T: Send` bound. Sharing the queue between
// threads is only sound as long as callers uphold the contract of the `unsafe`
// method `pop`, which accesses `dequeue_pos` without synchronization.
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}
/// Error returned by `Queue::push`; both variants hand the rejected value back.
#[derive(Debug, Eq, PartialEq)]
pub(super) enum PushError<T> {
/// The slot at the enqueue position has not been recycled by `pop` yet.
Full(T),
/// The closed flag was set in the enqueue position.
Closed(T),
}
/// Error returned by `Queue::pop`.
#[derive(Debug, Eq, PartialEq)]
pub(super) enum PopError {
/// No value is currently readable at the dequeue position.
Empty,
/// No value is readable and the enqueue position equals the dequeue position
/// with the closed flag set.
Closed,
}
#[cfg(all(test, any(not(miri), not(tachyonix_ignore_leaks))))]
mod test_utils {
    //! Safe producer/consumer handles around [`Queue`] for use in tests.

    use super::*;

    /// Cloneable handle that can push values and close the queue.
    pub(super) struct Producer<T> {
        inner: crate::loom_exports::sync::Arc<Queue<T>>,
    }

    impl<T> Producer<T> {
        /// Forwards to [`Queue::push`].
        pub(super) fn push(&self, value: T) -> Result<(), PushError<T>> {
            self.inner.push(value)
        }

        /// Forwards to [`Queue::close`].
        pub(super) fn close(&self) {
            self.inner.close();
        }

        /// Forwards to [`Queue::is_closed`]; not compiled for loom builds.
        #[cfg(not(tachyonix_loom))]
        pub(super) fn is_closed(&self) -> bool {
            self.inner.is_closed()
        }
    }

    impl<T> Clone for Producer<T> {
        fn clone(&self) -> Self {
            Self {
                inner: self.inner.clone(),
            }
        }
    }

    /// Handle that can pop values and close the queue.
    ///
    /// It does not implement `Clone`, and [`queue`] creates exactly one per
    /// queue.
    pub(super) struct Consumer<T> {
        inner: crate::loom_exports::sync::Arc<Queue<T>>,
    }

    impl<T> Consumer<T> {
        /// Forwards to [`Queue::pop`].
        pub(super) fn pop(&mut self) -> Result<T, PopError> {
            // SAFETY: there is a single, non-cloneable `Consumer` per queue and
            // this method takes `&mut self`, so calls to `Queue::pop` cannot
            // overlap.
            unsafe { self.inner.pop() }
        }

        /// Forwards to [`Queue::close`].
        pub(super) fn close(&self) {
            self.inner.close();
        }
    }

    /// Creates a queue with the given capacity and returns its two handles.
    pub(super) fn queue<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
        let inner = crate::loom_exports::sync::Arc::new(Queue::new(capacity));
        let producer = Producer {
            inner: inner.clone(),
        };
        // The consumer takes over the original handle; cloning it again would
        // only create a reference that is dropped right away.
        let consumer = Consumer { inner };

        (producer, consumer)
    }
}
#[cfg(all(test, not(tachyonix_loom), any(not(miri), not(tachyonix_ignore_leaks))))]
mod tests {
    use super::test_utils::*;
    use super::*;

    use std::thread;

    /// A value pushed before a producer closes the queue can still be popped;
    /// only then is the queue reported as closed.
    #[test]
    fn queue_closed_by_sender() {
        let (p, mut c) = queue(3);

        assert_eq!(c.pop(), Err(PopError::Empty));

        p.push(42).unwrap();
        p.close();

        assert_eq!(c.pop(), Ok(42));
        assert_eq!(c.pop(), Err(PopError::Closed));
    }

    /// Closing from the consumer side rejects further pushes but keeps the
    /// values that were already queued.
    #[test]
    fn queue_closed_by_consumer() {
        let (p, mut c) = queue(3);

        assert!(!p.is_closed());
        p.push(42).unwrap();

        c.close();

        assert!(p.is_closed());
        assert_eq!(p.push(13), Err(PushError::Closed(13)));

        assert_eq!(c.pop(), Ok(42));
        assert_eq!(c.pop(), Err(PopError::Closed));
    }

    /// Pushes `COUNT` increasing values from one thread and checks that another
    /// thread pops them in the same order.
    fn queue_spsc(capacity: usize) {
        const COUNT: usize = if cfg!(miri) { 50 } else { 100_000 };

        let (p, mut c) = queue(capacity);

        let th_pop = thread::spawn(move || {
            for i in 0..COUNT {
                // Spin until the next value becomes available.
                loop {
                    if let Ok(x) = c.pop() {
                        assert_eq!(x, i);
                        break;
                    }
                }
            }
            assert!(c.pop().is_err());
        });

        let th_push = thread::spawn(move || {
            for i in 0..COUNT {
                // Spin while the queue is full.
                while p.push(i).is_err() {}
            }
        });

        th_pop.join().unwrap();
        th_push.join().unwrap();
    }

    #[test]
    fn queue_spsc_capacity_one() {
        queue_spsc(1);
    }

    #[test]
    fn queue_spsc_capacity_two() {
        queue_spsc(2);
    }

    #[test]
    fn queue_spsc_capacity_three() {
        queue_spsc(3);
    }

    /// Pushes the values `0..COUNT` from several threads and checks that every
    /// value is popped exactly once per producer thread.
    fn queue_mpsc(capacity: usize) {
        const COUNT: usize = if cfg!(miri) { 20 } else { 25_000 };
        const PRODUCER_THREADS: usize = 4;

        let (p, mut c) = queue(capacity);
        let mut push_count = vec![0usize; COUNT];

        let th_push: Vec<_> = (0..PRODUCER_THREADS)
            .map(|_| {
                let p = p.clone();

                thread::spawn(move || {
                    for i in 0..COUNT {
                        // Spin while the queue is full.
                        while p.push(i).is_err() {}
                    }
                })
            })
            .collect();

        for _ in 0..COUNT * PRODUCER_THREADS {
            // Spin until a value becomes available.
            let n = loop {
                if let Ok(x) = c.pop() {
                    break x;
                }
            };
            push_count[n] += 1;
        }

        for count in push_count {
            assert_eq!(count, PRODUCER_THREADS);
        }

        for th in th_push {
            th.join().unwrap();
        }
    }

    #[test]
    fn queue_mpsc_capacity_one() {
        queue_mpsc(1);
    }

    #[test]
    fn queue_mpsc_capacity_two() {
        queue_mpsc(2);
    }

    #[test]
    fn queue_mpsc_capacity_three() {
        queue_mpsc(3);
    }
}
#[cfg(all(test, tachyonix_loom))]
mod tests {
    use super::test_utils::*;
    use super::*;

    use loom::model::Builder;
    use loom::sync::atomic::AtomicUsize;
    use loom::sync::Arc;
    use loom::thread;

    /// Returns a model builder whose preemption bound is set to
    /// `default_preemption_bound` unless `Builder::new` already provided one.
    fn model_builder(default_preemption_bound: usize) -> Builder {
        let mut builder = Builder::new();
        if builder.preemption_bound.is_none() {
            builder.preemption_bound = Some(default_preemption_bound);
        }

        builder
    }

    /// Checks that the number of popped values matches the number of
    /// successful pushes when several producers race with the consumer.
    fn loom_queue_push_pop(
        max_push_per_thread: usize,
        producer_thread_count: usize,
        capacity: usize,
        preemption_bound: usize,
    ) {
        model_builder(preemption_bound).check(move || {
            let (producer, mut consumer) = queue(capacity);

            let push_count = Arc::new(AtomicUsize::new(0));

            let producer_threads: Vec<_> = (0..producer_thread_count)
                .map(|_| {
                    let producer = producer.clone();
                    let push_count = push_count.clone();

                    thread::spawn(move || {
                        for i in 0..max_push_per_thread {
                            match producer.push(i) {
                                Ok(()) => {}
                                Err(PushError::Full(_)) => {
                                    // A full queue is only acceptable if the
                                    // producers can push more than it holds.
                                    assert!(capacity < max_push_per_thread * producer_thread_count);

                                    break;
                                }
                                Err(PushError::Closed(_)) => panic!(),
                            }
                            push_count.fetch_add(1, Ordering::Relaxed);
                        }
                    })
                })
                .collect();

            // Pop concurrently with the producers.
            let mut pop_count = 0;
            while consumer.pop().is_ok() {
                pop_count += 1;
            }

            for th in producer_threads {
                th.join().unwrap();
            }

            // Pop whatever is left once all producers are done.
            while consumer.pop().is_ok() {
                pop_count += 1;
            }

            assert_eq!(push_count.load(Ordering::Relaxed), pop_count);
        });
    }

    #[test]
    fn loom_queue_push_pop_overflow() {
        const DEFAULT_PREEMPTION_BOUND: usize = 5;
        loom_queue_push_pop(2, 2, 3, DEFAULT_PREEMPTION_BOUND);
    }

    #[test]
    fn loom_queue_push_pop_no_overflow() {
        const DEFAULT_PREEMPTION_BOUND: usize = 5;
        loom_queue_push_pop(2, 2, 5, DEFAULT_PREEMPTION_BOUND);
    }

    #[test]
    fn loom_queue_push_pop_capacity_power_of_two_overflow() {
        const DEFAULT_PREEMPTION_BOUND: usize = 5;
        loom_queue_push_pop(3, 2, 4, DEFAULT_PREEMPTION_BOUND);
    }

    #[test]
    fn loom_queue_push_pop_capacity_one_overflow() {
        const DEFAULT_PREEMPTION_BOUND: usize = 5;
        loom_queue_push_pop(2, 2, 1, DEFAULT_PREEMPTION_BOUND);
    }

    #[test]
    fn loom_queue_push_pop_capacity_power_of_two_no_overflow() {
        const DEFAULT_PREEMPTION_BOUND: usize = 5;
        loom_queue_push_pop(2, 2, 4, DEFAULT_PREEMPTION_BOUND);
    }

    #[test]
    fn loom_queue_push_pop_three_producers() {
        const DEFAULT_PREEMPTION_BOUND: usize = 2;
        loom_queue_push_pop(2, 3, 3, DEFAULT_PREEMPTION_BOUND);
    }

    /// Checks that values left in the queue are dropped together with it.
    #[test]
    fn loom_queue_drop_items() {
        const CAPACITY: usize = 3;
        const PRODUCER_THREAD_COUNT: usize = 3;
        const DEFAULT_PREEMPTION_BOUND: usize = 4;

        model_builder(DEFAULT_PREEMPTION_BOUND).check(move || {
            let (producer, consumer) = queue(CAPACITY);
            let item = std::sync::Arc::new(());

            let producer_threads: Vec<_> = (0..PRODUCER_THREAD_COUNT)
                .map(|_| {
                    thread::spawn({
                        let item = item.clone();
                        let producer = producer.clone();

                        move || {
                            producer.push(item).unwrap();
                        }
                    })
                })
                .collect();

            for th in producer_threads {
                th.join().unwrap();
            }

            drop(producer);
            drop(consumer);

            // Only the local handle must be left once the queue is gone.
            assert_eq!(std::sync::Arc::strong_count(&item), 1);
        });
    }

    /// Checks that the consumer receives exactly the values whose push
    /// succeeded when a producer closes the queue.
    #[test]
    fn loom_queue_closed_by_producer() {
        const CAPACITY: usize = 3;
        const DEFAULT_PREEMPTION_BOUND: usize = 3;

        model_builder(DEFAULT_PREEMPTION_BOUND).check(move || {
            let (producer, mut consumer) = queue(CAPACITY);

            let th_push_close = thread::spawn({
                let producer = producer.clone();

                move || {
                    producer.push(7).unwrap();
                    producer.close();
                }
            });

            let th_try_push = thread::spawn({
                let producer = producer.clone();

                move || match producer.push(13) {
                    Ok(()) => true,
                    Err(PushError::Closed(13)) => false,
                    _ => panic!(),
                }
            });

            let mut sum = 0;
            loop {
                match consumer.pop() {
                    Ok(v) => sum += v,
                    Err(PopError::Closed) => break,
                    Err(PopError::Empty) => {}
                };
                thread::yield_now();
            }

            th_push_close.join().unwrap();
            let try_push_success = th_try_push.join().unwrap();

            if try_push_success {
                assert_eq!(sum, 7 + 13);
            } else {
                assert_eq!(sum, 7);
            }
        });
    }

    /// Checks that the consumer receives exactly the values whose push
    /// succeeded when the consumer itself closes the queue.
    #[test]
    fn loom_queue_closed_by_consumer() {
        const CAPACITY: usize = 3;
        const DEFAULT_PREEMPTION_BOUND: usize = 3;

        model_builder(DEFAULT_PREEMPTION_BOUND).check(move || {
            let (producer, mut consumer) = queue(CAPACITY);

            let th_try_push1 = thread::spawn({
                let producer = producer.clone();

                move || match producer.push(7) {
                    Ok(()) => true,
                    Err(PushError::Closed(7)) => false,
                    _ => panic!(),
                }
            });

            let th_try_push2 = thread::spawn({
                let producer = producer.clone();

                move || match producer.push(13) {
                    Ok(()) => true,
                    Err(PushError::Closed(13)) => false,
                    _ => panic!(),
                }
            });

            let mut sum = 0;
            consumer.close();

            loop {
                match consumer.pop() {
                    Ok(v) => sum += v,
                    Err(PopError::Closed) => break,
                    Err(PopError::Empty) => {}
                };
                thread::yield_now();
            }

            let try_push1_success = th_try_push1.join().unwrap();
            let try_push2_success = th_try_push2.join().unwrap();

            match (try_push1_success, try_push2_success) {
                (true, true) => assert_eq!(sum, 7 + 13),
                (true, false) => assert_eq!(sum, 7),
                (false, true) => assert_eq!(sum, 13),
                // Nothing was pushed, so nothing may have been popped.
                (false, false) => assert_eq!(sum, 0),
            }
        });
    }
}