1#![allow(
6 clippy::cast_possible_truncation,
7 reason = "LongNumber should be synonymous to usize"
8)]
9use orengine_utils::hints::unlikely;
10use orengine_utils::light_arc::LightArc;
11use crate::batch_receiver::{BatchReceiver, LockFreeBatchReceiver, LockFreePushBatchErr};
12use crate::number_types::{
13 CachePaddedLongAtomic, LongAtomic, LongNumber, NotCachePaddedLongAtomic,
14};
15use crate::suspicious_orders::SUSPICIOUS_RELAXED_ACQUIRE;
16use crate::{LockFreePopErr, LockFreePushErr, LockFreePushManyErr};
17use std::cell::{Cell, UnsafeCell};
18use std::marker::PhantomData;
19use std::mem::{needs_drop, MaybeUninit};
20use std::ops::Deref;
21use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
22use std::{ptr, slice};
23
/// A bounded single-producer / multi-consumer ring buffer.
///
/// `head` and `tail` are monotonically increasing counters; the occupied
/// region is `[head, tail)` and a counter maps to a slot via `% CAPACITY`.
#[repr(C)]
pub struct SPMCBoundedQueue<
    T: Send,
    const CAPACITY: usize,
    AtomicWrapper: Deref<Target = LongAtomic> + Default = NotCachePaddedLongAtomic,
> {
    // Advanced only by the producer; read with `Acquire` by consumers.
    tail: AtomicWrapper,
    // Producer-local cache of `head` (plain `Cell`: only the single
    // producer thread touches it) to avoid re-loading the shared atomic.
    cached_head: Cell<LongNumber>,
    // Advanced by consumers (and the producer when popping) via CAS.
    head: AtomicWrapper,
    // Slot storage; which slots are initialized is governed by head/tail.
    buffer: UnsafeCell<[MaybeUninit<T>; CAPACITY]>,
}
68
69impl<T: Send, const CAPACITY: usize, AtomicWrapper: Deref<Target = LongAtomic> + Default>
70 SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
71{
72 const NUM_VALUES_TAKEN: LongNumber = CAPACITY as LongNumber / 2;
77
78 pub fn new() -> Self {
80 debug_assert!(size_of::<MaybeUninit<T>>() == size_of::<T>()); Self {
83 buffer: UnsafeCell::new([const { MaybeUninit::uninit() }; CAPACITY]),
84 tail: AtomicWrapper::default(),
85 cached_head: Cell::new(0),
86 head: AtomicWrapper::default(),
87 }
88 }
89
90 #[inline]
92 pub fn capacity(&self) -> usize {
93 CAPACITY
94 }
95
96 #[allow(clippy::mut_from_ref, reason = "It improves readability")]
98 fn buffer_mut(&self) -> &mut [MaybeUninit<T>] {
99 unsafe { &mut *self.buffer.get() }
100 }
101
102 fn buffer_thin_ptr(&self) -> *const MaybeUninit<T> {
104 self.buffer.get() as *const _
105 }
106
107 fn buffer_mut_thin_ptr(&self) -> *mut MaybeUninit<T> {
109 self.buffer.get().cast()
110 }
111
112 #[inline]
114 fn len(head: LongNumber, tail: LongNumber) -> usize {
115 tail.wrapping_sub(head) as usize
116 }
117}
118
119impl<T: Send, const CAPACITY: usize, AtomicWrapper: Deref<Target = LongAtomic> + Default>
121 SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
122{
    /// Copies `slice` into the ring at logical position `start_tail`,
    /// handling wrap-around at the physical end of the buffer, and returns
    /// the advanced tail counter. Touches no atomics; the caller publishes
    /// the returned tail.
    fn copy_slice(buffer_ptr: *mut T, start_tail: LongNumber, slice: &[T]) -> LongNumber {
        let tail_idx = start_tail as usize % CAPACITY;

        if tail_idx + slice.len() <= CAPACITY {
            // Fits in one contiguous run.
            unsafe {
                ptr::copy_nonoverlapping(slice.as_ptr(), buffer_ptr.add(tail_idx), slice.len());
            };
        } else {
            // Split copy: fill up to the physical end, then wrap to the start.
            let right = CAPACITY - tail_idx;

            unsafe {
                ptr::copy_nonoverlapping(slice.as_ptr(), buffer_ptr.add(tail_idx), right);
                ptr::copy_nonoverlapping(
                    slice.as_ptr().add(right),
                    buffer_ptr,
                    slice.len() - right,
                );
            }
        }

        start_tail.wrapping_add(slice.len() as LongNumber)
    }
146
147 #[inline]
153 pub unsafe fn producer_len(&self) -> usize {
154 let tail = unsafe { self.tail.unsync_load() }; self.cached_head
156 .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));
157
158 Self::len(self.cached_head.get(), tail)
159 }
160
    /// Pops one value from the producer side, competing with consumers via a
    /// CAS on `head`. Returns `None` when the queue is observed empty.
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread.
    #[inline]
    pub unsafe fn producer_pop(&self) -> Option<T> {
        // SAFETY (per the method contract): only the producer writes `tail`.
        let tail = unsafe { self.tail.unsync_load() };
        let mut head = self.cached_head.get();

        loop {
            if unlikely(head == tail) {
                // Looks empty through the cached head; refresh it from the
                // shared atomic before giving up.
                self.cached_head
                    .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));

                if head == self.cached_head.get() {
                    return None;
                }

                head = self.cached_head.get();
            }

            let new_head = head.wrapping_add(1);

            match self
                .head
                .compare_exchange_weak(head, new_head, Release, Relaxed)
            {
                Ok(_) => {
                    self.cached_head.set(new_head);

                    // SAFETY: the successful CAS gave this thread exclusive
                    // ownership of slot `head`, which lies in the initialized
                    // region `[head, tail)`.
                    return Some(unsafe {
                        self.buffer_thin_ptr()
                            .add(head as usize % CAPACITY)
                            .read()
                            .assume_init()
                    });
                }
                Err(new_head) => {
                    // Lost the race to a consumer; retry with the head value
                    // observed by the failed CAS.
                    head = new_head;
                }
            }
        }
    }
