use core::{fmt, mem};
use core::cell::UnsafeCell;
use core::sync::atomic::{AtomicUsize, Ordering};
/// Carrier type for compile-time checks on a capacity constant `C`.
struct Assert<const C: usize>;

impl<const C: usize> Assert<C> {
    /// Evaluates to `()` only when `C` is a non-zero power of two.
    ///
    /// The zero check comes first so that a zero capacity is reported with its own message
    /// rather than the power-of-two one.
    const RING_BUFFER_GOOD_CAPACITY: () = {
        assert!(C > 0, "Capacity cannot be 0");
        assert!(C.is_power_of_two(), "Capacity is not power of 2");
    };
}
/// Fixed-capacity FIFO queue that keeps its `C` slots inline in the struct.
///
/// `read` and `write` are free-running counters: they are only ever incremented (with wrapping)
/// and are reduced to a slot index with a bit mask when a slot is accessed.
pub struct RingBuffer<T, const C: usize> {
/// Slot storage. A slot holds an initialised `T` only while its position lies between `read`
/// and `write`; interior mutability lets the shared-reference code paths write into it.
inner: [UnsafeCell<mem::MaybeUninit<T>>; C],
/// Counter of elements taken out so far; masked to find the oldest live slot.
read: AtomicUsize,
/// Counter of elements put in so far; masked to find the next slot to fill.
write: AtomicUsize,
}
impl<T, const CAPACITY: usize> RingBuffer<T, CAPACITY> {
    /// Value used to fill every slot on construction: an uninitialised cell.
    ///
    /// Being a constant item, it can be used in the array repeat expression in `new` even though
    /// the cell type is not `Copy`.
    const INIT: UnsafeCell<mem::MaybeUninit<T>> = UnsafeCell::new(mem::MaybeUninit::uninit());

    /// Creates an empty buffer.
    ///
    /// Referencing `Assert::<CAPACITY>::RING_BUFFER_GOOD_CAPACITY` makes the capacity check
    /// (non-zero, power of two) part of this function, so an unsupported `CAPACITY` is rejected
    /// when that constant is evaluated.
    #[inline(always)]
    pub const fn new() -> Self {
        let _ = Assert::<CAPACITY>::RING_BUFFER_GOOD_CAPACITY;
        Self {
            inner: [Self::INIT; CAPACITY],
            read: AtomicUsize::new(0),
            write: AtomicUsize::new(0),
        }
    }

    /// Maps a free-running counter value to a slot index.
    ///
    /// Relies on `CAPACITY` being a power of two, so the mask is equivalent to `idx % CAPACITY`.
    #[inline(always)]
    const fn mask_idx(idx: usize) -> usize {
        idx & (CAPACITY - 1)
    }

    /// Returns the number of slots in the buffer.
    #[inline(always)]
    pub const fn capacity(&self) -> usize {
        CAPACITY
    }

    /// Returns the number of elements currently stored.
    ///
    /// When the buffer is being used through a `Producer`/`Consumer` pair the result is only a
    /// snapshot and may be out of date by the time it is returned.
    #[inline(always)]
    pub fn size(&self) -> usize {
        // `read` must be sampled before `write`. With the opposite order a push followed by a pop
        // landing between the two loads makes `read` overtake the stale `write`, and the wrapping
        // subtraction then yields a value close to `usize::MAX`. The `Acquire` load pairs with the
        // consumer's `Release` store so the `write` value observed afterwards is never older than
        // the one the consumer itself saw.
        let read = self.read.load(Ordering::Acquire);
        let write = self.write.load(Ordering::Relaxed);
        write.wrapping_sub(read)
    }

    /// Returns `true` when no element is stored.
    #[inline(always)]
    pub fn is_empty(&self) -> bool {
        self.write.load(Ordering::Relaxed) == self.read.load(Ordering::Relaxed)
    }

    /// Returns `true` when every slot is occupied.
    #[inline(always)]
    pub fn is_full(&self) -> bool {
        self.size() == CAPACITY
    }

