//! A lock-free single-producer single-consumer (SPSC) ring buffer for
//! `no_std` targets (requires `alloc`). Indices grow monotonically and are
//! mapped to slots with a power-of-two mask; each half caches the other
//! half's index so the fast paths touch no shared cache line.
#![no_std]
extern crate alloc;
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use core::{
    mem::MaybeUninit,
    sync::atomic::{AtomicUsize, Ordering},
};
use crossbeam_utils::CachePadded;
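/// Creates a bounded single-producer single-consumer queue and returns its
/// two halves, which may be moved to different threads.
///
/// # Panics
///
/// Panics if `capacity` is not a power of two (this includes zero).
///
/// # Examples
///
/// A minimal single-threaded sketch; `spsc_ring` is a placeholder for this
/// crate's actual name:
///
/// ```ignore
/// use spsc_ring::ringbuffer;
///
/// let (mut tx, mut rx) = ringbuffer::<u32>(4);
/// assert_eq!(tx.push(1), None);   // accepted
/// assert_eq!(rx.pull(), Some(1)); // delivered in FIFO order
/// assert_eq!(rx.pull(), None);    // buffer is empty again
/// ```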
pub fn ringbuffer<T>(capacity: usize) -> (RingBufferWriter<T>, RingBufferReader<T>) {
assert!(capacity.is_power_of_two(), "Capacity must be a power of 2");
    // Allocate `capacity` uninitialized slots up front; the boxed slice is a
    // fat pointer, so it also carries the length used by `capacity()`.
    let v = (0..capacity)
        .map(|_| MaybeUninit::uninit())
        .collect::<Vec<_>>()
        .into_boxed_slice();
    let rb = Arc::new(RingBuffer {
        // Leak the allocation into a raw pointer; `RingBuffer::drop`
        // reclaims it with `Box::from_raw`.
        ptr: Box::into_raw(v),
        mask: capacity - 1,
        idx_r: CachePadded::new(AtomicUsize::new(0)),
        idx_w: CachePadded::new(AtomicUsize::new(0)),
    });
(
RingBufferWriter {
inner: rb.clone(),
cached_idx_r: 0,
local_idx_w: 0,
},
RingBufferReader {
inner: rb,
local_idx_r: 0,
cached_idx_w: 0,
},
)
}
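/// Shared state behind both halves. The index atomics are `CachePadded` so
/// the writer's and reader's counters live on separate cache lines, avoiding
/// false sharing between the two threads.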
struct RingBuffer<T> {
    /// Owned storage for the slots; freed in `Drop`.
    ptr: *mut [MaybeUninit<T>],
    /// `capacity - 1`; works as a modulus because capacity is a power of two.
    mask: usize,
    /// Next index the reader will consume (monotonic, wraps at `usize::MAX`).
    idx_r: CachePadded<AtomicUsize>,
    /// Next index the writer will fill (monotonic, wraps at `usize::MAX`).
    idx_w: CachePadded<AtomicUsize>,
}
impl<T> RingBuffer<T> {
    /// Returns a mutable reference to the slot for the logical index `idx`.
    ///
    /// # Safety
    ///
    /// The caller must have exclusive access to that slot. The SPSC index
    /// protocol guarantees this: the writer only touches slots the reader
    /// has released, and vice versa.
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_unchecked_mut(&self, idx: usize) -> &mut MaybeUninit<T> {
        unsafe {
            let base = self.ptr as *mut MaybeUninit<T>;
            // Power-of-two capacity lets `& mask` stand in for `% capacity`.
            &mut *base.add(idx & self.mask)
        }
    }
}
impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        // Drop every element that was pushed but never pulled.
        let mut idx_r = self.idx_r.load(Ordering::Acquire);
        let idx_w = self.idx_w.load(Ordering::Acquire);
        while idx_r != idx_w {
            // SAFETY: slots in `idx_r..idx_w` are initialized, and
            // `&mut self` proves no other access is possible.
            unsafe { self.get_unchecked_mut(idx_r).assume_init_drop() };
            idx_r = idx_r.wrapping_add(1);
        }
        // Rebuild the `Box` so the slot storage itself is deallocated.
        drop(unsafe { Box::from_raw(self.ptr) });
    }
}
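/// The producer half returned by [`ringbuffer`]. Not cloneable: the protocol
/// assumes exactly one writer.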
pub struct RingBufferWriter<T> {
    inner: Arc<RingBuffer<T>>,
    /// Last observed reader index; refreshed only when the buffer looks full.
    cached_idx_r: usize,
    /// Private write index, mirrored into `inner.idx_w` after every push.
    local_idx_w: usize,
}
// SAFETY: only the raw slice pointer suppresses the auto traits; all slot
// accesses are serialized between the two halves by the index protocol.
unsafe impl<T: Send> Send for RingBufferWriter<T> {}
unsafe impl<T: Sync> Sync for RingBufferWriter<T> {}
impl<T> RingBufferWriter<T> {
    /// Returns the fixed capacity of the buffer.
    pub fn capacity(&self) -> usize {
        // The fat slice pointer remembers the allocation length.
        self.inner.ptr.len()
    }
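    /// Attempts to append `t`, returning `None` on success or handing the
    /// value back as `Some(t)` when the buffer is full; it never blocks.
    ///
    /// A sketch of the full-buffer behavior (`spsc_ring` is a placeholder
    /// crate name):
    ///
    /// ```ignore
    /// let (mut tx, mut rx) = spsc_ring::ringbuffer::<u8>(2);
    /// assert_eq!(tx.push(1), None);
    /// assert_eq!(tx.push(2), None);
    /// assert_eq!(tx.push(3), Some(3)); // full: value comes back untouched
    /// assert_eq!(rx.pull(), Some(1));  // reader frees a slot
    /// assert_eq!(tx.push(3), None);    // now it fits
    /// ```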
    #[inline]
    pub fn push(&mut self, t: T) -> Option<T> {
        if self.is_full() {
            // Hand the value back rather than blocking or allocating.
            return Some(t);
        }
        // SAFETY: the reader cannot reach this slot until the new `idx_w`
        // is published below, so we have exclusive access.
        unsafe { self.inner.get_unchecked_mut(self.local_idx_w) }.write(t);
        self.local_idx_w = self.local_idx_w.wrapping_add(1);
        // Release pairs with the Acquire load in `is_empty`: the element
        // becomes visible to the reader no later than the index.
        self.inner.idx_w.store(self.local_idx_w, Ordering::Release);
        None
    }
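    /// Returns `true` if the buffer currently has no free slot. Takes
    /// `&mut self` because an apparent full buffer triggers a refresh of
    /// the cached reader index.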
    #[inline]
    pub fn is_full(&mut self) -> bool {
        // Fast path: trust the cached reader index; only when the buffer
        // looks full do we pay for an Acquire load of the shared index.
        if self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.capacity() {
            self.cached_idx_r = self.inner.idx_r.load(Ordering::Acquire);
            self.local_idx_w.wrapping_sub(self.cached_idx_r) == self.capacity()
        } else {
            false
        }
    }
}
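/// The consumer half returned by [`ringbuffer`]. Not cloneable: the protocol
/// assumes exactly one reader.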
pub struct RingBufferReader<T> {
    inner: Arc<RingBuffer<T>>,
    /// Private read index, mirrored into `inner.idx_r` after every pull.
    local_idx_r: usize,
    /// Last observed writer index; refreshed only when the buffer looks empty.
    cached_idx_w: usize,
}
// SAFETY: same reasoning as for the writer half.
unsafe impl<T: Send> Send for RingBufferReader<T> {}
unsafe impl<T: Sync> Sync for RingBufferReader<T> {}
impl<T> RingBufferReader<T> {
    /// Returns the fixed capacity of the buffer.
    pub fn capacity(&self) -> usize {
        self.inner.ptr.len()
    }
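    /// Removes and returns the oldest element, or `None` if the buffer is
    /// empty; it never blocks.
    ///
    /// A short sketch (`spsc_ring` is a placeholder crate name):
    ///
    /// ```ignore
    /// let (mut tx, mut rx) = spsc_ring::ringbuffer::<&str>(2);
    /// assert_eq!(rx.pull(), None); // nothing pushed yet
    /// let _ = tx.push("hello");
    /// assert_eq!(rx.pull(), Some("hello"));
    /// ```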
    #[inline]
    pub fn pull(&mut self) -> Option<T> {
        if self.is_empty() {
            return None;
        }
        // SAFETY: `is_empty` proved the slot is initialized, and the writer
        // cannot touch it until the new `idx_r` is published below.
        let t = unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_read()
        };
        self.local_idx_r = self.local_idx_r.wrapping_add(1);
        // Release pairs with the Acquire load in `is_full`, returning the
        // slot to the writer.
        self.inner.idx_r.store(self.local_idx_r, Ordering::Release);
        Some(t)
    }
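    /// Returns a reference to the oldest element without consuming it, or
    /// `None` if the buffer is empty.
    ///
    /// A short sketch (`spsc_ring` is a placeholder crate name):
    ///
    /// ```ignore
    /// let (mut tx, mut rx) = spsc_ring::ringbuffer::<i32>(2);
    /// let _ = tx.push(7);
    /// assert_eq!(rx.peek(), Some(&7)); // still in the buffer
    /// assert_eq!(rx.pull(), Some(7));
    /// ```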
    #[inline]
    pub fn peek(&mut self) -> Option<&T> {
        if self.is_empty() {
            return None;
        }
        // SAFETY: `is_empty` proved the slot at `local_idx_r` is initialized.
        Some(unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_ref()
        })
    }
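    /// Returns a mutable reference to the oldest element without consuming
    /// it, or `None` if the buffer is empty.
    ///
    /// A short sketch (`spsc_ring` is a placeholder crate name):
    ///
    /// ```ignore
    /// let (mut tx, mut rx) = spsc_ring::ringbuffer::<i32>(2);
    /// let _ = tx.push(7);
    /// if let Some(v) = rx.peek_mut() {
    ///     *v += 1; // edit in place before consuming
    /// }
    /// assert_eq!(rx.pull(), Some(8));
    /// ```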
    #[inline]
    pub fn peek_mut(&mut self) -> Option<&mut T> {
        if self.is_empty() {
            return None;
        }
        // SAFETY: as in `peek`; exclusivity comes from `&mut self`.
        Some(unsafe {
            self.inner
                .get_unchecked_mut(self.local_idx_r)
                .assume_init_mut()
        })
    }
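    /// Returns `true` if there is currently nothing to read. Takes
    /// `&mut self` because an apparent empty buffer triggers a refresh of
    /// the cached writer index.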
    #[inline]
    pub fn is_empty(&mut self) -> bool {
        // Fast path: trust the cached writer index; only when the buffer
        // looks empty do we pay for an Acquire load of the shared index.
        if self.local_idx_r == self.cached_idx_w {
            self.cached_idx_w = self.inner.idx_w.load(Ordering::Acquire);
            self.local_idx_r == self.cached_idx_w
        } else {
            false
        }
    }
}
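
// A hedged end-to-end sketch, not part of the public API: it assumes the
// usual test setup where `std` is available even though the crate itself
// is `no_std`.
#[cfg(test)]
mod tests {
    extern crate std;

    use super::*;
    use std::thread;

    #[test]
    fn spsc_across_threads() {
        let (mut tx, mut rx) = ringbuffer::<usize>(8);
        let producer = thread::spawn(move || {
            for i in 0..1_000 {
                // Spin until the reader frees a slot; `push` hands the
                // rejected value back on a full buffer.
                let mut v = i;
                while let Some(rejected) = tx.push(v) {
                    v = rejected;
                }
            }
        });
        let mut expected = 0;
        while expected < 1_000 {
            if let Some(v) = rx.pull() {
                assert_eq!(v, expected); // FIFO order is preserved
                expected += 1;
            }
        }
        producer.join().unwrap();
    }
}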