209
    /// Pops up to `dst.len()` values into `dst`, competing with consumers via
    /// a CAS on `head`. Returns how many values were written.
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread.
    #[inline]
    pub unsafe fn producer_pop_many(&self, dst: &mut [MaybeUninit<T>]) -> usize {
        // SAFETY (per the method contract): only the producer writes `tail`.
        let tail = unsafe { self.tail.unsync_load() };
        let mut head = self.cached_head.get();

        loop {
            let mut available = Self::len(head, tail);
            let mut n = dst.len().min(available);

            if n == 0 {
                // Nothing visible through the cached head; refresh it.
                self.cached_head
                    .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));

                if unlikely(head == self.cached_head.get()) {
                    return 0;
                }

                head = self.cached_head.get();
                available = Self::len(head, tail);
                n = dst.len().min(available);
            }

            debug_assert!(n <= CAPACITY, "Bug occurred, please report it.");

            let new_head = head.wrapping_add(n as LongNumber);

            match self
                .head
                .compare_exchange_weak(head, new_head, Release, Relaxed)
            {
                Ok(_) => {
                    // The CAS claimed `n` slots starting at `head`; copy them
                    // out, handling wrap-around at the end of the buffer.
                    let dst_ptr = dst.as_mut_ptr();
                    let head_idx = head as usize % CAPACITY;
                    let right = CAPACITY - head_idx;

                    if n <= right {
                        // SAFETY: the claimed slots are initialized and
                        // contiguous.
                        unsafe {
                            ptr::copy_nonoverlapping(
                                self.buffer_thin_ptr().add(head_idx),
                                dst_ptr,
                                n,
                            );
                        }
                    } else {
                        // SAFETY: as above, split at the physical end of the
                        // buffer.
                        unsafe {
                            ptr::copy_nonoverlapping(
                                self.buffer_thin_ptr().add(head_idx),
                                dst_ptr,
                                right,
                            );
                            ptr::copy_nonoverlapping(
                                self.buffer_thin_ptr(),
                                dst_ptr.add(right),
                                n - right,
                            );
                        }
                    }

                    self.cached_head.set(new_head);

                    return n;
                }
                Err(new_head) => {
                    // Lost the race; retry with the observed head.
                    head = new_head;
                }
            }
        }
    }
289
    /// Writes `value` into slot `tail % CAPACITY` and publishes the new tail
    /// with a `Release` store (paired with consumers' `Acquire` loads).
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread, `tail` must be
    /// the current tail value, and the queue must not be full.
    #[inline(always)]
    pub unsafe fn push_unchecked(&self, value: T, tail: LongNumber) {
        // SAFETY: per the contract above, the target slot is free and owned
        // by the single producer.
        unsafe {
            self.buffer_mut_thin_ptr()
                .add(tail as usize % CAPACITY)
                .write(MaybeUninit::new(value));
        }

        self.tail.store(tail.wrapping_add(1), Release);
    }
305
    /// Cold path of `producer_push`: the ring was observed full, so try to
    /// move `NUM_VALUES_TAKEN` (half the capacity) of the oldest buffered
    /// values plus `value` into the batch receiver. If a concurrent consumer
    /// frees a slot before the CAS succeeds, `value` is pushed into the ring
    /// instead.
    #[inline(never)]
    #[cold]
    fn handle_overflow_one<BR: BatchReceiver<T>>(
        &self,
        tail: LongNumber,
        mut head: LongNumber,
        br: &BR,
        value: T,
    ) {
        debug_assert!(tail == head.wrapping_add(CAPACITY as LongNumber) && tail > head);

        loop {
            let head_idx = head as usize % CAPACITY;
            let buffer = self.buffer_mut();

            // The NUM_VALUES_TAKEN oldest values, as up to two slices:
            // `right` runs from `head_idx` toward the physical end, `left`
            // is the wrapped-around prefix (empty when no wrap occurs).
            let (right, left): (&[MaybeUninit<T>], &[MaybeUninit<T>]) =
                if head_idx < Self::NUM_VALUES_TAKEN as usize {
                    (
                        &buffer[head_idx..head_idx + Self::NUM_VALUES_TAKEN as usize],
                        &[],
                    )
                } else {
                    let left_part_len = head_idx - Self::NUM_VALUES_TAKEN as usize;

                    (&buffer[head_idx..], &buffer[..left_part_len])
                };

            let res = self.head.compare_exchange_weak(
                head,
                head.wrapping_add(Self::NUM_VALUES_TAKEN),
                Release,
                Relaxed,
            );

            match res {
                // SAFETY: the successful CAS claimed those slots, and
                // `MaybeUninit<T>` has the same layout as `T`, so the slices
                // may be reinterpreted as initialized values.
                Ok(_) => unsafe {
                    br.push_many_and_one(
                        &*(ptr::from_ref::<[MaybeUninit<T>]>(left) as *const [T]),
                        &*(ptr::from_ref::<[MaybeUninit<T>]>(right) as *const [T]),
                        value,
                    );

                    return;
                },
                Err(new_head) => {
                    head = new_head;

                    if Self::len(head, tail) < CAPACITY {
                        // A consumer freed at least one slot meanwhile.
                        // SAFETY: the queue is no longer full.
                        unsafe { self.push_unchecked(value, tail) };

                        return;
                    }
                }
            }
        }
    }
371
    /// Cold path of `producer_push_many`: `slice` did not fit, so try to move
    /// `NUM_VALUES_TAKEN` (half the capacity) buffered values plus `slice`
    /// into the batch receiver. If consumers free enough space before the CAS
    /// succeeds, the slice is buffered locally instead.
    #[inline(never)]
    #[cold]
    fn handle_overflow_many<BR: BatchReceiver<T>>(
        &self,
        tail: LongNumber,
        mut head: LongNumber,
        br: &BR,
        slice: &[T],
    ) {
        // NOTE(review): this asserts the ring is exactly full, yet the caller
        // invokes this whenever `len + slice.len() > CAPACITY` — confirm the
        // assertion (and the half-buffer slicing below) holds in that case.
        debug_assert!(tail == head.wrapping_add(CAPACITY as LongNumber) && tail > head);

        loop {
            let head_idx = head as usize % CAPACITY;
            let buffer = self.buffer_mut();

            // The NUM_VALUES_TAKEN oldest values as up to two wrap-around
            // slices (`right` = run toward the physical end, `left` = the
            // wrapped prefix).
            let (right, left): (&[MaybeUninit<T>], &[MaybeUninit<T>]) =
                if head_idx < Self::NUM_VALUES_TAKEN as usize {
                    (
                        &buffer[head_idx..head_idx + Self::NUM_VALUES_TAKEN as usize],
                        &[],
                    )
                } else {
                    let left_part_len = head_idx - Self::NUM_VALUES_TAKEN as usize;

                    (&buffer[head_idx..], &buffer[..left_part_len])
                };

            let res = self.head.compare_exchange_weak(
                head,
                head.wrapping_add(Self::NUM_VALUES_TAKEN),
                Release,
                Relaxed,
            );

            match res {
                Ok(_) => {
                    // SAFETY: the CAS claimed those slots, and
                    // `MaybeUninit<T>` has the same layout as `T`.
                    unsafe {
                        br.push_many_and_slice(
                            &*(ptr::from_ref::<[MaybeUninit<T>]>(left) as *const [T]),
                            &*(ptr::from_ref::<[MaybeUninit<T>]>(right) as *const [T]),
                            slice,
                        );
                    }

                    return;
                }
                Err(new_head) => {
                    head = new_head;

                    let len = Self::len(head, tail);

                    if len + slice.len() <= CAPACITY {
                        // Consumers freed enough space for the whole slice;
                        // buffer it locally and publish the new tail.
                        let new_tail =
                            Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, slice);
                        self.tail.store(new_tail, Release);

                        return;
                    }
                }
            }
        }
    }
