use super::core::RingBufCore;
use crate::shim::atomic::Ordering;
use core::fmt;
/// Shared-reference iterator over the elements of a ring buffer.
///
/// The elements are presented as two contiguous runs: everything in `first`
/// is yielded before anything in `second`. Iterating from the back walks
/// `second` in reverse first, then `first` in reverse.
pub struct Iter<'a, T> {
    /// Leading run, yielded first from the front.
    first: core::slice::Iter<'a, T>,
    /// Trailing run, yielded once `first` is exhausted.
    second: core::slice::Iter<'a, T>,
}

// Implemented by hand rather than derived: a derive would demand `T: Clone`,
// but only the two slice iterators (shared references) are duplicated.
impl<T> Clone for Iter<'_, T> {
    fn clone(&self) -> Self {
        Self {
            first: self.first.clone(),
            second: self.second.clone(),
        }
    }
}

impl<'a, T> Iterator for Iter<'a, T> {
    type Item = &'a T;

    /// Yields the next element from the front, falling through to the
    /// trailing run once the leading run is empty.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.first.next().or_else(|| self.second.next())
    }

    /// Exact bounds: both halves are slice iterators with a known length.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.first.len() + self.second.len();
        (remaining, Some(remaining))
    }
}

impl<'a, T> ExactSizeIterator for Iter<'a, T> {
    /// Number of elements not yet yielded from either end.
    #[inline]
    fn len(&self) -> usize {
        self.first.len() + self.second.len()
    }
}

impl<'a, T> DoubleEndedIterator for Iter<'a, T> {
    /// Yields the next element from the back, draining the trailing run
    /// before touching the leading run.
    #[inline]
    fn next_back(&mut self) -> Option<Self::Item> {
        self.second.next_back().or_else(|| self.first.next_back())
    }
}

// Both halves are slice iterators, which are fused, so once `next` returns
// `None` it keeps returning `None`.
impl<T> core::iter::FusedIterator for Iter<'_, T> {}
/// Mutable-reference iterator over the elements of a ring buffer.
///
/// The elements are presented as two contiguous runs: everything in `first`
/// is yielded before anything in `second`. Iterating from the back walks
/// `second` in reverse first, then `first` in reverse.
pub struct IterMut<'a, T> {
    /// Leading run, yielded first from the front.
    first: core::slice::IterMut<'a, T>,
    /// Trailing run, yielded once `first` is exhausted.
    second: core::slice::IterMut<'a, T>,
}

impl<'a, T> Iterator for IterMut<'a, T> {
    type Item = &'a mut T;

    /// Yields the next element from the front, falling through to the
    /// trailing run once the leading run is empty.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.first.next().or_else(|| self.second.next())
    }

    /// Exact bounds: both halves are slice iterators with a known length.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.first.len() + self.second.len();
        (remaining, Some(remaining))
    }
}

impl<'a, T> ExactSizeIterator for IterMut<'a, T> {
    /// Number of elements not yet yielded from either end.
    #[inline]
    fn len(&self) -> usize {
        self.first.len() + self.second.len()
    }
}

impl<'a, T> DoubleEndedIterator for IterMut<'a, T> {
    /// Yields the next element from the back, draining the trailing run
    /// before touching the leading run.
    #[inline]
    fn next_back(&mut self) -> Option<Self::Item> {
        self.second.next_back().or_else(|| self.first.next_back())
    }
}

// Both halves are slice iterators, which are fused, so once `next` returns
// `None` it keeps returning `None`.
impl<T> core::iter::FusedIterator for IterMut<'_, T> {}
/// Failure modes reported by ring-buffer operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RingBufError<T> {
    /// The buffer had no free slot; the rejected value is handed back to the
    /// caller so it is not lost.
    Full(T),
    /// The buffer held no element to remove.
    Empty,
}

