1use super::core::RingBufCore;
2use crate::shim::atomic::Ordering;
3use crate::shim::sync::Arc;
4use std::fmt;
5use std::num::NonZero;
6
/// Error returned by [`Producer::push`] when a value could not be enqueued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushError<T> {
    /// The buffer had no free slot; the rejected value is handed back to the caller.
    Full(T),
}
17
/// Error returned by [`Consumer::pop`] when no element could be dequeued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PopError {
    /// The read and write indices were equal, i.e. nothing is queued.
    Empty,
}
28
/// State shared between a [`Producer`] and a [`Consumer`] through an `Arc`.
///
/// NOTE(review): `N` is forwarded unchanged to `RingBufCore`; the test names in
/// this file suggest it is an inline-storage threshold — confirm in `core`.
pub struct SharedData<T, const N: usize> {
    /// Ring-buffer core that exposes the slots and the read/write indices.
    core: RingBufCore<T, N>,
}
46
/// Writing half of the queue created by [`new`].
pub struct Producer<T, const N: usize> {
    /// Handle to the state shared with the consumer.
    shared: Arc<SharedData<T, N>>,

    /// Last read index observed by this producer. It is refreshed lazily from
    /// the shared atomic by the push methods so the common path avoids
    /// touching the consumer-owned index.
    cached_read: usize,
}
69
/// Reading half of the queue created by [`new`].
pub struct Consumer<T, const N: usize> {
    /// Handle to the state shared with the producer.
    shared: Arc<SharedData<T, N>>,