443
    /// Lock-free cold path of `producer_lock_free_push`: offers half of the
    /// buffered values plus `value` to the batch receiver, which commits the
    /// transfer via the supplied (non-weak) CAS closure on `head`.
    ///
    /// Returns `Err(value)` when the receiver reports that the caller should
    /// wait (`LockFreePushBatchErr::ShouldWait`).
    #[inline(never)]
    #[cold]
    fn handle_lock_free_overflow_one<BR: LockFreeBatchReceiver<T>>(
        &self,
        tail: LongNumber,
        head: LongNumber,
        br: &BR,
        value: T,
    ) -> Result<(), T> {
        debug_assert!(tail == head.wrapping_add(CAPACITY as LongNumber) && tail > head);

        let head_idx = head as usize % CAPACITY;
        let buffer = self.buffer_mut();
        // The NUM_VALUES_TAKEN oldest values as up to two wrap-around slices.
        let (right, left): (&[MaybeUninit<T>], &[MaybeUninit<T>]) =
            if head_idx < Self::NUM_VALUES_TAKEN as usize {
                (
                    &buffer[head_idx..head_idx + Self::NUM_VALUES_TAKEN as usize],
                    &[],
                )
            } else {
                let left_part_len = head_idx - Self::NUM_VALUES_TAKEN as usize;

                (&buffer[head_idx..], &buffer[..left_part_len])
            };

        // SAFETY: `MaybeUninit<T>` has the same layout as `T`, and the slots
        // are only released to the receiver if the CAS closure succeeds.
        let res = unsafe {
            br.push_many_and_one_and_commit_if(
                &*(ptr::from_ref::<[MaybeUninit<T>]>(left) as *const [T]),
                &*(ptr::from_ref::<[MaybeUninit<T>]>(right) as *const [T]),
                value,
                || {
                    self.head.compare_exchange(
                        head,
                        head.wrapping_add(Self::NUM_VALUES_TAKEN),
                        Release,
                        Relaxed,
                    )
                },
            )
        };

        match res {
            Ok(_) => Ok(()),
            Err(LockFreePushBatchErr::ShouldWait(value)) => Err(value),
            Err(LockFreePushBatchErr::CondictionIsFalse((value, head))) => {
                // The CAS failed because consumers advanced `head`, so there
                // is room for at least one value in the ring now.
                debug_assert!(Self::len(head, tail) < CAPACITY);

                // SAFETY: the queue is no longer full.
                unsafe { self.push_unchecked(value, tail) };

                Ok(())
            }
        }
    }
507
    /// Lock-free cold path of `producer_lock_free_push_many`: offers half of
    /// the buffered values plus `slice` to the batch receiver, committing via
    /// a (non-weak) CAS on `head`.
    ///
    /// Returns `Err(())` when the receiver reports the caller should wait.
    #[inline(never)]
    #[cold]
    fn handle_lock_free_overflow_many<BR: LockFreeBatchReceiver<T>>(
        &self,
        tail: LongNumber,
        head: LongNumber,
        br: &BR,
        slice: &[T],
    ) -> Result<(), ()> {
        // NOTE(review): this asserts the ring is exactly full, yet the caller
        // invokes this whenever `len + slice.len() > CAPACITY` — confirm.
        debug_assert!(tail == head.wrapping_add(CAPACITY as LongNumber) && tail > head);

        let head_idx = head as usize % CAPACITY;
        let buffer = self.buffer_mut();

        // The NUM_VALUES_TAKEN oldest values as up to two wrap-around slices.
        let (right, left): (&[MaybeUninit<T>], &[MaybeUninit<T>]) =
            if head_idx < Self::NUM_VALUES_TAKEN as usize {
                (
                    &buffer[head_idx..head_idx + Self::NUM_VALUES_TAKEN as usize],
                    &[],
                )
            } else {
                let left_part_len = head_idx - Self::NUM_VALUES_TAKEN as usize;

                (&buffer[head_idx..], &buffer[..left_part_len])
            };

        // SAFETY: `MaybeUninit<T>` has the same layout as `T`, and the slots
        // are only released to the receiver if the CAS closure succeeds.
        let res = unsafe {
            br.lock_free_push_many_and_slice_and_commit_if(
                &*(ptr::from_ref::<[MaybeUninit<T>]>(left) as *const [T]),
                &*(ptr::from_ref::<[MaybeUninit<T>]>(right) as *const [T]),
                slice,
                || {
                    self.head.compare_exchange(
                        head,
                        head.wrapping_add(Self::NUM_VALUES_TAKEN),
                        Release,
                        Relaxed,
                    )
                },
            )
        };

        match res {
            Ok(_) => Ok(()),
            Err(LockFreePushBatchErr::ShouldWait(())) => Err(()),
            Err(LockFreePushBatchErr::CondictionIsFalse(((), head))) => {
                let len = Self::len(head, tail);

                if len + slice.len() <= CAPACITY {
                    // Consumers freed enough space; buffer the slice locally
                    // and publish the new tail.
                    let new_tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, slice);
                    self.tail.store(new_tail, Release);

                    return Ok(());
                }

                // NOTE(review): when the slice still does not fit, it is
                // neither buffered nor handed to the receiver, yet `Ok(())`
                // is returned — confirm the values are not silently dropped.
                Ok(())
            }
        }
    }
577
    /// Pushes `value`, offloading half of the buffer to `batch_receiver` on
    /// overflow (see `handle_overflow_one`).
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread.
    #[inline]
    pub unsafe fn producer_push<BR: BatchReceiver<T>>(&self, value: T, batch_receiver: &BR) {
        // SAFETY (per the method contract): only the producer writes `tail`.
        let tail = unsafe { self.tail.unsync_load() };
        let head = self.cached_head.get();

        if unlikely(Self::len(head, tail) == CAPACITY) {
            // Looks full through the cached head; refresh before overflowing.
            self.cached_head
                .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));

            if unlikely(head == self.cached_head.get()) {
                self.handle_overflow_one(tail, head, batch_receiver, value);

                return;
            }

        }

        // SAFETY: there is at least one free slot.
        unsafe { self.push_unchecked(value, tail) };
    }
