use std::{
ops::{AddAssign, BitAndAssign, BitOrAssign},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
};
use omango_util::{
backoff::Backoff,
cache_padded::CachePadded,
hint::{likely, unlikely},
};
use crate::{
queue::{
state::State,
elem::ElemArr,
waker::{Checker, Waiter, Waker},
error::{SendError, RecvError, TrySendError, TryRecvError},
},
};
/// Retry count handed to `Waiter::retry` by the blocking `send`/`recv` paths
/// before they register the waiter with a waker and sleep.
const RETRIES: u8 = 2;
/// Top bit of a 32-bit lap value; `MpmcBounded` treats a write lap with this
/// bit set as "queue closed".
const MPMC_MASK_32: u32 = 1 << 31;
/// Top bit of the packed 64-bit write meta; `MpmcBounded::close` sets it to
/// mark the queue as closed.
const MPMC_MASK_64: u64 = 1 << 63;
/// Shared behaviour of the bounded ring-buffer queues in this module.
///
/// Implementors provide slot selection, waiter bookkeeping and wake-ups; the
/// provided methods build the non-blocking and blocking send/receive
/// operations on top of those primitives.
pub(crate) trait Bounded<T> {
    /// Tries to enqueue `msg` without blocking.
    ///
    /// # Errors
    ///
    /// Returns `TrySendError::Disconnected(msg)` when slot selection reports
    /// `State::Closed`, and `TrySendError::Full(msg)` for any other
    /// non-success outcome. The message is handed back in both cases.
    #[inline]
    fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
        let (slot, slot_lap, outcome) = self.select_bucket_4_send();
        if likely(outcome == State::Success) {
            // Store the message under the next lap, then notify a reader.
            slot.write(slot_lap.wrapping_add(1), msg);
            self.wake_reader();
            return Ok(());
        }
        if unlikely(outcome == State::Closed) {
            return Err(TrySendError::Disconnected(msg));
        }
        Err(TrySendError::Full(msg))
    }

    /// Tries to dequeue one message without blocking.
    ///
    /// # Errors
    ///
    /// Returns `TryRecvError` when no slot could be claimed for reading.
    #[inline]
    fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let (slot, slot_lap, claimed) = self.select_bucket_4_recv();
        if likely(claimed) {
            let msg = slot.read(slot_lap.wrapping_add(1));
            self.wake_writer();
            return Ok(msg);
        }
        Err(TryRecvError)
    }

    /// Enqueues `msg`, waiting until a slot can be claimed.
    ///
    /// `checker` is forwarded to the waiter's `retry` and `sleep` calls.
    ///
    /// # Errors
    ///
    /// Returns `SendError(msg)` as soon as slot selection, the retry phase or
    /// the sleep phase reports `State::Closed`.
    fn send(&mut self, msg: T, checker: &dyn Checker) -> Result<(), SendError<T>> {
        loop {
            let (slot, slot_lap, outcome) = self.select_bucket_4_send();
            if likely(outcome == State::Success) {
                slot.write(slot_lap.wrapping_add(1), msg);
                self.wake_reader();
                return Ok(());
            }
            if unlikely(outcome == State::Closed) {
                return Err(SendError(msg));
            }

            // No slot was claimed: let the waiter retry `RETRIES` times on
            // the slot's lap before falling back to registering and sleeping.
            let waiter = &Waiter::new(slot.atom_lap(), slot_lap);
            let retried = waiter.retry(checker, RETRIES);
            if likely(retried == State::Success) {
                continue;
            }
            if unlikely(retried == State::Closed) {
                return Err(SendError(msg));
            }

            if likely(self.register_writer_waiter(waiter)) {
                let woken = waiter.sleep(checker);
                self.unregister_writer_waiter(waiter);
                if unlikely(woken == State::Closed) {
                    return Err(SendError(msg));
                }
            }
        }
    }

    /// Dequeues one message, waiting until one can be claimed.
    ///
    /// `checker` is forwarded to the waiter's `retry` and `sleep` calls.
    ///
    /// # Errors
    ///
    /// Returns `RecvError` when a claim attempt fails right after the
    /// previous wait reported `State::Closed`.
    fn recv(&mut self, checker: &dyn Checker) -> Result<T, RecvError> {
        // Outcome of the most recent retry/sleep; a `Closed` outcome only
        // becomes an error after one more failed claim attempt.
        let mut last_wait = State::Success;
        loop {
            let (slot, slot_lap, claimed) = self.select_bucket_4_recv();
            if likely(claimed) {
                let msg = slot.read(slot_lap.wrapping_add(1));
                self.wake_writer();
                return Ok(msg);
            }
            if unlikely(last_wait == State::Closed) {
                return Err(RecvError);
            }

            let waiter = &Waiter::new(slot.atom_lap(), slot_lap);
            last_wait = waiter.retry(checker, RETRIES);
            // Both outcomes go back to the claim attempt; on `Closed` the
            // check at the top of the loop ends the call if it fails again.
            if likely(last_wait == State::Success) || unlikely(last_wait == State::Closed) {
                continue;
            }

            if likely(self.register_reader_waiter(waiter)) {
                last_wait = waiter.sleep(checker);
                self.unregister_reader_waiter(waiter);
            }
        }
    }

    /// Marks the queue as closed.
    fn close(&self);

    /// Reports how many slots the queue considers occupied.
    fn length(&self) -> u32;

    /// Selects the slot for the next write.
    ///
    /// Returns the slot, a lap value for it and the outcome of the selection.
    fn select_bucket_4_send(&mut self) -> (&ElemArr<T>, u32, State);

    /// Selects the slot for the next read.
    ///
    /// Returns the slot, a lap value for it and whether the slot was claimed.
    fn select_bucket_4_recv(&mut self) -> (&ElemArr<T>, u32, bool);

    /// Registers `waiter` on the writer side; the result tells the caller
    /// whether it should go on to sleep.
    fn register_writer_waiter(&self, waiter: &Waiter) -> bool;

    /// Registers `waiter` on the reader side; the result tells the caller
    /// whether it should go on to sleep.
    fn register_reader_waiter(&self, waiter: &Waiter) -> bool;

    /// Removes a waiter previously registered on the writer side.
    fn unregister_writer_waiter(&self, waiter: &Waiter);

    /// Removes a waiter previously registered on the reader side.
    fn unregister_reader_waiter(&self, waiter: &Waiter);

    /// Wakes the reader side; called after a message has been written.
    fn wake_reader(&self);

    /// Wakes the writer side; called after a message has been read.
    fn wake_writer(&self);

    /// Returns this queue as a `Checker` trait object.
    fn cast(&self) -> &dyn Checker;
}
/// Write-side state of `SpscBounded`, stored together in one `CachePadded` cell.
struct Meta {
/// Packed write position: slot index in the low 32 bits, lap in the high 32 bits.
write_meta: u64,
/// Set to `true` by `close()`; the sender checks it before claiming a slot.
closed: AtomicBool,
}
/// Bounded ring-buffer queue with plain (non-atomic) read and write positions.
///
/// NOTE(review): presumably intended for a single producer and a single
/// consumer, judging by the name — confirm against the callers.
pub(crate) struct SpscBounded<T> {
/// Packed read position: slot index in the low 32 bits, lap in the high 32 bits.
read_meta: CachePadded<u64>,
/// Packed write position together with the closed flag.
meta: CachePadded<Meta>,
/// Waker that blocked receivers register with; woken after a write.
read_waker: CachePadded<Waker>,
/// Waker that blocked senders register with; woken after a read.
write_waker: CachePadded<Waker>,
/// The ring of message slots; it holds `capacity` entries.
buffer: Box<[ElemArr<T>]>,
/// Number of slots in `buffer` (never 0).
capacity: u32,
}
impl<T> SpscBounded<T> {
    /// Builds an empty queue with `cap` slots; a request for 0 slots is
    /// promoted to a single slot.
    ///
    /// # Panics
    ///
    /// Panics when `cap` is larger than `1 << 30`.
    #[inline]
    pub(crate) fn new(cap: u32) -> Self {
        assert!(cap <= (1 << 30));

        let slots = if cap == 0 { 1 } else { cap };
        let ring: Box<[ElemArr<T>]> = (0..slots).map(|_| ElemArr::default()).collect();

        // The writer starts at slot 0 on lap 0, the reader at slot 0 on lap 1.
        let write_side = Meta {
            write_meta: 0,
            closed: AtomicBool::new(false),
        };

        Self {
            read_meta: CachePadded::new(1 << 32),
            meta: CachePadded::new(write_side),
            read_waker: CachePadded::new(Waker::default()),
            write_waker: CachePadded::new(Waker::default()),
            buffer: ring,
            capacity: slots,
        }
    }
}
impl<T> Bounded<T> for SpscBounded<T> {
    /// Sets the closed flag and closes both wakers.
    #[inline]
    fn close(&self) {
        self.meta.closed.store(true, Ordering::Relaxed);
        self.write_waker.close();
        self.read_waker.close();
    }

