#![cfg_attr(docsrs, feature(doc_cfg))]
#[cfg(not(feature = "hybrid-array"))]
mod array;
#[cfg(not(feature = "hybrid-array"))]
pub use array::StackBuffer;
#[cfg(feature = "hybrid-array")]
mod hybrid_array;
#[cfg(feature = "hybrid-array")]
pub use hybrid_array::ArraySize;
#[cfg(feature = "hybrid-array")]
pub use hybrid_array::StackBuffer;
#[cfg(feature = "alloc")]
mod vec;
#[cfg(feature = "alloc")]
pub use vec::HeapBuffer;
#[cfg(feature = "buf-trait")]
use bytes::buf::UninitSlice;
#[cfg(feature = "buf-trait")]
use bytes::{Buf, BufMut};
#[cfg(feature = "zeroize")]
use zeroize::{Zeroize, ZeroizeOnDrop};
#[doc(hidden)]
mod sealed {
    /// Private supertrait implementing the "sealed trait" pattern: because
    /// `StorageBase` is not exported, only types in this crate can implement
    /// the public `Storage` trait.
    #[cfg(not(feature = "zeroize"))]
    pub trait StorageBase: Send {}
    /// With the `zeroize` feature enabled, backing storage must additionally
    /// be wipeable so the buffer can erase its contents (see the `Drop` and
    /// `Zeroize` impls on `CircularBuffer`).
    #[cfg(feature = "zeroize")]
    pub trait StorageBase: Send + zeroize::Zeroize {}
}
#[doc(hidden)]
/// Abstraction over a fixed-size backing byte region (stack array or heap
/// vector, depending on enabled features). Implementations only expose the
/// raw region; all circular-buffer bookkeeping lives in [`CircularBuffer`].
pub trait Storage: sealed::StorageBase {
    /// Total capacity of the backing region in bytes.
    fn len(&self) -> usize;
    /// Immutable view of the entire backing region.
    fn as_slice(&self) -> &[u8];
    /// Mutable view of the entire backing region.
    fn as_mut_slice(&mut self) -> &mut [u8];
    /// Splits the region into `(region[..offset], region[offset..])`.
    fn split_at(&self, offset: usize) -> (&[u8], &[u8]);
    /// Mutable variant of [`Storage::split_at`].
    fn split_at_mut(&mut self, offset: usize) -> (&mut [u8], &mut [u8]);
}
/// A fixed-capacity circular (ring) byte buffer over a [`Storage`] backend.
///
/// Invariants maintained by the methods (and checked via `debug_assert!`):
/// `start < bytes.len()` whenever capacity is non-zero, and
/// `size <= bytes.len()`.
pub struct CircularBuffer<S: Storage> {
    // Backing byte storage; its length is the buffer's capacity.
    bytes: S,
    // Number of readable bytes currently held.
    size: usize,
    // Index of the first readable byte (the read cursor).
    start: usize,
}
impl<S: Storage> CircularBuffer<S> {
    /// Wraps the given storage in an empty buffer: no readable bytes, read
    /// cursor at index 0.
    fn _new_with_storage(bytes: S) -> Self {
        Self {
            bytes,
            size: 0,
            start: 0,
        }
    }

    /// Total number of bytes the buffer can hold.
    pub fn capacity(&self) -> usize {
        self.bytes.len()
    }

    /// Returns `true` when there are no readable bytes.
    pub fn is_empty(&self) -> bool {
        self.size == 0
    }

    /// Number of bytes currently available for reading.
    pub fn remaining(&self) -> usize {
        self.size
    }

    /// Drops `cnt` readable bytes from the front of the buffer.
    ///
    /// # Panics
    /// Panics if `cnt` exceeds [`Self::remaining`].
    pub fn consume(&mut self, cnt: usize) {
        assert!(cnt <= self.size, "attempt to consume beyond available data");
        if cnt == 0 {
            return;
        }
        let capacity = self.bytes.len();
        debug_assert!(self.start < capacity, "start out-of-bounds");
        // Advance the read cursor modulo capacity; add_mod also handles the
        // (theoretical) usize overflow of `start + cnt`.
        self.start = add_mod(self.start, cnt, capacity);
        self.size -= cnt;
    }

