1use std::cell::{Cell, UnsafeCell};
67use std::fmt;
68use std::mem::MaybeUninit;
69use std::sync::Arc;
70use std::sync::atomic::{AtomicUsize, Ordering};
71
72use crossbeam_utils::CachePadded;
73
74use crate::Full;
75
76#[deprecated(since = "1.3.0", note = "renamed to ring_buffer()")]
78#[inline]
79pub fn bounded<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
80 ring_buffer(capacity)
81}
82
83pub fn ring_buffer<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
91 assert!(capacity > 0, "capacity must be non-zero");
92
93 let capacity = capacity
94 .checked_next_power_of_two()
95 .expect("capacity too large (must be <= usize::MAX / 2)");
96 let mask = capacity - 1;
97
98 let slots: Vec<Slot<T>> = (0..capacity)
100 .map(|_| Slot {
101 turn: AtomicUsize::new(0),
102 data: UnsafeCell::new(MaybeUninit::uninit()),
103 })
104 .collect();
105 let slots = Box::into_raw(slots.into_boxed_slice()) as *mut Slot<T>;
106
107 let shift = capacity.trailing_zeros();
108
109 let shared = Arc::new(Shared {
110 tail: CachePadded::new(AtomicUsize::new(0)),
111 head: CachePadded::new(AtomicUsize::new(0)),
112 slots,
113 capacity,
114 shift,
115 mask,
116 });
117
118 (
119 Producer {
120 cached_head: Cell::new(0),
121 slots,
122 mask,
123 capacity,
124 shift,
125 shared: Arc::clone(&shared),
126 },
127 Consumer {
128 local_head: Cell::new(0),
129 slots,
130 mask,
131 shift,
132 shared,
133 },
134 )
135}
136
137struct Slot<T> {
139 turn: AtomicUsize,
143 data: UnsafeCell<MaybeUninit<T>>,
145}
146
147#[repr(C)]
150struct Shared<T> {
151 tail: CachePadded<AtomicUsize>,
153 head: CachePadded<AtomicUsize>,
155 slots: *mut Slot<T>,
157 capacity: usize,
159 shift: u32,
161 mask: usize,
163}
164
165unsafe impl<T: Send> Send for Shared<T> {}
168unsafe impl<T: Send> Sync for Shared<T> {}
169
170impl<T> Drop for Shared<T> {
171 fn drop(&mut self) {
172 let head = self.head.load(Ordering::Relaxed);
173 let tail = self.tail.load(Ordering::Relaxed);
174
175 let mut i = head;
177 while i != tail {
178 let slot = unsafe { &*self.slots.add(i & self.mask) };
179 let turn = i >> self.shift;
180
181 if slot.turn.load(Ordering::Relaxed) == turn * 2 + 1 {
183 unsafe { (*slot.data.get()).assume_init_drop() };
185 }
186 i = i.wrapping_add(1);
187 }
188
189 unsafe {
191 let _ = Box::from_raw(std::ptr::slice_from_raw_parts_mut(
192 self.slots,
193 self.capacity,
194 ));
195 }
196 }
197}
198
199#[repr(C)]
205pub struct Producer<T> {
206 cached_head: Cell<usize>,
208 slots: *mut Slot<T>,
210 mask: usize,
212 capacity: usize,
214 shift: u32,
216 shared: Arc<Shared<T>>,
217}
218
219impl<T> Clone for Producer<T> {
220 fn clone(&self) -> Self {
221 Producer {
222 cached_head: Cell::new(self.shared.head.load(Ordering::Relaxed)),
224 slots: self.slots,
225 mask: self.mask,
226 capacity: self.capacity,
227 shift: self.shift,
228 shared: Arc::clone(&self.shared),
229 }
230 }
231}
232
233unsafe impl<T: Send> Send for Producer<T> {}
236
237impl<T> Producer<T> {
238 #[inline]
246 #[must_use = "push returns Err if full, which should be handled"]
247 pub fn push(&self, value: T) -> Result<(), Full<T>> {
248 let mut spin_count = 0u32;
249
250 loop {
251 let tail = self.shared.tail.load(Ordering::Relaxed);
252
253 if tail.wrapping_sub(self.cached_head.get()) >= self.capacity {
255 self.cached_head
257 .set(self.shared.head.load(Ordering::Acquire));
258
259 if tail.wrapping_sub(self.cached_head.get()) >= self.capacity {
261 return Err(Full(value));
262 }
263 }
264
265 let slot = unsafe { &*self.slots.add(tail & self.mask) };
267 let turn = tail >> self.shift;
268 let expected_stamp = turn * 2;
269
270 let stamp = slot.turn.load(Ordering::Acquire);
272
273 if stamp == expected_stamp {
274 if self
276 .shared
277 .tail
278 .compare_exchange_weak(
279 tail,
280 tail.wrapping_add(1),
281 Ordering::Relaxed,
282 Ordering::Relaxed,
283 )
284 .is_ok()
285 {
286 unsafe { (*slot.data.get()).write(value) };
288
289 slot.turn.store(turn * 2 + 1, Ordering::Release);
291
292 return Ok(());
293 }
294 }
295
296 let spins = 1 << spin_count.min(6);
299 for _ in 0..spins {
300 std::hint::spin_loop();
301 }
302 spin_count += 1;
303
304 if spin_count >= 5 && self.is_disconnected() {
308 return Err(Full(value));
309 }
310 }
311 }
312
313 #[inline]
315 pub fn capacity(&self) -> usize {
316 1 << self.shift
317 }
318
319 #[inline]
324 pub fn is_disconnected(&self) -> bool {
325 Arc::strong_count(&self.shared) == 1
326 }
327}
328
329impl<T> fmt::Debug for Producer<T> {
330 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
331 f.debug_struct("Producer")
332 .field("capacity", &self.capacity())
333 .finish_non_exhaustive()
334 }
335}
336
337#[repr(C)]
342pub struct Consumer<T> {
343 local_head: Cell<usize>,
345 slots: *mut Slot<T>,
347 mask: usize,
349 shift: u32,
351 shared: Arc<Shared<T>>,
352}
353
354unsafe impl<T: Send> Send for Consumer<T> {}
357
358impl<T> Consumer<T> {
359 #[inline]
363 pub fn pop(&self) -> Option<T> {
364 let head = self.local_head.get();
365 let slot = unsafe { &*self.slots.add(head & self.mask) };
367 let turn = head >> self.shift;
368
369 if slot.turn.load(Ordering::Acquire) != turn * 2 + 1 {
371 return None;
372 }
373
374 let value = unsafe { (*slot.data.get()).assume_init_read() };
376
377 slot.turn.store((turn + 1) * 2, Ordering::Release);
379
380 let new_head = head.wrapping_add(1);
382 self.local_head.set(new_head);
383 self.shared.head.store(new_head, Ordering::Release);
384
385 Some(value)
386 }
387
388 #[inline]
390 pub fn capacity(&self) -> usize {
391 1 << self.shift
392 }
393
394 #[inline]
396 pub fn is_disconnected(&self) -> bool {
397 Arc::strong_count(&self.shared) == 1
398 }
399}
400
401impl<T> fmt::Debug for Consumer<T> {
402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403 f.debug_struct("Consumer")
404 .field("capacity", &self.capacity())
405 .finish_non_exhaustive()
406 }
407}
408
409#[cfg(test)]
410mod tests {
411 use super::*;
412
413 #[test]
418 fn basic_push_pop() {
419 let (tx, rx) = ring_buffer::<u64>(4);
420
421 assert!(tx.push(1).is_ok());
422 assert!(tx.push(2).is_ok());
423 assert!(tx.push(3).is_ok());
424
425 assert_eq!(rx.pop(), Some(1));
426 assert_eq!(rx.pop(), Some(2));
427 assert_eq!(rx.pop(), Some(3));
428 assert_eq!(rx.pop(), None);
429 }
430
431 #[test]
432 fn empty_pop_returns_none() {
433 let (_, rx) = ring_buffer::<u64>(4);
434 assert_eq!(rx.pop(), None);
435 assert_eq!(rx.pop(), None);
436 }
437
438 #[test]
439 fn fill_then_drain() {
440 let (tx, rx) = ring_buffer::<u64>(4);
441
442 for i in 0..4 {
443 assert!(tx.push(i).is_ok());
444 }
445
446 for i in 0..4 {
447 assert_eq!(rx.pop(), Some(i));
448 }
449
450 assert_eq!(rx.pop(), None);
451 }
452
453 #[test]
454 fn push_returns_error_when_full() {
455 let (tx, _rx) = ring_buffer::<u64>(4);
456
457 assert!(tx.push(1).is_ok());
458 assert!(tx.push(2).is_ok());
459 assert!(tx.push(3).is_ok());
460 assert!(tx.push(4).is_ok());
461
462 let err = tx.push(5).unwrap_err();
463 assert_eq!(err.into_inner(), 5);
464 }
465
466 #[test]
471 fn interleaved_single_producer() {
472 let (tx, rx) = ring_buffer::<u64>(8);
473
474 for i in 0..1000 {
475 assert!(tx.push(i).is_ok());
476 assert_eq!(rx.pop(), Some(i));
477 }
478 }
479
480 #[test]
481 fn partial_fill_drain_cycles() {
482 let (tx, rx) = ring_buffer::<u64>(8);
483
484 for round in 0..100 {
485 for i in 0..4 {
486 assert!(tx.push(round * 4 + i).is_ok());
487 }
488
489 for i in 0..4 {
490 assert_eq!(rx.pop(), Some(round * 4 + i));
491 }
492 }
493 }
494
495 #[test]
500 fn two_producers_single_consumer() {
501 use std::thread;
502
503 let (tx, rx) = ring_buffer::<u64>(64);
504 let tx2 = tx.clone();
505
506 let h1 = thread::spawn(move || {
507 for i in 0..1000 {
508 while tx.push(i).is_err() {
509 std::hint::spin_loop();
510 }
511 }
512 });
513
514 let h2 = thread::spawn(move || {
515 for i in 1000..2000 {
516 while tx2.push(i).is_err() {
517 std::hint::spin_loop();
518 }
519 }
520 });
521
522 let mut received = Vec::new();
523 while received.len() < 2000 {
524 if let Some(val) = rx.pop() {
525 received.push(val);
526 } else {
527 std::hint::spin_loop();
528 }
529 }
530
531 h1.join().unwrap();
532 h2.join().unwrap();
533
534 received.sort_unstable();
536 assert_eq!(received, (0..2000).collect::<Vec<_>>());
537 }
538
539 #[test]
540 fn four_producers_single_consumer() {
541 use std::thread;
542
543 let (tx, rx) = ring_buffer::<u64>(256);
544
545 let handles: Vec<_> = (0..4)
546 .map(|p| {
547 let tx = tx.clone();
548 thread::spawn(move || {
549 for i in 0..1000 {
550 let val = p * 1000 + i;
551 while tx.push(val).is_err() {
552 std::hint::spin_loop();
553 }
554 }
555 })
556 })
557 .collect();
558
559 drop(tx); let mut received = Vec::new();
562 while received.len() < 4000 {
563 if let Some(val) = rx.pop() {
564 received.push(val);
565 } else if rx.is_disconnected() && received.len() < 4000 {
566 std::hint::spin_loop();
568 } else {
569 std::hint::spin_loop();
570 }
571 }
572
573 for h in handles {
574 h.join().unwrap();
575 }
576
577 received.sort_unstable();
578 let expected: Vec<u64> = (0..4)
579 .flat_map(|p| (0..1000).map(move |i| p * 1000 + i))
580 .collect();
581 let mut expected_sorted = expected;
582 expected_sorted.sort_unstable();
583 assert_eq!(received, expected_sorted);
584 }
585
586 #[test]
591 fn single_slot_bounded() {
592 let (tx, rx) = ring_buffer::<u64>(1);
593
594 assert!(tx.push(1).is_ok());
595 assert!(tx.push(2).is_err());
596
597 assert_eq!(rx.pop(), Some(1));
598 assert!(tx.push(2).is_ok());
599 }
600
601 #[test]
606 fn producer_disconnected() {
607 let (tx, rx) = ring_buffer::<u64>(4);
608
609 assert!(!rx.is_disconnected());
610 drop(tx);
611 assert!(rx.is_disconnected());
612 }
613
614 #[test]
615 fn consumer_disconnected() {
616 let (tx, rx) = ring_buffer::<u64>(4);
617
618 assert!(!tx.is_disconnected());
619 drop(rx);
620 assert!(tx.is_disconnected());
621 }
622
623 #[test]
624 fn multiple_producers_one_disconnects() {
625 let (tx1, rx) = ring_buffer::<u64>(4);
626 let tx2 = tx1.clone();
627
628 assert!(!rx.is_disconnected());
629 drop(tx1);
630 assert!(!rx.is_disconnected()); drop(tx2);
632 assert!(rx.is_disconnected());
633 }
634
635 #[test]
640 fn drop_cleans_up_remaining() {
641 use std::sync::atomic::AtomicUsize;
642
643 static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);
644
645 struct DropCounter;
646 impl Drop for DropCounter {
647 fn drop(&mut self) {
648 DROP_COUNT.fetch_add(1, Ordering::SeqCst);
649 }
650 }
651
652 DROP_COUNT.store(0, Ordering::SeqCst);
653
654 let (tx, rx) = ring_buffer::<DropCounter>(4);
655
656 let _ = tx.push(DropCounter);
657 let _ = tx.push(DropCounter);
658 let _ = tx.push(DropCounter);
659
660 assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 0);
661
662 drop(tx);
663 drop(rx);
664
665 assert_eq!(DROP_COUNT.load(Ordering::SeqCst), 3);
666 }
667
668 #[test]
673 fn zero_sized_type() {
674 let (tx, rx) = ring_buffer::<()>(8);
675
676 let _ = tx.push(());
677 let _ = tx.push(());
678
679 assert_eq!(rx.pop(), Some(()));
680 assert_eq!(rx.pop(), Some(()));
681 assert_eq!(rx.pop(), None);
682 }
683
684 #[test]
685 fn string_type() {
686 let (tx, rx) = ring_buffer::<String>(4);
687
688 let _ = tx.push("hello".to_string());
689 let _ = tx.push("world".to_string());
690
691 assert_eq!(rx.pop(), Some("hello".to_string()));
692 assert_eq!(rx.pop(), Some("world".to_string()));
693 }
694
695 #[test]
696 #[should_panic(expected = "capacity must be non-zero")]
697 fn zero_capacity_panics() {
698 let _ = ring_buffer::<u64>(0);
699 }
700
701 #[test]
702 fn large_message_type() {
703 #[repr(C, align(64))]
704 struct LargeMessage {
705 data: [u8; 256],
706 }
707
708 let (tx, rx) = ring_buffer::<LargeMessage>(8);
709
710 let msg = LargeMessage { data: [42u8; 256] };
711 assert!(tx.push(msg).is_ok());
712
713 let received = rx.pop().unwrap();
714 assert_eq!(received.data[0], 42);
715 assert_eq!(received.data[255], 42);
716 }
717
718 #[test]
719 fn multiple_laps() {
720 let (tx, rx) = ring_buffer::<u64>(4);
721
722 for i in 0..40 {
724 assert!(tx.push(i).is_ok());
725 assert_eq!(rx.pop(), Some(i));
726 }
727 }
728
729 #[test]
730 fn capacity_rounds_to_power_of_two() {
731 let (tx, _) = ring_buffer::<u64>(100);
732 assert_eq!(tx.capacity(), 128);
733
734 let (tx, _) = ring_buffer::<u64>(1000);
735 assert_eq!(tx.capacity(), 1024);
736 }
737
738 #[test]
743 fn stress_single_producer() {
744 use std::thread;
745
746 const COUNT: u64 = 100_000;
747
748 let (tx, rx) = ring_buffer::<u64>(1024);
749
750 let producer = thread::spawn(move || {
751 for i in 0..COUNT {
752 while tx.push(i).is_err() {
753 std::hint::spin_loop();
754 }
755 }
756 });
757
758 let consumer = thread::spawn(move || {
759 let mut sum = 0u64;
760 let mut received = 0u64;
761 while received < COUNT {
762 if let Some(val) = rx.pop() {
763 sum = sum.wrapping_add(val);
764 received += 1;
765 } else {
766 std::hint::spin_loop();
767 }
768 }
769 sum
770 });
771
772 producer.join().unwrap();
773 let sum = consumer.join().unwrap();
774 assert_eq!(sum, COUNT * (COUNT - 1) / 2);
775 }
776
777 #[test]
778 fn stress_multiple_producers() {
779 use std::thread;
780
781 const PRODUCERS: u64 = 4;
782 const PER_PRODUCER: u64 = 25_000;
783 const TOTAL: u64 = PRODUCERS * PER_PRODUCER;
784
785 let (tx, rx) = ring_buffer::<u64>(1024);
786
787 let handles: Vec<_> = (0..PRODUCERS)
788 .map(|_| {
789 let tx = tx.clone();
790 thread::spawn(move || {
791 for i in 0..PER_PRODUCER {
792 while tx.push(i).is_err() {
793 std::hint::spin_loop();
794 }
795 }
796 })
797 })
798 .collect();
799
800 drop(tx);
801
802 let mut received = 0u64;
803 while received < TOTAL {
804 if rx.pop().is_some() {
805 received += 1;
806 } else {
807 std::hint::spin_loop();
808 }
809 }
810
811 for h in handles {
812 h.join().unwrap();
813 }
814
815 assert_eq!(received, TOTAL);
816 }
817}