parcoll/spmc/
const_bounded.rs

1//! This module provides a single-producer multi-consumer queue.
2//!
3//! It is implemented as a const bounded ring buffer.
4//! It is optimized for the work-stealing model.
5#![allow(
6    clippy::cast_possible_truncation,
7    reason = "LongNumber should be synonymous to usize"
8)]
9use orengine_utils::hints::unlikely;
10use orengine_utils::light_arc::LightArc;
11use crate::batch_receiver::{BatchReceiver, LockFreeBatchReceiver, LockFreePushBatchErr};
12use crate::number_types::{
13    CachePaddedLongAtomic, LongAtomic, LongNumber, NotCachePaddedLongAtomic,
14};
15use crate::suspicious_orders::SUSPICIOUS_RELAXED_ACQUIRE;
16use crate::{LockFreePopErr, LockFreePushErr, LockFreePushManyErr};
17use std::cell::{Cell, UnsafeCell};
18use std::marker::PhantomData;
19use std::mem::{needs_drop, MaybeUninit};
20use std::ops::Deref;
21use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
22use std::{ptr, slice};
23
// We don't worry about ABA because we can assume that 16-bit and 32-bit processors never
// insert + read (2 ^ 16) - 1 or (2 ^ 32) - 1 values while some consumer is preempted.
// For 32-bit:
// Assuming the queue transfers 10 values at a time and each transfer takes about
// 20 nanoseconds (it only becomes slower as threads are added), a thread would need to stay
// preempted for ~8.5 seconds while another thread does nothing but work with this queue.
// For 16-bit:
// We assume such a level of concurrency never occurs.
// For 64-bit, the ABA problem is unrealistic.
33
34// Reads from the head, writes to the tail.
35
/// The single-producer, multi-consumer ring-based _const bounded_ queue.
///
/// It is safe to use when and only when only one thread is writing to the queue at the same time.
///
/// You can call `producer_` methods for the producer and `consumer_` methods for the consumers.
///
/// It accepts the atomic wrapper as a generic parameter.
/// It allows using cache-padded atomics or not.
/// You should create type aliases not to write this large type name.
///
/// # Using directly the [`SPMCBoundedQueue`] vs. using [`new_bounded`] or [`new_cache_padded_bounded`].
///
/// Functions [`new_bounded`] and [`new_cache_padded_bounded`] allocate the
/// [`SPMCBoundedQueue`] on the heap in [`LightArc`]
/// and provide producer's and consumer's parts.
/// It hurts the performance if you don't need to allocate the queue separately but improves
/// the readability when you need to separate producer and consumer logic and share them.
///
/// It doesn't implement the [`Producer`](crate::Producer) and [`Consumer`](crate::Consumer)
/// traits because all producer methods
/// are unsafe (can be called only by one thread).
#[repr(C)]
pub struct SPMCBoundedQueue<
    T: Send,
    const CAPACITY: usize,
    AtomicWrapper: Deref<Target = LongAtomic> + Default = NotCachePaddedLongAtomic,
> {
    // Monotonically increasing write counter (not an index); written only by the producer.
    tail: AtomicWrapper,
    // The producer's last-seen value of `head`; avoids touching the shared atomic on
    // every operation. It never runs ahead of the real `head` (consumers only advance it).
    cached_head: Cell<LongNumber>,
    // Monotonically increasing read counter (not an index); advanced by consumers via CAS.
    head: AtomicWrapper,
    // Ring storage; slots in `[head, tail)` (mod `CAPACITY`) hold initialized values.
    buffer: UnsafeCell<[MaybeUninit<T>; CAPACITY]>,
}
68
69impl<T: Send, const CAPACITY: usize, AtomicWrapper: Deref<Target = LongAtomic> + Default>
70    SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
71{
72    /// Indicates how many elements we are taking from the local queue.
73    ///
74    /// This is one less than the number of values pushed to the global
75    /// queue (or any other `SyncBatchReceiver`) as we are also inserting the `value` argument.
76    const NUM_VALUES_TAKEN: LongNumber = CAPACITY as LongNumber / 2;
77
78    /// Creates a new [`SPMCBoundedQueue`].
79    pub fn new() -> Self {
80        debug_assert!(size_of::<MaybeUninit<T>>() == size_of::<T>()); // Assume that we can just cast it
81
82        Self {
83            buffer: UnsafeCell::new([const { MaybeUninit::uninit() }; CAPACITY]),
84            tail: AtomicWrapper::default(),
85            cached_head: Cell::new(0),
86            head: AtomicWrapper::default(),
87        }
88    }
89
90    /// Returns the capacity of the queue.
91    #[inline]
92    pub fn capacity(&self) -> usize {
93        CAPACITY
94    }
95
96    /// Returns a mutable reference to the buffer.
97    #[allow(clippy::mut_from_ref, reason = "It improves readability")]
98    fn buffer_mut(&self) -> &mut [MaybeUninit<T>] {
99        unsafe { &mut *self.buffer.get() }
100    }
101
102    /// Returns a pointer to the buffer.
103    fn buffer_thin_ptr(&self) -> *const MaybeUninit<T> {
104        self.buffer.get() as *const _
105    }
106
107    /// Returns a mutable pointer to the buffer.
108    fn buffer_mut_thin_ptr(&self) -> *mut MaybeUninit<T> {
109        self.buffer.get().cast()
110    }
111
112    /// Returns the number of elements in the queue.
113    #[inline]
114    fn len(head: LongNumber, tail: LongNumber) -> usize {
115        tail.wrapping_sub(head) as usize
116    }
117}
118
119// Producer
120impl<T: Send, const CAPACITY: usize, AtomicWrapper: Deref<Target = LongAtomic> + Default>
121    SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
122{
123    /// Pushes a slice into the queue. Returns a new tail (not index).
124    fn copy_slice(buffer_ptr: *mut T, start_tail: LongNumber, slice: &[T]) -> LongNumber {
125        let tail_idx = start_tail as usize % CAPACITY;
126
127        if tail_idx + slice.len() <= CAPACITY {
128            unsafe {
129                ptr::copy_nonoverlapping(slice.as_ptr(), buffer_ptr.add(tail_idx), slice.len());
130            };
131        } else {
132            let right = CAPACITY - tail_idx;
133
134            unsafe {
135                ptr::copy_nonoverlapping(slice.as_ptr(), buffer_ptr.add(tail_idx), right);
136                ptr::copy_nonoverlapping(
137                    slice.as_ptr().add(right),
138                    buffer_ptr,
139                    slice.len() - right,
140                );
141            }
142        }
143
144        start_tail.wrapping_add(slice.len() as LongNumber)
145    }
146
147    /// Return the number of elements in the queue.
148    ///
149    /// # Safety
150    ///
151    /// It should be called only by the producer.
152    #[inline]
153    pub unsafe fn producer_len(&self) -> usize {
154        let tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
155        self.cached_head
156            .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));
157
158        Self::len(self.cached_head.get(), tail)
159    }
160
    /// Pops a value from the queue.
    /// Returns `None` if the queue is empty.
    ///
    /// # Safety
    ///
    /// It should be called only by the producer.
    #[inline]
    pub unsafe fn producer_pop(&self) -> Option<T> {
        let tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
        let mut head = self.cached_head.get();

        loop {
            if unlikely(head == tail) {
                // The cached head says the queue is empty;
                // refresh it from the shared head before giving up.
                self.cached_head
                    .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));

                if head == self.cached_head.get() {
                    // Really empty.
                    return None;
                }

                head = self.cached_head.get();
            }

            let new_head = head.wrapping_add(1);

            // Claim the slot at `head` by advancing the shared head.
            // Weak CAS is fine: on spurious failure we simply retry the loop.
            match self
                .head
                .compare_exchange_weak(head, new_head, Release, Relaxed)
            {
                Ok(_) => {
                    // We are the only producer,
                    // so we don't need to worry about someone overwriting the value
                    // before we read it.

                    self.cached_head.set(new_head);

                    return Some(unsafe {
                        self.buffer_thin_ptr()
                            .add(head as usize % CAPACITY)
                            .read()
                            .assume_init()
                    });
                }
                Err(new_head) => {
                    // A consumer advanced the head concurrently; retry with the fresh value.
                    head = new_head;
                }
            }
        }
    }