603
    /// Lock-free variant of `producer_push`; returns `Err(value)` when the
    /// batch receiver asks the caller to wait.
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread.
    #[inline]
    pub unsafe fn producer_lock_free_push<BR: LockFreeBatchReceiver<T>>(
        &self,
        value: T,
        batch_receiver: &BR,
    ) -> Result<(), T> {
        // SAFETY (per the method contract): only the producer writes `tail`.
        let tail = unsafe { self.tail.unsync_load() };
        let head = self.cached_head.get();

        if unlikely(Self::len(self.cached_head.get(), tail) == CAPACITY) {
            // NOTE(review): sibling methods use SUSPICIOUS_RELAXED_ACQUIRE
            // for this refresh — confirm the stronger `Acquire` here is
            // deliberate.
            self.cached_head.set(self.head.load(Acquire));

            if unlikely(head == self.cached_head.get()) {
                self.handle_lock_free_overflow_one(tail, head, batch_receiver, value)?;

                return Ok(());
            }

        }

        // SAFETY: there is at least one free slot.
        unsafe { self.push_unchecked(value, tail) };

        Ok(())
    }
635
    /// Pushes `value` if the queue has room; returns `Err(value)` when full.
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread.
    #[inline]
    pub unsafe fn producer_maybe_push(&self, value: T) -> Result<(), T> {
        // SAFETY (per the method contract): only the producer writes `tail`.
        let tail = unsafe { self.tail.unsync_load() };
        let head = self.cached_head.get();

        if unlikely(Self::len(head, tail) >= CAPACITY) {
            // Looks full through the cached head; refresh before failing.
            self.cached_head
                .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));

            if unlikely(head == self.cached_head.get()) {
                return Err(value);
            }

        }

        debug_assert!(Self::len(self.cached_head.get(), tail) < CAPACITY);

        // SAFETY: there is at least one free slot.
        unsafe { self.push_unchecked(value, tail) };

        Ok(())
    }
663
    /// Copies `first` and then `last` into the ring and publishes the new
    /// tail with a `Release` store.
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread, and
    /// `first.len() + last.len()` must fit into the free space.
    #[inline]
    pub unsafe fn producer_push_many_unchecked(&self, first: &[T], last: &[T]) {
        if cfg!(debug_assertions) {
            let head = self.head.load(Acquire);
            // SAFETY: only the producer writes `tail`.
            let tail = unsafe { self.tail.unsync_load() };

            debug_assert!(Self::len(head, tail) + first.len() + last.len() <= CAPACITY);
        }

        // SAFETY: only the producer writes `tail`.
        let mut tail = unsafe { self.tail.unsync_load() };

        tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, first);
        tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, last);

        self.tail.store(tail, Release);
    }
688
    /// Pushes the whole `slice`, offloading half of the buffer to
    /// `batch_receiver` when it does not fit (see `handle_overflow_many`).
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread.
    #[inline]
    pub unsafe fn producer_push_many<BR: BatchReceiver<T>>(
        &self,
        slice: &[T],
        batch_receiver: &BR,
    ) {
        // SAFETY (per the method contract): only the producer writes `tail`.
        let mut tail = unsafe { self.tail.unsync_load() };

        if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
            // Looks too full through the cached head; refresh before
            // overflowing.
            self.cached_head
                .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));

            if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
                self.handle_overflow_many(tail, self.cached_head.get(), batch_receiver, slice);

                return;
            }

        }

        tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, slice);

        self.tail.store(tail, Release);
    }
719
    /// Lock-free variant of `producer_push_many`; returns `Err(())` when the
    /// batch receiver asks the caller to wait.
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread.
    #[inline]
    pub unsafe fn producer_lock_free_push_many<BR: LockFreeBatchReceiver<T>>(
        &self,
        slice: &[T],
        batch_receiver: &BR,
    ) -> Result<(), ()> {
        // SAFETY (per the method contract): only the producer writes `tail`.
        let mut tail = unsafe { self.tail.unsync_load() };

        if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
            // Looks too full through the cached head; refresh first.
            self.cached_head
                .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));

            if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
                self.handle_lock_free_overflow_many(
                    tail,
                    self.cached_head.get(),
                    batch_receiver,
                    slice,
                )?;

                return Ok(());
            }

        }

        tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, slice);

        self.tail.store(tail, Release);

        Ok(())
    }
758
    /// Copies the whole `slice` into the queue if it fits, otherwise returns
    /// `Err(())` without pushing anything.
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread.
    #[inline]
    pub unsafe fn producer_maybe_push_many(&self, slice: &[T]) -> Result<(), ()> {
        // SAFETY (per the method contract): only the producer writes `tail`.
        let mut tail = unsafe { self.tail.unsync_load() };

        if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
            // Looks too full through the cached head; refresh before failing.
            self.cached_head
                .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));

            if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
                return Err(());
            }

        }

        debug_assert!(Self::len(self.cached_head.get(), tail) + slice.len() <= CAPACITY);

        tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, slice);

        self.tail.store(tail, Release);

        Ok(())
    }
787
    /// Copies `right` and then `left` into the ring, then publishes the new
    /// tail only if `condition` returns `Ok`; on `Err` the copied data stays
    /// unpublished (tail is not advanced).
    ///
    /// This is the receiving half of stealing: `steal_into` passes a CAS on
    /// the source queue's head as `condition`.
    ///
    /// # Safety
    ///
    /// Must only be called from the single producer thread, and the copied
    /// data must fit into the free space.
    unsafe fn producer_copy_and_commit_if<FSuccess, FError>(
        &self,
        left: &[T],
        right: &[T],
        condition: impl FnOnce() -> Result<FSuccess, FError>,
    ) -> Result<FSuccess, FError> {
        debug_assert!(left.len() + right.len() + self.producer_len() <= CAPACITY);

        // Note: `right` is copied before `left`.
        let mut new_tail = Self::copy_slice(
            self.buffer_mut_thin_ptr().cast(),
            // SAFETY: only the producer writes `tail`.
            unsafe { self.tail.unsync_load() },
            right,
        );
        new_tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), new_tail, left);

        let should_commit = condition();
        match should_commit {
            Ok(res) => {
                // Publish the copied values.
                self.tail.store(new_tail, Release);

                Ok(res)
            }
            Err(err) => Err(err),
        }
    }
