//! Part of crossfire ("channels for async and threads").
//!
//! Single-slot ("one") queue flavor used by the channel implementation.
use super::{FlavorImpl, FlavorNew, FlavorSelect, Queue, Token};
use core::cell::UnsafeCell;
use core::mem::{needs_drop, MaybeUninit};
use crossbeam_utils::CachePadded;
use std::ptr;
use std::sync::atomic::{
    AtomicU64,
    Ordering::{self, Acquire, SeqCst},
};

/// An SPSC version of `One` without a stamp.
///
/// The sender side is allowed to push and drop its own previous value if the
/// receiver has not yet consumed it.
pub type OneSpsc<T> = OneSp<T, false>;

///// This is a spmc version of `One` without stamp, allow replace() on the sender side.
/////
///// The sender side allow to push and drop it's own previous value, if receivers had not consumed it.
/////
///// NOTE: uses a lockless technique inspired by the OFLIT paper; Miri will probably report a data race,
///// but it's intentional.
///// This module cannot separate pop into a start_read/read interface,
///// so it cannot implement the Flavor interface.
//type OneSpmc<T> = OneSp<T, true>;

/// A capacity-1 queue backed by two internal slots, parameterized over
/// whether multiple consumers are allowed (`MC`).
///
/// `pos` packs two `u32` cursors into one `AtomicU64` (see `pack`/`unpack`):
/// the head (consumer cursor) in the high 32 bits and the tail (producer
/// cursor) in the low 32 bits. The queue is empty exactly when
/// `head == tail`; a pending value lives in `slots[tail & 1]`.
pub struct OneSp<T, const MC: bool> {
    /// Packed cursors: high 32 bits = head, low 32 bits = tail.
    pos: CachePadded<AtomicU64>,

    /// The value in this slot.
    slots: [Slot<T>; 2],
}

// SAFETY: the queue is shared between a sender and receiver side; all access
// to `slots` is published/claimed through the `pos` atomic (the slot is
// written before the new position is stored in `try_push`, and the position
// is advanced around reads in `_read`).
// NOTE(review): there is no `T: Send` bound on these impls; soundness then
// relies on the public channel wrappers adding that bound — confirm at the
// call sites.
unsafe impl<T, const MC: bool> Sync for OneSp<T, MC> {}
unsafe impl<T, const MC: bool> Send for OneSp<T, MC> {}

impl<T, const MC: bool> OneSp<T, MC> {
    /// Create an empty queue: both cursors start at 0 and both slots are
    /// uninitialized.
    #[inline]
    pub fn new() -> Self {
        Self { pos: CachePadded::new(AtomicU64::new(0)), slots: [Slot::init(), Slot::init()] }
    }

    /// Split a packed position word into `(head, tail)`: the head (consumer
    /// cursor) is the high 32 bits, the tail (producer cursor) the low 32.
    #[inline(always)]
    fn unpack(pos: u64) -> (u32, u32) {
        let head = (pos >> 32) as u32;
        let tail = pos as u32;
        (head, tail)
    }

    /// Inverse of `unpack`: combine `(head, tail)` into one `u64` word.
    #[inline(always)]
    fn pack(head: u32, tail: u32) -> u64 {
        ((head as u64) << 32) | (tail as u64)
    }

    /// The queue is empty exactly when the consumer has caught up with the
    /// producer (`head == tail`).
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        let pos = self.pos.load(SeqCst);
        let (head, tail) = Self::unpack(pos);
        head == tail
    }

    /// Number of stored items: always 0 or 1, since at most one value can be
    /// pending at a time.
    #[inline(always)]
    pub fn len(&self) -> usize {
        if self.is_empty() {
            0
        } else {
            1
        }
    }

    /// Try to enqueue the value behind `value`.
    ///
    /// On success this logically moves the value out of `*value`
    /// (`Slot::write` performs a `ptr::read`), so the caller must treat the
    /// source as moved-from; on failure (a value is already pending) nothing
    /// is read and `false` is returned.
    ///
    /// `order` only controls the initial load of `pos`; the publishing store
    /// is always `SeqCst`, and the slot is written *before* the new position
    /// is published so a reader that observes it can safely read the slot.
    #[inline]
    fn try_push(&self, value: *const T, order: Ordering) -> bool {
        let pos = self.pos.load(order);
        let (head, tail) = Self::unpack(pos);
        if head == tail {
            // Empty: advance the tail and write into the slot selected by its
            // low bit (the slot the previous value did NOT occupy).
            let new_tail = tail.wrapping_add(1);
            let index = new_tail & 0x1;
            self.slots[index as usize].write(value);
            let new_pos = Self::pack(head, new_tail);
            self.pos.store(new_pos, Ordering::SeqCst);
            true
        } else {
            false
        }
    }
}

