use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicBool, AtomicUsize, AtomicU8, Ordering};
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, sync::Arc, vec::Vec};
#[cfg(feature = "std")]
use std::sync::Arc;
/// Slot lifecycle tag: the slot holds no value and may be claimed by a producer.
const EMPTY: u8 = 0;
/// Slot lifecycle tag: a producer has claimed the slot and is writing into it.
const WRITING: u8 = 1;
/// Slot lifecycle tag: the slot holds an initialized value ready to be consumed.
const READY: u8 = 2;
/// One cell of the ring buffer: a state tag plus possibly-uninitialized storage.
struct Slot<T> {
    // Current lifecycle tag: EMPTY, WRITING, or READY (see constants above).
    state: AtomicU8,
    // Value storage; only initialized between the READY store and the
    // consumer's `assume_init_read` (or mid-write while WRITING).
    data: UnsafeCell<MaybeUninit<T>>,
}
// SAFETY: all access to `data` is mediated by the `state` atomic protocol
// (EMPTY -> WRITING -> READY -> EMPTY), so a Slot may be shared/sent across
// threads whenever the contained T can be sent between threads.
unsafe impl<T: Send> Send for Slot<T> {}
unsafe impl<T: Send> Sync for Slot<T> {}
/// A bounded ring-buffer queue that refuses ("loses") sends when full or
/// contended instead of blocking producers. Capacity is a power of two so
/// positions can be mapped to slots with a mask.
pub struct LossyQueue<T> {
    // capacity - 1; ANDed with the monotonically increasing counters to get a slot index.
    mask: usize,
    // Count of claimed send positions (monotonically increasing, wraps on overflow).
    tail: AtomicUsize,
    // Count of consumed positions (monotonically increasing, wraps on overflow).
    head: AtomicUsize,
    // The slot array; its length is the (power-of-two) capacity.
    buffer: Box<[Slot<T>]>,
}
// SAFETY: cross-thread access to slot contents is serialized by each slot's
// atomic `state` plus the head/tail counters, so the queue may be shared
// between threads whenever T is Send.
unsafe impl<T: Send> Send for LossyQueue<T> {}
unsafe impl<T: Send> Sync for LossyQueue<T> {}
impl<T> LossyQueue<T> {
    /// Creates a queue with room for `capacity` elements.
    ///
    /// # Panics
    /// Panics if `capacity` is not a power of two (this also rejects 0),
    /// which the index `mask` relies on.
    pub fn new(capacity: usize) -> Self {
        assert!(
            capacity.is_power_of_two(),
            "LossyQueue capacity must be a power of two"
        );
        let mut buf = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            buf.push(Slot {
                state: AtomicU8::new(EMPTY),
                data: UnsafeCell::new(MaybeUninit::uninit()),
            });
        }
        Self {
            mask: capacity - 1,
            tail: AtomicUsize::new(0),
            head: AtomicUsize::new(0),
            buffer: buf.into_boxed_slice(),
        }
    }

    /// Attempts to enqueue `item`; returns it back in `Err` when the queue
    /// is full or the target slot is contended (the "lossy" drop path).
    ///
    /// Fix: `tail` is only advanced AFTER the slot has been claimed, via a
    /// compare-exchange on the observed value. The previous code did
    /// `tail.fetch_add(1)` unconditionally and then could fail the slot CAS,
    /// so every failed send leaked one tail position; after enough failures
    /// the `tail - head` capacity check rejected all sends permanently.
    #[inline]
    pub fn try_send(&self, item: T) -> Result<(), T> {
        loop {
            let tail = self.tail.load(Ordering::Relaxed);
            // Acquire pairs with the consumer's Release so the capacity
            // check never undercounts freed slots.
            let head = self.head.load(Ordering::Acquire);
            if tail.wrapping_sub(head) >= self.buffer.len() {
                // Full: refuse rather than block.
                return Err(item);
            }
            let slot = &self.buffer[tail & self.mask];
            // Claim the slot first; losing this race is the lossy failure path.
            if slot
                .state
                .compare_exchange(EMPTY, WRITING, Ordering::Acquire, Ordering::Relaxed)
                .is_err()
            {
                return Err(item);
            }
            // Publish the claimed position. If another producer advanced
            // `tail` since we loaded it, release the slot and retry.
            if self
                .tail
                .compare_exchange(
                    tail,
                    tail.wrapping_add(1),
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                // SAFETY: we hold the WRITING claim, so no other thread
                // touches `data` until we flip the state to READY.
                unsafe { (*slot.data.get()).write(item) };
                // Release publishes the write to the consumer's Acquire load.
                slot.state.store(READY, Ordering::Release);
                return Ok(());
            }
            // Stale tail: undo the claim (no data was written) and retry.
            slot.state.store(EMPTY, Ordering::Relaxed);
        }
    }

    /// Enqueues `item`, spinning until a slot becomes available.
    pub fn send_blocking(&self, mut item: T) {
        loop {
            match self.try_send(item) {
                Ok(()) => return,
                Err(returned_item) => {
                    item = returned_item;
                    core::hint::spin_loop();
                }
            }
        }
    }

    /// Attempts to dequeue the oldest element, `None` when nothing is ready.
    ///
    /// Fix: the head position is claimed with a compare-exchange BEFORE the
    /// value is read. The previous code read the value and only then did
    /// `head.fetch_add(1)`, so two consumers could both observe the same
    /// READY slot and both `assume_init_read` it — a double read (and hence
    /// a double drop of `T`), which is UB since the queue is `Sync`.
    #[inline]
    pub fn try_recv(&self) -> Option<T> {
        loop {
            let head = self.head.load(Ordering::Relaxed);
            let slot = &self.buffer[head & self.mask];
            // Acquire pairs with the producer's Release store of READY,
            // making the written value visible before we read it.
            if slot.state.load(Ordering::Acquire) != READY {
                return None;
            }
            // Winning this race grants exclusive ownership of the value.
            if self
                .head
                .compare_exchange(
                    head,
                    head.wrapping_add(1),
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                // SAFETY: state was READY and we won the head claim, so the
                // value is initialized and no other consumer can take it.
                let item = unsafe { (*slot.data.get()).assume_init_read() };
                // Hand the slot back to producers.
                slot.state.store(EMPTY, Ordering::Release);
                return Some(item);
            }
            // Lost the race to another consumer; retry at the new head.
        }
    }
}
impl<T> Drop for LossyQueue<T> {
    /// Drops any values still sitting in READY slots so `T`'s destructor runs.
    ///
    /// Fix: the previous drop walked forward from `head` and stopped at the
    /// first non-READY slot, leaking any READY values past a gap (gaps are
    /// reachable when a send claims a tail position but fails). With
    /// `&mut self` we have exclusive access, so simply sweep every slot.
    fn drop(&mut self) {
        for slot in self.buffer.iter_mut() {
            // `get_mut` needs no atomic ordering: exclusive access via
            // `&mut self` means no other thread can touch the queue now.
            if *slot.state.get_mut() == READY {
                // SAFETY: READY means the value was fully written and never consumed.
                unsafe { slot.data.get_mut().assume_init_drop() };
            }
        }
    }
}
/// A single-use completion flag: one side calls `signal`, the other spin-waits
/// in `wait` until it flips. Shared via `Arc` (see `OneshotAck::new`).
pub struct OneshotAck {
    // false until `signal` stores true; no method here ever resets it.
    ready: AtomicBool,
}
impl OneshotAck {
    /// Creates a fresh, unsignaled acknowledgement handle, ready to be
    /// cloned and handed to the signaling side.
    pub fn new() -> Arc<Self> {
        let ack = Self {
            ready: AtomicBool::new(false),
        };
        Arc::new(ack)
    }

    /// Marks the acknowledgement as complete, releasing any spinning waiter.
    #[inline(always)]
    pub fn signal(&self) {
        self.ready.store(true, Ordering::Release);
    }

    /// Spins until `signal` has been called. The Acquire load pairs with the
    /// Release store in `signal`, so work done before `signal` is visible
    /// to the waiter once this returns.
    #[inline(always)]
    pub fn wait(&self) {
        loop {
            if self.ready.load(Ordering::Acquire) {
                return;
            }
            core::hint::spin_loop();
        }
    }
}