    /// Discards all buffered data and rewinds the read cursor. Does not wipe
    /// the underlying bytes; see the `zeroize`-gated impls for that.
    pub fn reset(&mut self) {
        self.size = 0;
        self.start = 0;
    }

    /// Returns `true` when no more bytes can be written.
    pub fn is_full(&self) -> bool {
        self.size == self.bytes.len()
    }

    /// Number of bytes that can still be written before the buffer is full.
    pub fn remaining_mut(&self) -> usize {
        self.bytes.len() - self.size
    }

    /// Marks `cnt` bytes — previously written into the slices returned by
    /// [`Self::as_mut_slices`] — as readable.
    ///
    /// # Panics
    /// Panics if `cnt` exceeds [`Self::remaining_mut`].
    pub fn commit(&mut self, cnt: usize) {
        assert!(
            cnt <= self.remaining_mut(),
            "attempt to advance beyond available space"
        );
        if cnt == 0 {
            return;
        }
        self.size += cnt;
    }

    /// Returns the readable data as (up to) two slices: first the run from
    /// the read cursor toward the end of storage, then the wrapped-around run
    /// from the front. The second slice is empty when the data is contiguous;
    /// both are empty when the buffer is empty.
    pub fn as_slices(&self) -> (&[u8], &[u8]) {
        let capacity = self.bytes.len();
        if capacity == 0 || self.is_empty() {
            return (&[], &[]);
        }
        debug_assert!(self.start < capacity, "start out-of-bounds");
        debug_assert!(self.size <= capacity, "size out-of-bounds");
        let start = self.start;
        // One-past-the-end of the readable region, modulo capacity.
        let end = add_mod(self.start, self.size, capacity);
        if start < end {
            // Contiguous case: one slice covers all readable bytes.
            (&self.bytes.as_slice()[start..end], &[][..])
        } else {
            // Wrapped (or exactly full) case: `[start..capacity]` followed by
            // `[0..end]`. `back` is the part before `start`.
            let (back, front) = self.bytes.split_at(start);
            (front, &back[..end])
        }
    }

    /// Returns the writable free space as (up to) two mutable slices,
    /// mirroring [`Self::as_slices`]. Both slices are empty when the buffer
    /// is full.
    pub fn as_mut_slices(&mut self) -> (&mut [u8], &mut [u8]) {
        let capacity = self.bytes.len();
        if capacity == 0 || self.size == capacity {
            return (&mut [][..], &mut [][..]);
        }
        debug_assert!(self.start < capacity, "start out-of-bounds");
        debug_assert!(self.size <= capacity, "size out-of-bounds");
        // Free space begins just past the readable data…
        let write_start = add_mod(self.start, self.size, capacity);
        let available = capacity - self.size;
        // …and ends where the readable data begins: write_start + available
        // is congruent to `start` modulo capacity.
        let write_end = add_mod(write_start, available, capacity);
        if write_start < write_end {
            (
                &mut self.bytes.as_mut_slice()[write_start..write_end],
                &mut [][..],
            )
        } else {
            // Wrapped free space: `[write_start..capacity]` then `[0..write_end]`.
            let (back, front) = self.bytes.split_at_mut(write_start);
            (front, &mut back[..write_end])
        }
    }