// Release any value that was pushed but never popped.
impl<T, const MC: bool> Drop for OneSp<T, MC> {
    fn drop(&mut self) {
        // Nothing to do for trivially-destructible payloads.
        if !needs_drop::<T>() {
            return;
        }
        // `&mut self` guarantees no concurrent access, so the packed cursors
        // can be read directly without an atomic load.
        let (head, tail) = Self::unpack(*self.pos.get_mut());
        if head != tail {
            // A value is still pending; it lives in the slot selected by the
            // tail's low bit.
            self.slots[(tail & 0x1) as usize].drop();
        }
    }
}

impl<T> OneSpsc<T> {
    /// Finish a read started by `start_read`: advance both cursors to
    /// `next_head` (marking the queue empty) and then move the value out of
    /// `slot`.
    #[inline(always)]
    fn _read(&self, slot: &Slot<T>, next_head: u32) -> T {
        // NOTE: this is only valid for SPSC (not for Spmc).
        // Because there are two slots, the sender's next push goes to the
        // *other* index, so it is safe to publish the new pos before reading
        // the slot: the sender may begin writing immediately without touching
        // the value we are about to read.
        let new_pos = Self::pack(next_head, next_head);
        self.pos.store(new_pos, SeqCst);
        slot.read()
    }

    /// Dequeue the pending value, if any. `order` applies only to the
    /// position load in `start_read`; the consuming store in `_read` is
    /// always `SeqCst`.
    #[inline(always)]
    fn _pop(&self, order: Ordering) -> Option<T> {
        if let Some(tail) = self.start_read(order) {
            // The pending value sits in the slot selected by the tail's low bit.
            let index = (tail & 0x1) as usize;
            Some(self._read(&self.slots[index], tail))
        } else {
            None
        }
    }

    /// First half of a two-phase pop: check whether a value is pending and,
    /// if so, return the tail stamp identifying it (without consuming it).
    #[inline(always)]
    fn start_read(&self, order: Ordering) -> Option<u32> {
        let pos = self.pos.load(order);
        let (head, tail) = Self::unpack(pos);
        if head == tail {
            None
        } else {
            // With capacity 1, a non-empty queue always has tail == head + 1.
            debug_assert_eq!(head.wrapping_add(1), tail);
            Some(tail)
        }
    }
}

/// Possibly-uninitialized storage for one value. The `UnsafeCell` allows
/// writes through `&self`; all synchronization is external, coordinated by
/// the owning queue's `pos` atomic.
struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
}

// Raw storage-cell operations; callers are responsible for synchronization
// and for tracking whether the slot currently holds an initialized value.
impl<T> Slot<T> {
    /// A fresh slot whose contents are uninitialized.
    #[inline]
    fn init() -> Self {
        Self { value: UnsafeCell::new(MaybeUninit::uninit()) }
    }

    /// Bitwise-move the value behind `src` into this slot.
    ///
    /// The caller must have exclusive write access to the slot and must
    /// treat `*src` as moved-from afterwards (it is copied via `ptr::read`).
    #[inline(always)]
    fn write(&self, src: *const T) {
        unsafe {
            let storage = &mut *self.value.get();
            storage.write(ptr::read(src));
        }
    }

    //    Kept for the (currently disabled) Spmc variant below:
    //    #[inline(always)]
    //    fn read_into(&self, dest: *mut T) {
    //        unsafe {
    //            let src_ptr = (*self.value.get()).as_ptr();
    //            ptr::copy_nonoverlapping(src_ptr, dest, 1);
    //        }
    //    }

    /// Move the value out of this slot; the caller takes ownership.
    /// The slot must currently hold an initialized value.
    #[inline(always)]
    fn read(&self) -> T {
        unsafe { ptr::read((*self.value.get()).as_ptr()) }
    }

