1#![cfg_attr(docsrs, feature(doc_cfg))]
2#[cfg(not(feature = "hybrid-array"))]
46mod array;
47#[cfg(not(feature = "hybrid-array"))]
48pub use array::StackBuffer;
49
50#[cfg(feature = "hybrid-array")]
51mod hybrid_array;
52#[cfg(feature = "hybrid-array")]
53pub use hybrid_array::ArraySize;
54#[cfg(feature = "hybrid-array")]
55pub use hybrid_array::StackBuffer;
56
57#[cfg(feature = "alloc")]
58mod vec;
59#[cfg(feature = "alloc")]
60pub use vec::HeapBuffer;
61
62#[cfg(feature = "buf-trait")]
63use bytes::buf::UninitSlice;
64#[cfg(feature = "buf-trait")]
65use bytes::{Buf, BufMut};
66
67#[cfg(feature = "zeroize")]
68use zeroize::{Zeroize, ZeroizeOnDrop};
69
#[doc(hidden)]
mod sealed {
    // Sealed supertrait: this module is private to the crate, so downstream
    // crates cannot implement `StorageBase` and therefore cannot implement
    // `Storage` either.
    //
    // With the `zeroize` feature enabled, every storage backend must also be
    // zeroizable so the buffer can wipe its contents (see the `Drop` impl
    // below).
    #[cfg(not(feature = "zeroize"))]
    pub trait StorageBase: Send {}
    #[cfg(feature = "zeroize")]
    pub trait StorageBase: Send + zeroize::Zeroize {}
}
77
/// Abstraction over the byte storage backing a [`CircularBuffer`]
/// (stack array, hybrid array, or heap vector depending on features).
#[doc(hidden)]
pub trait Storage: sealed::StorageBase {
    /// Total capacity of the backing storage, in bytes.
    fn len(&self) -> usize;
    /// The entire backing storage as an immutable byte slice.
    fn as_slice(&self) -> &[u8];
    /// The entire backing storage as a mutable byte slice.
    fn as_mut_slice(&mut self) -> &mut [u8];
    /// Splits the storage at `offset`, like [`slice::split_at`].
    fn split_at(&self, offset: usize) -> (&[u8], &[u8]);
    /// Splits the storage mutably at `offset`, like [`slice::split_at_mut`].
    fn split_at_mut(&mut self, offset: usize) -> (&mut [u8], &mut [u8]);
}
89
/// A fixed-capacity FIFO byte buffer whose data wraps around the end of
/// its backing storage.
pub struct CircularBuffer<S: Storage> {
    // Backing storage; its full length is the buffer's capacity.
    bytes: S,
    // Number of readable bytes currently stored (0..=capacity).
    size: usize,
    // Physical index of the oldest readable byte (the read position).
    start: usize,
}
98
99impl<S: Storage> CircularBuffer<S> {
100 fn _new_with_storage(bytes: S) -> Self {
101 Self {
102 bytes,
103 size: 0,
104 start: 0,
105 }
106 }
107
108 pub fn capacity(&self) -> usize {
112 self.bytes.len()
113 }
114
115 pub fn is_empty(&self) -> bool {
117 self.size == 0
118 }
119
120 pub fn remaining(&self) -> usize {
122 self.size
123 }
124
125 pub fn consume(&mut self, cnt: usize) {
142 assert!(cnt <= self.size, "attempt to consume beyond available data");
143 if cnt == 0 {
144 return;
145 }
146 let capacity = self.bytes.len();
147 debug_assert!(self.start < capacity, "start out-of-bounds");
148 self.start = add_mod(self.start, cnt, capacity);
149 self.size -= cnt;
150 }
151
152 pub fn reset(&mut self) {
157 self.size = 0;
158 self.start = 0;
159 }
160
161 pub fn is_full(&self) -> bool {
163 self.size == self.bytes.len()
164 }
165
166 pub fn remaining_mut(&self) -> usize {
168 self.bytes.len() - self.size
169 }
170
171 pub fn commit(&mut self, cnt: usize) {
179 assert!(
180 cnt <= self.remaining_mut(),
181 "attempt to advance beyond available space"
182 );
183 if cnt == 0 {
184 return;
185 }
186 self.size += cnt;
187 }
188
189 pub fn as_slices(&self) -> (&[u8], &[u8]) {
214 let capacity = self.bytes.len();
215
216 if capacity == 0 || self.is_empty() {
217 return (&[], &[]);
218 }
219
220 debug_assert!(self.start < capacity, "start out-of-bounds");
221 debug_assert!(self.size <= capacity, "size out-of-bounds");
222
223 let start = self.start;
224 let end = add_mod(self.start, self.size, capacity);
225
226 if start < end {
227 (&self.bytes.as_slice()[start..end], &[][..])
228 } else {
229 let (back, front) = self.bytes.split_at(start);
230 (front, &back[..end])
231 }
232 }
233
234 pub fn as_mut_slices(&mut self) -> (&mut [u8], &mut [u8]) {
241 let capacity = self.bytes.len();
242
243 if capacity == 0 || self.size == capacity {
244 return (&mut [][..], &mut [][..]);
245 }
246
247 debug_assert!(self.start < capacity, "start out-of-bounds");
248 debug_assert!(self.size <= capacity, "size out-of-bounds");
249
250 let write_start = add_mod(self.start, self.size, capacity);
251 let available = capacity - self.size;
252 let write_end = add_mod(write_start, available, capacity);
253
254 if write_start < write_end {
255 (
256 &mut self.bytes.as_mut_slice()[write_start..write_end],
257 &mut [][..],
258 )
259 } else {
260 let (back, front) = self.bytes.split_at_mut(write_start);
261 (front, &mut back[..write_end])
262 }
263 }
264
265 pub fn make_contiguous(&mut self) -> &[u8] {
274 let capacity = self.bytes.len();
275
276 if capacity == 0 || self.size == 0 {
277 return &[];
278 }
279
280 debug_assert!(self.start < capacity, "start out-of-bounds");
281 debug_assert!(self.size <= capacity, "size out-of-bounds");
282
283 let start = self.start;
284 let end = add_mod(self.start, self.size, capacity);
285
286 if start < end {
287 &self.bytes.as_slice()[start..end]
289 } else {
290 self.start = 0;
292 self.bytes.as_mut_slice().rotate_left(start);
293 &self.bytes.as_slice()[..self.size]
294 }
295 }
296}
297
/// Computes `(x + y) % m` without risking overflow, given the invariants
/// `x <= m` and `y <= m` (so the true sum is at most `2 * m`).
#[inline]
const fn add_mod(x: usize, y: usize, m: usize) -> usize {
    debug_assert!(m > 0);
    debug_assert!(x <= m);
    debug_assert!(y <= m);
    let (sum, carried) = x.overflowing_add(y);
    if carried {
        // The true sum is `sum + 2^BITS`. `usize::MAX % m + 1` is congruent
        // to `2^BITS` modulo `m`, and since `x, y <= m` the corrected value
        // `x + y - m` fits in a usize, so this addition cannot overflow.
        (sum + (usize::MAX % m + 1)) % m
    } else {
        sum % m
    }
}
309
// Marker impl: the `Drop` impl for `CircularBuffer` wipes the backing
// storage, which is what `ZeroizeOnDrop` advertises.
#[cfg(feature = "zeroize")]
impl<S: Storage> ZeroizeOnDrop for CircularBuffer<S> {}
312
#[cfg(feature = "zeroize")]
impl<S: Storage> Drop for CircularBuffer<S> {
    // Wipe the backing storage on drop so buffered data never lingers in
    // freed memory. (`S: Storage` implies `zeroize::Zeroize` when this
    // feature is enabled — see `sealed::StorageBase`.)
    fn drop(&mut self) {
        self.bytes.zeroize()
    }
}
319
#[cfg(feature = "zeroize")]
impl<S: Storage> Zeroize for CircularBuffer<S> {
    /// Wipes the backing storage and resets the buffer to empty, so the
    /// read/write indices no longer refer to (now-zeroed) stale data.
    fn zeroize(&mut self) {
        self.bytes.zeroize();
        self.reset();
    }
}
328
#[cfg(feature = "buf-trait")]
impl<S: Storage> Buf for CircularBuffer<S> {
    /// Readable byte count; delegates to the inherent method explicitly.
    fn remaining(&self) -> usize {
        CircularBuffer::remaining(self)
    }