    /// Rearranges the storage (if needed) so the readable data occupies one
    /// contiguous run starting at index 0, and returns it as a single slice.
    /// No-op when the data is already contiguous.
    pub fn make_contiguous(&mut self) -> &[u8] {
        let capacity = self.bytes.len();
        if capacity == 0 || self.size == 0 {
            return &[];
        }
        debug_assert!(self.start < capacity, "start out-of-bounds");
        debug_assert!(self.size <= capacity, "size out-of-bounds");
        let start = self.start;
        let end = add_mod(self.start, self.size, capacity);
        if start < end {
            // Already contiguous; return the existing run in place.
            &self.bytes.as_slice()[start..end]
        } else {
            // Wrapped: rotate the whole storage left so the byte at `start`
            // lands at index 0, then reset the cursor.
            self.start = 0;
            self.bytes.as_mut_slice().rotate_left(start);
            &self.bytes.as_slice()[..self.size]
        }
    }
}
/// Computes `(x + y) % m` without risk of intermediate overflow.
///
/// Callers pass cursor/length values bounded by the buffer capacity, hence
/// the `x <= m` / `y <= m` debug preconditions. The arithmetic is widened to
/// `u128` so the sum can never wrap — unlike the previous
/// `overflowing_add` + correction-term formulation, this is correct even if
/// the preconditions are violated in release builds (where `debug_assert!`
/// is compiled out), and it is much easier to audit.
#[inline]
const fn add_mod(x: usize, y: usize, m: usize) -> usize {
    debug_assert!(m > 0);
    debug_assert!(x <= m);
    debug_assert!(y <= m);
    // usize is at most 64 bits, so u128 holds `x + y` exactly.
    ((x as u128 + y as u128) % (m as u128)) as usize
}
// Marker impl: valid because the `Drop` impl zeroizes the backing storage,
// so the buffer upholds the `ZeroizeOnDrop` contract.
#[cfg(feature = "zeroize")]
impl<S: Storage> ZeroizeOnDrop for CircularBuffer<S> {}
#[cfg(feature = "zeroize")]
impl<S: Storage> Drop for CircularBuffer<S> {
    /// Wipes the backing storage when the buffer is dropped, so buffered
    /// data does not linger in memory.
    fn drop(&mut self) {
        self.bytes.zeroize()
    }
}
#[cfg(feature = "zeroize")]
impl<S: Storage> Zeroize for CircularBuffer<S> {
    /// Wipes the backing storage and resets the cursors so no stale data
    /// remains readable afterward.
    fn zeroize(&mut self) {
        self.bytes.zeroize();
        self.reset();
    }
}
#[cfg(feature = "buf-trait")]
impl<S: Storage> Buf for CircularBuffer<S> {
    /// Bytes available for reading; delegates to the inherent method.
    fn remaining(&self) -> usize {
        Self::remaining(self)
    }

    /// First contiguous run of readable bytes. When the front run is empty
    /// (buffer empty, or data starts at the wrap point) the tail run is
    /// returned instead.
    fn chunk(&self) -> &[u8] {
        match self.as_slices() {
            ([], tail) => tail,
            (head, _) => head,
        }
    }

    /// Consumes `cnt` readable bytes; panics if `cnt > remaining()`.
    fn advance(&mut self, cnt: usize) {
        self.consume(cnt)
    }
}
#[cfg(feature = "buf-trait")]
// SAFETY: `chunk_mut` wraps an already-initialized `&mut [u8]`, and
// `advance_mut`/`commit` panics rather than exposing bytes past the free
// space, so the `BufMut` contract is upheld.
unsafe impl<S: Storage> BufMut for CircularBuffer<S> {
    /// Writable free space; delegates to the inherent method.
    fn remaining_mut(&self) -> usize {
        Self::remaining_mut(self)
    }

    /// Marks `cnt` bytes as written; panics if `cnt > remaining_mut()`.
    unsafe fn advance_mut(&mut self, cnt: usize) {
        self.commit(cnt);
    }

    /// First contiguous writable region, preferring the run before the wrap
    /// point when it is non-empty.
    fn chunk_mut(&mut self) -> &mut UninitSlice {
        let (head, tail) = self.as_mut_slices();
        let region = if head.is_empty() { tail } else { head };
        UninitSlice::new(region)
    }
}
#[cfg(test)]
mod tests {
    use crate::{CircularBuffer, HeapBuffer, StackBuffer};
    use bytes::{Buf, BufMut};
    use hybrid_array::sizes::{U2, U3, U4, U5, U8, U10, U64};
    use std::cmp::min;
    use std::ptr;

    // 1 MiB fixture used by the data-integrity test below.
    // NOTE(review): the explicit `'static` is redundant on a `static` item
    // (clippy: redundant_static_lifetimes).
    static ONE_MB: &'static [u8] = include_bytes!("../testdata/1mb.bin");