209
    /// Pops many values from the queue.
    /// Returns the number of values popped.
    ///
    /// # Safety
    ///
    /// It should be called only by the producer.
    #[inline]
    pub unsafe fn producer_pop_many(&self, dst: &mut [MaybeUninit<T>]) -> usize {
        let tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
        let mut head = self.cached_head.get();

        loop {
            let mut available = Self::len(head, tail);
            let mut n = dst.len().min(available);

            if n == 0 {
                // The cached head says the queue is empty;
                // refresh it from the shared head before giving up.
                self.cached_head
                    .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));

                if unlikely(head == self.cached_head.get()) {
                    // Really empty (or `dst` is empty).
                    return 0;
                }

                head = self.cached_head.get();
                available = Self::len(head, tail);
                n = dst.len().min(available);
            }

            debug_assert!(n <= CAPACITY, "Bug occurred, please report it.");

            let new_head = head.wrapping_add(n as LongNumber);

            // Claim `n` slots starting at `head` by advancing the shared head.
            // Weak CAS is fine: on spurious failure we simply retry the loop.
            match self
                .head
                .compare_exchange_weak(head, new_head, Release, Relaxed)
            {
                Ok(_) => {
                    // We are the only producer,
                    // so we don't need to worry about someone overwriting the values
                    // before we read them.

                    let dst_ptr = dst.as_mut_ptr();
                    let head_idx = head as usize % CAPACITY;
                    let right = CAPACITY - head_idx;

                    if n <= right {
                        // No wraparound, copy in one shot
                        unsafe {
                            ptr::copy_nonoverlapping(
                                self.buffer_thin_ptr().add(head_idx),
                                dst_ptr,
                                n,
                            );
                        }
                    } else {
                        unsafe {
                            // Wraparound: copy right half then left half
                            ptr::copy_nonoverlapping(
                                self.buffer_thin_ptr().add(head_idx),
                                dst_ptr,
                                right,
                            );
                            ptr::copy_nonoverlapping(
                                self.buffer_thin_ptr(),
                                dst_ptr.add(right),
                                n - right,
                            );
                        }
                    }

                    self.cached_head.set(new_head);

                    return n;
                }
                Err(new_head) => {
                    // A consumer advanced the head concurrently; retry with the fresh value.
                    head = new_head;
                }
            }
        }
    }
289
290    /// Pushes a value to the queue.
291    ///
292    /// # Safety
293    ///
294    /// It should be called only by the producer, and the queue should not be full.
295    #[inline(always)]
296    pub unsafe fn push_unchecked(&self, value: T, tail: LongNumber) {
297        unsafe {
298            self.buffer_mut_thin_ptr()
299                .add(tail as usize % CAPACITY)
300                .write(MaybeUninit::new(value));
301        }
302
303        self.tail.store(tail.wrapping_add(1), Release);
304    }
305
    /// Likely moves a half of the queue and one value to the [`BatchReceiver`].
    ///
    /// Called on overflow: the queue is full, so roughly half of it plus `value`
    /// are handed to `br` as one batch.
    #[inline(never)]
    #[cold]
    fn handle_overflow_one<BR: BatchReceiver<T>>(
        &self,
        tail: LongNumber,
        mut head: LongNumber,
        br: &BR,
        value: T,
    ) {
        debug_assert!(tail == head.wrapping_add(CAPACITY as LongNumber) && tail > head);

        loop {
            let head_idx = head as usize % CAPACITY;
            let buffer = self.buffer_mut();

            // The batch occupies [head, head + NUM_VALUES_TAKEN) in counter space;
            // physically that is one or two slices of the ring buffer.
            // NOTE(review): in the wrapping branch, right.len() + left.len() ==
            // CAPACITY - NUM_VALUES_TAKEN, which equals NUM_VALUES_TAKEN only when
            // CAPACITY is even — confirm CAPACITY is always even (e.g. a power of two).
            let (right, left): (&[MaybeUninit<T>], &[MaybeUninit<T>]) =
                if head_idx < Self::NUM_VALUES_TAKEN as usize {
                    // we can return only the right half of the queue
                    (
                        &buffer[head_idx..head_idx + Self::NUM_VALUES_TAKEN as usize],
                        &[],
                    )
                } else {
                    let left_part_len = head_idx - Self::NUM_VALUES_TAKEN as usize;

                    (&buffer[head_idx..], &buffer[..left_part_len])
                };

            // We haven't read the value yet, so we can use `compare_exchange_weak`.
            // If it fails, we calculate two slices and try again; it is not a performance issue.
            let res = self.head.compare_exchange_weak(
                head,
                head.wrapping_add(Self::NUM_VALUES_TAKEN),
                Release,
                Relaxed,
            );

            match res {
                Ok(_) => unsafe {
                    // The head was advanced, so these slots now belong to us;
                    // `MaybeUninit<T>` slices are reinterpreted as initialized `[T]`.
                    br.push_many_and_one(
                        &*(ptr::from_ref::<[MaybeUninit<T>]>(left) as *const [T]),
                        &*(ptr::from_ref::<[MaybeUninit<T>]>(right) as *const [T]),
                        value,
                    );

                    return;
                },
                Err(new_head) => {
                    head = new_head;

                    if Self::len(head, tail) < CAPACITY {
                        // Another thread concurrently
                        // took the value from the queue.
                        // Because we are the one producer,
                        // we can just insert the value (it can't become full before we return).

                        unsafe { self.push_unchecked(value, tail) };

                        return;
                    }
                }
            }
        }
    }
