use core::cell::{Cell, UnsafeCell};
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::sync::atomic::Ordering;
#[cfg(target_has_atomic = "32")]
use core::sync::atomic::{AtomicBool, AtomicU32};
#[cfg(all(not(target_has_atomic = "32"), feature = "portable-atomic"))]
use portable_atomic::{AtomicBool, AtomicU32};
/// Builds the backing storage for an [`EventBuf`]: `N` slots, all of them uninitialised.
fn unsafe_cell_array<T, const N: usize>() -> [UnsafeCell<MaybeUninit<T>>; N] {
    core::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit()))
}

/// Fixed-capacity FIFO queue of `Copy` values with one [`Producer`] and one [`Consumer`].
///
/// Values are written through the handle returned by [`EventBuf::producer`] and read back,
/// in insertion order, through the handle returned by [`EventBuf::consumer`]. At most one
/// handle of each kind exists at any time; asking for a second one panics.
///
/// `head` and `tail` are positions in `[0, WRAP)`, where `WRAP` is the largest multiple of
/// `N` that fits in a `u32`. Because `WRAP` is a multiple of `N`, `position % N` keeps
/// cycling through the slots without a jump when a position wraps back to `0`, for every
/// capacity and not only for powers of two.
pub struct EventBuf<T: Copy, const N: usize> {
    /// Position of the next slot to write; stored only by [`Producer::push`].
    head: AtomicU32,
    /// Position of the next slot to read; stored only by [`Consumer::pop`].
    tail: AtomicU32,
    /// Slot storage; the slots from `tail` up to (not including) `head` hold live values.
    slots: [UnsafeCell<MaybeUninit<T>>; N],
    /// `true` while a [`Producer`] handle is alive.
    producer_taken: AtomicBool,
    /// `true` while a [`Consumer`] handle is alive.
    consumer_taken: AtomicBool,
}

// SAFETY: slots are only written by `Producer::push` and only read by `Consumer::pop`.
// `producer()` / `consumer()` hand out at most one handle of each kind at a time, and the
// Release/Acquire pairs on `head` and `tail` order every slot write before the matching
// read (and every read before the slot is reused). Values of `T` travel between threads,
// hence the `T: Send` bound.
unsafe impl<T: Copy + Send, const N: usize> Sync for EventBuf<T, N> {}

impl<T: Copy, const N: usize> EventBuf<T, N> {
    /// Whether `N` is a capacity the `u32` position arithmetic can represent. The position
    /// space must span at least two laps of the buffer so that "full" (distance `N`) never
    /// aliases "empty" (distance `0`).
    const CAPACITY_OK: bool = N > 0 && (N as u64) <= (u32::MAX / 2) as u64;

    /// Exclusive upper bound of the position space: the largest multiple of `N` that fits
    /// in a `u32`. Evaluates to `0` for capacities rejected by [`EventBuf::new`], so that
    /// merely naming such a type does not fail constant evaluation.
    const WRAP: u32 = if Self::CAPACITY_OK {
        // `N as u32` is lossless here: CAPACITY_OK guarantees N <= u32::MAX / 2.
        u32::MAX - (u32::MAX % (N as u32))
    } else {
        0
    };

    /// Creates an empty buffer.
    ///
    /// # Panics
    ///
    /// Panics if `N` is `0` or larger than `u32::MAX / 2`.
    pub fn new() -> Self {
        assert!(N > 0, "EventBuf capacity N must be > 0");
        assert!(
            Self::CAPACITY_OK,
            "EventBuf capacity N must be <= u32::MAX / 2"
        );
        Self {
            head: AtomicU32::new(0),
            tail: AtomicU32::new(0),
            slots: unsafe_cell_array::<T, N>(),
            producer_taken: AtomicBool::new(false),
            consumer_taken: AtomicBool::new(false),
        }
    }

    /// Maps a position to the index of the slot it refers to.
    #[inline(always)]
    const fn slot_index(pos: u32) -> usize {
        // Reduce in `u32` first so the cast cannot truncate on targets with a narrow
        // `usize`; the result is `< N` and therefore always fits.
        (pos % (N as u32)) as usize
    }

    /// Returns the position that follows `pos`, wrapping to `0` at the end of the
    /// position space.
    #[inline(always)]
    const fn advance(pos: u32) -> u32 {
        let next = pos.wrapping_add(1);
        if next == Self::WRAP {
            0
        } else {
            next
        }
    }