    /// Returns the number of slots claimed by the writer that the reader has
    /// not claimed yet.
    ///
    /// When both positions point at the same slot, the laps decide between
    /// "empty" and "full": the writer starts on lap 0, the reader on lap 1,
    /// and each advances its lap by 2 per trip around the ring. The queue is
    /// therefore full exactly when the write lap is one ahead of the read
    /// lap, and empty otherwise.
    #[inline]
    fn length(&self) -> u32 {
        let read = *self.read_meta;
        let write = self.meta.write_meta;
        let head = read as u32;
        let tail = write as u32;

        if tail > head {
            // Occupied slots are `head..tail`.
            tail - head
        } else if tail < head {
            // The writer has wrapped around; the reader has not.
            self.capacity - head + tail
        } else {
            let read_lap = (read >> 32) as u32;
            let write_lap = (write >> 32) as u32;
            if write_lap.wrapping_sub(read_lap) == 1 {
                self.capacity
            } else {
                0
            }
        }
    }

    /// Claims the slot at the current write position.
    ///
    /// Returns the slot, a lap value and the outcome:
    /// * `State::Success` - the slot's lap matched the write lap; the write
    ///   position has been advanced and the observed slot lap is returned.
    /// * `State::Closed` - the closed flag is set (the lap value is 0).
    /// * `State::Failed` - the slot's lap is still behind the write lap on
    ///   two consecutive loads.
    fn select_bucket_4_send(&mut self) -> (&ElemArr<T>, u32, State) {
        // The write position only changes in this method, right before it
        // returns, so it is decoded once up front.
        let meta = self.meta.write_meta;
        let idx = meta as u32;
        let lap = (meta >> 32) as u32;
        // SAFETY: the index half of `write_meta` starts at 0 and is only ever
        // incremented while `idx + 1 < capacity` or reset to 0, and `buffer`
        // holds `capacity` slots, so `idx` is always in bounds.
        let elem = unsafe { self.buffer.get_unchecked(idx as usize) };
        let backoff = Backoff::default();
        loop {
            if unlikely(self.meta.closed.load(Ordering::Relaxed)) {
                return (elem, 0, State::Closed);
            }
            let elem_lap = elem.load_lap();
            if lap == elem_lap {
                if likely(idx + 1 < self.capacity) {
                    self.meta.write_meta += 1;
                } else {
                    // Wrap to slot 0 and move on to the next write lap.
                    self.meta.write_meta = (lap.wrapping_add(2) as u64) << 32;
                };
                return (elem, elem_lap, State::Success);
            } else if lap > elem_lap {
                // Double-check before reporting failure to the caller.
                if lap > elem.load_lap() {
                    return (elem, elem_lap, State::Failed);
                }
                backoff.spin();
            } else {
                // NOTE(review): the slot lap is ahead of the write lap here;
                // presumably unreachable with a single producer - confirm.
                backoff.snooze();
            }
        }
    }