    /// Last write index observed by this consumer. It is refreshed lazily from
    /// the shared atomic by the pop methods so the common path avoids
    /// touching the producer-owned index.
    cached_write: usize,
}
92
/// Iterator returned by [`Consumer::drain`] that pops elements until the
/// buffer reports empty.
pub struct Drain<'a, T, const N: usize> {
    /// Exclusive borrow of the consumer whose elements are being removed.
    consumer: &'a mut Consumer<T, N>,
}
111
112impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
113 type Item = T;
114
115 #[inline]
116 fn next(&mut self) -> Option<Self::Item> {
117 self.consumer.pop().ok()
118 }
119
120 #[inline]
121 fn size_hint(&self) -> (usize, Option<usize>) {
122 let len = self.consumer.slots();
123 (len, Some(len))
124 }
125}
126
127impl<T, const N: usize> SharedData<T, N> {
128 #[inline]
132 pub fn capacity(&self) -> usize {
133 self.core.capacity()
134 }
135}
136
137pub fn new<T, const N: usize>(capacity: NonZero<usize>) -> (Producer<T, N>, Consumer<T, N>) {
161 let core = RingBufCore::new(capacity.get());
162
163 let shared = Arc::new(SharedData { core });
164
165 let producer = Producer {
166 shared: shared.clone(),
167 cached_read: 0,
168 };
169
170 let consumer = Consumer {
171 shared,
172 cached_write: 0,
173 };
174
175 (producer, consumer)
176}
177
178impl<T: fmt::Debug, const N: usize> fmt::Debug for Producer<T, N> {
179 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180 f.debug_struct("Producer")
181 .field("capacity", &self.shared.core.capacity())
182 .field("slots", &self.slots())
183 .field("free_slots", &self.free_slots())
184 .field("is_empty", &self.is_empty())
185 .field("is_full", &self.is_full())
186 .finish()
187 }
188}
189
190impl<T, const N: usize> Producer<T, N> {
191 #[inline]
195 pub fn capacity(&self) -> usize {
196 self.shared.core.capacity()
197 }
198
199 #[inline]
203 pub fn slots(&self) -> usize {
204 let write = self.shared.core.write_idx().load(Ordering::Relaxed);
205 let read = self.shared.core.read_idx().load(Ordering::Acquire);
206 write.wrapping_sub(read)
207 }
208
209 #[inline]
213 pub fn len(&self) -> usize {
214 self.slots()
215 }
216
217 #[inline]
221 pub fn is_empty(&self) -> bool {
222 let write = self.shared.core.write_idx().load(Ordering::Relaxed);
223 let read = self.shared.core.read_idx().load(Ordering::Acquire);
224 write == read
225 }
226
227 #[inline]
231 pub fn free_slots(&self) -> usize {
232 self.shared.core.capacity() - self.slots()
233 }
234
235 #[inline]
239 pub fn is_full(&self) -> bool {
240 let write = self.shared.core.write_idx().load(Ordering::Relaxed);
241 let read = self.shared.core.read_idx().load(Ordering::Acquire);
242 write.wrapping_sub(read) >= self.shared.core.capacity()
243 }
244
245 #[inline]
255 pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
256 let write = self.shared.core.write_idx().load(Ordering::Relaxed);
257 let mut read = self.cached_read;
258
259 if write.wrapping_sub(read) >= self.shared.core.capacity() {
262 read = self.shared.core.read_idx().load(Ordering::Acquire);
265 self.cached_read = read;
266
267 if write.wrapping_sub(read) >= self.shared.core.capacity() {
268 return Err(PushError::Full(value));
269 }
270 }
271
272 let index = write & self.shared.core.mask();
275 unsafe {
276 self.shared.core.write_at(index, value);
277 }
278
279 self.shared
282 .core
283 .write_idx()
284 .store(write.wrapping_add(1), Ordering::Release);
285
286 Ok(())
287 }
288}
289
290impl<T: Copy, const N: usize> Producer<T, N> {
291 #[inline]
320 pub fn push_slice(&mut self, values: &[T]) -> usize {
321 if values.is_empty() {
322 return 0;
323 }
324
325 let write = self.shared.core.write_idx().load(Ordering::Relaxed);
326 let mut read = self.cached_read;
327
328 let mut available = self
331 .shared
332 .core
333 .capacity()
334 .saturating_sub(write.wrapping_sub(read));
335
336 if available == 0 {
337 read = self.shared.core.read_idx().load(Ordering::Acquire);
340 self.cached_read = read;
341 available = self
342 .shared
343 .core
344 .capacity()
345 .saturating_sub(write.wrapping_sub(read));
346
347 if available == 0 {
348 return 0;
349 }
350 }
351
352 let to_push = available.min(values.len());
355
356 unsafe {
359 self.shared.core.copy_from_slice(write, values, to_push);
360 }
361
362 self.shared
365 .core
366 .write_idx()
367 .store(write.wrapping_add(to_push), Ordering::Release);
368
369 to_push
370 }
371}
372
373impl<T: fmt::Debug, const N: usize> fmt::Debug for Consumer<T, N> {
374 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
375 f.debug_struct("Consumer")
376 .field("capacity", &self.shared.core.capacity())
377 .field("slots", &self.slots())
378 .field("is_empty", &self.is_empty())
379 .finish()
380 }
381}
382
383impl<T, const N: usize> Consumer<T, N> {
384 #[inline]
394 pub fn pop(&mut self) -> Result<T, PopError> {
395 let read = self.shared.core.read_idx().load(Ordering::Relaxed);
396 let mut write = self.cached_write;
397
398 if read == write {
401 write = self.shared.core.write_idx().load(Ordering::Acquire);
404 self.cached_write = write;
405
406 if read == write {
407 return Err(PopError::Empty);
408 }
409 }
410
411 let index = read & self.shared.core.mask();
414 let value = unsafe { self.shared.core.read_at(index) };
415
416 self.shared
419 .core
420 .read_idx()
421 .store(read.wrapping_add(1), Ordering::Release);
422
423 Ok(value)
424 }
425
426 #[inline]
430 pub fn is_empty(&self) -> bool {
431 let read = self.shared.core.read_idx().load(Ordering::Relaxed);
432 let write = self.shared.core.write_idx().load(Ordering::Acquire);
433 read == write
434 }
435
436 #[inline]
440 pub fn slots(&self) -> usize {
441 let read = self.shared.core.read_idx().load(Ordering::Relaxed);
442 let write = self.shared.core.write_idx().load(Ordering::Acquire);
443 write.wrapping_sub(read)
444 }
445
446 #[inline]
450 pub fn len(&self) -> usize {
451 self.slots()
452 }
453
454 #[inline]
458 pub fn capacity(&self) -> usize {
459 self.shared.core.capacity()
460 }
461
462 #[inline]
479 pub fn peek(&self) -> Option<&T> {
480 let read = self.shared.core.read_idx().load(Ordering::Relaxed);
481 let write = self.shared.core.write_idx().load(Ordering::Acquire);
482
483 if read == write {
484 return None;
485 }
486
487 let index = read & self.shared.core.mask();
488 unsafe { Some(self.shared.core.peek_at(index)) }
489 }
490
491 pub fn clear(&mut self) {
499 while self.pop().is_ok() {
500 }
503 }
504
505 #[inline]
531 pub fn drain(&mut self) -> Drain<'_, T, N> {
532 Drain { consumer: self }
533 }
534
535 #[inline]
539 pub fn buffer(&self) -> &SharedData<T, N> {
540 &self.shared
541 }
542}
543
544impl<T: Copy, const N: usize> Consumer<T, N> {
545 #[inline]
574 pub fn pop_slice(&mut self, dest: &mut [T]) -> usize {
575 if dest.is_empty() {
576 return 0;
577 }
578
579 let read = self.shared.core.read_idx().load(Ordering::Relaxed);
580 let mut write = self.cached_write;
581
582 let mut available = write.wrapping_sub(read);
585
586 if available == 0 {
587 write = self.shared.core.write_idx().load(Ordering::Acquire);
590 self.cached_write = write;
591 available = write.wrapping_sub(read);
592
593 if available == 0 {
594 return 0;
595 }
596 }
597
598 let to_pop = available.min(dest.len());
601
602 unsafe {
605 self.shared.core.copy_to_slice(read, dest, to_pop);
606 }
607
608 self.shared
611 .core
612 .read_idx()
613 .store(read.wrapping_add(to_pop), Ordering::Release);
614
615 to_pop
616 }
617}
618
619impl<T, const N: usize> Drop for Consumer<T, N> {
620 fn drop(&mut self) {
621 while self.pop().is_ok() {
624 }
627 }
628}
629
630#[cfg(all(test, not(feature = "loom")))]
631mod tests {
632 use super::*;
633
634 #[test]
635 fn test_basic_push_pop() {
636 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
637
638 assert!(producer.push(1).is_ok());
639 assert!(producer.push(2).is_ok());
640 assert!(producer.push(3).is_ok());
641
642 assert_eq!(consumer.pop().unwrap(), 1);
643 assert_eq!(consumer.pop().unwrap(), 2);
644 assert_eq!(consumer.pop().unwrap(), 3);
645 assert!(consumer.pop().is_err());
646 }
647
648 #[test]
649 fn test_capacity_rounding() {
650 let (_, consumer) = new::<i32, 32>(NonZero::new(5).unwrap());
651 assert_eq!(consumer.buffer().capacity(), 8);
653
654 let (_, consumer) = new::<i32, 64>(NonZero::new(32).unwrap());
655 assert_eq!(consumer.buffer().capacity(), 32);
656
657 let (_, consumer) = new::<i32, 128>(NonZero::new(33).unwrap());
658 assert_eq!(consumer.buffer().capacity(), 64);
660 }
661
662 #[test]
663 fn test_buffer_full() {
664 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
665 assert!(producer.push(1).is_ok());
668 assert!(producer.push(2).is_ok());
669 assert!(producer.push(3).is_ok());
670 assert!(producer.push(4).is_ok());
671
672 assert!(matches!(producer.push(5), Err(PushError::Full(5))));
674
675 assert_eq!(consumer.pop().unwrap(), 1);
677
678 assert!(producer.push(5).is_ok());
680 }
681
682 #[test]
683 fn test_buffer_empty() {
684 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
685
686 assert!(consumer.pop().is_err());
687 assert!(consumer.is_empty());
688
689 producer.push(42).unwrap();
690 assert!(!consumer.is_empty());
691
692 consumer.pop().unwrap();
693 assert!(consumer.is_empty());
694 }
695
696 #[test]
697 fn test_slots() {
698 let (mut producer, consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
699
700 assert_eq!(consumer.slots(), 0);
701
702 producer.push(1).unwrap();
703 producer.push(2).unwrap();
704 producer.push(3).unwrap();
705
706 assert_eq!(consumer.slots(), 3);
707 }
708
709 #[test]
710 fn test_wrap_around() {
711 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
712
713 for round in 0..10 {
715 for i in 0..4 {
716 producer.push(round * 10 + i).unwrap();
717 }
718
719 for i in 0..4 {
720 assert_eq!(consumer.pop().unwrap(), round * 10 + i);
721 }
722 }
723 }
724
725 #[test]
726 fn test_drop_cleanup() {
727 use std::sync::Arc;
728 use std::sync::atomic::{AtomicUsize, Ordering};
729
730 #[derive(Debug)]
731 struct DropCounter {
732 counter: Arc<AtomicUsize>,
733 }
734
735 impl Drop for DropCounter {
736 fn drop(&mut self) {
737 self.counter.fetch_add(1, Ordering::SeqCst);
738 }
739 }
740
741 let counter = Arc::new(AtomicUsize::new(0));
742
743 {
744 let (mut producer, consumer) = new::<DropCounter, 32>(NonZero::new(8).unwrap());
745
746 for _ in 0..5 {
747 producer
748 .push(DropCounter {
749 counter: counter.clone(),
750 })
751 .unwrap();
752 }
753
754 drop(consumer);
756 }
757
758 assert_eq!(counter.load(Ordering::SeqCst), 5);
760 }
761
762 #[test]
763 fn test_concurrent_access() {
764 use std::thread;
765
766 let (mut producer, mut consumer) = new::<u64, 128>(NonZero::new(128).unwrap());
767
768 let producer_handle = thread::spawn(move || {
769 for i in 0..1000 {
770 loop {
771 if producer.push(i).is_ok() {
772 break;
773 }
774 thread::yield_now();
775 }
776 }
777 });
778
779 let consumer_handle = thread::spawn(move || {
780 let mut received = Vec::new();
781 for _ in 0..1000 {
782 loop {
783 match consumer.pop() {
784 Ok(val) => {
785 received.push(val);
786 break;
787 }
788 Err(_) => thread::yield_now(),
789 }
790 }
791 }
792 received
793 });
794
795 producer_handle.join().unwrap();
796 let received = consumer_handle.join().unwrap();
797
798 assert_eq!(received.len(), 1000);
800 for (i, &val) in received.iter().enumerate() {
801 assert_eq!(val, i as u64);
802 }
803 }
804
805 #[test]
806 fn test_small_capacity_stack_allocation() {
807 let (mut producer, mut consumer) = new::<u8, 32>(NonZero::new(16).unwrap());
810
811 for i in 0..10 {
812 producer.push(i).unwrap();
813 }
814
815 for i in 0..10 {
816 assert_eq!(consumer.pop().unwrap(), i);
817 }
818 }
819
820 #[test]
821 fn test_large_capacity_heap_allocation() {
822 let (mut producer, mut consumer) = new::<u8, 32>(NonZero::new(64).unwrap());
824
825 for i in 0..50 {
826 producer.push(i).unwrap();
827 }
828
829 for i in 0..50 {
830 assert_eq!(consumer.pop().unwrap(), i);
831 }
832 }
833
834 #[test]
837 fn test_capacity_one() {
838 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(1).unwrap());
841
842 assert_eq!(consumer.buffer().capacity(), 1);
845
846 assert!(producer.push(42).is_ok());
847 assert!(matches!(producer.push(99), Err(PushError::Full(99))));
848
849 assert_eq!(consumer.pop().unwrap(), 42);
850 assert!(consumer.pop().is_err());
851 }
852
853 #[test]
854 fn test_power_of_two_capacities() {
855 for power in 0..10 {
858 let capacity = 1 << power; let (_, consumer) = new::<u8, 128>(NonZero::new(capacity).unwrap());
860 assert_eq!(consumer.buffer().capacity(), capacity);
861 }
862 }
863
864 #[test]
865 fn test_non_power_of_two_rounding() {
866 let test_cases = vec![
869 (3, 4),
870 (5, 8),
871 (7, 8),
872 (9, 16),
873 (15, 16),
874 (17, 32),
875 (31, 32),
876 (33, 64),
877 (100, 128),
878 (1000, 1024),
879 ];
880
881 for (input, expected) in test_cases {
882 let (_, consumer) = new::<u8, 128>(NonZero::new(input).unwrap());
883 assert_eq!(
884 consumer.buffer().capacity(),
885 expected,
886 "Capacity {} should round up to {}",
887 input,
888 expected
889 );
890 }
891 }
892
893 #[test]
894 fn test_single_element_operations() {
895 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
898
899 for i in 0..100 {
900 producer.push(i).unwrap();
901 assert_eq!(consumer.slots(), 1);
902 assert_eq!(consumer.pop().unwrap(), i);
903 assert_eq!(consumer.slots(), 0);
904 assert!(consumer.is_empty());
905 }
906 }
907
908 #[test]
909 fn test_alternating_push_pop() {
910 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
913
914 for i in 0..50 {
915 producer.push(i * 2).unwrap();
916 producer.push(i * 2 + 1).unwrap();
917 assert_eq!(consumer.pop().unwrap(), i * 2);
918 assert_eq!(consumer.pop().unwrap(), i * 2 + 1);
919 }
920 }
921
922 #[test]
923 fn test_index_wrapping() {
924 let (mut producer, mut consumer) = new::<usize, 32>(NonZero::new(4).unwrap());
927
928 for iteration in 0..1000 {
931 for i in 0..4 {
932 producer.push(iteration * 4 + i).unwrap();
933 }
934 for i in 0..4 {
935 assert_eq!(consumer.pop().unwrap(), iteration * 4 + i);
936 }
937 }
938 }
939
940 #[test]
943 fn test_various_stack_thresholds() {
944 let (mut p1, mut c1) = new::<u32, 16>(NonZero::new(8).unwrap());
946 p1.push(1).unwrap();
947 assert_eq!(c1.pop().unwrap(), 1);
948
949 let (mut p2, mut c2) = new::<u32, 64>(NonZero::new(32).unwrap());
951 p2.push(2).unwrap();
952 assert_eq!(c2.pop().unwrap(), 2);
953
954 let (mut p3, mut c3) = new::<u32, 128>(NonZero::new(64).unwrap());
956 p3.push(3).unwrap();
957 assert_eq!(c3.pop().unwrap(), 3);
958
959 let (mut p4, mut c4) = new::<u32, 256>(NonZero::new(128).unwrap());
961 p4.push(4).unwrap();
962 assert_eq!(c4.pop().unwrap(), 4);
963 }
964
965 #[test]
966 fn test_small_n_with_large_capacity() {
967 let (mut producer, mut consumer) = new::<u64, 8>(NonZero::new(32).unwrap());
970
971 for i in 0..20 {
972 producer.push(i).unwrap();
973 }
974
975 for i in 0..20 {
976 assert_eq!(consumer.pop().unwrap(), i);
977 }
978 }
979
980 #[test]
981 fn test_large_n_with_small_capacity() {
982 let (mut producer, mut consumer) = new::<u64, 256>(NonZero::new(16).unwrap());
985
986 for i in 0..10 {
987 producer.push(i).unwrap();
988 }
989
990 for i in 0..10 {
991 assert_eq!(consumer.pop().unwrap(), i);
992 }
993 }
994
995 #[test]
998 fn test_zero_sized_types() {
999 let (mut producer, mut consumer) = new::<(), 32>(NonZero::new(4).unwrap());
1002
1003 producer.push(()).unwrap();
1004 producer.push(()).unwrap();
1005
1006 assert_eq!(consumer.pop().unwrap(), ());
1007 assert_eq!(consumer.pop().unwrap(), ());
1008 }
1009
1010 #[test]
1011 fn test_large_types() {
1012 #[derive(Debug, PartialEq, Clone)]
1015 struct LargeStruct {
1016 data: [u64; 32],
1017 }
1018
1019 let (mut producer, mut consumer) = new::<LargeStruct, 32>(NonZero::new(4).unwrap());
1020
1021 let item1 = LargeStruct { data: [1; 32] };
1022 let item2 = LargeStruct { data: [2; 32] };
1023
1024 producer.push(item1.clone()).unwrap();
1025 producer.push(item2.clone()).unwrap();
1026
1027 assert_eq!(consumer.pop().unwrap(), item1);
1028 assert_eq!(consumer.pop().unwrap(), item2);
1029 }
1030
1031 #[test]
1032 fn test_string_type() {
1033 let (mut producer, mut consumer) = new::<String, 32>(NonZero::new(8).unwrap());
1036
1037 let messages = vec!["Hello", "World", "Rust", "Ring", "Buffer"];
1038
1039 for msg in &messages {
1040 producer.push(msg.to_string()).unwrap();
1041 }
1042
1043 for msg in &messages {
1044 assert_eq!(consumer.pop().unwrap(), msg.to_string());
1045 }
1046 }
1047
1048 #[test]
1049 fn test_option_type() {
1050 let (mut producer, mut consumer) = new::<Option<i32>, 32>(NonZero::new(4).unwrap());
1053
1054 producer.push(Some(42)).unwrap();
1055 producer.push(None).unwrap();
1056 producer.push(Some(100)).unwrap();
1057
1058 assert_eq!(consumer.pop().unwrap(), Some(42));
1059 assert_eq!(consumer.pop().unwrap(), None);
1060 assert_eq!(consumer.pop().unwrap(), Some(100));
1061 }
1062
1063 #[test]
1066 fn test_concurrent_small_buffer() {
1067 use std::thread;
1070
1071 let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(4).unwrap());
1072
1073 let count = 100;
1074
1075 let producer_handle = thread::spawn(move || {
1076 for i in 0..count {
1077 loop {
1078 if producer.push(i).is_ok() {
1079 break;
1080 }
1081 thread::yield_now();
1082 }
1083 }
1084 });
1085
1086 let consumer_handle = thread::spawn(move || {
1087 let mut sum = 0;
1088 for _ in 0..count {
1089 loop {
1090 match consumer.pop() {
1091 Ok(val) => {
1092 sum += val;
1093 break;
1094 }
1095 Err(_) => thread::yield_now(),
1096 }
1097 }
1098 }
1099 sum
1100 });
1101
1102 producer_handle.join().unwrap();
1103 let sum = consumer_handle.join().unwrap();
1104
1105 assert_eq!(sum, (count * (count - 1)) / 2);
1107 }
1108
1109 #[test]
1110 fn test_concurrent_large_buffer() {
1111 use std::thread;
1114
1115 let (mut producer, mut consumer) = new::<u64, 512>(NonZero::new(512).unwrap());
1116
1117 let count = 10000;
1118
1119 let producer_handle = thread::spawn(move || {
1120 for i in 0..count {
1121 loop {
1122 if producer.push(i).is_ok() {
1123 break;
1124 }
1125 thread::yield_now();
1126 }
1127 }
1128 });
1129
1130 let consumer_handle = thread::spawn(move || {
1131 let mut received = 0;
1132 for _ in 0..count {
1133 loop {
1134 match consumer.pop() {
1135 Ok(_) => {
1136 received += 1;
1137 break;
1138 }
1139 Err(_) => thread::yield_now(),
1140 }
1141 }
1142 }
1143 received
1144 });
1145
1146 producer_handle.join().unwrap();
1147 let received = consumer_handle.join().unwrap();
1148
1149 assert_eq!(received, count);
1150 }
1151
1152 #[test]
1153 fn test_concurrent_with_different_speeds() {
1154 use std::thread;
1157 use std::time::Duration;
1158
1159 let (mut producer, mut consumer) = new::<u32, 64>(NonZero::new(32).unwrap());
1160
1161 let producer_handle = thread::spawn(move || {
1162 for i in 0..50 {
1163 loop {
1164 if producer.push(i).is_ok() {
1165 break;
1166 }
1167 thread::yield_now();
1168 }
1169 if i % 10 == 0 {
1172 thread::sleep(Duration::from_micros(1));
1173 }
1174 }
1175 });
1176
1177 let consumer_handle = thread::spawn(move || {
1178 let mut received = Vec::new();
1179 for _ in 0..50 {
1180 loop {
1181 match consumer.pop() {
1182 Ok(val) => {
1183 received.push(val);
1184 break;
1185 }
1186 Err(_) => thread::yield_now(),
1187 }
1188 }
1189 }
1190 received
1191 });
1192
1193 producer_handle.join().unwrap();
1194 let received = consumer_handle.join().unwrap();
1195
1196 assert_eq!(received.len(), 50);
1197 for (i, &val) in received.iter().enumerate() {
1198 assert_eq!(val, i as u32);
1199 }
1200 }
1201
1202 #[test]
1205 fn test_push_error_value_returned() {
1206 let (mut producer, _consumer) = new::<String, 32>(NonZero::new(2).unwrap());
1209
1210 producer.push("first".to_string()).unwrap();
1211 producer.push("second".to_string()).unwrap();
1212
1213 let value = "third".to_string();
1214 match producer.push(value.clone()) {
1215 Err(PushError::Full(returned_value)) => {
1216 assert_eq!(returned_value, value);
1217 }
1218 Ok(_) => panic!("Expected PushError::Full"),
1219 }
1220 }
1221
1222 #[test]
1223 fn test_pop_error() {
1224 let (_producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
1227
1228 match consumer.pop() {
1229 Err(PopError::Empty) => {} Ok(_) => panic!("Expected PopError::Empty"),
1231 }
1232 }
1233
1234 #[test]
1237 fn test_is_empty_after_operations() {
1238 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
1241
1242 assert!(consumer.is_empty());
1243
1244 producer.push(1).unwrap();
1245 assert!(!consumer.is_empty());
1246
1247 producer.push(2).unwrap();
1248 assert!(!consumer.is_empty());
1249
1250 consumer.pop().unwrap();
1251 assert!(!consumer.is_empty());
1252
1253 consumer.pop().unwrap();
1254 assert!(consumer.is_empty());
1255 }
1256
1257 #[test]
1258 fn test_slots_accuracy() {
1259 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1262
1263 assert_eq!(consumer.slots(), 0);
1264
1265 for i in 1..=10 {
1266 producer.push(i).unwrap();
1267 assert_eq!(consumer.slots(), i as usize);
1268 }
1269
1270 for i in (0..10).rev() {
1271 consumer.pop().unwrap();
1272 assert_eq!(consumer.slots(), i);
1273 }
1274
1275 assert_eq!(consumer.slots(), 0);
1276 }
1277
1278 #[test]
1279 fn test_slots_with_wrap_around() {
1280 let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(4).unwrap());
1283
1284 for _ in 0..100 {
1287 for i in 0..3 {
1288 producer.push(i).unwrap();
1289 }
1290 assert_eq!(consumer.slots(), 3);
1291
1292 for _ in 0..3 {
1293 consumer.pop().unwrap();
1294 }
1295 assert_eq!(consumer.slots(), 0);
1296 }
1297 }
1298
1299 #[test]
1302 fn test_partial_drop_cleanup() {
1303 use std::sync::Arc;
1306 use std::sync::atomic::{AtomicUsize, Ordering};
1307
1308 #[derive(Debug)]
1309 struct DropCounter {
1310 counter: Arc<AtomicUsize>,
1311 }
1312
1313 impl Drop for DropCounter {
1314 fn drop(&mut self) {
1315 self.counter.fetch_add(1, Ordering::SeqCst);
1316 }
1317 }
1318
1319 let counter = Arc::new(AtomicUsize::new(0));
1320
1321 {
1322 let (mut producer, mut consumer) = new::<DropCounter, 32>(NonZero::new(16).unwrap());
1323
1324 for _ in 0..10 {
1326 producer
1327 .push(DropCounter {
1328 counter: counter.clone(),
1329 })
1330 .unwrap();
1331 }
1332
1333 for _ in 0..6 {
1335 consumer.pop().unwrap();
1336 }
1337
1338 assert_eq!(counter.load(Ordering::SeqCst), 6);
1340
1341 drop(consumer);
1343 }
1344
1345 assert_eq!(counter.load(Ordering::SeqCst), 10);
1347 }
1348
1349 #[test]
1350 fn test_empty_buffer_drop() {
1351 use std::sync::Arc;
1354 use std::sync::atomic::{AtomicUsize, Ordering};
1355
1356 #[derive(Debug)]
1357 struct DropCounter {
1358 counter: Arc<AtomicUsize>,
1359 }
1360
1361 impl Drop for DropCounter {
1362 fn drop(&mut self) {
1363 self.counter.fetch_add(1, Ordering::SeqCst);
1364 }
1365 }
1366
1367 let counter = Arc::new(AtomicUsize::new(0));
1368
1369 {
1370 let (mut producer, mut consumer) = new::<DropCounter, 32>(NonZero::new(8).unwrap());
1371
1372 producer
1373 .push(DropCounter {
1374 counter: counter.clone(),
1375 })
1376 .unwrap();
1377 consumer.pop().unwrap();
1378
1379 assert!(consumer.is_empty());
1381
1382 drop(consumer);
1384 }
1385
1386 assert_eq!(counter.load(Ordering::SeqCst), 1);
1388 }
1389
1390 #[test]
1393 fn test_high_throughput() {
1394 use std::thread;
1397
1398 let (mut producer, mut consumer) = new::<u64, 256>(NonZero::new(256).unwrap());
1399 let count = 100000;
1400
1401 let producer_handle = thread::spawn(move || {
1402 for i in 0..count {
1403 loop {
1404 if producer.push(i).is_ok() {
1405 break;
1406 }
1407 thread::yield_now();
1408 }
1409 }
1410 });
1411
1412 let consumer_handle = thread::spawn(move || {
1413 let mut last = None;
1414 for _ in 0..count {
1415 loop {
1416 match consumer.pop() {
1417 Ok(val) => {
1418 if let Some(prev) = last {
1419 assert_eq!(val, prev + 1, "Values must be sequential");
1420 }
1421 last = Some(val);
1422 break;
1423 }
1424 Err(_) => thread::yield_now(),
1425 }
1426 }
1427 }
1428 last
1429 });
1430
1431 producer_handle.join().unwrap();
1432 let last = consumer_handle.join().unwrap();
1433
1434 assert_eq!(last, Some(count - 1));
1435 }
1436
1437 #[test]
1440 fn test_producer_capacity_queries() {
1441 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
1444
1445 assert_eq!(producer.capacity(), 8);
1446 assert_eq!(producer.len(), 0);
1447 assert_eq!(producer.slots(), 0);
1448 assert_eq!(producer.free_slots(), 8);
1449 assert!(!producer.is_full());
1450
1451 producer.push(1).unwrap();
1452 producer.push(2).unwrap();
1453 producer.push(3).unwrap();
1454
1455 assert_eq!(producer.len(), 3);
1456 assert_eq!(producer.slots(), 3);
1457 assert_eq!(producer.free_slots(), 5);
1458 assert!(!producer.is_full());
1459
1460 producer.push(4).unwrap();
1462 producer.push(5).unwrap();
1463 producer.push(6).unwrap();
1464 producer.push(7).unwrap();
1465 producer.push(8).unwrap();
1466
1467 assert_eq!(producer.len(), 8);
1468 assert_eq!(producer.slots(), 8);
1469 assert_eq!(producer.free_slots(), 0);
1470 assert!(producer.is_full());
1471
1472 consumer.pop().unwrap();
1474
1475 assert_eq!(producer.len(), 7);
1476 assert_eq!(producer.free_slots(), 1);
1477 assert!(!producer.is_full());
1478 }
1479
1480 #[test]
1481 fn test_consumer_len_and_capacity() {
1482 let (mut producer, consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1485
1486 assert_eq!(consumer.len(), 0);
1487 assert_eq!(consumer.capacity(), 16);
1488
1489 for i in 0..10 {
1490 producer.push(i).unwrap();
1491 }
1492
1493 assert_eq!(consumer.len(), 10);
1494 assert_eq!(consumer.capacity(), 16);
1495 }
1496
1497 #[test]
1498 fn test_peek() {
1499 let (mut producer, consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
1502
1503 assert!(consumer.peek().is_none());
1505
1506 producer.push(42).unwrap();
1507 producer.push(100).unwrap();
1508 producer.push(200).unwrap();
1509
1510 assert_eq!(consumer.peek(), Some(&42));
1512 assert_eq!(consumer.peek(), Some(&42)); assert_eq!(consumer.len(), 3); }
1515
1516 #[test]
1517 fn test_peek_after_pop() {
1518 let (mut producer, mut consumer) = new::<String, 32>(NonZero::new(8).unwrap());
1521
1522 producer.push("first".to_string()).unwrap();
1523 producer.push("second".to_string()).unwrap();
1524 producer.push("third".to_string()).unwrap();
1525
1526 assert_eq!(consumer.peek(), Some(&"first".to_string()));
1527 consumer.pop().unwrap();
1528
1529 assert_eq!(consumer.peek(), Some(&"second".to_string()));
1530 consumer.pop().unwrap();
1531
1532 assert_eq!(consumer.peek(), Some(&"third".to_string()));
1533 consumer.pop().unwrap();
1534
1535 assert!(consumer.peek().is_none());
1536 }
1537
1538 #[test]
1539 fn test_push_slice_basic() {
1540 let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
1543
1544 let data = [1, 2, 3, 4, 5];
1545 let pushed = producer.push_slice(&data);
1546
1547 assert_eq!(pushed, 5);
1548 assert_eq!(consumer.len(), 5);
1549
1550 for i in 0..5 {
1551 assert_eq!(consumer.pop().unwrap(), data[i]);
1552 }
1553 }
1554
1555 #[test]
1556 fn test_push_slice_partial() {
1557 let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
1560
1561 let initial = [1, 2, 3, 4, 5];
1563 producer.push_slice(&initial);
1564
1565 let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
1567 let pushed = producer.push_slice(&more);
1568
1569 assert_eq!(pushed, 3);
1570 assert_eq!(consumer.len(), 8);
1571 assert!(producer.is_full());
1572
1573 for i in 1..=8 {
1575 assert_eq!(consumer.pop().unwrap(), i);
1576 }
1577 }
1578
1579 #[test]
1580 fn test_push_slice_wrap_around() {
1581 let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
1584
1585 producer.push_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
1587
1588 for _ in 0..5 {
1590 consumer.pop().unwrap();
1591 }
1592
1593 let data = [10, 11, 12, 13, 14];
1595 let pushed = producer.push_slice(&data);
1596
1597 assert_eq!(pushed, 5);
1598
1599 assert_eq!(consumer.pop().unwrap(), 6);
1601 assert_eq!(consumer.pop().unwrap(), 7);
1602 assert_eq!(consumer.pop().unwrap(), 8);
1603 assert_eq!(consumer.pop().unwrap(), 10);
1604 assert_eq!(consumer.pop().unwrap(), 11);
1605 assert_eq!(consumer.pop().unwrap(), 12);
1606 assert_eq!(consumer.pop().unwrap(), 13);
1607 assert_eq!(consumer.pop().unwrap(), 14);
1608 }
1609
1610 #[test]
1611 fn test_push_slice_empty() {
1612 let (mut producer, _consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
1615
1616 let pushed = producer.push_slice(&[]);
1617 assert_eq!(pushed, 0);
1618 }
1619
1620 #[test]
1621 fn test_pop_slice_basic() {
1622 let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
1625
1626 for i in 0..10 {
1628 producer.push(i).unwrap();
1629 }
1630
1631 let mut dest = [0u32; 5];
1632 let popped = consumer.pop_slice(&mut dest);
1633
1634 assert_eq!(popped, 5);
1635 assert_eq!(dest, [0, 1, 2, 3, 4]);
1636 assert_eq!(consumer.len(), 5);
1637 }
1638
1639 #[test]
1640 fn test_pop_slice_partial() {
1641 let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
1644
1645 producer.push(1).unwrap();
1646 producer.push(2).unwrap();
1647 producer.push(3).unwrap();
1648
1649 let mut dest = [0u32; 10];
1650 let popped = consumer.pop_slice(&mut dest);
1651
1652 assert_eq!(popped, 3);
1653 assert_eq!(&dest[0..3], &[1, 2, 3]);
1654 assert!(consumer.is_empty());
1655 }
1656
1657 #[test]
1658 fn test_pop_slice_wrap_around() {
1659 let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
1662
1663 producer.push_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
1665
1666 let mut temp = [0u32; 5];
1668 let popped = consumer.pop_slice(&mut temp);
1669 assert_eq!(popped, 5);
1670 assert_eq!(temp, [1, 2, 3, 4, 5]);
1671
1672 let pushed = producer.push_slice(&[9, 10, 11, 12, 13]);
1674 assert_eq!(pushed, 5);
1675
1676 let mut dest1 = [0u32; 3];
1678 let popped1 = consumer.pop_slice(&mut dest1);
1679 assert_eq!(popped1, 3);
1680 assert_eq!(dest1, [6, 7, 8]);
1681
1682 let mut dest2 = [0u32; 5];
1684 let popped2 = consumer.pop_slice(&mut dest2);
1685 assert_eq!(popped2, 5);
1686 assert_eq!(dest2, [9, 10, 11, 12, 13]);
1687 }
1688
1689 #[test]
1690 fn test_pop_slice_empty() {
1691 let (_producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
1694
1695 let mut dest = [0u32; 5];
1696 let popped = consumer.pop_slice(&mut dest);
1697
1698 assert_eq!(popped, 0);
1699 }
1700
1701 #[test]
1702 fn test_clear() {
1703 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1706
1707 for i in 0..10 {
1708 producer.push(i).unwrap();
1709 }
1710
1711 assert_eq!(consumer.len(), 10);
1712
1713 consumer.clear();
1714
1715 assert_eq!(consumer.len(), 0);
1716 assert!(consumer.is_empty());
1717 }
1718
1719 #[test]
1720 fn test_clear_with_drop() {
1721 use std::sync::Arc;
1724 use std::sync::atomic::{AtomicUsize, Ordering};
1725
1726 #[derive(Debug)]
1727 struct DropCounter {
1728 counter: Arc<AtomicUsize>,
1729 }
1730
1731 impl Drop for DropCounter {
1732 fn drop(&mut self) {
1733 self.counter.fetch_add(1, Ordering::SeqCst);
1734 }
1735 }
1736
1737 let counter = Arc::new(AtomicUsize::new(0));
1738
1739 {
1740 let (mut producer, mut consumer) = new::<DropCounter, 32>(NonZero::new(16).unwrap());
1741
1742 for _ in 0..8 {
1743 producer
1744 .push(DropCounter {
1745 counter: counter.clone(),
1746 })
1747 .unwrap();
1748 }
1749
1750 assert_eq!(counter.load(Ordering::SeqCst), 0);
1751
1752 consumer.clear();
1753
1754 assert_eq!(counter.load(Ordering::SeqCst), 8);
1755 }
1756 }
1757
1758 #[test]
1759 fn test_drain_iterator() {
1760 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1763
1764 for i in 0..10 {
1765 producer.push(i).unwrap();
1766 }
1767
1768 let collected: Vec<i32> = consumer.drain().collect();
1769
1770 assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1771 assert!(consumer.is_empty());
1772 }
1773
1774 #[test]
1775 fn test_drain_empty() {
1776 let (_producer, mut consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
1779
1780 let collected: Vec<i32> = consumer.drain().collect();
1781
1782 assert!(collected.is_empty());
1783 }
1784
1785 #[test]
1786 fn test_drain_size_hint() {
1787 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1790
1791 for i in 0..5 {
1792 producer.push(i).unwrap();
1793 }
1794
1795 let mut drain = consumer.drain();
1796
1797 assert_eq!(drain.size_hint(), (5, Some(5)));
1798
1799 drain.next();
1800 assert_eq!(drain.size_hint(), (4, Some(4)));
1801
1802 drain.next();
1803 assert_eq!(drain.size_hint(), (3, Some(3)));
1804 }
1805
1806 #[test]
1807 fn test_drain_partial() {
1808 let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1811
1812 for i in 0..10 {
1813 producer.push(i).unwrap();
1814 }
1815
1816 let mut drain = consumer.drain();
1817
1818 assert_eq!(drain.next(), Some(0));
1819 assert_eq!(drain.next(), Some(1));
1820 assert_eq!(drain.next(), Some(2));
1821
1822 drop(drain); assert_eq!(consumer.len(), 7);
1826 }
1827
1828 #[test]
1829 fn test_combined_operations() {
1830 let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
1833
1834 let data = [1, 2, 3, 4, 5];
1836 producer.push_slice(&data);
1837
1838 assert_eq!(producer.len(), 5);
1839 assert_eq!(consumer.len(), 5);
1840 assert_eq!(consumer.capacity(), 16);
1841
1842 assert_eq!(consumer.peek(), Some(&1));
1844
1845 let mut dest = [0u32; 3];
1847 consumer.pop_slice(&mut dest);
1848 assert_eq!(dest, [1, 2, 3]);
1849
1850 assert_eq!(consumer.len(), 2);
1851 assert_eq!(producer.free_slots(), 14);
1852
1853 consumer.clear();
1855 assert!(consumer.is_empty());
1856 assert!(!producer.is_full());
1857 }
1858}