use std::alloc::{Layout, alloc_zeroed, dealloc};
use std::ffi::c_void;
use std::sync::atomic::{AtomicUsize, Ordering};
#[repr(C)]
pub struct RingBufferCtrl {
head: AtomicUsize,
tail: AtomicUsize,
capacity: usize,
slot_size: usize,
data: *mut u8,
}
unsafe impl Send for RingBufferCtrl {}
unsafe impl Sync for RingBufferCtrl {}
impl RingBufferCtrl {
#[inline]
fn slot_ptr(&self, index: usize) -> *mut u8 {
unsafe { self.data.add((index % self.capacity) * self.slot_size) }
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_new(capacity: usize, slot_size: usize) -> *mut RingBufferCtrl {
assert!(capacity > 0 && slot_size > 0);
let layout = Layout::from_size_align(capacity * slot_size, 8).unwrap();
let data = unsafe { alloc_zeroed(layout) };
if data.is_null() {
return std::ptr::null_mut();
}
Box::into_raw(Box::new(RingBufferCtrl {
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
capacity,
slot_size,
data,
}))
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_destroy(ctrl: *mut RingBufferCtrl) {
if ctrl.is_null() {
return;
}
unsafe {
let ctrl = Box::from_raw(ctrl);
let layout = Layout::from_size_align(ctrl.capacity * ctrl.slot_size, 8).unwrap();
dealloc(ctrl.data, layout);
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_try_push(ctrl: *mut RingBufferCtrl, src: *const c_void) -> bool {
unsafe {
let ctrl = &*ctrl;
let head = ctrl.head.load(Ordering::Relaxed);
let tail = ctrl.tail.load(Ordering::Acquire);
if head.wrapping_sub(tail) >= ctrl.capacity {
return false;
}
std::ptr::copy_nonoverlapping(src as *const u8, ctrl.slot_ptr(head), ctrl.slot_size);
ctrl.head.store(head.wrapping_add(1), Ordering::Release);
true
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_force_push(ctrl: *mut RingBufferCtrl, src: *const c_void) {
unsafe {
let ctrl = &*ctrl;
let head = ctrl.head.load(Ordering::Relaxed);
let tail = ctrl.tail.load(Ordering::Acquire);
if head.wrapping_sub(tail) >= ctrl.capacity {
ctrl.tail.store(tail.wrapping_add(1), Ordering::Release);
}
std::ptr::copy_nonoverlapping(src as *const u8, ctrl.slot_ptr(head), ctrl.slot_size);
ctrl.head.store(head.wrapping_add(1), Ordering::Release);
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_try_pop(ctrl: *mut RingBufferCtrl, dst: *mut c_void) -> bool {
unsafe {
let ctrl = &*ctrl;
let tail = ctrl.tail.load(Ordering::Relaxed);
let head = ctrl.head.load(Ordering::Acquire);
if head == tail {
return false;
}
std::ptr::copy_nonoverlapping(ctrl.slot_ptr(tail), dst as *mut u8, ctrl.slot_size);
ctrl.tail.store(tail.wrapping_add(1), Ordering::Release);
true
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_len(ctrl: *const RingBufferCtrl) -> usize {
unsafe {
let ctrl = &*ctrl;
let head = ctrl.head.load(Ordering::Acquire);
let tail = ctrl.tail.load(Ordering::Acquire);
head.wrapping_sub(tail)
}
}
#[unsafe(no_mangle)]
pub unsafe extern "C" fn roplat_rb_capacity(ctrl: *const RingBufferCtrl) -> usize {
unsafe { (*ctrl).capacity }
}
pub struct RingBufferWriter<T> {
ctrl: *mut RingBufferCtrl,
_marker: std::marker::PhantomData<T>,
}
unsafe impl<T: Send> Send for RingBufferWriter<T> {}
impl<T> RingBufferWriter<T> {
pub unsafe fn new(ctrl: *mut RingBufferCtrl) -> Self {
Self { ctrl, _marker: std::marker::PhantomData }
}
pub fn try_push(&mut self, data: &T) -> bool {
unsafe { roplat_rb_try_push(self.ctrl, data as *const T as *const c_void) }
}
pub fn force_push(&mut self, data: &T) {
unsafe { roplat_rb_force_push(self.ctrl, data as *const T as *const c_void) }
}
pub fn len(&self) -> usize {
unsafe { roplat_rb_len(self.ctrl) }
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn is_full(&self) -> bool {
self.len() >= unsafe { roplat_rb_capacity(self.ctrl) }
}
}
pub struct RingBufferReader<T> {
ctrl: *mut RingBufferCtrl,
_marker: std::marker::PhantomData<T>,
}
unsafe impl<T: Send> Send for RingBufferReader<T> {}
impl<T> RingBufferReader<T> {
pub unsafe fn new(ctrl: *mut RingBufferCtrl) -> Self {
Self { ctrl, _marker: std::marker::PhantomData }
}
pub fn try_pop(&mut self) -> Option<T> {
let mut out = std::mem::MaybeUninit::<T>::uninit();
let ok = unsafe { roplat_rb_try_pop(self.ctrl, out.as_mut_ptr() as *mut c_void) };
if ok {
Some(unsafe { out.assume_init() })
} else {
None
}
}
pub fn len(&self) -> usize {
unsafe { roplat_rb_len(self.ctrl) }
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
pub fn create_ring_buffer<T>(capacity: usize) -> (RingBufferWriter<T>, RingBufferReader<T>) {
let ctrl = unsafe { roplat_rb_new(capacity, std::mem::size_of::<T>()) };
assert!(!ctrl.is_null(), "failed to allocate ring buffer");
let writer = unsafe { RingBufferWriter::new(ctrl) };
let reader = unsafe { RingBufferReader::new(ctrl) };
(writer, reader)
}
pub struct RingBufferChannel<T> {
ctrl: *mut RingBufferCtrl,
_marker: std::marker::PhantomData<T>,
}
unsafe impl<T: Send> Send for RingBufferChannel<T> {}
unsafe impl<T: Send + Sync> Sync for RingBufferChannel<T> {}
impl<T> RingBufferChannel<T> {
pub fn new(capacity: usize) -> Self {
let ctrl = unsafe { roplat_rb_new(capacity, std::mem::size_of::<T>()) };
assert!(!ctrl.is_null(), "failed to allocate ring buffer");
Self { ctrl, _marker: std::marker::PhantomData }
}
pub fn writer(&self) -> RingBufferWriter<T> {
unsafe { RingBufferWriter::new(self.ctrl) }
}
pub fn reader(&self) -> RingBufferReader<T> {
unsafe { RingBufferReader::new(self.ctrl) }
}
pub fn capacity(&self) -> usize {
unsafe { roplat_rb_capacity(self.ctrl) }
}
}
impl<T> Drop for RingBufferChannel<T> {
fn drop(&mut self) {
unsafe { roplat_rb_destroy(self.ctrl) }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_basic_push_pop() {
let (mut writer, mut reader) = create_ring_buffer::<i32>(4);
assert!(reader.try_pop().is_none());
assert!(writer.try_push(&10));
assert!(writer.try_push(&20));
assert!(writer.try_push(&30));
assert_eq!(reader.try_pop(), Some(10));
assert_eq!(reader.try_pop(), Some(20));
assert_eq!(reader.try_pop(), Some(30));
assert!(reader.try_pop().is_none());
}
#[test]
fn test_full_buffer() {
let (mut writer, mut reader) = create_ring_buffer::<u64>(2);
assert!(writer.try_push(&1));
assert!(writer.try_push(&2));
assert!(!writer.try_push(&3));
assert_eq!(reader.try_pop(), Some(1));
assert!(writer.try_push(&3)); assert_eq!(reader.try_pop(), Some(2));
assert_eq!(reader.try_pop(), Some(3));
}
#[test]
fn test_force_push_overwrites() {
let (mut writer, mut reader) = create_ring_buffer::<i32>(2);
writer.force_push(&1);
writer.force_push(&2);
writer.force_push(&3);
assert_eq!(reader.try_pop(), Some(2));
assert_eq!(reader.try_pop(), Some(3));
assert!(reader.try_pop().is_none());
}
#[test]
fn test_len_and_capacity() {
let (mut writer, mut reader) = create_ring_buffer::<f64>(8);
assert_eq!(writer.len(), 0);
assert!(writer.is_empty());
assert!(!writer.is_full());
for i in 0..8 {
writer.try_push(&(i as f64));
}
assert_eq!(writer.len(), 8);
assert!(writer.is_full());
reader.try_pop();
assert_eq!(reader.len(), 7);
}
#[test]
#[allow(clippy::approx_constant)] fn test_repr_c_struct() {
#[derive(Clone, Debug, PartialEq)]
#[repr(C)]
struct Packet {
id: u64,
value: f64,
}
let (mut writer, mut reader) = create_ring_buffer::<Packet>(4);
writer.try_push(&Packet { id: 1, value: 3.14 });
writer.try_push(&Packet { id: 2, value: 2.72 });
let p1 = reader.try_pop().unwrap();
assert_eq!(p1.id, 1);
assert_eq!(p1.value, 3.14);
let p2 = reader.try_pop().unwrap();
assert_eq!(p2.id, 2);
assert_eq!(p2.value, 2.72);
}
#[test]
fn test_wraparound() {
let (mut writer, mut reader) = create_ring_buffer::<u32>(3);
for round in 0..10 {
for i in 0..3 {
assert!(writer.try_push(&(round * 10 + i)));
}
for i in 0..3 {
assert_eq!(reader.try_pop(), Some(round * 10 + i));
}
}
}
#[test]
fn test_spsc_threading() {
use std::thread;
let (mut writer, mut reader) = create_ring_buffer::<u64>(64);
let count = 10_000u64;
let producer = thread::spawn(move || {
for i in 0..count {
while !writer.try_push(&i) {
std::hint::spin_loop();
}
}
});
let consumer = thread::spawn(move || {
let mut received = Vec::with_capacity(count as usize);
while received.len() < count as usize {
if let Some(v) = reader.try_pop() {
received.push(v);
} else {
std::hint::spin_loop();
}
}
received
});
producer.join().unwrap();
let received = consumer.join().unwrap();
assert_eq!(received.len(), count as usize);
for (i, &v) in received.iter().enumerate() {
assert_eq!(v, i as u64);
}
}
#[test]
fn test_channel_lifecycle() {
let channel = RingBufferChannel::<i32>::new(4);
assert_eq!(channel.capacity(), 4);
let mut writer = channel.writer();
let mut reader = channel.reader();
writer.try_push(&42);
assert_eq!(reader.try_pop(), Some(42));
}
}