    /// Claims the slot at the current read position.
    ///
    /// Returns the slot, the observed slot lap and `true` when the slot's lap
    /// matched the read lap (the read position has then been advanced), or
    /// `false` when the slot's lap is still behind the read lap on two
    /// consecutive loads.
    fn select_bucket_4_recv(&mut self) -> (&ElemArr<T>, u32, bool) {
        let meta = *self.read_meta;
        let idx = meta as u32;
        let lap = (meta >> 32) as u32;
        // SAFETY: the index half of `read_meta` starts at 0 and is only ever
        // incremented while `idx + 1 < capacity` or reset to 0, and `buffer`
        // holds `capacity` slots, so `idx` is always in bounds.
        let elem = unsafe { self.buffer.get_unchecked(idx as usize) };
        let backoff = Backoff::default();
        loop {
            let elem_lap = elem.load_lap();
            if lap == elem_lap {
                if likely(idx + 1 < self.capacity) {
                    self.read_meta.add_assign(1);
                } else {
                    // Clear the position, then store "slot 0, next read lap".
                    self.read_meta.bitand_assign(0);
                    self.read_meta.bitor_assign((lap.wrapping_add(2) as u64) << 32);
                };
                return (elem, elem_lap, true);
            } else if lap > elem_lap {
                // Double-check before reporting failure to the caller.
                if lap > elem.load_lap() {
                    return (elem, elem_lap, false);
                }
                backoff.spin();
            } else {
                backoff.snooze();
            }
        }
    }

