#![allow(clippy::undocumented_unsafe_blocks)] #![allow(unsafe_code)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use thiserror::Error;
/// Errors reported by the ring buffers in this file.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum RingBufferError {
/// A write was rejected because the samples do not fit in the free space.
/// The payload is the number of samples the caller tried to write.
#[error("Buffer is full, cannot write {0} samples")]
BufferFull(usize),
/// The payload is a requested sample count.
/// NOTE(review): no code visible in this file constructs this variant; the
/// read paths report `InsufficientData` instead.
#[error("Buffer is empty, cannot read {0} samples")]
BufferEmpty(usize),
/// A constructor rejected the requested capacity; the payload is that capacity.
/// NOTE(review): the message says the size must be a power of two, but the
/// visible constructors only reject 0 and round every other value up with
/// `next_power_of_two` — confirm which behavior is intended.
#[error("Invalid buffer size: must be power of 2, got {0}")]
InvalidSize(usize),
/// A read asked for more samples than were readable at the time of the call.
#[error("Requested {requested} samples but only {available} available")]
InsufficientData {
// Length of the caller's output slice.
requested: usize,
// Number of samples that were readable when the request was rejected.
available: usize,
},
}
/// Fixed-capacity ring buffer that moves `Copy` samples from one producer to
/// one consumer without locks.
///
/// Both cursors are stored already masked to `0..capacity`. One slot is always
/// left unused so that `write_idx == read_idx` unambiguously means "empty"; a
/// ring whose (rounded) capacity is `N` therefore holds at most `N - 1` samples.
pub struct LockFreeSPSC<T: Copy> {
    /// Sample storage. Every slot lives in an `UnsafeCell` because the producer
    /// fills slots while holding only a shared reference to the ring; writing
    /// through a pointer obtained from a plain `&[T]` would be undefined
    /// behavior.
    buffer: Box<[std::cell::UnsafeCell<T>]>,
    /// Index of the next slot to fill. Stored only by the producer.
    write_idx: AtomicUsize,
    /// Spacer intended to keep the two cursors 64 bytes apart.
    _pad1: [u8; 64 - std::mem::size_of::<AtomicUsize>()],
    /// Index of the next slot to drain. Stored only by the consumer.
    read_idx: AtomicUsize,
    _pad2: [u8; 64 - std::mem::size_of::<AtomicUsize>()],
    /// `capacity - 1`; usable as a bit mask because the capacity is a power of two.
    capacity_mask: usize,
}

/// Writing half of a [`LockFreeSPSC`], obtained from [`LockFreeSPSC::split`].
pub struct SPSCProducer<T: Copy + Default> {
    buffer: Arc<LockFreeSPSC<T>>,
}

/// Reading half of a [`LockFreeSPSC`], obtained from [`LockFreeSPSC::split`].
pub struct SPSCConsumer<T: Copy + Default> {
    buffer: Arc<LockFreeSPSC<T>>,
}

impl<T: Copy + Default> LockFreeSPSC<T> {
    /// Creates a ring with room for at least `capacity` slots.
    ///
    /// `capacity` is rounded up to the next power of two and every slot is
    /// initialised with `T::default()`. Because one slot stays unused, the ring
    /// accepts at most `rounded_capacity - 1` samples at a time.
    ///
    /// # Errors
    ///
    /// Returns [`RingBufferError::InvalidSize`] when `capacity` is 0 or when
    /// rounding it up to a power of two would overflow `usize`.
    pub fn new(capacity: usize) -> Result<Arc<Self>, RingBufferError> {
        if capacity == 0 {
            return Err(RingBufferError::InvalidSize(0));
        }
        // `next_power_of_two` panics (debug) or yields 0 (release) on overflow;
        // the checked variant lets us report the bad size instead.
        let capacity = capacity
            .checked_next_power_of_two()
            .ok_or(RingBufferError::InvalidSize(capacity))?;
        let slots: Vec<std::cell::UnsafeCell<T>> = (0..capacity)
            .map(|_| std::cell::UnsafeCell::new(T::default()))
            .collect();
        Ok(Arc::new(Self {
            buffer: slots.into_boxed_slice(),
            write_idx: AtomicUsize::new(0),
            _pad1: [0; 64 - std::mem::size_of::<AtomicUsize>()],
            read_idx: AtomicUsize::new(0),
            _pad2: [0; 64 - std::mem::size_of::<AtomicUsize>()],
            capacity_mask: capacity - 1,
        }))
    }

