use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use likely_stable::unlikely;
use crate::block;
use crate::cache_padded::CachePadded;
use crate::common::{Bucket, N_SPIN, Operation, Selector};
use crate::error::{RecvError, SendError, TryRecvError, TrySendError};
use crate::spsc::waker::{Waiter, Waker};
macro_rules! get_selector {
($data:expr, $buffer:expr, $capacity:expr, $selector:expr) => {
let mut data = $data.load(Ordering::Relaxed);
loop {
let pos = data as u32;
let lap = (data >> 32) as u32;
let bucket = unsafe { $buffer.get_unchecked(pos as usize) };
let bucket_lap = bucket.lap.load(Ordering::Acquire);
if lap == bucket_lap {
let new_data = if pos + 1 < $capacity {
data + 1
} else {
((lap + 2) as u64) << 32
};
match $data.compare_exchange_weak(data, new_data, Ordering::Acquire, Ordering::Relaxed) {
Ok(_) => {
$selector.ptr = bucket as *const Bucket<T> as *const u8;
$selector.lap = bucket_lap;
return true;
},
Err(v) => data = v,
}
} else if lap > bucket_lap {
if lap > bucket.lap.load(Ordering::Acquire) {
$selector.ptr = bucket as *const Bucket<T> as *const u8;
$selector.lap = bucket_lap;
return false;
}
data = $data.load(Ordering::Relaxed);
} else {
data = $data.load(Ordering::Relaxed);
}
}
};
}
pub(crate) struct Spsc<T> {
head: CachePadded<AtomicU64>,
tail: CachePadded<AtomicU64>,
buffer: Box<[Bucket<T>]>,
capacity: u32,
is_closed: AtomicBool,
receiver: Waker,
sender: Waker,
}
impl<T> Spsc<T> {
#[inline]
pub(crate) fn new(size: u32) -> Self <> {
let buf: Box<[Bucket<T>]> = (0..size + 1)
.map(|_i| {
Bucket::default()
})
.collect();
Self {
head: CachePadded::new(AtomicU64::new(1 << 32)),
tail: CachePadded::new(AtomicU64::new(0)),
buffer: buf,
capacity: size + 1,
is_closed: AtomicBool::new(false),
receiver: Waker::new(),
sender: Waker::new(),
}
}
pub(crate) fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
if unlikely(self.is_closed.load(Ordering::Relaxed)) {
return Err(TrySendError::Disconnected(msg));
}
let selector = &mut Selector::default();
if !self.get_selector(Operation::Sending, selector) {
return Err(TrySendError::Full(msg));
}
selector.write_message(msg);
self.receiver.notify();
Ok(())
}
pub(crate) fn try_recv(&mut self) -> Result<T, TryRecvError> {
let selector = &mut Selector::default();
if !self.get_selector(Operation::Receiving, selector) {
return Err(TryRecvError);
}
let msg = selector.read_message();
self.sender.notify();
Ok(msg)
}
pub(crate) fn send(&mut self, msg: T) -> Result<(), SendError<T>> {
let selector = &mut Selector::default();
loop {
for _ in 0..N_SPIN {
if unlikely(self.is_closed.load(Ordering::Relaxed)) {
return Err(SendError(msg));
}
if self.get_selector(Operation::Sending, selector) {
selector.write_message(msg);
self.receiver.notify();
return Ok(());
}
core::hint::spin_loop();
}
block!(self.is_closed, self.sender, selector);
}
}
pub(crate) fn recv(&mut self) -> Result<T, RecvError> {
let selector = &mut Selector::default();
loop {
for _ in 0..N_SPIN {
if unlikely(self.is_closed.load(Ordering::Relaxed)) {
return Err(RecvError);
}
if self.get_selector(Operation::Receiving, selector) {
let msg = selector.read_message();
self.sender.notify();
return Ok(msg);
}
core::hint::spin_loop();
}
block!(self.is_closed, self.receiver, selector);
}
}
#[inline]
pub(crate) fn close(&mut self) {
if !self.is_closed.swap(true, Ordering::Relaxed) {
self.receiver.notify();
self.sender.notify();
}
}
fn get_selector(&mut self, operation: Operation, selector: &mut Selector) -> bool {
match operation {
Operation::Sending => {
get_selector!(self.tail, self.buffer, self.capacity, selector);
}
Operation::Receiving => {
get_selector!(self.head, self.buffer, self.capacity, selector);
}
}
}
}