    /// Registers `waiter` with the writer-side waker.
    #[inline]
    fn register_writer_waiter(&self, waiter: &Waiter) -> bool {
        self.write_waker.register(waiter)
    }

    /// Registers `waiter` with the reader-side waker.
    #[inline]
    fn register_reader_waiter(&self, waiter: &Waiter) -> bool {
        self.read_waker.register(waiter)
    }

    /// Removes `waiter` from the writer-side waker.
    #[inline]
    fn unregister_writer_waiter(&self, waiter: &Waiter) {
        self.write_waker.unregister(waiter);
    }

    /// Removes `waiter` from the reader-side waker.
    #[inline]
    fn unregister_reader_waiter(&self, waiter: &Waiter) {
        self.read_waker.unregister(waiter);
    }

    /// Wakes the reader-side waker.
    #[inline]
    fn wake_reader(&self) {
        self.read_waker.wake();
    }

    /// Wakes the writer-side waker.
    #[inline]
    fn wake_writer(&self) {
        self.write_waker.wake();
    }

    /// Returns this queue as a `Checker` trait object.
    #[inline]
    fn cast(&self) -> &dyn Checker {
        self
    }
}
impl<T> Checker for SpscBounded<T> {
    /// Reports whether the closed flag has been set by `close()`.
    #[inline]
    fn is_close(&self) -> bool {
        let closed_flag = &self.meta.closed;
        closed_flag.load(Ordering::Relaxed)
    }
}
/// Bounded ring-buffer queue whose read and write positions are atomics
/// advanced by compare-and-swap.
pub(crate) struct MpmcBounded<T> {
/// Packed read position: slot index in the low 32 bits, lap in the high 32 bits.
read_meta: CachePadded<AtomicU64>,
/// Packed write position (same layout); bit 63 doubles as the closed flag.
write_meta: CachePadded<AtomicU64>,
/// Waker that blocked receivers register with; woken after a write.
read_waker: CachePadded<Waker>,
/// Waker that blocked senders register with; woken after a read.
write_waker: CachePadded<Waker>,
/// The ring of message slots; it holds `capacity` entries.
buffer: Box<[ElemArr<T>]>,
/// Number of slots in `buffer` (never 0).
capacity: u32,
}
impl<T> MpmcBounded<T> {
    /// Builds an empty queue with `cap` slots; a request for 0 slots is
    /// promoted to a single slot.
    ///
    /// # Panics
    ///
    /// Panics when `cap` is larger than `1 << 30`.
    #[inline]
    pub(crate) fn new(cap: u32) -> Self {
        assert!(cap <= (1 << 30));

        let slots = if cap == 0 { 1 } else { cap };
        let ring: Box<[ElemArr<T>]> = (0..slots).map(|_| ElemArr::default()).collect();

        // The writer starts at slot 0 on lap 0, the reader at slot 0 on lap 1.
        let read_start = AtomicU64::new(1 << 32);
        let write_start = AtomicU64::new(0);

        Self {
            read_meta: CachePadded::new(read_start),
            write_meta: CachePadded::new(write_start),
            read_waker: CachePadded::new(Waker::default()),
            write_waker: CachePadded::new(Waker::default()),
            buffer: ring,
            capacity: slots,
        }
    }