371
    /// Likely moves a half of the queue and many values to the [`BatchReceiver`].
    ///
    /// Called on overflow: the queue cannot hold `slice`, so roughly half of the
    /// queue plus the whole `slice` are handed to `br` as one batch.
    #[inline(never)]
    #[cold]
    fn handle_overflow_many<BR: BatchReceiver<T>>(
        &self,
        tail: LongNumber,
        mut head: LongNumber,
        br: &BR,
        slice: &[T],
    ) {
        debug_assert!(tail == head.wrapping_add(CAPACITY as LongNumber) && tail > head);

        loop {
            let head_idx = head as usize % CAPACITY;
            let buffer = self.buffer_mut();

            // The batch occupies [head, head + NUM_VALUES_TAKEN) in counter space;
            // physically that is one or two slices of the ring buffer.
            // NOTE(review): in the wrapping branch, right.len() + left.len() ==
            // CAPACITY - NUM_VALUES_TAKEN, which equals NUM_VALUES_TAKEN only when
            // CAPACITY is even — confirm CAPACITY is always even (e.g. a power of two).
            let (right, left): (&[MaybeUninit<T>], &[MaybeUninit<T>]) =
                if head_idx < Self::NUM_VALUES_TAKEN as usize {
                    // we can return only the right half of the queue
                    (
                        &buffer[head_idx..head_idx + Self::NUM_VALUES_TAKEN as usize],
                        &[],
                    )
                } else {
                    let left_part_len = head_idx - Self::NUM_VALUES_TAKEN as usize;

                    (&buffer[head_idx..], &buffer[..left_part_len])
                };

            // We haven't read the value yet, so we can use `compare_exchange_weak`.
            // If it fails, we calculate two slices and try again; it is not a performance issue.
            let res = self.head.compare_exchange_weak(
                head,
                head.wrapping_add(Self::NUM_VALUES_TAKEN),
                Release,
                Relaxed,
            );

            match res {
                Ok(_) => {
                    unsafe {
                        // The head was advanced, so these slots now belong to us;
                        // `MaybeUninit<T>` slices are reinterpreted as initialized `[T]`.
                        br.push_many_and_slice(
                            &*(ptr::from_ref::<[MaybeUninit<T>]>(left) as *const [T]),
                            &*(ptr::from_ref::<[MaybeUninit<T>]>(right) as *const [T]),
                            slice,
                        );
                    }

                    return;
                }
                Err(new_head) => {
                    head = new_head;

                    let len = Self::len(head, tail);

                    if len + slice.len() <= CAPACITY {
                        // Another thread concurrently
                        // took values from the queue.
                        // Because we are the one producer,
                        // we can just insert the slice (it can't become full before we return).

                        let new_tail =
                            Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, slice);
                        self.tail.store(new_tail, Release);

                        return;
                    }
                }
            }
        }
    }
443
    /// Likely moves a half of the queue and one value to the [`BatchReceiver`].
    ///
    /// It fails if the operation cannot be finished now; on failure the value is
    /// handed back to the caller unchanged.
    #[inline(never)]
    #[cold]
    fn handle_lock_free_overflow_one<BR: LockFreeBatchReceiver<T>>(
        &self,
        tail: LongNumber,
        head: LongNumber,
        br: &BR,
        value: T,
    ) -> Result<(), T> {
        debug_assert!(tail == head.wrapping_add(CAPACITY as LongNumber) && tail > head);

        let head_idx = head as usize % CAPACITY;
        let buffer = self.buffer_mut();
        // The batch occupies [head, head + NUM_VALUES_TAKEN) in counter space;
        // physically that is one or two slices of the ring buffer.
        let (right, left): (&[MaybeUninit<T>], &[MaybeUninit<T>]) =
            if head_idx < Self::NUM_VALUES_TAKEN as usize {
                // we can return only the right half of the queue
                (
                    &buffer[head_idx..head_idx + Self::NUM_VALUES_TAKEN as usize],
                    &[],
                )
            } else {
                let left_part_len = head_idx - Self::NUM_VALUES_TAKEN as usize;

                (&buffer[head_idx..], &buffer[..left_part_len])
            };

        // The receiver publishes the batch only if the head CAS below succeeds,
        // so the values are never owned by both the queue and the receiver.
        let res = unsafe {
            br.push_many_and_one_and_commit_if(
                &*(ptr::from_ref::<[MaybeUninit<T>]>(left) as *const [T]),
                &*(ptr::from_ref::<[MaybeUninit<T>]>(right) as *const [T]),
                value,
                || {
                    self.head.compare_exchange(
                        head,
                        head.wrapping_add(Self::NUM_VALUES_TAKEN),
                        Release,
                        Relaxed,
                    )
                },
            )
        };

        match res {
            Ok(_) => Ok(()),
            Err(LockFreePushBatchErr::ShouldWait(value)) => Err(value),
            Err(LockFreePushBatchErr::CondictionIsFalse((value, head))) => {
                // The CAS failed, so a consumer advanced the head and freed a slot.
                debug_assert!(Self::len(head, tail) < CAPACITY);

                // Another thread concurrently
                // stole from the queue.
                // Because we are the one producer,
                // we can just insert the value
                // (it can't become full before we return).

                unsafe { self.push_unchecked(value, tail) };

                Ok(())
            }
        }
    }