    /// Splits the ring into its producer and consumer halves.
    ///
    /// NOTE(review): the single-producer / single-consumer protocol assumes this
    /// is called once per ring. Nothing here stops a caller from cloning the
    /// `Arc` beforehand and splitting again, which would hand out two producers
    /// (or two consumers) for the same storage.
    pub fn split(self: Arc<Self>) -> (SPSCProducer<T>, SPSCConsumer<T>) {
        (
            SPSCProducer {
                buffer: Arc::clone(&self),
            },
            SPSCConsumer { buffer: self },
        )
    }

    /// Raw pointer to slot 0, valid for reads and writes of `capacity` samples.
    ///
    /// `UnsafeCell<T>` has the same layout as `T`, so the slot array can be
    /// addressed as a contiguous `[T]`.
    #[inline]
    fn slots(&self) -> *mut T {
        std::cell::UnsafeCell::raw_get(self.buffer.as_ptr())
    }

    /// Number of samples the producer may write right now.
    #[inline]
    fn available_write(&self) -> usize {
        let write = self.write_idx.load(Ordering::Relaxed);
        // Acquire pairs with the consumer's release store: slots counted as free
        // here have been fully copied out by the consumer.
        let read = self.read_idx.load(Ordering::Acquire);
        let used = write.wrapping_sub(read) & self.capacity_mask;
        // One slot is kept free to tell "full" apart from "empty".
        self.capacity_mask - used
    }

    /// Number of samples the consumer may read right now.
    #[inline]
    fn available_read(&self) -> usize {
        // Acquire pairs with the producer's release store: samples counted here
        // have been fully written.
        let write = self.write_idx.load(Ordering::Acquire);
        let read = self.read_idx.load(Ordering::Relaxed);
        write.wrapping_sub(read) & self.capacity_mask
    }
}

impl<T: Copy + Default> SPSCProducer<T> {
    /// Writes all of `data` into the ring, or nothing at all.
    ///
    /// Returns the number of samples written, which is always `data.len()`.
    ///
    /// # Errors
    ///
    /// Returns [`RingBufferError::BufferFull`] carrying `data.len()` when the
    /// ring does not currently have room for the whole slice.
    pub fn write(&mut self, data: &[T]) -> Result<usize, RingBufferError> {
        let ring = &*self.buffer;
        let len = data.len();
        if len > ring.available_write() {
            return Err(RingBufferError::BufferFull(len));
        }

        // The producer is the only party that stores `write_idx`.
        let write_idx = ring.write_idx.load(Ordering::Relaxed);
        let capacity = ring.capacity_mask + 1;
        // Samples that fit before the end of the storage, then the wrapped rest.
        let head = (capacity - write_idx).min(len);
        let tail = len - head;
        let base = ring.slots();

        // SAFETY: `write_idx < capacity` and `head <= capacity - write_idx`, so
        // the first copy stays inside the allocation; `tail <= len - head` is
        // below `capacity`, so the second copy (from slot 0) does as well. The
        // `available_write` check guarantees these `len` slots are free, i.e.
        // the consumer will not read them until `write_idx` is published below.
        // `data` is a separate borrowed slice, so source and destination never
        // overlap, and all slots sit in `UnsafeCell`s, which permits writing
        // through a shared reference.
        unsafe {
            std::ptr::copy_nonoverlapping(data.as_ptr(), base.add(write_idx), head);
            std::ptr::copy_nonoverlapping(data.as_ptr().add(head), base, tail);
        }

        // Release publishes the copied samples together with the new cursor.
        ring.write_idx
            .store((write_idx + len) & ring.capacity_mask, Ordering::Release);
        Ok(len)
    }