    /// Returns the first non-empty readable slice (data up to the wrap
    /// point), or an empty slice when the buffer holds no data.
    fn chunk(&self) -> &[u8] {
        let (head, tail) = self.as_slices();
        if head.is_empty() { tail } else { head }
    }

    /// Discards `cnt` readable bytes; panics if `cnt` exceeds `remaining()`.
    fn advance(&mut self, cnt: usize) {
        self.consume(cnt);
    }
}
344
#[cfg(feature = "buf-trait")]
unsafe impl<S: Storage> BufMut for CircularBuffer<S> {
    /// Writable byte count; delegates to the inherent method explicitly.
    fn remaining_mut(&self) -> usize {
        CircularBuffer::remaining_mut(self)
    }

    /// Marks `cnt` bytes as written. `commit` bounds-checks the count, so
    /// this upholds the trait contract even though the method is `unsafe`.
    unsafe fn advance_mut(&mut self, cnt: usize) {
        self.commit(cnt)
    }

    /// Returns the first non-empty writable slice. The slice is backed by
    /// initialized storage, so wrapping it in `UninitSlice` is sound.
    fn chunk_mut(&mut self) -> &mut UninitSlice {
        let (head, tail) = self.as_mut_slices();
        let writable = if head.is_empty() { tail } else { head };
        UninitSlice::new(writable)
    }
}
362
#[cfg(test)]
mod tests {
    use crate::{CircularBuffer, HeapBuffer, StackBuffer};
    use bytes::{Buf, BufMut};
    use hybrid_array::sizes::{U2, U3, U4, U5, U8, U10, U64};
    use std::cmp::min;
    use std::ptr;