507
508    /// Likely moves a half of the queue and many values to the [`BatchReceiver`].
509    ///
510    /// It fails if the operation cannot be finished now.
511    #[inline(never)]
512    #[cold]
513    fn handle_lock_free_overflow_many<BR: LockFreeBatchReceiver<T>>(
514        &self,
515        tail: LongNumber,
516        head: LongNumber,
517        br: &BR,
518        slice: &[T],
519    ) -> Result<(), ()> {
520        debug_assert!(tail == head.wrapping_add(CAPACITY as LongNumber) && tail > head);
521
522        let head_idx = head as usize % CAPACITY;
523        let buffer = self.buffer_mut();
524
525        let (right, left): (&[MaybeUninit<T>], &[MaybeUninit<T>]) =
526            if head_idx < Self::NUM_VALUES_TAKEN as usize {
527                // we can return only the right half of the queue
528                (
529                    &buffer[head_idx..head_idx + Self::NUM_VALUES_TAKEN as usize],
530                    &[],
531                )
532            } else {
533                let left_part_len = head_idx - Self::NUM_VALUES_TAKEN as usize;
534
535                (&buffer[head_idx..], &buffer[..left_part_len])
536            };
537
538        let res = unsafe {
539            br.lock_free_push_many_and_slice_and_commit_if(
540                &*(ptr::from_ref::<[MaybeUninit<T>]>(left) as *const [T]),
541                &*(ptr::from_ref::<[MaybeUninit<T>]>(right) as *const [T]),
542                slice,
543                || {
544                    self.head.compare_exchange(
545                        head,
546                        head.wrapping_add(Self::NUM_VALUES_TAKEN),
547                        Release,
548                        Relaxed,
549                    )
550                },
551            )
552        };
553
554        match res {
555            Ok(_) => Ok(()),
556            Err(LockFreePushBatchErr::ShouldWait(())) => Err(()),
557            Err(LockFreePushBatchErr::CondictionIsFalse(((), head))) => {
558                let len = Self::len(head, tail);
559
560                if len + slice.len() <= CAPACITY {
561                    // Another thread concurrently
562                    // took values from the queue.
563                    // Because we are the one producer,
564                    // we can just insert the slice
565                    // (it can't become full before we return).
566
567                    let new_tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, slice);
568                    self.tail.store(new_tail, Release);
569
570                    return Ok(());
571                }
572
573                Ok(())
574            }
575        }
576    }
577
578    /// Pushes a value to the queue or to the [`BatchReceiver`].
579    ///
580    /// # Safety
581    ///
582    /// It should be called only by the producer.
583    #[inline]
584    pub unsafe fn producer_push<BR: BatchReceiver<T>>(&self, value: T, batch_receiver: &BR) {
585        let tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
586        let head = self.cached_head.get();
587
588        if unlikely(Self::len(head, tail) == CAPACITY) {
589            self.cached_head
590                .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));
591
592            if unlikely(head == self.cached_head.get()) {
593                self.handle_overflow_one(tail, head, batch_receiver, value);
594
595                return;
596            }
597
598            // The queue is not full
599        }
600
601        unsafe { self.push_unchecked(value, tail) };
602    }
603
604    /// Pushes a value to the queue or to the [`BatchReceiver`]
605    /// or returns an error if the operation should wait.
606    ///
607    /// # Safety
608    ///
609    /// It should be called only by the producer.
610    #[inline]
611    pub unsafe fn producer_lock_free_push<BR: LockFreeBatchReceiver<T>>(
612        &self,
613        value: T,
614        batch_receiver: &BR,
615    ) -> Result<(), T> {
616        let tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
617        let head = self.cached_head.get();
618
619        if unlikely(Self::len(self.cached_head.get(), tail) == CAPACITY) {
620            self.cached_head.set(self.head.load(Acquire));
621
622            if unlikely(head == self.cached_head.get()) {
623                self.handle_lock_free_overflow_one(tail, head, batch_receiver, value)?;
624
625                return Ok(());
626            }
627
628            // The queue is not full
629        }
630
631        unsafe { self.push_unchecked(value, tail) };
632
633        Ok(())
634    }
635
636    /// Pushes a value to the queue or returns an error.
637    ///
638    /// # Safety
639    ///
640    /// It should be called only by the producer.
641    #[inline]
642    pub unsafe fn producer_maybe_push(&self, value: T) -> Result<(), T> {
643        let tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
644        let head = self.cached_head.get();
645
646        if unlikely(Self::len(head, tail) >= CAPACITY) {
647            self.cached_head
648                .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));
649
650            if unlikely(head == self.cached_head.get()) {
651                return Err(value);
652            }
653
654            // The queue is not full
655        }
656
657        debug_assert!(Self::len(self.cached_head.get(), tail) < CAPACITY);
658
659        unsafe { self.push_unchecked(value, tail) };
660
661        Ok(())
662    }
663
664    /// Pushes many values to the queue.
665    /// It accepts two slices to allow using ring-based src.
666    ///
667    /// # Safety
668    ///
669    /// It should be called only by the producer, and the space is enough.
670    #[inline]
671    pub unsafe fn producer_push_many_unchecked(&self, first: &[T], last: &[T]) {
672        if cfg!(debug_assertions) {
673            let head = self.head.load(Acquire);
674            let tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
675
676            debug_assert!(Self::len(head, tail) + first.len() + last.len() <= CAPACITY);
677        }
678
679        // It is SPMC, and it is expected that the capacity is enough.
680
681        let mut tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
682
683        tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, first);
684        tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, last);
685
686        self.tail.store(tail, Release);
687    }
688
689    /// Pushes many values to the queue or to the [`BatchReceiver`].
690    ///
691    /// # Safety
692    ///
693    /// It should be called only by the producer.
694    #[inline]
695    pub unsafe fn producer_push_many<BR: BatchReceiver<T>>(
696        &self,
697        slice: &[T],
698        batch_receiver: &BR,
699    ) {
700        let mut tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
701
702        if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
703            self.cached_head
704                .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));
705
706            if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
707                self.handle_overflow_many(tail, self.cached_head.get(), batch_receiver, slice);
708
709                return;
710            }
711
712            // We have enough space
713        }
714
715        tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, slice);
716
717        self.tail.store(tail, Release);
718    }
719
720    /// Pushes many values to the queue or to the [`BatchReceiver`]
721    /// or returns an error if the operation should wait.
722    ///
723    /// # Safety
724    ///
725    /// It should be called only by the producer.
726    #[inline]
727    pub unsafe fn producer_lock_free_push_many<BR: LockFreeBatchReceiver<T>>(
728        &self,
729        slice: &[T],
730        batch_receiver: &BR,
731    ) -> Result<(), ()> {
732        let mut tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
733
734        if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
735            self.cached_head
736                .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));
737
738            if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
739                self.handle_lock_free_overflow_many(
740                    tail,
741                    self.cached_head.get(),
742                    batch_receiver,
743                    slice,
744                )?;
745
746                return Ok(());
747            }
748
749            // We have enough space
750        }
751
752        tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, slice);
753
754        self.tail.store(tail, Release);
755
756        Ok(())
757    }
758
759    /// Pushes many values to the queue or returns an error.
760    ///
761    /// # Safety
762    ///
763    /// It should be called only by the producer.
764    #[inline]
765    pub unsafe fn producer_maybe_push_many(&self, slice: &[T]) -> Result<(), ()> {
766        let mut tail = unsafe { self.tail.unsync_load() }; // only the producer can change tail
767
768        if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
769            self.cached_head
770                .set(self.head.load(SUSPICIOUS_RELAXED_ACQUIRE));
771
772            if unlikely(Self::len(self.cached_head.get(), tail) + slice.len() > CAPACITY) {
773                return Err(()); // full
774            }
775
776            // We have enough space
777        }
778
779        debug_assert!(Self::len(self.cached_head.get(), tail) + slice.len() <= CAPACITY);
780
781        tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), tail, slice);
782
783        self.tail.store(tail, Release);
784
785        Ok(())
786    }
787
788    /// Read the doc at [`Producer::copy_and_commit_if`].
789    ///
790    /// # Safety
791    ///
792    /// The called should be the only producer and the safety conditions
793    /// from [`Producer::copy_and_commit_if`].
794    ///
795    /// # Panics
796    ///
797    /// Read the doc at [`Producer::copy_and_commit_if`].
798    unsafe fn producer_copy_and_commit_if<FSuccess, FError>(
799        &self,
800        left: &[T],
801        right: &[T],
802        condition: impl FnOnce() -> Result<FSuccess, FError>,
803    ) -> Result<FSuccess, FError> {
804        debug_assert!(left.len() + right.len() + self.producer_len() <= CAPACITY);
805
806        let mut new_tail = Self::copy_slice(
807            self.buffer_mut_thin_ptr().cast(),
808            unsafe { self.tail.unsync_load() }, // only the producer can change tail
809            right,
810        );
811        new_tail = Self::copy_slice(self.buffer_mut_thin_ptr().cast(), new_tail, left);
812
813        let should_commit = condition();
814        match should_commit {
815            Ok(res) => {
816                self.tail.store(new_tail, Release);
817
818                Ok(res)
819            }
820            Err(err) => Err(err),
821        }
822    }
823}
824
825// Consumers
impl<T: Send, const CAPACITY: usize, AtomicWrapper: Deref<Target = LongAtomic> + Default>
    SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
{
    /// Returns the number of values in the queue.
    ///
    /// The result is a snapshot: the producer and other consumers may make
    /// progress concurrently, so it can be stale as soon as it is returned.
    #[inline]
    pub fn consumer_len(&self) -> usize {
        loop {
            let head = self.head.load(SUSPICIOUS_RELAXED_ACQUIRE);
            let tail = self.tail.load(SUSPICIOUS_RELAXED_ACQUIRE);
            let len = Self::len(head, tail);

            if unlikely(len > CAPACITY) {
                // Inconsistent state (this thread has been preempted
                // after we have loaded `head`,
                // and before we have loaded `tail`),
                // try again
                continue;
            }

            return len;
        }
    }

    /// Pops many values from the queue to the `dst`.
    /// Returns the number of values popped.
    #[inline]
    pub fn consumer_pop_many(&self, dst: &mut [MaybeUninit<T>]) -> usize {
        let mut head = self.head.load(SUSPICIOUS_RELAXED_ACQUIRE);
        // `Acquire` pairs with the producer's `Release` store of `tail`,
        // making the pushed values visible to this thread.
        let mut tail = self.tail.load(Acquire);

        loop {
            let available = Self::len(head, tail);
            let n = dst.len().min(available);

            if n == 0 {
                return 0;
            }

            if unlikely(n > CAPACITY) {
                // Inconsistent state (this thread has been preempted
                // after we have loaded `head`,
                // and before we have loaded `tail`),
                // try again

                head = self.head.load(SUSPICIOUS_RELAXED_ACQUIRE);
                tail = self.tail.load(Acquire);

                continue;
            }

            let dst_ptr = dst.as_mut_ptr();
            let head_idx = head as usize % CAPACITY;
            // Number of slots up to the physical end of the ring buffer.
            let right = CAPACITY - head_idx;

            // We optimistically copy the values from the buffer into the dst.
            // On CAS failure, we forget the copied values and try again.
            // It is safe because we can concurrently read from the head.

            if n <= right {
                // No wraparound, copy in one shot
                unsafe {
                    ptr::copy_nonoverlapping(self.buffer_thin_ptr().add(head_idx), dst_ptr, n);
                }
            } else {
                unsafe {
                    // Wraparound: copy right half then left half
                    ptr::copy_nonoverlapping(self.buffer_thin_ptr().add(head_idx), dst_ptr, right);
                    ptr::copy_nonoverlapping(self.buffer_thin_ptr(), dst_ptr.add(right), n - right);
                }
            }

            // Now claim ownership
            // CAS is strong because we don't want to recopy the values
            match self.head.compare_exchange(
                head,
                head.wrapping_add(n as LongNumber),
                Release,
                Relaxed,
            ) {
                Ok(_) => return n,
                Err(actual_head) => {
                    // CAS failed, forget read values (they're MaybeUninit, so it's fine)
                    // But don't try to drop, just retry

                    head = actual_head;
                    tail = self.tail.load(Acquire);
                }
            }
        }
    }

    /// Steals many values from the consumer to the `dst`.
    /// Returns the number of values stolen.
    ///
    /// At most half of the source queue is taken, and by default nothing is
    /// stolen when fewer than 4 values would be taken (see the comment in the
    /// body), unless the `always_steal` feature is enabled.
    ///
    /// # Panics
    ///
    /// In debug builds, if `dst` is not empty.
    pub fn steal_into(&self, dst: &impl crate::single_producer::SingleProducer<T>) -> usize {
        if cfg!(debug_assertions) {
            assert!(
                dst.is_empty(),
                "steal_into should not be called when dst is not empty"
            );
        }

        let mut src_head = self.head.load(SUSPICIOUS_RELAXED_ACQUIRE);

        loop {
            let src_tail = self.tail.load(Acquire);
            // Take at most half of the source queue to keep work balanced.
            let n = Self::len(src_head, src_tail) / 2;

            if n > CAPACITY / 2 {
                // Inconsistent state (this thread has been preempted
                // after we have loaded `src_head`,
                // and before we have loaded `src_tail`),
                // try again

                src_head = self.head.load(SUSPICIOUS_RELAXED_ACQUIRE);

                continue;
            }

            if !cfg!(feature = "always_steal") && n < 4 || n == 0 {
                // we don't steal less than 4 by default
                // because else we may lose more because of cache locality and NUMA awareness
                return 0;
            }

            let src_head_idx = src_head as usize % CAPACITY;

            // Split the stolen range into the contiguous "right" part (up to
            // the physical end of the buffer) and the wrapped-around "left"
            // part at the start of the buffer.
            let (src_right, src_left): (&[T], &[T]) = unsafe {
                let right_occupied = CAPACITY - src_head_idx;
                if n <= right_occupied {
                    (
                        slice::from_raw_parts(self.buffer_thin_ptr().add(src_head_idx).cast(), n),
                        &[],
                    )
                } else {
                    (
                        slice::from_raw_parts(
                            self.buffer_thin_ptr().add(src_head_idx).cast(),
                            right_occupied,
                        ),
                        slice::from_raw_parts(self.buffer_thin_ptr().cast(), n - right_occupied),
                    )
                }
            };

            let cas_closure = || {
                // CAS is strong because we don't want to recopy the values
                self.head.compare_exchange(
                    src_head,
                    src_head.wrapping_add(n as LongNumber),
                    Release,
                    Relaxed,
                )
            };

            // Copy the values into `dst`, committing them there (and
            // advancing our `head`) only if the CAS succeeds.
            let res = unsafe { dst.copy_and_commit_if(src_right, src_left, cas_closure) };

            match res {
                Ok(_) => {
                    return n;
                }
                Err(current_head) => {
                    // another thread has read the same values, full retry
                    src_head = current_head;
                }
            }
        }
    }
}
999
impl<T: Send, const CAPACITY: usize, AtomicWrapper: Deref<Target = LongAtomic> + Default> Default
    for SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
{
    /// Creates an empty queue; equivalent to [`SPMCBoundedQueue::new`].
    fn default() -> Self {
        Self::new()
    }
}
1007
// SAFETY: all cross-thread access is synchronized through the atomic
// `head`/`tail` indices — the producer publishes values with a `Release`
// store of `tail`, and consumers claim slots via CAS on `head` (see
// `consumer_pop_many`/`steal_into`) — so sharing `&SPMCBoundedQueue`
// between threads is sound for `T: Send`.
unsafe impl<T: Send, const CAPACITY: usize, AtomicWrapper> Sync
    for SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
