use std::sync::atomic::{AtomicU32, Ordering};
pub struct RingBuffer<T> {
buffer: *mut T,
write_idx: *mut AtomicU32,
read_idx: *mut AtomicU32,
mask: u32,
}
unsafe impl<T: Send> Send for RingBuffer<T> {}
unsafe impl<T: Sync> Sync for RingBuffer<T> {}
impl<T: Copy> RingBuffer<T> {
pub unsafe fn new(
buffer: *mut T,
write_idx: *mut AtomicU32,
read_idx: *mut AtomicU32,
capacity: usize,
) -> Self {
assert!(
capacity.is_power_of_two(),
"RingBuffer capacity must be a power of two"
);
Self {
buffer,
write_idx,
read_idx,
mask: (capacity - 1) as u32,
}
}
pub fn push(&self, event: T) -> bool {
let write = unsafe { (*self.write_idx).load(Ordering::Relaxed) };
let read = unsafe { (*self.read_idx).load(Ordering::Acquire) };
let count = write.wrapping_sub(read);
if count as usize >= (self.mask as usize) {
return false;
}
let idx = (write & self.mask) as usize;
unsafe {
self.buffer.add(idx).write(event);
}
unsafe {
(*self.write_idx).store(write.wrapping_add(1), Ordering::Release);
}
true
}
pub fn pop(&self) -> Option<T> {
let read = unsafe { (*self.read_idx).load(Ordering::Relaxed) };
let write = unsafe { (*self.write_idx).load(Ordering::Acquire) };
if read == write {
return None;
}
let idx = (read & self.mask) as usize;
let event = unsafe { self.buffer.add(idx).read() };
unsafe {
(*self.read_idx).store(read.wrapping_add(1), Ordering::Release);
}
Some(event)
}
pub fn len(&self) -> usize {
let write = unsafe { (*self.write_idx).load(Ordering::Acquire) };
let read = unsafe { (*self.read_idx).load(Ordering::Acquire) };
write.wrapping_sub(read) as usize
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::AtomicU32;
#[test]
fn overflow_drops_excess() {
let mut buf = vec![0u32; 8];
let write = AtomicU32::new(0);
let read = AtomicU32::new(0);
let rb = unsafe {
RingBuffer::new(
buf.as_mut_ptr(),
&write as *const AtomicU32 as *mut AtomicU32,
&read as *const AtomicU32 as *mut AtomicU32,
8,
)
};
for i in 0..7 {
assert!(rb.push(i), "push {i} should succeed");
}
assert!(!rb.push(99), "overflow push should be rejected");
for i in 0..7 {
assert_eq!(rb.pop(), Some(i), "should read item {i}");
}
assert!(rb.is_empty());
}
}