smallring/
spsc.rs

1use super::core::RingBufCore;
2use crate::shim::atomic::Ordering;
3use crate::shim::sync::Arc;
4use std::fmt;
5use std::num::NonZero;
6
7/// Ring buffer error for push operations
8///
9/// push 操作的环形缓冲区错误
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum PushError<T> {
12    /// Buffer is full
13    ///
14    /// 缓冲区已满
15    Full(T),
16}
17
18/// Ring buffer error for pop operations
19///
20/// pop 操作的环形缓冲区错误
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22pub enum PopError {
23    /// Buffer is empty
24    ///
25    /// 缓冲区为空
26    Empty,
27}
28
29/// Shared data between producer and consumer
30///
31/// 生产者和消费者之间的共享数据
32///
33/// # Type Parameters
34/// - `T`: Element type
35/// - `N`: Stack capacity threshold (elements stored on stack when capacity ≤ N)
36///
37/// # 类型参数
38/// - `T`: 元素类型
39/// - `N`: 栈容量阈值(当容量 ≤ N 时元素存储在栈上)
40pub struct SharedData<T, const N: usize> {
41    /// Core ring buffer implementation
42    ///
43    /// 核心环形缓冲区实现
44    core: RingBufCore<T, N>,
45}
46
47/// Producer half of the ring buffer
48///
49/// 环形缓冲区的生产者端
50///
51/// # Type Parameters
52/// - `T`: Element type
53/// - `N`: Stack capacity threshold
54///
55/// # 类型参数
56/// - `T`: 元素类型
57/// - `N`: 栈容量阈值
58pub struct Producer<T, const N: usize> {
59    /// Shared data
60    ///
61    /// 共享数据
62    shared: Arc<SharedData<T, N>>,
63
64    /// Cached read index for performance (avoid reading atomic repeatedly)
65    ///
66    /// 缓存的读索引以提升性能(避免重复读取原子变量)
67    cached_read: usize,
68}
69
70/// Consumer half of the ring buffer
71///
72/// 环形缓冲区的消费者端
73///
74/// # Type Parameters
75/// - `T`: Element type
76/// - `N`: Stack capacity threshold
77///
78/// # 类型参数
79/// - `T`: 元素类型
80/// - `N`: 栈容量阈值
81pub struct Consumer<T, const N: usize> {
82    /// Shared data
83    ///
84    /// 共享数据
85    shared: Arc<SharedData<T, N>>,
86
87    /// Cached write index for performance (avoid reading atomic repeatedly)
88    ///
89    /// 缓存的写索引以提升性能(避免重复读取原子变量)
90    cached_write: usize,
91}
92
93/// Draining iterator for the ring buffer
94///
95/// 环形缓冲区的消费迭代器
96///
97/// This iterator removes and returns elements from the buffer until it's empty.
98///
99/// 此迭代器从缓冲区中移除并返回元素,直到缓冲区为空。
100///
101/// # Type Parameters
102/// - `T`: Element type
103/// - `N`: Stack capacity threshold
104///
105/// # 类型参数
106/// - `T`: 元素类型
107/// - `N`: 栈容量阈值
108pub struct Drain<'a, T, const N: usize> {
109    consumer: &'a mut Consumer<T, N>,
110}
111
112impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
113    type Item = T;
114
115    #[inline]
116    fn next(&mut self) -> Option<Self::Item> {
117        self.consumer.pop().ok()
118    }
119
120    #[inline]
121    fn size_hint(&self) -> (usize, Option<usize>) {
122        let len = self.consumer.slots();
123        (len, Some(len))
124    }
125}
126
127impl<T, const N: usize> SharedData<T, N> {
128    /// Get the capacity of the buffer
129    ///
130    /// 获取缓冲区容量
131    #[inline]
132    pub fn capacity(&self) -> usize {
133        self.core.capacity()
134    }
135}
136
137/// Create a new ring buffer with the specified capacity
138///
139/// 创建指定容量的新环形缓冲区
140///
141/// # Type Parameters
142/// - `T`: Element type
143/// - `N`: Stack capacity threshold (use stack storage when capacity ≤ N, heap otherwise)
144///
145/// # Parameters
146/// - `capacity`: Desired capacity (will be rounded up to next power of 2)
147///
148/// # Returns
149/// A tuple of (Producer, Consumer)
150///
151/// # 类型参数
152/// - `T`: 元素类型
153/// - `N`: 栈容量阈值(当容量 ≤ N 时使用栈存储,否则使用堆)
154///
155/// # 参数
156/// - `capacity`: 期望容量(将向上取整到下一个 2 的幂次)
157///
158/// # 返回值
159/// 返回 (Producer, Consumer) 元组
160pub fn new<T, const N: usize>(capacity: NonZero<usize>) -> (Producer<T, N>, Consumer<T, N>) {
161    let core = RingBufCore::new(capacity.get());
162
163    let shared = Arc::new(SharedData { core });
164
165    let producer = Producer {
166        shared: shared.clone(),
167        cached_read: 0,
168    };
169
170    let consumer = Consumer {
171        shared,
172        cached_write: 0,
173    };
174
175    (producer, consumer)
176}
177
178impl<T: fmt::Debug, const N: usize> fmt::Debug for Producer<T, N> {
179    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180        f.debug_struct("Producer")
181            .field("capacity", &self.shared.core.capacity())
182            .field("slots", &self.slots())
183            .field("free_slots", &self.free_slots())
184            .field("is_empty", &self.is_empty())
185            .field("is_full", &self.is_full())
186            .finish()
187    }
188}
189
190impl<T, const N: usize> Producer<T, N> {
191    /// Get the capacity of the buffer
192    ///
193    /// 获取缓冲区容量
194    #[inline]
195    pub fn capacity(&self) -> usize {
196        self.shared.core.capacity()
197    }
198
199    /// Get the number of elements currently in the buffer
200    ///
201    /// 获取缓冲区中当前的元素数量
202    #[inline]
203    pub fn slots(&self) -> usize {
204        let write = self.shared.core.write_idx().load(Ordering::Relaxed);
205        let read = self.shared.core.read_idx().load(Ordering::Acquire);
206        write.wrapping_sub(read)
207    }
208
209    /// Get the number of elements currently in the buffer (alias for `slots`)
210    ///
211    /// 获取缓冲区中当前的元素数量(`slots` 的别名)
212    #[inline]
213    pub fn len(&self) -> usize {
214        self.slots()
215    }
216
217    /// Check if the buffer is empty
218    ///
219    /// 检查缓冲区是否为空
220    #[inline]
221    pub fn is_empty(&self) -> bool {
222        let write = self.shared.core.write_idx().load(Ordering::Relaxed);
223        let read = self.shared.core.read_idx().load(Ordering::Acquire);
224        write == read
225    }
226
227    /// Get the number of free slots in the buffer
228    ///
229    /// 获取缓冲区中的空闲空间数量
230    #[inline]
231    pub fn free_slots(&self) -> usize {
232        self.shared.core.capacity() - self.slots()
233    }
234
235    /// Check if the buffer is full
236    ///
237    /// 检查缓冲区是否已满
238    #[inline]
239    pub fn is_full(&self) -> bool {
240        let write = self.shared.core.write_idx().load(Ordering::Relaxed);
241        let read = self.shared.core.read_idx().load(Ordering::Acquire);
242        write.wrapping_sub(read) >= self.shared.core.capacity()
243    }
244
245    /// Push a value into the buffer
246    ///
247    /// 向缓冲区推送一个值
248    ///
249    /// # Errors
250    /// Returns `PushError::Full` if the buffer is full
251    ///
252    /// # 错误
253    /// 如果缓冲区满则返回 `PushError::Full`
254    #[inline]
255    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
256        let write = self.shared.core.write_idx().load(Ordering::Relaxed);
257        let mut read = self.cached_read;
258
259        // Check if buffer is full
260        // 检查缓冲区是否已满
261        if write.wrapping_sub(read) >= self.shared.core.capacity() {
262            // Update cached read index from consumer
263            // 从消费者更新缓存的读索引
264            read = self.shared.core.read_idx().load(Ordering::Acquire);
265            self.cached_read = read;
266
267            if write.wrapping_sub(read) >= self.shared.core.capacity() {
268                return Err(PushError::Full(value));
269            }
270        }
271
272        // Write value to buffer
273        // 将值写入缓冲区
274        let index = write & self.shared.core.mask();
275        unsafe {
276            self.shared.core.write_at(index, value);
277        }
278
279        // Update write index with Release ordering to ensure visibility
280        // 使用 Release 顺序更新写索引以确保可见性
281        self.shared
282            .core
283            .write_idx()
284            .store(write.wrapping_add(1), Ordering::Release);
285
286        Ok(())
287    }
288}
289
290impl<T: Copy, const N: usize> Producer<T, N> {
291    /// Push multiple values from a slice into the buffer
292    ///
293    /// 将切片中的多个值批量推送到缓冲区
294    ///
295    /// This method attempts to push as many elements as possible from the slice.
296    /// It returns the number of elements successfully pushed.
297    ///
298    /// 此方法尝试从切片中推送尽可能多的元素。
299    /// 返回成功推送的元素数量。
300    ///
301    /// # Parameters
302    /// - `values`: Slice of values to push
303    ///
304    /// # Returns
305    /// Number of elements successfully pushed (0 to values.len())
306    ///
307    /// # 参数
308    /// - `values`: 要推送的值的切片
309    ///
310    /// # 返回值
311    /// 成功推送的元素数量(0 到 values.len())
312    ///
313    /// # Performance
314    /// This method uses optimized memory copy operations for better performance
315    /// than pushing elements one by one.
316    ///
317    /// # 性能
318    /// 此方法使用优化的内存拷贝操作,性能优于逐个推送元素。
319    #[inline]
320    pub fn push_slice(&mut self, values: &[T]) -> usize {
321        if values.is_empty() {
322            return 0;
323        }
324
325        let write = self.shared.core.write_idx().load(Ordering::Relaxed);
326        let mut read = self.cached_read;
327
328        // Calculate available space
329        // 计算可用空间
330        let mut available = self
331            .shared
332            .core
333            .capacity()
334            .saturating_sub(write.wrapping_sub(read));
335
336        if available == 0 {
337            // Update cached read index
338            // 更新缓存的读索引
339            read = self.shared.core.read_idx().load(Ordering::Acquire);
340            self.cached_read = read;
341            available = self
342                .shared
343                .core
344                .capacity()
345                .saturating_sub(write.wrapping_sub(read));
346
347            if available == 0 {
348                return 0;
349            }
350        }
351
352        // Determine how many elements we can push
353        // 确定我们可以推送多少元素
354        let to_push = available.min(values.len());
355
356        // Use core's batch copy functionality
357        // 使用核心模块的批量拷贝功能
358        unsafe {
359            self.shared.core.copy_from_slice(write, values, to_push);
360        }
361
362        // Update write index with Release ordering
363        // 使用 Release 顺序更新写索引
364        self.shared
365            .core
366            .write_idx()
367            .store(write.wrapping_add(to_push), Ordering::Release);
368
369        to_push
370    }
371}
372
373impl<T: fmt::Debug, const N: usize> fmt::Debug for Consumer<T, N> {
374    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
375        f.debug_struct("Consumer")
376            .field("capacity", &self.shared.core.capacity())
377            .field("slots", &self.slots())
378            .field("is_empty", &self.is_empty())
379            .finish()
380    }
381}
382
383impl<T, const N: usize> Consumer<T, N> {
384    /// Pop a value from the buffer
385    ///
386    /// 从缓冲区弹出一个值
387    ///
388    /// # Errors
389    /// Returns `PopError::Empty` if the buffer is empty
390    ///
391    /// # 错误
392    /// 如果缓冲区空则返回 `PopError::Empty`
393    #[inline]
394    pub fn pop(&mut self) -> Result<T, PopError> {
395        let read = self.shared.core.read_idx().load(Ordering::Relaxed);
396        let mut write = self.cached_write;
397
398        // Check if buffer is empty
399        // 检查缓冲区是否为空
400        if read == write {
401            // Update cached write index from producer
402            // 从生产者更新缓存的写索引
403            write = self.shared.core.write_idx().load(Ordering::Acquire);
404            self.cached_write = write;
405
406            if read == write {
407                return Err(PopError::Empty);
408            }
409        }
410
411        // Read value from buffer
412        // 从缓冲区读取值
413        let index = read & self.shared.core.mask();
414        let value = unsafe { self.shared.core.read_at(index) };
415
416        // Update read index with Release ordering to ensure visibility
417        // 使用 Release 顺序更新读索引以确保可见性
418        self.shared
419            .core
420            .read_idx()
421            .store(read.wrapping_add(1), Ordering::Release);
422
423        Ok(value)
424    }
425
426    /// Check if the buffer is empty
427    ///
428    /// 检查缓冲区是否为空
429    #[inline]
430    pub fn is_empty(&self) -> bool {
431        let read = self.shared.core.read_idx().load(Ordering::Relaxed);
432        let write = self.shared.core.write_idx().load(Ordering::Acquire);
433        read == write
434    }
435
436    /// Get the number of elements currently in the buffer
437    ///
438    /// 获取缓冲区中当前的元素数量
439    #[inline]
440    pub fn slots(&self) -> usize {
441        let read = self.shared.core.read_idx().load(Ordering::Relaxed);
442        let write = self.shared.core.write_idx().load(Ordering::Acquire);
443        write.wrapping_sub(read)
444    }
445
446    /// Get the number of elements currently in the buffer (alias for `slots`)
447    ///
448    /// 获取缓冲区中当前的元素数量(`slots` 的别名)
449    #[inline]
450    pub fn len(&self) -> usize {
451        self.slots()
452    }
453
454    /// Get the capacity of the buffer
455    ///
456    /// 获取缓冲区容量
457    #[inline]
458    pub fn capacity(&self) -> usize {
459        self.shared.core.capacity()
460    }
461
462    /// Peek at the first element without removing it
463    ///
464    /// 查看第一个元素但不移除它
465    ///
466    /// # Returns
467    /// `Some(&T)` if there is an element, `None` if the buffer is empty
468    ///
469    /// # 返回值
470    /// 如果有元素则返回 `Some(&T)`,如果缓冲区为空则返回 `None`
471    ///
472    /// # Safety
473    /// The returned reference is valid only as long as no other operations
474    /// are performed on the Consumer that might modify the buffer.
475    ///
476    /// # 安全性
477    /// 返回的引用仅在未对 Consumer 执行可能修改缓冲区的其他操作时有效。
478    #[inline]
479    pub fn peek(&self) -> Option<&T> {
480        let read = self.shared.core.read_idx().load(Ordering::Relaxed);
481        let write = self.shared.core.write_idx().load(Ordering::Acquire);
482
483        if read == write {
484            return None;
485        }
486
487        let index = read & self.shared.core.mask();
488        unsafe { Some(self.shared.core.peek_at(index)) }
489    }
490
491    /// Clear all elements from the buffer
492    ///
493    /// 清空缓冲区中的所有元素
494    ///
495    /// This method pops and drops all elements currently in the buffer.
496    ///
497    /// 此方法弹出并 drop 缓冲区中当前的所有元素。
498    pub fn clear(&mut self) {
499        while self.pop().is_ok() {
500            // Elements are dropped automatically
501            // 元素自动被 drop
502        }
503    }
504
505    /// Create a draining iterator
506    ///
507    /// 创建一个消费迭代器
508    ///
509    /// Returns an iterator that removes and returns elements from the buffer.
510    /// The iterator will continue until the buffer is empty.
511    ///
512    /// 返回一个从缓冲区中移除并返回元素的迭代器。
513    /// 迭代器将持续运行直到缓冲区为空。
514    ///
515    /// # Examples
516    ///
517    /// ```
518    /// use smallring::spsc::new;
519    /// use std::num::NonZero;
520    ///
521    /// let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
522    /// producer.push(1).unwrap();
523    /// producer.push(2).unwrap();
524    /// producer.push(3).unwrap();
525    ///
526    /// let items: Vec<i32> = consumer.drain().collect();
527    /// assert_eq!(items, vec![1, 2, 3]);
528    /// assert!(consumer.is_empty());
529    /// ```
530    #[inline]
531    pub fn drain(&mut self) -> Drain<'_, T, N> {
532        Drain { consumer: self }
533    }
534
535    /// Get a reference to the shared buffer data
536    ///
537    /// 获取共享缓冲区数据的引用
538    #[inline]
539    pub fn buffer(&self) -> &SharedData<T, N> {
540        &self.shared
541    }
542}
543
544impl<T: Copy, const N: usize> Consumer<T, N> {
545    /// Pop multiple values into a slice
546    ///
547    /// 将多个值批量弹出到切片
548    ///
549    /// This method attempts to pop as many elements as possible into the provided slice.
550    /// It returns the number of elements successfully popped.
551    ///
552    /// 此方法尝试将尽可能多的元素弹出到提供的切片中。
553    /// 返回成功弹出的元素数量。
554    ///
555    /// # Parameters
556    /// - `dest`: Destination slice to pop values into
557    ///
558    /// # Returns
559    /// Number of elements successfully popped (0 to dest.len())
560    ///
561    /// # 参数
562    /// - `dest`: 用于接收值的目标切片
563    ///
564    /// # 返回值
565    /// 成功弹出的元素数量(0 到 dest.len())
566    ///
567    /// # Performance
568    /// This method uses optimized memory copy operations for better performance
569    /// than popping elements one by one.
570    ///
571    /// # 性能
572    /// 此方法使用优化的内存拷贝操作,性能优于逐个弹出元素。
573    #[inline]
574    pub fn pop_slice(&mut self, dest: &mut [T]) -> usize {
575        if dest.is_empty() {
576            return 0;
577        }
578
579        let read = self.shared.core.read_idx().load(Ordering::Relaxed);
580        let mut write = self.cached_write;
581
582        // Calculate available elements
583        // 计算可用元素数量
584        let mut available = write.wrapping_sub(read);
585
586        if available == 0 {
587            // Update cached write index
588            // 更新缓存的写索引
589            write = self.shared.core.write_idx().load(Ordering::Acquire);
590            self.cached_write = write;
591            available = write.wrapping_sub(read);
592
593            if available == 0 {
594                return 0;
595            }
596        }
597
598        // Determine how many elements we can pop
599        // 确定我们可以弹出多少元素
600        let to_pop = available.min(dest.len());
601
602        // Use core's batch copy functionality
603        // 使用核心模块的批量拷贝功能
604        unsafe {
605            self.shared.core.copy_to_slice(read, dest, to_pop);
606        }
607
608        // Update read index with Release ordering
609        // 使用 Release 顺序更新读索引
610        self.shared
611            .core
612            .read_idx()
613            .store(read.wrapping_add(to_pop), Ordering::Release);
614
615        to_pop
616    }
617}
618
619impl<T, const N: usize> Drop for Consumer<T, N> {
620    fn drop(&mut self) {
621        // Clean up any remaining elements in the buffer
622        // 清理缓冲区中的剩余元素
623        while self.pop().is_ok() {
624            // Elements are dropped automatically
625            // 元素自动被 drop
626        }
627    }
628}
629
630#[cfg(all(test, not(feature = "loom")))]
631mod tests {
632    use super::*;
633
634    #[test]
635    fn test_basic_push_pop() {
636        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
637
638        assert!(producer.push(1).is_ok());
639        assert!(producer.push(2).is_ok());
640        assert!(producer.push(3).is_ok());
641
642        assert_eq!(consumer.pop().unwrap(), 1);
643        assert_eq!(consumer.pop().unwrap(), 2);
644        assert_eq!(consumer.pop().unwrap(), 3);
645        assert!(consumer.pop().is_err());
646    }
647
648    #[test]
649    fn test_capacity_rounding() {
650        let (_, consumer) = new::<i32, 32>(NonZero::new(5).unwrap());
651        // 5 should round up to 8 (next power of 2)
652        assert_eq!(consumer.buffer().capacity(), 8);
653
654        let (_, consumer) = new::<i32, 64>(NonZero::new(32).unwrap());
655        assert_eq!(consumer.buffer().capacity(), 32);
656
657        let (_, consumer) = new::<i32, 128>(NonZero::new(33).unwrap());
658        // 33 should round up to 64
659        assert_eq!(consumer.buffer().capacity(), 64);
660    }
661
662    #[test]
663    fn test_buffer_full() {
664        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
665        // Actual capacity is 4, but we can only store 3 items (one slot reserved)
666
667        assert!(producer.push(1).is_ok());
668        assert!(producer.push(2).is_ok());
669        assert!(producer.push(3).is_ok());
670        assert!(producer.push(4).is_ok());
671
672        // Buffer should be full now
673        assert!(matches!(producer.push(5), Err(PushError::Full(5))));
674
675        // Pop one item to make space
676        assert_eq!(consumer.pop().unwrap(), 1);
677
678        // Now we should be able to push again
679        assert!(producer.push(5).is_ok());
680    }
681
682    #[test]
683    fn test_buffer_empty() {
684        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
685
686        assert!(consumer.pop().is_err());
687        assert!(consumer.is_empty());
688
689        producer.push(42).unwrap();
690        assert!(!consumer.is_empty());
691
692        consumer.pop().unwrap();
693        assert!(consumer.is_empty());
694    }
695
696    #[test]
697    fn test_slots() {
698        let (mut producer, consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
699
700        assert_eq!(consumer.slots(), 0);
701
702        producer.push(1).unwrap();
703        producer.push(2).unwrap();
704        producer.push(3).unwrap();
705
706        assert_eq!(consumer.slots(), 3);
707    }
708
709    #[test]
710    fn test_wrap_around() {
711        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
712
713        // Fill and empty the buffer multiple times to test wrap-around
714        for round in 0..10 {
715            for i in 0..4 {
716                producer.push(round * 10 + i).unwrap();
717            }
718
719            for i in 0..4 {
720                assert_eq!(consumer.pop().unwrap(), round * 10 + i);
721            }
722        }
723    }
724
725    #[test]
726    fn test_drop_cleanup() {
727        use std::sync::Arc;
728        use std::sync::atomic::{AtomicUsize, Ordering};
729
730        #[derive(Debug)]
731        struct DropCounter {
732            counter: Arc<AtomicUsize>,
733        }
734
735        impl Drop for DropCounter {
736            fn drop(&mut self) {
737                self.counter.fetch_add(1, Ordering::SeqCst);
738            }
739        }
740
741        let counter = Arc::new(AtomicUsize::new(0));
742
743        {
744            let (mut producer, consumer) = new::<DropCounter, 32>(NonZero::new(8).unwrap());
745
746            for _ in 0..5 {
747                producer
748                    .push(DropCounter {
749                        counter: counter.clone(),
750                    })
751                    .unwrap();
752            }
753
754            // Drop consumer, which should drop all remaining items
755            drop(consumer);
756        }
757
758        // All 5 items should have been dropped
759        assert_eq!(counter.load(Ordering::SeqCst), 5);
760    }
761
762    #[test]
763    fn test_concurrent_access() {
764        use std::thread;
765
766        let (mut producer, mut consumer) = new::<u64, 128>(NonZero::new(128).unwrap());
767
768        let producer_handle = thread::spawn(move || {
769            for i in 0..1000 {
770                loop {
771                    if producer.push(i).is_ok() {
772                        break;
773                    }
774                    thread::yield_now();
775                }
776            }
777        });
778
779        let consumer_handle = thread::spawn(move || {
780            let mut received = Vec::new();
781            for _ in 0..1000 {
782                loop {
783                    match consumer.pop() {
784                        Ok(val) => {
785                            received.push(val);
786                            break;
787                        }
788                        Err(_) => thread::yield_now(),
789                    }
790                }
791            }
792            received
793        });
794
795        producer_handle.join().unwrap();
796        let received = consumer_handle.join().unwrap();
797
798        // Verify all numbers were received in order
799        assert_eq!(received.len(), 1000);
800        for (i, &val) in received.iter().enumerate() {
801            assert_eq!(val, i as u64);
802        }
803    }
804
805    #[test]
806    fn test_small_capacity_stack_allocation() {
807        // Test that small capacities (≤32) use stack allocation
808        // This test mainly ensures the code compiles and works with FixedVec
809        let (mut producer, mut consumer) = new::<u8, 32>(NonZero::new(16).unwrap());
810
811        for i in 0..10 {
812            producer.push(i).unwrap();
813        }
814
815        for i in 0..10 {
816            assert_eq!(consumer.pop().unwrap(), i);
817        }
818    }
819
820    #[test]
821    fn test_large_capacity_heap_allocation() {
822        // Test that large capacities (>32) work correctly with heap allocation
823        let (mut producer, mut consumer) = new::<u8, 32>(NonZero::new(64).unwrap());
824
825        for i in 0..50 {
826            producer.push(i).unwrap();
827        }
828
829        for i in 0..50 {
830            assert_eq!(consumer.pop().unwrap(), i);
831        }
832    }
833
834    // ==================== 边界条件测试 / Boundary Condition Tests ====================
835
836    #[test]
837    fn test_capacity_one() {
838        // Test minimum capacity
839        // 测试最小容量
840        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(1).unwrap());
841
842        // After rounding, capacity should be 1 (2^0)
843        // 向上取整后,容量应为 1 (2^0)
844        assert_eq!(consumer.buffer().capacity(), 1);
845
846        assert!(producer.push(42).is_ok());
847        assert!(matches!(producer.push(99), Err(PushError::Full(99))));
848
849        assert_eq!(consumer.pop().unwrap(), 42);
850        assert!(consumer.pop().is_err());
851    }
852
853    #[test]
854    fn test_power_of_two_capacities() {
855        // Test various power-of-2 capacities
856        // 测试各种 2 的幂次容量
857        for power in 0..10 {
858            let capacity = 1 << power; // 2^power
859            let (_, consumer) = new::<u8, 128>(NonZero::new(capacity).unwrap());
860            assert_eq!(consumer.buffer().capacity(), capacity);
861        }
862    }
863
864    #[test]
865    fn test_non_power_of_two_rounding() {
866        // Test that non-power-of-2 capacities are rounded up correctly
867        // 测试非 2 的幂次容量正确向上取整
868        let test_cases = vec![
869            (3, 4),
870            (5, 8),
871            (7, 8),
872            (9, 16),
873            (15, 16),
874            (17, 32),
875            (31, 32),
876            (33, 64),
877            (100, 128),
878            (1000, 1024),
879        ];
880
881        for (input, expected) in test_cases {
882            let (_, consumer) = new::<u8, 128>(NonZero::new(input).unwrap());
883            assert_eq!(
884                consumer.buffer().capacity(),
885                expected,
886                "Capacity {} should round up to {}",
887                input,
888                expected
889            );
890        }
891    }
892
893    #[test]
894    fn test_single_element_operations() {
895        // Test push and pop with single element repeatedly
896        // 测试单个元素的重复推送和弹出
897        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
898
899        for i in 0..100 {
900            producer.push(i).unwrap();
901            assert_eq!(consumer.slots(), 1);
902            assert_eq!(consumer.pop().unwrap(), i);
903            assert_eq!(consumer.slots(), 0);
904            assert!(consumer.is_empty());
905        }
906    }
907
908    #[test]
909    fn test_alternating_push_pop() {
910        // Test alternating push and pop operations
911        // 测试交替推送和弹出操作
912        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
913
914        for i in 0..50 {
915            producer.push(i * 2).unwrap();
916            producer.push(i * 2 + 1).unwrap();
917            assert_eq!(consumer.pop().unwrap(), i * 2);
918            assert_eq!(consumer.pop().unwrap(), i * 2 + 1);
919        }
920    }
921
922    #[test]
923    fn test_index_wrapping() {
924        // Test that indices wrap correctly after overflow
925        // 测试索引在溢出后正确环绕
926        let (mut producer, mut consumer) = new::<usize, 32>(NonZero::new(4).unwrap());
927
928        // Fill and empty many times to cause index wrapping
929        // 多次填充和清空以导致索引环绕
930        for iteration in 0..1000 {
931            for i in 0..4 {
932                producer.push(iteration * 4 + i).unwrap();
933            }
934            for i in 0..4 {
935                assert_eq!(consumer.pop().unwrap(), iteration * 4 + i);
936            }
937        }
938    }
939
940    // ==================== 不同 N 值测试 / Different N Value Tests ====================
941
942    #[test]
943    fn test_various_stack_thresholds() {
944        // Test with N = 16
945        let (mut p1, mut c1) = new::<u32, 16>(NonZero::new(8).unwrap());
946        p1.push(1).unwrap();
947        assert_eq!(c1.pop().unwrap(), 1);
948
949        // Test with N = 64
950        let (mut p2, mut c2) = new::<u32, 64>(NonZero::new(32).unwrap());
951        p2.push(2).unwrap();
952        assert_eq!(c2.pop().unwrap(), 2);
953
954        // Test with N = 128
955        let (mut p3, mut c3) = new::<u32, 128>(NonZero::new(64).unwrap());
956        p3.push(3).unwrap();
957        assert_eq!(c3.pop().unwrap(), 3);
958
959        // Test with N = 256
960        let (mut p4, mut c4) = new::<u32, 256>(NonZero::new(128).unwrap());
961        p4.push(4).unwrap();
962        assert_eq!(c4.pop().unwrap(), 4);
963    }
964
965    #[test]
966    fn test_small_n_with_large_capacity() {
967        // Test N=8 with capacity > N (should use heap)
968        // 测试 N=8 但容量 > N(应使用堆)
969        let (mut producer, mut consumer) = new::<u64, 8>(NonZero::new(32).unwrap());
970
971        for i in 0..20 {
972            producer.push(i).unwrap();
973        }
974
975        for i in 0..20 {
976            assert_eq!(consumer.pop().unwrap(), i);
977        }
978    }
979
980    #[test]
981    fn test_large_n_with_small_capacity() {
982        // Test N=256 with capacity < N (should use stack)
983        // 测试 N=256 但容量 < N(应使用栈)
984        let (mut producer, mut consumer) = new::<u64, 256>(NonZero::new(16).unwrap());
985
986        for i in 0..10 {
987            producer.push(i).unwrap();
988        }
989
990        for i in 0..10 {
991            assert_eq!(consumer.pop().unwrap(), i);
992        }
993    }
994
995    // ==================== 类型测试 / Type Tests ====================
996
997    #[test]
998    fn test_zero_sized_types() {
999        // Test with zero-sized type
1000        // 测试零大小类型
1001        let (mut producer, mut consumer) = new::<(), 32>(NonZero::new(4).unwrap());
1002
1003        producer.push(()).unwrap();
1004        producer.push(()).unwrap();
1005
1006        assert_eq!(consumer.pop().unwrap(), ());
1007        assert_eq!(consumer.pop().unwrap(), ());
1008    }
1009
1010    #[test]
1011    fn test_large_types() {
1012        // Test with large struct
1013        // 测试大型结构体
1014        #[derive(Debug, PartialEq, Clone)]
1015        struct LargeStruct {
1016            data: [u64; 32],
1017        }
1018
1019        let (mut producer, mut consumer) = new::<LargeStruct, 32>(NonZero::new(4).unwrap());
1020
1021        let item1 = LargeStruct { data: [1; 32] };
1022        let item2 = LargeStruct { data: [2; 32] };
1023
1024        producer.push(item1.clone()).unwrap();
1025        producer.push(item2.clone()).unwrap();
1026
1027        assert_eq!(consumer.pop().unwrap(), item1);
1028        assert_eq!(consumer.pop().unwrap(), item2);
1029    }
1030
1031    #[test]
1032    fn test_string_type() {
1033        // Test with String (heap-allocated type)
1034        // 测试 String(堆分配类型)
1035        let (mut producer, mut consumer) = new::<String, 32>(NonZero::new(8).unwrap());
1036
1037        let messages = vec!["Hello", "World", "Rust", "Ring", "Buffer"];
1038
1039        for msg in &messages {
1040            producer.push(msg.to_string()).unwrap();
1041        }
1042
1043        for msg in &messages {
1044            assert_eq!(consumer.pop().unwrap(), msg.to_string());
1045        }
1046    }
1047
1048    #[test]
1049    fn test_option_type() {
1050        // Test with Option<T>
1051        // 测试 Option<T>
1052        let (mut producer, mut consumer) = new::<Option<i32>, 32>(NonZero::new(4).unwrap());
1053
1054        producer.push(Some(42)).unwrap();
1055        producer.push(None).unwrap();
1056        producer.push(Some(100)).unwrap();
1057
1058        assert_eq!(consumer.pop().unwrap(), Some(42));
1059        assert_eq!(consumer.pop().unwrap(), None);
1060        assert_eq!(consumer.pop().unwrap(), Some(100));
1061    }
1062
1063    // ==================== 并发测试 / Concurrency Tests ====================
1064
1065    #[test]
1066    fn test_concurrent_small_buffer() {
1067        // Test concurrent access with small buffer (high contention)
1068        // 测试小缓冲区的并发访问(高竞争)
1069        use std::thread;
1070
1071        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(4).unwrap());
1072
1073        let count = 100;
1074
1075        let producer_handle = thread::spawn(move || {
1076            for i in 0..count {
1077                loop {
1078                    if producer.push(i).is_ok() {
1079                        break;
1080                    }
1081                    thread::yield_now();
1082                }
1083            }
1084        });
1085
1086        let consumer_handle = thread::spawn(move || {
1087            let mut sum = 0;
1088            for _ in 0..count {
1089                loop {
1090                    match consumer.pop() {
1091                        Ok(val) => {
1092                            sum += val;
1093                            break;
1094                        }
1095                        Err(_) => thread::yield_now(),
1096                    }
1097                }
1098            }
1099            sum
1100        });
1101
1102        producer_handle.join().unwrap();
1103        let sum = consumer_handle.join().unwrap();
1104
1105        // Sum of 0..100 = 100*99/2 = 4950
1106        assert_eq!(sum, (count * (count - 1)) / 2);
1107    }
1108
1109    #[test]
1110    fn test_concurrent_large_buffer() {
1111        // Test concurrent access with large buffer (low contention)
1112        // 测试大缓冲区的并发访问(低竞争)
1113        use std::thread;
1114
1115        let (mut producer, mut consumer) = new::<u64, 512>(NonZero::new(512).unwrap());
1116
1117        let count = 10000;
1118
1119        let producer_handle = thread::spawn(move || {
1120            for i in 0..count {
1121                loop {
1122                    if producer.push(i).is_ok() {
1123                        break;
1124                    }
1125                    thread::yield_now();
1126                }
1127            }
1128        });
1129
1130        let consumer_handle = thread::spawn(move || {
1131            let mut received = 0;
1132            for _ in 0..count {
1133                loop {
1134                    match consumer.pop() {
1135                        Ok(_) => {
1136                            received += 1;
1137                            break;
1138                        }
1139                        Err(_) => thread::yield_now(),
1140                    }
1141                }
1142            }
1143            received
1144        });
1145
1146        producer_handle.join().unwrap();
1147        let received = consumer_handle.join().unwrap();
1148
1149        assert_eq!(received, count);
1150    }
1151
1152    #[test]
1153    fn test_concurrent_with_different_speeds() {
1154        // Test when producer and consumer have different speeds
1155        // 测试生产者和消费者速度不同的情况
1156        use std::thread;
1157        use std::time::Duration;
1158
1159        let (mut producer, mut consumer) = new::<u32, 64>(NonZero::new(32).unwrap());
1160
1161        let producer_handle = thread::spawn(move || {
1162            for i in 0..50 {
1163                loop {
1164                    if producer.push(i).is_ok() {
1165                        break;
1166                    }
1167                    thread::yield_now();
1168                }
1169                // Slow producer
1170                // 慢速生产者
1171                if i % 10 == 0 {
1172                    thread::sleep(Duration::from_micros(1));
1173                }
1174            }
1175        });
1176
1177        let consumer_handle = thread::spawn(move || {
1178            let mut received = Vec::new();
1179            for _ in 0..50 {
1180                loop {
1181                    match consumer.pop() {
1182                        Ok(val) => {
1183                            received.push(val);
1184                            break;
1185                        }
1186                        Err(_) => thread::yield_now(),
1187                    }
1188                }
1189            }
1190            received
1191        });
1192
1193        producer_handle.join().unwrap();
1194        let received = consumer_handle.join().unwrap();
1195
1196        assert_eq!(received.len(), 50);
1197        for (i, &val) in received.iter().enumerate() {
1198            assert_eq!(val, i as u32);
1199        }
1200    }
1201
1202    // ==================== 错误处理测试 / Error Handling Tests ====================
1203
1204    #[test]
1205    fn test_push_error_value_returned() {
1206        // Test that PushError returns the value
1207        // 测试 PushError 返回值
1208        let (mut producer, _consumer) = new::<String, 32>(NonZero::new(2).unwrap());
1209
1210        producer.push("first".to_string()).unwrap();
1211        producer.push("second".to_string()).unwrap();
1212
1213        let value = "third".to_string();
1214        match producer.push(value.clone()) {
1215            Err(PushError::Full(returned_value)) => {
1216                assert_eq!(returned_value, value);
1217            }
1218            Ok(_) => panic!("Expected PushError::Full"),
1219        }
1220    }
1221
1222    #[test]
1223    fn test_pop_error() {
1224        // Test PopError::Empty
1225        // 测试 PopError::Empty
1226        let (_producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
1227
1228        match consumer.pop() {
1229            Err(PopError::Empty) => {} // Expected
1230            Ok(_) => panic!("Expected PopError::Empty"),
1231        }
1232    }
1233
1234    // ==================== Consumer 方法测试 / Consumer Method Tests ====================
1235
1236    #[test]
1237    fn test_is_empty_after_operations() {
1238        // Test is_empty() with various operations
1239        // 测试各种操作后的 is_empty()
1240        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(4).unwrap());
1241
1242        assert!(consumer.is_empty());
1243
1244        producer.push(1).unwrap();
1245        assert!(!consumer.is_empty());
1246
1247        producer.push(2).unwrap();
1248        assert!(!consumer.is_empty());
1249
1250        consumer.pop().unwrap();
1251        assert!(!consumer.is_empty());
1252
1253        consumer.pop().unwrap();
1254        assert!(consumer.is_empty());
1255    }
1256
1257    #[test]
1258    fn test_slots_accuracy() {
1259        // Test that slots() returns accurate count
1260        // 测试 slots() 返回准确计数
1261        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1262
1263        assert_eq!(consumer.slots(), 0);
1264
1265        for i in 1..=10 {
1266            producer.push(i).unwrap();
1267            assert_eq!(consumer.slots(), i as usize);
1268        }
1269
1270        for i in (0..10).rev() {
1271            consumer.pop().unwrap();
1272            assert_eq!(consumer.slots(), i);
1273        }
1274
1275        assert_eq!(consumer.slots(), 0);
1276    }
1277
1278    #[test]
1279    fn test_slots_with_wrap_around() {
1280        // Test slots() after indices wrap around
1281        // 测试索引环绕后的 slots()
1282        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(4).unwrap());
1283
1284        // Cause many wrap-arounds
1285        // 导致多次环绕
1286        for _ in 0..100 {
1287            for i in 0..3 {
1288                producer.push(i).unwrap();
1289            }
1290            assert_eq!(consumer.slots(), 3);
1291
1292            for _ in 0..3 {
1293                consumer.pop().unwrap();
1294            }
1295            assert_eq!(consumer.slots(), 0);
1296        }
1297    }
1298
1299    // ==================== Drop 和内存测试 / Drop and Memory Tests ====================
1300
1301    #[test]
1302    fn test_partial_drop_cleanup() {
1303        // Test that consumer drops only remaining items
1304        // 测试消费者仅 drop 剩余项
1305        use std::sync::Arc;
1306        use std::sync::atomic::{AtomicUsize, Ordering};
1307
1308        #[derive(Debug)]
1309        struct DropCounter {
1310            counter: Arc<AtomicUsize>,
1311        }
1312
1313        impl Drop for DropCounter {
1314            fn drop(&mut self) {
1315                self.counter.fetch_add(1, Ordering::SeqCst);
1316            }
1317        }
1318
1319        let counter = Arc::new(AtomicUsize::new(0));
1320
1321        {
1322            let (mut producer, mut consumer) = new::<DropCounter, 32>(NonZero::new(16).unwrap());
1323
1324            // Push 10 items (capacity is 16, so no overflow)
1325            for _ in 0..10 {
1326                producer
1327                    .push(DropCounter {
1328                        counter: counter.clone(),
1329                    })
1330                    .unwrap();
1331            }
1332
1333            // Pop 6 items (they should be dropped)
1334            for _ in 0..6 {
1335                consumer.pop().unwrap();
1336            }
1337
1338            // At this point, 6 items have been dropped
1339            assert_eq!(counter.load(Ordering::SeqCst), 6);
1340
1341            // Drop consumer, which should drop remaining 4 items
1342            drop(consumer);
1343        }
1344
1345        // All 10 items should have been dropped
1346        assert_eq!(counter.load(Ordering::SeqCst), 10);
1347    }
1348
1349    #[test]
1350    fn test_empty_buffer_drop() {
1351        // Test dropping empty buffer
1352        // 测试 drop 空缓冲区
1353        use std::sync::Arc;
1354        use std::sync::atomic::{AtomicUsize, Ordering};
1355
1356        #[derive(Debug)]
1357        struct DropCounter {
1358            counter: Arc<AtomicUsize>,
1359        }
1360
1361        impl Drop for DropCounter {
1362            fn drop(&mut self) {
1363                self.counter.fetch_add(1, Ordering::SeqCst);
1364            }
1365        }
1366
1367        let counter = Arc::new(AtomicUsize::new(0));
1368
1369        {
1370            let (mut producer, mut consumer) = new::<DropCounter, 32>(NonZero::new(8).unwrap());
1371
1372            producer
1373                .push(DropCounter {
1374                    counter: counter.clone(),
1375                })
1376                .unwrap();
1377            consumer.pop().unwrap();
1378
1379            // Buffer is now empty
1380            assert!(consumer.is_empty());
1381
1382            // Drop should not drop anything
1383            drop(consumer);
1384        }
1385
1386        // Only 1 item should have been dropped (the one we popped)
1387        assert_eq!(counter.load(Ordering::SeqCst), 1);
1388    }
1389
1390    // ==================== 压力测试 / Stress Tests ====================
1391
1392    #[test]
1393    fn test_high_throughput() {
1394        // Stress test with high throughput
1395        // 高吞吐量压力测试
1396        use std::thread;
1397
1398        let (mut producer, mut consumer) = new::<u64, 256>(NonZero::new(256).unwrap());
1399        let count = 100000;
1400
1401        let producer_handle = thread::spawn(move || {
1402            for i in 0..count {
1403                loop {
1404                    if producer.push(i).is_ok() {
1405                        break;
1406                    }
1407                    thread::yield_now();
1408                }
1409            }
1410        });
1411
1412        let consumer_handle = thread::spawn(move || {
1413            let mut last = None;
1414            for _ in 0..count {
1415                loop {
1416                    match consumer.pop() {
1417                        Ok(val) => {
1418                            if let Some(prev) = last {
1419                                assert_eq!(val, prev + 1, "Values must be sequential");
1420                            }
1421                            last = Some(val);
1422                            break;
1423                        }
1424                        Err(_) => thread::yield_now(),
1425                    }
1426                }
1427            }
1428            last
1429        });
1430
1431        producer_handle.join().unwrap();
1432        let last = consumer_handle.join().unwrap();
1433
1434        assert_eq!(last, Some(count - 1));
1435    }
1436
1437    // ==================== 新增 API 测试 / New API Tests ====================
1438
1439    #[test]
1440    fn test_producer_capacity_queries() {
1441        // Test Producer capacity query methods
1442        // 测试 Producer 容量查询方法
1443        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
1444
1445        assert_eq!(producer.capacity(), 8);
1446        assert_eq!(producer.len(), 0);
1447        assert_eq!(producer.slots(), 0);
1448        assert_eq!(producer.free_slots(), 8);
1449        assert!(!producer.is_full());
1450
1451        producer.push(1).unwrap();
1452        producer.push(2).unwrap();
1453        producer.push(3).unwrap();
1454
1455        assert_eq!(producer.len(), 3);
1456        assert_eq!(producer.slots(), 3);
1457        assert_eq!(producer.free_slots(), 5);
1458        assert!(!producer.is_full());
1459
1460        // Fill the buffer
1461        producer.push(4).unwrap();
1462        producer.push(5).unwrap();
1463        producer.push(6).unwrap();
1464        producer.push(7).unwrap();
1465        producer.push(8).unwrap();
1466
1467        assert_eq!(producer.len(), 8);
1468        assert_eq!(producer.slots(), 8);
1469        assert_eq!(producer.free_slots(), 0);
1470        assert!(producer.is_full());
1471
1472        // Pop one and check again
1473        consumer.pop().unwrap();
1474
1475        assert_eq!(producer.len(), 7);
1476        assert_eq!(producer.free_slots(), 1);
1477        assert!(!producer.is_full());
1478    }
1479
1480    #[test]
1481    fn test_consumer_len_and_capacity() {
1482        // Test Consumer len() and capacity() methods
1483        // 测试 Consumer 的 len() 和 capacity() 方法
1484        let (mut producer, consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1485
1486        assert_eq!(consumer.len(), 0);
1487        assert_eq!(consumer.capacity(), 16);
1488
1489        for i in 0..10 {
1490            producer.push(i).unwrap();
1491        }
1492
1493        assert_eq!(consumer.len(), 10);
1494        assert_eq!(consumer.capacity(), 16);
1495    }
1496
1497    #[test]
1498    fn test_peek() {
1499        // Test peek operation
1500        // 测试 peek 操作
1501        let (mut producer, consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
1502
1503        // Peek empty buffer
1504        assert!(consumer.peek().is_none());
1505
1506        producer.push(42).unwrap();
1507        producer.push(100).unwrap();
1508        producer.push(200).unwrap();
1509
1510        // Peek should return first element without removing it
1511        assert_eq!(consumer.peek(), Some(&42));
1512        assert_eq!(consumer.peek(), Some(&42)); // Peek again, should be same
1513        assert_eq!(consumer.len(), 3); // Length unchanged
1514    }
1515
1516    #[test]
1517    fn test_peek_after_pop() {
1518        // Test peek after pop operations
1519        // 测试 pop 后的 peek 操作
1520        let (mut producer, mut consumer) = new::<String, 32>(NonZero::new(8).unwrap());
1521
1522        producer.push("first".to_string()).unwrap();
1523        producer.push("second".to_string()).unwrap();
1524        producer.push("third".to_string()).unwrap();
1525
1526        assert_eq!(consumer.peek(), Some(&"first".to_string()));
1527        consumer.pop().unwrap();
1528
1529        assert_eq!(consumer.peek(), Some(&"second".to_string()));
1530        consumer.pop().unwrap();
1531
1532        assert_eq!(consumer.peek(), Some(&"third".to_string()));
1533        consumer.pop().unwrap();
1534
1535        assert!(consumer.peek().is_none());
1536    }
1537
1538    #[test]
1539    fn test_push_slice_basic() {
1540        // Test basic push_slice operation
1541        // 测试基本的 push_slice 操作
1542        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
1543
1544        let data = [1, 2, 3, 4, 5];
1545        let pushed = producer.push_slice(&data);
1546
1547        assert_eq!(pushed, 5);
1548        assert_eq!(consumer.len(), 5);
1549
1550        for i in 0..5 {
1551            assert_eq!(consumer.pop().unwrap(), data[i]);
1552        }
1553    }
1554
1555    #[test]
1556    fn test_push_slice_partial() {
1557        // Test push_slice when buffer is partially full
1558        // 测试缓冲区部分满时的 push_slice
1559        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
1560
1561        // Fill with 5 elements, leaving room for 3
1562        let initial = [1, 2, 3, 4, 5];
1563        producer.push_slice(&initial);
1564
1565        // Try to push 10 more, should only push 3
1566        let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
1567        let pushed = producer.push_slice(&more);
1568
1569        assert_eq!(pushed, 3);
1570        assert_eq!(consumer.len(), 8);
1571        assert!(producer.is_full());
1572
1573        // Verify values
1574        for i in 1..=8 {
1575            assert_eq!(consumer.pop().unwrap(), i);
1576        }
1577    }
1578
1579    #[test]
1580    fn test_push_slice_wrap_around() {
1581        // Test push_slice with wrap-around
1582        // 测试 push_slice 的环绕情况
1583        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
1584
1585        // Fill buffer
1586        producer.push_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
1587
1588        // Pop some elements
1589        for _ in 0..5 {
1590            consumer.pop().unwrap();
1591        }
1592
1593        // Push more elements (will cause wrap-around)
1594        let data = [10, 11, 12, 13, 14];
1595        let pushed = producer.push_slice(&data);
1596
1597        assert_eq!(pushed, 5);
1598
1599        // Verify all values
1600        assert_eq!(consumer.pop().unwrap(), 6);
1601        assert_eq!(consumer.pop().unwrap(), 7);
1602        assert_eq!(consumer.pop().unwrap(), 8);
1603        assert_eq!(consumer.pop().unwrap(), 10);
1604        assert_eq!(consumer.pop().unwrap(), 11);
1605        assert_eq!(consumer.pop().unwrap(), 12);
1606        assert_eq!(consumer.pop().unwrap(), 13);
1607        assert_eq!(consumer.pop().unwrap(), 14);
1608    }
1609
1610    #[test]
1611    fn test_push_slice_empty() {
1612        // Test push_slice with empty slice
1613        // 测试空切片的 push_slice
1614        let (mut producer, _consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
1615
1616        let pushed = producer.push_slice(&[]);
1617        assert_eq!(pushed, 0);
1618    }
1619
1620    #[test]
1621    fn test_pop_slice_basic() {
1622        // Test basic pop_slice operation
1623        // 测试基本的 pop_slice 操作
1624        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
1625
1626        // Push some data
1627        for i in 0..10 {
1628            producer.push(i).unwrap();
1629        }
1630
1631        let mut dest = [0u32; 5];
1632        let popped = consumer.pop_slice(&mut dest);
1633
1634        assert_eq!(popped, 5);
1635        assert_eq!(dest, [0, 1, 2, 3, 4]);
1636        assert_eq!(consumer.len(), 5);
1637    }
1638
1639    #[test]
1640    fn test_pop_slice_partial() {
1641        // Test pop_slice when buffer has fewer elements than dest
1642        // 测试当缓冲区元素少于目标切片时的 pop_slice
1643        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
1644
1645        producer.push(1).unwrap();
1646        producer.push(2).unwrap();
1647        producer.push(3).unwrap();
1648
1649        let mut dest = [0u32; 10];
1650        let popped = consumer.pop_slice(&mut dest);
1651
1652        assert_eq!(popped, 3);
1653        assert_eq!(&dest[0..3], &[1, 2, 3]);
1654        assert!(consumer.is_empty());
1655    }
1656
1657    #[test]
1658    fn test_pop_slice_wrap_around() {
1659        // Test pop_slice with wrap-around
1660        // 测试 pop_slice 的环绕情况
1661        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
1662
1663        // Fill buffer
1664        producer.push_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
1665
1666        // Pop 5 elements
1667        let mut temp = [0u32; 5];
1668        let popped = consumer.pop_slice(&mut temp);
1669        assert_eq!(popped, 5);
1670        assert_eq!(temp, [1, 2, 3, 4, 5]);
1671
1672        // Push 5 more elements (will cause wrap-around in the ring buffer)
1673        let pushed = producer.push_slice(&[9, 10, 11, 12, 13]);
1674        assert_eq!(pushed, 5);
1675
1676        // Pop remaining 3 elements from first batch
1677        let mut dest1 = [0u32; 3];
1678        let popped1 = consumer.pop_slice(&mut dest1);
1679        assert_eq!(popped1, 3);
1680        assert_eq!(dest1, [6, 7, 8]);
1681
1682        // Pop 5 elements from second batch
1683        let mut dest2 = [0u32; 5];
1684        let popped2 = consumer.pop_slice(&mut dest2);
1685        assert_eq!(popped2, 5);
1686        assert_eq!(dest2, [9, 10, 11, 12, 13]);
1687    }
1688
1689    #[test]
1690    fn test_pop_slice_empty() {
1691        // Test pop_slice on empty buffer
1692        // 测试空缓冲区的 pop_slice
1693        let (_producer, mut consumer) = new::<u32, 32>(NonZero::new(8).unwrap());
1694
1695        let mut dest = [0u32; 5];
1696        let popped = consumer.pop_slice(&mut dest);
1697
1698        assert_eq!(popped, 0);
1699    }
1700
1701    #[test]
1702    fn test_clear() {
1703        // Test clear operation
1704        // 测试 clear 操作
1705        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1706
1707        for i in 0..10 {
1708            producer.push(i).unwrap();
1709        }
1710
1711        assert_eq!(consumer.len(), 10);
1712
1713        consumer.clear();
1714
1715        assert_eq!(consumer.len(), 0);
1716        assert!(consumer.is_empty());
1717    }
1718
1719    #[test]
1720    fn test_clear_with_drop() {
1721        // Test that clear properly drops all elements
1722        // 测试 clear 正确 drop 所有元素
1723        use std::sync::Arc;
1724        use std::sync::atomic::{AtomicUsize, Ordering};
1725
1726        #[derive(Debug)]
1727        struct DropCounter {
1728            counter: Arc<AtomicUsize>,
1729        }
1730
1731        impl Drop for DropCounter {
1732            fn drop(&mut self) {
1733                self.counter.fetch_add(1, Ordering::SeqCst);
1734            }
1735        }
1736
1737        let counter = Arc::new(AtomicUsize::new(0));
1738
1739        {
1740            let (mut producer, mut consumer) = new::<DropCounter, 32>(NonZero::new(16).unwrap());
1741
1742            for _ in 0..8 {
1743                producer
1744                    .push(DropCounter {
1745                        counter: counter.clone(),
1746                    })
1747                    .unwrap();
1748            }
1749
1750            assert_eq!(counter.load(Ordering::SeqCst), 0);
1751
1752            consumer.clear();
1753
1754            assert_eq!(counter.load(Ordering::SeqCst), 8);
1755        }
1756    }
1757
1758    #[test]
1759    fn test_drain_iterator() {
1760        // Test drain iterator
1761        // 测试 drain 迭代器
1762        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1763
1764        for i in 0..10 {
1765            producer.push(i).unwrap();
1766        }
1767
1768        let collected: Vec<i32> = consumer.drain().collect();
1769
1770        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1771        assert!(consumer.is_empty());
1772    }
1773
1774    #[test]
1775    fn test_drain_empty() {
1776        // Test drain on empty buffer
1777        // 测试空缓冲区的 drain
1778        let (_producer, mut consumer) = new::<i32, 32>(NonZero::new(8).unwrap());
1779
1780        let collected: Vec<i32> = consumer.drain().collect();
1781
1782        assert!(collected.is_empty());
1783    }
1784
1785    #[test]
1786    fn test_drain_size_hint() {
1787        // Test drain iterator size_hint
1788        // 测试 drain 迭代器的 size_hint
1789        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1790
1791        for i in 0..5 {
1792            producer.push(i).unwrap();
1793        }
1794
1795        let mut drain = consumer.drain();
1796
1797        assert_eq!(drain.size_hint(), (5, Some(5)));
1798
1799        drain.next();
1800        assert_eq!(drain.size_hint(), (4, Some(4)));
1801
1802        drain.next();
1803        assert_eq!(drain.size_hint(), (3, Some(3)));
1804    }
1805
1806    #[test]
1807    fn test_drain_partial() {
1808        // Test partially consuming drain iterator
1809        // 测试部分消费 drain 迭代器
1810        let (mut producer, mut consumer) = new::<i32, 32>(NonZero::new(16).unwrap());
1811
1812        for i in 0..10 {
1813            producer.push(i).unwrap();
1814        }
1815
1816        let mut drain = consumer.drain();
1817
1818        assert_eq!(drain.next(), Some(0));
1819        assert_eq!(drain.next(), Some(1));
1820        assert_eq!(drain.next(), Some(2));
1821
1822        drop(drain); // Drop the iterator
1823
1824        // Buffer should have 7 elements left
1825        assert_eq!(consumer.len(), 7);
1826    }
1827
1828    #[test]
1829    fn test_combined_operations() {
1830        // Test combining various new APIs
1831        // 测试组合使用各种新 API
1832        let (mut producer, mut consumer) = new::<u32, 32>(NonZero::new(16).unwrap());
1833
1834        // Batch push
1835        let data = [1, 2, 3, 4, 5];
1836        producer.push_slice(&data);
1837
1838        assert_eq!(producer.len(), 5);
1839        assert_eq!(consumer.len(), 5);
1840        assert_eq!(consumer.capacity(), 16);
1841
1842        // Peek
1843        assert_eq!(consumer.peek(), Some(&1));
1844
1845        // Batch pop
1846        let mut dest = [0u32; 3];
1847        consumer.pop_slice(&mut dest);
1848        assert_eq!(dest, [1, 2, 3]);
1849
1850        assert_eq!(consumer.len(), 2);
1851        assert_eq!(producer.free_slots(), 14);
1852
1853        // Clear remaining
1854        consumer.clear();
1855        assert!(consumer.is_empty());
1856        assert!(!producer.is_full());
1857    }
1858}