// Human-readable messages. Deliberately places no bound on `T`: the rejected
// payload is not printed, so the error can be displayed for any element type.
impl<T> core::fmt::Display for RingBufError<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str(match self {
            Self::Full(_) => "ring buffer is full",
            Self::Empty => "ring buffer is empty",
        })
    }
}
/// Ring buffer front-end layered over [`RingBufCore`].
///
/// The `OVERWRITE` const parameter (default `true`) selects which
/// [`PushDispatch`] implementation `push` resolves to: with `true` a push
/// returns `Option<T>`, with `false` it returns
/// `Result<(), RingBufError<T>>`.
///
/// `N` is forwarded unchanged to `RingBufCore<T, N>`; its meaning is defined
/// there. NOTE(review): the runtime capacity is passed separately to `new`,
/// so `N` is presumably a storage-size parameter of the core — confirm.
pub struct RingBuf<T, const N: usize, const OVERWRITE: bool = true> {
// All state (indices and element storage) lives in the core.
core: RingBufCore<T, N>,
}
/// Compile-time dispatch for `RingBuf::push`.
///
/// Implemented on [`PushMarker`] once per `OVERWRITE` value so that each mode
/// can use a different return type for `push`.
pub trait PushDispatch<T, const N: usize, const OVERWRITE: bool> {
/// Type returned by `push` in this mode.
type PushOutput;
/// Pushes `value` into `ringbuf` using this mode's policy for a full buffer.
fn push_impl(ringbuf: &mut RingBuf<T, N, OVERWRITE>, value: T) -> Self::PushOutput;
}
/// Zero-sized marker type that carries the [`PushDispatch`] implementations,
/// one for `PushMarker<true>` and one for `PushMarker<false>`.
pub struct PushMarker<const OVERWRITE: bool>;
impl<T, const N: usize> PushDispatch<T, N, true> for PushMarker<true> {
    /// `None` when the value went into a free slot, `Some(old)` when it
    /// replaced a value that was already stored.
    type PushOutput = Option<T>;

    /// Overwriting push: always stores `value`.
    ///
    /// When the buffer already holds `capacity` elements, the read index is
    /// advanced by one and the value that occupied the target slot is
    /// returned.
    #[inline]
    fn push_impl(ringbuf: &mut RingBuf<T, N, true>, value: T) -> Self::PushOutput {
        // `fetch_add` returns the pre-increment write position, which is the
        // position this push writes to.
        let tail = ringbuf.core.write_idx().fetch_add(1, Ordering::Relaxed);
        let slot = tail & ringbuf.core.mask();
        let head = ringbuf.core.read_idx().load(Ordering::Acquire);

        let has_room = tail.wrapping_sub(head) < ringbuf.core.capacity();
        if has_room {
            // SAFETY: `slot` is masked into the buffer; NOTE(review): relies on
            // `RingBufCore::write_at` accepting a currently vacant slot —
            // its contract is defined outside this file.
            unsafe {
                ringbuf.core.write_at(slot, value);
            }
            return None;
        }

        // Full: step the read index past the element about to be replaced.
        // The outcome of the exchange is intentionally discarded.
        let _ = ringbuf.core.read_idx().compare_exchange(
            head,
            head.wrapping_add(1),
            Ordering::Release,
            Ordering::Relaxed,
        );
        // SAFETY: `slot` is masked into the buffer; NOTE(review): relies on
        // `RingBufCore::replace_at` requiring an occupied slot, which holds
        // here because the buffer is full — confirm against the core.
        let evicted = unsafe { ringbuf.core.replace_at(slot, value) };
        Some(evicted)
    }
}
impl<T, const N: usize> PushDispatch<T, N, false> for PushMarker<false> {
    /// `Ok(())` when stored, `Err(RingBufError::Full(value))` when rejected.
    type PushOutput = Result<(), RingBufError<T>>;

    /// Non-overwriting push: refuses the value when the buffer is full and
    /// hands it back inside the error.
    #[inline]
    fn push_impl(ringbuf: &mut RingBuf<T, N, false>, value: T) -> Self::PushOutput {
        // Optimistically claim a write position; undone below if there turns
        // out to be no room.
        let tail = ringbuf.core.write_idx().fetch_add(1, Ordering::Relaxed);
        let head = ringbuf.core.read_idx().load(Ordering::Acquire);

        let is_full = tail.wrapping_sub(head) >= ringbuf.core.capacity();
        if is_full {
            // Roll the write index back so the buffer state is unchanged.
            ringbuf.core.write_idx().fetch_sub(1, Ordering::Relaxed);
            return Err(RingBufError::Full(value));
        }

        let slot = tail & ringbuf.core.mask();
        // SAFETY: `slot` is masked into the buffer; NOTE(review): relies on
        // `RingBufCore::write_at` accepting a currently vacant slot — its
        // contract is defined outside this file.
        unsafe {
            ringbuf.core.write_at(slot, value);
        }
        Ok(())
    }
}
impl<T, const N: usize, const OVERWRITE: bool> RingBuf<T, N, OVERWRITE> {
    /// Creates a buffer by forwarding `capacity` to `RingBufCore::new`.
    ///
    /// The effective capacity is whatever the core decides; query it with
    /// [`capacity`](Self::capacity).
    pub fn new(capacity: usize) -> Self {
        Self {
            core: RingBufCore::new(capacity),
        }
    }