    /// Writes as many leading samples of `data` as currently fit and returns
    /// how many were written (possibly 0).
    pub fn try_write(&mut self, data: &[T]) -> usize {
        let count = data.len().min(self.buffer.available_write());
        if count == 0 {
            return 0;
        }
        // Free space can only grow between the check above and the write (the
        // consumer only ever frees slots), so this write is not expected to fail.
        self.write(&data[..count]).unwrap_or(0)
    }

    /// Number of samples that can be written right now.
    pub fn available(&self) -> usize {
        self.buffer.available_write()
    }

    /// Returns `true` when no further sample can be written right now.
    pub fn is_full(&self) -> bool {
        self.available() == 0
    }
}

impl<T: Copy + Default> SPSCConsumer<T> {
    /// Fills all of `data` from the ring, or reads nothing at all.
    ///
    /// Returns the number of samples read, which is always `data.len()`.
    ///
    /// # Errors
    ///
    /// Returns [`RingBufferError::InsufficientData`] when fewer than
    /// `data.len()` samples are currently available.
    pub fn read(&mut self, data: &mut [T]) -> Result<usize, RingBufferError> {
        let ring = &*self.buffer;
        let len = data.len();
        let available = ring.available_read();
        if len > available {
            return Err(RingBufferError::InsufficientData {
                requested: len,
                available,
            });
        }

        // The consumer is the only party that stores `read_idx`.
        let read_idx = ring.read_idx.load(Ordering::Relaxed);
        let capacity = ring.capacity_mask + 1;
        let head = (capacity - read_idx).min(len);
        let tail = len - head;
        let base = ring.slots() as *const T;

        // SAFETY: `read_idx < capacity` and `head <= capacity - read_idx`, so the
        // first copy stays inside the allocation; `tail` is below `capacity`, so
        // the second copy (from slot 0) does as well. The `available_read` check
        // (acquire load of `write_idx`) guarantees these `len` slots hold fully
        // written samples that the producer will not overwrite until `read_idx`
        // is published below. `data` is a separate, exclusively borrowed slice.
        unsafe {
            std::ptr::copy_nonoverlapping(base.add(read_idx), data.as_mut_ptr(), head);
            std::ptr::copy_nonoverlapping(base, data.as_mut_ptr().add(head), tail);
        }

        // Release hands the drained slots back to the producer.
        ring.read_idx
            .store((read_idx + len) & ring.capacity_mask, Ordering::Release);
        Ok(len)
    }

    /// Reads as many samples as are available into the front of `data` and
    /// returns how many were read (possibly 0).
    pub fn try_read(&mut self, data: &mut [T]) -> usize {
        let count = data.len().min(self.buffer.available_read());
        if count == 0 {
            return 0;
        }
        // Readable data can only grow between the check above and the read.
        self.read(&mut data[..count]).unwrap_or(0)
    }

    /// Number of samples that can be read right now.
    pub fn available(&self) -> usize {
        self.buffer.available_read()
    }

