#![doc = include_str!("../README.md")]
use std::{
ptr::NonNull,
slice::{from_raw_parts, from_raw_parts_mut},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
/// Writing half of a ring buffer created by [`create_ring_buffer`].
///
/// The producer keeps its own write position and shares the underlying
/// storage with the matching [`Consumer`] through an [`Arc`].
pub struct Producer<T> {
    /// Storage shared with the consumer half.
    buffer: Arc<DirectRingBuffer<T>>,
    /// Slot index at which the next write starts.
    index: usize,
}

impl<T> Producer<T> {
    /// Returns the number of elements that can currently be written.
    pub fn available(&self) -> usize {
        self.buffer.available_write()
    }

    /// Writes into the free region of the buffer through one or more slices.
    ///
    /// `f` is invoked with a mutable slice of writable slots and the number of
    /// elements already handled earlier in this call. It returns how many
    /// elements of the slice it filled. Because the free region may wrap
    /// around the end of the storage, `f` can be called more than once;
    /// processing stops as soon as `f` reports fewer elements than the slice
    /// it was given.
    ///
    /// `max_size` caps the total number of elements handled; `None` means
    /// "everything that is currently available". The cap is never allowed to
    /// exceed [`available`](Self::available).
    ///
    /// Returns the total number of elements reported as written, which is
    /// also the amount made visible to the consumer.
    pub fn write_slices(
        &mut self,
        mut f: impl FnMut(&mut [T], usize) -> usize,
        max_size: Option<usize>,
    ) -> usize {
        let available = self.available();
        self.buffer.process_slices(
            &mut self.index,
            available,
            |buf, len, process_offset| {
                f(
                    // SAFETY: `process_slices` hands out `buf`/`len` pairs that lie
                    // inside the storage and are limited to `available` slots, i.e.
                    // slots the consumer is not currently reading.
                    unsafe { from_raw_parts_mut(buf, len) },
                    process_offset,
                )
            },
            max_size,
            |atomic, processed| {
                // Release publishes the element writes before the new count
                // becomes visible to the consumer's Acquire load.
                atomic.fetch_add(processed, Ordering::Release);
            },
        )
    }

    /// Deprecated alias of [`write_slices`](Self::write_slices).
    #[deprecated(note = "Please use `write_slices` instead")]
    pub fn write(
        &mut self,
        f: impl FnMut(&mut [T], usize) -> usize,
        max_size: Option<usize>,
    ) -> usize {
        self.write_slices(f, max_size)
    }

    /// Writes a single element.
    ///
    /// Returns `true` if `value` was stored, or `false` if the buffer is full
    /// (in which case `value` is dropped without being stored).
    pub fn write_element(&mut self, value: T) -> bool {
        self.buffer.write_element(&mut self.index, value)
    }
}

// SAFETY: moving a `Producer<T>` to another thread moves the ability to write
// `T` values (and possibly to drop the shared storage, if this half is the last
// owner) onto that thread. That is only sound when `T` itself is `Send`, so the
// impl is bounded accordingly instead of being unconditional.
unsafe impl<T: Send> Send for Producer<T> {}
/// Reading half of a ring buffer created by [`create_ring_buffer`].
///
/// The consumer keeps its own read position and shares the underlying
/// storage with the matching [`Producer`] through an [`Arc`].
pub struct Consumer<T> {
    /// Storage shared with the producer half.
    buffer: Arc<DirectRingBuffer<T>>,
    /// Slot index at which the next read starts.
    index: usize,
}

impl<T> Consumer<T> {
    /// Returns the number of elements that can currently be read.
    pub fn available(&self) -> usize {
        self.buffer.available_read()
    }

    /// Reads from the filled region of the buffer through one or more slices.
    ///
    /// `f` is invoked with a slice of readable elements and the number of
    /// elements already handled earlier in this call. It returns how many
    /// elements of the slice it consumed. Because the filled region may wrap
    /// around the end of the storage, `f` can be called more than once;
    /// processing stops as soon as `f` reports fewer elements than the slice
    /// it was given.
    ///
    /// `max_size` caps the total number of elements handled; `None` means
    /// "everything that is currently available". The cap is never allowed to
    /// exceed [`available`](Self::available).
    ///
    /// Returns the total number of elements reported as consumed, which is
    /// also the amount of space handed back to the producer.
    pub fn read_slices(
        &mut self,
        mut f: impl FnMut(&[T], usize) -> usize,
        max_size: Option<usize>,
    ) -> usize {
        let available = self.available();
        self.buffer.process_slices(
            &mut self.index,
            available,
            |buf, len, process_offset| {
                f(
                    // SAFETY: `process_slices` hands out `buf`/`len` pairs that lie
                    // inside the storage and are limited to `available` slots, i.e.
                    // slots the producer has already published.
                    unsafe { from_raw_parts(buf, len) },
                    process_offset,
                )
            },
            max_size,
            |atomic, processed| {
                // Release orders the reads above before the freed space becomes
                // visible to the producer's Acquire load.
                atomic.fetch_sub(processed, Ordering::Release);
            },
        )
    }

    /// Deprecated alias of [`read_slices`](Self::read_slices).
    #[deprecated(note = "Please use `read_slices` instead")]
    pub fn read(&mut self, f: impl FnMut(&[T], usize) -> usize, max_size: Option<usize>) -> usize {
        self.read_slices(f, max_size)
    }

    /// Reads a single element by copy.
    ///
    /// Returns `None` when no element is available. Only offered for
    /// `T: Copy`, because the value is copied out of the slot rather than
    /// moved.
    pub fn read_element(&mut self) -> Option<T>
    where
        T: Copy,
    {
        self.buffer.read_element(&mut self.index)
    }
}

// SAFETY: moving a `Consumer<T>` to another thread lets that thread observe `T`
// values produced elsewhere (and possibly drop the shared storage, if this half
// is the last owner). That is only sound when `T` itself is `Send`, so the impl
// is bounded accordingly instead of being unconditional.
unsafe impl<T: Send> Send for Consumer<T> {}
/// Fixed-capacity storage shared by the producer and consumer halves.
///
/// The storage is reclaimed in `Drop` by rebuilding the original boxed slice,
/// which drops every element. Consequently every slot must hold an
/// initialized `T` at all times; reads copy values out and writes replace
/// values in place.
struct DirectRingBuffer<T> {
    /// Pointer produced by `Box::into_raw`; handed back to `Box::from_raw` on drop.
    raw: *mut [T],
    /// Pointer to the first slot, used for element addressing.
    buf: NonNull<T>,
    /// Total number of slots.
    size: usize,
    /// Number of slots currently holding data that has not been read yet.
    used: AtomicUsize,
}

impl<T> DirectRingBuffer<T> {
    /// Number of elements ready to be read.
    #[inline]
    fn available_read(&self) -> usize {
        self.used.load(Ordering::Acquire)
    }

    /// Number of free slots ready to be written.
    #[inline]
    fn available_write(&self) -> usize {
        self.size - self.used.load(Ordering::Acquire)
    }

    /// Returns a raw pointer to the slot `count` elements past the start.
    #[inline]
    fn ptr_at(&self, count: usize) -> *mut T {
        // SAFETY: callers in this impl only pass positions taken from a
        // read/write index, which `wraparound_index` keeps below `size`.
        unsafe { self.buf.as_ptr().add(count) }
    }

    /// Advances `index` by `advance`, resetting it to 0 once it reaches (or
    /// passes) the end of the storage.
    #[inline]
    fn wraparound_index(&self, index: &mut usize, advance: usize) {
        *index = if *index + advance >= self.size {
            0
        } else {
            *index + advance
        }
    }

    /// Copies one element out of the slot at `index` and releases the slot.
    ///
    /// Returns `None` when nothing is available to read.
    fn read_element(&self, index: &mut usize) -> Option<T>
    where
        T: Copy,
    {
        if self.available_read() == 0 {
            None
        } else {
            // SAFETY: the slot is in bounds and initialized; `T: Copy` means the
            // bitwise read leaves a valid value behind in the slot.
            let ret = Some(unsafe { self.ptr_at(*index).read() });
            self.wraparound_index(index, 1);
            self.used.fetch_sub(1, Ordering::Release);
            ret
        }
    }

    /// Stores `value` in the slot at `index` and publishes it.
    ///
    /// Returns `false` (dropping `value`) when no free slot is available.
    fn write_element(&self, index: &mut usize, value: T) -> bool {
        if self.available_write() == 0 {
            false
        } else {
            // SAFETY: the slot is in bounds and already holds an initialized `T`.
            // `replace` moves the old value out so it can be dropped; a plain
            // `ptr::write` would overwrite it without running its destructor
            // and leak whatever it owns.
            let previous = unsafe { self.ptr_at(*index).replace(value) };
            drop(previous);
            self.wraparound_index(index, 1);
            self.used.fetch_add(1, Ordering::Release);
            true
        }
    }

    /// Walks up to `max_size` slots starting at `index`, in contiguous parts.
    ///
    /// * `index` - current position of the calling half; advanced in place.
    /// * `available` - upper bound on the slots the caller may touch.
    /// * `f` - called as `f(ptr, len, offset)` for each contiguous part, where
    ///   `offset` is the number of elements handled so far in this call; it
    ///   returns how many of the `len` elements it handled.
    /// * `max_size` - optional cap; `None` means `available`. Values above
    ///   `available` are reduced to it.
    /// * `update_used` - called once at the end with the `used` counter and the
    ///   total handled, so the caller decides whether to add or subtract.
    ///
    /// Returns the total number of elements handled.
    fn process_slices(
        &self,
        index: &mut usize,
        available: usize,
        mut f: impl FnMut(*mut T, usize, usize) -> usize,
        max_size: Option<usize>,
        update_used: impl FnOnce(&AtomicUsize, usize),
    ) -> usize {
        let mut total_processed = 0;
        let max_size = max_size.unwrap_or(available).min(available);
        while total_processed < max_size {
            let part_start = *index;
            // A part never crosses the end of the storage nor the overall cap.
            let part_len = (self.size - part_start).min(max_size - total_processed);
            // The callback cannot have handled more than it was offered. Clamping
            // keeps an over-reported count from pushing `used` outside
            // `0..=size`, which would corrupt the accounting and later hand out
            // slices the caller is not entitled to.
            let processed = f(self.ptr_at(part_start), part_len, total_processed).min(part_len);
            total_processed += processed;
            self.wraparound_index(index, processed);
            if processed < part_len {
                // The callback stopped early; do not offer the next part.
                break;
            }
        }
        update_used(&self.used, total_processed);
        total_processed
    }
}

impl<T> Drop for DirectRingBuffer<T> {
    fn drop(&mut self) {
        // SAFETY: `raw` is expected to come from `Box::into_raw` and is only
        // turned back into a box here, exactly once.
        unsafe {
            drop(Box::from_raw(self.raw));
        }
    }
}
/// Creates a ring buffer with `size` slots and returns its two halves.
///
/// Every slot is pre-filled with `T::default()`. The returned [`Producer`]
/// and [`Consumer`] share the storage and both start at slot 0 with no
/// elements available to read.
pub fn create_ring_buffer<T: Default>(size: usize) -> (Producer<T>, Consumer<T>) {
    // Build the fully initialized storage up front, then give up ownership so
    // the shared buffer can manage it through raw pointers.
    let slots: Box<[T]> = std::iter::repeat_with(T::default).take(size).collect();
    let raw: *mut [T] = Box::into_raw(slots);

    // SAFETY: `Box::into_raw` never yields a null pointer.
    let buf = unsafe { NonNull::new_unchecked(raw.cast::<T>()) };

    let shared = Arc::new(DirectRingBuffer {
        raw,
        buf,
        size,
        used: AtomicUsize::new(0),
    });

    let producer = Producer {
        buffer: Arc::clone(&shared),
        index: 0,
    };
    let consumer = Consumer {
        buffer: shared,
        index: 0,
    };
    (producer, consumer)
}