    /// Returns the number of slots in the ring.
    #[inline]
    pub(crate) fn get_cap(&self) -> u32 {
        self.capacity
    }
}
impl<T> Bounded<T> for MpmcBounded<T> {
    /// Sets the closed bit in the write position and closes both wakers.
    ///
    /// Only the call that actually flips the bit closes the wakers; calls on
    /// an already-closed queue return immediately.
    fn close(&self) {
        // A single atomic OR sets the bit no matter how senders move the
        // position concurrently, and the previous value tells us whether
        // another caller closed the queue first.
        let previous = self.write_meta.fetch_or(MPMC_MASK_64, Ordering::Acquire);
        if (previous & MPMC_MASK_64) != 0 {
            return;
        }
        self.write_waker.close();
        self.read_waker.close();
    }

    /// Returns the number of slots claimed by writers that readers have not
    /// claimed yet.
    ///
    /// The two positions are loaded separately, so under concurrent use the
    /// result is a snapshot estimate. When both positions point at the same
    /// slot, the laps decide between "empty" and "full": writers start on
    /// lap 0, readers on lap 1, and each side advances its lap by 2 per trip
    /// around the ring. The queue is therefore full exactly when the write
    /// lap is one ahead of the read lap, and empty otherwise.
    #[inline]
    fn length(&self) -> u32 {
        let read = self.read_meta.load(Ordering::Relaxed);
        // Strip the closed bit so it is not mistaken for part of the lap.
        let write = self.write_meta.load(Ordering::Relaxed) & !MPMC_MASK_64;
        let head = read as u32;
        let tail = write as u32;

        if tail > head {
            // Occupied slots are `head..tail`.
            tail - head
        } else if tail < head {
            // Writers have wrapped around; readers have not.
            self.capacity - head + tail
        } else {
            let read_lap = (read >> 32) as u32;
            let write_lap = (write >> 32) as u32;
            if write_lap.wrapping_sub(read_lap) == 1 {
                self.capacity
            } else {
                0
            }
        }
    }

    /// Claims the slot at the current write position.
    ///
    /// Returns the slot, a lap value and the outcome:
    /// * `State::Success` - the slot's lap matched the write lap and this
    ///   caller won the compare-and-swap that advances the write position;
    ///   the observed slot lap is returned.
    /// * `State::Closed` - the closed bit is set (the lap value is 0).
    /// * `State::Failed` - the slot's lap is still behind the write lap on
    ///   two consecutive loads.
    fn select_bucket_4_send(&mut self) -> (&ElemArr<T>, u32, State) {
        let mut meta = self.write_meta.load(Ordering::Relaxed);
        let backoff = Backoff::default();
        loop {
            let idx = meta as u32;
            let lap = (meta >> 32) as u32;
            // SAFETY: the index half of `write_meta` starts at 0 and is only
            // ever incremented while `idx + 1 < capacity` or reset to 0 (the
            // closed bit lives in the lap half), and `buffer` holds
            // `capacity` slots, so `idx` is always in bounds.
            let elem = unsafe { self.buffer.get_unchecked(idx as usize) };
            if unlikely(lap >= MPMC_MASK_32) {
                return (elem, 0, State::Closed);
            }
            let elem_lap = elem.load_lap();
            if lap == elem_lap {
                let new_tail = if likely(idx + 1 < self.capacity) {
                    meta + 1
                } else {
                    // Wrap to slot 0 and move on to the next write lap.
                    (lap.wrapping_add(2) as u64) << 32
                };
                match self.write_meta.compare_exchange_weak(
                    meta,
                    new_tail,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return (elem, elem_lap, State::Success),
                    Err(current) => {
                        // Lost the race (or failed spuriously): retry from
                        // the position that is actually stored.
                        meta = current;
                        backoff.spin();
                    }
                }
            } else if lap > elem_lap {
                // Double-check before reporting failure to the caller.
                if lap > elem.load_lap() {
                    return (elem, elem_lap, State::Failed);
                }
                backoff.spin();
                meta = self.write_meta.load(Ordering::Relaxed);
            } else {
                // The loaded position is stale; back off and reload it.
                backoff.snooze();
                meta = self.write_meta.load(Ordering::Relaxed);
            }
        }
    }