    /// Appends `value`, dropping the oldest element first when the buffer is full.
    ///
    /// # Panics
    ///
    /// Propagates a panic raised by the `Drop` implementation of the element being overwritten.
    /// In that case `value` is dropped and the buffer is left without the oldest element but
    /// otherwise intact.
    pub fn push(&mut self, value: T) {
        let write = self.write.load(Ordering::Relaxed);
        let read = self.read.load(Ordering::Relaxed);
        // SAFETY: `mask_idx` clears every bit above `CAPACITY - 1`, so the index is in bounds.
        let slot = unsafe { self.inner.get_unchecked(Self::mask_idx(write)) }.get();

        if write.wrapping_sub(read) == CAPACITY {
            // Full: `write` and `read` differ by exactly `CAPACITY`, so `slot` is the oldest
            // element. Retire it from the live range *before* running its destructor and do not
            // advance `write` yet; if the destructor panics the counters then still describe
            // only initialised slots and nothing is dropped twice later on.
            self.read.store(read.wrapping_add(1), Ordering::Relaxed);
            // SAFETY: the slot was inside the live range, hence holds an initialised `T`, and it
            // has just been removed from that range so it will not be read again.
            unsafe {
                drop((slot as *const T).read());
            }
        }

        // SAFETY: the slot is outside the live range (either it was free or it was just vacated),
        // and `&mut self` rules out any other access.
        unsafe {
            slot.write(mem::MaybeUninit::new(value));
        }
        self.write.store(write.wrapping_add(1), Ordering::Relaxed);
    }

    /// Appends `value` if there is room.
    ///
    /// Returns `None` on success and hands `value` back as `Some(value)` when the buffer is full.
    #[inline]
    pub fn try_push(&mut self, value: T) -> Option<T> {
        self.inner_push(value, Ordering::Relaxed, Ordering::Relaxed)
    }

    /// Shared implementation of the non-overwriting push.
    ///
    /// `read_op` is the ordering used to load the `read` counter and `write_op` the ordering used
    /// to publish the new `write` counter. Returns `Some(value)` when the buffer is full.
    #[inline]
    fn inner_push(&self, value: T, read_op: Ordering, write_op: Ordering) -> Option<T> {
        let idx = self.write.load(Ordering::Relaxed);
        let used = idx.wrapping_sub(self.read.load(read_op));
        if used != CAPACITY {
            // SAFETY: the masked index is in bounds and, as the buffer is not full, the slot at
            // `write` lies outside the live range.
            unsafe {
                self.inner.get_unchecked(Self::mask_idx(idx)).get().write(mem::MaybeUninit::new(value))
            }
            // Publish only after the value is in place.
            self.write.store(idx.wrapping_add(1), write_op);
            None
        } else {
            Some(value)
        }
    }

    /// Appends `value` without checking whether the buffer is full.
    ///
    /// # Safety
    ///
    /// The buffer must not be full. Otherwise the oldest element is overwritten without being
    /// dropped and the counters no longer describe the initialised slots.
    #[inline]
    pub unsafe fn push_unchecked(&mut self, value: T) {
        let idx = Self::mask_idx(
            self.write.fetch_add(1, Ordering::Relaxed)
        );
        self.inner.get_unchecked(idx).get().write(mem::MaybeUninit::new(value))
    }

    /// Removes and returns the oldest element, or `None` when the buffer is empty.
    #[inline(always)]
    pub fn pop(&mut self) -> Option<T> {
        self.inner_pop(Ordering::Relaxed, Ordering::Relaxed)
    }

    /// Shared implementation of pop.
    ///
    /// `write_op` is the ordering used to load the `write` counter and `read_op` the ordering used
    /// to publish the new `read` counter.
    #[inline]
    fn inner_pop(&self, write_op: Ordering, read_op: Ordering) -> Option<T> {
        let idx = self.read.load(Ordering::Relaxed);
        if idx != self.write.load(write_op) {
            // SAFETY: the masked index is in bounds and, as `read != write`, the slot at `read`
            // is inside the live range and therefore initialised.
            let value = unsafe {
                (self.inner.get_unchecked(Self::mask_idx(idx)).get() as *const T).read()
            };
            // Release the slot only after the value has been moved out.
            self.read.store(idx.wrapping_add(1), read_op);
            Some(value)
        } else {
            None
        }
    }

    /// Removes and returns the oldest element without checking whether one exists.
    ///
    /// # Safety
    ///
    /// The buffer must not be empty. Otherwise an uninitialised slot is read.
    #[inline]
    pub unsafe fn pop_unchecked(&mut self) -> T {
        let idx = Self::mask_idx(
            self.read.fetch_add(1, Ordering::Relaxed)
        );
        (self.inner.get_unchecked(idx).get() as *const T).read()
    }

    /// Removes every element, running destructors where `T` has one.
    #[inline(always)]
    pub fn clear(&mut self) {
        if mem::needs_drop::<T>() {
            for _ in 0..self.size() {
                // SAFETY: the loop runs exactly `size()` times, so an element is always present.
                unsafe {
                    self.pop_unchecked();
                }
            }
        } else {
            // Nothing to destroy: catching `read` up with `write` is enough.
            self.read.store(self.write.load(Ordering::Relaxed), Ordering::Relaxed);
        }
    }

