1use std::cell::{Cell, UnsafeCell};
92use std::fmt;
93use std::mem::MaybeUninit;
94use std::sync::Arc;
95use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
96
97use crossbeam_utils::CachePadded;
98
99use crate::Full;
100
101#[deprecated(since = "1.3.0", note = "renamed to ring_buffer()")]
103#[inline]
104pub fn bounded<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
105 ring_buffer(capacity)
106}
107
108pub fn ring_buffer<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
116 assert!(capacity > 0, "capacity must be non-zero");
117
118 let capacity = capacity
119 .checked_next_power_of_two()
120 .expect("capacity too large (must be <= usize::MAX / 2)");
121 let mask = capacity - 1;
122
123 let slots: Vec<Slot<T>> = (0..capacity)
125 .map(|_| Slot {
126 turn: AtomicUsize::new(0),
127 data: UnsafeCell::new(MaybeUninit::uninit()),
128 })
129 .collect();
130 let slots = Box::into_raw(slots.into_boxed_slice()) as *mut Slot<T>;
131
132 let shift = capacity.trailing_zeros();
133
134 let shared = Arc::new(Shared {
135 head: CachePadded::new(AtomicUsize::new(0)),
136 tail: CachePadded::new(AtomicUsize::new(0)),
137 producer_alive: AtomicBool::new(true),
138 slots,
139 capacity,
140 shift,
141 mask,
142 });
143
144 (
145 Producer {
146 local_tail: Cell::new(0),
147 slots,
148 mask,
149 shift,
150 shared: Arc::clone(&shared),
151 },
152 Consumer {
153 slots,
154 mask,
155 shift,
156 shared,
157 },
158 )
159}
160
161struct Slot<T> {
163 turn: AtomicUsize,
167 data: UnsafeCell<MaybeUninit<T>>,
169}
170
171#[repr(C)]
174struct Shared<T> {
175 head: CachePadded<AtomicUsize>,
177 tail: CachePadded<AtomicUsize>,
179 producer_alive: AtomicBool,
181 slots: *mut Slot<T>,
183 capacity: usize,
185 shift: u32,
187 mask: usize,
189}
190
191unsafe impl<T: Send> Send for Shared<T> {}
194unsafe impl<T: Send> Sync for Shared<T> {}
195
196impl<T> Drop for Shared<T> {
197 fn drop(&mut self) {
198 let head = self.head.load(Ordering::Relaxed);
199 let tail = self.tail.load(Ordering::Relaxed);
200
201 let mut i = head;
203 while i != tail {
204 let slot = unsafe { &*self.slots.add(i & self.mask) };
205 let turn = i >> self.shift;
206
207 if slot.turn.load(Ordering::Relaxed) == turn * 2 + 1 {
209 unsafe { (*slot.data.get()).assume_init_drop() };
211 }
212 i = i.wrapping_add(1);
213 }
214
215 unsafe {
217 let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
218 self.slots,
219 self.capacity,
220 ));
221 }
222 }
223}
224
225#[repr(C)]
231pub struct Producer<T> {
232 local_tail: Cell<usize>,
234 slots: *mut Slot<T>,
236 mask: usize,
238 shift: u32,
240 shared: Arc<Shared<T>>,
241}
242
243unsafe impl<T: Send> Send for Producer<T> {}
246
247impl<T> Producer<T> {
248 #[inline]
255 #[must_use = "push returns Err if full, which should be handled"]
256 pub fn push(&self, value: T) -> Result<(), Full<T>> {
257 let tail = self.local_tail.get();
258 let slot = unsafe { &*self.slots.add(tail & self.mask) };
260 let turn = tail >> self.shift;
261
262 if slot.turn.load(Ordering::Acquire) != turn * 2 {
264 return Err(Full(value));
265 }
266
267 unsafe { (*slot.data.get()).write(value) };
269
270 slot.turn.store(turn * 2 + 1, Ordering::Release);
272
273 self.local_tail.set(tail.wrapping_add(1));
274
275 Ok(())
276 }
277
278 #[inline]
280 pub fn capacity(&self) -> usize {
281 1 << self.shift
282 }
283
284 #[inline]
286 pub fn is_disconnected(&self) -> bool {
287 Arc::strong_count(&self.shared) == 1
288 }
289}
290
291impl<T> Drop for Producer<T> {
292 fn drop(&mut self) {
293 self.shared
295 .tail
296 .store(self.local_tail.get(), Ordering::Relaxed);
297 self.shared.producer_alive.store(false, Ordering::Release);
298 }
299}
300
301impl<T> fmt::Debug for Producer<T> {
302 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303 f.debug_struct("Producer")
304 .field("capacity", &self.capacity())
305 .finish_non_exhaustive()
306 }
307}
308
309#[repr(C)]
315pub struct Consumer<T> {
316 slots: *mut Slot<T>,
318 mask: usize,
320 shift: u32,
322 shared: Arc<Shared<T>>,
323}
324
325impl<T> Clone for Consumer<T> {
326 fn clone(&self) -> Self {
327 Consumer {
328 slots: self.slots,
329 mask: self.mask,
330 shift: self.shift,
331 shared: Arc::clone(&self.shared),
332 }
333 }
334}
335
336unsafe impl<T: Send> Send for Consumer<T> {}
339
340impl<T> Consumer<T> {
341 #[inline]
348 pub fn pop(&self) -> Option<T> {
349 let mut spin_count = 0u32;
350
351 loop {
352 let head = self.shared.head.load(Ordering::Relaxed);
353
354 let slot = unsafe { &*self.slots.add(head & self.mask) };
356 let turn = head >> self.shift;
357
358 let stamp = slot.turn.load(Ordering::Acquire);
359
360 if stamp == turn * 2 + 1 {
361 if self
363 .shared
364 .head
365 .compare_exchange_weak(
366 head,
367 head.wrapping_add(1),
368 Ordering::Relaxed,
369 Ordering::Relaxed,
370 )
371 .is_ok()
372 {
373 let value = unsafe { (*slot.data.get()).assume_init_read() };
375
376 slot.turn.store((turn + 1) * 2, Ordering::Release);
378
379 return Some(value);
380 }
381
382 let spins = 1 << spin_count.min(6);
384 for _ in 0..spins {
385 std::hint::spin_loop();
386 }
387 spin_count += 1;
388 } else {
389 return None;
391 }
392 }
393 }
394
395 #[inline]
397 pub fn capacity(&self) -> usize {
398 1 << self.shift
399 }
400
401 #[inline]
403 pub fn is_disconnected(&self) -> bool {
404 !self.shared.producer_alive.load(Ordering::Acquire)
405 }
406}
407
408impl<T> fmt::Debug for Consumer<T> {
409 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410 f.debug_struct("Consumer")
411 .field("capacity", &self.capacity())
412 .finish_non_exhaustive()
413 }
414}
415
416#[cfg(test)]
417mod tests {
418 use super::*;
419
420 #[test]
425 fn basic_push_pop() {
426 let (tx, rx) = ring_buffer::<u64>(4);
427
428 assert!(tx.push(1).is_ok());
429 assert!(tx.push(2).is_ok());
430 assert!(tx.push(3).is_ok());
431
432 assert_eq!(rx.pop(), Some(1));
433 assert_eq!(rx.pop(), Some(2));
434 assert_eq!(rx.pop(), Some(3));
435 assert_eq!(rx.pop(), None);
436 }
437
438 #[test]
439 fn empty_pop_returns_none() {
440 let (_, rx) = ring_buffer::<u64>(4);
441 assert_eq!(rx.pop(), None);
442 assert_eq!(rx.pop(), None);
443 }
444
445 #[test]
446 fn fill_then_drain() {
447 let (tx, rx) = ring_buffer::<u64>(4);
448
449 for i in 0..4 {
450 assert!(tx.push(i).is_ok());
451 }
452
453 for i in 0..4 {
454 assert_eq!(rx.pop(), Some(i));
455 }
456
457 assert_eq!(rx.pop(), None);
458 }
459
460 #[test]
461 fn push_returns_error_when_full() {
462 let (tx, _rx) = ring_buffer::<u64>(4);
463
464 assert!(tx.push(1).is_ok());
465 assert!(tx.push(2).is_ok());
466 assert!(tx.push(3).is_ok());
467 assert!(tx.push(4).is_ok());
468
469 let err = tx.push(5).unwrap_err();
470 assert_eq!(err.into_inner(), 5);
471 }
472
473 #[test]
478 fn interleaved_single_consumer() {
479 let (tx, rx) = ring_buffer::<u64>(8);
480
481 for i in 0..1000 {
482 assert!(tx.push(i).is_ok());
483 assert_eq!(rx.pop(), Some(i));
484 }
485 }
486
487 #[test]
488 fn partial_fill_drain_cycles() {
489 let (tx, rx) = ring_buffer::<u64>(8);
490
491 for round in 0..100 {
492 for i in 0..4 {
493 assert!(tx.push(round * 4 + i).is_ok());
494 }
495
496 for i in 0..4 {
497 assert_eq!(rx.pop(), Some(round * 4 + i));
498 }
499 }
500 }
501
502 #[test]
507 fn two_consumers_single_producer() {
508 use std::thread;
509
510 let (tx, rx) = ring_buffer::<u64>(64);
511 let rx2 = rx.clone();
512
513 let rx1 = rx;
514 let h1 = thread::spawn(move || {
515 let mut received = Vec::new();
516 loop {
517 if let Some(val) = rx1.pop() {
518 received.push(val);
519 } else if rx1.is_disconnected() {
520 while let Some(val) = rx1.pop() {
521 received.push(val);
522 }
523 break;
524 } else {
525 std::hint::spin_loop();
526 }
527 }
528 received
529 });
530
531 let h2 = thread::spawn(move || {
532 let mut received = Vec::new();
533 loop {
534 if let Some(val) = rx2.pop() {
535 received.push(val);
536 } else if rx2.is_disconnected() {
537 while let Some(val) = rx2.pop() {
538 received.push(val);
539 }
540 break;
541 } else {
542 std::hint::spin_loop();
543 }
544 }
545 received
546 });
547
548 for i in 0..2000 {
549 while tx.push(i).is_err() {
550 std::hint::spin_loop();
551 }
552 }
553 drop(tx);
554
555 let mut received = h1.join().unwrap();
556 received.extend(h2.join().unwrap());
557
558 received.sort_unstable();
560 assert_eq!(received, (0..2000).collect::<Vec<_>>());
561 }
562
563 #[test]
564 fn four_consumers_single_producer() {
565 use std::thread;
566
567 let (tx, rx) = ring_buffer::<u64>(256);
568
569 let handles: Vec<_> = (0..4)
570 .map(|_| {
571 let rx = rx.clone();
572 thread::spawn(move || {
573 let mut received = Vec::new();
574 loop {
575 if let Some(val) = rx.pop() {
576 received.push(val);
577 } else if rx.is_disconnected() {
578 while let Some(val) = rx.pop() {
579 received.push(val);
580 }
581 break;
582 } else {
583 std::hint::spin_loop();
584 }
585 }
586 received
587 })
588 })
589 .collect();
590
591 drop(rx); for i in 0..4000u64 {
594 while tx.push(i).is_err() {
595 std::hint::spin_loop();
596 }
597 }
598 drop(tx);
599
600 let mut received = Vec::new();
601 for h in handles {
602 received.extend(h.join().unwrap());
603 }
604
605 received.sort_unstable();
606 assert_eq!(received, (0..4000).collect::<Vec<_>>());
607 }
608
609 #[test]
614 fn single_slot_bounded() {
615 let (tx, rx) = ring_buffer::<u64>(1);
616
617 assert!(tx.push(1).is_ok());
618 assert!(tx.push(2).is_err());
619
620 assert_eq!(rx.pop(), Some(1));
621 assert!(tx.push(2).is_ok());
622 }
623
624 #[test]
629 fn consumer_detects_producer_drop() {
630 let (tx, rx) = ring_buffer::<u64>(4);
631
632 assert!(!rx.is_disconnected());
633 drop(tx);
634 assert!(rx.is_disconnected());
635 }
636
637 #[test]
638 fn producer_detects_all_consumers_drop() {
639 let (tx, rx) = ring_buffer::<u64>(4);
640
641 assert!(!tx.is_disconnected());
642 drop(rx);
643 assert!(tx.is_disconnected());
644 }
645
646 #[test]
647 fn one_consumer_drops_others_alive() {
648 let (tx, rx) = ring_buffer::<u64>(4);
649 let rx2 = rx.clone();
650
651 assert!(!tx.is_disconnected());
652 drop(rx);
653 assert!(!tx.is_disconnected()); assert!(!rx2.is_disconnected()); drop(rx2);
656 assert!(tx.is_disconnected());
657 }
658
659 #[test]
664 fn drop_cleans_up_remaining() {
665 use std::sync::atomic::AtomicUsize;
666
667 static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
668
669 struct DropCounter;
670 impl Drop for DropCounter {
671 fn drop(&mut self) {
672 DROP_COUNT.fetch_add(1, Ordering::SeqCst);
673 }
674 }
675
676 DROP_COUNT.store(0, Ordering::SeqCst);
677
678 let (tx, rx) = ring_buffer::<DropCounter>(4);
679
680 let _ = tx.push(DropCounter);
681 let _ = tx.push(DropCounter);
682 let _ = tx.push(DropCounter);
683
684 assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);
685
686 drop(tx);
687 drop(rx);
688
689 assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
690 }
691
692 #[test]
697 fn zero_sized_type() {
698 let (tx, rx) = ring_buffer::<()>(8);
699
700 let _ = tx.push(());
701 let _ = tx.push(());
702
703 assert_eq!(rx.pop(), Some(()));
704 assert_eq!(rx.pop(), Some(()));
705 assert_eq!(rx.pop(), None);
706 }
707
708 #[test]
709 fn string_type() {
710 let (tx, rx) = ring_buffer::<String>(4);
711
712 let _ = tx.push("hello".to_string());
713 let _ = tx.push("world".to_string());
714
715 assert_eq!(rx.pop(), Some("hello".to_string()));
716 assert_eq!(rx.pop(), Some("world".to_string()));
717 }
718
719 #[test]
720 #[should_panic(expected = "capacity must be non-zero")]
721 fn zero_capacity_panics() {
722 let _ = ring_buffer::<u64>(0);
723 }
724
725 #[test]
726 fn large_message_type() {
727 #[repr(C, align(64))]
728 struct LargeMessage {
729 data: [u8; 256],
730 }
731
732 let (tx, rx) = ring_buffer::<LargeMessage>(8);
733
734 let msg = LargeMessage { data: [42u8; 256] };
735 assert!(tx.push(msg).is_ok());
736
737 let received = rx.pop().unwrap();
738 assert_eq!(received.data[0], 42);
739 assert_eq!(received.data[255], 42);
740 }
741
742 #[test]
743 fn multiple_laps() {
744 let (tx, rx) = ring_buffer::<u64>(4);
745
746 for i in 0..40 {
748 assert!(tx.push(i).is_ok());
749 assert_eq!(rx.pop(), Some(i));
750 }
751 }
752
753 #[test]
754 fn capacity_rounds_to_power_of_two() {
755 let (tx, _) = ring_buffer::<u64>(100);
756 assert_eq!(tx.capacity(), 128);
757
758 let (tx, _) = ring_buffer::<u64>(1000);
759 assert_eq!(tx.capacity(), 1024);
760 }
761
762 #[test]
767 fn stress_single_consumer() {
768 use std::thread;
769
770 const COUNT: u64 = 100_000;
771
772 let (tx, rx) = ring_buffer::<u64>(1024);
773
774 let producer = thread::spawn(move || {
775 for i in 0..COUNT {
776 while tx.push(i).is_err() {
777 std::hint::spin_loop();
778 }
779 }
780 });
781
782 let consumer = thread::spawn(move || {
783 let mut sum = 0u64;
784 let mut received = 0u64;
785 while received < COUNT {
786 if let Some(val) = rx.pop() {
787 sum = sum.wrapping_add(val);
788 received += 1;
789 } else {
790 std::hint::spin_loop();
791 }
792 }
793 sum
794 });
795
796 producer.join().unwrap();
797 let sum = consumer.join().unwrap();
798 assert_eq!(sum, COUNT * (COUNT - 1) / 2);
799 }
800
801 #[test]
802 fn stress_multiple_consumers() {
803 use std::thread;
804
805 const CONSUMERS: usize = 4;
806 const TOTAL: u64 = 100_000;
807
808 let (tx, rx) = ring_buffer::<u64>(1024);
809
810 let handles: Vec<_> = (0..CONSUMERS)
811 .map(|_| {
812 let rx = rx.clone();
813 thread::spawn(move || {
814 let mut received = Vec::new();
815 loop {
816 if let Some(val) = rx.pop() {
817 received.push(val);
818 } else if rx.is_disconnected() {
819 while let Some(val) = rx.pop() {
820 received.push(val);
821 }
822 break;
823 } else {
824 std::hint::spin_loop();
825 }
826 }
827 received
828 })
829 })
830 .collect();
831
832 drop(rx);
833
834 let producer = thread::spawn(move || {
835 for i in 0..TOTAL {
836 while tx.push(i).is_err() {
837 std::hint::spin_loop();
838 }
839 }
840 });
841
842 producer.join().unwrap();
843
844 let mut all_received = Vec::new();
845 for h in handles {
846 all_received.extend(h.join().unwrap());
847 }
848
849 all_received.sort_unstable();
850 assert_eq!(all_received, (0..TOTAL).collect::<Vec<_>>());
851 }
852}