    /// Number of positions from `tail` forward to `head`, accounting for `head` having
    /// wrapped past the end of the position space.
    #[inline(always)]
    const fn distance(head: u32, tail: u32) -> u32 {
        if head >= tail {
            head - tail
        } else {
            // `tail < WRAP`, so the subtraction cannot underflow, and the sum stays
            // below `WRAP` because `head < tail`.
            (Self::WRAP - tail) + head
        }
    }

    /// Returns the number of slots, `N`.
    #[inline]
    pub const fn capacity(&self) -> usize {
        N
    }

    /// Returns a snapshot of the number of queued values, never more than `N`.
    ///
    /// When the producer or consumer is running concurrently the value may already be
    /// stale by the time it is returned.
    #[inline]
    pub fn len(&self) -> usize {
        // Read `tail` before `head`: the observed head is then never behind the observed
        // tail, so the distance cannot come out "negative" and wrap to a huge value.
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);
        // `tail` may be stale relative to `head`, which can only overstate the fill
        // level; clamp so callers never see more than the capacity.
        Self::distance(head, tail).min(N as u32) as usize
    }

    /// Returns `true` when [`EventBuf::len`] reports no queued values.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns `true` when [`EventBuf::len`] reports that every slot is occupied.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.len() >= N
    }

    /// Claims the producer side of the buffer.
    ///
    /// The claim is released when the returned handle is dropped.
    ///
    /// # Panics
    ///
    /// Panics if another [`Producer`] for this buffer is still alive.
    #[inline]
    pub fn producer(&self) -> Producer<'_, T, N> {
        assert!(
            !self.producer_taken.swap(true, Ordering::AcqRel),
            "EventBuf: only one Producer may be active at a time"
        );
        Producer {
            buf: self,
            _not_sync: PhantomData,
        }
    }

    /// Claims the consumer side of the buffer.
    ///
    /// The claim is released when the returned handle is dropped.
    ///
    /// # Panics
    ///
    /// Panics if another [`Consumer`] for this buffer is still alive.
    #[inline]
    pub fn consumer(&self) -> Consumer<'_, T, N> {
        assert!(
            !self.consumer_taken.swap(true, Ordering::AcqRel),
            "EventBuf: only one Consumer may be active at a time"
        );
        Consumer {
            buf: self,
            _not_sync: PhantomData,
        }
    }
}

impl<T: Copy, const N: usize> Default for EventBuf<T, N> {
    /// Equivalent to [`EventBuf::new`], including its panics.
    fn default() -> Self {
        Self::new()
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for EventBuf<T, N> {
    /// Prints the current length and the capacity; queued values are not shown.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("EventBuf")
            .field("len", &self.len())
            .field("capacity", &N)
            .finish()
    }
}

/// Write handle of an [`EventBuf`], obtained from [`EventBuf::producer`].
pub struct Producer<'a, T: Copy, const N: usize> {
    buf: &'a EventBuf<T, N>,
    /// `Cell` is `Send` but not `Sync`: the handle may move to another thread, but a
    /// shared reference to it may not be used from two threads at once.
    _not_sync: PhantomData<Cell<()>>,
}

impl<T: Copy, const N: usize> Producer<'_, T, N> {
    /// Appends `val` to the buffer.
    ///
    /// # Errors
    ///
    /// Returns `Err(val)`, handing the value back, when all `N` slots are occupied.
    #[inline]
    pub fn push(&self, val: T) -> Result<(), T> {
        // Only the producer stores `head`, so a relaxed load sees our own last store.
        let head = self.buf.head.load(Ordering::Relaxed);
        // Acquire pairs with the consumer's Release store: slots it has released are
        // fully read before we overwrite them.
        let tail = self.buf.tail.load(Ordering::Acquire);
        if EventBuf::<T, N>::distance(head, tail) >= N as u32 {
            return Err(val);
        }
        let idx = EventBuf::<T, N>::slot_index(head);
        // SAFETY: fewer than `N` values are queued, so the slot at `head` is not in the
        // range the consumer may read, and this is the only producer handle.
        unsafe {
            (*self.buf.slots[idx].get()).write(val);
        }
        // Publish the slot only after it has been written.
        self.buf
            .head
            .store(EventBuf::<T, N>::advance(head), Ordering::Release);
        Ok(())
    }
}