    /// Splits the buffer into a producing and a consuming handle.
    ///
    /// Both handles borrow from the exclusive borrow of `self`, so the buffer cannot be used
    /// directly while either of them is alive.
    #[inline(always)]
    pub fn split(&mut self) -> (Producer<'_, T, CAPACITY>, Consumer<'_, T, CAPACITY>) {
        let shared: &Self = &*self;
        (Producer(shared), Consumer(shared))
    }
}
impl<T, const CAPACITY: usize> Drop for RingBuffer<T, CAPACITY> {
#[inline(always)]
fn drop(&mut self) {
self.clear();
}
}
impl<T, const CAPACITY: usize> Default for RingBuffer<T, CAPACITY> {
#[inline(always)]
fn default() -> Self {
Self::new()
}
}
/// Debug output shows the capacity and the current element count, never the elements
/// themselves, so no `T: Debug` bound is needed.
impl<T, const CAPACITY: usize> fmt::Debug for RingBuffer<T, CAPACITY> {
    #[inline(always)]
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = fmt.debug_struct("RingBuffer");
        out.field("capacity", &CAPACITY);
        out.field("size", &self.size());
        out.finish()
    }
}
/// Consuming half of a [`RingBuffer`], as handed out by `RingBuffer::split`.
///
/// It only holds a shared reference to the buffer and removes elements through it.
pub struct Consumer<'a, T, const N: usize>(&'a RingBuffer<T, N>);

impl<'a, T, const N: usize> Consumer<'a, T, N> {
    /// Removes and returns the oldest element, or `None` when the buffer is empty.
    ///
    /// The `write` counter is loaded with `Acquire` and the `read` counter is stored with
    /// `Release`, pairing with the orderings used on the producing side.
    #[inline(always)]
    pub fn pop(&self) -> Option<T> {
        self.0.inner_pop(Ordering::Acquire, Ordering::Release)
    }
}

// SAFETY: the handle hands out `T` values by value, possibly on a different thread than the one
// that stored them, so this is only sound when `T` itself may cross threads: hence `T: Send`.
// Without the bound a `!Send` value such as `Rc` could be moved between threads in safe code.
//
// NOTE(review): `pop` takes `&self`, so `Sync` allows several threads to pop through one shared
// `&Consumer` at the same time, which `inner_pop` does not guard against. The impl is kept for
// backward compatibility; callers must make sure only one thread pops at a time.
unsafe impl<'a, T: Send, const N: usize> Sync for Consumer<'a, T, N> {
}

// SAFETY: see the `Sync` impl above; moving the handle moves the ability to take `T` values out
// on the receiving thread, which requires `T: Send`.
unsafe impl<'a, T: Send, const N: usize> Send for Consumer<'a, T, N> {
}

/// Formats exactly like the underlying buffer.
impl<'a, T, const N: usize> fmt::Debug for Consumer<'a, T, N> {
    #[inline(always)]
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(self.0, fmt)
    }
}
/// Producing half of a [`RingBuffer`], as handed out by `RingBuffer::split`.
///
/// It only holds a shared reference to the buffer and appends elements through it.
pub struct Producer<'a, T, const N: usize>(&'a RingBuffer<T, N>);

impl<'a, T, const N: usize> Producer<'a, T, N> {
    /// Appends `value` if there is room.
    ///
    /// Returns `None` on success and hands `value` back as `Some(value)` when the buffer is full.
    /// The `read` counter is loaded with `Acquire` and the `write` counter is stored with
    /// `Release`, pairing with the orderings used on the consuming side.
    #[inline]
    pub fn try_push(&self, value: T) -> Option<T> {
        self.0.inner_push(value, Ordering::Acquire, Ordering::Release)
    }
}

// SAFETY: values pushed through this handle may be taken out on a different thread, so this is
// only sound when `T` itself may cross threads: hence `T: Send`. Without the bound a `!Send`
// value such as `Rc` could be moved between threads in safe code.
//
// NOTE(review): `try_push` takes `&self`, so `Sync` allows several threads to push through one
// shared `&Producer` at the same time, which `inner_push` does not guard against. The impl is
// kept for backward compatibility; callers must make sure only one thread pushes at a time.
unsafe impl<'a, T: Send, const N: usize> Sync for Producer<'a, T, N> {
}

// SAFETY: see the `Sync` impl above; moving the handle lets another thread store `T` values that
// a third party may later take out, which requires `T: Send`.
unsafe impl<'a, T: Send, const N: usize> Send for Producer<'a, T, N> {
}

/// Formats exactly like the underlying buffer.
impl<'a, T, const N: usize> fmt::Debug for Producer<'a, T, N> {
    #[inline(always)]
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(self.0, fmt)
    }
}