    /// Returns `true` when there is nothing to read right now.
    pub fn is_empty(&self) -> bool {
        self.available() == 0
    }
}
/// Fixed-capacity ring buffer that accepts `Copy` samples from several
/// producers and hands them to a single consumer.
///
/// All three cursors are free-running counters (they wrap only at `usize`
/// overflow) and are reduced with `capacity_mask` when a slot is addressed.
/// Storing them unmasked keeps the reservation compare-exchange from succeeding
/// against a cursor that has merely returned to the same slot after a full lap.
///
/// One slot is always left unused, so a ring whose (rounded) capacity is `N`
/// holds at most `N - 1` samples.
///
/// Producers publish in reservation order: after copying its samples, a
/// producer waits until every earlier reservation has been published. A
/// producer that stalls between reserving and publishing therefore delays the
/// producers behind it.
///
/// NOTE(review): [`LockFreeMPSC::read`] assumes a single consumer. It takes
/// `&self` and advances `read_idx` with a plain store, so callers must make
/// sure only one thread reads at a time.
pub struct LockFreeMPSC<T: Copy> {
    /// Sample storage. Every slot lives in an `UnsafeCell` because producers
    /// fill slots while holding only a shared reference to the ring.
    buffer: Box<[std::cell::UnsafeCell<T>]>,
    /// Reservation cursor: first position not yet claimed by any producer.
    write_idx: AtomicUsize,
    /// Spacer intended to keep the cursors 64 bytes apart.
    _pad1: [u8; 64 - std::mem::size_of::<AtomicUsize>()],
    /// First position the consumer has not drained yet.
    read_idx: AtomicUsize,
    _pad2: [u8; 64 - std::mem::size_of::<AtomicUsize>()],
    /// Publication cursor: every position before it holds fully written data.
    commit_idx: AtomicUsize,
    _pad3: [u8; 64 - std::mem::size_of::<AtomicUsize>()],
    /// `capacity - 1`; usable as a bit mask because the capacity is a power of two.
    capacity_mask: usize,
}

impl<T: Copy + Default> LockFreeMPSC<T> {
    /// Creates a ring with room for at least `capacity` slots.
    ///
    /// `capacity` is rounded up to the next power of two and every slot is
    /// initialised with `T::default()`.
    ///
    /// # Errors
    ///
    /// Returns [`RingBufferError::InvalidSize`] when `capacity` is 0 or when
    /// rounding it up to a power of two would overflow `usize`.
    pub fn new(capacity: usize) -> Result<Arc<Self>, RingBufferError> {
        if capacity == 0 {
            return Err(RingBufferError::InvalidSize(0));
        }
        // `next_power_of_two` panics (debug) or yields 0 (release) on overflow;
        // the checked variant lets us report the bad size instead.
        let capacity = capacity
            .checked_next_power_of_two()
            .ok_or(RingBufferError::InvalidSize(capacity))?;
        let slots: Vec<std::cell::UnsafeCell<T>> = (0..capacity)
            .map(|_| std::cell::UnsafeCell::new(T::default()))
            .collect();
        Ok(Arc::new(Self {
            buffer: slots.into_boxed_slice(),
            write_idx: AtomicUsize::new(0),
            _pad1: [0; 64 - std::mem::size_of::<AtomicUsize>()],
            read_idx: AtomicUsize::new(0),
            _pad2: [0; 64 - std::mem::size_of::<AtomicUsize>()],
            commit_idx: AtomicUsize::new(0),
            _pad3: [0; 64 - std::mem::size_of::<AtomicUsize>()],
            capacity_mask: capacity - 1,
        }))
    }

    /// Raw pointer to slot 0, valid for reads and writes of `capacity` samples.
    ///
    /// `UnsafeCell<T>` has the same layout as `T`, so the slot array can be
    /// addressed as a contiguous `[T]`.
    #[inline]
    fn slots(&self) -> *mut T {
        std::cell::UnsafeCell::raw_get(self.buffer.as_ptr())
    }