    /// Expands to a test that runs `$test_body` against both storage
    /// backends (heap and stack), each with a fixed capacity of 10 bytes.
    macro_rules! test_all_impls {
        ($test_name:ident, $test_body:expr) => {
            #[test]
            fn $test_name() {
                {
                    let buf: HeapBuffer = HeapBuffer::new(10);
                    $test_body(buf);
                }
                {
                    let buf: StackBuffer<U10> = StackBuffer::new();
                    $test_body(buf);
                }
            }
        };
    }

    /// Like `test_all_impls!` but with caller-chosen capacities: `$size` for
    /// the heap buffer and `$stack_size` (a type-level size) for the stack
    /// buffer. The two must denote the same number of bytes.
    macro_rules! test_all_impls_custom_size {
        ($test_name:ident, $size:expr, $stack_size:ty, $test_body:expr) => {
            #[test]
            fn $test_name() {
                {
                    let buf: HeapBuffer = HeapBuffer::new($size);
                    $test_body(buf);
                }
                {
                    let buf: StackBuffer<$stack_size> = StackBuffer::new();
                    $test_body(buf);
                }
            }
        };
    }

    test_all_impls!(test_empty_buffer, |buf: CircularBuffer<_>| {
        assert!(buf.is_empty());
        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.remaining_mut(), buf.capacity());
        assert!(!buf.is_full());
    });

    test_all_impls_custom_size!(test_full_buffer, 5, U5, |mut buf: CircularBuffer<_>| {
        buf.commit(5);
        assert!(buf.is_full());
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.remaining_mut(), 0);
    });

    test_all_impls_custom_size!(test_consume_partial, 5, U5, |mut buf: CircularBuffer<_>| {
        buf.commit(3);
        buf.consume(2);
        assert_eq!(buf.remaining(), 1);
        assert_eq!(buf.remaining_mut(), 4);
    });

    // NOTE(review): the next three panic tests only run in release builds;
    // the `consume` over-read panics via a plain `assert!` in both modes, so
    // confirm why the `cfg(not(debug_assertions))` gate is needed.
    #[test]
    #[cfg(not(debug_assertions))]
    #[should_panic]
    fn test_consume_too_much() {
        let mut buf = HeapBuffer::new(5);
        buf.commit(3);
        buf.consume(4);
    }

    #[test]
    #[cfg(not(debug_assertions))]
    #[should_panic]
    fn test_zero_capacity_stack() {
        let _buf: StackBuffer<hybrid_array::sizes::U0> = StackBuffer::new();
    }

    #[test]
    #[cfg(not(debug_assertions))]
    #[should_panic]
    fn test_zero_capacity_heap() {
        let _buf: HeapBuffer = HeapBuffer::new(0);
    }

    // Fill, partially drain, then refill so the readable region straddles
    // the wrap point and as_slices() must return two non-empty slices.
    test_all_impls_custom_size!(test_wrap_around_read, 3, U3, |mut buf: CircularBuffer<
        _,
    >| {
        buf.commit(3);
        buf.consume(2);
        buf.commit(2);
        let (slice1, slice2) = buf.as_slices();
        assert_eq!(slice1.len() + slice2.len(), 3);
        assert!(!slice1.is_empty());
        assert!(!slice2.is_empty());
    });

    test_all_impls_custom_size!(test_wrap_around_write, 3, U3, |mut buf: CircularBuffer<
        _,
    >| {
        buf.commit(2);
        buf.consume(2);
        buf.commit(2);
        let (slice1, slice2) = buf.as_slices();
        assert_eq!(slice1.len() + slice2.len(), 2);
    });

    test_all_impls_custom_size!(test_reset, 5, U5, |mut buf: CircularBuffer<_>| {
        buf.commit(3);
        buf.consume(1);
        buf.reset();
        assert!(buf.is_empty());
        assert_eq!(buf.start, 0);
        assert_eq!(buf.remaining_mut(), buf.capacity());
    });

    test_all_impls_custom_size!(test_mut_slices_wrap, 4, U4, |mut buf: CircularBuffer<_>| {
        buf.commit(4);
        buf.consume(3);
        buf.commit(2);
        let (slice1, slice2) = buf.as_mut_slices();
        assert_eq!(slice1.len() + slice2.len(), 1);
    });

    test_all_impls_custom_size!(
        test_exact_capacity_usage,
        2,
        U2,
        |mut buf: CircularBuffer<_>| {
            buf.commit(2);
            buf.consume(2);
            buf.commit(2);
            assert!(buf.is_full());
            assert_eq!(buf.remaining(), 2);
        }
    );

    /// Streams 1 MiB through a 64 KiB circular buffer using the raw
    /// slice-based API and checks the output matches the input byte-for-byte.
    #[test]
    fn test_data_integrity_through_circular_buffer() {
        let input_data: &[u8] = ONE_MB;
        let mut circular_buffer = HeapBuffer::new(64 * 1024);
        let mut output = Vec::new();
        let mut input_pos = 0;
        while input_pos < input_data.len() {
            // Write as much as fits into both writable runs…
            let (first_mut, second_mut) = circular_buffer.as_mut_slices();
            let mut written = 0;
            if !first_mut.is_empty() {
                let to_copy = std::cmp::min(first_mut.len(), input_data.len() - input_pos);
                first_mut[..to_copy].copy_from_slice(&input_data[input_pos..input_pos + to_copy]);
                written += to_copy;
                input_pos += to_copy;
            }
            if !second_mut.is_empty() && input_pos < input_data.len() {
                let to_copy = std::cmp::min(second_mut.len(), input_data.len() - input_pos);
                second_mut[..to_copy].copy_from_slice(&input_data[input_pos..input_pos + to_copy]);
                written += to_copy;
                input_pos += to_copy;
            }
            circular_buffer.commit(written);
            // …then drain everything that was just committed.
            let (first, second) = circular_buffer.as_slices();
            if !first.is_empty() {
                output.extend_from_slice(first);
            }
            if !second.is_empty() {
                output.extend_from_slice(second);
            }
            circular_buffer.consume(circular_buffer.remaining());
        }
        assert_eq!(input_data, output.as_slice(), "Data corruption detected!");
    }

    // Same round-trip idea as above, but exercised through the `bytes`
    // Buf/BufMut trait surface (chunk_mut/advance_mut, chunk/advance).
    test_all_impls_custom_size!(
        test_data_integrity_through_circular_buffer_buf_traits,
        64,
        U64,
        |mut buf: CircularBuffer<_>| {
            let input_data: &[u8] =
                b"your_test_data_here_with_some_longer_content_to_test_wrapping";
            let mut output = Vec::new();
            let mut input_pos = 0;
            while input_pos < input_data.len() {
                while buf.remaining_mut() > 0 && input_pos < input_data.len() {
                    let chunk = buf.chunk_mut();
                    let to_copy = min(chunk.len(), input_data.len() - input_pos);
                    unsafe {
                        ptr::copy_nonoverlapping(
                            input_data[input_pos..].as_ptr(),
                            chunk.as_mut_ptr(),
                            to_copy,
                        );
                        buf.advance_mut(to_copy);
                    }
                    input_pos += to_copy;
                }
                while buf.remaining() > 0 {
                    let chunk = buf.chunk();
                    output.extend_from_slice(chunk);
                    let chunk_len = chunk.len();
                    buf.advance(chunk_len);
                }
            }
            assert_eq!(input_data, output.as_slice(), "Data corruption detected!");
        }
    );

    test_all_impls_custom_size!(
        test_as_mut_slices_returns_writable_space,
        8,
        U8,
        |mut buf: CircularBuffer<_>| {
            let (first, second) = buf.as_mut_slices();
            assert_eq!(
                first.len() + second.len(),
                8,
                "Empty buffer should have full capacity writable"
            );
            first[0..3].copy_from_slice(b"abc");
            buf.commit(3);
            let (first, second) = buf.as_mut_slices();
            assert_eq!(
                first.len() + second.len(),
                5,
                "After writing 3 bytes, should have 5 writable"
            );
            let total_writable = first.len() + second.len();
            if !first.is_empty() {
                first.fill(b'x');
            }
            if !second.is_empty() {
                second.fill(b'y');
            }
            buf.commit(total_writable);
            let (first, second) = buf.as_mut_slices();
            assert_eq!(
                first.len() + second.len(),
                0,
                "Full buffer should have no writable space"
            );
            buf.consume(2);
            let (first, second) = buf.as_mut_slices();
            assert_eq!(
                first.len() + second.len(),
                2,
                "After consuming 2 bytes, should have 2 writable"
            );
        }
    );

    test_all_impls_custom_size!(test_make_contiguous, 5, U5, |mut buf: CircularBuffer<_>| {
        assert_eq!(buf.make_contiguous(), &[]);
        let (first, _) = buf.as_mut_slices();
        first[0..3].copy_from_slice(&[1, 2, 3]);
        buf.commit(3);
        // Already contiguous: make_contiguous should be a no-op view.
        assert_eq!(buf.make_contiguous(), &[1, 2, 3]);
        let (first, _) = buf.as_mut_slices();
        first.copy_from_slice(&[4, 5]);
        buf.commit(2);
        buf.consume(2);
        let (first, second) = buf.as_mut_slices();
        assert_eq!(first.len(), 2);
        assert!(second.is_empty());
        first.copy_from_slice(&[6, 7]);
        buf.commit(2);
        // Sanity: the readable data must straddle the wrap point here.
        let (_, s2) = buf.as_slices();
        assert!(
            !s2.is_empty(),
            "Test setup failed: Buffer should be wrapped"
        );
        let contiguous_slice = buf.make_contiguous();
        assert_eq!(contiguous_slice, &[3, 4, 5, 6, 7]);
        // After the rotation, the data must read as one contiguous slice.
        let (s1, s2) = buf.as_slices();
        assert_eq!(s1, &[3, 4, 5, 6, 7]);
        assert!(s2.is_empty());
    });

    #[test]
    #[cfg(feature = "alloc")]
    fn test_heap_resize_grow() {
        let mut buf = HeapBuffer::new(5);
        buf.commit(3);
        buf.try_resize(10).expect("Resize should succeed");
        assert_eq!(buf.capacity(), 10);
        assert_eq!(buf.remaining(), 3);
        assert_eq!(buf.remaining_mut(), 7);
        let (s1, _) = buf.as_slices();
        assert_eq!(s1.len(), 3);
        buf.commit(7);
        assert!(buf.is_full());
    }

    #[test]
    #[cfg(feature = "alloc")]
    fn test_heap_resize_shrink_success() {
        let mut buf = HeapBuffer::new(10);
        buf.commit(3);
        buf.try_resize(5).expect("Resize should succeed");
        assert_eq!(buf.capacity(), 5);
        assert_eq!(buf.remaining(), 3);
        assert_eq!(buf.remaining_mut(), 2);
    }

    #[test]
    #[cfg(feature = "alloc")]
    fn test_heap_resize_shrink_fail() {
        let mut buf = HeapBuffer::new(10);
        buf.commit(8);
        // NOTE(review): the error value appears to be the byte shortfall
        // (8 buffered - 5 requested = 3) — confirm against try_resize's docs.
        let err = buf.try_resize(5).unwrap_err();
        assert_eq!(err, 3, "Should report 3 bytes need to be consumed");
        assert_eq!(buf.capacity(), 10);
        assert_eq!(buf.remaining(), 8);
    }

    #[test]
    #[cfg(feature = "alloc")]
    fn test_heap_resize_shrink_wraparound() {
        let mut buf = HeapBuffer::new(5);
        buf.commit(5);
        buf.consume(2);
        buf.commit(1);
        buf.try_resize(4).expect("Resize should succeed");
        assert_eq!(buf.capacity(), 4);
        assert!(buf.is_full());
        // Resizing a wrapped buffer must leave the data contiguous.
        let (s1, s2) = buf.as_slices();
        assert_eq!(s1.len(), 4);
        assert!(s2.is_empty());
    }

    #[test]
    #[cfg(not(debug_assertions))]
    #[should_panic]
    #[cfg(feature = "alloc")]
    fn test_heap_resize_zero_panics() {
        let mut buf = HeapBuffer::new(10);
        let _ = buf.try_resize(0);
    }
}