    /// Capacity as reported by the core.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.core.capacity()
    }

    /// Number of stored elements as reported by the core.
    #[inline]
    pub fn len(&self) -> usize {
        self.core.len()
    }

    /// Whether the core reports the buffer as empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.core.is_empty()
    }

    /// Whether the core reports the buffer as full.
    #[inline]
    pub fn is_full(&self) -> bool {
        self.core.is_full()
    }

    /// Pushes `value`, using the policy selected by `OVERWRITE`.
    ///
    /// The return type is the `PushOutput` of the matching [`PushDispatch`]
    /// implementation: `Option<T>` for `OVERWRITE = true`,
    /// `Result<(), RingBufError<T>>` for `OVERWRITE = false`.
    #[inline]
    pub fn push(
        &mut self,
        value: T,
    ) -> <PushMarker<OVERWRITE> as PushDispatch<T, N, OVERWRITE>>::PushOutput
    where
        PushMarker<OVERWRITE>: PushDispatch<T, N, OVERWRITE>,
    {
        <PushMarker<OVERWRITE> as PushDispatch<T, N, OVERWRITE>>::push_impl(self, value)
    }

    /// Removes and returns the element at the read position.
    ///
    /// # Errors
    ///
    /// Returns [`RingBufError::Empty`] when the read and write indices are
    /// equal.
    pub fn pop(&mut self) -> Result<T, RingBufError<T>> {
        let head = self.core.read_idx().load(Ordering::Relaxed);
        let tail = self.core.write_idx().load(Ordering::Acquire);

        if head != tail {
            let slot = head & self.core.mask();
            // SAFETY: `head != tail`, so the slot at the read position is
            // treated as occupied; NOTE(review): relies on `RingBufCore`
            // keeping every slot between the indices initialized.
            let value = unsafe { self.core.read_at(slot) };
            // Publish the consumed slot only after the value has been taken.
            self.core
                .read_idx()
                .store(head.wrapping_add(1), Ordering::Release);
            Ok(value)
        } else {
            Err(RingBufError::Empty)
        }
    }

    /// Borrows the element at the read position without removing it, or
    /// `None` when the read and write indices are equal.
    #[inline]
    pub fn peek(&self) -> Option<&T> {
        let head = self.core.read_idx().load(Ordering::Relaxed);
        let tail = self.core.write_idx().load(Ordering::Acquire);

        if head == tail {
            None
        } else {
            let slot = head & self.core.mask();
            // SAFETY: same reasoning as in `pop`: the indices differ, so the
            // slot at the read position is treated as occupied.
            Some(unsafe { self.core.peek_at(slot) })
        }
    }

    /// Pops elements one at a time until `pop` reports an error, dropping
    /// each removed value.
    pub fn clear(&mut self) {
        loop {
            if self.pop().is_err() {
                break;
            }
        }
    }
}
impl<T, const N: usize, const OVERWRITE: bool> RingBuf<T, N, OVERWRITE> {
    /// Locates the stored elements inside the backing storage.
    ///
    /// Returns `(start, first_len, second_len)`: the elements occupy
    /// `first_len` slots beginning at slot `start`, followed by `second_len`
    /// slots beginning at slot 0. `first_len` is zero exactly when the buffer
    /// holds nothing, in which case all three values are zero.
    #[inline]
    fn live_spans(&self) -> (usize, usize, usize) {
        let read = self.core.read_idx().load(Ordering::Acquire);
        let write = self.core.write_idx().load(Ordering::Acquire);
        let capacity = self.core.capacity();

        let len = write.wrapping_sub(read).min(capacity);
        if len == 0 {
            return (0, 0, 0);
        }

        let start = read & self.core.mask();
        // Whatever does not fit between `start` and the end of the storage
        // continues from slot 0. Clamping to `capacity - start` also keeps
        // the first span inside the storage in every case.
        let first_len = len.min(capacity - start);
        (start, first_len, len - first_len)
    }

