lite_sync/
spsc.rs

1/// High-performance async SPSC (Single Producer Single Consumer) channel
2/// 
3/// Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6/// 
7/// 高性能异步 SPSC(单生产者单消费者)通道
8/// 
9/// 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11/// `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12///
13/// # 安全性说明 (Safety Notes)
14///
15/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16/// 这是安全的,基于以下保证:
17///
18/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23///
24/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25///
26/// # Safety Guarantees
27///
28/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29/// This is safe based on the following guarantees:
30///
31/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36///
37/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38use std::cell::UnsafeCell;
39use std::num::NonZeroUsize;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::Arc;
42use super::notify::SingleWaiterNotify;
43
44/// SPSC channel creation function
45/// 
46/// Creates a bounded SPSC channel with the specified capacity.
47/// 
48/// # Type Parameters
49/// - `T`: The type of messages to be sent through the channel
50/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
51/// 
52/// # Parameters
53/// - `capacity`: Channel capacity (total number of elements the channel can hold)
54/// 
55/// # Returns
56/// A tuple of (Sender, Receiver)
57/// 
58/// # Examples
59/// 
60/// ```
61/// use lite_sync::spsc::channel;
62/// use std::num::NonZeroUsize;
63/// 
64/// #[tokio::main]
65/// async fn main() {
66///     // Create a channel with capacity 32 and inline buffer size 8
67///     let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
68///     
69///     tokio::spawn(async move {
70///         tx.send(42).await.unwrap();
71///     });
72///     
73///     let value = rx.recv().await.unwrap();
74///     assert_eq!(value, 42);
75/// }
76/// ```
77/// 
78/// SPSC 通道创建函数
79/// 
80/// 创建指定容量的有界 SPSC 通道。
81/// 
82/// # 类型参数
83/// - `T`: 通过通道发送的消息类型
84/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
85/// 
86/// # 参数
87/// - `capacity`: 通道容量(通道可以容纳的元素总数)
88/// 
89/// # 返回值
90/// 返回 (Sender, Receiver) 元组
91pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
92    let (producer, consumer) = smallring::new::<T, N>(capacity);
93    
94    let inner = Arc::new(Inner::<T, N> {
95        producer: UnsafeCell::new(producer),
96        consumer: UnsafeCell::new(consumer),
97        closed: AtomicBool::new(false),
98        recv_notify: SingleWaiterNotify::new(),
99        send_notify: SingleWaiterNotify::new(),
100    });
101    
102    let sender = Sender {
103        inner: inner.clone(),
104    };
105    
106    let receiver = Receiver {
107        inner,
108    };
109    
110    (sender, receiver)
111}
112
113/// Shared internal state for SPSC channel
114/// 
115/// Contains both shared state and the ring buffer halves.
116/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
117/// 
118/// SPSC 通道的共享内部状态
119/// 
120/// 包含共享状态和环形缓冲区的两端。
121/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
122struct Inner<T, const N: usize = 32> {
123    /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
124    /// 
125    /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
126    producer: UnsafeCell<smallring::Producer<T, N>>,
127    
128    /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
129    /// 
130    /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
131    consumer: UnsafeCell<smallring::Consumer<T, N>>,
132    
133    /// Channel closed flag
134    /// 
135    /// 通道关闭标志
136    closed: AtomicBool,
137    
138    /// Notifier for receiver waiting (lightweight single-waiter)
139    /// 
140    /// 接收者等待通知器(轻量级单等待者)
141    recv_notify: SingleWaiterNotify,
142    
143    /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
144    /// 
145    /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
146    send_notify: SingleWaiterNotify,
147}
148
149// SAFETY: Inner<T> 可以在线程间安全共享的原因:
150// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
151// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
152// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
153// 4. closed、recv_notify、send_notify 都已经是线程安全的
154// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
155unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
156
157/// SPSC channel sender
158/// 
159/// SPSC 通道发送器
160pub struct Sender<T, const N: usize> {
161    inner: Arc<Inner<T, N>>,
162}
163
164/// SPSC channel receiver
165/// 
166/// SPSC 通道接收器
167pub struct Receiver<T, const N: usize> {
168    inner: Arc<Inner<T, N>>,
169}
170
171/// Draining iterator for the SPSC channel
172/// 
173/// SPSC 通道的消费迭代器
174/// 
175/// This iterator removes and returns messages from the channel until it's empty.
176/// 
177/// 此迭代器从通道中移除并返回消息,直到通道为空。
178/// 
179/// # Type Parameters
180/// - `T`: Message type
181/// - `N`: Inline buffer size
182/// 
183/// # 类型参数
184/// - `T`: 消息类型
185/// - `N`: 内联缓冲区大小
186pub struct Drain<'a, T, const N: usize> {
187    receiver: &'a mut Receiver<T, N>,
188}
189
190impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
191    type Item = T;
192    
193    #[inline]
194    fn next(&mut self) -> Option<Self::Item> {
195        self.receiver.try_recv().ok()
196    }
197    
198    #[inline]
199    fn size_hint(&self) -> (usize, Option<usize>) {
200        let len = self.receiver.len();
201        (len, Some(len))
202    }
203}
204
205/// Send error type
206/// 
207/// 发送错误类型
208#[derive(Debug, Clone, Copy, PartialEq, Eq)]
209pub enum SendError<T> {
210    /// Channel is closed
211    /// 
212    /// 通道已关闭
213    Closed(T),
214}
215
216/// Try-receive error type
217/// 
218/// 尝试接收错误类型
219#[derive(Debug, Clone, Copy, PartialEq, Eq)]
220pub enum TryRecvError {
221    /// Channel is empty
222    /// 
223    /// 通道为空
224    Empty,
225    
226    /// Channel is closed
227    /// 
228    /// 通道已关闭
229    Closed,
230}
231
232/// Try-send error type
233/// 
234/// 尝试发送错误类型
235#[derive(Debug, Clone, Copy, PartialEq, Eq)]
236pub enum TrySendError<T> {
237    /// Buffer is full
238    /// 
239    /// 缓冲区已满
240    Full(T),
241    
242    /// Channel is closed
243    /// 
244    /// 通道已关闭
245    Closed(T),
246}
247
248impl<T, const N: usize> Sender<T, N> {
249    /// Send a message to the channel (async, waits if buffer is full)
250    /// 
251    /// # Errors
252    /// Returns `SendError::Closed` if the receiver has been dropped
253    /// 
254    /// 向通道发送消息(异步,如果缓冲区满则等待)
255    /// 
256    /// # 错误
257    /// 如果接收器已被丢弃,返回 `SendError::Closed`
258    pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
259        loop {
260            match self.try_send(value) {
261                Ok(()) => return Ok(()),
262                Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
263                Err(TrySendError::Full(v)) => {
264                    // Store the value to retry
265                    // 存储值以便重试
266                    value = v;
267                    
268                    // Wait for space to become available
269                    // 等待空间可用
270                    self.inner.send_notify.notified().await;
271                    
272                    // Check if channel was closed while waiting
273                    // 检查等待时通道是否已关闭
274                    if self.inner.closed.load(Ordering::Acquire) {
275                        return Err(SendError::Closed(value));
276                    }
277                    
278                    // Retry with the value in next loop iteration
279                    // 在下一次循环迭代中使用该值重试
280                }
281            }
282        }
283    }
284    
285    /// Try to send a message without blocking
286    /// 
287    /// # Errors
288    /// - Returns `TrySendError::Full` if the buffer is full
289    /// - Returns `TrySendError::Closed` if the receiver has been dropped
290    /// 
291    /// 尝试非阻塞地发送消息
292    /// 
293    /// # 错误
294    /// - 如果缓冲区满,返回 `TrySendError::Full`
295    /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
296    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
297        // Check if channel is closed first
298        // 首先检查通道是否已关闭
299        if self.inner.closed.load(Ordering::Acquire) {
300            return Err(TrySendError::Closed(value));
301        }
302        
303        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
304        // 不会有多个线程同时访问 producer
305        let producer = unsafe { &mut *self.inner.producer.get() };
306        
307        match producer.push(value) {
308            Ok(()) => {
309                // Successfully sent, notify receiver
310                // 成功发送,通知接收者
311                self.inner.recv_notify.notify_one();
312                Ok(())
313            }
314            Err(smallring::PushError::Full(v)) => {
315                Err(TrySendError::Full(v))
316            }
317        }
318    }
319    
320    /// Check if the channel is closed
321    /// 
322    /// 检查通道是否已关闭
323    #[inline]
324    pub fn is_closed(&self) -> bool {
325        self.inner.closed.load(Ordering::Acquire)
326    }
327    
328    /// Get the capacity of the channel
329    /// 
330    /// 获取通道的容量
331    #[inline]
332    pub fn capacity(&self) -> usize {
333        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
334        // capacity 只读取数据,不需要可变访问
335        let producer = unsafe { &*self.inner.producer.get() };
336        producer.capacity()
337    }
338    
339    /// Get the number of messages currently in the channel
340    /// 
341    /// 获取通道中当前的消息数量
342    #[inline]
343    pub fn len(&self) -> usize {
344        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
345        // slots 只读取数据,不需要可变访问
346        let producer = unsafe { &*self.inner.producer.get() };
347        producer.slots()
348    }
349
350    #[inline]
351    pub fn is_empty(&self) -> bool {
352        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
353        // is_empty 只读取数据,不需要可变访问
354        let producer = unsafe { &*self.inner.producer.get() };
355        producer.is_empty()
356    }
357    
358    /// Get the number of free slots in the channel
359    /// 
360    /// 获取通道中的空闲空间数量
361    #[inline]
362    pub fn free_slots(&self) -> usize {
363        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
364        // free_slots 只读取数据,不需要可变访问
365        let producer = unsafe { &*self.inner.producer.get() };
366        producer.free_slots()
367    }
368    
369    /// Check if the channel is full
370    /// 
371    /// 检查通道是否已满
372    #[inline]
373    pub fn is_full(&self) -> bool {
374        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
375        // is_full 只读取数据,不需要可变访问
376        let producer = unsafe { &*self.inner.producer.get() };
377        producer.is_full()
378    }
379}
380
381impl<T: Copy, const N: usize> Sender<T, N> {
382    /// Try to send multiple values from a slice without blocking
383    /// 
384    /// 尝试非阻塞地从切片发送多个值
385    /// 
386    /// This method attempts to send as many elements as possible from the slice.
387    /// It returns the number of elements successfully sent.
388    /// 
389    /// 此方法尝试从切片中发送尽可能多的元素。
390    /// 返回成功发送的元素数量。
391    /// 
392    /// # Parameters
393    /// - `values`: Slice of values to send
394    /// 
395    /// # Returns
396    /// Number of elements successfully sent (0 to values.len())
397    /// 
398    /// # 参数
399    /// - `values`: 要发送的值的切片
400    /// 
401    /// # 返回值
402    /// 成功发送的元素数量(0 到 values.len())
403    pub fn try_send_slice(&self, values: &[T]) -> usize {
404        // Check if channel is closed first
405        // 首先检查通道是否已关闭
406        if self.inner.closed.load(Ordering::Acquire) {
407            return 0;
408        }
409        
410        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
411        // 不会有多个线程同时访问 producer
412        let producer = unsafe { &mut *self.inner.producer.get() };
413        
414        let sent = producer.push_slice(values);
415        
416        if sent > 0 {
417            // Successfully sent some messages, notify receiver
418            // 成功发送一些消息,通知接收者
419            self.inner.recv_notify.notify_one();
420        }
421        
422        sent
423    }
424    
425    /// Send multiple values from a slice (async, waits if buffer is full)
426    /// 
427    /// 从切片发送多个值(异步,如果缓冲区满则等待)
428    /// 
429    /// This method will send all elements from the slice, waiting if necessary
430    /// when the buffer becomes full. Returns the number of elements sent, or
431    /// an error if the channel is closed.
432    /// 
433    /// 此方法将发送切片中的所有元素,必要时在缓冲区满时等待。
434    /// 返回发送的元素数量,如果通道关闭则返回错误。
435    /// 
436    /// # Parameters
437    /// - `values`: Slice of values to send
438    /// 
439    /// # Returns
440    /// - `Ok(usize)`: Number of elements successfully sent
441    /// - `Err(SendError)`: Channel is closed
442    /// 
443    /// # 参数
444    /// - `values`: 要发送的值的切片
445    /// 
446    /// # 返回值
447    /// - `Ok(usize)`: 成功发送的元素数量
448    /// - `Err(SendError)`: 通道已关闭
449    pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
450        let mut total_sent = 0;
451        
452        while total_sent < values.len() {
453            // Check if channel is closed
454            // 检查通道是否已关闭
455            if self.inner.closed.load(Ordering::Acquire) {
456                return Err(SendError::Closed(()));
457            }
458            
459            let sent = self.try_send_slice(&values[total_sent..]);
460            total_sent += sent;
461            
462            if total_sent < values.len() {
463                // Need to wait for space
464                // 需要等待空间
465                self.inner.send_notify.notified().await;
466                
467                // Check if channel was closed while waiting
468                // 检查等待时通道是否已关闭
469                if self.inner.closed.load(Ordering::Acquire) {
470                    return Err(SendError::Closed(()));
471                }
472            }
473        }
474        
475        Ok(total_sent)
476    }
477}
478
479impl<T, const N: usize> Receiver<T, N> {
480    /// Receive a message from the channel (async, waits if buffer is empty)
481    /// 
482    /// Returns `None` if the channel is closed and empty
483    /// 
484    /// 从通道接收消息(异步,如果缓冲区空则等待)
485    /// 
486    /// 如果通道已关闭且为空,返回 `None`
487    pub async fn recv(&self) -> Option<T> {
488        loop {
489            match self.try_recv() {
490                Ok(value) => return Some(value),
491                Err(TryRecvError::Closed) => return None,
492                Err(TryRecvError::Empty) => {
493                    // Check if channel is closed before waiting
494                    // 等待前检查通道是否已关闭
495                    if self.inner.closed.load(Ordering::Acquire) {
496                        // Double check if there are any remaining items
497                        // 再次检查是否有剩余项
498                        if let Ok(value) = self.try_recv() {
499                            return Some(value);
500                        }
501                        return None;
502                    }
503                    
504                    // Wait for data to become available
505                    // 等待数据可用
506                    self.inner.recv_notify.notified().await;
507                }
508            }
509        }
510    }
511    
512    /// Try to receive a message without blocking
513    /// 
514    /// # Errors
515    /// - Returns `TryRecvError::Empty` if the buffer is empty
516    /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
517    /// 
518    /// 尝试非阻塞地接收消息
519    /// 
520    /// # 错误
521    /// - 如果缓冲区空,返回 `TryRecvError::Empty`
522    /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
523    pub fn try_recv(&self) -> Result<T, TryRecvError> {
524        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
525        // 不会有多个线程同时访问 consumer
526        let consumer = unsafe { &mut *self.inner.consumer.get() };
527        
528        match consumer.pop() {
529            Ok(value) => {
530                // Successfully received, notify sender
531                // 成功接收,通知发送者
532                self.inner.send_notify.notify_one();
533                Ok(value)
534            }
535            Err(smallring::PopError::Empty) => {
536                // Check if channel is closed
537                // 检查通道是否已关闭
538                if self.inner.closed.load(Ordering::Acquire) {
539                    // Double-check for remaining items to avoid race condition
540                    // where sender drops after push but before we check closed flag
541                    // 再次检查是否有剩余项,以避免发送方在 push 后但在我们检查 closed 标志前 drop 的竞态条件
542                    match consumer.pop() {
543                        Ok(value) => {
544                            self.inner.send_notify.notify_one();
545                            Ok(value)
546                        }
547                        Err(smallring::PopError::Empty) => {
548                            Err(TryRecvError::Closed)
549                        }
550                    }
551                } else {
552                    Err(TryRecvError::Empty)
553                }
554            }
555        }
556    }
557    
558    /// Check if the channel is empty
559    /// 
560    /// 检查通道是否为空
561    #[inline]
562    pub fn is_empty(&self) -> bool {
563        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
564        // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
565        let consumer = unsafe { &*self.inner.consumer.get() };
566        consumer.is_empty()
567    }
568    
569    /// Get the number of messages currently in the channel
570    /// 
571    /// 获取通道中当前的消息数量
572    #[inline]
573    pub fn len(&self) -> usize {
574        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
575        // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
576        let consumer = unsafe { &*self.inner.consumer.get() };
577        consumer.slots()
578    }
579    
580    /// Get the capacity of the channel
581    /// 
582    /// 获取通道的容量
583    #[inline]
584    pub fn capacity(&self) -> usize {
585        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
586        // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
587        let consumer = unsafe { &*self.inner.consumer.get() };
588        consumer.buffer().capacity()
589    }
590    
591    /// Peek at the first message without removing it
592    /// 
593    /// 查看第一个消息但不移除它
594    /// 
595    /// # Returns
596    /// `Some(&T)` if there is a message, `None` if the channel is empty
597    /// 
598    /// # 返回值
599    /// 如果有消息则返回 `Some(&T)`,如果通道为空则返回 `None`
600    /// 
601    /// # Safety
602    /// The returned reference is valid only as long as no other operations
603    /// are performed on the Receiver that might modify the buffer.
604    /// 
605    /// # 安全性
606    /// 返回的引用仅在未对 Receiver 执行可能修改缓冲区的其他操作时有效。
607    #[inline]
608    pub fn peek(&self) -> Option<&T> {
609        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
610        // peek 只读取数据,不需要可变访问
611        let consumer = unsafe { &*self.inner.consumer.get() };
612        consumer.peek()
613    }
614    
615    /// Clear all messages from the channel
616    /// 
617    /// 清空通道中的所有消息
618    /// 
619    /// This method pops and drops all messages currently in the channel.
620    /// 
621    /// 此方法弹出并 drop 通道中当前的所有消息。
622    pub fn clear(&mut self) {
623        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
624        // 我们有可变引用,因此可以安全地访问 consumer
625        let consumer = unsafe { &mut *self.inner.consumer.get() };
626        consumer.clear();
627        
628        // Notify sender that space is available
629        // 通知发送者空间可用
630        self.inner.send_notify.notify_one();
631    }
632    
633    /// Create a draining iterator
634    /// 
635    /// 创建一个消费迭代器
636    /// 
637    /// Returns an iterator that removes and returns messages from the channel.
638    /// The iterator will continue until the channel is empty.
639    /// 
640    /// 返回一个从通道中移除并返回消息的迭代器。
641    /// 迭代器将持续运行直到通道为空。
642    /// 
643    /// # Examples
644    /// 
645    /// ```
646    /// use lite_sync::spsc::channel;
647    /// use std::num::NonZeroUsize;
648    /// 
649    /// # #[tokio::main]
650    /// # async fn main() {
651    /// let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
652    /// tx.try_send(1).unwrap();
653    /// tx.try_send(2).unwrap();
654    /// tx.try_send(3).unwrap();
655    /// 
656    /// let items: Vec<i32> = rx.drain().collect();
657    /// assert_eq!(items, vec![1, 2, 3]);
658    /// assert!(rx.is_empty());
659    /// # }
660    /// ```
661    #[inline]
662    pub fn drain(&mut self) -> Drain<'_, T, N> {
663        Drain { receiver: self }
664    }
665}
666
667impl<T: Copy, const N: usize> Receiver<T, N> {
668    /// Try to receive multiple values into a slice without blocking
669    /// 
670    /// 尝试非阻塞地将多个值接收到切片
671    /// 
672    /// This method attempts to receive as many messages as possible into the provided slice.
673    /// It returns the number of messages successfully received.
674    /// 
675    /// 此方法尝试将尽可能多的消息接收到提供的切片中。
676    /// 返回成功接收的消息数量。
677    /// 
678    /// # Parameters
679    /// - `dest`: Destination slice to receive values into
680    /// 
681    /// # Returns
682    /// Number of messages successfully received (0 to dest.len())
683    /// 
684    /// # 参数
685    /// - `dest`: 用于接收值的目标切片
686    /// 
687    /// # 返回值
688    /// 成功接收的消息数量(0 到 dest.len())
689    pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
690        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
691        // 我们有可变引用,因此可以安全地访问 consumer
692        let consumer = unsafe { &mut *self.inner.consumer.get() };
693        
694        let received = consumer.pop_slice(dest);
695        
696        if received > 0 {
697            // Successfully received some messages, notify sender
698            // 成功接收一些消息,通知发送者
699            self.inner.send_notify.notify_one();
700        }
701        
702        received
703    }
704    
705    /// Receive multiple values into a slice (async, waits if buffer is empty)
706    /// 
707    /// 将多个值接收到切片(异步,如果缓冲区空则等待)
708    /// 
709    /// This method will fill the destination slice as much as possible, waiting if necessary
710    /// when the buffer becomes empty. Returns the number of messages received.
711    /// 
712    /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
713    /// 返回接收的消息数量。
714    /// 
715    /// # Parameters
716    /// - `dest`: Destination slice to receive values into
717    /// 
718    /// # Returns
719    /// Number of messages successfully received (0 to dest.len())
720    /// Returns 0 if the channel is closed and empty
721    /// 
722    /// # 参数
723    /// - `dest`: 用于接收值的目标切片
724    /// 
725    /// # 返回值
726    /// 成功接收的消息数量(0 到 dest.len())
727    /// 如果通道已关闭且为空,返回 0
728    pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
729        let mut total_received = 0;
730        
731        while total_received < dest.len() {
732            let received = self.try_recv_slice(&mut dest[total_received..]);
733            total_received += received;
734            
735            if total_received < dest.len() {
736                // Check if channel is closed
737                // 检查通道是否已关闭
738                if self.inner.closed.load(Ordering::Acquire) {
739                    // Channel closed, return what we have
740                    // 通道已关闭,返回我们已有的内容
741                    return total_received;
742                }
743                
744                // Need to wait for data
745                // 需要等待数据
746                self.inner.recv_notify.notified().await;
747                
748                // Check again after waking up
749                // 唤醒后再次检查
750                if self.inner.closed.load(Ordering::Acquire) {
751                    // Try one more time to get remaining messages
752                    // 再尝试一次获取剩余消息
753                    let final_received = self.try_recv_slice(&mut dest[total_received..]);
754                    total_received += final_received;
755                    return total_received;
756                }
757            }
758        }
759        
760        total_received
761    }
762}
763
764impl<T, const N: usize> Drop for Receiver<T, N> {
765    fn drop(&mut self) {
766        // Mark channel as closed when receiver is dropped
767        // 当接收器被丢弃时标记通道为已关闭
768        self.inner.closed.store(true, Ordering::Release);
769        
770        // Notify sender in case it's waiting
771        // 通知发送者以防它正在等待
772        self.inner.send_notify.notify_one();
773    }
774}
775
776impl<T, const N: usize> Drop for Sender<T, N> {
777    fn drop(&mut self) {
778        // Mark channel as closed when sender is dropped
779        // 当发送器被丢弃时标记通道为已关闭
780        self.inner.closed.store(true, Ordering::Release);
781        
782        // Notify receiver in case it's waiting
783        // 通知接收器以防它正在等待
784        self.inner.recv_notify.notify_one();
785    }
786}
787
788#[cfg(test)]
789mod tests {
790    use super::*;
791    
792    #[tokio::test]
793    async fn test_basic_send_recv() {
794        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
795        
796        tx.send(1).await.unwrap();
797        tx.send(2).await.unwrap();
798        tx.send(3).await.unwrap();
799        
800        assert_eq!(rx.recv().await, Some(1));
801        assert_eq!(rx.recv().await, Some(2));
802        assert_eq!(rx.recv().await, Some(3));
803    }
804    
805    #[tokio::test]
806    async fn test_try_send_recv() {
807        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
808        
809        tx.try_send(1).unwrap();
810        tx.try_send(2).unwrap();
811        
812        assert_eq!(rx.try_recv().unwrap(), 1);
813        assert_eq!(rx.try_recv().unwrap(), 2);
814        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
815    }
816    
817    #[tokio::test]
818    async fn test_channel_closed_on_sender_drop() {
819        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
820        
821        tx.send(1).await.unwrap();
822        drop(tx);
823        
824        assert_eq!(rx.recv().await, Some(1));
825        assert_eq!(rx.recv().await, None);
826    }
827    
828    #[tokio::test]
829    async fn test_channel_closed_on_receiver_drop() {
830        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
831        
832        drop(rx);
833        
834        assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
835    }
836    
837    #[tokio::test]
838    async fn test_cross_task_communication() {
839        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
840        
841        let sender_handle = tokio::spawn(async move {
842            for i in 0..10 {
843                tx.send(i).await.unwrap();
844            }
845        });
846        
847        let receiver_handle = tokio::spawn(async move {
848            let mut sum = 0;
849            while let Some(value) = rx.recv().await {
850                sum += value;
851            }
852            sum
853        });
854        
855        sender_handle.await.unwrap();
856        let sum = receiver_handle.await.unwrap();
857        assert_eq!(sum, 45); // 0+1+2+...+9 = 45
858    }
859    
860    #[tokio::test]
861    async fn test_backpressure() {
862        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
863        
864        // Fill the buffer
865        tx.try_send(1).unwrap();
866        tx.try_send(2).unwrap();
867        tx.try_send(3).unwrap();
868        tx.try_send(4).unwrap();
869        
870        // Buffer should be full now
871        assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
872        
873        // This should block and then succeed when we consume
874        let send_handle = tokio::spawn(async move {
875            tx.send(5).await.unwrap();
876            tx.send(6).await.unwrap();
877        });
878        
879        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
880        
881        assert_eq!(rx.recv().await, Some(1));
882        assert_eq!(rx.recv().await, Some(2));
883        assert_eq!(rx.recv().await, Some(3));
884        assert_eq!(rx.recv().await, Some(4));
885        assert_eq!(rx.recv().await, Some(5));
886        assert_eq!(rx.recv().await, Some(6));
887        
888        send_handle.await.unwrap();
889    }
890    
891    #[tokio::test]
892    async fn test_capacity_and_len() {
893        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
894        
895        assert_eq!(rx.capacity(), 8);
896        assert_eq!(rx.len(), 0);
897        assert!(rx.is_empty());
898        
899        tx.try_send(1).unwrap();
900        tx.try_send(2).unwrap();
901        
902        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
903        assert_eq!(rx.len(), 2);
904        assert!(!rx.is_empty());
905    }
906    
907    // ==================== New API Tests ====================
908    
909    #[tokio::test]
910    async fn test_sender_capacity_queries() {
911        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
912        
913        assert_eq!(tx.capacity(), 8);
914        assert_eq!(tx.len(), 0);
915        assert_eq!(tx.free_slots(), 8);
916        assert!(!tx.is_full());
917        
918        tx.try_send(1).unwrap();
919        tx.try_send(2).unwrap();
920        tx.try_send(3).unwrap();
921        
922        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
923        assert_eq!(tx.len(), 3);
924        assert_eq!(tx.free_slots(), 5);
925        assert!(!tx.is_full());
926        
927        // Fill the buffer
928        tx.try_send(4).unwrap();
929        tx.try_send(5).unwrap();
930        tx.try_send(6).unwrap();
931        tx.try_send(7).unwrap();
932        tx.try_send(8).unwrap();
933        
934        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
935        assert_eq!(tx.len(), 8);
936        assert_eq!(tx.free_slots(), 0);
937        assert!(tx.is_full());
938        
939        // Pop one and check again
940        rx.recv().await;
941        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
942        
943        assert_eq!(tx.len(), 7);
944        assert_eq!(tx.free_slots(), 1);
945        assert!(!tx.is_full());
946    }
947    
948    #[tokio::test]
949    async fn test_try_send_slice() {
950        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
951        
952        let data = [1, 2, 3, 4, 5];
953        let sent = tx.try_send_slice(&data);
954        
955        assert_eq!(sent, 5);
956        assert_eq!(rx.len(), 5);
957        
958        for i in 0..5 {
959            assert_eq!(rx.recv().await.unwrap(), data[i]);
960        }
961    }
962    
963    #[tokio::test]
964    async fn test_try_send_slice_partial() {
965        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
966        
967        // Fill with 5 elements, leaving room for 3
968        let initial = [1, 2, 3, 4, 5];
969        tx.try_send_slice(&initial);
970        
971        // Try to send 10 more, should only send 3
972        let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
973        let sent = tx.try_send_slice(&more);
974        
975        assert_eq!(sent, 3);
976        assert_eq!(rx.len(), 8);
977        assert!(tx.is_full());
978        
979        // Verify values
980        for i in 1..=8 {
981            assert_eq!(rx.recv().await.unwrap(), i);
982        }
983    }
984    
985    #[tokio::test]
986    async fn test_send_slice() {
987        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
988        
989        let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
990        let result = tx.send_slice(&data).await;
991        
992        assert_eq!(result.unwrap(), 10);
993        assert_eq!(rx.len(), 10);
994        
995        for i in 0..10 {
996            assert_eq!(rx.recv().await.unwrap(), data[i]);
997        }
998    }
999    
1000    #[tokio::test]
1001    async fn test_send_slice_with_backpressure() {
1002        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1003        
1004        let data = [1, 2, 3, 4, 5, 6, 7, 8];
1005        
1006        let send_handle = tokio::spawn(async move {
1007            tx.send_slice(&data).await.unwrap()
1008        });
1009        
1010        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1011        
1012        // Consume some messages to make room
1013        for i in 1..=4 {
1014            assert_eq!(rx.recv().await.unwrap(), i);
1015        }
1016        
1017        let sent = send_handle.await.unwrap();
1018        assert_eq!(sent, 8);
1019        
1020        // Verify remaining messages
1021        for i in 5..=8 {
1022            assert_eq!(rx.recv().await.unwrap(), i);
1023        }
1024    }
1025    
1026    #[tokio::test]
1027    async fn test_peek() {
1028        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1029        
1030        // Peek empty buffer
1031        assert!(rx.peek().is_none());
1032        
1033        tx.try_send(42).unwrap();
1034        tx.try_send(100).unwrap();
1035        tx.try_send(200).unwrap();
1036        
1037        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1038        
1039        // Peek should return first element without removing it
1040        assert_eq!(rx.peek(), Some(&42));
1041        assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
1042        assert_eq!(rx.len(), 3); // Length unchanged
1043    }
1044    
1045    #[tokio::test]
1046    async fn test_peek_after_recv() {
1047        let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1048        
1049        tx.try_send("first".to_string()).unwrap();
1050        tx.try_send("second".to_string()).unwrap();
1051        tx.try_send("third".to_string()).unwrap();
1052        
1053        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1054        
1055        assert_eq!(rx.peek(), Some(&"first".to_string()));
1056        rx.recv().await.unwrap();
1057        
1058        assert_eq!(rx.peek(), Some(&"second".to_string()));
1059        rx.recv().await.unwrap();
1060        
1061        assert_eq!(rx.peek(), Some(&"third".to_string()));
1062        rx.recv().await.unwrap();
1063        
1064        assert!(rx.peek().is_none());
1065    }
1066    
1067    #[tokio::test]
1068    async fn test_clear() {
1069        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1070        
1071        for i in 0..10 {
1072            tx.try_send(i).unwrap();
1073        }
1074        
1075        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1076        assert_eq!(rx.len(), 10);
1077        
1078        rx.clear();
1079        
1080        assert_eq!(rx.len(), 0);
1081        assert!(rx.is_empty());
1082    }
1083    
1084    #[tokio::test]
1085    async fn test_clear_with_drop() {
1086        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1087        use std::sync::Arc;
1088        
1089        #[derive(Debug)]
1090        struct DropCounter {
1091            counter: Arc<AtomicUsize>,
1092        }
1093        
1094        impl Drop for DropCounter {
1095            fn drop(&mut self) {
1096                self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1097            }
1098        }
1099        
1100        let counter = Arc::new(AtomicUsize::new(0));
1101        
1102        {
1103            let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1104            
1105            for _ in 0..8 {
1106                tx.try_send(DropCounter { counter: counter.clone() }).unwrap();
1107            }
1108            
1109            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1110            assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1111            
1112            rx.clear();
1113            
1114            assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1115        }
1116    }
1117    
1118    #[tokio::test]
1119    async fn test_drain() {
1120        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1121        
1122        for i in 0..10 {
1123            tx.try_send(i).unwrap();
1124        }
1125        
1126        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1127        
1128        let collected: Vec<i32> = rx.drain().collect();
1129        
1130        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1131        assert!(rx.is_empty());
1132    }
1133    
1134    #[tokio::test]
1135    async fn test_drain_empty() {
1136        let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1137        
1138        let collected: Vec<i32> = rx.drain().collect();
1139        
1140        assert!(collected.is_empty());
1141    }
1142    
1143    #[tokio::test]
1144    async fn test_drain_size_hint() {
1145        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1146        
1147        for i in 0..5 {
1148            tx.try_send(i).unwrap();
1149        }
1150        
1151        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1152        
1153        let mut drain = rx.drain();
1154        
1155        assert_eq!(drain.size_hint(), (5, Some(5)));
1156        
1157        drain.next();
1158        assert_eq!(drain.size_hint(), (4, Some(4)));
1159        
1160        drain.next();
1161        assert_eq!(drain.size_hint(), (3, Some(3)));
1162    }
1163    
1164    #[tokio::test]
1165    async fn test_try_recv_slice() {
1166        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1167        
1168        // Send some data
1169        for i in 0..10 {
1170            tx.try_send(i).unwrap();
1171        }
1172        
1173        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1174        
1175        let mut dest = [0u32; 5];
1176        let received = rx.try_recv_slice(&mut dest);
1177        
1178        assert_eq!(received, 5);
1179        assert_eq!(dest, [0, 1, 2, 3, 4]);
1180        assert_eq!(rx.len(), 5);
1181    }
1182    
1183    #[tokio::test]
1184    async fn test_try_recv_slice_partial() {
1185        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1186        
1187        tx.try_send(1).unwrap();
1188        tx.try_send(2).unwrap();
1189        tx.try_send(3).unwrap();
1190        
1191        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1192        
1193        let mut dest = [0u32; 10];
1194        let received = rx.try_recv_slice(&mut dest);
1195        
1196        assert_eq!(received, 3);
1197        assert_eq!(&dest[0..3], &[1, 2, 3]);
1198        assert!(rx.is_empty());
1199    }
1200    
1201    #[tokio::test]
1202    async fn test_recv_slice() {
1203        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1204        
1205        for i in 1..=10 {
1206            tx.try_send(i).unwrap();
1207        }
1208        
1209        let mut dest = [0u32; 10];
1210        let received = rx.recv_slice(&mut dest).await;
1211        
1212        assert_eq!(received, 10);
1213        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1214        assert!(rx.is_empty());
1215    }
1216    
1217    #[tokio::test]
1218    async fn test_recv_slice_with_wait() {
1219        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1220        
1221        let recv_handle = tokio::spawn(async move {
1222            let mut dest = [0u32; 8];
1223            let received = rx.recv_slice(&mut dest).await;
1224            (received, dest)
1225        });
1226        
1227        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1228        
1229        // Send data gradually
1230        for i in 1..=8 {
1231            tx.send(i).await.unwrap();
1232            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1233        }
1234        
1235        let (received, dest) = recv_handle.await.unwrap();
1236        assert_eq!(received, 8);
1237        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1238    }
1239    
1240    #[tokio::test]
1241    async fn test_recv_slice_channel_closed() {
1242        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1243        
1244        tx.try_send(1).unwrap();
1245        tx.try_send(2).unwrap();
1246        tx.try_send(3).unwrap();
1247        
1248        drop(tx); // Close the channel
1249        
1250        let mut dest = [0u32; 10];
1251        let received = rx.recv_slice(&mut dest).await;
1252        
1253        // Should receive the 3 available messages, then stop
1254        assert_eq!(received, 3);
1255        assert_eq!(&dest[0..3], &[1, 2, 3]);
1256    }
1257    
1258    #[tokio::test]
1259    async fn test_combined_new_apis() {
1260        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1261        
1262        // Batch send
1263        let data = [1, 2, 3, 4, 5];
1264        tx.try_send_slice(&data);
1265        
1266        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1267        
1268        assert_eq!(tx.len(), 5);
1269        assert_eq!(rx.len(), 5);
1270        assert_eq!(rx.capacity(), 16);
1271        
1272        // Peek
1273        assert_eq!(rx.peek(), Some(&1));
1274        
1275        // Batch receive
1276        let mut dest = [0u32; 3];
1277        rx.try_recv_slice(&mut dest);
1278        assert_eq!(dest, [1, 2, 3]);
1279        
1280        assert_eq!(rx.len(), 2);
1281        assert_eq!(tx.free_slots(), 14);
1282        
1283        // Drain remaining
1284        let remaining: Vec<u32> = rx.drain().collect();
1285        assert_eq!(remaining, vec![4, 5]);
1286        
1287        assert!(rx.is_empty());
1288        assert!(!tx.is_full());
1289    }
1290}