    /// Writes all of `data` into the ring, or nothing at all. May be called
    /// from several threads at once.
    ///
    /// Returns the number of samples written, which is always `data.len()`.
    ///
    /// # Errors
    ///
    /// Returns [`RingBufferError::BufferFull`] carrying `data.len()` when the
    /// ring does not have room for the whole slice. The free-space estimate is
    /// conservative: space released by the consumer while this call is running
    /// may not be counted.
    pub fn write(&self, data: &[T]) -> Result<usize, RingBufferError> {
        let len = data.len();
        if len == 0 {
            // Nothing to reserve or publish.
            return Ok(0);
        }
        let capacity = self.capacity_mask + 1;

        // Step 1: reserve `len` consecutive positions.
        let start = loop {
            // `read_idx` is loaded first so that `claimed` can never be behind
            // it; a stale `read_idx` only makes the ring look fuller than it is.
            let read = self.read_idx.load(Ordering::Acquire);
            let claimed = self.write_idx.load(Ordering::Relaxed);
            let used = claimed.wrapping_sub(read);
            // One slot is kept free, matching the ring's documented capacity.
            let free = self.capacity_mask.saturating_sub(used);
            if len > free {
                return Err(RingBufferError::BufferFull(len));
            }
            let reserved = self.write_idx.compare_exchange_weak(
                claimed,
                claimed.wrapping_add(len),
                Ordering::AcqRel,
                Ordering::Relaxed,
            );
            if reserved.is_ok() {
                break claimed;
            }
        };

        // Step 2: copy into the reserved slots, splitting at the end of storage.
        let offset = start & self.capacity_mask;
        let head = (capacity - offset).min(len);
        let tail = len - head;
        let base = self.slots();
        // SAFETY: `offset < capacity` and `head <= capacity - offset`, so the
        // first copy stays inside the allocation; `tail < capacity`, so the
        // second copy (from slot 0) does as well. The successful
        // compare-exchange gave this call exclusive ownership of positions
        // `start..start + len` among producers, and the free-space check (with
        // an acquire load of `read_idx`) shows the consumer has finished with
        // them; the consumer will not read them before `commit_idx` passes them.
        // `data` is a separate borrowed slice, and the slots are `UnsafeCell`s,
        // which permits writing through a shared reference.
        unsafe {
            std::ptr::copy_nonoverlapping(data.as_ptr(), base.add(offset), head);
            std::ptr::copy_nonoverlapping(data.as_ptr().add(head), base, tail);
        }

        // Step 3: publish in reservation order. Advancing `commit_idx` only
        // after the copy is what keeps the consumer from seeing unwritten slots.
        while self.commit_idx.load(Ordering::Acquire) != start {
            std::hint::spin_loop();
        }
        self.commit_idx
            .store(start.wrapping_add(len), Ordering::Release);
        Ok(len)
    }

    /// Fills all of `data` from the ring, or reads nothing at all.
    ///
    /// Only published samples are visible; data that a producer has reserved
    /// but not finished writing does not count as available.
    ///
    /// NOTE(review): must only be called by one thread at a time — the method
    /// does not guard against concurrent readers.
    ///
    /// # Errors
    ///
    /// Returns [`RingBufferError::InsufficientData`] when fewer than
    /// `data.len()` published samples are available.
    pub fn read(&self, data: &mut [T]) -> Result<usize, RingBufferError> {
        let len = data.len();
        // The consumer is the only party that stores `read_idx`.
        let read = self.read_idx.load(Ordering::Relaxed);
        // Acquire pairs with the producers' release stores of `commit_idx`.
        let committed = self.commit_idx.load(Ordering::Acquire);
        let available = committed.wrapping_sub(read);
        if len > available {
            return Err(RingBufferError::InsufficientData {
                requested: len,
                available,
            });
        }

        let capacity = self.capacity_mask + 1;
        let offset = read & self.capacity_mask;
        let head = (capacity - offset).min(len);
        let tail = len - head;
        let base = self.slots() as *const T;
        // SAFETY: `offset < capacity` and `head <= capacity - offset`, so the
        // first copy stays inside the allocation; `tail < capacity`, so the
        // second copy (from slot 0) does as well. Positions `read..read + len`
        // lie before `commit_idx`, so they hold fully written samples, and no
        // producer may reuse them until `read_idx` is advanced below. `data` is
        // a separate, exclusively borrowed slice.
        unsafe {
            std::ptr::copy_nonoverlapping(base.add(offset), data.as_mut_ptr(), head);
            std::ptr::copy_nonoverlapping(base, data.as_mut_ptr().add(head), tail);
        }

        // Release hands the drained slots back to the producers.
        self.read_idx
            .store(read.wrapping_add(len), Ordering::Release);
        Ok(len)
    }
}
// Hand-written thread-safety markers for both ring types. They only require
// `T: Send` (not `T: Sync`): samples are copied in and out by value and the
// rings never hand out references to stored elements.
//
// SPSC: every public method that touches the storage lives on `SPSCProducer`
// or `SPSCConsumer` and takes `&mut self`; the two halves coordinate through
// the acquire/release accesses to `write_idx` and `read_idx`.
// NOTE(review): this relies on exactly one producer and one consumer existing
// per ring. `split` takes an `Arc<Self>` and does not stop a caller from
// cloning the `Arc` first and splitting twice — confirm no caller does that.
unsafe impl<T: Copy + Send> Send for LockFreeSPSC<T> {}
unsafe impl<T: Copy + Send> Sync for LockFreeSPSC<T> {}
// MPSC: `write` takes `&self` and claims its range with a compare-exchange on
// `write_idx`, so it is meant to be called from several threads.
// NOTE(review): `read` also takes `&self` but advances `read_idx` with a plain
// load/store pair, so `Sync` is only sound while at most one thread calls
// `read` at a time; the type does not enforce this.
unsafe impl<T: Copy + Send> Send for LockFreeMPSC<T> {}
unsafe impl<T: Copy + Send> Sync for LockFreeMPSC<T> {}
#[cfg(test)]
mod tests {
    use super::*;

