1use std::cell::UnsafeCell;
18use std::mem::MaybeUninit;
19use std::sync::atomic::{AtomicUsize, Ordering};
20
21#[repr(C, align(64))]
/// Wraps a value and aligns it to 64 bytes so it occupies its own cache
/// line, preventing false sharing between adjacent hot atomics (e.g. the
/// queue's `head` and `tail` counters).
///
/// `Send`/`Sync` follow automatically from `T` via the auto traits, so no
/// manual `unsafe impl`s are needed (the previous hand-written ones had
/// exactly the auto-derived bounds and were redundant unsafe code).
#[repr(C, align(64))]
pub struct CachePadded<T> {
    value: T,
}

impl<T> CachePadded<T> {
    /// Wraps `value` in a cache-line-aligned container.
    #[must_use]
    pub const fn new(value: T) -> Self {
        Self { value }
    }

    /// Returns a shared reference to the inner value.
    #[must_use]
    pub const fn get(&self) -> &T {
        &self.value
    }

    /// Returns an exclusive reference to the inner value.
    #[must_use]
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.value
    }

    /// Consumes the wrapper, returning the inner value.
    #[must_use]
    pub fn into_inner(self) -> T {
        self.value
    }
}

impl<T> std::ops::Deref for CachePadded<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.value
    }
}

impl<T> std::ops::DerefMut for CachePadded<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.value
    }
}

impl<T: Default> Default for CachePadded<T> {
    fn default() -> Self {
        Self::new(T::default())
    }
}

impl<T: Clone> Clone for CachePadded<T> {
    fn clone(&self) -> Self {
        Self::new(self.value.clone())
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for CachePadded<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CachePadded")
            .field("value", &self.value)
            .finish()
    }
}
110
/// Bounded, lock-free single-producer single-consumer (SPSC) ring buffer.
///
/// NOTE(review): all correctness here relies on the SPSC contract — at most
/// one thread calls the push-side methods and at most one thread calls the
/// pop-side methods at any given time.
pub struct SpscQueue<T> {
    /// Fixed ring of slots. Whether a slot is initialized is tracked
    /// implicitly by the `head`/`tail` indices, never by the slot itself.
    buffer: Box<[UnsafeCell<MaybeUninit<T>>]>,
    /// Index of the next slot to pop; written only by the consumer.
    /// Padded so it never shares a cache line with `tail`.
    head: CachePadded<AtomicUsize>,
    /// Index of the next slot to push into; written only by the producer.
    tail: CachePadded<AtomicUsize>,
    /// `capacity - 1`, where capacity is a power of two, so wrap-around is
    /// a cheap `index & mask`.
    capacity_mask: usize,
}

#[allow(unsafe_code)]
// SAFETY: sending the queue to another thread moves ownership of every
// buffered `T`, so the queue is `Send` exactly when `T` is `Send`.
unsafe impl<T: Send> Send for SpscQueue<T> {}

#[allow(unsafe_code)]
// SAFETY: shared (&self) access is sound under the SPSC contract — one
// producer thread and one consumer thread, each touching disjoint slots.
// Items cross threads by value, hence `T: Send` (not `T: Sync`) is the
// right bound.
unsafe impl<T: Send> Sync for SpscQueue<T> {}
154
impl<T> SpscQueue<T> {
    /// Creates a queue whose backing buffer has `capacity.next_power_of_two()`
    /// slots. One slot is always kept empty to distinguish "full" from
    /// "empty", so the usable capacity is one less than [`Self::capacity`].
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be > 0");

        // A power-of-two size makes index wrap-around a single AND (see
        // `next_index`).
        let capacity = capacity.next_power_of_two();

        let buffer: Vec<UnsafeCell<MaybeUninit<T>>> = (0..capacity)
            .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
            .collect();