    /// 1 MiB of fixture data for the end-to-end integrity test.
    static ONE_MB: &[u8] = include_bytes!("../testdata/1mb.bin");

    /// Runs `$test_body` once against each storage implementation, using a
    /// fixed capacity of 10 bytes.
    macro_rules! test_all_impls {
        ($test_name:ident, $test_body:expr) => {
            #[test]
            fn $test_name() {
                {
                    let buf: HeapBuffer = HeapBuffer::new(10);
                    $test_body(buf);
                }

                {
                    let buf: StackBuffer<U10> = StackBuffer::new();
                    $test_body(buf);
                }
            }
        };
    }

    /// Like `test_all_impls`, but with a caller-chosen capacity: `$size`
    /// for the heap buffer, `$stack_size` for the stack buffer.
    macro_rules! test_all_impls_custom_size {
        ($test_name:ident, $size:expr, $stack_size:ty, $test_body:expr) => {
            #[test]
            fn $test_name() {
                {
                    let buf: HeapBuffer = HeapBuffer::new($size);
                    $test_body(buf);
                }

                {
                    let buf: StackBuffer<$stack_size> = StackBuffer::new();
                    $test_body(buf);
                }
            }
        };
    }

    test_all_impls!(test_empty_buffer, |buf: CircularBuffer<_>| {
        assert!(buf.is_empty());
        assert_eq!(buf.remaining(), 0);
        assert_eq!(buf.remaining_mut(), buf.capacity());
        assert!(!buf.is_full());
    });