impl<T: Copy, const N: usize> Drop for Producer<'_, T, N> {
    /// Releases the producer claim so [`EventBuf::producer`] can be called again.
    fn drop(&mut self) {
        self.buf.producer_taken.store(false, Ordering::Release);
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for Producer<'_, T, N> {
    /// Prints only the capacity of the underlying buffer.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("event_buf::Producer")
            .field("capacity", &N)
            .finish()
    }
}

/// Read handle of an [`EventBuf`], obtained from [`EventBuf::consumer`].
pub struct Consumer<'a, T: Copy, const N: usize> {
    buf: &'a EventBuf<T, N>,
    /// `Cell` is `Send` but not `Sync`: the handle may move to another thread, but a
    /// shared reference to it may not be used from two threads at once.
    _not_sync: PhantomData<Cell<()>>,
}

impl<T: Copy, const N: usize> Consumer<'_, T, N> {
    /// Removes and returns the oldest queued value, or `None` when the buffer is empty.
    #[inline]
    pub fn pop(&self) -> Option<T> {
        // Only the consumer stores `tail`, so a relaxed load sees our own last store.
        let tail = self.buf.tail.load(Ordering::Relaxed);
        // Acquire pairs with the producer's Release store: the slot contents written
        // before `head` was advanced are visible here.
        let head = self.buf.head.load(Ordering::Acquire);
        if tail == head {
            return None;
        }
        let idx = EventBuf::<T, N>::slot_index(tail);
        // SAFETY: `tail != head`, so the slot at `tail` was initialised by `push` and
        // published through `head`; `T: Copy`, so reading it out leaves nothing to drop.
        let val = unsafe { (*self.buf.slots[idx].get()).assume_init_read() };
        // Hand the slot back to the producer only after the value has been copied out.
        self.buf
            .tail
            .store(EventBuf::<T, N>::advance(tail), Ordering::Release);
        Some(val)
    }

    /// Pops up to `max` values, passing each one to `hook` in FIFO order.
    ///
    /// Stops early when the buffer runs empty. Returns the number of values delivered.
    #[inline]
    pub fn drain(&self, max: usize, mut hook: impl FnMut(T)) -> usize {
        let mut count = 0;
        while count < max {
            match self.pop() {
                Some(val) => {
                    hook(val);
                    count += 1;
                }
                None => break,
            }
        }
        count
    }
}

impl<T: Copy, const N: usize> Drop for Consumer<'_, T, N> {
    /// Releases the consumer claim so [`EventBuf::consumer`] can be called again.
    fn drop(&mut self) {
        self.buf.consumer_taken.store(false, Ordering::Release);
    }
}