823}
824
825impl<T: Send, const CAPACITY: usize, AtomicWrapper: Deref<Target = LongAtomic> + Default>
827 SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
828{
    /// Returns the current number of buffered values as seen by a consumer.
    ///
    /// `head` and `tail` are loaded separately, so a concurrent update can
    /// yield an inconsistent pair; retry until the computed length is
    /// plausible (`<= CAPACITY`).
    #[inline]
    pub fn consumer_len(&self) -> usize {
        loop {
            let head = self.head.load(SUSPICIOUS_RELAXED_ACQUIRE);
            let tail = self.tail.load(SUSPICIOUS_RELAXED_ACQUIRE);
            let len = Self::len(head, tail);

            if unlikely(len > CAPACITY) {
                // Torn head/tail read; try again.
                continue;
            }

            return len;
        }
    }
848
    /// Pops up to `dst.len()` values into `dst` from the consumer side.
    ///
    /// Values are copied into `dst` *before* the claiming CAS on `head`; if
    /// the CAS fails the copies are discarded and the operation retries.
    #[inline]
    pub fn consumer_pop_many(&self, dst: &mut [MaybeUninit<T>]) -> usize {
        let mut head = self.head.load(SUSPICIOUS_RELAXED_ACQUIRE);
        let mut tail = self.tail.load(Acquire);

        loop {
            let available = Self::len(head, tail);
            let n = dst.len().min(available);

            if n == 0 {
                return 0;
            }

            if unlikely(n > CAPACITY) {
                // Torn head/tail read; reload both and retry.
                head = self.head.load(SUSPICIOUS_RELAXED_ACQUIRE);
                tail = self.tail.load(Acquire);

                continue;
            }

            // Speculatively copy the values out (wrap-around aware).
            let dst_ptr = dst.as_mut_ptr();
            let head_idx = head as usize % CAPACITY;
            let right = CAPACITY - head_idx;

            if n <= right {
                // SAFETY: `[head, head + n)` lies in the initialized region;
                // the copy only becomes an ownership transfer if the CAS
                // below succeeds.
                unsafe {
                    ptr::copy_nonoverlapping(self.buffer_thin_ptr().add(head_idx), dst_ptr, n);
                }
            } else {
                // SAFETY: as above, split at the physical end of the buffer.
                unsafe {
                    ptr::copy_nonoverlapping(self.buffer_thin_ptr().add(head_idx), dst_ptr, right);
                    ptr::copy_nonoverlapping(self.buffer_thin_ptr(), dst_ptr.add(right), n - right);
                }
            }

            // Claim the copied slots.
            match self.head.compare_exchange(
                head,
                head.wrapping_add(n as LongNumber),
                Release,
                Relaxed,
            ) {
                Ok(_) => return n,
                Err(actual_head) => {
                    // Another consumer advanced `head`; retry from the new
                    // position with a fresh tail.
                    head = actual_head;
                    tail = self.tail.load(Acquire);
                }
            }
        }
    }
916
    /// Steals roughly half of this queue's buffered values into `dst` (which
    /// must be empty) and returns how many were moved.
    ///
    /// The copy into `dst` is committed by a CAS on this queue's `head`
    /// (performed inside `dst.copy_and_commit_if`), so a failed CAS retries
    /// without publishing anything in `dst`.
    pub fn steal_into(&self, dst: &impl crate::single_producer::SingleProducer<T>) -> usize {
        if cfg!(debug_assertions) {
            assert!(
                dst.is_empty(),
                "steal_into should not be called when dst is not empty"
            );
        }

        let mut src_head = self.head.load(SUSPICIOUS_RELAXED_ACQUIRE);

        loop {
            let src_tail = self.tail.load(Acquire);
            let n = Self::len(src_head, src_tail) / 2;

            if n > CAPACITY / 2 {
                // Torn head/tail read; reload the head and retry.
                src_head = self.head.load(SUSPICIOUS_RELAXED_ACQUIRE);

                continue;
            }

            // Without the `always_steal` feature, stealing fewer than 4
            // values is not considered worth the synchronization cost.
            if !cfg!(feature = "always_steal") && n < 4 || n == 0 {
                return 0;
            }

            let src_head_idx = src_head as usize % CAPACITY;

            // View the `n` oldest values as up to two wrap-around slices.
            // SAFETY: `[src_head, src_head + n)` lies in the initialized
            // region published by the Acquire load of `tail`.
            let (src_right, src_left): (&[T], &[T]) = unsafe {
                let right_occupied = CAPACITY - src_head_idx;
                if n <= right_occupied {
                    (
                        slice::from_raw_parts(self.buffer_thin_ptr().add(src_head_idx).cast(), n),
                        &[],
                    )
                } else {
                    (
                        slice::from_raw_parts(
                            self.buffer_thin_ptr().add(src_head_idx).cast(),
                            right_occupied,
                        ),
                        slice::from_raw_parts(self.buffer_thin_ptr().cast(), n - right_occupied),
                    )
                }
            };

            // Committing CAS: claims the stolen range on success.
            let cas_closure = || {
                self.head.compare_exchange(
                    src_head,
                    src_head.wrapping_add(n as LongNumber),
                    Release,
                    Relaxed,
                )
            };

            // NOTE(review): assumes the empty `dst` can hold `n` (up to
            // CAPACITY / 2) values — confirm for destinations with a smaller
            // capacity.
            let res = unsafe { dst.copy_and_commit_if(src_right, src_left, cas_closure) };

            match res {
                Ok(_) => {
                    return n;
                }
                Err(current_head) => {
                    // CAS failed; retry from the head it observed.
                    src_head = current_head;
                }
            }
        }
    }
998}
999
impl<T: Send, const CAPACITY: usize, AtomicWrapper: Deref<Target = LongAtomic> + Default> Default
    for SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
{
    /// Equivalent to [`SPMCBoundedQueue::new`].
    fn default() -> Self {
        Self::new()
    }
}
1007
// SAFETY: shared access is coordinated through the atomic `head`/`tail`
// counters; the producer-only members (`cached_head`, unsynchronized `tail`
// loads) are guarded by the single-producer contracts of the `producer_*`
// methods.
unsafe impl<T: Send, const CAPACITY: usize, AtomicWrapper> Sync
    for SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
where
    AtomicWrapper: Deref<Target = LongAtomic> + Default,
{
}
1014
#[allow(clippy::non_send_fields_in_send_ty, reason = "We guarantee it is Send")]
// SAFETY: the queue owns its `T: Send` values.
unsafe impl<T: Send, const CAPACITY: usize, AtomicWrapper> Send
    for SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