    /// Returns the stored elements as two slices, in the order [`pop`] would
    /// remove them: all of the first slice, then all of the second.
    ///
    /// Both slices are empty when the buffer is empty; the second is empty
    /// when the elements do not wrap past the end of the storage.
    ///
    /// [`pop`]: Self::pop
    pub fn as_slices(&self) -> (&[T], &[T]) {
        let (start, first_len, second_len) = self.live_spans();
        if first_len == 0 {
            return (&[], &[]);
        }

        let base = self.core.buffer_ptr();
        // SAFETY: `live_spans` guarantees `start + first_len <= capacity` and
        // `second_len <= start`, so both ranges lie inside the storage.
        // NOTE(review): relies on `RingBufCore::buffer_ptr` pointing at
        // `capacity` contiguous slots and on every slot between the read and
        // write indices being initialized — both defined outside this file.
        unsafe {
            (
                core::slice::from_raw_parts(base.add(start), first_len),
                core::slice::from_raw_parts(base, second_len),
            )
        }
    }

    /// Iterates over the stored elements by shared reference, oldest first.
    #[inline]
    pub fn iter(&self) -> Iter<'_, T> {
        let (head, tail) = self.as_slices();
        Iter {
            first: head.iter(),
            second: tail.iter(),
        }
    }

    /// Mutable counterpart of [`as_slices`](Self::as_slices): the same two
    /// runs of elements, in the same order, as mutable slices.
    pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T]) {
        let (start, first_len, second_len) = self.live_spans();
        if first_len == 0 {
            return (&mut [], &mut []);
        }

        let base = self.core.buffer_ptr() as *mut T;
        // SAFETY: same bounds as in `as_slices`. The two ranges are disjoint
        // because the second one ends at `second_len <= start`, where the
        // first one begins, and `&mut self` rules out any other access for
        // the lifetime of the returned slices.
        // NOTE(review): writing through a pointer obtained from
        // `buffer_ptr()` relies on the core permitting mutation through it —
        // confirm against `RingBufCore`.
        unsafe {
            (
                core::slice::from_raw_parts_mut(base.add(start), first_len),
                core::slice::from_raw_parts_mut(base, second_len),
            )
        }
    }

    /// Iterates over the stored elements by mutable reference, oldest first.
    #[inline]
    pub fn iter_mut(&mut self) -> IterMut<'_, T> {
        let (head, tail) = self.as_mut_slices();
        IterMut {
            first: head.iter_mut(),
            second: tail.iter_mut(),
        }
    }
}
impl<T: Clone, const N: usize, const OVERWRITE: bool> Clone for RingBuf<T, N, OVERWRITE> {
    /// Produces an independent buffer by cloning the underlying core.
    fn clone(&self) -> Self {
        let core = self.core.clone();
        Self { core }
    }
}
impl<T: Copy, const N: usize, const OVERWRITE: bool> RingBuf<T, N, OVERWRITE> {
    /// Pushes as many elements of `values` as the mode allows, in order.
    ///
    /// * `OVERWRITE = true`: every element is accepted and the oldest stored
    ///   elements are discarded to make room. If `values` is at least as long
    ///   as the capacity, only its last `capacity` elements end up stored.
    ///   Returns `values.len()`.
    /// * `OVERWRITE = false`: only the leading elements that fit in the free
    ///   space are stored. Returns how many were stored.
    ///
    /// Returns 0 for an empty `values`.
    pub fn push_slice(&mut self, values: &[T]) -> usize {
        if values.is_empty() {
            return 0;
        }

        let capacity = self.core.capacity();
        // Claim room for the whole slice up front; `fetch_add` returns the
        // write position from before the claim.
        let write = self
            .core
            .write_idx()
            .fetch_add(values.len(), Ordering::Relaxed);
        let read = self.core.read_idx().load(Ordering::Acquire);
        let used = write.wrapping_sub(read);

        // `to_push` elements are copied, starting at `values[value_offset]`.
        let (to_push, value_offset) = if OVERWRITE {
            if values.len() >= capacity {
                // The slice alone fills the buffer: keep only its tail and
                // place the read index exactly `capacity` behind the new
                // write index.
                self.core.read_idx().store(
                    write.wrapping_add(values.len()).wrapping_sub(capacity),
                    Ordering::Release,
                );
                (capacity, values.len() - capacity)
            } else {
                let total = used + values.len();
                if total > capacity {
                    // Skip the oldest elements that are about to be
                    // overwritten.
                    self.core
                        .read_idx()
                        .fetch_add(total - capacity, Ordering::Release);
                }
                (values.len(), 0)
            }
        } else {
            let to_push = capacity.saturating_sub(used).min(values.len());
            if to_push < values.len() {
                // Give back the part of the claim that does not fit.
                self.core
                    .write_idx()
                    .fetch_sub(values.len() - to_push, Ordering::Relaxed);
            }
            (to_push, 0)
        };

        if to_push == 0 {
            return 0;
        }

        // Overwritten elements need no cleanup: `T: Copy` types cannot have
        // drop glue, so the old contents are simply replaced by the copy.
        //
        // SAFETY: NOTE(review): relies on `RingBufCore::copy_from_slice`
        // wrapping the starting position into the storage and copying at most
        // `to_push <= capacity` elements — its contract is defined outside
        // this file. `values[value_offset..]` holds at least `to_push`
        // elements by construction.
        unsafe {
            self.core.copy_from_slice(
                write.wrapping_add(value_offset),
                &values[value_offset..],
                to_push,
            );
        }

        if OVERWRITE {
            values.len()
        } else {
            to_push
        }
    }