impl<T: Copy, const N: usize> core::fmt::Debug for Consumer<'_, T, N> {
    /// Prints only the capacity of the underlying buffer.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("event_buf::Consumer")
            .field("capacity", &N)
            .finish()
    }
}
impl<T: Copy, const N: usize> crate::traits::Sink<T> for Producer<'_, T, N> {
type Error = T;
#[inline]
fn try_push(&mut self, val: T) -> Result<(), T> {
self.push(val)
}
}
impl<T: Copy, const N: usize> crate::traits::Source<T> for Consumer<'_, T, N> {
#[inline]
fn try_pop(&mut self) -> Option<T> {
self.pop()
}
}
// Unit tests for `EventBuf` and its `Producer` / `Consumer` handles.
#[cfg(test)]
mod tests {
use super::*;
// A freshly constructed buffer reports empty, not full, length 0 and its capacity.
#[test]
fn new_buf_is_empty() {
let buf = EventBuf::<u32, 4>::new();
assert!(buf.is_empty());
assert!(!buf.is_full());
assert_eq!(buf.len(), 0);
assert_eq!(buf.capacity(), 4);
}
// Values come out in the order they were pushed; popping past the end yields `None`.
#[test]
fn push_and_pop_fifo() {
let buf = EventBuf::<u32, 4>::new();
let p = buf.producer();
let c = buf.consumer();
assert!(p.push(10).is_ok());
assert!(p.push(20).is_ok());
assert!(p.push(30).is_ok());
assert_eq!(c.pop(), Some(10));
assert_eq!(c.pop(), Some(20));
assert_eq!(c.pop(), Some(30));
assert_eq!(c.pop(), None);
}
// A push into a full buffer returns the value as `Err`; popping one value makes room again.
#[test]
fn push_rejects_when_full() {
let buf = EventBuf::<u32, 2>::new();
let p = buf.producer();
let c = buf.consumer();
assert!(p.push(1).is_ok());
assert!(p.push(2).is_ok());
assert_eq!(p.push(3), Err(3));
assert_eq!(c.pop(), Some(1));
assert!(p.push(3).is_ok());
}
// `drain` honours its `max` argument and reports how many values it delivered.
#[test]
fn drain_returns_count() {
let buf = EventBuf::<u32, 8>::new();
let p = buf.producer();
let c = buf.consumer();
for i in 0..5 {
p.push(i).unwrap();
}
let mut out = std::vec::Vec::new();
let n = c.drain(3, |v| out.push(v));
assert_eq!(n, 3);
assert_eq!(out, [0, 1, 2]);
let n = c.drain(100, |v| out.push(v));
assert_eq!(n, 2);
assert_eq!(out, [0, 1, 2, 3, 4]);
}
// Draining an empty buffer never invokes the hook and returns 0.
#[test]
fn drain_on_empty_returns_zero() {
let buf = EventBuf::<u32, 4>::new();
let _p = buf.producer();
let c = buf.consumer();
let n = c.drain(10, |_| panic!("should not be called"));
assert_eq!(n, 0);
}
// Dropping a handle releases its claim, so a new handle can be taken; queued data survives.
#[test]
fn producer_consumer_can_be_recreated() {
let buf = EventBuf::<u32, 4>::new();
{
let p = buf.producer();
p.push(1).unwrap();
}
let p = buf.producer();
p.push(2).unwrap();
{
let c = buf.consumer();
assert_eq!(c.pop(), Some(1));
}
let c = buf.consumer();
assert_eq!(c.pop(), Some(2));
assert_eq!(c.pop(), None);
}
// Taking a second producer while the first is alive must panic.
#[test]
#[should_panic(expected = "only one Producer")]
fn double_producer_panics() {
let buf = EventBuf::<u32, 4>::new();
let _p1 = buf.producer();
let _p2 = buf.producer();
}
// Taking a second consumer while the first is alive must panic.
#[test]
#[should_panic(expected = "only one Consumer")]
fn double_consumer_panics() {
let buf = EventBuf::<u32, 4>::new();
let _c1 = buf.consumer();
let _c2 = buf.consumer();
}
// Fills and empties a capacity-3 buffer four times so the slot index wraps repeatedly;
// each round also checks that a fourth push is rejected and that the buffer ends empty.
#[test]
fn wraps_around_correctly() {
let buf = EventBuf::<u32, 3>::new();
let p = buf.producer();
let c = buf.consumer();
for round in 0u32..4 {
let base = round * 3;
for i in 0..3 {
assert!(p.push(base + i).is_ok());
}
assert_eq!(p.push(99), Err(99)); for i in 0..3 {
assert_eq!(c.pop(), Some(base + i));
}
assert_eq!(c.pop(), None); }
}
// `Default::default()` yields an empty buffer, like `new()`.
#[test]
fn default_is_new() {
let buf: EventBuf<u8, 4> = EventBuf::default();
assert!(buf.is_empty());
}
// `len`, `is_empty` and `is_full` follow pushes and pops.
#[test]
fn len_and_full_track_state() {
let buf = EventBuf::<u32, 3>::new();
let p = buf.producer();
let c = buf.consumer();
assert_eq!(buf.len(), 0);
assert!(buf.is_empty());
p.push(1).unwrap();
assert_eq!(buf.len(), 1);
p.push(2).unwrap();
p.push(3).unwrap();
assert_eq!(buf.len(), 3);
assert!(buf.is_full());
c.pop();
assert_eq!(buf.len(), 2);
assert!(!buf.is_full());
}
// Compile-time check: both handle types implement `Send`.
#[test]
fn handles_are_send() {
fn assert_send<T: Send>() {}
assert_send::<super::Producer<'_, u32, 4>>();
assert_send::<super::Consumer<'_, u32, 4>>();
}
}