use super::{FlavorImpl, FlavorNew, FlavorSelect, Queue, Token};
use core::cell::UnsafeCell;
use core::mem::{needs_drop, MaybeUninit};
use crossbeam_utils::CachePadded;
use std::ptr;
use std::sync::atomic::{
AtomicU64,
Ordering::{self, Acquire, SeqCst},
};
pub type OneSpsc<T> = OneSp<T, false>;
pub struct OneSp<T, const MC: bool> {
pos: CachePadded<AtomicU64>,
slots: [Slot<T>; 2],
}
unsafe impl<T, const MC: bool> Sync for OneSp<T, MC> {}
unsafe impl<T, const MC: bool> Send for OneSp<T, MC> {}
impl<T, const MC: bool> OneSp<T, MC> {
#[inline]
pub fn new() -> Self {
Self { pos: CachePadded::new(AtomicU64::new(0)), slots: [Slot::init(), Slot::init()] }
}
#[inline(always)]
fn unpack(pos: u64) -> (u32, u32) {
let head = (pos >> 32) as u32;
let tail = pos as u32;
(head, tail)
}
#[inline(always)]
fn pack(head: u32, tail: u32) -> u64 {
((head as u64) << 32) | (tail as u64)
}
#[inline(always)]
pub fn is_empty(&self) -> bool {
let pos = self.pos.load(SeqCst);
let (head, tail) = Self::unpack(pos);
head == tail
}
#[inline(always)]
pub fn len(&self) -> usize {
if self.is_empty() {
0
} else {
1
}
}
#[inline]
fn try_push(&self, value: *const T, order: Ordering) -> bool {
let pos = self.pos.load(order);
let (head, tail) = Self::unpack(pos);
if head == tail {
let new_tail = tail.wrapping_add(1);
let index = new_tail & 0x1;
self.slots[index as usize].write(value);
let new_pos = Self::pack(head, new_tail);
self.pos.store(new_pos, Ordering::SeqCst);
true
} else {
false
}
}
}
impl<T, const MC: bool> Drop for OneSp<T, MC> {
fn drop(&mut self) {
if needs_drop::<T>() {
let pos = *self.pos.get_mut();
let (head, tail) = Self::unpack(pos);
if head != tail {
let index = tail & 0x1;
self.slots[index as usize].drop();
}
}
}
}
impl<T> OneSpsc<T> {
#[inline(always)]
fn _read(&self, slot: &Slot<T>, next_head: u32) -> T {
let new_pos = Self::pack(next_head, next_head);
self.pos.store(new_pos, SeqCst);
slot.read()
}
#[inline(always)]
fn _pop(&self, order: Ordering) -> Option<T> {
if let Some(tail) = self.start_read(order) {
let index = (tail & 0x1) as usize;
Some(self._read(&self.slots[index], tail))
} else {
None
}
}
#[inline(always)]
fn start_read(&self, order: Ordering) -> Option<u32> {
let pos = self.pos.load(order);
let (head, tail) = Self::unpack(pos);
if head == tail {
None
} else {
debug_assert_eq!(head.wrapping_add(1), tail);
Some(tail)
}
}
}
struct Slot<T> {
value: UnsafeCell<MaybeUninit<T>>,
}
impl<T> Slot<T> {
#[inline]
fn init() -> Self {
Self { value: UnsafeCell::new(MaybeUninit::uninit()) }
}
#[inline(always)]
fn write(&self, value: *const T) {
unsafe { (*self.value.get()).write(ptr::read(value)) };
}
#[inline(always)]
fn read(&self) -> T {
unsafe { self.value.get().read().assume_init() }
}
#[inline(always)]
fn drop(&self) {
unsafe { self.value.get().read().assume_init_drop() };
}
}
impl<T> Queue for OneSpsc<T> {
type Item = T;
#[inline(always)]
fn len(&self) -> usize {
if self.is_empty() {
0
} else {
1
}
}
#[inline(always)]
fn is_empty(&self) -> bool {
Self::is_empty(self)
}
#[inline(always)]
fn capacity(&self) -> Option<usize> {
Some(1)
}
#[inline(always)]
fn is_full(&self) -> bool {
!Self::is_empty(self)
}
#[inline(always)]
fn pop(&self) -> Option<T> {
self._pop(Ordering::SeqCst)
}
#[inline]
fn push(&self, value: T) -> Result<(), T> {
let item = MaybeUninit::new(value);
if self.try_push(item.as_ptr(), Ordering::SeqCst) {
Ok(())
} else {
Err(unsafe { item.assume_init_read() })
}
}
}
impl<T> FlavorImpl for OneSpsc<T> {
#[inline(always)]
fn try_send(&self, item: &MaybeUninit<T>) -> bool {
self.try_push(item.as_ptr(), Acquire)
}
#[inline(always)]
fn try_send_oneshot(&self, item: *const T) -> Option<bool> {
Some(self.try_push(item, SeqCst))
}
#[inline(always)]
fn try_recv(&self) -> Option<T> {
self._pop(Ordering::Acquire)
}
#[inline]
fn try_recv_final(&self) -> Option<T> {
self._pop(Ordering::SeqCst)
}
#[inline]
fn backoff_limit(&self) -> u16 {
crate::backoff::MAX_LIMIT
}
#[inline]
fn may_direct_copy(&self) -> bool {
false
}
}
impl<T> FlavorNew for OneSpsc<T> {
#[inline]
fn new() -> Self {
OneSpsc::new()
}
}
impl<T> FlavorSelect for OneSpsc<T> {
#[inline]
fn try_select(&self, final_check: bool) -> Option<Token> {
if let Some(tail) =
self.start_read(if final_check { Ordering::SeqCst } else { Ordering::Acquire })
{
let index = (tail & 0x1) as usize;
Some(Token::new(&self.slots[index] as *const Slot<T> as *const u8, tail as usize))
} else {
None
}
}
#[inline(always)]
fn read_with_token(&self, token: Token) -> T {
let slot: &Slot<T> = unsafe { &*token.pos.cast::<Slot<T>>() };
self._read(slot, token.stamp as u32)
}
}