    /// Removes up to `dest.len()` of the oldest elements, copying them into
    /// the front of `dest` in order.
    ///
    /// Returns the number of elements removed; 0 when `dest` is empty or the
    /// buffer holds nothing.
    pub fn pop_slice(&mut self, dest: &mut [T]) -> usize {
        if dest.is_empty() {
            return 0;
        }

        let read = self.core.read_idx().load(Ordering::Relaxed);
        let write = self.core.write_idx().load(Ordering::Acquire);
        let available = write.wrapping_sub(read).min(self.core.capacity());
        let to_pop = available.min(dest.len());
        if to_pop == 0 {
            return 0;
        }

        // SAFETY: `to_pop` is bounded by both the stored element count and
        // `dest.len()`. NOTE(review): relies on `RingBufCore::copy_to_slice`
        // wrapping the starting position into the storage — its contract is
        // defined outside this file.
        unsafe {
            self.core.copy_to_slice(read, dest, to_pop);
        }
        // Release the consumed slots only after they have been copied out.
        self.core.read_idx().fetch_add(to_pop, Ordering::Release);
        to_pop
    }
}
impl<T: fmt::Debug, const N: usize, const OVERWRITE: bool> fmt::Debug for RingBuf<T, N, OVERWRITE> {
    /// Formats a summary of the buffer state (capacity, length, empty/full
    /// flags and the overwrite mode). Stored elements are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut summary = f.debug_struct("RingBuf");
        summary.field("capacity", &self.core.capacity());
        summary.field("len", &self.core.len());
        summary.field("is_empty", &self.core.is_empty());
        summary.field("is_full", &self.core.is_full());
        summary.field("overwrite_mode", &OVERWRITE);
        summary.finish()
    }
}
// SAFETY: NOTE(review): moving the buffer to another thread moves the `T`
// values it stores, hence `T: Send`. Whether `RingBufCore` holds anything else
// that is thread-affine cannot be seen from this file — confirm there.
unsafe impl<T: Send, const N: usize, const OVERWRITE: bool> Send for RingBuf<T, N, OVERWRITE> {}

