// laminar_core/tpc/spsc.rs

//! # SPSC Queue
//!
//! Lock-free single-producer single-consumer bounded queue optimized for
//! inter-core communication in thread-per-core architectures.
//!
//! ## Design
//!
//! - Cache-line padded head/tail indices prevent false sharing
//! - Power-of-2 capacity for fast modulo via bitmask
//! - Acquire/Release memory ordering for lock-free operation
//! - Batch operations for throughput optimization
//!
//! ## Performance
//!
//! Target: < 50ns per push/pop operation.
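//!
//! ## Example
//!
//! A minimal cross-thread sketch using only the API defined in this file;
//! the busy-wait loops are illustrative, not a recommended backoff strategy.
//!
//! ```rust
//! use laminar_core::tpc::SpscQueue;
//! use std::sync::Arc;
//! use std::thread;
//!
//! let queue = Arc::new(SpscQueue::<u64>::new(1024));
//! let producer = Arc::clone(&queue);
//!
//! // Exactly one producer thread...
//! let handle = thread::spawn(move || {
//!     for i in 0..100 {
//!         while producer.push(i).is_err() {
//!             thread::yield_now();
//!         }
//!     }
//! });
//!
//! // ...and exactly one consumer thread (here, the main thread).
//! let mut received = 0;
//! while received < 100 {
//!     if queue.pop().is_some() {
//!         received += 1;
//!     }
//! }
//! handle.join().unwrap();
//! ```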

use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};

/// A wrapper that pads a value to a cache line boundary to prevent false sharing.
///
/// False sharing occurs when two threads access different data that happens to
/// reside on the same cache line, causing unnecessary cache invalidations.
///
/// # Example
///
/// ```rust
/// use laminar_core::tpc::CachePadded;
/// use std::sync::atomic::AtomicUsize;
///
/// // Each counter gets its own cache line
/// let counter1 = CachePadded::new(AtomicUsize::new(0));
/// let counter2 = CachePadded::new(AtomicUsize::new(0));
///
/// // Access the inner values through Deref
/// assert_eq!(counter1.load(std::sync::atomic::Ordering::Relaxed), 0);
/// assert_eq!(counter2.load(std::sync::atomic::Ordering::Relaxed), 0);
/// ```
#[repr(C, align(64))]
pub struct CachePadded<T> {
    value: T,
}

// SAFETY: CachePadded is Send if T is Send
#[allow(unsafe_code)]
unsafe impl<T: Send> Send for CachePadded<T> {}

// SAFETY: CachePadded is Sync if T is Sync
#[allow(unsafe_code)]
unsafe impl<T: Sync> Sync for CachePadded<T> {}

impl<T> CachePadded<T> {
    /// Creates a new cache-padded value.
    #[must_use]
    pub const fn new(value: T) -> Self {
        Self { value }
    }

    /// Returns a reference to the inner value.
    #[must_use]
    pub const fn get(&self) -> &T {
        &self.value
    }

    /// Returns a mutable reference to the inner value.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.value
    }

    /// Consumes the wrapper and returns the inner value.
    #[must_use]
    pub fn into_inner(self) -> T {
        self.value
    }
}

impl<T> std::ops::Deref for CachePadded<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        &self.value
    }
}

impl<T> std::ops::DerefMut for CachePadded<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.value
    }
}

impl<T: Default> Default for CachePadded<T> {
    fn default() -> Self {
        Self::new(T::default())
    }
}

impl<T: Clone> Clone for CachePadded<T> {
    fn clone(&self) -> Self {
        Self::new(self.value.clone())
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for CachePadded<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CachePadded")
            .field("value", &self.value)
            .finish()
    }
}
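
// Editorial addition: a compile-time check documenting the padding invariant
// described above. With `align(64)`, any `CachePadded<T>` occupies a whole
// number of 64-byte cache lines.
const _: () = assert!(std::mem::size_of::<CachePadded<u64>>() % 64 == 0);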

/// A lock-free single-producer single-consumer bounded queue.
///
/// This queue is designed for high-performance inter-core communication.
/// It uses atomic operations with Acquire/Release ordering to ensure
/// correct synchronization without locks.
///
/// # Safety
///
/// This queue is only safe when there is exactly one producer thread and
/// one consumer thread. Multiple producers or consumers will cause data races.
///
/// # Example
///
/// ```rust
/// use laminar_core::tpc::SpscQueue;
///
/// let queue: SpscQueue<i32> = SpscQueue::new(1024);
///
/// // Producer
/// assert!(queue.push(42).is_ok());
///
/// // Consumer
/// assert_eq!(queue.pop(), Some(42));
/// ```
pub struct SpscQueue<T> {
    /// Ring buffer storage
    buffer: Box<[UnsafeCell<MaybeUninit<T>>]>,
    /// Head index (consumer reads from here)
    head: CachePadded<AtomicUsize>,
    /// Tail index (producer writes here)
    tail: CachePadded<AtomicUsize>,
    /// Capacity mask for fast modulo (capacity - 1)
    capacity_mask: usize,
}

// SAFETY: SpscQueue can be sent between threads as long as T is Send
#[allow(unsafe_code)]
unsafe impl<T: Send> Send for SpscQueue<T> {}

// SAFETY: SpscQueue can be shared between threads (one producer, one consumer)
// as long as T is Send. The atomic operations ensure correct synchronization.
#[allow(unsafe_code)]
unsafe impl<T: Send> Sync for SpscQueue<T> {}

impl<T> SpscQueue<T> {
    /// Creates a new SPSC queue with the given capacity.
    ///
    /// The capacity is rounded up to the next power of 2 for efficiency.
    /// One slot is reserved to distinguish "full" from "empty", so the
    /// queue holds at most `capacity() - 1` items at a time.
    ///
    /// # Panics
    ///
    /// Panics if capacity is 0 or would overflow when rounded up to a power of 2.
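    ///
    /// # Example
    ///
    /// ```rust
    /// use laminar_core::tpc::SpscQueue;
    ///
    /// // 100 rounds up to 128, leaving room for 127 items at once.
    /// let queue: SpscQueue<u8> = SpscQueue::new(100);
    /// assert_eq!(queue.capacity(), 128);
    /// ```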
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be > 0");

        // Round up to the next power of 2, panicking on overflow so the
        // documented behavior holds in release builds as well.
        let capacity = capacity
            .checked_next_power_of_two()
            .expect("capacity overflows when rounded up to a power of 2");

        // Allocate the buffer
        let buffer: Vec<UnsafeCell<MaybeUninit<T>>> = (0..capacity)
            .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
            .collect();

        Self {
            buffer: buffer.into_boxed_slice(),
            head: CachePadded::new(AtomicUsize::new(0)),
            tail: CachePadded::new(AtomicUsize::new(0)),
            capacity_mask: capacity - 1,
        }
    }

    /// Returns the total capacity of the ring buffer.
    ///
    /// Note: the queue stores at most `capacity() - 1` items, since one
    /// slot is reserved to distinguish "full" from "empty".
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity_mask + 1
    }

    /// Returns true if the queue is empty.
    ///
    /// Note: This is a snapshot and may change immediately after returning.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        head == tail
    }

    /// Returns true if the queue is full.
    ///
    /// Note: This is a snapshot and may change immediately after returning.
    #[must_use]
    pub fn is_full(&self) -> bool {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        self.next_index(tail) == head
    }

    /// Returns the current number of items in the queue.
    ///
    /// Note: This is a snapshot and may change immediately after returning.
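    ///
    /// A worked example of the masked subtraction below (editorial note):
    /// with capacity 8 (mask 7), `head = 6` and `tail = 2` give
    /// `2usize.wrapping_sub(6) & 7 == 4`, the four items at slots 6, 7, 0, 1.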
    #[must_use]
    pub fn len(&self) -> usize {
        let head = self.head.load(Ordering::Acquire);
        let tail = self.tail.load(Ordering::Acquire);
        tail.wrapping_sub(head) & self.capacity_mask
    }

    /// Push an item to the queue.
    ///
    /// Returns `Ok(())` if successful, or `Err(item)` if the queue is full.
    ///
    /// # Errors
    ///
    /// Returns the item back if the queue is full.
    ///
    /// # Safety
    ///
    /// This method must only be called by the single producer thread.
    pub fn push(&self, item: T) -> Result<(), T> {
        let tail = self.tail.load(Ordering::Relaxed);
        let next_tail = self.next_index(tail);

        // Check if queue is full
        if next_tail == self.head.load(Ordering::Acquire) {
            return Err(item);
        }

        // SAFETY: We have exclusive write access to this slot because:
        // 1. We are the only producer
        // 2. The consumer only reads slots where head < tail
        // 3. We haven't published this slot yet (tail not updated)
        #[allow(unsafe_code)]
        unsafe {
            (*self.buffer[tail].get()).write(item);
        }

        // Publish the item by updating tail
        self.tail.store(next_tail, Ordering::Release);

        Ok(())
    }

    /// Pop an item from the queue.
    ///
    /// Returns `Some(item)` if successful, or `None` if the queue is empty.
    ///
    /// # Safety
    ///
    /// This method must only be called by the single consumer thread.
    pub fn pop(&self) -> Option<T> {
        let head = self.head.load(Ordering::Relaxed);

        // Check if queue is empty
        if head == self.tail.load(Ordering::Acquire) {
            return None;
        }

        // SAFETY: We have exclusive read access to this slot because:
        // 1. We are the only consumer
        // 2. The producer only writes to slots where tail > head
        // 3. This slot has been published (we checked tail > head)
        #[allow(unsafe_code)]
        let item = unsafe { (*self.buffer[head].get()).assume_init_read() };

        // Consume the item by updating head
        self.head.store(self.next_index(head), Ordering::Release);

        Some(item)
    }

    /// Push multiple items to the queue.
    ///
    /// Returns the number of items successfully pushed. Items are pushed
    /// in order, stopping at the first failure. Note that the first item
    /// rejected by a full queue is dropped, along with the rest of the
    /// iterator.
    ///
    /// # Safety
    ///
    /// This method must only be called by the single producer thread.
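    ///
    /// # Example
    ///
    /// A minimal sketch of batch pushing:
    ///
    /// ```rust
    /// use laminar_core::tpc::SpscQueue;
    ///
    /// let queue: SpscQueue<i32> = SpscQueue::new(8);
    /// let pushed = queue.push_batch([1, 2, 3]);
    /// assert_eq!(pushed, 3);
    /// assert_eq!(queue.len(), 3);
    /// ```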
    pub fn push_batch(&self, items: impl IntoIterator<Item = T>) -> usize {
        let mut count = 0;
        for item in items {
            if self.push(item).is_err() {
                break;
            }
            count += 1;
        }
        count
    }

    /// Pop multiple items from the queue.
    ///
    /// Returns a vector of up to `max_count` items.
    ///
    /// # Safety
    ///
    /// This method must only be called by the single consumer thread.
    ///
    /// # Note
    ///
    /// This method allocates memory. For zero-allocation polling, use
    /// [`pop_batch_into`](Self::pop_batch_into) or [`pop_each`](Self::pop_each) instead.
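    ///
    /// # Example
    ///
    /// A short illustration:
    ///
    /// ```rust
    /// use laminar_core::tpc::SpscQueue;
    ///
    /// let queue: SpscQueue<i32> = SpscQueue::new(8);
    /// queue.push_batch([1, 2, 3]);
    /// assert_eq!(queue.pop_batch(2), vec![1, 2]);
    /// assert_eq!(queue.pop_batch(10), vec![3]); // fewer available than requested
    /// ```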
    pub fn pop_batch(&self, max_count: usize) -> Vec<T> {
        let mut items = Vec::with_capacity(max_count.min(self.len()));
        for _ in 0..max_count {
            if let Some(item) = self.pop() {
                items.push(item);
            } else {
                break;
            }
        }
        items
    }

    /// Pop multiple items into a caller-provided buffer (zero-allocation).
    ///
    /// Items are written to `buffer` starting at index 0. Returns the number
    /// of items actually popped (0 if the queue is empty or `buffer` has
    /// zero length).
    ///
    /// # Safety
    ///
    /// This method must only be called by the single consumer thread.
    ///
    /// After this method returns `n`, the first `n` elements of `buffer`
    /// are initialized and can be safely read with `assume_init_read()`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use laminar_core::tpc::SpscQueue;
    /// use std::mem::MaybeUninit;
    ///
    /// let queue: SpscQueue<i32> = SpscQueue::new(16);
    /// queue.push(1).unwrap();
    /// queue.push(2).unwrap();
    ///
    /// let mut buffer: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
    /// let count = queue.pop_batch_into(&mut buffer);
    ///
    /// assert_eq!(count, 2);
    /// // SAFETY: We just initialized these elements
    /// unsafe {
    ///     assert_eq!(buffer[0].assume_init(), 1);
    ///     assert_eq!(buffer[1].assume_init(), 2);
    /// }
    /// ```
    #[inline]
    pub fn pop_batch_into(&self, buffer: &mut [MaybeUninit<T>]) -> usize {
        if buffer.is_empty() {
            return 0;
        }

        let mut current_head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);

        // Calculate available items (handle wrap-around)
        let available = if tail >= current_head {
            tail - current_head
        } else {
            (self.capacity_mask + 1) - current_head + tail
        };
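        // Worked example (editorial note): with capacity 8, head = 6 and
        // tail = 2, the else branch gives (7 + 1) - 6 + 2 = 4 available
        // items, matching `len()`.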

        let count = available.min(buffer.len());

        if count == 0 {
            return 0;
        }

        // Copy items to buffer
        for slot in buffer.iter_mut().take(count) {
            // SAFETY: We have exclusive read access to slots between head and tail.
            // The producer only writes to slots where tail > head, and we've verified
            // that these slots contain valid data by checking tail.
            #[allow(unsafe_code)]
            unsafe {
                let src = (*self.buffer[current_head].get()).assume_init_read();
                slot.write(src);
            }

            // Advance head with wrap-around
            current_head = self.next_index(current_head);
        }

        // Update head to release slots
        self.head.store(current_head, Ordering::Release);

        count
    }

    /// Pop items and call a callback for each one (zero-allocation).
    ///
    /// Processing stops when either:
    /// - `max_count` items have been processed
    /// - The queue becomes empty
    /// - The callback returns `false`
    ///
    /// Returns the number of items processed. An item handed to the callback
    /// is consumed and counted even if the callback returns `false`.
    ///
    /// # Safety
    ///
    /// This method must only be called by the single consumer thread.
    ///
    /// # Example
    ///
    /// ```rust
    /// use laminar_core::tpc::SpscQueue;
    ///
    /// let queue: SpscQueue<i32> = SpscQueue::new(16);
    /// queue.push(1).unwrap();
    /// queue.push(2).unwrap();
    /// queue.push(3).unwrap();
    ///
    /// let mut sum = 0;
    /// let count = queue.pop_each(10, |item| {
    ///     sum += item;
    ///     true // Continue processing
    /// });
    ///
    /// assert_eq!(count, 3);
    /// assert_eq!(sum, 6);
    /// ```
    #[inline]
    pub fn pop_each<F>(&self, max_count: usize, mut f: F) -> usize
    where
        F: FnMut(T) -> bool,
    {
        if max_count == 0 {
            return 0;
        }

        let mut current_head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Acquire);

        // Calculate available items (handle wrap-around)
        let available = if tail >= current_head {
            tail - current_head
        } else {
            (self.capacity_mask + 1) - current_head + tail
        };

        let to_pop = available.min(max_count);

        if to_pop == 0 {
            return 0;
        }

        let mut popped = 0;
        for _ in 0..to_pop {
            // SAFETY: We have exclusive read access to slots between head and tail.
            #[allow(unsafe_code)]
            let item = unsafe { (*self.buffer[current_head].get()).assume_init_read() };

            popped += 1;

            // Advance head with wrap-around
            current_head = self.next_index(current_head);

            // Call the callback; stop if it returns false
            if !f(item) {
                break;
            }
        }

        // Update head to release processed slots
        if popped > 0 {
            self.head.store(current_head, Ordering::Release);
        }

        popped
    }

    /// Peek at the next item without removing it.
    ///
    /// Returns `None` if the queue is empty.
    ///
    /// # Safety
    ///
    /// This method must only be called by the single consumer thread, and
    /// the returned reference must not be used after a subsequent `pop` or
    /// batch-pop call, which may invalidate the slot it points into.
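    ///
    /// # Example
    ///
    /// ```rust
    /// use laminar_core::tpc::SpscQueue;
    ///
    /// let queue: SpscQueue<i32> = SpscQueue::new(4);
    /// queue.push(7).unwrap();
    ///
    /// assert_eq!(queue.peek(), Some(&7)); // still in the queue
    /// assert_eq!(queue.pop(), Some(7));
    /// assert!(queue.peek().is_none());
    /// ```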
    pub fn peek(&self) -> Option<&T> {
        let head = self.head.load(Ordering::Relaxed);

        if head == self.tail.load(Ordering::Acquire) {
            return None;
        }

        // SAFETY: Same reasoning as pop() - we have exclusive read access
        #[allow(unsafe_code)]
        unsafe {
            Some((*self.buffer[head].get()).assume_init_ref())
        }
    }

    /// Calculate the next index with wrap-around.
    #[inline]
    const fn next_index(&self, index: usize) -> usize {
        (index + 1) & self.capacity_mask
    }
}

impl<T> Drop for SpscQueue<T> {
    fn drop(&mut self) {
        // Drop any remaining items in the queue
        while self.pop().is_some() {}
    }
}

impl<T: std::fmt::Debug> std::fmt::Debug for SpscQueue<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SpscQueue")
            .field("capacity", &self.capacity())
            .field("len", &self.len())
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_cache_padded_size() {
        // Verify CachePadded provides proper alignment (64 bytes = cache line)
        assert_eq!(std::mem::align_of::<CachePadded<AtomicUsize>>(), 64);
    }

    #[test]
    fn test_cache_padded_operations() {
        let padded = CachePadded::new(42u32);
        assert_eq!(*padded, 42);
        assert_eq!(*padded.get(), 42);

        let mut padded = CachePadded::new(42u32);
        *padded.get_mut() = 100;
        assert_eq!(*padded, 100);

        let inner = padded.into_inner();
        assert_eq!(inner, 100);
    }

    #[test]
    fn test_cache_padded_default() {
        let padded: CachePadded<u32> = CachePadded::default();
        assert_eq!(*padded, 0);
    }

    #[test]
    fn test_cache_padded_clone() {
        let padded = CachePadded::new(42u32);
        let cloned = padded.clone();
        assert_eq!(*cloned, 42);
    }

    #[test]
    fn test_new_queue() {
        let queue: SpscQueue<i32> = SpscQueue::new(100);
        // Should round up to 128
        assert_eq!(queue.capacity(), 128);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(queue.len(), 0);
    }

    #[test]
    fn test_push_pop() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        assert!(queue.push(1).is_ok());
        assert!(queue.push(2).is_ok());
        assert!(queue.push(3).is_ok());
        // Queue of capacity 4 can only hold 3 items (one slot reserved)
        assert!(queue.is_full());
        assert!(queue.push(4).is_err());

        assert_eq!(queue.pop(), Some(1));
        assert_eq!(queue.pop(), Some(2));
        assert_eq!(queue.pop(), Some(3));
        assert_eq!(queue.pop(), None);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_fifo_order() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        for i in 0..10 {
            assert!(queue.push(i).is_ok());
        }

        for i in 0..10 {
            assert_eq!(queue.pop(), Some(i));
        }
    }

    #[test]
    fn test_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        // Fill and empty multiple times to test wrap-around
        for iteration in 0..5 {
            for i in 0..3 {
                assert!(queue.push(iteration * 10 + i).is_ok());
            }
            for i in 0..3 {
                assert_eq!(queue.pop(), Some(iteration * 10 + i));
            }
        }
    }

    #[test]
    fn test_peek() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        assert!(queue.peek().is_none());

        queue.push(42).unwrap();
        assert_eq!(queue.peek(), Some(&42));
        assert_eq!(queue.peek(), Some(&42)); // Still there

        assert_eq!(queue.pop(), Some(42));
        assert!(queue.peek().is_none());
    }

    #[test]
    fn test_push_batch() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);

        let pushed = queue.push_batch(vec![1, 2, 3, 4, 5]);
        assert_eq!(pushed, 5);
        assert_eq!(queue.len(), 5);

        // Try to push more than capacity
        let pushed = queue.push_batch(vec![6, 7, 8, 9, 10]);
        assert_eq!(pushed, 2); // Only 2 more fit (7 slots max, 5 used)
    }

    #[test]
    fn test_pop_batch() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);

        queue.push_batch(vec![1, 2, 3, 4, 5]);

        let items = queue.pop_batch(3);
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(queue.len(), 2);

        let items = queue.pop_batch(10); // Request more than available
        assert_eq!(items, vec![4, 5]);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_concurrent_producer_consumer() {
        const ITEMS: i32 = 10_000;
        let queue = Arc::new(SpscQueue::<i32>::new(1024));
        let queue_producer = Arc::clone(&queue);
        let queue_consumer = Arc::clone(&queue);

        // Producer thread
        let producer = thread::spawn(move || {
            for i in 0..ITEMS {
                while queue_producer.push(i).is_err() {
                    thread::yield_now();
                }
            }
        });

        // Consumer thread
        let consumer = thread::spawn(move || {
            let mut received = Vec::with_capacity(ITEMS as usize);
            while received.len() < ITEMS as usize {
                if let Some(item) = queue_consumer.pop() {
                    received.push(item);
                } else {
                    thread::yield_now();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();

        // Verify all items received in order
        assert_eq!(received.len(), ITEMS as usize);
        for (i, &item) in received.iter().enumerate() {
            assert_eq!(
                item,
                i32::try_from(i).unwrap(),
                "Item out of order at index {i}"
            );
        }
    }

    #[derive(Debug)]
    struct DropCounter(Arc<AtomicUsize>);

    impl Drop for DropCounter {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_drop() {
        let drop_count = Arc::new(AtomicUsize::new(0));

        {
            let queue: SpscQueue<DropCounter> = SpscQueue::new(8);
            for _ in 0..5 {
                queue.push(DropCounter(Arc::clone(&drop_count))).unwrap();
            }
            // Pop 2 items
            queue.pop();
            queue.pop();
            // Queue drops with 3 items remaining
        }

        // All 5 items should be dropped (2 popped + 3 on drop)
        assert_eq!(drop_count.load(Ordering::SeqCst), 5);
    }

    #[test]
    fn test_debug() {
        let queue: SpscQueue<i32> = SpscQueue::new(8);
        queue.push(1).unwrap();
        queue.push(2).unwrap();

        let debug_str = format!("{queue:?}");
        assert!(debug_str.contains("SpscQueue"));
        assert!(debug_str.contains("capacity"));
        assert!(debug_str.contains("len"));
    }

    #[test]
    #[should_panic(expected = "capacity must be > 0")]
    fn test_zero_capacity_panics() {
        let _: SpscQueue<i32> = SpscQueue::new(0);
    }

    #[test]
    fn test_pop_batch_into() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        // Push some items
        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        // Pop into buffer
        let mut buffer: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 3);

        // SAFETY: We just initialized these elements
        #[allow(unsafe_code)]
        unsafe {
            assert_eq!(buffer[0].assume_init(), 1);
            assert_eq!(buffer[1].assume_init(), 2);
            assert_eq!(buffer[2].assume_init(), 3);
        }

        assert!(queue.is_empty());
    }

    #[test]
    fn test_pop_batch_into_partial() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        // Push 5 items
        for i in 0..5 {
            queue.push(i).unwrap();
        }

        // Pop only 3 (buffer smaller than available)
        let mut buffer: [MaybeUninit<i32>; 3] = [MaybeUninit::uninit(); 3];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 3);
        assert_eq!(queue.len(), 2); // 2 items remaining

        // SAFETY: We just initialized these elements
        #[allow(unsafe_code)]
        unsafe {
            assert_eq!(buffer[0].assume_init(), 0);
            assert_eq!(buffer[1].assume_init(), 1);
            assert_eq!(buffer[2].assume_init(), 2);
        }
    }

    #[test]
    fn test_pop_batch_into_empty() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        let mut buffer: [MaybeUninit<i32>; 8] = [MaybeUninit::uninit(); 8];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 0);
    }

    #[test]
    fn test_pop_batch_into_empty_buffer() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);
        queue.push(1).unwrap();

        let mut buffer: [MaybeUninit<i32>; 0] = [];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 0);
        assert_eq!(queue.len(), 1); // Item still in queue
    }

    #[test]
    fn test_pop_each() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        let mut sum = 0;
        let count = queue.pop_each(10, |item| {
            sum += item;
            true
        });

        assert_eq!(count, 3);
        assert_eq!(sum, 6);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_pop_each_early_stop() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();
        queue.push(4).unwrap();
        queue.push(5).unwrap();

        let mut items = Vec::new();
        let count = queue.pop_each(10, |item| {
            items.push(item);
            item < 3 // Stop after item 3
        });

        assert_eq!(count, 3); // Processed 1, 2, 3
        assert_eq!(items, vec![1, 2, 3]);
        assert_eq!(queue.len(), 2); // 4, 5 remaining
    }

    #[test]
    fn test_pop_each_max_count() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        for i in 0..10 {
            queue.push(i).unwrap();
        }

        let mut count_processed = 0;
        let count = queue.pop_each(5, |_| {
            count_processed += 1;
            true
        });

        assert_eq!(count, 5);
        assert_eq!(count_processed, 5);
        assert_eq!(queue.len(), 5); // 5 remaining
    }

    #[test]
    fn test_pop_each_empty() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);

        let mut called = false;
        let count = queue.pop_each(10, |_| {
            called = true;
            true
        });

        assert_eq!(count, 0);
        assert!(!called);
    }

    #[test]
    fn test_pop_each_zero_max() {
        let queue: SpscQueue<i32> = SpscQueue::new(16);
        queue.push(1).unwrap();

        let count = queue.pop_each(0, |_| true);

        assert_eq!(count, 0);
        assert_eq!(queue.len(), 1); // Item still in queue
    }

    #[test]
    fn test_pop_batch_into_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4); // Capacity 4

        // Fill and empty to advance indices
        for _ in 0..3 {
            for i in 0..3 {
                queue.push(i).unwrap();
            }
            for _ in 0..3 {
                queue.pop();
            }
        }

        // Now indices are wrapped, push new items
        queue.push(10).unwrap();
        queue.push(11).unwrap();

        let mut buffer: [MaybeUninit<i32>; 4] = [MaybeUninit::uninit(); 4];
        let count = queue.pop_batch_into(&mut buffer);

        assert_eq!(count, 2);

        #[allow(unsafe_code)]
        unsafe {
            assert_eq!(buffer[0].assume_init(), 10);
            assert_eq!(buffer[1].assume_init(), 11);
        }
    }

    #[test]
    fn test_pop_each_wrap_around() {
        let queue: SpscQueue<i32> = SpscQueue::new(4);

        // Fill and empty to advance indices
        for _ in 0..3 {
            for i in 0..3 {
                queue.push(i).unwrap();
            }
            let _ = queue.pop_batch(3);
        }

        // Now push with wrapped indices
        queue.push(100).unwrap();
        queue.push(200).unwrap();

        let mut items = Vec::new();
        queue.pop_each(10, |item| {
            items.push(item);
            true
        });

        assert_eq!(items, vec![100, 200]);
    }
}