#![allow(unused)]
use std::cell::Cell;
use std::error::Error;
use std::fmt;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering;
use crossbeam_utils::CachePadded;
use crate::loom_exports::cell::UnsafeCell;
use crate::loom_exports::sync::atomic::{AtomicBool, AtomicPtr};
use crate::loom_exports::sync::Arc;
const SEGMENT_LEN: usize = 32;
struct Slot<T> {
has_value: AtomicBool,
value: UnsafeCell<MaybeUninit<T>>,
}
impl<T> Default for Slot<T> {
fn default() -> Self {
Slot {
has_value: AtomicBool::new(false),
value: UnsafeCell::new(MaybeUninit::uninit()),
}
}
}
struct Segment<T> {
next_segment: AtomicPtr<Segment<T>>,
data: [Slot<T>; SEGMENT_LEN],
}
impl<T> Segment<T> {
fn allocate_new() -> NonNull<Self> {
let segment = Self {
next_segment: AtomicPtr::new(ptr::null_mut()),
data: Default::default(),
};
unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(segment))) }
}
}
struct Head<T> {
segment: NonNull<Segment<T>>,
next_read_idx: usize,
}
struct Tail<T> {
segment: NonNull<Segment<T>>,
next_write_idx: usize,
}
struct Queue<T> {
head: CachePadded<UnsafeCell<Head<T>>>,
tail: CachePadded<UnsafeCell<Tail<T>>>,
}
impl<T> Queue<T> {
fn new() -> Self {
let segment = Segment::allocate_new();
let head = Head {
segment,
next_read_idx: 0,
};
let tail = Tail {
segment,
next_write_idx: 0,
};
Self {
head: CachePadded::new(UnsafeCell::new(head)),
tail: CachePadded::new(UnsafeCell::new(tail)),
}
}
unsafe fn push(&self, value: T) {
let tail = self.tail.with_mut(|p| &mut *p);
if tail.next_write_idx == SEGMENT_LEN {
let old_segment = tail.segment;
tail.segment = Segment::allocate_new();
old_segment
.as_ref()
.next_segment
.store(tail.segment.as_ptr(), Ordering::Release);
tail.next_write_idx = 0;
}
let data = &tail.segment.as_ref().data[tail.next_write_idx];
data.value.with_mut(|p| (*p).write(value));
data.has_value.store(true, Ordering::Release);
tail.next_write_idx += 1;
}
unsafe fn pop(&self) -> Option<T> {
let head = self.head.with_mut(|p| &mut *p);
if head.next_read_idx == SEGMENT_LEN {
let next_segment = head.segment.as_ref().next_segment.load(Ordering::Acquire);
let next_segment = NonNull::new(next_segment)?;
let _ = Box::from_raw(head.segment.as_ptr());
head.segment = next_segment;
head.next_read_idx = 0;
}
let data = &head.segment.as_ref().data[head.next_read_idx];
if !data.has_value.load(Ordering::Acquire) {
return None;
}
let value = data.value.with(|p| (*p).assume_init_read());
head.next_read_idx += 1;
Some(value)
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
while self.pop().is_some() {}
let head = self.head.with_mut(|p| &mut *p);
let _ = Box::from_raw(head.segment.as_ptr());
}
}
}
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}
impl<T> UnwindSafe for Queue<T> {}
impl<T> RefUnwindSafe for Queue<T> {}
pub(crate) struct Producer<T> {
queue: Arc<Queue<T>>,
_non_sync_phantom: PhantomData<Cell<()>>,
}
impl<T> Producer<T> {
pub(crate) fn push(&self, value: T) -> Result<(), PushError> {
if Arc::strong_count(&self.queue) == 1 {
return Err(PushError {});
}
unsafe { self.queue.push(value) };
Ok(())
}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) struct PushError {}
impl fmt::Display for PushError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "sending message into a closed mailbox")
}
}
impl Error for PushError {}
pub(crate) struct Consumer<T> {
queue: Arc<Queue<T>>,
_non_sync_phantom: PhantomData<Cell<()>>,
}
impl<T> Consumer<T> {
pub(crate) fn pop(&self) -> Option<T> {
unsafe { self.queue.pop() }
}
}
pub(crate) fn spsc_queue<T>() -> (Producer<T>, Consumer<T>) {
let queue = Arc::new(Queue::new());
let producer = Producer {
queue: queue.clone(),
_non_sync_phantom: PhantomData,
};
let consumer = Consumer {
queue,
_non_sync_phantom: PhantomData,
};
(producer, consumer)
}
#[cfg(all(test, not(asynchronix_loom)))]
mod tests {
use super::*;
use std::thread;
#[test]
fn spsc_queue_basic() {
const VALUE_COUNT: usize = if cfg!(miri) { 1000 } else { 100_000 };
let (producer, consumer) = spsc_queue();
let th = thread::spawn(move || {
for i in 0..VALUE_COUNT {
let value = loop {
if let Some(v) = consumer.pop() {
break v;
}
};
assert_eq!(value, i);
}
});
for i in 0..VALUE_COUNT {
producer.push(i).unwrap();
}
th.join().unwrap();
}
}
#[cfg(all(test, asynchronix_loom))]
mod tests {
use super::*;
use loom::model::Builder;
use loom::thread;
#[test]
fn loom_spsc_queue_basic() {
const DEFAULT_PREEMPTION_BOUND: usize = 4;
const VALUE_COUNT: usize = 10;
let mut builder = Builder::new();
if builder.preemption_bound.is_none() {
builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND);
}
builder.check(move || {
let (producer, consumer) = spsc_queue();
let th = thread::spawn(move || {
let mut value = 0;
for _ in 0..VALUE_COUNT {
if let Some(v) = consumer.pop() {
assert_eq!(v, value);
value += 1;
}
}
});
for i in 0..VALUE_COUNT {
let _ = producer.push(i);
}
th.join().unwrap();
});
}
#[test]
fn loom_spsc_queue_new_segment() {
const DEFAULT_PREEMPTION_BOUND: usize = 4;
const VALUE_COUNT_BEFORE: usize = 5;
const VALUE_COUNT_AFTER: usize = 5;
let mut builder = Builder::new();
if builder.preemption_bound.is_none() {
builder.preemption_bound = Some(DEFAULT_PREEMPTION_BOUND);
}
builder.check(move || {
let (producer, consumer) = spsc_queue();
for i in 0..(SEGMENT_LEN - VALUE_COUNT_BEFORE) {
producer.push(i).unwrap();
consumer.pop();
}
let th = thread::spawn(move || {
let mut value = SEGMENT_LEN - VALUE_COUNT_BEFORE;
for _ in (SEGMENT_LEN - VALUE_COUNT_BEFORE)..(SEGMENT_LEN + VALUE_COUNT_AFTER) {
if let Some(v) = consumer.pop() {
assert_eq!(v, value);
value += 1;
}
}
});
for i in (SEGMENT_LEN - VALUE_COUNT_BEFORE)..(SEGMENT_LEN + VALUE_COUNT_AFTER) {
let _ = producer.push(i);
}
th.join().unwrap();
});
}
}