lite_sync/
spsc.rs

1/// High-performance async SPSC (Single Producer Single Consumer) channel
2/// 
3/// Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6/// 
7/// 高性能异步 SPSC(单生产者单消费者)通道
8/// 
9/// 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11/// `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12///
13/// # 安全性说明 (Safety Notes)
14///
15/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16/// 这是安全的,基于以下保证:
17///
18/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23///
24/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25///
26/// # Safety Guarantees
27///
28/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29/// This is safe based on the following guarantees:
30///
31/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36///
37/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38use std::cell::UnsafeCell;
39use std::num::NonZeroUsize;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::Arc;
42use super::notify::SingleWaiterNotify;
43
44/// SPSC channel creation function
45/// 
46/// Creates a bounded SPSC channel with the specified capacity.
47/// 
48/// # Type Parameters
49/// - `T`: The type of messages to be sent through the channel
50/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
51/// 
52/// # Parameters
53/// - `capacity`: Channel capacity (total number of elements the channel can hold)
54/// 
55/// # Returns
56/// A tuple of (Sender, Receiver)
57/// 
58/// # Examples
59/// 
60/// ```
61/// use lite_sync::spsc::channel;
62/// use std::num::NonZeroUsize;
63/// 
64/// #[tokio::main]
65/// async fn main() {
66///     // Create a channel with capacity 32 and inline buffer size 8
67///     let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
68///     
69///     tokio::spawn(async move {
70///         tx.send(42).await.unwrap();
71///     });
72///     
73///     let value = rx.recv().await.unwrap();
74///     assert_eq!(value, 42);
75/// }
76/// ```
77/// 
78/// SPSC 通道创建函数
79/// 
80/// 创建指定容量的有界 SPSC 通道。
81/// 
82/// # 类型参数
83/// - `T`: 通过通道发送的消息类型
84/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
85/// 
86/// # 参数
87/// - `capacity`: 通道容量(通道可以容纳的元素总数)
88/// 
89/// # 返回值
90/// 返回 (Sender, Receiver) 元组
91pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
92    let (producer, consumer) = smallring::new::<T, N>(capacity);
93    
94    let inner = Arc::new(Inner::<T, N> {
95        producer: UnsafeCell::new(producer),
96        consumer: UnsafeCell::new(consumer),
97        closed: AtomicBool::new(false),
98        recv_notify: SingleWaiterNotify::new(),
99        send_notify: SingleWaiterNotify::new(),
100    });
101    
102    let sender = Sender {
103        inner: inner.clone(),
104    };
105    
106    let receiver = Receiver {
107        inner,
108    };
109    
110    (sender, receiver)
111}
112
113/// Shared internal state for SPSC channel
114/// 
115/// Contains both shared state and the ring buffer halves.
116/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
117/// 
118/// SPSC 通道的共享内部状态
119/// 
120/// 包含共享状态和环形缓冲区的两端。
121/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
122struct Inner<T, const N: usize = 32> {
123    /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
124    /// 
125    /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
126    producer: UnsafeCell<smallring::Producer<T, N>>,
127    
128    /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
129    /// 
130    /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
131    consumer: UnsafeCell<smallring::Consumer<T, N>>,
132    
133    /// Channel closed flag
134    /// 
135    /// 通道关闭标志
136    closed: AtomicBool,
137    
138    /// Notifier for receiver waiting (lightweight single-waiter)
139    /// 
140    /// 接收者等待通知器(轻量级单等待者)
141    recv_notify: SingleWaiterNotify,
142    
143    /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
144    /// 
145    /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
146    send_notify: SingleWaiterNotify,
147}
148
149// SAFETY: Inner<T> 可以在线程间安全共享的原因:
150// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
151// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
152// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
153// 4. closed、recv_notify、send_notify 都已经是线程安全的
154// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
155unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
156
157/// SPSC channel sender
158/// 
159/// SPSC 通道发送器
160pub struct Sender<T, const N: usize> {
161    inner: Arc<Inner<T, N>>,
162}
163
164/// SPSC channel receiver
165/// 
166/// SPSC 通道接收器
167pub struct Receiver<T, const N: usize> {
168    inner: Arc<Inner<T, N>>,
169}
170
171/// Draining iterator for the SPSC channel
172/// 
173/// SPSC 通道的消费迭代器
174/// 
175/// This iterator removes and returns messages from the channel until it's empty.
176/// 
177/// 此迭代器从通道中移除并返回消息,直到通道为空。
178/// 
179/// # Type Parameters
180/// - `T`: Message type
181/// - `N`: Inline buffer size
182/// 
183/// # 类型参数
184/// - `T`: 消息类型
185/// - `N`: 内联缓冲区大小
186pub struct Drain<'a, T, const N: usize> {
187    receiver: &'a mut Receiver<T, N>,
188}
189
190impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
191    type Item = T;
192    
193    #[inline]
194    fn next(&mut self) -> Option<Self::Item> {
195        self.receiver.try_recv().ok()
196    }
197    
198    #[inline]
199    fn size_hint(&self) -> (usize, Option<usize>) {
200        let len = self.receiver.len();
201        (len, Some(len))
202    }
203}
204
/// Error returned by the asynchronous send operations.
///
/// Implements `Display` and `std::error::Error` so it can be propagated with `?` into
/// boxed or wrapped error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError<T> {
    /// The channel is closed; carries the value that could not be sent.
    Closed(T),
}