        Self {
            buffer: buffer.into_boxed_slice(),
            head: CachePadded::new(AtomicUsize::new(0)),
            tail: CachePadded::new(AtomicUsize::new(0)),
            capacity_mask: capacity - 1,
        }
    }

    /// Number of allocated slots (a power of two). The queue holds at most
    /// `capacity() - 1` items at once.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity_mask + 1
    }

    /// Returns `true` when no items are stored (head and tail coincide).
    /// Under concurrent use this is a point-in-time snapshot.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        head == tail
    }

    /// Returns `true` when advancing the tail would collide with the head,
    /// i.e. `capacity() - 1` items are stored.
    #[must_use]
    pub fn is_full(&self) -> bool {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        self.next_index(tail) == head
    }

    /// Number of items currently stored (snapshot under concurrency).
    #[must_use]
    pub fn len(&self) -> usize {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        // Both indices always stay in [0, capacity), so masking the wrapped
        // difference yields the in-flight count.
        tail.wrapping_sub(head) & self.capacity_mask
    }

    /// Appends `item` at the tail. Producer side only.
    ///
    /// # Errors
    ///
    /// Returns `Err(item)` (giving the item back) when the queue is full.
    pub fn push(&self, item: T) -> Result<(), T> {
        // Relaxed is sufficient: only this (single) producer writes `tail`.
        let tail = self.tail.load(Ordering::Relaxed);
        let next_tail = self.next_index(tail);

        // Acquire pairs with the consumer's Release store of `head`, so the
        // slot we are about to reuse has really been vacated.
        if next_tail == self.head.load(Ordering::Acquire) {
            return Err(item);
        }

        #[allow(unsafe_code)]
        // SAFETY: the full-check above guarantees slot `tail` is vacant, and
        // under the SPSC contract only this producer writes producer-side
        // slots, so there is no aliasing write.
        unsafe {
            (*self.buffer[tail].get()).write(item);
        }

        // Release publishes the slot write before the new tail is visible to
        // the consumer's Acquire load.
        self.tail.store(next_tail, Ordering::Release);

        Ok(())
    }

    /// Removes and returns the item at the head, or `None` when empty.
    /// Consumer side only.
    pub fn pop(&self) -> Option<T> {
        // Relaxed is sufficient: only this (single) consumer writes `head`.
        let head = self.head.load(Ordering::Relaxed);

        // Acquire pairs with the producer's Release store of `tail`.
        if head == self.tail.load(Ordering::Acquire) {
            return None;
        }

        #[allow(unsafe_code)]
        // SAFETY: head != tail, so slot `head` was initialized by a prior
        // `push` whose Release store on `tail` this Acquire load observed.
        let item = unsafe { (*self.buffer[head].get()).assume_init_read() };

        // Release lets the producer reuse the slot only after the read above.
        self.head.store(self.next_index(head), Ordering::Release);

        Some(item)
    }

    /// Pushes items from the iterator until the queue fills or the iterator
    /// is exhausted; returns how many were pushed. Note: the first rejected
    /// item (and any remaining iterator items) are dropped. Producer side
    /// only.
    pub fn push_batch(&self, items: impl IntoIterator<Item = T>) -> usize {
        let mut count = 0;
        for item in items {
            if self.push(item).is_err() {
                break;
            }
            count += 1;
        }
        count
    }

    /// Pops up to `max_count` items into a new `Vec`, in FIFO order.
    /// Consumer side only.
    pub fn pop_batch(&self, max_count: usize) -> Vec<T> {
        let mut items = Vec::with_capacity(max_count.min(self.len()));
        for _ in 0..max_count {
            if let Some(item) = self.pop() {
                items.push(item);
            } else {
                break;
            }
        }
        items
    }

    /// Pops up to `buffer.len()` items into the prefix of `buffer` and
    /// returns how many slots were written. Only the first `count` slots of
    /// `buffer` are initialized afterwards; the rest are untouched. `head`
    /// is published once with a single Release store after the whole batch.
    /// Consumer side only.
    #[inline]
    pub fn pop_batch_into(&self, buffer: &mut [MaybeUninit<T>]) -> usize {
        if buffer.is_empty() {
            return 0;
        }

        let mut current_head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);

        // Indices are pre-masked (always < capacity), so the else branch
        // handles the tail having wrapped past the end of the ring.
        let available = if tail >= current_head {
            tail - current_head
        } else {
            (self.capacity_mask + 1) - current_head + tail
        };

        let count = available.min(buffer.len());

        if count == 0 {
            return 0;
        }

        for slot in buffer.iter_mut().take(count) {
            #[allow(unsafe_code)]
            // SAFETY: `count <= available`, so every slot visited lies in the
            // initialized region [head, tail) published by the producer's
            // Release store observed above.
            unsafe {
                let src = (*self.buffer[current_head].get()).assume_init_read();
                slot.write(src);
            }

            current_head = self.next_index(current_head);
        }

        // Single Release store frees all consumed slots for the producer.
        self.head.store(current_head, Ordering::Release);

        count
    }

    /// Pops up to `max_count` items, passing each to `f`; stops early when
    /// `f` returns `false`. Returns the number of items consumed. Note that
    /// the item for which `f` returned `false` has already been removed and
    /// is included in the count. Consumer side only.
    #[inline]
    pub fn pop_each<F>(&self, max_count: usize, mut f: F) -> usize
    where
        F: FnMut(T) -> bool,
    {
        if max_count == 0 {
            return 0;
        }

        let mut current_head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);

        // Same wrap-aware distance computation as `pop_batch_into`.
        let available = if tail >= current_head {
            tail - current_head
        } else {
            (self.capacity_mask + 1) - current_head + tail
        };

        let to_pop = available.min(max_count);

        if to_pop == 0 {
            return 0;
        }

        let mut popped = 0;
        for _ in 0..to_pop {
            #[allow(unsafe_code)]
            // SAFETY: the loop visits at most `available` slots, all inside
            // the initialized region [head, tail) observed above.
            let item = unsafe { (*self.buffer[current_head].get()).assume_init_read() };

            popped += 1;

            current_head = self.next_index(current_head);

            if !f(item) {
                break;
            }
        }

        // Publish the new head once for the whole batch.
        if popped > 0 {
            self.head.store(current_head, Ordering::Release);
        }

        popped
    }

    /// Returns a reference to the front item without removing it, or `None`
    /// when empty. Consumer side only — the reference stays valid because
    /// the producer cannot reuse the slot until this consumer advances
    /// `head`, which requires `&self` again.
    pub fn peek(&self) -> Option<&T> {
        let head = self.head.load(Ordering::Relaxed);

        if head == self.tail.load(Ordering::Acquire) {
            return None;
        }

        #[allow(unsafe_code)]
        // SAFETY: head != tail, so slot `head` is initialized (published by
        // the producer's Release store on `tail`).
        unsafe {
            Some((*self.buffer[head].get()).assume_init_ref())
        }
    }

    /// Advances an index by one slot, wrapping via the power-of-two mask.
    #[inline]
    const fn next_index(&self, index: usize) -> usize {
        (index + 1) & self.capacity_mask
    }
}
509
510impl<T> Drop for SpscQueue<T> {
511 fn drop(&mut self) {
512 while self.pop().is_some() {}
514 }
515}
516
517impl<T: std::fmt::Debug> std::fmt::Debug for SpscQueue<T> {
518 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
519 f.debug_struct("SpscQueue")
520 .field("capacity", &self.capacity())
521 .field("len", &self.len())
522 .finish()
523 }
524}
525
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_cache_padded_size() {
        // align(64) must place each padded value on its own cache line.
        // (assert_eq! gives a diagnostic with both values on failure.)
        assert_eq!(std::mem::align_of::<CachePadded<AtomicUsize>>(), 64);
    }

    #[test]
    fn test_cache_padded_operations() {
        let padded = CachePadded::new(42u32);
        assert_eq!(*padded, 42);
        assert_eq!(*padded.get(), 42);

        let mut padded = CachePadded::new(42u32);
        *padded.get_mut() = 100;
        assert_eq!(*padded, 100);

        let inner = padded.into_inner();
        assert_eq!(inner, 100);
    }

    #[test]
    fn test_cache_padded_default() {
        let padded: CachePadded<u32> = CachePadded::default();
        assert_eq!(*padded, 0);
    }

    #[test]
    fn test_cache_padded_clone() {
        let padded = CachePadded::new(42u32);
        let cloned = padded.clone();
        assert_eq!(*cloned, 42);
    }

    #[test]
    fn test_new_queue() {
        let queue: SpscQueue<i32> = SpscQueue::new(100);
        // Capacity is rounded up to the next power of two.
        assert_eq!(queue.capacity(), 128);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_push_pop() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        assert!(queue.push(1).is_ok());
        assert!(queue.push(2).is_ok());
        assert!(queue.push(3).is_ok());
        // One slot stays empty, so a capacity-4 queue holds only 3 items.
        assert!(queue.is_full());
        assert!(queue.push(4).is_err());

        assert_eq!(queue.pop(), Some(1));
        assert_eq!(queue.pop(), Some(2));
        assert_eq!(queue.pop(), Some(3));
        assert_eq!(queue.pop(), None);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_fifo_order() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        for i in 0..10 {
            assert!(queue.push(i).is_ok());
        }

        for i in 0..10 {
            assert_eq!(queue.pop(), Some(i));
        }
    }

    #[test]
    fn test_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        // Repeated fill/drain cycles force the indices to wrap the ring.
        for iteration in 0..5 {
            for i in 0..3 {
                assert!(queue.push(iteration * 10 + i).is_ok());
            }
            for i in 0..3 {
                assert_eq!(queue.pop(), Some(iteration * 10 + i));
            }
        }
    }

    #[test]
    fn test_peek() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        assert!(queue.peek().is_none());

        queue.push(42).unwrap();
        // Peek does not consume the item.
        assert_eq!(queue.peek(), Some(&42));
        assert_eq!(queue.peek(), Some(&42));
        assert_eq!(queue.pop(), Some(42));
        assert!(queue.peek().is_none());
    }

    #[test]
    fn test_push_batch() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);

        let pushed = queue.push_batch(vec![1, 2, 3, 4, 5]);
        assert_eq!(pushed, 5);
        assert_eq!(queue.len(), 5);

        // Only 2 slots remain usable (one slot is always kept empty).
        let pushed = queue.push_batch(vec![6, 7, 8, 9, 10]);
        assert_eq!(pushed, 2);
    }

    #[test]
    fn test_pop_batch() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);

        queue.push_batch(vec![1, 2, 3, 4, 5]);

        let items = queue.pop_batch(3);
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(queue.len(), 2);

        // Asking for more than available returns what is there.
        let items = queue.pop_batch(10);
        assert_eq!(items, vec![4, 5]);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_concurrent_producer_consumer() {
        const ITEMS: i32 = 10_000;
        let queue = Arc::new(SpscQueue::<i32>::new(1024));
        let queue_producer = Arc::clone(&queue);
        let queue_consumer = Arc::clone(&queue);

        let producer = thread::spawn(move || {
            for i in 0..ITEMS {
                while queue_producer.push(i).is_err() {
                    thread::yield_now();
                }
            }
        });

        let consumer = thread::spawn(move || {
            let mut received = Vec::with_capacity(ITEMS as usize);
            while received.len() < ITEMS as usize {
                if let Some(item) = queue_consumer.pop() {
                    received.push(item);
                } else {
                    thread::yield_now();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();

        assert_eq!(received.len(), ITEMS as usize);
        // FIFO order must survive the thread handoff.
        for (i, &item) in received.iter().enumerate() {
            assert_eq!(
                item,
                i32::try_from(i).unwrap(),
                "Item out of order at index {i}"
            );
        }
    }

    // Counts destructor invocations so drop behavior can be asserted.
    #[derive(Debug)]
    struct DropCounter(Arc<AtomicUsize>);

    impl Drop for DropCounter {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_drop() {
        // Arc and AtomicUsize are already in scope via the module imports
        // above; no local `use` needed.
        let drop_count = Arc::new(AtomicUsize::new(0));

        {
            let queue: SpscQueue<DropCounter> = SpscQueue::new(8);
            for _ in 0..5 {
                queue.push(DropCounter(Arc::clone(&drop_count))).unwrap();
            }
            queue.pop();
            queue.pop();
        }

        // 2 dropped by pop(), 3 dropped when the queue itself was dropped.
        assert_eq!(drop_count.load(Ordering::SeqCst), 5);
    }

    #[test]
    fn test_debug() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);
        queue.push(1).unwrap();
        queue.push(2).unwrap();

        let debug_str = format!("{queue:?}");
        assert!(debug_str.contains("SpscQueue"));
        assert!(debug_str.contains("capacity"));
        assert!(debug_str.contains("len"));
    }

    #[test]
    #[should_panic(expected = "capacity must be > 0")]
    fn test_zero_capacity_panics() {
        let _: SpscQueue<i32> = SpscQueue::new(0);
    }

    #[test]
    fn test_pop_batch_into() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        let mut buffer: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 3);

        #[allow(unsafe_code)]
        // SAFETY: pop_batch_into initialized the first `count` (= 3) slots.
        unsafe {
            assert_eq!(buffer[0].assume_init(), 1);
            assert_eq!(buffer[1].assume_init(), 2);
            assert_eq!(buffer[2].assume_init(), 3);
        }

        assert!(queue.is_empty());
    }

    #[test]
    fn test_pop_batch_into_partial() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        for i in 0..5 {
            queue.push(i).unwrap();
        }

        // Buffer smaller than the queue contents: only 3 items move.
        let mut buffer: [MaybeUninit<i32>; 3] = [MaybeUninit::uninit(); 3];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 3);
        assert_eq!(queue.len(), 2);

        #[allow(unsafe_code)]
        // SAFETY: all 3 buffer slots were initialized (count == 3).
        unsafe {
            assert_eq!(buffer[0].assume_init(), 0);
            assert_eq!(buffer[1].assume_init(), 1);
            assert_eq!(buffer[2].assume_init(), 2);
        }
    }

    #[test]
    fn test_pop_batch_into_empty() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        let mut buffer: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 0);
    }

    #[test]
    fn test_pop_batch_into_empty_buffer() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);
        queue.push(1).unwrap();

        let mut buffer: [MaybeUninit<i32>; 0] = [];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 0);
        // The queue is untouched by a zero-length destination.
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn test_pop_each() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        let mut sum = 0;
        let count = queue.pop_each(10, |item| {
            sum += item;
            true
        });

        assert_eq!(count, 3);
        assert_eq!(sum, 6);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_pop_each_early_stop() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();
        queue.push(4).unwrap();
        queue.push(5).unwrap();

        let mut items = Vec::new();
        let count = queue.pop_each(10, |item| {
            items.push(item);
            // Stop after seeing 3: that item is still consumed and counted.
            item < 3
        });

        assert_eq!(count, 3);
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn test_pop_each_max_count() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        for i in 0..10 {
            queue.push(i).unwrap();
        }

        let mut count_processed = 0;
        let count = queue.pop_each(5, |_| {
            count_processed += 1;
            true
        });

        assert_eq!(count, 5);
        assert_eq!(count_processed, 5);
        assert_eq!(queue.len(), 5);
    }

    #[test]
    fn test_pop_each_empty() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        let mut called = false;
        let count = queue.pop_each(10, |_| {
            called = true;
            true
        });

        assert_eq!(count, 0);
        assert!(!called);
    }

    #[test]
    fn test_pop_each_zero_max() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);
        queue.push(1).unwrap();

        let count = queue.pop_each(0, |_| true);

        assert_eq!(count, 0);
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn test_pop_batch_into_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        // Cycle pushes/pops so the indices wrap the 4-slot ring.
        for _ in 0..3 {
            for i in 0..3 {
                queue.push(i).unwrap();
            }
            for _ in 0..3 {
                queue.pop();
            }
        }

        queue.push(10).unwrap();
        queue.push(11).unwrap();

        let mut buffer: [MaybeUninit<i32>; 4] = [MaybeUninit::uninit(); 4];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 2);

        #[allow(unsafe_code)]
        // SAFETY: the first `count` (= 2) slots were initialized.
        unsafe {
            assert_eq!(buffer[0].assume_init(), 10);
            assert_eq!(buffer[1].assume_init(), 11);
        }
    }

    #[test]
    fn test_pop_each_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        for _ in 0..3 {
            for i in 0..3 {
                queue.push(i).unwrap();
            }
            let _ = queue.pop_batch(3);
        }

        queue.push(100).unwrap();
        queue.push(200).unwrap();

        let mut items = Vec::new();
        queue.pop_each(10, |item| {
            items.push(item);
            true
        });

        assert_eq!(items, vec![100, 200]);
    }
}