    /// Claims the slot at the current read position.
    ///
    /// Returns the slot, the observed slot lap and `true` when the slot's lap
    /// matched the read lap and this caller won the compare-and-swap that
    /// advances the read position, or `false` when the slot's lap is still
    /// behind the read lap on two consecutive loads.
    fn select_bucket_4_recv(&mut self) -> (&ElemArr<T>, u32, bool) {
        let mut meta = self.read_meta.load(Ordering::Relaxed);
        let backoff = Backoff::default();
        loop {
            let idx = meta as u32;
            let lap = (meta >> 32) as u32;
            // SAFETY: the index half of `read_meta` starts at 0 and is only
            // ever incremented while `idx + 1 < capacity` or reset to 0, and
            // `buffer` holds `capacity` slots, so `idx` is always in bounds.
            let elem = unsafe { self.buffer.get_unchecked(idx as usize) };
            let elem_lap = elem.load_lap();
            if lap == elem_lap {
                let new_head = if likely(idx + 1 < self.capacity) {
                    meta + 1
                } else {
                    // Wrap to slot 0 and move on to the next read lap.
                    (lap.wrapping_add(2) as u64) << 32
                };
                match self.read_meta.compare_exchange_weak(
                    meta,
                    new_head,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return (elem, elem_lap, true),
                    Err(current) => {
                        // Lost the race (or failed spuriously): retry from
                        // the position that is actually stored.
                        meta = current;
                        backoff.spin();
                    }
                }
            } else if lap > elem_lap {
                // Double-check before reporting failure to the caller.
                if lap > elem.load_lap() {
                    return (elem, elem_lap, false);
                }
                backoff.spin();
                meta = self.read_meta.load(Ordering::Relaxed);
            } else {
                // The loaded position is stale; back off and reload it.
                backoff.snooze();
                meta = self.read_meta.load(Ordering::Relaxed);
            }
        }
    }

    /// Registers `waiter` with the writer-side waker.
    #[inline]
    fn register_writer_waiter(&self, waiter: &Waiter) -> bool {
        self.write_waker.register(waiter)
    }

    /// Registers `waiter` with the reader-side waker.
    #[inline]
    fn register_reader_waiter(&self, waiter: &Waiter) -> bool {
        self.read_waker.register(waiter)
    }

    /// Removes `waiter` from the writer-side waker.
    #[inline]
    fn unregister_writer_waiter(&self, waiter: &Waiter) {
        self.write_waker.unregister(waiter);
    }

    /// Removes `waiter` from the reader-side waker.
    #[inline]
    fn unregister_reader_waiter(&self, waiter: &Waiter) {
        self.read_waker.unregister(waiter);
    }

    /// Wakes the reader-side waker.
    #[inline]
    fn wake_reader(&self) {
        self.read_waker.wake();
    }

    /// Wakes the writer-side waker.
    #[inline]
    fn wake_writer(&self) {
        self.write_waker.wake();
    }

    /// Returns this queue as a `Checker` trait object.
    #[inline]
    fn cast(&self) -> &dyn Checker {
        self
    }
}
impl<T> Checker for MpmcBounded<T> {
    /// Reports whether the closed bit is set in the write position.
    #[inline]
    fn is_close(&self) -> bool {
        let write_meta = self.write_meta.load(Ordering::Relaxed);
        (write_meta & MPMC_MASK_64) != 0
    }
}