where
    AtomicWrapper: Deref<Target = LongAtomic> + Default,
{
}
1014
// SAFETY: the queue only stores `T: Send` values, and its indices are plain
// atomics, so moving the whole queue to another thread is sound.
#[allow(clippy::non_send_fields_in_send_ty, reason = "We guarantee it is Send")]
unsafe impl<T: Send, const CAPACITY: usize, AtomicWrapper> Send
    for SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
where
    AtomicWrapper: Deref<Target = LongAtomic> + Default,
{
}
1022
1023impl<T: Send, const CAPACITY: usize, AtomicWrapper> Drop
1024    for SPMCBoundedQueue<T, CAPACITY, AtomicWrapper>
1025where
1026    AtomicWrapper: Deref<Target = LongAtomic> + Default,
1027{
1028    fn drop(&mut self) {
1029        // While dropping, there is no concurrency
1030
1031        if needs_drop::<T>() {
1032            let mut head = unsafe { self.head.unsync_load() };
1033            let tail = unsafe { self.tail.unsync_load() };
1034
1035            while head != tail {
1036                unsafe {
1037                    ptr::drop_in_place(
1038                        self.buffer_thin_ptr()
1039                            .add(head as usize % CAPACITY)
1040                            .cast::<T>()
1041                            .cast_mut(),
1042                    );
1043                }
1044
1045                head = head.wrapping_add(1);
1046            }
1047        }
1048    }
1049}
1050
/// Generates SPMC producer and consumer.
///
/// The three-argument arm takes the atomic wrapper type used for the queue's
/// `head`/`tail` (cache-padded or not); the two-argument arm defaults to the
/// non-cache-padded wrapper.
macro_rules! generate_spmc_producer_and_consumer {
    ($producer_name:ident, $consumer_name:ident, $atomic_wrapper:ty) => {
        /// The producer of the [`SPMCBoundedQueue`].
        pub struct $producer_name<T: Send, const CAPACITY: usize> {
            inner: LightArc<SPMCBoundedQueue<T, CAPACITY, $atomic_wrapper>>,
            // The raw-pointer marker suppresses the auto `Send`/`Sync` impls;
            // `Send` is re-added manually below, `Sync` is intentionally not.
            _non_sync: PhantomData<*const ()>,
        }

        impl<T: Send, const CAPACITY: usize> $crate::Producer<T> for $producer_name<T, CAPACITY> {
            #[inline]
            fn capacity(&self) -> usize {
                CAPACITY as usize
            }

            #[inline]
            fn len(&self) -> usize {
                // SAFETY: this type is the unique producer of `inner`.
                unsafe { self.inner.producer_len() }
            }

            #[inline]
            fn maybe_push(&self, value: T) -> Result<(), T> {
                unsafe { self.inner.producer_maybe_push(value) }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::LockFreeProducer<T>
            for $producer_name<T, CAPACITY>
        {
            fn lock_free_maybe_push(&self, value: T) -> Result<(), LockFreePushErr<T>> {
                unsafe {
                    self.inner
                        .producer_maybe_push(value)
                        .map_err(|value| LockFreePushErr::Full(value))
                }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::single_producer::SingleProducer<T>
            for $producer_name<T, CAPACITY>
        {
            #[inline]
            unsafe fn push_many_unchecked(&self, first: &[T], last: &[T]) {
                unsafe { self.inner.producer_push_many_unchecked(first, last) };
            }

            #[inline]
            unsafe fn maybe_push_many(&self, slice: &[T]) -> Result<(), ()> {
                unsafe { self.inner.producer_maybe_push_many(slice) }
            }

            #[inline]
            unsafe fn copy_and_commit_if<F, FSuccess, FError>(
                &self,
                right: &[T],
                left: &[T],
                f: F,
            ) -> Result<FSuccess, FError>
            where
                F: FnOnce() -> Result<FSuccess, FError>,
            {
                unsafe { self.inner.producer_copy_and_commit_if(right, left, f) }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::single_producer::SingleLockFreeProducer<T>
            for $producer_name<T, CAPACITY>
        {
            unsafe fn lock_free_maybe_push_many(
                &self,
                slice: &[T],
            ) -> Result<(), LockFreePushManyErr> {
                unsafe {
                    self.inner
                        .producer_maybe_push_many(slice)
                        .map_err(|_| LockFreePushManyErr::NotEnoughSpace)
                }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::multi_consumer::MultiConsumerSpawner<T>
            for $producer_name<T, CAPACITY>
        {
            type SpawnedConsumer = $consumer_name<T, CAPACITY>;

            fn spawn_multi_consumer(&self) -> Self::SpawnedConsumer {
                $consumer_name {
                    inner: self.inner.clone(),
                    _non_sync: PhantomData,
                }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::multi_consumer::MultiLockFreeConsumerSpawner<T>
            for $producer_name<T, CAPACITY>
        {
            type SpawnedLockFreeConsumer = $consumer_name<T, CAPACITY>;

            fn spawn_multi_lock_free_consumer(&self) -> Self::SpawnedLockFreeConsumer {
                $consumer_name {
                    inner: self.inner.clone(),
                    _non_sync: PhantomData,
                }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::spmc_producer::SPMCProducer<T>
            for $producer_name<T, CAPACITY>
        {
            #[inline]
            unsafe fn push_many<BR: BatchReceiver<T>>(&self, slice: &[T], batch_receiver: &BR) {
                unsafe { self.inner.producer_push_many(slice, batch_receiver) };
            }

            #[inline]
            fn push<BR: BatchReceiver<T>>(&self, value: T, batch_receiver: &BR) {
                unsafe { self.inner.producer_push(value, batch_receiver) };
            }

            #[inline]
            fn pop(&self) -> Option<T> {
                unsafe { self.inner.producer_pop() }
            }

            #[inline]
            fn pop_many(&self, dst: &mut [MaybeUninit<T>]) -> usize {
                unsafe { self.inner.producer_pop_many(dst) }
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::spmc_producer::SPMCLockFreeProducer<T>
            for $producer_name<T, CAPACITY>
        {
            unsafe fn lock_free_push_many<BR: LockFreeBatchReceiver<T>>(
                &self,
                slice: &[T],
                batch_receiver: &BR,
            ) -> Result<(), ()> {
                unsafe {
                    self.inner
                        .producer_lock_free_push_many(slice, batch_receiver)
                }
            }

            fn lock_free_push<BR: LockFreeBatchReceiver<T>>(
                &self,
                value: T,
                batch_receiver: &BR,
            ) -> Result<(), T> {
                unsafe { self.inner.producer_lock_free_push(value, batch_receiver) }
            }

            fn lock_free_pop_many(&self, dst: &mut [MaybeUninit<T>]) -> (usize, bool) {
                unsafe { (self.inner.producer_pop_many(dst), false) }
            }

            fn lock_free_pop(&self) -> Result<T, LockFreePopErr> {
                unsafe { self.inner.producer_pop().ok_or(LockFreePopErr::Empty) }
            }
        }

        unsafe impl<T: Send, const CAPACITY: usize> Send for $producer_name<T, CAPACITY> {}

        /// The consumer of the [`SPMCBoundedQueue`].
        pub struct $consumer_name<T: Send, const CAPACITY: usize> {
            inner: LightArc<SPMCBoundedQueue<T, CAPACITY, $atomic_wrapper>>,
            // The raw-pointer marker suppresses the auto `Send`/`Sync` impls;
            // `Send` is re-added manually below, `Sync` is intentionally not.
            _non_sync: PhantomData<*const ()>,
        }

        impl<T: Send, const CAPACITY: usize> $crate::Consumer<T> for $consumer_name<T, CAPACITY> {
            #[inline]
            fn capacity(&self) -> usize {
                CAPACITY as usize
            }

            #[inline]
            fn len(&self) -> usize {
                self.inner.consumer_len()
            }

            #[inline]
            fn pop_many(&self, dst: &mut [MaybeUninit<T>]) -> usize {
                self.inner.consumer_pop_many(dst)
            }

            #[inline(never)]
            fn steal_into(&self, dst: &impl $crate::single_producer::SingleProducer<T>) -> usize {
                self.inner.steal_into(dst)
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::LockFreeConsumer<T>
            for $consumer_name<T, CAPACITY>
        {
            #[inline]
            fn lock_free_pop_many(&self, dst: &mut [MaybeUninit<T>]) -> (usize, bool) {
                (self.inner.consumer_pop_many(dst), false)
            }

            #[inline(never)]
            fn lock_free_steal_into(
                &self,
                dst: &impl $crate::single_producer::SingleLockFreeProducer<T>,
            ) -> (usize, bool) {
                (self.inner.steal_into(dst), false)
            }
        }

        impl<T: Send, const CAPACITY: usize> $crate::multi_consumer::MultiConsumer<T>
            for $consumer_name<T, CAPACITY>
        {
        }

        impl<T: Send, const CAPACITY: usize> $crate::multi_consumer::MultiLockFreeConsumer<T>
            for $consumer_name<T, CAPACITY>
        {
        }

        impl<T: Send, const CAPACITY: usize> Clone for $consumer_name<T, CAPACITY> {
            fn clone(&self) -> Self {
                Self {
                    inner: self.inner.clone(),
                    _non_sync: PhantomData,
                }
            }
        }

        unsafe impl<T: Send, const CAPACITY: usize> Send for $consumer_name<T, CAPACITY> {}
    };

    // Two-argument arm: default to the non-cache-padded atomic wrapper.
    ($producer_name:ident, $consumer_name:ident) => {
        generate_spmc_producer_and_consumer!(
            $producer_name,
            $consumer_name,
            NotCachePaddedLongAtomic
        );
    };
}
1289
// Non-cache-padded variants: minimal memory footprint.
generate_spmc_producer_and_consumer!(SPMCProducer, SPMCConsumer);
1291
1292/// Creates a new single-producer, multi-consumer queue with the given capacity.
1293/// Returns [`producer`](SPMCProducer) and [`consumer`](SPMCConsumer).
1294///
1295/// It accepts the capacity as a const generic parameter.
1296/// We recommend using a power of two.
1297///
1298/// The producer __should__ be only one while consumers can be cloned.
1299///
1300/// # Bounded queue vs. [`unbounded queue`](crate::spmc::new_unbounded)
1301///
1302/// - [`maybe_push`](crate::Producer::maybe_push),
1303///   [`maybe_push_many`](crate::single_producer::SingleProducer::maybe_push_many)
1304///   can return an error only for `bounded` queue.
1305/// - [`push`](crate::spmc_producer::SPMCProducer::push),
1306///   [`push_many`](crate::spmc_producer::SPMCProducer::push_many)
1307///   writes to the [`BatchReceiver`] only for `bounded` queue.
1308/// - [`Consumer::steal_into`](crate::Consumer::steal_into)
1309///   and [`Consumer::pop_many`](crate::Consumer::pop_many) can pop zero values even if the source
1310///   queue is not empty for `unbounded` queue.
1311/// - [`Consumer::capacity`](crate::Consumer::capacity) and [`Consumer::len`](crate::Consumer::len)
1312///   can return old values for `unbounded` queue.
1313/// - All methods of `bounded` queue work much faster than all methods of `unbounded` queue.
1314///
1315/// # Cache padding
1316///
1317/// Cache padding can improve the performance of the queue many times, but it also requires
1318/// much more memory (likely 128 or 256 more bytes for the queue).
1319/// If you can sacrifice some memory for the performance, use [`new_cache_padded_bounded`].
1320///
1321/// # Examples
1322///
1323/// ```
1324/// use parcoll::spmc::new_bounded;
1325/// use parcoll::{Consumer, Producer};
1326///
1327/// let (producer, consumer) = new_bounded::<_, 256>();
1328/// let consumer2 = consumer.clone(); // You can clone the consumer
1329///
1330/// producer.maybe_push(1).unwrap();
1331/// producer.maybe_push(2).unwrap();
1332///
1333/// let mut slice = [std::mem::MaybeUninit::uninit(); 3];
1334/// let popped = consumer.pop_many(&mut slice);
1335///
1336/// assert_eq!(popped, 2);
1337/// assert_eq!(unsafe { slice[0].assume_init() }, 1);
1338/// assert_eq!(unsafe { slice[1].assume_init() }, 2);
1339/// ```
1340pub fn new_bounded<T: Send, const CAPACITY: usize>(
1341) -> (SPMCProducer<T, CAPACITY>, SPMCConsumer<T, CAPACITY>) {
1342    let queue = LightArc::new(SPMCBoundedQueue::new());
1343
1344    (
1345        SPMCProducer {
1346            inner: queue.clone(),
1347            _non_sync: PhantomData,
1348        },
1349        SPMCConsumer {
1350            inner: queue,
1351            _non_sync: PhantomData,
1352        },
1353    )
1354}
1355
// Cache-padded variants: `head`/`tail` use `CachePaddedLongAtomic`; see
// `new_cache_padded_bounded` for the memory/performance trade-off.
generate_spmc_producer_and_consumer!(
    CachePaddedSPMCProducer,
    CachePaddedSPMCConsumer,
    CachePaddedLongAtomic
);
1361
1362/// Creates a new single-producer, multi-consumer queue with the given capacity.
1363/// Returns [`producer`](CachePaddedSPMCProducer) and [`consumer`](CachePaddedSPMCConsumer).
1364///
1365/// It accepts the capacity as a const generic parameter.
1366/// We recommend using a power of two.
1367///
1368/// The producer __should__ be only one while consumers can be cloned.
1369///
1370/// # Bounded queue vs. [`unbounded queue`](crate::spmc::new_unbounded)
1371///
1372/// - [`maybe_push`](crate::Producer::maybe_push),
1373///   [`maybe_push_many`](crate::single_producer::SingleProducer::maybe_push_many)
1374///   can return an error only for `bounded` queue.
1375/// - [`push`](crate::spmc_producer::SPMCProducer::push),
1376///   [`push_many`](crate::spmc_producer::SPMCProducer::push_many)
1377///   writes to the [`BatchReceiver`] only for `bounded` queue.
1378/// - [`Consumer::steal_into`](crate::Consumer::steal_into)
1379///   and [`Consumer::pop_many`](crate::Consumer::pop_many) can pop zero values even if the source
1380///   queue is not empty for `unbounded` queue.
1381/// - [`Consumer::capacity`](crate::Consumer::capacity) and [`Consumer::len`](crate::Consumer::len)
1382///   can return old values for `unbounded` queue.
1383/// - All methods of `bounded` queue work much faster than all methods of `unbounded` queue.
1384///
1385/// # Cache padding
1386///
1387/// Cache padding can improve the performance of the queue many times, but it also requires
1388/// much more memory (likely 128 or 256 more bytes for the queue).
1389/// If you can't sacrifice some memory for the performance, use [`new_bounded`].
1390///
1391/// # Examples
1392///
1393/// ```
1394/// use parcoll::spmc::new_bounded;
1395/// use parcoll::{Consumer, Producer};
1396///
1397/// let (producer, consumer) = new_bounded::<_, 256>();
1398/// let consumer2 = consumer.clone(); // You can clone the consumer
1399///
1400/// producer.maybe_push(1).unwrap();
1401/// producer.maybe_push(2).unwrap();
1402///
1403/// let mut slice = [std::mem::MaybeUninit::uninit(); 3];
1404/// let popped = consumer.pop_many(&mut slice);
1405///
1406/// assert_eq!(popped, 2);
1407/// assert_eq!(unsafe { slice[0].assume_init() }, 1);
1408/// assert_eq!(unsafe { slice[1].assume_init() }, 2);
1409/// ```
1410pub fn new_cache_padded_bounded<T: Send, const CAPACITY: usize>() -> (
1411    CachePaddedSPMCProducer<T, CAPACITY>,
1412    CachePaddedSPMCConsumer<T, CAPACITY>,
1413) {
1414    let queue = LightArc::new(SPMCBoundedQueue::new());
1415
1416    (
1417        CachePaddedSPMCProducer {
1418            inner: queue.clone(),
1419            _non_sync: PhantomData,
1420        },
1421        CachePaddedSPMCConsumer {
1422            inner: queue,
1423            _non_sync: PhantomData,
1424        },
1425    )
1426}
1427
#[cfg(test)]
mod tests {
    use super::*;
    use crate::mutex_vec_queue::MutexVecQueue;
    use crate::single_producer::{SingleLockFreeProducer, SingleProducer};
    use crate::spmc_producer::{SPMCLockFreeProducer, SPMCProducer};
    use crate::{Consumer, LockFreeConsumer, Producer};
    use std::collections::VecDeque;

    const CAPACITY: usize = 16;

    // The queue should contain exactly the buffer, the two counters,
    // and the alignment padding — nothing more.
    #[test]
    fn test_spmc_bounded_size() {
        let queue = SPMCBoundedQueue::<u8, CAPACITY>::new();

        assert_eq!(
            size_of_val(&queue),
            CAPACITY + size_of::<LongAtomic>() * 2 + align_of_val(&queue)
        );

        let cache_padded_queue = SPMCBoundedQueue::<u8, CAPACITY, CachePaddedLongAtomic>::new();

        assert_eq!(
            size_of_val(&cache_padded_queue),
            size_of::<CachePaddedLongAtomic>() * 2 + CAPACITY + align_of_val(&cache_padded_queue)
        );
    }

    // Overflowing pushes must spill into the batch receiver (global queue);
    // no value may be lost across the local queues and the global one.
    #[test]
    fn test_spmc_bounded_seq_insertions() {
        let global_queue = MutexVecQueue::new();
        let (producer, _) = new_bounded::<_, CAPACITY>();

        for i in 0..CAPACITY * 100 {
            producer.push(i, &global_queue);
        }

        let (new_producer, _) = new_bounded::<_, CAPACITY>();

        global_queue
            .move_batch_to_producer(&new_producer, producer.capacity() - producer.len());

        assert_eq!(
            producer.len() + new_producer.len() + global_queue.len(),
            CAPACITY * 100
        );

        for _ in 0..producer.len() {
            assert!(producer.pop().is_some());
        }

        for _ in 0..new_producer.len() {
            assert!(new_producer.pop().is_some());
        }
    }

    // Stealing from one local queue into another must preserve every value.
    #[test]
    fn test_spmc_bounded_stealing() {
        const TRIES: usize = 10;

        let global_queue = MutexVecQueue::new();
        let (producer1, consumer) = new_bounded::<_, CAPACITY>();
        let (producer2, _) = new_bounded::<_, CAPACITY>();

        let mut stolen = VecDeque::new();

        for _ in 0..TRIES * 2 {
            for i in 0..CAPACITY / 2 {
                producer1.push(i, &global_queue);
            }

            consumer.steal_into(&producer2);

            while let Some(task) = producer2.pop() {
                stolen.push_back(task);
            }

            assert!(global_queue.is_empty());
        }

        assert!(producer2.is_empty());

        let mut count = 0;

        while producer1.pop().is_some() {
            count += 1;
        }

        // TRIES * 2 rounds of CAPACITY / 2 pushes == CAPACITY * TRIES values.
        assert_eq!(count + stolen.len() + global_queue.len(), CAPACITY * TRIES);
    }

    // Round-trips batches through (maybe_)push_many/pop_many and checks
    // that values come back in FIFO order.
    #[test]
    fn test_spmc_bounded_many() {
        const BATCH_SIZE: usize = 8;
        const N: usize = BATCH_SIZE * 100;

        let global_queue = MutexVecQueue::new();
        let (producer, consumer) = new_bounded::<_, CAPACITY>();

        for i in 0..N / BATCH_SIZE / 2 {
            let slice = (0..BATCH_SIZE)
                .map(|j| i * BATCH_SIZE + j)
                .collect::<Vec<_>>();

            unsafe {
                producer.maybe_push_many(&slice).unwrap();
            }

            let mut slice = [MaybeUninit::uninit(); BATCH_SIZE];
            producer.pop_many(slice.as_mut_slice());

            for (j, item) in slice.iter().enumerate().take(BATCH_SIZE) {
                let index = i * BATCH_SIZE + j;

                assert_eq!(unsafe { item.assume_init() }, index);
            }
        }

        for i in 0..N / BATCH_SIZE / 2 {
            let slice = (0..BATCH_SIZE)
                .map(|j| i * BATCH_SIZE + j)
                .collect::<Vec<_>>();

            unsafe {
                producer.push_many(&slice, &global_queue);
            }

            assert!(global_queue.is_empty());

            let mut slice = [MaybeUninit::uninit(); BATCH_SIZE];
            consumer.pop_many(slice.as_mut_slice());

            for (j, item) in slice.iter().enumerate().take(BATCH_SIZE) {
                let index = i * BATCH_SIZE + j;

                assert_eq!(unsafe { item.assume_init() }, index);
            }
        }
    }

    // Same as test_spmc_bounded_seq_insertions, via the lock-free API.
    #[test]
    fn test_spmc_lock_free_bounded_seq_insertions() {
        let global_queue = MutexVecQueue::new();
        let (producer, _) = new_cache_padded_bounded::<_, CAPACITY>();

        for i in 0..CAPACITY * 100 {
            producer.lock_free_push(i, &global_queue).unwrap();
        }

        let (new_producer, _) = new_bounded::<_, CAPACITY>();

        global_queue
            .move_batch_to_producer(&new_producer, producer.capacity() - producer.len());

        assert_eq!(
            producer.len() + new_producer.len() + global_queue.len(),
            CAPACITY * 100
        );

        for _ in 0..producer.len() {
            producer.lock_free_pop().unwrap();
        }

        for _ in 0..new_producer.len() {
            new_producer.lock_free_pop().unwrap();
        }
    }

    // Same as test_spmc_bounded_stealing, via the lock-free API.
    #[test]
    fn test_spmc_lock_free_bounded_stealing() {
        const TRIES: usize = 10;

        let global_queue = MutexVecQueue::new();
        let (producer1, consumer) = new_bounded::<_, CAPACITY>();
        let (producer2, _) = new_bounded::<_, CAPACITY>();

        let mut stolen = VecDeque::new();

        for _ in 0..TRIES * 2 {
            for i in 0..CAPACITY / 2 {
                producer1.lock_free_push(i, &global_queue).unwrap();
            }

            assert!(!consumer.lock_free_steal_into(&producer2).1);

            while let Ok(task) = producer2.lock_free_pop() {
                stolen.push_back(task);
            }

            assert!(global_queue.is_empty());
        }

        assert!(producer2.is_empty());

        let mut count = 0;

        while producer1.lock_free_pop().is_ok() {
            count += 1;
        }

        assert_eq!(count + stolen.len() + global_queue.len(), CAPACITY * TRIES);
    }

    // Same as test_spmc_bounded_many, via the lock-free API.
    #[test]
    fn test_spmc_lock_free_bounded_many() {
        const BATCH_SIZE: usize = 8;
        const N: usize = BATCH_SIZE * 100;

        let global_queue = MutexVecQueue::new();
        let (producer, consumer) = new_bounded::<_, CAPACITY>();

        for i in 0..N / BATCH_SIZE / 2 {
            let slice = (0..BATCH_SIZE)
                .map(|j| i * BATCH_SIZE + j)
                .collect::<Vec<_>>();

            unsafe {
                producer.lock_free_maybe_push_many(&slice).unwrap();
            }

            let mut slice = [MaybeUninit::uninit(); BATCH_SIZE];

            assert_eq!(
                producer.lock_free_pop_many(slice.as_mut_slice()),
                (BATCH_SIZE, false)
            );

            for (j, item) in slice.iter().enumerate().take(BATCH_SIZE) {
                let index = i * BATCH_SIZE + j;

                assert_eq!(unsafe { item.assume_init() }, index);
            }
        }

        for i in 0..N / BATCH_SIZE / 2 {
            let slice = (0..BATCH_SIZE)
                .map(|j| i * BATCH_SIZE + j)
                .collect::<Vec<_>>();

            unsafe {
                producer
                    .lock_free_push_many(&slice, &global_queue)
                    .unwrap();
            }

            assert!(global_queue.is_empty());

            let mut slice = [MaybeUninit::uninit(); BATCH_SIZE];

            assert_eq!(
                consumer.lock_free_pop_many(slice.as_mut_slice()),
                (BATCH_SIZE, false)
            );

            for (j, item) in slice.iter().enumerate().take(BATCH_SIZE) {
                let index = i * BATCH_SIZE + j;

                assert_eq!(unsafe { item.assume_init() }, index);
            }
        }
    }
}