    /// A write followed by an equally sized read yields the same samples.
    #[test]
    fn test_spsc_basic() {
        let ring = LockFreeSPSC::<f32>::new(16).expect("Failed to create buffer");
        let (mut producer, mut consumer) = ring.split();

        let input = [1.0_f32, 2.0, 3.0, 4.0];
        let written = producer.write(&input).expect("Write should succeed");
        assert_eq!(written, 4);

        let mut output = [0.0_f32; 4];
        let read = consumer.read(&mut output).expect("Read should succeed");
        assert_eq!(read, 4);
        assert_eq!(output, input);
    }

    /// Interleaved writes and reads that reach the end of an 8-slot ring keep
    /// the samples in order.
    #[test]
    fn test_spsc_wraparound() {
        let ring = LockFreeSPSC::<f32>::new(8).expect("Failed to create buffer");
        let (mut producer, mut consumer) = ring.split();

        let first_batch = [1.0_f32, 2.0, 3.0, 4.0, 5.0];
        producer.write(&first_batch).expect("Write should succeed");
        let mut first_read = [0.0_f32; 3];
        consumer.read(&mut first_read).expect("Read should succeed");

        let second_batch = [6.0_f32, 7.0, 8.0];
        producer.write(&second_batch).expect("Write should succeed");
        let mut second_read = [0.0_f32; 5];
        consumer.read(&mut second_read).expect("Read should succeed");

        assert_eq!(first_read, [1.0, 2.0, 3.0]);
        assert_eq!(second_read, [4.0, 5.0, 6.0, 7.0, 8.0]);
    }

    /// A 4-slot ring accepts three samples and rejects a fourth.
    #[test]
    fn test_spsc_full() {
        let ring = LockFreeSPSC::<f32>::new(4).expect("Failed to create buffer");
        let (mut producer, _consumer) = ring.split();

        producer
            .write(&[1.0_f32, 2.0, 3.0])
            .expect("Write should succeed");

        let overflow = producer.write(&[4.0_f32]);
        assert!(overflow.is_err());
    }

    /// Reading from a ring that was never written to fails.
    #[test]
    fn test_spsc_empty() {
        let ring = LockFreeSPSC::<f32>::new(16).expect("Failed to create buffer");
        let (_producer, mut consumer) = ring.split();

        let mut output = [0.0_f32; 4];
        let outcome = consumer.read(&mut output);
        assert!(outcome.is_err());
    }

    /// Two consecutive writes are read back as one contiguous run.
    #[test]
    fn test_mpsc_basic() {
        let ring = LockFreeMPSC::<f32>::new(16).expect("Failed to create buffer");

        for batch in [[1.0_f32, 2.0], [3.0, 4.0]] {
            ring.write(&batch).expect("Write should succeed");
        }

        let mut output = [0.0_f32; 4];
        ring.read(&mut output).expect("Read should succeed");
        assert_eq!(output, [1.0, 2.0, 3.0, 4.0]);
    }
}