use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::utils::CacheAligned;
use kovan::{Atomic, RetiredNode, Shared, pin};
const SEGMENT_SIZE: usize = 32;
const SLOT_EMPTY: usize = 0;
const SLOT_WRITING: usize = 1;
const SLOT_WRITTEN: usize = 2;
const SLOT_CONSUMED: usize = 3;
struct Slot<T> {
state: AtomicUsize,
value: UnsafeCell<MaybeUninit<T>>,
}
#[repr(C)]
struct Segment<T> {
retired: RetiredNode,
slots: [Slot<T>; SEGMENT_SIZE],
next: Atomic<Segment<T>>,
id: usize,
}
impl<T> Segment<T> {
fn new(id: usize) -> Segment<T> {
let slots = core::array::from_fn(|_| Slot {
state: AtomicUsize::new(SLOT_EMPTY),
value: UnsafeCell::new(MaybeUninit::uninit()),
});
Segment {
retired: RetiredNode::new(),
slots,
next: Atomic::null(),
id,
}
}
}
pub struct SegQueue<T> {
head: CacheAligned<Atomic<Segment<T>>>,
tail: CacheAligned<Atomic<Segment<T>>>,
len: AtomicUsize,
}
unsafe impl<T: Send> Send for SegQueue<T> {}
unsafe impl<T: Send> Sync for SegQueue<T> {}
impl<T: 'static> Default for SegQueue<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: 'static> SegQueue<T> {
pub fn new() -> SegQueue<T> {
let segment = Box::into_raw(Box::new(Segment::new(0)));
let head = Atomic::new(segment);
let tail = Atomic::new(segment);
SegQueue {
head: CacheAligned::new(head),
tail: CacheAligned::new(tail),
len: AtomicUsize::new(0),
}
}
pub fn push(&self, value: T) {
let backoff = crossbeam_utils::Backoff::new();
let guard = pin();
loop {
let tail = self.tail.load(Ordering::Acquire, &guard);
if tail.is_null() {
continue;
}
let t = unsafe { tail.as_ref().unwrap() };
let next = t.next.load(Ordering::Acquire, &guard);
if !next.is_null() {
let _ = self.tail.compare_exchange(
tail,
next,
Ordering::SeqCst,
Ordering::Relaxed,
&guard,
);
continue;
}
for i in 0..SEGMENT_SIZE {
let slot = &t.slots[i];
let state = slot.state.load(Ordering::Acquire);
if state == SLOT_EMPTY {
if slot
.state
.compare_exchange(
SLOT_EMPTY,
SLOT_WRITING,
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_ok()
{
unsafe {
slot.value.get().write(MaybeUninit::new(value));
}
slot.state.store(SLOT_WRITTEN, Ordering::Release);
self.len.fetch_add(1, Ordering::Relaxed);
return;
}
} else if state == SLOT_WRITING {
continue;
}
}
let new_segment = Box::into_raw(Box::new(Segment::new(t.id + 1)));
let new_shared = unsafe { Shared::from_raw(new_segment) };
let null_shared = unsafe { Shared::from_raw(ptr::null_mut()) };
if t.next
.compare_exchange(
null_shared,
new_shared,
Ordering::SeqCst,
Ordering::Relaxed,
&guard,
)
.is_ok()
{
let _ = self.tail.compare_exchange(
tail,
new_shared,
Ordering::SeqCst,
Ordering::Relaxed,
&guard,
);
} else {
unsafe { drop(Box::from_raw(new_segment)) };
}
backoff.snooze();
}
}
pub fn pop(&self) -> Option<T> {
let backoff = crossbeam_utils::Backoff::new();
let guard = pin();
loop {
let head = self.head.load(Ordering::Acquire, &guard);
let h = unsafe { head.as_ref().unwrap() };
let mut all_consumed = true;
for i in 0..SEGMENT_SIZE {
let slot = &h.slots[i];
let state = slot.state.load(Ordering::Acquire);
if state == SLOT_WRITTEN {
if slot
.state
.compare_exchange(
SLOT_WRITTEN,
SLOT_CONSUMED,
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_ok()
{
let value = unsafe { slot.value.get().read().assume_init() };
self.len.fetch_sub(1, Ordering::Relaxed);
return Some(value);
}
} else if state == SLOT_EMPTY {
let next = h.next.load(Ordering::Acquire, &guard);
if next.is_null() {
return None;
}
}
if slot.state.load(Ordering::Acquire) != SLOT_CONSUMED {
all_consumed = false;
}
}
let next = h.next.load(Ordering::Acquire, &guard);
if all_consumed
&& !next.is_null()
&& self
.head
.compare_exchange(head, next, Ordering::SeqCst, Ordering::Relaxed, &guard)
.is_ok()
{
unsafe { kovan::retire(head.as_raw()) };
continue;
}
let current_head = self.head.load(Ordering::Acquire, &guard);
if current_head != head {
continue;
}
if h.next.load(Ordering::Acquire, &guard).is_null() {
return None;
}
backoff.snooze();
}
}
#[inline]
pub fn len(&self) -> usize {
self.len.load(Ordering::Relaxed)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl<T> Drop for SegQueue<T> {
fn drop(&mut self) {
let guard = pin();
let mut current = self.head.load(Ordering::Relaxed, &guard);
while !current.is_null() {
unsafe {
let segment_ptr = current.as_raw();
let segment = &*segment_ptr;
let next = segment.next.load(Ordering::Relaxed, &guard);
for i in 0..SEGMENT_SIZE {
if segment.slots[i].state.load(Ordering::Relaxed) == SLOT_WRITTEN {
ptr::drop_in_place(segment.slots[i].value.get() as *mut T);
}
}
drop(Box::from_raw(segment_ptr));
current = next;
}
}
}
}