where
    AtomicWrapper: Deref<Target = LongAtomic> + Default,
{
}
1022
impl<T: Send, const CAPACITY: usize, AtomicWrapper> Drop
    for SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
where
    AtomicWrapper: Deref<Target = LongAtomic> + Default,
{
    /// Drops every value still buffered in `[head, tail)`.
    fn drop(&mut self) {
        // Skip the walk entirely for trivially destructible payloads.
        if needs_drop::<T>() {
            // SAFETY: `&mut self` gives exclusive access, so unsynchronized
            // loads are fine here.
            let mut head = unsafe { self.head.unsync_load() };
            let tail = unsafe { self.tail.unsync_load() };

            while head != tail {
                // SAFETY: every slot in `[head, tail)` holds an initialized
                // value, dropped exactly once here.
                unsafe {
                    ptr::drop_in_place(
                        self.buffer_thin_ptr()
                            .add(head as usize % CAPACITY)
                            .cast::<T>()
                            .cast_mut(),
                    );
                }

                head = head.wrapping_add(1);
            }
        }
    }
}
1050
/// Generates a producer type and a consumer type wrapping a shared
/// `SPMCBoundedQueue` with the given atomic wrapper, plus all trait impls
/// wiring them to the queue's `producer_*` / `consumer_*` methods.
macro_rules! generate_spmc_producer_and_consumer {
    ($producer_name:ident, $consumer_name:ident, $atomic_wrapper:ty) => {
        // Single-producer handle; `PhantomData<*const ()>` keeps it !Sync.
        pub struct $producer_name<T: Send, const CAPACITY: usize> {
            inner: LightArc<SPMCBoundedQueue<T, CAPACITY, $atomic_wrapper>>,
            _non_sync: PhantomData<*const ()>,
        }

        impl<T: Send, const CAPACITY: usize> $crate::Producer<T> for $producer_name<T, CAPACITY> {
            #[inline]
            fn capacity(&self) -> usize {
                CAPACITY as usize
            }

            #[inline]
            fn len(&self) -> usize {
                // SAFETY: this handle is the single producer.
                unsafe { self.inner.producer_len() }
            }

            #[inline]
            fn maybe_push(&self, value: T) -> Result<(), T> {
                // SAFETY: this handle is the single producer.
                unsafe { self.inner.producer_maybe_push(value) }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::LockFreeProducer<T>
            for $producer_name<T, CAPACITY>
        {
            fn lock_free_maybe_push(&self, value: T) -> Result<(), LockFreePushErr<T>> {
                // SAFETY: this handle is the single producer.
                unsafe {
                    self.inner
                        .producer_maybe_push(value)
                        .map_err(|value| LockFreePushErr::Full(value))
                }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::single_producer::SingleProducer<T>
            for $producer_name<T, CAPACITY>
        {
            #[inline]
            unsafe fn push_many_unchecked(&self, first: &[T], last: &[T]) {
                // SAFETY: forwarded from the caller's contract.
                unsafe { self.inner.producer_push_many_unchecked(first, last) };
            }

            #[inline]
            unsafe fn maybe_push_many(&self, slice: &[T]) -> Result<(), ()> {
                // SAFETY: forwarded from the caller's contract.
                unsafe { self.inner.producer_maybe_push_many(slice) }
            }

            #[inline]
            unsafe fn copy_and_commit_if<F, FSuccess, FError>(
                &self,
                right: &[T],
                left: &[T],
                f: F,
            ) -> Result<FSuccess, FError>
            where
                F: FnOnce() -> Result<FSuccess, FError>,
            {
                // SAFETY: forwarded from the caller's contract.
                unsafe { self.inner.producer_copy_and_commit_if(right, left, f) }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::single_producer::SingleLockFreeProducer<T>
            for $producer_name<T, CAPACITY>
        {
            unsafe fn lock_free_maybe_push_many(
                &self,
                slice: &[T],
            ) -> Result<(), LockFreePushManyErr> {
                // SAFETY: forwarded from the caller's contract.
                unsafe {
                    self.inner
                        .producer_maybe_push_many(slice)
                        .map_err(|_| LockFreePushManyErr::NotEnoughSpace)
                }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::multi_consumer::MultiConsumerSpawner<T>
            for $producer_name<T, CAPACITY>
        {
            type SpawnedConsumer = $consumer_name<T, CAPACITY>;

            fn spawn_multi_consumer(&self) -> Self::SpawnedConsumer {
                $consumer_name {
                    inner: self.inner.clone(),
                    _non_sync: PhantomData,
                }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::multi_consumer::MultiLockFreeConsumerSpawner<T>
            for $producer_name<T, CAPACITY>
        {
            type SpawnedLockFreeConsumer = $consumer_name<T, CAPACITY>;

            fn spawn_multi_lock_free_consumer(&self) -> Self::SpawnedLockFreeConsumer {
                $consumer_name {
                    inner: self.inner.clone(),
                    _non_sync: PhantomData,
                }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::spmc_producer::SPMCProducer<T>
            for $producer_name<T, CAPACITY>
        {
            #[inline]
            unsafe fn push_many<BR: BatchReceiver<T>>(&self, slice: &[T], batch_receiver: &BR) {
                // SAFETY: forwarded from the caller's contract.
                unsafe { self.inner.producer_push_many(slice, batch_receiver) };
            }

            #[inline]
            fn push<BR: BatchReceiver<T>>(&self, value: T, batch_receiver: &BR) {
                // SAFETY: this handle is the single producer.
                unsafe { self.inner.producer_push(value, batch_receiver) };
            }

            #[inline]
            fn pop(&self) -> Option<T> {
                // SAFETY: this handle is the single producer.
                unsafe { self.inner.producer_pop() }
            }

            #[inline]
            fn pop_many(&self, dst: &mut [MaybeUninit<T>]) -> usize {
                // SAFETY: this handle is the single producer.
                unsafe { self.inner.producer_pop_many(dst) }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::spmc_producer::SPMCLockFreeProducer<T>
            for $producer_name<T, CAPACITY>
        {
            unsafe fn lock_free_push_many<BR: LockFreeBatchReceiver<T>>(
                &self,
                slice: &[T],
                batch_receiver: &BR,
            ) -> Result<(), ()> {
                // SAFETY: forwarded from the caller's contract.
                unsafe {
                    self.inner
                        .producer_lock_free_push_many(slice, batch_receiver)
                }
            }

            fn lock_free_push<BR: LockFreeBatchReceiver<T>>(
                &self,
                value: T,
                batch_receiver: &BR,
            ) -> Result<(), T> {
                // SAFETY: this handle is the single producer.
                unsafe { self.inner.producer_lock_free_push(value, batch_receiver) }
            }

            fn lock_free_pop_many(&self, dst: &mut [MaybeUninit<T>]) -> (usize, bool) {
                // SAFETY: this handle is the single producer.
                unsafe { (self.inner.producer_pop_many(dst), false) }
            }

            fn lock_free_pop(&self) -> Result<T, LockFreePopErr> {
                // SAFETY: this handle is the single producer.
                unsafe { self.inner.producer_pop().ok_or(LockFreePopErr::Empty) }
            }
        }

        unsafe impl<T: Send, const CAPACITY: usize> Send for $producer_name<T, CAPACITY> {}

        // Cloneable consumer handle; `PhantomData<*const ()>` keeps it !Sync.
        pub struct $consumer_name<T: Send, const CAPACITY: usize> {
            inner: LightArc<SPMCBoundedQueue<T, CAPACITY, $atomic_wrapper>>,
            _non_sync: PhantomData<*const ()>,
        }

        impl<T: Send, const CAPACITY: usize> $crate::Consumer<T> for $consumer_name<T, CAPACITY> {
            #[inline]
            fn capacity(&self) -> usize {
                CAPACITY as usize
            }

            #[inline]
            fn len(&self) -> usize {
                self.inner.consumer_len()
            }

            #[inline]
            fn pop_many(&self, dst: &mut [MaybeUninit<T>]) -> usize {
                self.inner.consumer_pop_many(dst)
            }

            #[inline(never)]
            fn steal_into(&self, dst: &impl $crate::single_producer::SingleProducer<T>) -> usize {
                self.inner.steal_into(dst)
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::LockFreeConsumer<T>
            for $consumer_name<T, CAPACITY>
        {
            #[inline]
            fn lock_free_pop_many(&self, dst: &mut [MaybeUninit<T>]) -> (usize, bool) {
                (self.inner.consumer_pop_many(dst), false)
            }

            #[inline(never)]
            fn lock_free_steal_into(
                &self,
                dst: &impl $crate::single_producer::SingleLockFreeProducer<T>,
            ) -> (usize, bool) {
                (self.inner.steal_into(dst), false)
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::multi_consumer::MultiConsumer<T>
            for $consumer_name<T, CAPACITY>
        {
        }

        impl<T: Send, const CAPACITY: usize> $crate::multi_consumer::MultiLockFreeConsumer<T>
            for $consumer_name<T, CAPACITY>
        {
        }

        impl<T: Send, const CAPACITY: usize> Clone for $consumer_name<T, CAPACITY> {
            fn clone(&self) -> Self {
                Self {
                    inner: self.inner.clone(),
                    _non_sync: PhantomData,
                }
            }
        }

        unsafe impl<T: Send, const CAPACITY: usize> Send for $consumer_name<T, CAPACITY> {}
    };

    // Convenience arm: defaults to the non cache-padded atomic wrapper.
    ($producer_name:ident, $consumer_name:ident) => {
        generate_spmc_producer_and_consumer!(
            $producer_name,
            $consumer_name,
            NotCachePaddedLongAtomic
        );
    };
}
1289
// The default (non cache-padded) producer/consumer pair.
generate_spmc_producer_and_consumer!(SPMCProducer, SPMCConsumer);
1291
1292pub fn new_bounded<T: Send, const CAPACITY: usize>(
1341) -> (SPMCProducer<T, CAPACITY>, SPMCConsumer<T, CAPACITY>) {
1342 let queue = LightArc::new(SPMCBoundedQueue::new());
1343
1344 (
1345 SPMCProducer {
1346 inner: queue.clone(),
1347 _non_sync: PhantomData,
1348 },
1349 SPMCConsumer {
1350 inner: queue,
1351 _non_sync: PhantomData,
1352 },
1353 )
1354}
1355
// Variant using the cache-padded atomic wrapper for head/tail.
generate_spmc_producer_and_consumer!(
    CachePaddedSPMCProducer,
    CachePaddedSPMCConsumer,
    CachePaddedLongAtomic
);
1361
1362pub fn new_cache_padded_bounded<T: Send, const CAPACITY: usize>() -> (
1411 CachePaddedSPMCProducer<T, CAPACITY>,
1412 CachePaddedSPMCConsumer<T, CAPACITY>,
1413) {
1414 let queue = LightArc::new(SPMCBoundedQueue::new());
1415
1416 (
1417 CachePaddedSPMCProducer {
1418 inner: queue.clone(),
1419 _non_sync: PhantomData,
1420 },
1421 CachePaddedSPMCConsumer {
1422 inner: queue,
1423 _non_sync: PhantomData,
1424 },
1425 )
1426}
1427
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mutex_vec_queue::MutexVecQueue;
    use crate::single_producer::{SingleLockFreeProducer, SingleProducer};
    use crate::spmc_producer::{SPMCLockFreeProducer, SPMCProducer};
    use crate::{Consumer, LockFreeConsumer, Producer};
    use std::collections::VecDeque;

    const CAPACITY: usize = 16;

    // Layout check: the queue is the buffer plus the two counters
    // (plus alignment padding).
    #[test]
    fn test_spmc_bounded_size() {
        let queue = SPMCBoundedQueue::<u8, CAPACITY>::new();

        assert_eq!(
            size_of_val(&queue),
            CAPACITY + size_of::<LongAtomic>() * 2 + align_of_val(&queue)
        );

        let cache_padded_queue = SPMCBoundedQueue::<u8, CAPACITY, CachePaddedLongAtomic>::new();

        assert_eq!(
            size_of_val(&cache_padded_queue),
            size_of::<CachePaddedLongAtomic>() * 2 + CAPACITY + align_of_val(&cache_padded_queue)
        );
    }

    // Pushes far beyond capacity; the overflow must land in the global
    // queue and no value may be lost.
    #[test]
    fn test_spmc_bounded_seq_insertions() {
        let global_queue = MutexVecQueue::new();
        let (producer, _) = new_bounded::<_, CAPACITY>();

        for i in 0..CAPACITY * 100 {
            producer.push(i, &global_queue);
        }

        let (new_producer, _) = new_bounded::<_, CAPACITY>();

        global_queue
            .move_batch_to_producer(&new_producer, producer.capacity() - producer.len());

        assert_eq!(
            producer.len() + new_producer.len() + global_queue.len(),
            CAPACITY * 100
        );

        for _ in 0..producer.len() {
            assert!(producer.pop().is_some());
        }

        for _ in 0..new_producer.len() {
            assert!(new_producer.pop().is_some());
        }
    }

    // Repeatedly steals from producer1 into producer2 and checks that no
    // item is lost or duplicated.
    #[test]
    fn test_spmc_bounded_stealing() {
        const TRIES: usize = 10;

        let global_queue = MutexVecQueue::new();
        let (producer1, consumer) = new_bounded::<_, CAPACITY>();
        let (producer2, _) = new_bounded::<_, CAPACITY>();

        let mut stolen = VecDeque::new();

        for _ in 0..TRIES * 2 {
            for i in 0..CAPACITY / 2 {
                producer1.push(i, &global_queue);
            }

            consumer.steal_into(&producer2);

            while let Some(task) = producer2.pop() {
                stolen.push_back(task);
            }

            assert!(global_queue.is_empty());
        }

        assert!(producer2.is_empty());

        let mut count = 0;

        while producer1.pop().is_some() {
            count += 1;
        }

        assert_eq!(count + stolen.len() + global_queue.len(), CAPACITY * TRIES);
    }

    // Batch push/pop round-trips through both producer- and consumer-side
    // many-operations, verifying FIFO order.
    #[test]
    fn test_spmc_bounded_many() {
        const BATCH_SIZE: usize = 8;
        const N: usize = BATCH_SIZE * 100;

        let global_queue = MutexVecQueue::new();
        let (producer, consumer) = new_bounded::<_, CAPACITY>();

        for i in 0..N / BATCH_SIZE / 2 {
            let slice = (0..BATCH_SIZE)
                .map(|j| i * BATCH_SIZE + j)
                .collect::<Vec<_>>();

            unsafe {
                producer.maybe_push_many(&slice).unwrap();
            }

            let mut slice = [MaybeUninit::uninit(); BATCH_SIZE];
            producer.pop_many(slice.as_mut_slice());

            for (j, item) in slice.iter().enumerate().take(BATCH_SIZE) {
                let index = i * BATCH_SIZE + j;

                assert_eq!(unsafe { item.assume_init() }, index);
            }
        }

        for i in 0..N / BATCH_SIZE / 2 {
            let slice = (0..BATCH_SIZE)
                .map(|j| i * BATCH_SIZE + j)
                .collect::<Vec<_>>();

            unsafe {
                producer.push_many(&slice, &global_queue);
            }

            assert!(global_queue.is_empty());

            let mut slice = [MaybeUninit::uninit(); BATCH_SIZE];
            consumer.pop_many(slice.as_mut_slice());

            for (j, item) in slice.iter().enumerate().take(BATCH_SIZE) {
                let index = i * BATCH_SIZE + j;

                assert_eq!(unsafe { item.assume_init() }, index);
            }
        }
    }

    // Same as test_spmc_bounded_seq_insertions, through the lock-free API
    // (and the cache-padded variant).
    #[test]
    fn test_spmc_lock_free_bounded_seq_insertions() {
        let global_queue = MutexVecQueue::new();
        let (producer, _) = new_cache_padded_bounded::<_, CAPACITY>();

        for i in 0..CAPACITY * 100 {
            producer.lock_free_push(i, &global_queue).unwrap();
        }

        let (new_producer, _) = new_bounded::<_, CAPACITY>();

        global_queue
            .move_batch_to_producer(&new_producer, producer.capacity() - producer.len());

        assert_eq!(
            producer.len() + new_producer.len() + global_queue.len(),
            CAPACITY * 100
        );

        for _ in 0..producer.len() {
            producer.lock_free_pop().unwrap();
        }

        for _ in 0..new_producer.len() {
            new_producer.lock_free_pop().unwrap();
        }
    }

    // Same as test_spmc_bounded_stealing, through the lock-free API.
    #[test]
    fn test_spmc_lock_free_bounded_stealing() {
        const TRIES: usize = 10;

        let global_queue = MutexVecQueue::new();
        let (producer1, consumer) = new_bounded::<_, CAPACITY>();
        let (producer2, _) = new_bounded::<_, CAPACITY>();

        let mut stolen = VecDeque::new();

        for _ in 0..TRIES * 2 {
            for i in 0..CAPACITY / 2 {
                producer1.lock_free_push(i, &global_queue).unwrap();
            }

            assert!(!consumer.lock_free_steal_into(&producer2).1);

            while let Ok(task) = producer2.lock_free_pop() {
                stolen.push_back(task);
            }

            assert!(global_queue.is_empty());
        }

        assert!(producer2.is_empty());

        let mut count = 0;

        while producer1.lock_free_pop().is_ok() {
            count += 1;
        }

        assert_eq!(count + stolen.len() + global_queue.len(), CAPACITY * TRIES);
    }

    // Same as test_spmc_bounded_many, through the lock-free API.
    #[test]
    fn test_spmc_lock_free_bounded_many() {
        const BATCH_SIZE: usize = 8;
        const N: usize = BATCH_SIZE * 100;

        let global_queue = MutexVecQueue::new();
        let (producer, consumer) = new_bounded::<_, CAPACITY>();

        for i in 0..N / BATCH_SIZE / 2 {
            let slice = (0..BATCH_SIZE)
                .map(|j| i * BATCH_SIZE + j)
                .collect::<Vec<_>>();

            unsafe {
                producer.lock_free_maybe_push_many(&slice).unwrap();
            }

            let mut slice = [MaybeUninit::uninit(); BATCH_SIZE];

            assert_eq!(
                producer.lock_free_pop_many(slice.as_mut_slice()),
                (BATCH_SIZE, false)
            );

            for (j, item) in slice.iter().enumerate().take(BATCH_SIZE) {
                let index = i * BATCH_SIZE + j;

                assert_eq!(unsafe { item.assume_init() }, index);
            }
        }

        for i in 0..N / BATCH_SIZE / 2 {
            let slice = (0..BATCH_SIZE)
                .map(|j| i * BATCH_SIZE + j)
                .collect::<Vec<_>>();

            unsafe {
                producer
                    .lock_free_push_many(&slice, &global_queue)
                    .unwrap();
            }

            assert!(global_queue.is_empty());

            let mut slice = [MaybeUninit::uninit(); BATCH_SIZE];

            assert_eq!(
                consumer.lock_free_pop_many(slice.as_mut_slice()),
                (BATCH_SIZE, false)
            );

            for (j, item) in slice.iter().enumerate().take(BATCH_SIZE) {
                let index = i * BATCH_SIZE + j;

                assert_eq!(unsafe { item.assume_init() }, index);
            }
        }
    }
}