    /// Run the destructor of the value currently stored in this slot.
    #[inline(always)]
    fn drop(&self) {
        unsafe { ptr::drop_in_place((*self.value.get()).as_mut_ptr()) };
    }
}

/*
impl<T> OneSpmc<T> {
    #[inline]
    pub fn replace(&self, value: T) {
        let item = MaybeUninit::new(value);
        self._replace(item.as_ptr());
    }

    /// return Ok(true) on ok, Ok(false) on full, Err(()) to spin
    #[inline(always)]
    fn _replace(&self, value: *const T) {
        // No one will advance tail except me
        let mut pos = self.pos.load(Acquire);
        let (mut head, tail) = Self::unpack(pos);
        let new_tail = tail.wrapping_add(1);
        let index = new_tail & 0x1;
        self.slots[index as usize].write(value);
        loop {
            if head == tail {
                let new_pos = Self::pack(head, new_tail);
                self.pos.store(new_pos, Ordering::SeqCst);
                return;
            } else {
                debug_assert_eq!(head.wrapping_add(1), tail);
                let new_pos = Self::pack(tail, new_tail);
                match self.pos.compare_exchange_weak(pos, new_pos, SeqCst, Acquire) {
                    Ok(_) => {
                        let index = tail & 0x1;
                        self.slots[index as usize].drop();
                        return;
                    }
                    Err(_pos) => {
                        if pos != _pos {
                            pos = _pos;
                            let _tail;
                            (head, _tail) = Self::unpack(_pos);
                            debug_assert_eq!(_tail, tail);
                        }
                        continue;
                    }
                }
            }
        }
    }

    #[inline(always)]
    fn _pop(&self, order: Ordering) -> Option<T> {
        let mut pos = self.pos.load(order);
        let mut value_copy: MaybeUninit<T> = MaybeUninit::uninit();
        loop {
            let (head, tail) = Self::unpack(pos);
            if head == tail {
                return None;
            }
            let index = tail & 0x1;
            self.slots[index as usize].read_into(value_copy.as_mut_ptr());
            debug_assert_eq!(head.wrapping_add(1), tail);
            let new_pos = Self::pack(tail, tail);
            match self.pos.compare_exchange_weak(pos, new_pos, SeqCst, order) {
                Err(_pos) => {
                    // Other might read the value, or send might use replace to cancel the value,
                    // should be cas suc to confirm
                    pos = _pos;
                }
                Ok(_) => {
                    return Some(unsafe { value_copy.assume_init_read() });
                }
            }
        }
    }
}

impl<T> Queue for OneSpmc<T> {
    type Item = T;

    #[inline(always)]
    fn len(&self) -> usize {
        if self.is_empty() {
            0
        } else {
            1
        }
    }

    #[inline(always)]
    fn is_empty(&self) -> bool {
        Self::is_empty(self)
    }

    #[inline(always)]
    fn capacity(&self) -> Option<usize> {
        Some(1)
    }

    #[inline(always)]
    fn is_full(&self) -> bool {
        !Self::is_empty(self)
    }

    #[inline(always)]
    fn pop(&self) -> Option<T> where T: Send {
        self._pop(Ordering::SeqCst)
    }

    #[inline]
    fn push(&self, value: T) -> Result<(), T> where T: Send {
        let item = MaybeUninit::new(value);
        if self.try_push(item.as_ptr(), Ordering::SeqCst) {
            Ok(())
        } else {
            Err(unsafe { item.assume_init_read() })
        }
    }
}
*/

// `Queue` facade for the SPSC flavor: a bounded queue with capacity 1.
impl<T> Queue for OneSpsc<T> {
    type Item = T;

    /// Number of queued items (0 or 1). Delegates to the inherent helper for
    /// consistency with `is_empty` below instead of duplicating its logic.
    #[inline(always)]
    fn len(&self) -> usize {
        Self::len(self)
    }

    /// True when no value is pending.
    #[inline(always)]
    fn is_empty(&self) -> bool {
        Self::is_empty(self)
    }

    /// The queue holds at most one value.
    #[inline(always)]
    fn capacity(&self) -> Option<usize> {
        Some(1)
    }