// SAFETY: a shared `&RingBuf` hands out `&T` (`peek`, `iter`, `as_slices`), so
// two threads sharing the buffer can hold `&T` to the same element at once.
// That is only sound when `T: Sync`; with `T: Send` alone, a `RingBuf<Cell<_>>`
// could be raced through `peek()` from safe code. `Send` is kept in the bound
// so this impl never applies to a type the previous bound excluded.
unsafe impl<T: Send + Sync, const N: usize, const OVERWRITE: bool> Sync
    for RingBuf<T, N, OVERWRITE>
{
}
// Unit tests for `RingBuf`. Compiled only for test builds without the `loom`
// feature enabled.
#[cfg(all(test, not(feature = "loom")))]
mod tests {
use super::*;
// Requested capacities 5, 8 and 9 are expected to yield 8, 8 and 16.
#[test]
fn test_capacity_rounding() {
let buf: RingBuf<i32, 32, true> = RingBuf::new(5);
assert_eq!(buf.capacity(), 8);
let buf: RingBuf<i32, 32, true> = RingBuf::new(8);
assert_eq!(buf.capacity(), 8);
let buf: RingBuf<i32, 32, true> = RingBuf::new(9);
assert_eq!(buf.capacity(), 16);
}
// Values come back out in the order they were pushed.
#[test]
fn test_basic_push_pop() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
assert_eq!(buf.push(1), None);
assert_eq!(buf.push(2), None);
assert_eq!(buf.push(3), None);
assert_eq!(buf.len(), 3);
assert_eq!(buf.pop().unwrap(), 1);
assert_eq!(buf.pop().unwrap(), 2);
assert_eq!(buf.pop().unwrap(), 3);
assert!(buf.is_empty());
}
// Pushing into a full overwrite-mode buffer returns the evicted oldest value.
#[test]
fn test_overwrite_mode() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
assert_eq!(buf.push(1), None);
assert_eq!(buf.push(2), None);
assert_eq!(buf.push(3), None);
assert_eq!(buf.push(4), None);
assert_eq!(buf.push(5), Some(1)); assert_eq!(buf.push(6), Some(2));
assert_eq!(buf.pop().unwrap(), 3);
assert_eq!(buf.pop().unwrap(), 4);
assert_eq!(buf.pop().unwrap(), 5);
assert_eq!(buf.pop().unwrap(), 6);
}
// A full non-overwrite buffer rejects the push and hands the value back;
// after one pop the same push succeeds.
#[test]
fn test_non_overwrite_mode() {
let mut buf: RingBuf<i32, 32, false> = RingBuf::new(4);
buf.push(1).unwrap();
buf.push(2).unwrap();
buf.push(3).unwrap();
buf.push(4).unwrap();
assert!(matches!(buf.push(5), Err(RingBufError::Full(5))));
assert_eq!(buf.pop().unwrap(), 1);
buf.push(5).unwrap();
}
// A fresh buffer is empty and popping from it reports `Empty`.
#[test]
fn test_empty_buffer() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
assert!(buf.is_empty());
assert_eq!(buf.len(), 0);
assert!(matches!(buf.pop(), Err(RingBufError::Empty)));
}
// `push_slice` into a buffer with enough room stores everything in order.
#[test]
fn test_push_slice() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(8);
let data = [1, 2, 3, 4, 5];
let pushed = buf.push_slice(&data);
assert_eq!(pushed, 5);
assert_eq!(buf.len(), 5);
assert_eq!(buf.pop().unwrap(), 1);
assert_eq!(buf.pop().unwrap(), 2);
assert_eq!(buf.pop().unwrap(), 3);
}
// `pop_slice` fills the destination with the oldest values and leaves the rest.
#[test]
fn test_pop_slice() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(8);
buf.push(1);
buf.push(2);
buf.push(3);
buf.push(4);
let mut dest = [0i32; 3];
let popped = buf.pop_slice(&mut dest);
assert_eq!(popped, 3);
assert_eq!(dest, [1, 2, 3]);
assert_eq!(buf.len(), 1);
assert_eq!(buf.pop().unwrap(), 4);
}
// `clear` leaves the buffer empty.
#[test]
fn test_clear() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(8);
buf.push(1);
buf.push(2);
buf.push(3);
assert_eq!(buf.len(), 3);
buf.clear();
assert!(buf.is_empty());
}
// Four threads push 100 values each through a `Mutex`; the overwrite-mode
// buffer of capacity 128 must end up holding exactly 128 elements.
#[test]
fn test_concurrent_access() {
use std::sync::{Arc, Mutex};
use std::thread;
let buf = Arc::new(Mutex::new(RingBuf::<u64, 128, true>::new(128)));
let mut handles = vec![];
for thread_id in 0..4 {
let buf_clone = Arc::clone(&buf);
let handle = thread::spawn(move || {
for i in 0..100 {
let value = (thread_id * 100 + i) as u64;
buf_clone.lock().unwrap().push(value);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(buf.lock().unwrap().len(), 128); }
// `peek` shows the oldest value without changing the length.
#[test]
fn test_peek() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
assert_eq!(buf.peek(), None);
buf.push(1);
buf.push(2);
buf.push(3);
assert_eq!(buf.peek(), Some(&1));
assert_eq!(buf.len(), 3);
assert_eq!(buf.pop().unwrap(), 1);
assert_eq!(buf.peek(), Some(&2));
assert_eq!(buf.len(), 2);
}
// `is_full` flips to true at capacity and back to false after a pop.
#[test]
fn test_is_full() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
assert!(!buf.is_full());
buf.push(1);
buf.push(2);
assert!(!buf.is_full());
buf.push(3);
buf.push(4);
assert!(buf.is_full());
buf.pop().unwrap();
assert!(!buf.is_full());
}
// Without wrap-around all elements are in the first slice.
#[test]
fn test_as_slices_contiguous() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(8);
let (first, second) = buf.as_slices();
assert_eq!(first, &[]);
assert_eq!(second, &[]);
buf.push(1);
buf.push(2);
buf.push(3);
let (first, second) = buf.as_slices();
assert_eq!(first, &[1, 2, 3]);
assert_eq!(second, &[]);
}
// After wrap-around the two slices concatenated give the elements in order.
#[test]
fn test_as_slices_wrapped() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
buf.push(1);
buf.push(2);
buf.push(3);
buf.push(4);
buf.pop().unwrap();
buf.pop().unwrap();
buf.push(5);
buf.push(6);
let (first, second) = buf.as_slices();
assert_eq!(first.len() + second.len(), 4);
let mut all_elements = Vec::new();
all_elements.extend_from_slice(first);
all_elements.extend_from_slice(second);
assert_eq!(all_elements, vec![3, 4, 5, 6]);
}
// `iter` yields elements oldest first and reports an exact length.
#[test]
fn test_iter() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(8);
assert_eq!(buf.iter().count(), 0);
buf.push(1);
buf.push(2);
buf.push(3);
buf.push(4);
let values: Vec<_> = buf.iter().copied().collect();
assert_eq!(values, vec![1, 2, 3, 4]);
let iter = buf.iter();
assert_eq!(iter.len(), 4);
}
// `iter` can be reversed and consumed from both ends until they meet.
#[test]
fn test_iter_double_ended() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(8);
buf.push(1);
buf.push(2);
buf.push(3);
buf.push(4);
let values: Vec<_> = buf.iter().rev().copied().collect();
assert_eq!(values, vec![4, 3, 2, 1]);
let mut iter = buf.iter();
assert_eq!(iter.next(), Some(&1));
assert_eq!(iter.next_back(), Some(&4));
assert_eq!(iter.next(), Some(&2));
assert_eq!(iter.next_back(), Some(&3));
assert_eq!(iter.next(), None);
}
// `push_slice` on a full overwrite-mode buffer discards the oldest elements.
#[test]
fn test_push_slice_overwrite() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
buf.push(1);
buf.push(2);
buf.push(3);
buf.push(4);
let data = [5, 6, 7];
let pushed = buf.push_slice(&data);
assert_eq!(pushed, 3);
assert_eq!(buf.pop().unwrap(), 4);
assert_eq!(buf.pop().unwrap(), 5);
assert_eq!(buf.pop().unwrap(), 6);
assert_eq!(buf.pop().unwrap(), 7);
}
// In non-overwrite mode `push_slice` stores only what fits and reports that count.
#[test]
fn test_push_slice_non_overwrite() {
let mut buf: RingBuf<i32, 32, false> = RingBuf::new(4);
buf.push(1).unwrap();
buf.push(2).unwrap();
let data = [3, 4, 5, 6];
let pushed = buf.push_slice(&data);
assert_eq!(pushed, 2);
assert_eq!(buf.len(), 4);
assert!(buf.is_full());
}
// Empty source and destination slices are no-ops.
#[test]
fn test_empty_slice_operations() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
let pushed = buf.push_slice(&[]);
assert_eq!(pushed, 0);
buf.push(1);
buf.push(2);
let mut dest = [];
let popped = buf.pop_slice(&mut dest);
assert_eq!(popped, 0);
assert_eq!(buf.len(), 2);
}
// Writes through `as_mut_slices` are visible to later pops (no wrap-around).
#[test]
fn test_as_mut_slices_contiguous() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(8);
let (first, second) = buf.as_mut_slices();
assert_eq!(first.len(), 0);
assert_eq!(second.len(), 0);
buf.push(1);
buf.push(2);
buf.push(3);
let (first, second) = buf.as_mut_slices();
assert_eq!(first, &[1, 2, 3]);
assert_eq!(second, &[]);
for x in first.iter_mut() {
*x *= 10;
}
assert_eq!(buf.pop().unwrap(), 10);
assert_eq!(buf.pop().unwrap(), 20);
assert_eq!(buf.pop().unwrap(), 30);
}
// Writes through both mutable slices are visible to later pops (wrapped case).
#[test]
fn test_as_mut_slices_wrapped() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
buf.push(1);
buf.push(2);
buf.push(3);
buf.push(4);
buf.pop().unwrap();
buf.pop().unwrap();
buf.push(5);
buf.push(6);
let (first, second) = buf.as_mut_slices();
assert_eq!(first.len() + second.len(), 4);
for x in first.iter_mut() {
*x += 100;
}
for x in second.iter_mut() {
*x += 100;
}
assert_eq!(buf.pop().unwrap(), 103);
assert_eq!(buf.pop().unwrap(), 104);
assert_eq!(buf.pop().unwrap(), 105);
assert_eq!(buf.pop().unwrap(), 106);
}
// `iter_mut` allows in-place modification and reports an exact length.
#[test]
fn test_iter_mut() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(8);
buf.push(1);
buf.push(2);
buf.push(3);
buf.push(4);
for x in buf.iter_mut() {
*x *= 2;
}
let values: Vec<_> = buf.iter().copied().collect();
assert_eq!(values, vec![2, 4, 6, 8]);
let iter = buf.iter_mut();
assert_eq!(iter.len(), 4);
}
// `iter_mut` can be reversed and consumed from both ends.
#[test]
fn test_iter_mut_double_ended() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(8);
buf.push(1);
buf.push(2);
buf.push(3);
buf.push(4);
for x in buf.iter_mut().rev() {
*x += 10;
}
let values: Vec<_> = buf.iter().copied().collect();
assert_eq!(values, vec![11, 12, 13, 14]);
let mut iter = buf.iter_mut();
if let Some(x) = iter.next() {
*x = 100;
}
if let Some(x) = iter.next_back() {
*x = 200;
}
let values: Vec<_> = buf.iter().copied().collect();
assert_eq!(values, vec![100, 12, 13, 200]);
}
// `iter_mut` on an empty buffer yields nothing.
#[test]
fn test_iter_mut_empty() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
let mut iter = buf.iter_mut();
assert_eq!(iter.next(), None);
assert_eq!(iter.len(), 0);
}
// After eight pushes into a capacity-4 overwrite buffer only the last four
// values remain, and `iter_mut` visits them in order.
#[test]
fn test_iter_mut_wrapped() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
for i in 0..8 {
buf.push(i);
}
for x in buf.iter_mut() {
*x *= 10;
}
let values: Vec<i32> = buf.iter().copied().collect();
assert_eq!(values, vec![40, 50, 60, 70]);
}
// A clone has the same contents and is unaffected by later pushes to the original.
#[test]
fn test_clone() {
let mut buf: RingBuf<i32, 32, true> = RingBuf::new(4);
buf.push(1);
buf.push(2);
buf.push(3);
let buf_clone = buf.clone();
assert_eq!(buf.len(), buf_clone.len());
assert_eq!(buf.capacity(), buf_clone.capacity());
let values: Vec<_> = buf_clone.iter().copied().collect();
assert_eq!(values, vec![1, 2, 3]);
buf.push(4);
assert_eq!(buf.len(), 4);
assert_eq!(buf_clone.len(), 3);
}
}