1use core::cmp;
6use managed::ManagedSlice;
7
8use crate::storage::Resettable;
9use crate::{Error, Result};
10
11#[derive(Debug)]
26pub struct RingBuffer<'a, T: 'a> {
27 storage: ManagedSlice<'a, T>,
28 read_at: usize,
29 length: usize,
30}
31
32impl<'a, T: 'a> RingBuffer<'a, T> {
33 pub fn new<S>(storage: S) -> RingBuffer<'a, T>
37 where
38 S: Into<ManagedSlice<'a, T>>,
39 {
40 RingBuffer {
41 storage: storage.into(),
42 read_at: 0,
43 length: 0,
44 }
45 }
46
47 pub fn clear(&mut self) {
49 self.read_at = 0;
50 self.length = 0;
51 }
52
53 pub fn capacity(&self) -> usize {
55 self.storage.len()
56 }
57
58 pub fn reset(&mut self)
60 where
61 T: Resettable,
62 {
63 self.clear();
64 for elem in self.storage.iter_mut() {
65 elem.reset();
66 }
67 }
68
69 pub fn len(&self) -> usize {
71 self.length
72 }
73
74 pub fn window(&self) -> usize {
76 self.capacity() - self.len()
77 }
78
79 pub fn contiguous_window(&self) -> usize {
82 cmp::min(self.window(), self.capacity() - self.get_idx(self.length))
83 }
84
85 pub fn is_empty(&self) -> bool {
87 self.len() == 0
88 }
89
90 pub fn is_full(&self) -> bool {
92 self.window() == 0
93 }
94
95 fn get_idx(&self, idx: usize) -> usize {
98 let len = self.capacity();
99 if len > 0 {
100 (self.read_at + idx) % len
101 } else {
102 0
103 }
104 }
105
106 fn get_idx_unchecked(&self, idx: usize) -> usize {
109 (self.read_at + idx) % self.capacity()
110 }
111}
112
113impl<'a, T: 'a> RingBuffer<'a, T> {
116 pub fn enqueue_one_with<'b, R, F>(&'b mut self, f: F) -> Result<R>
119 where
120 F: FnOnce(&'b mut T) -> Result<R>,
121 {
122 if self.is_full() {
123 return Err(Error::Exhausted);
124 }
125
126 let index = self.get_idx_unchecked(self.length);
127 match f(&mut self.storage[index]) {
128 Ok(result) => {
129 self.length += 1;
130 Ok(result)
131 }
132 Err(error) => Err(error),
133 }
134 }
135
136 pub fn enqueue_one(&mut self) -> Result<&mut T> {
141 self.enqueue_one_with(Ok)
142 }
143
144 pub fn dequeue_one_with<'b, R, F>(&'b mut self, f: F) -> Result<R>
147 where
148 F: FnOnce(&'b mut T) -> Result<R>,
149 {
150 if self.is_empty() {
151 return Err(Error::Exhausted);
152 }
153
154 let next_at = self.get_idx_unchecked(1);
155 match f(&mut self.storage[self.read_at]) {
156 Ok(result) => {
157 self.length -= 1;
158 self.read_at = next_at;
159 Ok(result)
160 }
161 Err(error) => Err(error),
162 }
163 }
164
165 pub fn dequeue_one(&mut self) -> Result<&mut T> {
170 self.dequeue_one_with(Ok)
171 }
172}
173
174impl<'a, T: 'a> RingBuffer<'a, T> {
177 pub fn enqueue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
184 where
185 F: FnOnce(&'b mut [T]) -> (usize, R),
186 {
187 if self.length == 0 {
188 self.read_at = 0;
191 }
192
193 let write_at = self.get_idx(self.length);
194 let max_size = self.contiguous_window();
195 let (size, result) = f(&mut self.storage[write_at..write_at + max_size]);
196 assert!(size <= max_size);
197 self.length += size;
198 (size, result)
199 }
200
201 #[must_use]
207 pub fn enqueue_many(&mut self, size: usize) -> &mut [T] {
208 self.enqueue_many_with(|buf| {
209 let size = cmp::min(size, buf.len());
210 (size, &mut buf[..size])
211 })
212 .1
213 }
214
215 #[must_use]
218 pub fn enqueue_slice(&mut self, data: &[T]) -> usize
219 where
220 T: Copy,
221 {
222 let (size_1, data) = self.enqueue_many_with(|buf| {
223 let size = cmp::min(buf.len(), data.len());
224 buf[..size].copy_from_slice(&data[..size]);
225 (size, &data[size..])
226 });
227 let (size_2, ()) = self.enqueue_many_with(|buf| {
228 let size = cmp::min(buf.len(), data.len());
229 buf[..size].copy_from_slice(&data[..size]);
230 (size, ())
231 });
232 size_1 + size_2
233 }
234
235 pub fn dequeue_many_with<'b, R, F>(&'b mut self, f: F) -> (usize, R)
242 where
243 F: FnOnce(&'b mut [T]) -> (usize, R),
244 {
245 let capacity = self.capacity();
246 let max_size = cmp::min(self.len(), capacity - self.read_at);
247 let (size, result) = f(&mut self.storage[self.read_at..self.read_at + max_size]);
248 assert!(size <= max_size);
249 self.read_at = if capacity > 0 {
250 (self.read_at + size) % capacity
251 } else {
252 0
253 };
254 self.length -= size;
255 (size, result)
256 }
257
258 #[must_use]
264 pub fn dequeue_many(&mut self, size: usize) -> &mut [T] {
265 self.dequeue_many_with(|buf| {
266 let size = cmp::min(size, buf.len());
267 (size, &mut buf[..size])
268 })
269 .1
270 }
271
272 #[must_use]
275 pub fn dequeue_slice(&mut self, data: &mut [T]) -> usize
276 where
277 T: Copy,
278 {
279 let (size_1, data) = self.dequeue_many_with(|buf| {
280 let size = cmp::min(buf.len(), data.len());
281 data[..size].copy_from_slice(&buf[..size]);
282 (size, &mut data[size..])
283 });
284 let (size_2, ()) = self.dequeue_many_with(|buf| {
285 let size = cmp::min(buf.len(), data.len());
286 data[..size].copy_from_slice(&buf[..size]);
287 (size, ())
288 });
289 size_1 + size_2
290 }
291}
292
293impl<'a, T: 'a> RingBuffer<'a, T> {
296 #[must_use]
299 pub fn get_unallocated(&mut self, offset: usize, mut size: usize) -> &mut [T] {
300 let start_at = self.get_idx(self.length + offset);
301 if offset > self.window() {
303 return &mut [];
304 }
305 let clamped_window = self.window() - offset;
307 if size > clamped_window {
308 size = clamped_window
309 }
310 let until_end = self.capacity() - start_at;
312 if size > until_end {
313 size = until_end
314 }
315
316 &mut self.storage[start_at..start_at + size]
317 }
318
319 #[must_use]
323 pub fn write_unallocated(&mut self, offset: usize, data: &[T]) -> usize
324 where
325 T: Copy,
326 {
327 let (size_1, offset, data) = {
328 let slice = self.get_unallocated(offset, data.len());
329 let slice_len = slice.len();
330 slice.copy_from_slice(&data[..slice_len]);
331 (slice_len, offset + slice_len, &data[slice_len..])
332 };
333 let size_2 = {
334 let slice = self.get_unallocated(offset, data.len());
335 let slice_len = slice.len();
336 slice.copy_from_slice(&data[..slice_len]);
337 slice_len
338 };
339 size_1 + size_2
340 }
341
342 pub fn enqueue_unallocated(&mut self, count: usize) {
347 assert!(count <= self.window());
348 self.length += count;
349 }
350
351 #[must_use]
354 pub fn get_allocated(&self, offset: usize, mut size: usize) -> &[T] {
355 let start_at = self.get_idx(offset);
356 if offset > self.length {
358 return &mut [];
359 }
360 let clamped_length = self.length - offset;
362 if size > clamped_length {
363 size = clamped_length
364 }
365 let until_end = self.capacity() - start_at;
367 if size > until_end {
368 size = until_end
369 }
370
371 &self.storage[start_at..start_at + size]
372 }
373
374 #[must_use]
378 pub fn read_allocated(&mut self, offset: usize, data: &mut [T]) -> usize
379 where
380 T: Copy,
381 {
382 let (size_1, offset, data) = {
383 let slice = self.get_allocated(offset, data.len());
384 data[..slice.len()].copy_from_slice(slice);
385 (slice.len(), offset + slice.len(), &mut data[slice.len()..])
386 };
387 let size_2 = {
388 let slice = self.get_allocated(offset, data.len());
389 data[..slice.len()].copy_from_slice(slice);
390 slice.len()
391 };
392 size_1 + size_2
393 }
394
395 pub fn dequeue_allocated(&mut self, count: usize) {
400 assert!(count <= self.len());
401 self.length -= count;
402 self.read_at = self.get_idx(count);
403 }
404}
405
406impl<'a, T: 'a> From<ManagedSlice<'a, T>> for RingBuffer<'a, T> {
407 fn from(slice: ManagedSlice<'a, T>) -> RingBuffer<'a, T> {
408 RingBuffer::new(slice)
409 }
410}
411
#[cfg(test)]
mod test {
    use super::*;

    /// Builds a ring over twelve bytes of storage, each initialised to `b'.'`.
    fn dotted_ring() -> RingBuffer<'static, u8> {
        RingBuffer::new(vec![b'.'; 12])
    }

    /// Dequeues `count` elements one at a time, overwriting each with `b'.'`.
    fn blank_front(rb: &mut RingBuffer<'_, u8>, count: usize) {
        for _ in 0..count {
            *rb.dequeue_one().unwrap() = b'.';
        }
    }

    /// The emptiness/fullness predicates and the size accessors must follow
    /// the `length` field.
    #[test]
    fn test_buffer_length_changes() {
        let mut rb = RingBuffer::new(vec![0; 2]);

        // (forced length, expected is_empty, expected is_full)
        let states = [(0, true, false), (1, false, false), (2, false, true)];
        for &(queued, empty, full) in states.iter() {
            rb.length = queued;
            assert_eq!(rb.is_empty(), empty);
            assert_eq!(rb.is_full(), full);
            assert_eq!(rb.len(), queued);
            assert_eq!(rb.capacity(), 2);
            assert_eq!(rb.window(), 2 - queued);
        }
    }

    /// Single-element enqueue/dequeue through the closure-taking variants.
    #[test]
    fn test_buffer_enqueue_dequeue_one_with() {
        let mut rb = RingBuffer::new(vec![0; 5]);

        // Nothing to dequeue yet: the closure must not run.
        assert_eq!(
            rb.dequeue_one_with(|_| unreachable!()) as Result<()>,
            Err(Error::Exhausted)
        );

        rb.enqueue_one_with(Ok).unwrap();
        assert!(!rb.is_empty());
        assert!(!rb.is_full());

        for value in 1..5 {
            rb.enqueue_one_with(|slot| {
                *slot = value;
                Ok(())
            })
            .unwrap();
            assert!(!rb.is_empty());
        }
        assert!(rb.is_full());

        // No room left: the closure must not run.
        assert_eq!(
            rb.enqueue_one_with(|_| unreachable!()) as Result<()>,
            Err(Error::Exhausted)
        );

        for expected in 0..5 {
            assert_eq!(rb.dequeue_one_with(|slot| Ok(*slot)).unwrap(), expected);
            assert!(!rb.is_full());
        }
        assert_eq!(
            rb.dequeue_one_with(|_| unreachable!()) as Result<()>,
            Err(Error::Exhausted)
        );
        assert!(rb.is_empty());
    }

    /// Single-element enqueue/dequeue through the reference-returning
    /// variants.
    #[test]
    fn test_buffer_enqueue_dequeue_one() {
        let mut rb = RingBuffer::new(vec![0; 5]);
        assert_eq!(rb.dequeue_one(), Err(Error::Exhausted));

        rb.enqueue_one().unwrap();
        assert!(!rb.is_empty());
        assert!(!rb.is_full());

        for value in 1..5 {
            *rb.enqueue_one().unwrap() = value;
            assert!(!rb.is_empty());
        }
        assert!(rb.is_full());
        assert_eq!(rb.enqueue_one(), Err(Error::Exhausted));

        for expected in 0..5 {
            assert_eq!(*rb.dequeue_one().unwrap(), expected);
            assert!(!rb.is_full());
        }
        assert_eq!(rb.dequeue_one(), Err(Error::Exhausted));
        assert!(rb.is_empty());
    }

    /// `enqueue_many_with` offers the contiguous free run and commits only
    /// the reported number of elements.
    #[test]
    fn test_buffer_enqueue_many_with() {
        let mut rb = dotted_ring();

        let outcome = rb.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12);
            buf[0..2].copy_from_slice(b"ab");
            (2, true)
        });
        assert_eq!(outcome, (2, true));
        assert_eq!(rb.len(), 2);
        assert_eq!(&rb.storage[..], b"ab..........");

        // Four bytes are written but only two are committed.
        rb.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12 - 2);
            buf[0..4].copy_from_slice(b"cdXX");
            (2, ())
        });
        assert_eq!(rb.len(), 4);
        assert_eq!(&rb.storage[..], b"abcdXX......");

        rb.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12 - 4);
            buf[0..4].copy_from_slice(b"efgh");
            (4, ())
        });
        assert_eq!(rb.len(), 8);
        assert_eq!(&rb.storage[..], b"abcdefgh....");

        blank_front(&mut rb, 4);
        assert_eq!(rb.len(), 4);
        assert_eq!(&rb.storage[..], b"....efgh....");

        // Only the tail of the storage is contiguous with the write position.
        rb.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 12 - 8);
            buf[0..4].copy_from_slice(b"ijkl");
            (4, ())
        });
        assert_eq!(rb.len(), 8);
        assert_eq!(&rb.storage[..], b"....efghijkl");

        // The write position has wrapped around to the front.
        rb.enqueue_many_with(|buf| {
            assert_eq!(buf.len(), 4);
            buf[0..4].copy_from_slice(b"abcd");
            (4, ())
        });
        assert_eq!(rb.len(), 12);
        assert_eq!(&rb.storage[..], b"abcdefghijkl");

        blank_front(&mut rb, 4);
        assert_eq!(rb.len(), 8);
        assert_eq!(&rb.storage[..], b"abcd....ijkl");
    }

    /// `enqueue_many` returns a slice clamped to the contiguous free space.
    #[test]
    fn test_buffer_enqueue_many() {
        let mut rb = dotted_ring();

        rb.enqueue_many(8).copy_from_slice(b"abcdefgh");
        assert_eq!(rb.len(), 8);
        assert_eq!(&rb.storage[..], b"abcdefgh....");

        // Eight requested, only four left.
        rb.enqueue_many(8).copy_from_slice(b"ijkl");
        assert_eq!(rb.len(), 12);
        assert_eq!(&rb.storage[..], b"abcdefghijkl");
    }

    /// `enqueue_slice` copies across the wrap-around point.
    #[test]
    fn test_buffer_enqueue_slice() {
        let mut rb = dotted_ring();

        assert_eq!(rb.enqueue_slice(b"abcdefgh"), 8);
        assert_eq!(rb.len(), 8);
        assert_eq!(&rb.storage[..], b"abcdefgh....");

        blank_front(&mut rb, 4);
        assert_eq!(rb.len(), 4);
        assert_eq!(&rb.storage[..], b"....efgh....");

        assert_eq!(rb.enqueue_slice(b"ijklabcd"), 8);
        assert_eq!(rb.len(), 12);
        assert_eq!(&rb.storage[..], b"abcdefghijkl");
    }

    /// `dequeue_many_with` offers the contiguous queued run and removes only
    /// the reported number of elements.
    #[test]
    fn test_buffer_dequeue_many_with() {
        let mut rb = dotted_ring();

        assert_eq!(rb.enqueue_slice(b"abcdefghijkl"), 12);

        let outcome = rb.dequeue_many_with(|buf| {
            assert_eq!(buf.len(), 12);
            assert_eq!(buf, b"abcdefghijkl");
            buf[..4].copy_from_slice(b"....");
            (4, true)
        });
        assert_eq!(outcome, (4, true));
        assert_eq!(rb.len(), 8);
        assert_eq!(&rb.storage[..], b"....efghijkl");

        rb.dequeue_many_with(|buf| {
            assert_eq!(buf, b"efghijkl");
            buf[..4].copy_from_slice(b"....");
            (4, ())
        });
        assert_eq!(rb.len(), 4);
        assert_eq!(&rb.storage[..], b"........ijkl");

        assert_eq!(rb.enqueue_slice(b"abcd"), 4);
        assert_eq!(rb.len(), 8);

        // The queued data now wraps: the tail comes first, then the front.
        rb.dequeue_many_with(|buf| {
            assert_eq!(buf, b"ijkl");
            buf[..4].copy_from_slice(b"....");
            (4, ())
        });
        rb.dequeue_many_with(|buf| {
            assert_eq!(buf, b"abcd");
            buf[..4].copy_from_slice(b"....");
            (4, ())
        });
        assert_eq!(rb.len(), 0);
        assert_eq!(&rb.storage[..], b"............");
    }

    /// `dequeue_many` returns a slice clamped to the contiguous queued data.
    #[test]
    fn test_buffer_dequeue_many() {
        let mut rb = dotted_ring();

        assert_eq!(rb.enqueue_slice(b"abcdefghijkl"), 12);

        let head = rb.dequeue_many(8);
        assert_eq!(head, b"abcdefgh");
        head.copy_from_slice(b"........");
        assert_eq!(rb.len(), 4);
        assert_eq!(&rb.storage[..], b"........ijkl");

        // Eight requested, only four queued.
        let tail = rb.dequeue_many(8);
        assert_eq!(tail, b"ijkl");
        tail.copy_from_slice(b"....");
        assert_eq!(rb.len(), 0);
        assert_eq!(&rb.storage[..], b"............");
    }

    /// `dequeue_slice` copies across the wrap-around point.
    #[test]
    fn test_buffer_dequeue_slice() {
        let mut rb = dotted_ring();

        assert_eq!(rb.enqueue_slice(b"abcdefghijkl"), 12);

        let mut first = [0; 8];
        assert_eq!(rb.dequeue_slice(&mut first[..]), 8);
        assert_eq!(&first[..], b"abcdefgh");
        assert_eq!(rb.len(), 4);

        assert_eq!(rb.enqueue_slice(b"abcd"), 4);

        let mut second = [0; 8];
        assert_eq!(rb.dequeue_slice(&mut second[..]), 8);
        assert_eq!(&second[..], b"ijklabcd");
        assert_eq!(rb.len(), 0);
    }

    /// `get_unallocated` exposes free space relative to the write position.
    #[test]
    fn test_buffer_get_unallocated() {
        let mut rb = dotted_ring();

        // An offset past the free space yields nothing.
        assert_eq!(rb.get_unallocated(16, 4), b"");

        rb.get_unallocated(0, 4).copy_from_slice(b"abcd");
        assert_eq!(&rb.storage[..], b"abcd........");

        let enqueued = rb.enqueue_many(4);
        assert_eq!(enqueued.len(), 4);
        assert_eq!(rb.len(), 4);

        // Eight requested, but only four slots remain before the end.
        rb.get_unallocated(4, 8).copy_from_slice(b"ijkl");
        assert_eq!(&rb.storage[..], b"abcd....ijkl");

        rb.enqueue_many(8).copy_from_slice(b"EFGHIJKL");
        rb.dequeue_many(4).copy_from_slice(b"abcd");
        assert_eq!(rb.len(), 8);
        assert_eq!(&rb.storage[..], b"abcdEFGHIJKL");

        // The free space has wrapped around to the front of the storage.
        rb.get_unallocated(0, 8).copy_from_slice(b"ABCD");
        assert_eq!(&rb.storage[..], b"ABCDEFGHIJKL");
    }

    /// `write_unallocated` copies into free space, wrapping and clamping as
    /// needed.
    #[test]
    fn test_buffer_write_unallocated() {
        let mut rb = dotted_ring();
        rb.enqueue_many(6).copy_from_slice(b"abcdef");
        rb.dequeue_many(6).copy_from_slice(b"ABCDEF");

        assert_eq!(rb.write_unallocated(0, b"ghi"), 3);
        assert_eq!(rb.get_unallocated(0, 3), b"ghi");

        assert_eq!(rb.write_unallocated(3, b"jklmno"), 6);
        assert_eq!(rb.get_unallocated(3, 3), b"jkl");

        // Only three free slots remain after offset nine.
        assert_eq!(rb.write_unallocated(9, b"pqrstu"), 3);
        assert_eq!(rb.get_unallocated(9, 3), b"pqr");
    }

    /// `get_allocated` exposes queued data relative to the read position.
    #[test]
    fn test_buffer_get_allocated() {
        let mut rb = dotted_ring();

        assert_eq!(rb.get_allocated(16, 4), b"");
        assert_eq!(rb.get_allocated(0, 4), b"");

        assert_eq!(rb.enqueue_slice(b"abcd"), 4);
        assert_eq!(rb.get_allocated(0, 8), b"abcd");

        assert_eq!(rb.enqueue_slice(b"efghijkl"), 8);
        rb.dequeue_many(4).copy_from_slice(b"....");
        assert_eq!(rb.get_allocated(4, 8), b"ijkl");

        // The newly enqueued data wraps to the front and is therefore not
        // part of the contiguous view.
        assert_eq!(rb.enqueue_slice(b"abcd"), 4);
        assert_eq!(rb.get_allocated(4, 8), b"ijkl");
    }

    /// `read_allocated` copies queued data out, wrapping and clamping as
    /// needed.
    #[test]
    fn test_buffer_read_allocated() {
        let mut rb = dotted_ring();
        rb.enqueue_many(12).copy_from_slice(b"abcdefghijkl");

        let mut out = [0; 6];
        assert_eq!(rb.read_allocated(0, &mut out[..]), 6);
        assert_eq!(&out[..], b"abcdef");

        rb.dequeue_many(6).copy_from_slice(b"ABCDEF");
        rb.enqueue_many(3).copy_from_slice(b"mno");

        let mut out = [0; 6];
        assert_eq!(rb.read_allocated(3, &mut out[..]), 6);
        assert_eq!(&out[..], b"jklmno");

        // Only three elements are queued after offset six.
        let mut out = [0; 6];
        assert_eq!(rb.read_allocated(6, &mut out[..]), 3);
        assert_eq!(&out[..], b"mno\x00\x00\x00");
    }

    /// A ring without any storage must not panic on these calls.
    #[test]
    fn test_buffer_with_no_capacity() {
        let mut empty: RingBuffer<u8> = RingBuffer::new(vec![]);

        assert_eq!(empty.get_unallocated(0, 0), &[]);
        assert_eq!(empty.get_allocated(0, 0), &[]);
        empty.dequeue_allocated(0);
        assert_eq!(empty.enqueue_many(0), &[]);
        assert_eq!(empty.enqueue_one(), Err(Error::Exhausted));
        assert_eq!(empty.contiguous_window(), 0);
    }

    /// Once the ring has been drained, a bulk enqueue must be offered the
    /// whole storage again.
    #[test]
    fn test_buffer_write_wholly() {
        let mut rb = RingBuffer::new(vec![b'.'; 8]);
        rb.enqueue_many(2).copy_from_slice(b"ab");
        rb.enqueue_many(2).copy_from_slice(b"cd");
        assert_eq!(rb.len(), 4);

        let drained = rb.dequeue_many(4);
        assert_eq!(drained, b"abcd");
        assert_eq!(rb.len(), 0);

        let whole = rb.enqueue_many(8);
        assert_eq!(whole.len(), 8);
    }
}