lite_sync/
spsc.rs

1/// High-performance async SPSC (Single Producer Single Consumer) channel
2/// 
3/// Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6/// 
7/// 高性能异步 SPSC(单生产者单消费者)通道
8/// 
9/// 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11/// `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12///
13/// # 安全性说明 (Safety Notes)
14///
15/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16/// 这是安全的,基于以下保证:
17///
18/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23///
24/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25///
26/// # Safety Guarantees
27///
28/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29/// This is safe based on the following guarantees:
30///
31/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36///
37/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38
39use std::cell::UnsafeCell;
40use std::num::NonZeroUsize;
41use std::sync::atomic::{AtomicBool, Ordering};
42use std::sync::Arc;
43use super::notify::SingleWaiterNotify;
44
45/// SPSC channel creation function
46/// 
47/// Creates a bounded SPSC channel with the specified capacity.
48/// 
49/// # Type Parameters
50/// - `T`: The type of messages to be sent through the channel
51/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
52/// 
53/// # Parameters
54/// - `capacity`: Channel capacity (total number of elements the channel can hold)
55/// 
56/// # Returns
57/// A tuple of (Sender, Receiver)
58/// 
59/// # Examples
60/// 
61/// ```
62/// use lite_sync::spsc::channel;
63/// use std::num::NonZeroUsize;
64/// 
65/// #[tokio::main]
66/// async fn main() {
67///     // Create a channel with capacity 32 and inline buffer size 8
68///     let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
69///     
70///     tokio::spawn(async move {
71///         tx.send(42).await.unwrap();
72///     });
73///     
74///     let value = rx.recv().await.unwrap();
75///     assert_eq!(value, 42);
76/// }
77/// ```
78/// 
79/// SPSC 通道创建函数
80/// 
81/// 创建指定容量的有界 SPSC 通道。
82/// 
83/// # 类型参数
84/// - `T`: 通过通道发送的消息类型
85/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
86/// 
87/// # 参数
88/// - `capacity`: 通道容量(通道可以容纳的元素总数)
89/// 
90/// # 返回值
91/// 返回 (Sender, Receiver) 元组
92pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
93    let (producer, consumer) = smallring::new::<T, N>(capacity);
94    
95    let inner = Arc::new(Inner::<T, N> {
96        producer: UnsafeCell::new(producer),
97        consumer: UnsafeCell::new(consumer),
98        closed: AtomicBool::new(false),
99        recv_notify: SingleWaiterNotify::new(),
100        send_notify: SingleWaiterNotify::new(),
101    });
102    
103    let sender = Sender {
104        inner: inner.clone(),
105    };
106    
107    let receiver = Receiver {
108        inner,
109    };
110    
111    (sender, receiver)
112}
113
114/// Shared internal state for SPSC channel
115/// 
116/// Contains both shared state and the ring buffer halves.
117/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
118/// 
119/// SPSC 通道的共享内部状态
120/// 
121/// 包含共享状态和环形缓冲区的两端。
122/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
123struct Inner<T, const N: usize = 32> {
124    /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
125    /// 
126    /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
127    producer: UnsafeCell<smallring::Producer<T, N>>,
128    
129    /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
130    /// 
131    /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
132    consumer: UnsafeCell<smallring::Consumer<T, N>>,
133    
134    /// Channel closed flag
135    /// 
136    /// 通道关闭标志
137    closed: AtomicBool,
138    
139    /// Notifier for receiver waiting (lightweight single-waiter)
140    /// 
141    /// 接收者等待通知器(轻量级单等待者)
142    recv_notify: SingleWaiterNotify,
143    
144    /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
145    /// 
146    /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
147    send_notify: SingleWaiterNotify,
148}
149
150// SAFETY: Inner<T> 可以在线程间安全共享的原因:
151// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
152// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
153// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
154// 4. closed、recv_notify、send_notify 都已经是线程安全的
155// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
156unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
157
158/// SPSC channel sender
159/// 
160/// SPSC 通道发送器
161pub struct Sender<T, const N: usize> {
162    inner: Arc<Inner<T, N>>,
163}
164
165/// SPSC channel receiver
166/// 
167/// SPSC 通道接收器
168pub struct Receiver<T, const N: usize> {
169    inner: Arc<Inner<T, N>>,
170}
171
172/// Draining iterator for the SPSC channel
173/// 
174/// SPSC 通道的消费迭代器
175/// 
176/// This iterator removes and returns messages from the channel until it's empty.
177/// 
178/// 此迭代器从通道中移除并返回消息,直到通道为空。
179/// 
180/// # Type Parameters
181/// - `T`: Message type
182/// - `N`: Inline buffer size
183/// 
184/// # 类型参数
185/// - `T`: 消息类型
186/// - `N`: 内联缓冲区大小
187pub struct Drain<'a, T, const N: usize> {
188    receiver: &'a mut Receiver<T, N>,
189}
190
191impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
192    type Item = T;
193    
194    #[inline]
195    fn next(&mut self) -> Option<Self::Item> {
196        self.receiver.try_recv().ok()
197    }
198    
199    #[inline]
200    fn size_hint(&self) -> (usize, Option<usize>) {
201        let len = self.receiver.len();
202        (len, Some(len))
203    }
204}
205
206/// Send error type
207/// 
208/// 发送错误类型
209#[derive(Debug, Clone, Copy, PartialEq, Eq)]
210pub enum SendError<T> {
211    /// Channel is closed
212    /// 
213    /// 通道已关闭
214    Closed(T),
215}
216
217/// Try-receive error type
218/// 
219/// 尝试接收错误类型
220#[derive(Debug, Clone, Copy, PartialEq, Eq)]
221pub enum TryRecvError {
222    /// Channel is empty
223    /// 
224    /// 通道为空
225    Empty,
226    
227    /// Channel is closed
228    /// 
229    /// 通道已关闭
230    Closed,
231}
232
233/// Try-send error type
234/// 
235/// 尝试发送错误类型
236#[derive(Debug, Clone, Copy, PartialEq, Eq)]
237pub enum TrySendError<T> {
238    /// Buffer is full
239    /// 
240    /// 缓冲区已满
241    Full(T),
242    
243    /// Channel is closed
244    /// 
245    /// 通道已关闭
246    Closed(T),
247}
248
249impl<T, const N: usize> Sender<T, N> {
250    /// Send a message to the channel (async, waits if buffer is full)
251    /// 
252    /// # Errors
253    /// Returns `SendError::Closed` if the receiver has been dropped
254    /// 
255    /// 向通道发送消息(异步,如果缓冲区满则等待)
256    /// 
257    /// # 错误
258    /// 如果接收器已被丢弃,返回 `SendError::Closed`
259    pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
260        loop {
261            match self.try_send(value) {
262                Ok(()) => return Ok(()),
263                Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
264                Err(TrySendError::Full(v)) => {
265                    // Store the value to retry
266                    // 存储值以便重试
267                    value = v;
268                    
269                    // Wait for space to become available
270                    // 等待空间可用
271                    self.inner.send_notify.notified().await;
272                    
273                    // Check if channel was closed while waiting
274                    // 检查等待时通道是否已关闭
275                    if self.inner.closed.load(Ordering::Acquire) {
276                        return Err(SendError::Closed(value));
277                    }
278                    
279                    // Retry with the value in next loop iteration
280                    // 在下一次循环迭代中使用该值重试
281                }
282            }
283        }
284    }
285    
286    /// Try to send a message without blocking
287    /// 
288    /// # Errors
289    /// - Returns `TrySendError::Full` if the buffer is full
290    /// - Returns `TrySendError::Closed` if the receiver has been dropped
291    /// 
292    /// 尝试非阻塞地发送消息
293    /// 
294    /// # 错误
295    /// - 如果缓冲区满,返回 `TrySendError::Full`
296    /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
297    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
298        // Check if channel is closed first
299        // 首先检查通道是否已关闭
300        if self.inner.closed.load(Ordering::Acquire) {
301            return Err(TrySendError::Closed(value));
302        }
303        
304        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
305        // 不会有多个线程同时访问 producer
306        let producer = unsafe { &mut *self.inner.producer.get() };
307        
308        match producer.push(value) {
309            Ok(()) => {
310                // Successfully sent, notify receiver
311                // 成功发送,通知接收者
312                self.inner.recv_notify.notify_one();
313                Ok(())
314            }
315            Err(smallring::PushError::Full(v)) => {
316                Err(TrySendError::Full(v))
317            }
318        }
319    }
320    
321    /// Check if the channel is closed
322    /// 
323    /// 检查通道是否已关闭
324    #[inline]
325    pub fn is_closed(&self) -> bool {
326        self.inner.closed.load(Ordering::Acquire)
327    }
328    
329    /// Get the capacity of the channel
330    /// 
331    /// 获取通道的容量
332    #[inline]
333    pub fn capacity(&self) -> usize {
334        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
335        // capacity 只读取数据,不需要可变访问
336        let producer = unsafe { &*self.inner.producer.get() };
337        producer.capacity()
338    }
339    
340    /// Get the number of messages currently in the channel
341    /// 
342    /// 获取通道中当前的消息数量
343    #[inline]
344    pub fn len(&self) -> usize {
345        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
346        // slots 只读取数据,不需要可变访问
347        let producer = unsafe { &*self.inner.producer.get() };
348        producer.slots()
349    }
350    
351    /// Get the number of free slots in the channel
352    /// 
353    /// 获取通道中的空闲空间数量
354    #[inline]
355    pub fn free_slots(&self) -> usize {
356        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
357        // free_slots 只读取数据,不需要可变访问
358        let producer = unsafe { &*self.inner.producer.get() };
359        producer.free_slots()
360    }
361    
362    /// Check if the channel is full
363    /// 
364    /// 检查通道是否已满
365    #[inline]
366    pub fn is_full(&self) -> bool {
367        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
368        // is_full 只读取数据,不需要可变访问
369        let producer = unsafe { &*self.inner.producer.get() };
370        producer.is_full()
371    }
372}
373
374impl<T: Copy, const N: usize> Sender<T, N> {
375    /// Try to send multiple values from a slice without blocking
376    /// 
377    /// 尝试非阻塞地从切片发送多个值
378    /// 
379    /// This method attempts to send as many elements as possible from the slice.
380    /// It returns the number of elements successfully sent.
381    /// 
382    /// 此方法尝试从切片中发送尽可能多的元素。
383    /// 返回成功发送的元素数量。
384    /// 
385    /// # Parameters
386    /// - `values`: Slice of values to send
387    /// 
388    /// # Returns
389    /// Number of elements successfully sent (0 to values.len())
390    /// 
391    /// # 参数
392    /// - `values`: 要发送的值的切片
393    /// 
394    /// # 返回值
395    /// 成功发送的元素数量(0 到 values.len())
396    pub fn try_send_slice(&self, values: &[T]) -> usize {
397        // Check if channel is closed first
398        // 首先检查通道是否已关闭
399        if self.inner.closed.load(Ordering::Acquire) {
400            return 0;
401        }
402        
403        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
404        // 不会有多个线程同时访问 producer
405        let producer = unsafe { &mut *self.inner.producer.get() };
406        
407        let sent = producer.push_slice(values);
408        
409        if sent > 0 {
410            // Successfully sent some messages, notify receiver
411            // 成功发送一些消息,通知接收者
412            self.inner.recv_notify.notify_one();
413        }
414        
415        sent
416    }
417    
418    /// Send multiple values from a slice (async, waits if buffer is full)
419    /// 
420    /// 从切片发送多个值(异步,如果缓冲区满则等待)
421    /// 
422    /// This method will send all elements from the slice, waiting if necessary
423    /// when the buffer becomes full. Returns the number of elements sent, or
424    /// an error if the channel is closed.
425    /// 
426    /// 此方法将发送切片中的所有元素,必要时在缓冲区满时等待。
427    /// 返回发送的元素数量,如果通道关闭则返回错误。
428    /// 
429    /// # Parameters
430    /// - `values`: Slice of values to send
431    /// 
432    /// # Returns
433    /// - `Ok(usize)`: Number of elements successfully sent
434    /// - `Err(SendError)`: Channel is closed
435    /// 
436    /// # 参数
437    /// - `values`: 要发送的值的切片
438    /// 
439    /// # 返回值
440    /// - `Ok(usize)`: 成功发送的元素数量
441    /// - `Err(SendError)`: 通道已关闭
442    pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
443        let mut total_sent = 0;
444        
445        while total_sent < values.len() {
446            // Check if channel is closed
447            // 检查通道是否已关闭
448            if self.inner.closed.load(Ordering::Acquire) {
449                return Err(SendError::Closed(()));
450            }
451            
452            let sent = self.try_send_slice(&values[total_sent..]);
453            total_sent += sent;
454            
455            if total_sent < values.len() {
456                // Need to wait for space
457                // 需要等待空间
458                self.inner.send_notify.notified().await;
459                
460                // Check if channel was closed while waiting
461                // 检查等待时通道是否已关闭
462                if self.inner.closed.load(Ordering::Acquire) {
463                    return Err(SendError::Closed(()));
464                }
465            }
466        }
467        
468        Ok(total_sent)
469    }
470}
471
472impl<T, const N: usize> Receiver<T, N> {
473    /// Receive a message from the channel (async, waits if buffer is empty)
474    /// 
475    /// Returns `None` if the channel is closed and empty
476    /// 
477    /// 从通道接收消息(异步,如果缓冲区空则等待)
478    /// 
479    /// 如果通道已关闭且为空,返回 `None`
480    pub async fn recv(&self) -> Option<T> {
481        loop {
482            match self.try_recv() {
483                Ok(value) => return Some(value),
484                Err(TryRecvError::Closed) => return None,
485                Err(TryRecvError::Empty) => {
486                    // Check if channel is closed before waiting
487                    // 等待前检查通道是否已关闭
488                    if self.inner.closed.load(Ordering::Acquire) {
489                        // Double check if there are any remaining items
490                        // 再次检查是否有剩余项
491                        if let Ok(value) = self.try_recv() {
492                            return Some(value);
493                        }
494                        return None;
495                    }
496                    
497                    // Wait for data to become available
498                    // 等待数据可用
499                    self.inner.recv_notify.notified().await;
500                }
501            }
502        }
503    }
504    
505    /// Try to receive a message without blocking
506    /// 
507    /// # Errors
508    /// - Returns `TryRecvError::Empty` if the buffer is empty
509    /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
510    /// 
511    /// 尝试非阻塞地接收消息
512    /// 
513    /// # 错误
514    /// - 如果缓冲区空,返回 `TryRecvError::Empty`
515    /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
516    pub fn try_recv(&self) -> Result<T, TryRecvError> {
517        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
518        // 不会有多个线程同时访问 consumer
519        let consumer = unsafe { &mut *self.inner.consumer.get() };
520        
521        match consumer.pop() {
522            Ok(value) => {
523                // Successfully received, notify sender
524                // 成功接收,通知发送者
525                self.inner.send_notify.notify_one();
526                Ok(value)
527            }
528            Err(smallring::PopError::Empty) => {
529                if self.inner.closed.load(Ordering::Acquire) {
530                    Err(TryRecvError::Closed)
531                } else {
532                    Err(TryRecvError::Empty)
533                }
534            }
535        }
536    }
537    
538    /// Check if the channel is empty
539    /// 
540    /// 检查通道是否为空
541    #[inline]
542    pub fn is_empty(&self) -> bool {
543        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
544        // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
545        let consumer = unsafe { &*self.inner.consumer.get() };
546        consumer.is_empty()
547    }
548    
549    /// Get the number of messages currently in the channel
550    /// 
551    /// 获取通道中当前的消息数量
552    #[inline]
553    pub fn len(&self) -> usize {
554        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
555        // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
556        let consumer = unsafe { &*self.inner.consumer.get() };
557        consumer.slots()
558    }
559    
560    /// Get the capacity of the channel
561    /// 
562    /// 获取通道的容量
563    #[inline]
564    pub fn capacity(&self) -> usize {
565        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
566        // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
567        let consumer = unsafe { &*self.inner.consumer.get() };
568        consumer.buffer().capacity()
569    }
570    
571    /// Peek at the first message without removing it
572    /// 
573    /// 查看第一个消息但不移除它
574    /// 
575    /// # Returns
576    /// `Some(&T)` if there is a message, `None` if the channel is empty
577    /// 
578    /// # 返回值
579    /// 如果有消息则返回 `Some(&T)`,如果通道为空则返回 `None`
580    /// 
581    /// # Safety
582    /// The returned reference is valid only as long as no other operations
583    /// are performed on the Receiver that might modify the buffer.
584    /// 
585    /// # 安全性
586    /// 返回的引用仅在未对 Receiver 执行可能修改缓冲区的其他操作时有效。
587    #[inline]
588    pub fn peek(&self) -> Option<&T> {
589        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
590        // peek 只读取数据,不需要可变访问
591        let consumer = unsafe { &*self.inner.consumer.get() };
592        consumer.peek()
593    }
594    
595    /// Clear all messages from the channel
596    /// 
597    /// 清空通道中的所有消息
598    /// 
599    /// This method pops and drops all messages currently in the channel.
600    /// 
601    /// 此方法弹出并 drop 通道中当前的所有消息。
602    pub fn clear(&mut self) {
603        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
604        // 我们有可变引用,因此可以安全地访问 consumer
605        let consumer = unsafe { &mut *self.inner.consumer.get() };
606        consumer.clear();
607        
608        // Notify sender that space is available
609        // 通知发送者空间可用
610        self.inner.send_notify.notify_one();
611    }
612    
613    /// Create a draining iterator
614    /// 
615    /// 创建一个消费迭代器
616    /// 
617    /// Returns an iterator that removes and returns messages from the channel.
618    /// The iterator will continue until the channel is empty.
619    /// 
620    /// 返回一个从通道中移除并返回消息的迭代器。
621    /// 迭代器将持续运行直到通道为空。
622    /// 
623    /// # Examples
624    /// 
625    /// ```
626    /// use lite_sync::spsc::channel;
627    /// use std::num::NonZeroUsize;
628    /// 
629    /// # #[tokio::main]
630    /// # async fn main() {
631    /// let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
632    /// tx.try_send(1).unwrap();
633    /// tx.try_send(2).unwrap();
634    /// tx.try_send(3).unwrap();
635    /// 
636    /// let items: Vec<i32> = rx.drain().collect();
637    /// assert_eq!(items, vec![1, 2, 3]);
638    /// assert!(rx.is_empty());
639    /// # }
640    /// ```
641    #[inline]
642    pub fn drain(&mut self) -> Drain<'_, T, N> {
643        Drain { receiver: self }
644    }
645}
646
647impl<T: Copy, const N: usize> Receiver<T, N> {
648    /// Try to receive multiple values into a slice without blocking
649    /// 
650    /// 尝试非阻塞地将多个值接收到切片
651    /// 
652    /// This method attempts to receive as many messages as possible into the provided slice.
653    /// It returns the number of messages successfully received.
654    /// 
655    /// 此方法尝试将尽可能多的消息接收到提供的切片中。
656    /// 返回成功接收的消息数量。
657    /// 
658    /// # Parameters
659    /// - `dest`: Destination slice to receive values into
660    /// 
661    /// # Returns
662    /// Number of messages successfully received (0 to dest.len())
663    /// 
664    /// # 参数
665    /// - `dest`: 用于接收值的目标切片
666    /// 
667    /// # 返回值
668    /// 成功接收的消息数量(0 到 dest.len())
669    pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
670        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
671        // 我们有可变引用,因此可以安全地访问 consumer
672        let consumer = unsafe { &mut *self.inner.consumer.get() };
673        
674        let received = consumer.pop_slice(dest);
675        
676        if received > 0 {
677            // Successfully received some messages, notify sender
678            // 成功接收一些消息,通知发送者
679            self.inner.send_notify.notify_one();
680        }
681        
682        received
683    }
684    
685    /// Receive multiple values into a slice (async, waits if buffer is empty)
686    /// 
687    /// 将多个值接收到切片(异步,如果缓冲区空则等待)
688    /// 
689    /// This method will fill the destination slice as much as possible, waiting if necessary
690    /// when the buffer becomes empty. Returns the number of messages received.
691    /// 
692    /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
693    /// 返回接收的消息数量。
694    /// 
695    /// # Parameters
696    /// - `dest`: Destination slice to receive values into
697    /// 
698    /// # Returns
699    /// Number of messages successfully received (0 to dest.len())
700    /// Returns 0 if the channel is closed and empty
701    /// 
702    /// # 参数
703    /// - `dest`: 用于接收值的目标切片
704    /// 
705    /// # 返回值
706    /// 成功接收的消息数量(0 到 dest.len())
707    /// 如果通道已关闭且为空,返回 0
708    pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
709        let mut total_received = 0;
710        
711        while total_received < dest.len() {
712            let received = self.try_recv_slice(&mut dest[total_received..]);
713            total_received += received;
714            
715            if total_received < dest.len() {
716                // Check if channel is closed
717                // 检查通道是否已关闭
718                if self.inner.closed.load(Ordering::Acquire) {
719                    // Channel closed, return what we have
720                    // 通道已关闭,返回我们已有的内容
721                    return total_received;
722                }
723                
724                // Need to wait for data
725                // 需要等待数据
726                self.inner.recv_notify.notified().await;
727                
728                // Check again after waking up
729                // 唤醒后再次检查
730                if self.inner.closed.load(Ordering::Acquire) {
731                    // Try one more time to get remaining messages
732                    // 再尝试一次获取剩余消息
733                    let final_received = self.try_recv_slice(&mut dest[total_received..]);
734                    total_received += final_received;
735                    return total_received;
736                }
737            }
738        }
739        
740        total_received
741    }
742}
743
744impl<T, const N: usize> Drop for Receiver<T, N> {
745    fn drop(&mut self) {
746        // Mark channel as closed when receiver is dropped
747        // 当接收器被丢弃时标记通道为已关闭
748        self.inner.closed.store(true, Ordering::Release);
749        
750        // Notify sender in case it's waiting
751        // 通知发送者以防它正在等待
752        self.inner.send_notify.notify_one();
753    }
754}
755
756impl<T, const N: usize> Drop for Sender<T, N> {
757    fn drop(&mut self) {
758        // Mark channel as closed when sender is dropped
759        // 当发送器被丢弃时标记通道为已关闭
760        self.inner.closed.store(true, Ordering::Release);
761        
762        // Notify receiver in case it's waiting
763        // 通知接收器以防它正在等待
764        self.inner.recv_notify.notify_one();
765    }
766}
767
768#[cfg(test)]
769mod tests {
770    use super::*;
771    
772    #[tokio::test]
773    async fn test_basic_send_recv() {
774        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
775        
776        tx.send(1).await.unwrap();
777        tx.send(2).await.unwrap();
778        tx.send(3).await.unwrap();
779        
780        assert_eq!(rx.recv().await, Some(1));
781        assert_eq!(rx.recv().await, Some(2));
782        assert_eq!(rx.recv().await, Some(3));
783    }
784    
785    #[tokio::test]
786    async fn test_try_send_recv() {
787        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
788        
789        tx.try_send(1).unwrap();
790        tx.try_send(2).unwrap();
791        
792        assert_eq!(rx.try_recv().unwrap(), 1);
793        assert_eq!(rx.try_recv().unwrap(), 2);
794        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
795    }
796    
797    #[tokio::test]
798    async fn test_channel_closed_on_sender_drop() {
799        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
800        
801        tx.send(1).await.unwrap();
802        drop(tx);
803        
804        assert_eq!(rx.recv().await, Some(1));
805        assert_eq!(rx.recv().await, None);
806    }
807    
808    #[tokio::test]
809    async fn test_channel_closed_on_receiver_drop() {
810        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
811        
812        drop(rx);
813        
814        assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
815    }
816    
817    #[tokio::test]
818    async fn test_cross_task_communication() {
819        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
820        
821        let sender_handle = tokio::spawn(async move {
822            for i in 0..10 {
823                tx.send(i).await.unwrap();
824            }
825        });
826        
827        let receiver_handle = tokio::spawn(async move {
828            let mut sum = 0;
829            while let Some(value) = rx.recv().await {
830                sum += value;
831            }
832            sum
833        });
834        
835        sender_handle.await.unwrap();
836        let sum = receiver_handle.await.unwrap();
837        assert_eq!(sum, 45); // 0+1+2+...+9 = 45
838    }
839    
840    #[tokio::test]
841    async fn test_backpressure() {
842        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
843        
844        // Fill the buffer
845        tx.try_send(1).unwrap();
846        tx.try_send(2).unwrap();
847        tx.try_send(3).unwrap();
848        tx.try_send(4).unwrap();
849        
850        // Buffer should be full now
851        assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
852        
853        // This should block and then succeed when we consume
854        let send_handle = tokio::spawn(async move {
855            tx.send(5).await.unwrap();
856            tx.send(6).await.unwrap();
857        });
858        
859        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
860        
861        assert_eq!(rx.recv().await, Some(1));
862        assert_eq!(rx.recv().await, Some(2));
863        assert_eq!(rx.recv().await, Some(3));
864        assert_eq!(rx.recv().await, Some(4));
865        assert_eq!(rx.recv().await, Some(5));
866        assert_eq!(rx.recv().await, Some(6));
867        
868        send_handle.await.unwrap();
869    }
870    
871    #[tokio::test]
872    async fn test_capacity_and_len() {
873        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
874        
875        assert_eq!(rx.capacity(), 8);
876        assert_eq!(rx.len(), 0);
877        assert!(rx.is_empty());
878        
879        tx.try_send(1).unwrap();
880        tx.try_send(2).unwrap();
881        
882        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
883        assert_eq!(rx.len(), 2);
884        assert!(!rx.is_empty());
885    }
886    
887    // ==================== New API Tests ====================
888    
889    #[tokio::test]
890    async fn test_sender_capacity_queries() {
891        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
892        
893        assert_eq!(tx.capacity(), 8);
894        assert_eq!(tx.len(), 0);
895        assert_eq!(tx.free_slots(), 8);
896        assert!(!tx.is_full());
897        
898        tx.try_send(1).unwrap();
899        tx.try_send(2).unwrap();
900        tx.try_send(3).unwrap();
901        
902        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
903        assert_eq!(tx.len(), 3);
904        assert_eq!(tx.free_slots(), 5);
905        assert!(!tx.is_full());
906        
907        // Fill the buffer
908        tx.try_send(4).unwrap();
909        tx.try_send(5).unwrap();
910        tx.try_send(6).unwrap();
911        tx.try_send(7).unwrap();
912        tx.try_send(8).unwrap();
913        
914        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
915        assert_eq!(tx.len(), 8);
916        assert_eq!(tx.free_slots(), 0);
917        assert!(tx.is_full());
918        
919        // Pop one and check again
920        rx.recv().await;
921        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
922        
923        assert_eq!(tx.len(), 7);
924        assert_eq!(tx.free_slots(), 1);
925        assert!(!tx.is_full());
926    }
927    
928    #[tokio::test]
929    async fn test_try_send_slice() {
930        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
931        
932        let data = [1, 2, 3, 4, 5];
933        let sent = tx.try_send_slice(&data);
934        
935        assert_eq!(sent, 5);
936        assert_eq!(rx.len(), 5);
937        
938        for i in 0..5 {
939            assert_eq!(rx.recv().await.unwrap(), data[i]);
940        }
941    }
942    
943    #[tokio::test]
944    async fn test_try_send_slice_partial() {
945        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
946        
947        // Fill with 5 elements, leaving room for 3
948        let initial = [1, 2, 3, 4, 5];
949        tx.try_send_slice(&initial);
950        
951        // Try to send 10 more, should only send 3
952        let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
953        let sent = tx.try_send_slice(&more);
954        
955        assert_eq!(sent, 3);
956        assert_eq!(rx.len(), 8);
957        assert!(tx.is_full());
958        
959        // Verify values
960        for i in 1..=8 {
961            assert_eq!(rx.recv().await.unwrap(), i);
962        }
963    }
964    
965    #[tokio::test]
966    async fn test_send_slice() {
967        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
968        
969        let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
970        let result = tx.send_slice(&data).await;
971        
972        assert_eq!(result.unwrap(), 10);
973        assert_eq!(rx.len(), 10);
974        
975        for i in 0..10 {
976            assert_eq!(rx.recv().await.unwrap(), data[i]);
977        }
978    }
979    
980    #[tokio::test]
981    async fn test_send_slice_with_backpressure() {
982        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
983        
984        let data = [1, 2, 3, 4, 5, 6, 7, 8];
985        
986        let send_handle = tokio::spawn(async move {
987            tx.send_slice(&data).await.unwrap()
988        });
989        
990        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
991        
992        // Consume some messages to make room
993        for i in 1..=4 {
994            assert_eq!(rx.recv().await.unwrap(), i);
995        }
996        
997        let sent = send_handle.await.unwrap();
998        assert_eq!(sent, 8);
999        
1000        // Verify remaining messages
1001        for i in 5..=8 {
1002            assert_eq!(rx.recv().await.unwrap(), i);
1003        }
1004    }
1005    
1006    #[tokio::test]
1007    async fn test_peek() {
1008        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1009        
1010        // Peek empty buffer
1011        assert!(rx.peek().is_none());
1012        
1013        tx.try_send(42).unwrap();
1014        tx.try_send(100).unwrap();
1015        tx.try_send(200).unwrap();
1016        
1017        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1018        
1019        // Peek should return first element without removing it
1020        assert_eq!(rx.peek(), Some(&42));
1021        assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
1022        assert_eq!(rx.len(), 3); // Length unchanged
1023    }
1024    
1025    #[tokio::test]
1026    async fn test_peek_after_recv() {
1027        let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1028        
1029        tx.try_send("first".to_string()).unwrap();
1030        tx.try_send("second".to_string()).unwrap();
1031        tx.try_send("third".to_string()).unwrap();
1032        
1033        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1034        
1035        assert_eq!(rx.peek(), Some(&"first".to_string()));
1036        rx.recv().await.unwrap();
1037        
1038        assert_eq!(rx.peek(), Some(&"second".to_string()));
1039        rx.recv().await.unwrap();
1040        
1041        assert_eq!(rx.peek(), Some(&"third".to_string()));
1042        rx.recv().await.unwrap();
1043        
1044        assert!(rx.peek().is_none());
1045    }
1046    
1047    #[tokio::test]
1048    async fn test_clear() {
1049        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1050        
1051        for i in 0..10 {
1052            tx.try_send(i).unwrap();
1053        }
1054        
1055        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1056        assert_eq!(rx.len(), 10);
1057        
1058        rx.clear();
1059        
1060        assert_eq!(rx.len(), 0);
1061        assert!(rx.is_empty());
1062    }
1063    
1064    #[tokio::test]
1065    async fn test_clear_with_drop() {
1066        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1067        use std::sync::Arc;
1068        
1069        #[derive(Debug)]
1070        struct DropCounter {
1071            counter: Arc<AtomicUsize>,
1072        }
1073        
1074        impl Drop for DropCounter {
1075            fn drop(&mut self) {
1076                self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1077            }
1078        }
1079        
1080        let counter = Arc::new(AtomicUsize::new(0));
1081        
1082        {
1083            let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1084            
1085            for _ in 0..8 {
1086                tx.try_send(DropCounter { counter: counter.clone() }).unwrap();
1087            }
1088            
1089            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1090            assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1091            
1092            rx.clear();
1093            
1094            assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1095        }
1096    }
1097    
1098    #[tokio::test]
1099    async fn test_drain() {
1100        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1101        
1102        for i in 0..10 {
1103            tx.try_send(i).unwrap();
1104        }
1105        
1106        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1107        
1108        let collected: Vec<i32> = rx.drain().collect();
1109        
1110        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1111        assert!(rx.is_empty());
1112    }
1113    
1114    #[tokio::test]
1115    async fn test_drain_empty() {
1116        let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1117        
1118        let collected: Vec<i32> = rx.drain().collect();
1119        
1120        assert!(collected.is_empty());
1121    }
1122    
1123    #[tokio::test]
1124    async fn test_drain_size_hint() {
1125        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1126        
1127        for i in 0..5 {
1128            tx.try_send(i).unwrap();
1129        }
1130        
1131        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1132        
1133        let mut drain = rx.drain();
1134        
1135        assert_eq!(drain.size_hint(), (5, Some(5)));
1136        
1137        drain.next();
1138        assert_eq!(drain.size_hint(), (4, Some(4)));
1139        
1140        drain.next();
1141        assert_eq!(drain.size_hint(), (3, Some(3)));
1142    }
1143    
1144    #[tokio::test]
1145    async fn test_try_recv_slice() {
1146        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1147        
1148        // Send some data
1149        for i in 0..10 {
1150            tx.try_send(i).unwrap();
1151        }
1152        
1153        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1154        
1155        let mut dest = [0u32; 5];
1156        let received = rx.try_recv_slice(&mut dest);
1157        
1158        assert_eq!(received, 5);
1159        assert_eq!(dest, [0, 1, 2, 3, 4]);
1160        assert_eq!(rx.len(), 5);
1161    }
1162    
1163    #[tokio::test]
1164    async fn test_try_recv_slice_partial() {
1165        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1166        
1167        tx.try_send(1).unwrap();
1168        tx.try_send(2).unwrap();
1169        tx.try_send(3).unwrap();
1170        
1171        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1172        
1173        let mut dest = [0u32; 10];
1174        let received = rx.try_recv_slice(&mut dest);
1175        
1176        assert_eq!(received, 3);
1177        assert_eq!(&dest[0..3], &[1, 2, 3]);
1178        assert!(rx.is_empty());
1179    }
1180    
1181    #[tokio::test]
1182    async fn test_recv_slice() {
1183        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1184        
1185        for i in 1..=10 {
1186            tx.try_send(i).unwrap();
1187        }
1188        
1189        let mut dest = [0u32; 10];
1190        let received = rx.recv_slice(&mut dest).await;
1191        
1192        assert_eq!(received, 10);
1193        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1194        assert!(rx.is_empty());
1195    }
1196    
1197    #[tokio::test]
1198    async fn test_recv_slice_with_wait() {
1199        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1200        
1201        let recv_handle = tokio::spawn(async move {
1202            let mut dest = [0u32; 8];
1203            let received = rx.recv_slice(&mut dest).await;
1204            (received, dest)
1205        });
1206        
1207        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1208        
1209        // Send data gradually
1210        for i in 1..=8 {
1211            tx.send(i).await.unwrap();
1212            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1213        }
1214        
1215        let (received, dest) = recv_handle.await.unwrap();
1216        assert_eq!(received, 8);
1217        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1218    }
1219    
1220    #[tokio::test]
1221    async fn test_recv_slice_channel_closed() {
1222        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1223        
1224        tx.try_send(1).unwrap();
1225        tx.try_send(2).unwrap();
1226        tx.try_send(3).unwrap();
1227        
1228        drop(tx); // Close the channel
1229        
1230        let mut dest = [0u32; 10];
1231        let received = rx.recv_slice(&mut dest).await;
1232        
1233        // Should receive the 3 available messages, then stop
1234        assert_eq!(received, 3);
1235        assert_eq!(&dest[0..3], &[1, 2, 3]);
1236    }
1237    
1238    #[tokio::test]
1239    async fn test_combined_new_apis() {
1240        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1241        
1242        // Batch send
1243        let data = [1, 2, 3, 4, 5];
1244        tx.try_send_slice(&data);
1245        
1246        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1247        
1248        assert_eq!(tx.len(), 5);
1249        assert_eq!(rx.len(), 5);
1250        assert_eq!(rx.capacity(), 16);
1251        
1252        // Peek
1253        assert_eq!(rx.peek(), Some(&1));
1254        
1255        // Batch receive
1256        let mut dest = [0u32; 3];
1257        rx.try_recv_slice(&mut dest);
1258        assert_eq!(dest, [1, 2, 3]);
1259        
1260        assert_eq!(rx.len(), 2);
1261        assert_eq!(tx.free_slots(), 14);
1262        
1263        // Drain remaining
1264        let remaining: Vec<u32> = rx.drain().collect();
1265        assert_eq!(remaining, vec![4, 5]);
1266        
1267        assert!(rx.is_empty());
1268        assert!(!tx.is_full());
1269    }
1270}