use crate::loom::{
cell::UnsafeCell,
sync::atomic::{AtomicPtr, AtomicUsize},
thread,
};
use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};

use super::BLOCK_CAP;
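
/// A block in the linked list making up the channel. Each block holds up to
/// `BLOCK_CAP` values.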
pub(crate) struct Block<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking which slots are ready to be consumed, plus the
    /// `RELEASED` and `TX_CLOSED` flags.
    ready_slots: AtomicUsize,

    /// The tail position observed when the block was released by the send
    /// half. Written once in `tx_release`; reads are guarded by the
    /// `RELEASED` flag.
    observed_tail_position: UnsafeCell<usize>,

    /// The values stored in the block's slots.
    values: Values<T>,
}
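
/// The result of reading a slot: either a value, or notice that the send
/// half of the channel has closed.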
pub(crate) enum Read<T> {
Value(T),
Closed,
}
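
/// Storage for the values held by a block. Slots are written by senders and
/// read (at most once) by the receiver.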
struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

/// Masks an index to extract the block identifier (the index of the block's
/// first slot).
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to extract the slot offset within a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag set once a block has gone through the sender's release routine.
/// When set, the receiver may consider freeing the block.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag set when the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering the `ready_slots` bits that track per-slot readiness.
const READY_MASK: usize = RELEASED - 1;
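
/// Returns the index of the first slot in the block containing `slot_index`.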
#[inline(always)]
pub(crate) fn start_index(slot_index: usize) -> usize {
BLOCK_MASK & slot_index
}
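
/// Returns the offset of `slot_index` within its block.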
#[inline(always)]
pub(crate) fn offset(slot_index: usize) -> usize {
SLOT_MASK & slot_index
}
impl<T> Block<T> {
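    /// Creates a new block starting at `start_index` with no linked successor.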
pub(crate) fn new(start_index: usize) -> Block<T> {
Block {
start_index,
next: AtomicPtr::new(ptr::null_mut()),
ready_slots: AtomicUsize::new(0),
observed_tail_position: UnsafeCell::new(0),
values: unsafe { Values::uninitialized() },
}
}
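
    /// Returns `true` if the block starts at the given index.
    ///
    /// `index` must be the first slot index of a block.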
pub(crate) fn is_at_index(&self, index: usize) -> bool {
debug_assert!(offset(index) == 0);
self.start_index == index
}
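
    /// Returns the number of blocks between `self` and the block starting at
    /// `other_index`, which must reference a block at or after `self`.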
pub(crate) fn distance(&self, other_index: usize) -> usize {
debug_assert!(offset(other_index) == 0);
other_index.wrapping_sub(self.start_index) / BLOCK_CAP
}
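
    /// Reads the value at the given slot index.
    ///
    /// Returns `None` if the slot is empty and the channel is still open, and
    /// `Some(Read::Closed)` if the slot is empty but the send half has closed.
    ///
    /// # Safety
    ///
    /// The caller must have exclusive access to the slot; the value is moved
    /// out with `ptr::read` and must not be read twice.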
pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
let offset = offset(slot_index);
let ready_bits = self.ready_slots.load(Acquire);
if !is_ready(ready_bits, offset) {
if is_tx_closed(ready_bits) {
return Some(Read::Closed);
}
return None;
}
let value = self.values[offset].with(|ptr| ptr::read(ptr));
Some(Read::Value(value.assume_init()))
}
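
    /// Writes a value into the slot at the given index and marks it ready for
    /// the receiver.
    ///
    /// # Safety
    ///
    /// The slot must be empty and the caller must have exclusive write access
    /// to it.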
pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
let slot_offset = offset(slot_index);
self.values[slot_offset].with_mut(|ptr| {
ptr::write(ptr, MaybeUninit::new(value));
});
self.set_ready(slot_offset);
}
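
    /// Signals to the receiver that the send half of the channel has closed.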
pub(crate) unsafe fn tx_close(&self) {
self.ready_slots.fetch_or(TX_CLOSED, Release);
}
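
    /// Resets the block to a blank state so it can be reused.
    ///
    /// # Safety
    ///
    /// All slots must be empty and the caller must hold the only pointer to
    /// the block.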
pub(crate) unsafe fn reclaim(&mut self) {
self.start_index = 0;
self.next = AtomicPtr::new(ptr::null_mut());
self.ready_slots = AtomicUsize::new(0);
}
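
    /// Releases the block to the receiver half for freeing.
    ///
    /// Called by the send half once it is guaranteed that no senders will
    /// access the block again.
    ///
    /// # Safety
    ///
    /// No sender may access the block after this call.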
pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
self.observed_tail_position
.with_mut(|ptr| *ptr = tail_position);
self.ready_slots.fetch_or(RELEASED, Release);
}
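
    /// Sets the ready bit for the slot at the given offset.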
fn set_ready(&self, slot: usize) {
let mask = 1 << slot;
self.ready_slots.fetch_or(mask, Release);
}
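
    /// Returns `true` when every slot's ready bit is set, indicating the
    /// block is in its final state and will no longer be written to.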
pub(crate) fn is_final(&self) -> bool {
self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
}
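
    /// Returns the `observed_tail_position` value, if the block has been
    /// released by the send half.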
pub(crate) fn observed_tail_position(&self) -> Option<usize> {
if 0 == RELEASED & self.ready_slots.load(Acquire) {
None
} else {
Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
}
}
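
    /// Loads the next block in the list, if one has been linked, using the
    /// given memory ordering.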
pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
let ret = NonNull::new(self.next.load(ordering));
debug_assert!(unsafe {
ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
.unwrap_or(true)
});
ret
}
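
    /// Pushes `block` as the next block in the list.
    ///
    /// Returns `Ok(())` on success. If `next` was already set, a pointer to
    /// the block that is currently linked is returned instead and `block` is
    /// not inserted.
    ///
    /// The compare-exchange on `next` uses `ordering` on success and
    /// `Acquire` on failure.
    ///
    /// # Safety
    ///
    /// The caller must ensure `block` is not freed until it has been removed
    /// from the list.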
pub(crate) unsafe fn try_push(
&self,
block: &mut NonNull<Block<T>>,
ordering: Ordering,
) -> Result<(), NonNull<Block<T>>> {
        block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);

        // On failure, `compare_exchange` returns the pointer that is actually
        // stored in `next`, which is handed back to the caller for retrying.
        let next_ptr = self
            .next
            .compare_exchange(ptr::null_mut(), block.as_ptr(), ordering, Acquire)
            .unwrap_or_else(|x| x);

        match NonNull::new(next_ptr) {
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
}
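
    /// Grows the linked list by allocating and appending a new block.
    ///
    /// Returns the block that directly follows `self` after the operation.
    /// This may or may not be the newly allocated block: if another thread
    /// linked a block first, the allocation is appended further down the list
    /// and the racing block is returned instead.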
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block, assuming it will directly follow `self`. If
        // another block wins the race below, `try_push` fixes up `start_index`.
        let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));
        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to link the new block directly after `self`. On failure,
        // the pointer that is actually stored in `next` is extracted from
        // the error value.
        let next = NonNull::new(
            self.next
                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
                .unwrap_or_else(|x| x),
        );

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-exchange succeeded; the allocated block is now
                // the next block after `self`.
                return new_block;
            }
        };
        // Another thread linked a block first. Walk the list and append the
        // allocated block at the end, then return the first observed `next`.
        let mut curr = next;

        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel) };

            curr = match actual {
                Ok(_) => {
                    // The allocated block is linked at the end of the list;
                    // return the block that directly follows `self`.
                    return next;
                }
                Err(curr) => curr,
            };

            // Yield before retrying the push.
            thread::yield_now();
        }
}
}
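
/// Returns `true` if the ready bit for the given slot offset is set in `bits`.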
fn is_ready(bits: usize, slot: usize) -> bool {
let mask = 1 << slot;
mask == mask & bits
}
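
/// Returns `true` if the `TX_CLOSED` flag is set in `bits`.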
fn is_tx_closed(bits: usize) -> bool {
TX_CLOSED == bits & TX_CLOSED
}
impl<T> Values<T> {
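    /// Returns a `Values` array with all slots uninitialized.
    ///
    /// # Safety
    ///
    /// Slots must be initialized (via `write`) before their values are read.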
unsafe fn uninitialized() -> Values<T> {
let mut vals = MaybeUninit::uninit();
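
        // Under loom, `UnsafeCell` is a full struct that must be initialized
        // explicitly. Outside loom, the slots are plain `MaybeUninit` storage
        // and this block compiles to nothing.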
if_loom! {
let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
for i in 0..BLOCK_CAP {
p.add(i)
.write(UnsafeCell::new(MaybeUninit::uninit()));
}
}
Values(vals.assume_init())
}
}
impl<T> ops::Index<usize> for Values<T> {
type Output = UnsafeCell<MaybeUninit<T>>;
fn index(&self, index: usize) -> &Self::Output {
self.0.index(index)
}
}