use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

use crate::tpc::CachePadded;

use super::config::{MAX_BUFFER_SIZE, MIN_BUFFER_SIZE};

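/// A bounded, lock-free ring buffer backed by a power-of-two array of slots.
///
/// `push`/`pop` form a single-producer/single-consumer queue, while `claim_slot`,
/// `write_slot`, and `try_advance_tail` expose a claim-based path for coordinating
/// multiple producers. One slot is always left unused so that a full buffer can be
/// distinguished from an empty one.
///
/// # Example
///
/// Illustrative usage only; the exact import path depends on how this module is
/// exposed by the crate.
///
/// ```ignore
/// let buffer: RingBuffer<u32> = RingBuffer::new(8);
/// buffer.push(1).unwrap();
/// assert_eq!(buffer.pop(), Some(1));
/// ```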
pub struct RingBuffer<T> {
    /// Fixed-size slot storage; each slot is an `UnsafeCell<MaybeUninit<T>>`.
    buffer: Box<[UnsafeCell<MaybeUninit<T>>]>,

    /// Index of the next slot to read (consumer side), padded to its own cache line.
    head: CachePadded<AtomicUsize>,

    /// Index of the next slot to write (producer side), padded to its own cache line.
    tail: CachePadded<AtomicUsize>,

    /// Monotonic counter used by `claim_slot` to hand out slots to producers.
    claim_counter: CachePadded<AtomicU64>,

    /// Capacity minus one; capacity is a power of two, so this doubles as an index mask.
    capacity_mask: usize,
}

// SAFETY: the buffer owns its `T` values, so it can be sent across threads when `T: Send`.
unsafe impl<T: Send> Send for RingBuffer<T> {}

// SAFETY: interior mutability is confined to the `UnsafeCell` slots, and access to them is
// coordinated through the `head`/`tail` atomics by the push/pop protocol.
unsafe impl<T: Send> Sync for RingBuffer<T> {}

impl<T> RingBuffer<T> {
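    /// Creates a new ring buffer.
    ///
    /// The requested capacity is clamped to `[MIN_BUFFER_SIZE, MAX_BUFFER_SIZE]` and
    /// rounded up to the next power of two.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.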
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be > 0");

        let capacity = capacity
            .clamp(MIN_BUFFER_SIZE, MAX_BUFFER_SIZE)
            .next_power_of_two();

        let buffer: Vec<UnsafeCell<MaybeUninit<T>>> = (0..capacity)
            .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
            .collect();

        Self {
            buffer: buffer.into_boxed_slice(),
            head: CachePadded::new(AtomicUsize::new(0)),
            tail: CachePadded::new(AtomicUsize::new(0)),
            claim_counter: CachePadded::new(AtomicU64::new(0)),
            capacity_mask: capacity - 1,
        }
    }

    /// Returns the total number of slots (always a power of two).
    #[inline]
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity_mask + 1
    }

    /// Returns `true` if the buffer currently holds no items.
    #[inline]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Relaxed);
        head == tail
    }

    /// Returns `true` if no further item can be pushed.
    #[inline]
    #[must_use]
    pub fn is_full(&self) -> bool {
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Relaxed);
        self.next_index(tail) == head
    }

    /// Returns the number of items currently stored.
    #[inline]
    #[must_use]
    pub fn len(&self) -> usize {
        let head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Relaxed);
        tail.wrapping_sub(head) & self.capacity_mask
    }

    /// Returns how many more items can be pushed before the buffer is full.
    #[inline]
    #[must_use]
    pub fn free_slots(&self) -> usize {
        // One slot is always kept unused so a full buffer can be told apart from an empty one.
        self.capacity() - 1 - self.len()
    }

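    /// Pushes an item onto the tail of the buffer (single-producer path).
    ///
    /// Returns `Err(item)` without modifying the buffer if it is full.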
    #[inline]
    pub fn push(&self, item: T) -> Result<(), T> {
        let tail = self.tail.load(Ordering::Relaxed);
        let next_tail = self.next_index(tail);

        if next_tail == self.head.load(Ordering::Acquire) {
            return Err(item);
        }

        // SAFETY: `tail` is only advanced after the write below, so the consumer cannot
        // observe this slot yet.
        unsafe {
            (*self.buffer[tail].get()).write(item);
        }

        self.tail.store(next_tail, Ordering::Release);

        Ok(())
    }

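    /// Pops the item at the head of the buffer (single-consumer path).
    ///
    /// Returns `None` if the buffer is empty.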
    #[inline]
    #[must_use]
    pub fn pop(&self) -> Option<T> {
        let head = self.head.load(Ordering::Relaxed);

        if head == self.tail.load(Ordering::Acquire) {
            return None;
        }

        // SAFETY: `head != tail`, so this slot was initialized by a previous push and stays
        // invisible to the producer until `head` is advanced below.
        let item = unsafe { (*self.buffer[head].get()).assume_init_read() };

        self.head.store(self.next_index(head), Ordering::Release);

        Some(item)
    }

    /// Returns a reference to the item at the head of the buffer without removing it.
    ///
    /// Returns `None` if the buffer is empty.
    #[inline]
    #[must_use]
    pub fn peek(&self) -> Option<&T> {
        let head = self.head.load(Ordering::Relaxed);

        if head == self.tail.load(Ordering::Acquire) {
            return None;
        }

        // SAFETY: `head != tail`, so the head slot holds an initialized value.
        unsafe { Some((*self.buffer[head].get()).assume_init_ref()) }
    }

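    /// Claims a slot index for a producer via the monotonic claim counter (multi-producer path).
    ///
    /// Returns `None` and rolls the counter back if no free slot is available for this claim.
    /// On success the returned index can be filled with `write_slot` and published with
    /// `try_advance_tail`.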
    #[inline]
    pub fn claim_slot(&self) -> Option<usize> {
        let claim = self.claim_counter.fetch_add(1, Ordering::AcqRel);

        // Mask in u64 first so claim values larger than `usize::MAX` still map into range.
        #[allow(clippy::cast_possible_truncation)]
        let slot = (claim & self.capacity_mask as u64) as usize;

        let tail = self.tail.load(Ordering::Acquire);
        let pending = claim.saturating_sub(tail as u64);

        if pending >= (self.capacity() - 1) as u64 {
            // No free slot for this claim; undo the reservation.
            self.claim_counter.fetch_sub(1, Ordering::AcqRel);
            return None;
        }

        Some(slot)
    }

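    /// Writes an item directly into `slot` without moving the tail.
    ///
    /// # Safety
    ///
    /// `slot` must have been obtained from `claim_slot`, must not yet be visible to the
    /// consumer, and must not be written concurrently by another producer.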
    #[inline]
    pub unsafe fn write_slot(&self, slot: usize, item: T) {
        debug_assert!(slot < self.capacity());
        (*self.buffer[slot].get()).write(item);
    }

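    /// Publishes claimed writes by advancing the tail by exactly one slot.
    ///
    /// Returns `true` only if `target_tail` is the immediate successor of the current tail
    /// and the compare-exchange succeeds; otherwise returns `false` (for example when
    /// earlier claims have not been published yet).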
    #[inline]
    pub fn try_advance_tail(&self, target_tail: usize) -> bool {
        let current_tail = self.tail.load(Ordering::Acquire);

        if target_tail == self.next_index(current_tail) {
            self.tail
                .compare_exchange(
                    current_tail,
                    target_tail,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                )
                .is_ok()
        } else {
            false
        }
    }

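    /// Pushes items from the iterator until the buffer is full or the iterator is exhausted.
    ///
    /// Returns the number of items pushed; items that do not fit are dropped.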
    #[inline]
    pub fn push_batch(&self, items: impl IntoIterator<Item = T>) -> usize {
        let mut count = 0;
        for item in items {
            if self.push(item).is_err() {
                break;
            }
            count += 1;
        }
        count
    }

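    /// Pops up to `max_count` items into a freshly allocated `Vec`.
    ///
    /// The `#[cold]` attribute hints that this allocating path is rarely taken; `pop_each`
    /// and `pop_batch_into` avoid the allocation.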
    #[cold]
    #[must_use]
    pub fn pop_batch(&self, max_count: usize) -> Vec<T> {
        let mut items = Vec::with_capacity(max_count.min(self.len()));
        for _ in 0..max_count {
            if let Some(item) = self.pop() {
                items.push(item);
            } else {
                break;
            }
        }
        items
    }

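    /// Pops up to `max_count` items, handing each to `f`; stops early when `f` returns `false`.
    ///
    /// The item passed to the call that returns `false` is still consumed. Returns the number
    /// of items consumed. The head index is written back once at the end, so the slots are
    /// released to the producer in a single step.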
    #[inline]
    pub fn pop_each<F>(&self, max_count: usize, mut f: F) -> usize
    where
        F: FnMut(T) -> bool,
    {
        if max_count == 0 {
            return 0;
        }

        let mut current_head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);

        let available = if tail >= current_head {
            tail - current_head
        } else {
            (self.capacity_mask + 1) - current_head + tail
        };

        let to_pop = available.min(max_count);

        if to_pop == 0 {
            return 0;
        }

        let mut popped = 0;
        for _ in 0..to_pop {
            // SAFETY: fewer than `available` slots have been read, so this slot is initialized.
            let item = unsafe { (*self.buffer[current_head].get()).assume_init_read() };

            popped += 1;
            current_head = self.next_index(current_head);

            if !f(item) {
                break;
            }
        }

        if popped > 0 {
            self.head.store(current_head, Ordering::Release);
        }

        popped
    }

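    /// Pops items into `buffer`, writing at most `buffer.len()` of them.
    ///
    /// Returns the number of slots written; only the first `count` entries of `buffer` are
    /// initialized afterwards.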
    #[inline]
    pub fn pop_batch_into(&self, buffer: &mut [MaybeUninit<T>]) -> usize {
        if buffer.is_empty() {
            return 0;
        }

        let mut current_head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);

        let available = if tail >= current_head {
            tail - current_head
        } else {
            (self.capacity_mask + 1) - current_head + tail
        };

        let count = available.min(buffer.len());

        if count == 0 {
            return 0;
        }

        for slot in buffer.iter_mut().take(count) {
            unsafe {
                // SAFETY: `count <= available`, so every slot read here was initialized by a push.
                let src = (*self.buffer[current_head].get()).assume_init_read();
                slot.write(src);
            }
            current_head = self.next_index(current_head);
        }

        self.head.store(current_head, Ordering::Release);
        count
    }

    /// Maps an index to its successor, wrapping at the power-of-two capacity.
    #[inline]
    const fn next_index(&self, index: usize) -> usize {
        (index + 1) & self.capacity_mask
    }

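    /// Drops all remaining items and resets the indices and claim counter to zero.
    ///
    /// Takes `&mut self`, so the caller has exclusive access while the buffer is reset.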
    pub fn reset(&mut self) {
        while self.pop().is_some() {}

        self.head.store(0, Ordering::Release);
        self.tail.store(0, Ordering::Release);
        self.claim_counter.store(0, Ordering::Release);
    }
}

impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        // Pop every remaining item so its destructor runs.
        while self.pop().is_some() {}
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for RingBuffer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RingBuffer")
            .field("capacity", &self.capacity())
            .field("len", &self.len())
            .field("is_empty", &self.is_empty())
            .field("is_full", &self.is_full())
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_new_buffer() {
        let buffer: RingBuffer<i32> = RingBuffer::new(100);
        // 100 is rounded up to the next power of two.
        assert_eq!(buffer.capacity(), 128);
        assert!(buffer.is_empty());
        assert!(!buffer.is_full());
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn test_push_pop() {
        let buffer: RingBuffer<i32> = RingBuffer::new(4);

        assert!(buffer.push(1).is_ok());
        assert!(buffer.push(2).is_ok());
        assert!(buffer.push(3).is_ok());
        // One slot stays unused, so a capacity-4 buffer holds 3 items.
        assert!(buffer.is_full());
        assert!(buffer.push(4).is_err());

        assert_eq!(buffer.pop(), Some(1));
        assert_eq!(buffer.pop(), Some(2));
        assert_eq!(buffer.pop(), Some(3));
        assert_eq!(buffer.pop(), None);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_fifo_order() {
        let buffer: RingBuffer<i32> = RingBuffer::new(16);

        for i in 0..10 {
            assert!(buffer.push(i).is_ok());
        }

        for i in 0..10 {
            assert_eq!(buffer.pop(), Some(i));
        }
    }

    #[test]
    fn test_wrap_around() {
        let buffer: RingBuffer<i32> = RingBuffer::new(4);

        // Push and pop repeatedly so the head and tail wrap around the buffer several times.
        for iteration in 0..5 {
            for i in 0..3 {
                assert!(buffer.push(iteration * 10 + i).is_ok());
            }
            for i in 0..3 {
                assert_eq!(buffer.pop(), Some(iteration * 10 + i));
            }
        }
    }

    #[test]
    fn test_peek() {
        let buffer: RingBuffer<i32> = RingBuffer::new(4);

        assert!(buffer.peek().is_none());

        buffer.push(42).unwrap();
        assert_eq!(buffer.peek(), Some(&42));
        // Peeking does not consume the item.
        assert_eq!(buffer.peek(), Some(&42));
        assert_eq!(buffer.pop(), Some(42));
        assert!(buffer.peek().is_none());
    }

    #[test]
    fn test_push_batch() {
        let buffer: RingBuffer<i32> = RingBuffer::new(8);

        let pushed = buffer.push_batch(vec![1, 2, 3, 4, 5]);
        assert_eq!(pushed, 5);
        assert_eq!(buffer.len(), 5);

        let pushed = buffer.push_batch(vec![6, 7, 8, 9, 10]);
        // Only 2 free slots remain (capacity 8, one slot reserved, 5 already used).
        assert_eq!(pushed, 2);
    }

    #[test]
    fn test_pop_batch() {
        let buffer: RingBuffer<i32> = RingBuffer::new(8);

        buffer.push_batch(vec![1, 2, 3, 4, 5]);

        let items = buffer.pop_batch(3);
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(buffer.len(), 2);

        let items = buffer.pop_batch(10);
        assert_eq!(items, vec![4, 5]);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_pop_each() {
        let buffer: RingBuffer<i32> = RingBuffer::new(16);

        buffer.push_batch(vec![1, 2, 3, 4, 5]);

        let mut sum = 0;
        let count = buffer.pop_each(10, |item| {
            sum += item;
            true
        });

        assert_eq!(count, 5);
        assert_eq!(sum, 15);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_pop_each_early_stop() {
        let buffer: RingBuffer<i32> = RingBuffer::new(16);

        buffer.push_batch(vec![1, 2, 3, 4, 5]);

        let mut items = Vec::new();
        let count = buffer.pop_each(10, |item| {
            items.push(item);
            item < 3
        });

        // The item that makes the closure return `false` is still consumed.
        assert_eq!(count, 3);
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(buffer.len(), 2);
    }

    #[test]
    fn test_pop_batch_into() {
        let buffer: RingBuffer<i32> = RingBuffer::new(16);

        buffer.push_batch(vec![1, 2, 3]);

        let mut dest: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
        let count = buffer.pop_batch_into(&mut dest);

        assert_eq!(count, 3);
        unsafe {
            assert_eq!(dest[0].assume_init(), 1);
            assert_eq!(dest[1].assume_init(), 2);
            assert_eq!(dest[2].assume_init(), 3);
        }
    }

    #[test]
    fn test_free_slots() {
        let buffer: RingBuffer<i32> = RingBuffer::new(8);

        // Capacity 8 with one reserved slot gives 7 free slots when empty.
        assert_eq!(buffer.free_slots(), 7);

        buffer.push(1).unwrap();
        buffer.push(2).unwrap();
        assert_eq!(buffer.free_slots(), 5);

        let _ = buffer.pop();
        assert_eq!(buffer.free_slots(), 6);
    }

    #[test]
    fn test_concurrent_spsc() {
        const ITEMS: i32 = 10_000;
        let buffer = Arc::new(RingBuffer::<i32>::new(1024));
        let producer_buffer = Arc::clone(&buffer);
        let consumer_buffer = Arc::clone(&buffer);

        let producer = thread::spawn(move || {
            for i in 0..ITEMS {
                while producer_buffer.push(i).is_err() {
                    thread::yield_now();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = Vec::with_capacity(ITEMS as usize);
            while received.len() < ITEMS as usize {
                if let Some(item) = consumer_buffer.pop() {
                    received.push(item);
                } else {
                    thread::yield_now();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();

        assert_eq!(received.len(), ITEMS as usize);
        for (i, &item) in received.iter().enumerate() {
            assert_eq!(item, i32::try_from(i).unwrap());
        }
    }

    #[test]
    fn test_drop_items() {
        use std::sync::atomic::AtomicUsize;

        static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

        #[derive(Debug)]
        struct DropCounter;
        impl Drop for DropCounter {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::SeqCst);
            }
        }

        DROP_COUNT.store(0, Ordering::SeqCst);

        {
            let buffer: RingBuffer<DropCounter> = RingBuffer::new(8);
            for _ in 0..5 {
                buffer.push(DropCounter).unwrap();
            }
            let _ = buffer.pop();
            let _ = buffer.pop();
        }

        // 2 items were dropped by the pops above; the remaining 3 by the buffer's Drop impl.
        assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 5);
    }

    #[test]
    fn test_reset() {
        let mut buffer: RingBuffer<i32> = RingBuffer::new(8);

        buffer.push_batch(vec![1, 2, 3, 4, 5]);
        assert_eq!(buffer.len(), 5);

        buffer.reset();
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn test_debug() {
        let buffer: RingBuffer<i32> = RingBuffer::new(8);
        buffer.push(1).unwrap();
        buffer.push(2).unwrap();

        let debug_str = format!("{buffer:?}");
        assert!(debug_str.contains("RingBuffer"));
        assert!(debug_str.contains("capacity"));
        assert!(debug_str.contains("len"));
    }

    #[test]
    #[should_panic(expected = "capacity must be > 0")]
    fn test_zero_capacity_panics() {
        let _: RingBuffer<i32> = RingBuffer::new(0);
    }

    #[test]
    fn test_capacity_clamping() {
        // Tiny requests are clamped up to the minimum buffer size.
        let buffer: RingBuffer<i32> = RingBuffer::new(1);
        assert!(buffer.capacity() >= MIN_BUFFER_SIZE);

        // Huge requests are clamped down to the maximum buffer size.
        let buffer: RingBuffer<i32> = RingBuffer::new(usize::MAX / 2);
        assert!(buffer.capacity() <= MAX_BUFFER_SIZE.next_power_of_two());
    }
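
    // A minimal sketch (not part of the original test suite) exercising the claim_slot /
    // write_slot / try_advance_tail producer path from a single thread.
    #[test]
    fn test_claim_write_advance_single_thread() {
        let buffer: RingBuffer<i32> = RingBuffer::new(8);

        // Claim the next slot; on a fresh buffer the claim counter starts at 0.
        let slot = buffer.claim_slot().expect("buffer has free slots");
        assert_eq!(slot, 0);

        // SAFETY: the slot was just claimed and no other producer exists in this test.
        unsafe { buffer.write_slot(slot, 42) };

        // Publish the write by advancing the tail to the slot's successor.
        let target_tail = (slot + 1) & (buffer.capacity() - 1);
        assert!(buffer.try_advance_tail(target_tail));

        assert_eq!(buffer.len(), 1);
        assert_eq!(buffer.pop(), Some(42));
    }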
}