    /// With capacity 1, the queue is full exactly when it is non-empty.
    #[inline(always)]
    fn is_full(&self) -> bool {
        !Self::is_empty(self)
    }

    /// Dequeue the pending value, if any.
    #[inline(always)]
    fn pop(&self) -> Option<T> {
        self._pop(Ordering::SeqCst)
    }

    /// Enqueue `value`, handing it back unchanged if the queue is full.
    ///
    /// The value is wrapped in `MaybeUninit` so that on success (when
    /// `try_push` bitwise-moves it into a slot) it is not dropped here, and
    /// on failure it can be recovered and returned to the caller.
    #[inline]
    fn push(&self, value: T) -> Result<(), T> {
        let item = MaybeUninit::new(value);
        if self.try_push(item.as_ptr(), Ordering::SeqCst) {
            Ok(())
        } else {
            // SAFETY: `item` was initialized above and `try_push` did not
            // consume it, so moving it back out is sound.
            Err(unsafe { item.assume_init_read() })
        }
    }
}

// Flavor hooks wiring the SPSC queue into the channel machinery. The send
// paths accept a `MaybeUninit`/raw pointer so the caller retains ownership
// on failure (see `try_push`).
impl<T> FlavorImpl for OneSpsc<T> {
    /// Attempt to enqueue; uses an `Acquire` load for the emptiness check
    /// (the publishing store inside `try_push` is still `SeqCst`).
    #[inline(always)]
    fn try_send(&self, item: &MaybeUninit<T>) -> bool {
        self.try_push(item.as_ptr(), Acquire)
    }

    /// One-shot send path: always decisive here (`Some`), with a `SeqCst`
    /// load for the emptiness check.
    #[inline(always)]
    fn try_recv_final(&self) -> Option<T>;
    #[inline(always)]
    fn try_send_oneshot(&self, item: *const T) -> Option<bool> {
        Some(self.try_push(item, SeqCst))
    }

    /// Fast-path receive with an `Acquire` position load.
    #[inline(always)]
    fn try_recv(&self) -> Option<T> {
        self._pop(Ordering::Acquire)
    }

    /// Final receive attempt with a full `SeqCst` position load.
    #[inline]
    fn try_recv_final(&self) -> Option<T> {
        self._pop(Ordering::SeqCst)
    }

    #[inline]
    fn backoff_limit(&self) -> u16 {
        // Due to bound is too small,
        // yield with MAX_LIMIT to prevent collapse in high contention
        crate::backoff::MAX_LIMIT
    }

    #[inline]
    fn may_direct_copy(&self) -> bool {
        // NOTE sender has no CAS, not safe to direct copy
        false
    }
}

// Constructor hook for the flavor machinery; delegates to the inherent
// `OneSp::new`.
impl<T> FlavorNew for OneSpsc<T> {
    #[inline]
    fn new() -> Self {
        OneSpsc::new()
    }
}

// Two-phase select support: `try_select` peeks and stashes the slot address
// plus the tail stamp in a `Token`; `read_with_token` later consumes it.
impl<T> FlavorSelect for OneSpsc<T> {
    /// Check for a pending value without consuming it. `final_check` only
    /// upgrades the position-load ordering from `Acquire` to `SeqCst`.
    ///
    /// The returned token carries the slot's address (as `*const u8`) and
    /// the tail stamp; it is only meaningful for this same queue instance.
    #[inline]
    fn try_select(&self, final_check: bool) -> Option<Token> {
        if let Some(tail) =
            self.start_read(if final_check { Ordering::SeqCst } else { Ordering::Acquire })
        {
            let index = (tail & 0x1) as usize;
            Some(Token::new(&self.slots[index] as *const Slot<T> as *const u8, tail as usize))
        } else {
            None
        }
    }

    /// Complete a select: recover the slot pointer stashed in the token and
    /// consume the value (advancing the cursors via `_read`).
    #[inline(always)]
    fn read_with_token(&self, token: Token) -> T {
        // SAFETY-by-contract: `token` must come from `try_select` on this
        // same queue, so `token.pos` points at a live `Slot<T>` in `self.slots`
        // and `token.stamp` is the matching tail value.
        let slot: &Slot<T> = unsafe { &*token.pos.cast::<Slot<T>>() };
        self._read(slot, token.stamp as u32)
    }
}