    test_all_impls_custom_size!(test_full_buffer, 5, U5, |mut buf: CircularBuffer<_>| {
        buf.commit(5);
        assert!(buf.is_full());
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.remaining_mut(), 0);
    });

    test_all_impls_custom_size!(test_consume_partial, 5, U5, |mut buf: CircularBuffer<_>| {
        buf.commit(3);
        buf.consume(2);
        assert_eq!(buf.remaining(), 1);
        assert_eq!(buf.remaining_mut(), 4);
    });

    // NOTE(review): the panic here comes from a plain `assert!` in
    // `consume`, which fires in debug builds too — confirm whether the
    // `not(debug_assertions)` gate is intentional.
    #[test]
    #[cfg(not(debug_assertions))]
    #[should_panic]
    fn test_consume_too_much() {
        let mut buf = HeapBuffer::new(5);
        buf.commit(3);
        buf.consume(4);
    }

    #[test]
    #[cfg(not(debug_assertions))]
    #[should_panic]
    fn test_zero_capacity_stack() {
        let _buf: StackBuffer<hybrid_array::sizes::U0> = StackBuffer::new();
    }

    #[test]
    #[cfg(not(debug_assertions))]
    #[should_panic]
    fn test_zero_capacity_heap() {
        let _buf: HeapBuffer = HeapBuffer::new(0);
    }

    test_all_impls_custom_size!(test_wrap_around_read, 3, U3, |mut buf: CircularBuffer<
        _,
    >| {
        // Fill, partially drain, then refill so the readable data wraps.
        buf.commit(3);
        buf.consume(2);
        buf.commit(2);

        let (slice1, slice2) = buf.as_slices();
        assert_eq!(slice1.len() + slice2.len(), 3);
        assert!(!slice1.is_empty());
        assert!(!slice2.is_empty());
    });

    test_all_impls_custom_size!(test_wrap_around_write, 3, U3, |mut buf: CircularBuffer<
        _,
    >| {
        // Move the read position mid-buffer so the next write wraps.
        buf.commit(2);
        buf.consume(2);
        buf.commit(2);

        let (slice1, slice2) = buf.as_slices();
        assert_eq!(slice1.len() + slice2.len(), 2);
    });

    test_all_impls_custom_size!(test_reset, 5, U5, |mut buf: CircularBuffer<_>| {
        buf.commit(3);
        buf.consume(1);
        buf.reset();

        assert!(buf.is_empty());
        assert_eq!(buf.start, 0);
        assert_eq!(buf.remaining_mut(), buf.capacity());
    });

    test_all_impls_custom_size!(test_mut_slices_wrap, 4, U4, |mut buf: CircularBuffer<_>| {
        buf.commit(4);
        buf.consume(3);
        buf.commit(2);

        let (slice1, slice2) = buf.as_mut_slices();
        assert_eq!(slice1.len() + slice2.len(), 1);
    });

    test_all_impls_custom_size!(
        test_exact_capacity_usage,
        2,
        U2,
        |mut buf: CircularBuffer<_>| {
            buf.commit(2);
            buf.consume(2);
            buf.commit(2);

            assert!(buf.is_full());
            assert_eq!(buf.remaining(), 2);
        }
    );

    /// Streams 1 MiB through a 64 KiB buffer and checks the output matches
    /// the input byte-for-byte.
    #[test]
    fn test_data_integrity_through_circular_buffer() {
        let input_data: &[u8] = ONE_MB;
        let mut circular_buffer = HeapBuffer::new(64 * 1024);
        let mut output = Vec::new();
        let mut input_pos = 0;

        while input_pos < input_data.len() {
            // Fill as much of both writable slices as the input allows.
            let (first_mut, second_mut) = circular_buffer.as_mut_slices();
            let mut written = 0;

            if !first_mut.is_empty() {
                let to_copy = min(first_mut.len(), input_data.len() - input_pos);
                first_mut[..to_copy].copy_from_slice(&input_data[input_pos..input_pos + to_copy]);
                written += to_copy;
                input_pos += to_copy;
            }

            if !second_mut.is_empty() && input_pos < input_data.len() {
                let to_copy = min(second_mut.len(), input_data.len() - input_pos);
                second_mut[..to_copy].copy_from_slice(&input_data[input_pos..input_pos + to_copy]);
                written += to_copy;
                input_pos += to_copy;
            }

            circular_buffer.commit(written);

            // Drain everything that was just written into the output.
            let (first, second) = circular_buffer.as_slices();
            if !first.is_empty() {
                output.extend_from_slice(first);
            }
            if !second.is_empty() {
                output.extend_from_slice(second);
            }

            circular_buffer.consume(circular_buffer.remaining());
        }

        assert_eq!(input_data, output.as_slice(), "Data corruption detected!");
    }

    test_all_impls_custom_size!(
        test_data_integrity_through_circular_buffer_buf_traits,
        64,
        U64,
        |mut buf: CircularBuffer<_>| {
            let input_data: &[u8] =
                b"your_test_data_here_with_some_longer_content_to_test_wrapping";
            let mut output = Vec::new();
            let mut input_pos = 0;

            while input_pos < input_data.len() {
                // Write via the BufMut interface until full or out of input.
                while buf.remaining_mut() > 0 && input_pos < input_data.len() {
                    let chunk = buf.chunk_mut();
                    let to_copy = min(chunk.len(), input_data.len() - input_pos);

                    unsafe {
                        ptr::copy_nonoverlapping(
                            input_data[input_pos..].as_ptr(),
                            chunk.as_mut_ptr(),
                            to_copy,
                        );
                        buf.advance_mut(to_copy);
                    }
                    input_pos += to_copy;
                }

                // Read back via the Buf interface.
                while buf.remaining() > 0 {
                    let chunk = buf.chunk();
                    output.extend_from_slice(chunk);
                    let chunk_len = chunk.len();
                    buf.advance(chunk_len);
                }
            }

            assert_eq!(input_data, output.as_slice(), "Data corruption detected!");
        }
    );

    test_all_impls_custom_size!(
        test_as_mut_slices_returns_writable_space,
        8,
        U8,
        |mut buf: CircularBuffer<_>| {
            let (first, second) = buf.as_mut_slices();
            assert_eq!(
                first.len() + second.len(),
                8,
                "Empty buffer should have full capacity writable"
            );

            first[0..3].copy_from_slice(b"abc");
            buf.commit(3);

            let (first, second) = buf.as_mut_slices();
            assert_eq!(
                first.len() + second.len(),
                5,
                "After writing 3 bytes, should have 5 writable"
            );

            let total_writable = first.len() + second.len();
            if !first.is_empty() {
                first.fill(b'x');
            }
            if !second.is_empty() {
                second.fill(b'y');
            }
            buf.commit(total_writable);

            let (first, second) = buf.as_mut_slices();
            assert_eq!(
                first.len() + second.len(),
                0,
                "Full buffer should have no writable space"
            );

            buf.consume(2);

            let (first, second) = buf.as_mut_slices();
            assert_eq!(
                first.len() + second.len(),
                2,
                "After consuming 2 bytes, should have 2 writable"
            );
        }
    );

    test_all_impls_custom_size!(test_make_contiguous, 5, U5, |mut buf: CircularBuffer<_>| {
        // Empty buffer yields an empty slice.
        assert_eq!(buf.make_contiguous(), &[]);

        let (first, _) = buf.as_mut_slices();
        first[0..3].copy_from_slice(&[1, 2, 3]);
        buf.commit(3);

        // Already-contiguous data is returned as-is.
        assert_eq!(buf.make_contiguous(), &[1, 2, 3]);

        let (first, _) = buf.as_mut_slices();
        first.copy_from_slice(&[4, 5]);
        buf.commit(2);

        // Drain the front, then write past the end so the data wraps.
        buf.consume(2);
        let (first, second) = buf.as_mut_slices();
        assert_eq!(first.len(), 2);
        assert!(second.is_empty());

        first.copy_from_slice(&[6, 7]);
        buf.commit(2);

        let (_, s2) = buf.as_slices();
        assert!(
            !s2.is_empty(),
            "Test setup failed: Buffer should be wrapped"
        );

        let contiguous_slice = buf.make_contiguous();

        assert_eq!(contiguous_slice, &[3, 4, 5, 6, 7]);

        // After the rotation the buffer reads as a single slice.
        let (s1, s2) = buf.as_slices();
        assert_eq!(s1, &[3, 4, 5, 6, 7]);
        assert!(s2.is_empty());
    });

    #[test]
    #[cfg(feature = "alloc")]
    fn test_heap_resize_grow() {
        let mut buf = HeapBuffer::new(5);
        buf.commit(3);
        buf.try_resize(10).expect("Resize should succeed");

        assert_eq!(buf.capacity(), 10);
        assert_eq!(buf.remaining(), 3);
        assert_eq!(buf.remaining_mut(), 7);

        // Existing data must survive the resize.
        let (s1, _) = buf.as_slices();
        assert_eq!(s1.len(), 3);

        buf.commit(7);
        assert!(buf.is_full());
    }

    #[test]
    #[cfg(feature = "alloc")]
    fn test_heap_resize_shrink_success() {
        let mut buf = HeapBuffer::new(10);
        buf.commit(3);
        buf.try_resize(5).expect("Resize should succeed");

        assert_eq!(buf.capacity(), 5);
        assert_eq!(buf.remaining(), 3);
        assert_eq!(buf.remaining_mut(), 2);
    }

    #[test]
    #[cfg(feature = "alloc")]
    fn test_heap_resize_shrink_fail() {
        let mut buf = HeapBuffer::new(10);
        buf.commit(8);
        let err = buf.try_resize(5).unwrap_err();

        assert_eq!(err, 3, "Should report 3 bytes need to be consumed");

        // A failed shrink must leave the buffer untouched.
        assert_eq!(buf.capacity(), 10);
        assert_eq!(buf.remaining(), 8);
    }

    #[test]
    #[cfg(feature = "alloc")]
    fn test_heap_resize_shrink_wraparound() {
        let mut buf = HeapBuffer::new(5);

        // Arrange wrapped data before shrinking.
        buf.commit(5);
        buf.consume(2);
        buf.commit(1);

        buf.try_resize(4).expect("Resize should succeed");

        assert_eq!(buf.capacity(), 4);
        assert!(buf.is_full());

        // Shrinking should compact the data into one contiguous run.
        let (s1, s2) = buf.as_slices();
        assert_eq!(s1.len(), 4);
        assert!(s2.is_empty());
    }

    #[test]
    #[cfg(not(debug_assertions))]
    #[should_panic]
    #[cfg(feature = "alloc")]
    fn test_heap_resize_zero_panics() {
        let mut buf = HeapBuffer::new(10);
        let _ = buf.try_resize(0);
    }
}