impl<T> std::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Closed(_) => f.write_str("sending on a closed channel"),
        }
    }
}

impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
215
/// Error returned by the non-blocking receive operation.
///
/// Implements `Display` and `std::error::Error` so it can be propagated with `?` into
/// boxed or wrapped error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// The buffer currently holds no message.
    Empty,

    /// The channel is closed and the buffer holds no message.
    Closed,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Empty => "receiving on an empty channel",
            Self::Closed => "receiving on a closed channel",
        })
    }
}

impl std::error::Error for TryRecvError {}
231
/// Error returned by the non-blocking send operation.
///
/// Implements `Display` and `std::error::Error` so it can be propagated with `?` into
/// boxed or wrapped error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The buffer is full; carries the value that could not be sent.
    Full(T),

    /// The channel is closed; carries the value that could not be sent.
    Closed(T),
}

impl<T> std::fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Full(_) => "sending on a full channel",
            Self::Closed(_) => "sending on a closed channel",
        })
    }
}

impl<T: std::fmt::Debug> std::error::Error for TrySendError<T> {}
247
248impl<T, const N: usize> Sender<T, N> {
249    /// Send a message to the channel (async, waits if buffer is full)
250    /// 
251    /// # Errors
252    /// Returns `SendError::Closed` if the receiver has been dropped
253    /// 
254    /// 向通道发送消息(异步,如果缓冲区满则等待)
255    /// 
256    /// # 错误
257    /// 如果接收器已被丢弃,返回 `SendError::Closed`
258    pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
259        loop {
260            match self.try_send(value) {
261                Ok(()) => return Ok(()),
262                Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
263                Err(TrySendError::Full(v)) => {
264                    // Store the value to retry
265                    // 存储值以便重试
266                    value = v;
267                    
268                    // Wait for space to become available
269                    // 等待空间可用
270                    self.inner.send_notify.notified().await;
271                    
272                    // Check if channel was closed while waiting
273                    // 检查等待时通道是否已关闭
274                    if self.inner.closed.load(Ordering::Acquire) {
275                        return Err(SendError::Closed(value));
276                    }
277                    
278                    // Retry with the value in next loop iteration
279                    // 在下一次循环迭代中使用该值重试
280                }
281            }
282        }
283    }
284    
285    /// Try to send a message without blocking
286    /// 
287    /// # Errors
288    /// - Returns `TrySendError::Full` if the buffer is full
289    /// - Returns `TrySendError::Closed` if the receiver has been dropped
290    /// 
291    /// 尝试非阻塞地发送消息
292    /// 
293    /// # 错误
294    /// - 如果缓冲区满,返回 `TrySendError::Full`
295    /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
296    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
297        // Check if channel is closed first
298        // 首先检查通道是否已关闭
299        if self.inner.closed.load(Ordering::Acquire) {
300            return Err(TrySendError::Closed(value));
301        }
302        
303        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
304        // 不会有多个线程同时访问 producer
305        let producer = unsafe { &mut *self.inner.producer.get() };
306        
307        match producer.push(value) {
308            Ok(()) => {
309                // Successfully sent, notify receiver
310                // 成功发送,通知接收者
311                self.inner.recv_notify.notify_one();
312                Ok(())
313            }
314            Err(smallring::PushError::Full(v)) => {
315                Err(TrySendError::Full(v))
316            }
317        }
318    }
319    
320    /// Check if the channel is closed
321    /// 
322    /// 检查通道是否已关闭
323    #[inline]
324    pub fn is_closed(&self) -> bool {
325        self.inner.closed.load(Ordering::Acquire)
326    }
327    
328    /// Get the capacity of the channel
329    /// 
330    /// 获取通道的容量
331    #[inline]
332    pub fn capacity(&self) -> usize {
333        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
334        // capacity 只读取数据,不需要可变访问
335        let producer = unsafe { &*self.inner.producer.get() };
336        producer.capacity()
337    }
338    
339    /// Get the number of messages currently in the channel
340    /// 
341    /// 获取通道中当前的消息数量
342    #[inline]
343    pub fn len(&self) -> usize {
344        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
345        // slots 只读取数据,不需要可变访问
346        let producer = unsafe { &*self.inner.producer.get() };
347        producer.slots()
348    }
349
350    #[inline]
351    pub fn is_empty(&self) -> bool {
352        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
353        // is_empty 只读取数据,不需要可变访问
354        let producer = unsafe { &*self.inner.producer.get() };
355        producer.is_empty()
356    }
357    
358    /// Get the number of free slots in the channel
359    /// 
360    /// 获取通道中的空闲空间数量
361    #[inline]
362    pub fn free_slots(&self) -> usize {
363        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
364        // free_slots 只读取数据,不需要可变访问
365        let producer = unsafe { &*self.inner.producer.get() };
366        producer.free_slots()
367    }
368    
369    /// Check if the channel is full
370    /// 
371    /// 检查通道是否已满
372    #[inline]
373    pub fn is_full(&self) -> bool {
374        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
375        // is_full 只读取数据,不需要可变访问
376        let producer = unsafe { &*self.inner.producer.get() };
377        producer.is_full()
378    }
379}
380
381impl<T: Copy, const N: usize> Sender<T, N> {
382    /// Try to send multiple values from a slice without blocking
383    /// 
384    /// 尝试非阻塞地从切片发送多个值
385    /// 
386    /// This method attempts to send as many elements as possible from the slice.
387    /// It returns the number of elements successfully sent.
388    /// 
389    /// 此方法尝试从切片中发送尽可能多的元素。
390    /// 返回成功发送的元素数量。
391    /// 
392    /// # Parameters
393    /// - `values`: Slice of values to send
394    /// 
395    /// # Returns
396    /// Number of elements successfully sent (0 to values.len())
397    /// 
398    /// # 参数
399    /// - `values`: 要发送的值的切片
400    /// 
401    /// # 返回值
402    /// 成功发送的元素数量(0 到 values.len())
403    pub fn try_send_slice(&self, values: &[T]) -> usize {
404        // Check if channel is closed first
405        // 首先检查通道是否已关闭
406        if self.inner.closed.load(Ordering::Acquire) {
407            return 0;
408        }
409        
410        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
411        // 不会有多个线程同时访问 producer
412        let producer = unsafe { &mut *self.inner.producer.get() };
413        
414        let sent = producer.push_slice(values);
415        
416        if sent > 0 {
417            // Successfully sent some messages, notify receiver
418            // 成功发送一些消息,通知接收者
419            self.inner.recv_notify.notify_one();
420        }
421        
422        sent
423    }
424    
425    /// Send multiple values from a slice (async, waits if buffer is full)
426    /// 
427    /// 从切片发送多个值(异步,如果缓冲区满则等待)
428    /// 
429    /// This method will send all elements from the slice, waiting if necessary
430    /// when the buffer becomes full. Returns the number of elements sent, or
431    /// an error if the channel is closed.
432    /// 
433    /// 此方法将发送切片中的所有元素,必要时在缓冲区满时等待。
434    /// 返回发送的元素数量,如果通道关闭则返回错误。
435    /// 
436    /// # Parameters
437    /// - `values`: Slice of values to send
438    /// 
439    /// # Returns
440    /// - `Ok(usize)`: Number of elements successfully sent
441    /// - `Err(SendError)`: Channel is closed
442    /// 
443    /// # 参数
444    /// - `values`: 要发送的值的切片
445    /// 
446    /// # 返回值
447    /// - `Ok(usize)`: 成功发送的元素数量
448    /// - `Err(SendError)`: 通道已关闭
449    pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
450        let mut total_sent = 0;
451        
452        while total_sent < values.len() {
453            // Check if channel is closed
454            // 检查通道是否已关闭
455            if self.inner.closed.load(Ordering::Acquire) {
456                return Err(SendError::Closed(()));
457            }
458            
459            let sent = self.try_send_slice(&values[total_sent..]);
460            total_sent += sent;
461            
462            if total_sent < values.len() {
463                // Need to wait for space
464                // 需要等待空间
465                self.inner.send_notify.notified().await;
466                
467                // Check if channel was closed while waiting
468                // 检查等待时通道是否已关闭
469                if self.inner.closed.load(Ordering::Acquire) {
470                    return Err(SendError::Closed(()));
471                }
472            }
473        }
474        
475        Ok(total_sent)
476    }
477}
478
479impl<T, const N: usize> Receiver<T, N> {
480    /// Receive a message from the channel (async, waits if buffer is empty)
481    /// 
482    /// Returns `None` if the channel is closed and empty
483    /// 
484    /// 从通道接收消息(异步,如果缓冲区空则等待)
485    /// 
486    /// 如果通道已关闭且为空,返回 `None`
487    pub async fn recv(&self) -> Option<T> {
488        loop {
489            match self.try_recv() {
490                Ok(value) => return Some(value),
491                Err(TryRecvError::Closed) => return None,
492                Err(TryRecvError::Empty) => {
493                    // Check if channel is closed before waiting
494                    // 等待前检查通道是否已关闭
495                    if self.inner.closed.load(Ordering::Acquire) {
496                        // Double check if there are any remaining items
497                        // 再次检查是否有剩余项
498                        if let Ok(value) = self.try_recv() {
499                            return Some(value);
500                        }
501                        return None;
502                    }
503                    
504                    // Wait for data to become available
505                    // 等待数据可用
506                    self.inner.recv_notify.notified().await;
507                }
508            }
509        }
510    }
511    
512    /// Try to receive a message without blocking
513    /// 
514    /// # Errors
515    /// - Returns `TryRecvError::Empty` if the buffer is empty
516    /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
517    /// 
518    /// 尝试非阻塞地接收消息
519    /// 
520    /// # 错误
521    /// - 如果缓冲区空,返回 `TryRecvError::Empty`
522    /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
523    pub fn try_recv(&self) -> Result<T, TryRecvError> {
524        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
525        // 不会有多个线程同时访问 consumer
526        let consumer = unsafe { &mut *self.inner.consumer.get() };
527        
528        match consumer.pop() {
529            Ok(value) => {
530                // Successfully received, notify sender
531                // 成功接收,通知发送者
532                self.inner.send_notify.notify_one();
533                Ok(value)
534            }
535            Err(smallring::PopError::Empty) => {
536                if self.inner.closed.load(Ordering::Acquire) {
537                    Err(TryRecvError::Closed)
538                } else {
539                    Err(TryRecvError::Empty)
540                }
541            }
542        }
543    }
544    
545    /// Check if the channel is empty
546    /// 
547    /// 检查通道是否为空
548    #[inline]
549    pub fn is_empty(&self) -> bool {
550        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
551        // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
552        let consumer = unsafe { &*self.inner.consumer.get() };
553        consumer.is_empty()
554    }
555    
556    /// Get the number of messages currently in the channel
557    /// 
558    /// 获取通道中当前的消息数量
559    #[inline]
560    pub fn len(&self) -> usize {
561        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
562        // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
563        let consumer = unsafe { &*self.inner.consumer.get() };
564        consumer.slots()
565    }
566    
567    /// Get the capacity of the channel
568    /// 
569    /// 获取通道的容量
570    #[inline]
571    pub fn capacity(&self) -> usize {
572        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
573        // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
574        let consumer = unsafe { &*self.inner.consumer.get() };
575        consumer.buffer().capacity()
576    }
577    
578    /// Peek at the first message without removing it
579    /// 
580    /// 查看第一个消息但不移除它
581    /// 
582    /// # Returns
583    /// `Some(&T)` if there is a message, `None` if the channel is empty
584    /// 
585    /// # 返回值
586    /// 如果有消息则返回 `Some(&T)`,如果通道为空则返回 `None`
587    /// 
588    /// # Safety
589    /// The returned reference is valid only as long as no other operations
590    /// are performed on the Receiver that might modify the buffer.
591    /// 
592    /// # 安全性
593    /// 返回的引用仅在未对 Receiver 执行可能修改缓冲区的其他操作时有效。
594    #[inline]
595    pub fn peek(&self) -> Option<&T> {
596        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
597        // peek 只读取数据,不需要可变访问
598        let consumer = unsafe { &*self.inner.consumer.get() };
599        consumer.peek()
600    }
601    
602    /// Clear all messages from the channel
603    /// 
604    /// 清空通道中的所有消息
605    /// 
606    /// This method pops and drops all messages currently in the channel.
607    /// 
608    /// 此方法弹出并 drop 通道中当前的所有消息。
609    pub fn clear(&mut self) {
610        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
611        // 我们有可变引用,因此可以安全地访问 consumer
612        let consumer = unsafe { &mut *self.inner.consumer.get() };
613        consumer.clear();
614        
615        // Notify sender that space is available
616        // 通知发送者空间可用
617        self.inner.send_notify.notify_one();
618    }
619    
620    /// Create a draining iterator
621    /// 
622    /// 创建一个消费迭代器
623    /// 
624    /// Returns an iterator that removes and returns messages from the channel.
625    /// The iterator will continue until the channel is empty.
626    /// 
627    /// 返回一个从通道中移除并返回消息的迭代器。
628    /// 迭代器将持续运行直到通道为空。
629    /// 
630    /// # Examples
631    /// 
632    /// ```
633    /// use lite_sync::spsc::channel;
634    /// use std::num::NonZeroUsize;
635    /// 
636    /// # #[tokio::main]
637    /// # async fn main() {
638    /// let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
639    /// tx.try_send(1).unwrap();
640    /// tx.try_send(2).unwrap();
641    /// tx.try_send(3).unwrap();
642    /// 
643    /// let items: Vec<i32> = rx.drain().collect();
644    /// assert_eq!(items, vec![1, 2, 3]);
645    /// assert!(rx.is_empty());
646    /// # }
647    /// ```
648    #[inline]
649    pub fn drain(&mut self) -> Drain<'_, T, N> {
650        Drain { receiver: self }
651    }
652}
653
654impl<T: Copy, const N: usize> Receiver<T, N> {
655    /// Try to receive multiple values into a slice without blocking
656    /// 
657    /// 尝试非阻塞地将多个值接收到切片
658    /// 
659    /// This method attempts to receive as many messages as possible into the provided slice.
660    /// It returns the number of messages successfully received.
661    /// 
662    /// 此方法尝试将尽可能多的消息接收到提供的切片中。
663    /// 返回成功接收的消息数量。
664    /// 
665    /// # Parameters
666    /// - `dest`: Destination slice to receive values into
667    /// 
668    /// # Returns
669    /// Number of messages successfully received (0 to dest.len())
670    /// 
671    /// # 参数
672    /// - `dest`: 用于接收值的目标切片
673    /// 
674    /// # 返回值
675    /// 成功接收的消息数量(0 到 dest.len())
676    pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
677        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
678        // 我们有可变引用,因此可以安全地访问 consumer
679        let consumer = unsafe { &mut *self.inner.consumer.get() };
680        
681        let received = consumer.pop_slice(dest);
682        
683        if received > 0 {
684            // Successfully received some messages, notify sender
685            // 成功接收一些消息,通知发送者
686            self.inner.send_notify.notify_one();
687        }
688        
689        received
690    }
691    
692    /// Receive multiple values into a slice (async, waits if buffer is empty)
693    /// 
694    /// 将多个值接收到切片(异步,如果缓冲区空则等待)
695    /// 
696    /// This method will fill the destination slice as much as possible, waiting if necessary
697    /// when the buffer becomes empty. Returns the number of messages received.
698    /// 
699    /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
700    /// 返回接收的消息数量。
701    /// 
702    /// # Parameters
703    /// - `dest`: Destination slice to receive values into
704    /// 
705    /// # Returns
706    /// Number of messages successfully received (0 to dest.len())
707    /// Returns 0 if the channel is closed and empty
708    /// 
709    /// # 参数
710    /// - `dest`: 用于接收值的目标切片
711    /// 
712    /// # 返回值
713    /// 成功接收的消息数量(0 到 dest.len())
714    /// 如果通道已关闭且为空,返回 0
715    pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
716        let mut total_received = 0;
717        
718        while total_received < dest.len() {
719            let received = self.try_recv_slice(&mut dest[total_received..]);
720            total_received += received;
721            
722            if total_received < dest.len() {
723                // Check if channel is closed
724                // 检查通道是否已关闭
725                if self.inner.closed.load(Ordering::Acquire) {
726                    // Channel closed, return what we have
727                    // 通道已关闭,返回我们已有的内容
728                    return total_received;
729                }
730                
731                // Need to wait for data
732                // 需要等待数据
733                self.inner.recv_notify.notified().await;
734                
735                // Check again after waking up
736                // 唤醒后再次检查
737                if self.inner.closed.load(Ordering::Acquire) {
738                    // Try one more time to get remaining messages
739                    // 再尝试一次获取剩余消息
740                    let final_received = self.try_recv_slice(&mut dest[total_received..]);
741                    total_received += final_received;
742                    return total_received;
743                }
744            }
745        }
746        
747        total_received
748    }
749}
750
751impl<T, const N: usize> Drop for Receiver<T, N> {
752    fn drop(&mut self) {
753        // Mark channel as closed when receiver is dropped
754        // 当接收器被丢弃时标记通道为已关闭
755        self.inner.closed.store(true, Ordering::Release);
756        
757        // Notify sender in case it's waiting
758        // 通知发送者以防它正在等待
759        self.inner.send_notify.notify_one();
760    }
761}
762
763impl<T, const N: usize> Drop for Sender<T, N> {
764    fn drop(&mut self) {
765        // Mark channel as closed when sender is dropped
766        // 当发送器被丢弃时标记通道为已关闭
767        self.inner.closed.store(true, Ordering::Release);
768        
769        // Notify receiver in case it's waiting
770        // 通知接收器以防它正在等待
771        self.inner.recv_notify.notify_one();
772    }
773}
774
775#[cfg(test)]
776mod tests {
777    use super::*;
778    
779    #[tokio::test]
780    async fn test_basic_send_recv() {
781        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
782        
783        tx.send(1).await.unwrap();
784        tx.send(2).await.unwrap();
785        tx.send(3).await.unwrap();
786        
787        assert_eq!(rx.recv().await, Some(1));
788        assert_eq!(rx.recv().await, Some(2));
789        assert_eq!(rx.recv().await, Some(3));
790    }
791    
792    #[tokio::test]
793    async fn test_try_send_recv() {
794        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
795        
796        tx.try_send(1).unwrap();
797        tx.try_send(2).unwrap();
798        
799        assert_eq!(rx.try_recv().unwrap(), 1);
800        assert_eq!(rx.try_recv().unwrap(), 2);
801        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
802    }
803    
804    #[tokio::test]
805    async fn test_channel_closed_on_sender_drop() {
806        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
807        
808        tx.send(1).await.unwrap();
809        drop(tx);
810        
811        assert_eq!(rx.recv().await, Some(1));
812        assert_eq!(rx.recv().await, None);
813    }
814    
815    #[tokio::test]
816    async fn test_channel_closed_on_receiver_drop() {
817        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
818        
819        drop(rx);
820        
821        assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
822    }
823    
824    #[tokio::test]
825    async fn test_cross_task_communication() {
826        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
827        
828        let sender_handle = tokio::spawn(async move {
829            for i in 0..10 {
830                tx.send(i).await.unwrap();
831            }
832        });
833        
834        let receiver_handle = tokio::spawn(async move {
835            let mut sum = 0;
836            while let Some(value) = rx.recv().await {
837                sum += value;
838            }
839            sum
840        });
841        
842        sender_handle.await.unwrap();
843        let sum = receiver_handle.await.unwrap();
844        assert_eq!(sum, 45); // 0+1+2+...+9 = 45
845    }
846    
847    #[tokio::test]
848    async fn test_backpressure() {
849        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
850        
851        // Fill the buffer
852        tx.try_send(1).unwrap();
853        tx.try_send(2).unwrap();
854        tx.try_send(3).unwrap();
855        tx.try_send(4).unwrap();
856        
857        // Buffer should be full now
858        assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
859        
860        // This should block and then succeed when we consume
861        let send_handle = tokio::spawn(async move {
862            tx.send(5).await.unwrap();
863            tx.send(6).await.unwrap();
864        });
865        
866        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
867        
868        assert_eq!(rx.recv().await, Some(1));
869        assert_eq!(rx.recv().await, Some(2));
870        assert_eq!(rx.recv().await, Some(3));
871        assert_eq!(rx.recv().await, Some(4));
872        assert_eq!(rx.recv().await, Some(5));
873        assert_eq!(rx.recv().await, Some(6));
874        
875        send_handle.await.unwrap();
876    }
877    
878    #[tokio::test]
879    async fn test_capacity_and_len() {
880        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
881        
882        assert_eq!(rx.capacity(), 8);
883        assert_eq!(rx.len(), 0);
884        assert!(rx.is_empty());
885        
886        tx.try_send(1).unwrap();
887        tx.try_send(2).unwrap();
888        
889        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
890        assert_eq!(rx.len(), 2);
891        assert!(!rx.is_empty());
892    }
893    
894    // ==================== New API Tests ====================
895    
896    #[tokio::test]
897    async fn test_sender_capacity_queries() {
898        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
899        
900        assert_eq!(tx.capacity(), 8);
901        assert_eq!(tx.len(), 0);
902        assert_eq!(tx.free_slots(), 8);
903        assert!(!tx.is_full());
904        
905        tx.try_send(1).unwrap();
906        tx.try_send(2).unwrap();
907        tx.try_send(3).unwrap();
908        
909        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
910        assert_eq!(tx.len(), 3);
911        assert_eq!(tx.free_slots(), 5);
912        assert!(!tx.is_full());
913        
914        // Fill the buffer
915        tx.try_send(4).unwrap();
916        tx.try_send(5).unwrap();
917        tx.try_send(6).unwrap();
918        tx.try_send(7).unwrap();
919        tx.try_send(8).unwrap();
920        
921        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
922        assert_eq!(tx.len(), 8);
923        assert_eq!(tx.free_slots(), 0);
924        assert!(tx.is_full());
925        
926        // Pop one and check again
927        rx.recv().await;
928        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
929        
930        assert_eq!(tx.len(), 7);
931        assert_eq!(tx.free_slots(), 1);
932        assert!(!tx.is_full());
933    }
934    
935    #[tokio::test]
936    async fn test_try_send_slice() {
937        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
938        
939        let data = [1, 2, 3, 4, 5];
940        let sent = tx.try_send_slice(&data);
941        
942        assert_eq!(sent, 5);
943        assert_eq!(rx.len(), 5);
944        
945        for i in 0..5 {
946            assert_eq!(rx.recv().await.unwrap(), data[i]);
947        }
948    }
949    
950    #[tokio::test]
951    async fn test_try_send_slice_partial() {
952        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
953        
954        // Fill with 5 elements, leaving room for 3
955        let initial = [1, 2, 3, 4, 5];
956        tx.try_send_slice(&initial);
957        
958        // Try to send 10 more, should only send 3
959        let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
960        let sent = tx.try_send_slice(&more);
961        
962        assert_eq!(sent, 3);
963        assert_eq!(rx.len(), 8);
964        assert!(tx.is_full());
965        
966        // Verify values
967        for i in 1..=8 {
968            assert_eq!(rx.recv().await.unwrap(), i);
969        }
970    }
971    
972    #[tokio::test]
973    async fn test_send_slice() {
974        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
975        
976        let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
977        let result = tx.send_slice(&data).await;
978        
979        assert_eq!(result.unwrap(), 10);
980        assert_eq!(rx.len(), 10);
981        
982        for i in 0..10 {
983            assert_eq!(rx.recv().await.unwrap(), data[i]);
984        }
985    }
986    
987    #[tokio::test]
988    async fn test_send_slice_with_backpressure() {
989        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
990        
991        let data = [1, 2, 3, 4, 5, 6, 7, 8];
992        
993        let send_handle = tokio::spawn(async move {
994            tx.send_slice(&data).await.unwrap()
995        });
996        
997        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
998        
999        // Consume some messages to make room
1000        for i in 1..=4 {
1001            assert_eq!(rx.recv().await.unwrap(), i);
1002        }
1003        
1004        let sent = send_handle.await.unwrap();
1005        assert_eq!(sent, 8);
1006        
1007        // Verify remaining messages
1008        for i in 5..=8 {
1009            assert_eq!(rx.recv().await.unwrap(), i);
1010        }
1011    }
1012    
1013    #[tokio::test]
1014    async fn test_peek() {
1015        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1016        
1017        // Peek empty buffer
1018        assert!(rx.peek().is_none());
1019        
1020        tx.try_send(42).unwrap();
1021        tx.try_send(100).unwrap();
1022        tx.try_send(200).unwrap();
1023        
1024        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1025        
1026        // Peek should return first element without removing it
1027        assert_eq!(rx.peek(), Some(&42));
1028        assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
1029        assert_eq!(rx.len(), 3); // Length unchanged
1030    }
1031    
1032    #[tokio::test]
1033    async fn test_peek_after_recv() {
1034        let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1035        
1036        tx.try_send("first".to_string()).unwrap();
1037        tx.try_send("second".to_string()).unwrap();
1038        tx.try_send("third".to_string()).unwrap();
1039        
1040        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1041        
1042        assert_eq!(rx.peek(), Some(&"first".to_string()));
1043        rx.recv().await.unwrap();
1044        
1045        assert_eq!(rx.peek(), Some(&"second".to_string()));
1046        rx.recv().await.unwrap();
1047        
1048        assert_eq!(rx.peek(), Some(&"third".to_string()));
1049        rx.recv().await.unwrap();
1050        
1051        assert!(rx.peek().is_none());
1052    }
1053    
1054    #[tokio::test]
1055    async fn test_clear() {
1056        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1057        
1058        for i in 0..10 {
1059            tx.try_send(i).unwrap();
1060        }
1061        
1062        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1063        assert_eq!(rx.len(), 10);
1064        
1065        rx.clear();
1066        
1067        assert_eq!(rx.len(), 0);
1068        assert!(rx.is_empty());
1069    }
1070    
1071    #[tokio::test]
1072    async fn test_clear_with_drop() {
1073        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1074        use std::sync::Arc;
1075        
1076        #[derive(Debug)]
1077        struct DropCounter {
1078            counter: Arc<AtomicUsize>,
1079        }
1080        
1081        impl Drop for DropCounter {
1082            fn drop(&mut self) {
1083                self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1084            }
1085        }
1086        
1087        let counter = Arc::new(AtomicUsize::new(0));
1088        
1089        {
1090            let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1091            
1092            for _ in 0..8 {
1093                tx.try_send(DropCounter { counter: counter.clone() }).unwrap();
1094            }
1095            
1096            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1097            assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1098            
1099            rx.clear();
1100            
1101            assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1102        }
1103    }
1104    
1105    #[tokio::test]
1106    async fn test_drain() {
1107        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1108        
1109        for i in 0..10 {
1110            tx.try_send(i).unwrap();
1111        }
1112        
1113        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1114        
1115        let collected: Vec<i32> = rx.drain().collect();
1116        
1117        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1118        assert!(rx.is_empty());
1119    }
1120    
1121    #[tokio::test]
1122    async fn test_drain_empty() {
1123        let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1124        
1125        let collected: Vec<i32> = rx.drain().collect();
1126        
1127        assert!(collected.is_empty());
1128    }
1129    
1130    #[tokio::test]
1131    async fn test_drain_size_hint() {
1132        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1133        
1134        for i in 0..5 {
1135            tx.try_send(i).unwrap();
1136        }
1137        
1138        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1139        
1140        let mut drain = rx.drain();
1141        
1142        assert_eq!(drain.size_hint(), (5, Some(5)));
1143        
1144        drain.next();
1145        assert_eq!(drain.size_hint(), (4, Some(4)));
1146        
1147        drain.next();
1148        assert_eq!(drain.size_hint(), (3, Some(3)));
1149    }
1150    
1151    #[tokio::test]
1152    async fn test_try_recv_slice() {
1153        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1154        
1155        // Send some data
1156        for i in 0..10 {
1157            tx.try_send(i).unwrap();
1158        }
1159        
1160        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1161        
1162        let mut dest = [0u32; 5];
1163        let received = rx.try_recv_slice(&mut dest);
1164        
1165        assert_eq!(received, 5);
1166        assert_eq!(dest, [0, 1, 2, 3, 4]);
1167        assert_eq!(rx.len(), 5);
1168    }
1169    
1170    #[tokio::test]
1171    async fn test_try_recv_slice_partial() {
1172        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1173        
1174        tx.try_send(1).unwrap();
1175        tx.try_send(2).unwrap();
1176        tx.try_send(3).unwrap();
1177        
1178        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1179        
1180        let mut dest = [0u32; 10];
1181        let received = rx.try_recv_slice(&mut dest);
1182        
1183        assert_eq!(received, 3);
1184        assert_eq!(&dest[0..3], &[1, 2, 3]);
1185        assert!(rx.is_empty());
1186    }
1187    
1188    #[tokio::test]
1189    async fn test_recv_slice() {
1190        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1191        
1192        for i in 1..=10 {
1193            tx.try_send(i).unwrap();
1194        }
1195        
1196        let mut dest = [0u32; 10];
1197        let received = rx.recv_slice(&mut dest).await;
1198        
1199        assert_eq!(received, 10);
1200        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1201        assert!(rx.is_empty());
1202    }
1203    
1204    #[tokio::test]
1205    async fn test_recv_slice_with_wait() {
1206        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1207        
1208        let recv_handle = tokio::spawn(async move {
1209            let mut dest = [0u32; 8];
1210            let received = rx.recv_slice(&mut dest).await;
1211            (received, dest)
1212        });
1213        
1214        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1215        
1216        // Send data gradually
1217        for i in 1..=8 {
1218            tx.send(i).await.unwrap();
1219            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1220        }
1221        
1222        let (received, dest) = recv_handle.await.unwrap();
1223        assert_eq!(received, 8);
1224        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1225    }
1226    
1227    #[tokio::test]
1228    async fn test_recv_slice_channel_closed() {
1229        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1230        
1231        tx.try_send(1).unwrap();
1232        tx.try_send(2).unwrap();
1233        tx.try_send(3).unwrap();
1234        
1235        drop(tx); // Close the channel
1236        
1237        let mut dest = [0u32; 10];
1238        let received = rx.recv_slice(&mut dest).await;
1239        
1240        // Should receive the 3 available messages, then stop
1241        assert_eq!(received, 3);
1242        assert_eq!(&dest[0..3], &[1, 2, 3]);
1243    }
1244    
1245    #[tokio::test]
1246    async fn test_combined_new_apis() {
1247        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1248        
1249        // Batch send
1250        let data = [1, 2, 3, 4, 5];
1251        tx.try_send_slice(&data);
1252        
1253        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1254        
1255        assert_eq!(tx.len(), 5);
1256        assert_eq!(rx.len(), 5);
1257        assert_eq!(rx.capacity(), 16);
1258        
1259        // Peek
1260        assert_eq!(rx.peek(), Some(&1));
1261        
1262        // Batch receive
1263        let mut dest = [0u32; 3];
1264        rx.try_recv_slice(&mut dest);
1265        assert_eq!(dest, [1, 2, 3]);
1266        
1267        assert_eq!(rx.len(), 2);
1268        assert_eq!(tx.free_slots(), 14);
1269        
1270        // Drain remaining
1271        let remaining: Vec<u32> = rx.drain().collect();
1272        assert_eq!(remaining, vec![4, 5]);
1273        
1274        assert!(rx.is_empty());
1275        assert!(!tx.is_full());
1276    }
1277}