lite_sync/
spsc.rs

1/// High-performance async SPSC (Single Producer Single Consumer) channel
2/// 
3/// Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6/// 
7/// 高性能异步 SPSC(单生产者单消费者)通道
8/// 
9/// 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11/// `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12///
13/// # 安全性说明 (Safety Notes)
14///
15/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16/// 这是安全的,基于以下保证:
17///
18/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23///
24/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25///
26/// # Safety Guarantees
27///
28/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29/// This is safe based on the following guarantees:
30///
31/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36///
37/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38use std::cell::UnsafeCell;
39use std::num::NonZeroUsize;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::Arc;
42use super::notify::SingleWaiterNotify;
43
44use smallring::spsc::{Producer, Consumer, new, PushError, PopError};
45
46/// SPSC channel creation function
47/// 
48/// Creates a bounded SPSC channel with the specified capacity.
49/// 
50/// # Type Parameters
51/// - `T`: The type of messages to be sent through the channel
52/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
53/// 
54/// # Parameters
55/// - `capacity`: Channel capacity (total number of elements the channel can hold)
56/// 
57/// # Returns
58/// A tuple of (Sender, Receiver)
59/// 
60/// # Examples
61/// 
62/// ```
63/// use lite_sync::spsc::channel;
64/// use std::num::NonZeroUsize;
65/// 
66/// #[tokio::main]
67/// async fn main() {
68///     // Create a channel with capacity 32 and inline buffer size 8
69///     let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
70///     
71///     tokio::spawn(async move {
72///         tx.send(42).await.unwrap();
73///     });
74///     
75///     let value = rx.recv().await.unwrap();
76///     assert_eq!(value, 42);
77/// }
78/// ```
79/// 
80/// SPSC 通道创建函数
81/// 
82/// 创建指定容量的有界 SPSC 通道。
83/// 
84/// # 类型参数
85/// - `T`: 通过通道发送的消息类型
86/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
87/// 
88/// # 参数
89/// - `capacity`: 通道容量(通道可以容纳的元素总数)
90/// 
91/// # 返回值
92/// 返回 (Sender, Receiver) 元组
93pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
94    let (producer, consumer) = new::<T, N>(capacity);
95    
96    let inner = Arc::new(Inner::<T, N> {
97        producer: UnsafeCell::new(producer),
98        consumer: UnsafeCell::new(consumer),
99        closed: AtomicBool::new(false),
100        recv_notify: SingleWaiterNotify::new(),
101        send_notify: SingleWaiterNotify::new(),
102    });
103    
104    let sender = Sender {
105        inner: inner.clone(),
106    };
107    
108    let receiver = Receiver {
109        inner,
110    };
111    
112    (sender, receiver)
113}
114
115/// Shared internal state for SPSC channel
116/// 
117/// Contains both shared state and the ring buffer halves.
118/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
119/// 
120/// SPSC 通道的共享内部状态
121/// 
122/// 包含共享状态和环形缓冲区的两端。
123/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
124struct Inner<T, const N: usize = 32> {
125    /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
126    /// 
127    /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
128    producer: UnsafeCell<Producer<T, N>>,
129    
130    /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
131    /// 
132    /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
133    consumer: UnsafeCell<Consumer<T, N>>,
134    
135    /// Channel closed flag
136    /// 
137    /// 通道关闭标志
138    closed: AtomicBool,
139    
140    /// Notifier for receiver waiting (lightweight single-waiter)
141    /// 
142    /// 接收者等待通知器(轻量级单等待者)
143    recv_notify: SingleWaiterNotify,
144    
145    /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
146    /// 
147    /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
148    send_notify: SingleWaiterNotify,
149}
150
151// SAFETY: Inner<T> 可以在线程间安全共享的原因:
152// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
153// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
154// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
155// 4. closed、recv_notify、send_notify 都已经是线程安全的
156// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
157unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
158
159/// SPSC channel sender
160/// 
161/// SPSC 通道发送器
162pub struct Sender<T, const N: usize> {
163    inner: Arc<Inner<T, N>>,
164}
165
166/// SPSC channel receiver
167/// 
168/// SPSC 通道接收器
169pub struct Receiver<T, const N: usize> {
170    inner: Arc<Inner<T, N>>,
171}
172
173/// Draining iterator for the SPSC channel
174/// 
175/// SPSC 通道的消费迭代器
176/// 
177/// This iterator removes and returns messages from the channel until it's empty.
178/// 
179/// 此迭代器从通道中移除并返回消息,直到通道为空。
180/// 
181/// # Type Parameters
182/// - `T`: Message type
183/// - `N`: Inline buffer size
184/// 
185/// # 类型参数
186/// - `T`: 消息类型
187/// - `N`: 内联缓冲区大小
188pub struct Drain<'a, T, const N: usize> {
189    receiver: &'a mut Receiver<T, N>,
190}
191
192impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
193    type Item = T;
194    
195    #[inline]
196    fn next(&mut self) -> Option<Self::Item> {
197        self.receiver.try_recv().ok()
198    }
199    
200    #[inline]
201    fn size_hint(&self) -> (usize, Option<usize>) {
202        let len = self.receiver.len();
203        (len, Some(len))
204    }
205}
206
207/// Send error type
208/// 
209/// 发送错误类型
210#[derive(Debug, Clone, Copy, PartialEq, Eq)]
211pub enum SendError<T> {
212    /// Channel is closed
213    /// 
214    /// 通道已关闭
215    Closed(T),
216}
217
218/// Try-receive error type
219/// 
220/// 尝试接收错误类型
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub enum TryRecvError {
223    /// Channel is empty
224    /// 
225    /// 通道为空
226    Empty,
227    
228    /// Channel is closed
229    /// 
230    /// 通道已关闭
231    Closed,
232}
233
234/// Try-send error type
235/// 
236/// 尝试发送错误类型
237#[derive(Debug, Clone, Copy, PartialEq, Eq)]
238pub enum TrySendError<T> {
239    /// Buffer is full
240    /// 
241    /// 缓冲区已满
242    Full(T),
243    
244    /// Channel is closed
245    /// 
246    /// 通道已关闭
247    Closed(T),
248}
249
250impl<T, const N: usize> Sender<T, N> {
251    /// Send a message to the channel (async, waits if buffer is full)
252    /// 
253    /// # Errors
254    /// Returns `SendError::Closed` if the receiver has been dropped
255    /// 
256    /// 向通道发送消息(异步,如果缓冲区满则等待)
257    /// 
258    /// # 错误
259    /// 如果接收器已被丢弃,返回 `SendError::Closed`
260    pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
261        loop {
262            match self.try_send(value) {
263                Ok(()) => return Ok(()),
264                Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
265                Err(TrySendError::Full(v)) => {
266                    // Store the value to retry
267                    // 存储值以便重试
268                    value = v;
269                    
270                    // Wait for space to become available
271                    // 等待空间可用
272                    self.inner.send_notify.notified().await;
273                    
274                    // Check if channel was closed while waiting
275                    // 检查等待时通道是否已关闭
276                    if self.inner.closed.load(Ordering::Acquire) {
277                        return Err(SendError::Closed(value));
278                    }
279                    
280                    // Retry with the value in next loop iteration
281                    // 在下一次循环迭代中使用该值重试
282                }
283            }
284        }
285    }
286    
287    /// Try to send a message without blocking
288    /// 
289    /// # Errors
290    /// - Returns `TrySendError::Full` if the buffer is full
291    /// - Returns `TrySendError::Closed` if the receiver has been dropped
292    /// 
293    /// 尝试非阻塞地发送消息
294    /// 
295    /// # 错误
296    /// - 如果缓冲区满,返回 `TrySendError::Full`
297    /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
298    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
299        // Check if channel is closed first
300        // 首先检查通道是否已关闭
301        if self.inner.closed.load(Ordering::Acquire) {
302            return Err(TrySendError::Closed(value));
303        }
304        
305        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
306        // 不会有多个线程同时访问 producer
307        let producer = unsafe { &mut *self.inner.producer.get() };
308        
309        match producer.push(value) {
310            Ok(()) => {
311                // Successfully sent, notify receiver
312                // 成功发送,通知接收者
313                self.inner.recv_notify.notify_one();
314                Ok(())
315            }
316            Err(PushError::Full(v)) => {
317                Err(TrySendError::Full(v))
318            }
319        }
320    }
321    
322    /// Check if the channel is closed
323    /// 
324    /// 检查通道是否已关闭
325    #[inline]
326    pub fn is_closed(&self) -> bool {
327        self.inner.closed.load(Ordering::Acquire)
328    }
329    
330    /// Get the capacity of the channel
331    /// 
332    /// 获取通道的容量
333    #[inline]
334    pub fn capacity(&self) -> usize {
335        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
336        // capacity 只读取数据,不需要可变访问
337        let producer = unsafe { &*self.inner.producer.get() };
338        producer.capacity()
339    }
340    
341    /// Get the number of messages currently in the channel
342    /// 
343    /// 获取通道中当前的消息数量
344    #[inline]
345    pub fn len(&self) -> usize {
346        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
347        // slots 只读取数据,不需要可变访问
348        let producer = unsafe { &*self.inner.producer.get() };
349        producer.slots()
350    }
351
352    #[inline]
353    pub fn is_empty(&self) -> bool {
354        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
355        // is_empty 只读取数据,不需要可变访问
356        let producer = unsafe { &*self.inner.producer.get() };
357        producer.is_empty()
358    }
359    
360    /// Get the number of free slots in the channel
361    /// 
362    /// 获取通道中的空闲空间数量
363    #[inline]
364    pub fn free_slots(&self) -> usize {
365        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
366        // free_slots 只读取数据,不需要可变访问
367        let producer = unsafe { &*self.inner.producer.get() };
368        producer.free_slots()
369    }
370    
371    /// Check if the channel is full
372    /// 
373    /// 检查通道是否已满
374    #[inline]
375    pub fn is_full(&self) -> bool {
376        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
377        // is_full 只读取数据,不需要可变访问
378        let producer = unsafe { &*self.inner.producer.get() };
379        producer.is_full()
380    }
381}
382
383impl<T: Copy, const N: usize> Sender<T, N> {
384    /// Try to send multiple values from a slice without blocking
385    /// 
386    /// 尝试非阻塞地从切片发送多个值
387    /// 
388    /// This method attempts to send as many elements as possible from the slice.
389    /// It returns the number of elements successfully sent.
390    /// 
391    /// 此方法尝试从切片中发送尽可能多的元素。
392    /// 返回成功发送的元素数量。
393    /// 
394    /// # Parameters
395    /// - `values`: Slice of values to send
396    /// 
397    /// # Returns
398    /// Number of elements successfully sent (0 to values.len())
399    /// 
400    /// # 参数
401    /// - `values`: 要发送的值的切片
402    /// 
403    /// # 返回值
404    /// 成功发送的元素数量(0 到 values.len())
405    pub fn try_send_slice(&self, values: &[T]) -> usize {
406        // Check if channel is closed first
407        // 首先检查通道是否已关闭
408        if self.inner.closed.load(Ordering::Acquire) {
409            return 0;
410        }
411        
412        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
413        // 不会有多个线程同时访问 producer
414        let producer = unsafe { &mut *self.inner.producer.get() };
415        
416        let sent = producer.push_slice(values);
417        
418        if sent > 0 {
419            // Successfully sent some messages, notify receiver
420            // 成功发送一些消息,通知接收者
421            self.inner.recv_notify.notify_one();
422        }
423        
424        sent
425    }
426    
427    /// Send multiple values from a slice (async, waits if buffer is full)
428    /// 
429    /// 从切片发送多个值(异步,如果缓冲区满则等待)
430    /// 
431    /// This method will send all elements from the slice, waiting if necessary
432    /// when the buffer becomes full. Returns the number of elements sent, or
433    /// an error if the channel is closed.
434    /// 
435    /// 此方法将发送切片中的所有元素,必要时在缓冲区满时等待。
436    /// 返回发送的元素数量,如果通道关闭则返回错误。
437    /// 
438    /// # Parameters
439    /// - `values`: Slice of values to send
440    /// 
441    /// # Returns
442    /// - `Ok(usize)`: Number of elements successfully sent
443    /// - `Err(SendError)`: Channel is closed
444    /// 
445    /// # 参数
446    /// - `values`: 要发送的值的切片
447    /// 
448    /// # 返回值
449    /// - `Ok(usize)`: 成功发送的元素数量
450    /// - `Err(SendError)`: 通道已关闭
451    pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
452        let mut total_sent = 0;
453        
454        while total_sent < values.len() {
455            // Check if channel is closed
456            // 检查通道是否已关闭
457            if self.inner.closed.load(Ordering::Acquire) {
458                return Err(SendError::Closed(()));
459            }
460            
461            let sent = self.try_send_slice(&values[total_sent..]);
462            total_sent += sent;
463            
464            if total_sent < values.len() {
465                // Need to wait for space
466                // 需要等待空间
467                self.inner.send_notify.notified().await;
468                
469                // Check if channel was closed while waiting
470                // 检查等待时通道是否已关闭
471                if self.inner.closed.load(Ordering::Acquire) {
472                    return Err(SendError::Closed(()));
473                }
474            }
475        }
476        
477        Ok(total_sent)
478    }
479}
480
481impl<T, const N: usize> Receiver<T, N> {
482    /// Receive a message from the channel (async, waits if buffer is empty)
483    /// 
484    /// Returns `None` if the channel is closed and empty
485    /// 
486    /// 从通道接收消息(异步,如果缓冲区空则等待)
487    /// 
488    /// 如果通道已关闭且为空,返回 `None`
489    pub async fn recv(&self) -> Option<T> {
490        loop {
491            match self.try_recv() {
492                Ok(value) => return Some(value),
493                Err(TryRecvError::Closed) => return None,
494                Err(TryRecvError::Empty) => {
495                    // Check if channel is closed before waiting
496                    // 等待前检查通道是否已关闭
497                    if self.inner.closed.load(Ordering::Acquire) {
498                        // Double check if there are any remaining items
499                        // 再次检查是否有剩余项
500                        if let Ok(value) = self.try_recv() {
501                            return Some(value);
502                        }
503                        return None;
504                    }
505                    
506                    // Wait for data to become available
507                    // 等待数据可用
508                    self.inner.recv_notify.notified().await;
509                }
510            }
511        }
512    }
513    
514    /// Try to receive a message without blocking
515    /// 
516    /// # Errors
517    /// - Returns `TryRecvError::Empty` if the buffer is empty
518    /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
519    /// 
520    /// 尝试非阻塞地接收消息
521    /// 
522    /// # 错误
523    /// - 如果缓冲区空,返回 `TryRecvError::Empty`
524    /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
525    pub fn try_recv(&self) -> Result<T, TryRecvError> {
526        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
527        // 不会有多个线程同时访问 consumer
528        let consumer = unsafe { &mut *self.inner.consumer.get() };
529        
530        match consumer.pop() {
531            Ok(value) => {
532                // Successfully received, notify sender
533                // 成功接收,通知发送者
534                self.inner.send_notify.notify_one();
535                Ok(value)
536            }
537            Err(PopError::Empty) => {
538                // Check if channel is closed
539                // 检查通道是否已关闭
540                if self.inner.closed.load(Ordering::Acquire) {
541                    // Double-check for remaining items to avoid race condition
542                    // where sender drops after push but before we check closed flag
543                    // 再次检查是否有剩余项,以避免发送方在 push 后但在我们检查 closed 标志前 drop 的竞态条件
544                    match consumer.pop() {
545                        Ok(value) => {
546                            self.inner.send_notify.notify_one();
547                            Ok(value)
548                        }
549                        Err(PopError::Empty) => {
550                            Err(TryRecvError::Closed)
551                        }
552                    }
553                } else {
554                    Err(TryRecvError::Empty)
555                }
556            }
557        }
558    }
559    
560    /// Check if the channel is empty
561    /// 
562    /// 检查通道是否为空
563    #[inline]
564    pub fn is_empty(&self) -> bool {
565        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
566        // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
567        let consumer = unsafe { &*self.inner.consumer.get() };
568        consumer.is_empty()
569    }
570    
571    /// Get the number of messages currently in the channel
572    /// 
573    /// 获取通道中当前的消息数量
574    #[inline]
575    pub fn len(&self) -> usize {
576        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
577        // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
578        let consumer = unsafe { &*self.inner.consumer.get() };
579        consumer.slots()
580    }
581    
582    /// Get the capacity of the channel
583    /// 
584    /// 获取通道的容量
585    #[inline]
586    pub fn capacity(&self) -> usize {
587        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
588        // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
589        let consumer = unsafe { &*self.inner.consumer.get() };
590        consumer.buffer().capacity()
591    }
592    
593    /// Peek at the first message without removing it
594    /// 
595    /// 查看第一个消息但不移除它
596    /// 
597    /// # Returns
598    /// `Some(&T)` if there is a message, `None` if the channel is empty
599    /// 
600    /// # 返回值
601    /// 如果有消息则返回 `Some(&T)`,如果通道为空则返回 `None`
602    /// 
603    /// # Safety
604    /// The returned reference is valid only as long as no other operations
605    /// are performed on the Receiver that might modify the buffer.
606    /// 
607    /// # 安全性
608    /// 返回的引用仅在未对 Receiver 执行可能修改缓冲区的其他操作时有效。
609    #[inline]
610    pub fn peek(&self) -> Option<&T> {
611        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
612        // peek 只读取数据,不需要可变访问
613        let consumer = unsafe { &*self.inner.consumer.get() };
614        consumer.peek()
615    }
616    
617    /// Clear all messages from the channel
618    /// 
619    /// 清空通道中的所有消息
620    /// 
621    /// This method pops and drops all messages currently in the channel.
622    /// 
623    /// 此方法弹出并 drop 通道中当前的所有消息。
624    pub fn clear(&mut self) {
625        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
626        // 我们有可变引用,因此可以安全地访问 consumer
627        let consumer = unsafe { &mut *self.inner.consumer.get() };
628        consumer.clear();
629        
630        // Notify sender that space is available
631        // 通知发送者空间可用
632        self.inner.send_notify.notify_one();
633    }
634    
635    /// Create a draining iterator
636    /// 
637    /// 创建一个消费迭代器
638    /// 
639    /// Returns an iterator that removes and returns messages from the channel.
640    /// The iterator will continue until the channel is empty.
641    /// 
642    /// 返回一个从通道中移除并返回消息的迭代器。
643    /// 迭代器将持续运行直到通道为空。
644    /// 
645    /// # Examples
646    /// 
647    /// ```
648    /// use lite_sync::spsc::channel;
649    /// use std::num::NonZeroUsize;
650    /// 
651    /// # #[tokio::main]
652    /// # async fn main() {
653    /// let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
654    /// tx.try_send(1).unwrap();
655    /// tx.try_send(2).unwrap();
656    /// tx.try_send(3).unwrap();
657    /// 
658    /// let items: Vec<i32> = rx.drain().collect();
659    /// assert_eq!(items, vec![1, 2, 3]);
660    /// assert!(rx.is_empty());
661    /// # }
662    /// ```
663    #[inline]
664    pub fn drain(&mut self) -> Drain<'_, T, N> {
665        Drain { receiver: self }
666    }
667}
668
669impl<T: Copy, const N: usize> Receiver<T, N> {
670    /// Try to receive multiple values into a slice without blocking
671    /// 
672    /// 尝试非阻塞地将多个值接收到切片
673    /// 
674    /// This method attempts to receive as many messages as possible into the provided slice.
675    /// It returns the number of messages successfully received.
676    /// 
677    /// 此方法尝试将尽可能多的消息接收到提供的切片中。
678    /// 返回成功接收的消息数量。
679    /// 
680    /// # Parameters
681    /// - `dest`: Destination slice to receive values into
682    /// 
683    /// # Returns
684    /// Number of messages successfully received (0 to dest.len())
685    /// 
686    /// # 参数
687    /// - `dest`: 用于接收值的目标切片
688    /// 
689    /// # 返回值
690    /// 成功接收的消息数量(0 到 dest.len())
691    pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
692        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
693        // 我们有可变引用,因此可以安全地访问 consumer
694        let consumer = unsafe { &mut *self.inner.consumer.get() };
695        
696        let received = consumer.pop_slice(dest);
697        
698        if received > 0 {
699            // Successfully received some messages, notify sender
700            // 成功接收一些消息,通知发送者
701            self.inner.send_notify.notify_one();
702        }
703        
704        received
705    }
706    
707    /// Receive multiple values into a slice (async, waits if buffer is empty)
708    /// 
709    /// 将多个值接收到切片(异步,如果缓冲区空则等待)
710    /// 
711    /// This method will fill the destination slice as much as possible, waiting if necessary
712    /// when the buffer becomes empty. Returns the number of messages received.
713    /// 
714    /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
715    /// 返回接收的消息数量。
716    /// 
717    /// # Parameters
718    /// - `dest`: Destination slice to receive values into
719    /// 
720    /// # Returns
721    /// Number of messages successfully received (0 to dest.len())
722    /// Returns 0 if the channel is closed and empty
723    /// 
724    /// # 参数
725    /// - `dest`: 用于接收值的目标切片
726    /// 
727    /// # 返回值
728    /// 成功接收的消息数量(0 到 dest.len())
729    /// 如果通道已关闭且为空,返回 0
730    pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
731        let mut total_received = 0;
732        
733        while total_received < dest.len() {
734            let received = self.try_recv_slice(&mut dest[total_received..]);
735            total_received += received;
736            
737            if total_received < dest.len() {
738                // Check if channel is closed
739                // 检查通道是否已关闭
740                if self.inner.closed.load(Ordering::Acquire) {
741                    // Channel closed, return what we have
742                    // 通道已关闭,返回我们已有的内容
743                    return total_received;
744                }
745                
746                // Need to wait for data
747                // 需要等待数据
748                self.inner.recv_notify.notified().await;
749                
750                // Check again after waking up
751                // 唤醒后再次检查
752                if self.inner.closed.load(Ordering::Acquire) {
753                    // Try one more time to get remaining messages
754                    // 再尝试一次获取剩余消息
755                    let final_received = self.try_recv_slice(&mut dest[total_received..]);
756                    total_received += final_received;
757                    return total_received;
758                }
759            }
760        }
761        
762        total_received
763    }
764}
765
766impl<T, const N: usize> Drop for Receiver<T, N> {
767    fn drop(&mut self) {
768        // Mark channel as closed when receiver is dropped
769        // 当接收器被丢弃时标记通道为已关闭
770        self.inner.closed.store(true, Ordering::Release);
771        
772        // Notify sender in case it's waiting
773        // 通知发送者以防它正在等待
774        self.inner.send_notify.notify_one();
775    }
776}
777
778impl<T, const N: usize> Drop for Sender<T, N> {
779    fn drop(&mut self) {
780        // Mark channel as closed when sender is dropped
781        // 当发送器被丢弃时标记通道为已关闭
782        self.inner.closed.store(true, Ordering::Release);
783        
784        // Notify receiver in case it's waiting
785        // 通知接收器以防它正在等待
786        self.inner.recv_notify.notify_one();
787    }
788}
789
790#[cfg(test)]
791mod tests {
792    use super::*;
793    
794    #[tokio::test]
795    async fn test_basic_send_recv() {
796        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
797        
798        tx.send(1).await.unwrap();
799        tx.send(2).await.unwrap();
800        tx.send(3).await.unwrap();
801        
802        assert_eq!(rx.recv().await, Some(1));
803        assert_eq!(rx.recv().await, Some(2));
804        assert_eq!(rx.recv().await, Some(3));
805    }
806    
807    #[tokio::test]
808    async fn test_try_send_recv() {
809        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
810        
811        tx.try_send(1).unwrap();
812        tx.try_send(2).unwrap();
813        
814        assert_eq!(rx.try_recv().unwrap(), 1);
815        assert_eq!(rx.try_recv().unwrap(), 2);
816        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
817    }
818    
819    #[tokio::test]
820    async fn test_channel_closed_on_sender_drop() {
821        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
822        
823        tx.send(1).await.unwrap();
824        drop(tx);
825        
826        assert_eq!(rx.recv().await, Some(1));
827        assert_eq!(rx.recv().await, None);
828    }
829    
830    #[tokio::test]
831    async fn test_channel_closed_on_receiver_drop() {
832        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
833        
834        drop(rx);
835        
836        assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
837    }
838    
839    #[tokio::test]
840    async fn test_cross_task_communication() {
841        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
842        
843        let sender_handle = tokio::spawn(async move {
844            for i in 0..10 {
845                tx.send(i).await.unwrap();
846            }
847        });
848        
849        let receiver_handle = tokio::spawn(async move {
850            let mut sum = 0;
851            while let Some(value) = rx.recv().await {
852                sum += value;
853            }
854            sum
855        });
856        
857        sender_handle.await.unwrap();
858        let sum = receiver_handle.await.unwrap();
859        assert_eq!(sum, 45); // 0+1+2+...+9 = 45
860    }
861    
862    #[tokio::test]
863    async fn test_backpressure() {
864        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
865        
866        // Fill the buffer
867        tx.try_send(1).unwrap();
868        tx.try_send(2).unwrap();
869        tx.try_send(3).unwrap();
870        tx.try_send(4).unwrap();
871        
872        // Buffer should be full now
873        assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
874        
875        // This should block and then succeed when we consume
876        let send_handle = tokio::spawn(async move {
877            tx.send(5).await.unwrap();
878            tx.send(6).await.unwrap();
879        });
880        
881        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
882        
883        assert_eq!(rx.recv().await, Some(1));
884        assert_eq!(rx.recv().await, Some(2));
885        assert_eq!(rx.recv().await, Some(3));
886        assert_eq!(rx.recv().await, Some(4));
887        assert_eq!(rx.recv().await, Some(5));
888        assert_eq!(rx.recv().await, Some(6));
889        
890        send_handle.await.unwrap();
891    }
892    
893    #[tokio::test]
894    async fn test_capacity_and_len() {
895        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
896        
897        assert_eq!(rx.capacity(), 8);
898        assert_eq!(rx.len(), 0);
899        assert!(rx.is_empty());
900        
901        tx.try_send(1).unwrap();
902        tx.try_send(2).unwrap();
903        
904        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
905        assert_eq!(rx.len(), 2);
906        assert!(!rx.is_empty());
907    }
908    
909    // ==================== New API Tests ====================
910    
911    #[tokio::test]
912    async fn test_sender_capacity_queries() {
913        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
914        
915        assert_eq!(tx.capacity(), 8);
916        assert_eq!(tx.len(), 0);
917        assert_eq!(tx.free_slots(), 8);
918        assert!(!tx.is_full());
919        
920        tx.try_send(1).unwrap();
921        tx.try_send(2).unwrap();
922        tx.try_send(3).unwrap();
923        
924        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
925        assert_eq!(tx.len(), 3);
926        assert_eq!(tx.free_slots(), 5);
927        assert!(!tx.is_full());
928        
929        // Fill the buffer
930        tx.try_send(4).unwrap();
931        tx.try_send(5).unwrap();
932        tx.try_send(6).unwrap();
933        tx.try_send(7).unwrap();
934        tx.try_send(8).unwrap();
935        
936        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
937        assert_eq!(tx.len(), 8);
938        assert_eq!(tx.free_slots(), 0);
939        assert!(tx.is_full());
940        
941        // Pop one and check again
942        rx.recv().await;
943        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
944        
945        assert_eq!(tx.len(), 7);
946        assert_eq!(tx.free_slots(), 1);
947        assert!(!tx.is_full());
948    }
949    
950    #[tokio::test]
951    async fn test_try_send_slice() {
952        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
953        
954        let data = [1, 2, 3, 4, 5];
955        let sent = tx.try_send_slice(&data);
956        
957        assert_eq!(sent, 5);
958        assert_eq!(rx.len(), 5);
959        
960        for i in 0..5 {
961            assert_eq!(rx.recv().await.unwrap(), data[i]);
962        }
963    }
964    
965    #[tokio::test]
966    async fn test_try_send_slice_partial() {
967        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
968        
969        // Fill with 5 elements, leaving room for 3
970        let initial = [1, 2, 3, 4, 5];
971        tx.try_send_slice(&initial);
972        
973        // Try to send 10 more, should only send 3
974        let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
975        let sent = tx.try_send_slice(&more);
976        
977        assert_eq!(sent, 3);
978        assert_eq!(rx.len(), 8);
979        assert!(tx.is_full());
980        
981        // Verify values
982        for i in 1..=8 {
983            assert_eq!(rx.recv().await.unwrap(), i);
984        }
985    }
986    
987    #[tokio::test]
988    async fn test_send_slice() {
989        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
990        
991        let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
992        let result = tx.send_slice(&data).await;
993        
994        assert_eq!(result.unwrap(), 10);
995        assert_eq!(rx.len(), 10);
996        
997        for i in 0..10 {
998            assert_eq!(rx.recv().await.unwrap(), data[i]);
999        }
1000    }
1001    
1002    #[tokio::test]
1003    async fn test_send_slice_with_backpressure() {
1004        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1005        
1006        let data = [1, 2, 3, 4, 5, 6, 7, 8];
1007        
1008        let send_handle = tokio::spawn(async move {
1009            tx.send_slice(&data).await.unwrap()
1010        });
1011        
1012        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1013        
1014        // Consume some messages to make room
1015        for i in 1..=4 {
1016            assert_eq!(rx.recv().await.unwrap(), i);
1017        }
1018        
1019        let sent = send_handle.await.unwrap();
1020        assert_eq!(sent, 8);
1021        
1022        // Verify remaining messages
1023        for i in 5..=8 {
1024            assert_eq!(rx.recv().await.unwrap(), i);
1025        }
1026    }
1027    
1028    #[tokio::test]
1029    async fn test_peek() {
1030        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1031        
1032        // Peek empty buffer
1033        assert!(rx.peek().is_none());
1034        
1035        tx.try_send(42).unwrap();
1036        tx.try_send(100).unwrap();
1037        tx.try_send(200).unwrap();
1038        
1039        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1040        
1041        // Peek should return first element without removing it
1042        assert_eq!(rx.peek(), Some(&42));
1043        assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
1044        assert_eq!(rx.len(), 3); // Length unchanged
1045    }
1046    
1047    #[tokio::test]
1048    async fn test_peek_after_recv() {
1049        let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1050        
1051        tx.try_send("first".to_string()).unwrap();
1052        tx.try_send("second".to_string()).unwrap();
1053        tx.try_send("third".to_string()).unwrap();
1054        
1055        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1056        
1057        assert_eq!(rx.peek(), Some(&"first".to_string()));
1058        rx.recv().await.unwrap();
1059        
1060        assert_eq!(rx.peek(), Some(&"second".to_string()));
1061        rx.recv().await.unwrap();
1062        
1063        assert_eq!(rx.peek(), Some(&"third".to_string()));
1064        rx.recv().await.unwrap();
1065        
1066        assert!(rx.peek().is_none());
1067    }
1068    
1069    #[tokio::test]
1070    async fn test_clear() {
1071        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1072        
1073        for i in 0..10 {
1074            tx.try_send(i).unwrap();
1075        }
1076        
1077        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1078        assert_eq!(rx.len(), 10);
1079        
1080        rx.clear();
1081        
1082        assert_eq!(rx.len(), 0);
1083        assert!(rx.is_empty());
1084    }
1085    
1086    #[tokio::test]
1087    async fn test_clear_with_drop() {
1088        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1089        use std::sync::Arc;
1090        
1091        #[derive(Debug)]
1092        struct DropCounter {
1093            counter: Arc<AtomicUsize>,
1094        }
1095        
1096        impl Drop for DropCounter {
1097            fn drop(&mut self) {
1098                self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1099            }
1100        }
1101        
1102        let counter = Arc::new(AtomicUsize::new(0));
1103        
1104        {
1105            let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1106            
1107            for _ in 0..8 {
1108                tx.try_send(DropCounter { counter: counter.clone() }).unwrap();
1109            }
1110            
1111            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1112            assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1113            
1114            rx.clear();
1115            
1116            assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1117        }
1118    }
1119    
1120    #[tokio::test]
1121    async fn test_drain() {
1122        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1123        
1124        for i in 0..10 {
1125            tx.try_send(i).unwrap();
1126        }
1127        
1128        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1129        
1130        let collected: Vec<i32> = rx.drain().collect();
1131        
1132        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1133        assert!(rx.is_empty());
1134    }
1135    
1136    #[tokio::test]
1137    async fn test_drain_empty() {
1138        let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1139        
1140        let collected: Vec<i32> = rx.drain().collect();
1141        
1142        assert!(collected.is_empty());
1143    }
1144    
1145    #[tokio::test]
1146    async fn test_drain_size_hint() {
1147        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1148        
1149        for i in 0..5 {
1150            tx.try_send(i).unwrap();
1151        }
1152        
1153        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1154        
1155        let mut drain = rx.drain();
1156        
1157        assert_eq!(drain.size_hint(), (5, Some(5)));
1158        
1159        drain.next();
1160        assert_eq!(drain.size_hint(), (4, Some(4)));
1161        
1162        drain.next();
1163        assert_eq!(drain.size_hint(), (3, Some(3)));
1164    }
1165    
1166    #[tokio::test]
1167    async fn test_try_recv_slice() {
1168        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1169        
1170        // Send some data
1171        for i in 0..10 {
1172            tx.try_send(i).unwrap();
1173        }
1174        
1175        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1176        
1177        let mut dest = [0u32; 5];
1178        let received = rx.try_recv_slice(&mut dest);
1179        
1180        assert_eq!(received, 5);
1181        assert_eq!(dest, [0, 1, 2, 3, 4]);
1182        assert_eq!(rx.len(), 5);
1183    }
1184    
1185    #[tokio::test]
1186    async fn test_try_recv_slice_partial() {
1187        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1188        
1189        tx.try_send(1).unwrap();
1190        tx.try_send(2).unwrap();
1191        tx.try_send(3).unwrap();
1192        
1193        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1194        
1195        let mut dest = [0u32; 10];
1196        let received = rx.try_recv_slice(&mut dest);
1197        
1198        assert_eq!(received, 3);
1199        assert_eq!(&dest[0..3], &[1, 2, 3]);
1200        assert!(rx.is_empty());
1201    }
1202    
1203    #[tokio::test]
1204    async fn test_recv_slice() {
1205        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1206        
1207        for i in 1..=10 {
1208            tx.try_send(i).unwrap();
1209        }
1210        
1211        let mut dest = [0u32; 10];
1212        let received = rx.recv_slice(&mut dest).await;
1213        
1214        assert_eq!(received, 10);
1215        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1216        assert!(rx.is_empty());
1217    }
1218    
1219    #[tokio::test]
1220    async fn test_recv_slice_with_wait() {
1221        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1222        
1223        let recv_handle = tokio::spawn(async move {
1224            let mut dest = [0u32; 8];
1225            let received = rx.recv_slice(&mut dest).await;
1226            (received, dest)
1227        });
1228        
1229        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1230        
1231        // Send data gradually
1232        for i in 1..=8 {
1233            tx.send(i).await.unwrap();
1234            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1235        }
1236        
1237        let (received, dest) = recv_handle.await.unwrap();
1238        assert_eq!(received, 8);
1239        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1240    }
1241    
1242    #[tokio::test]
1243    async fn test_recv_slice_channel_closed() {
1244        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1245        
1246        tx.try_send(1).unwrap();
1247        tx.try_send(2).unwrap();
1248        tx.try_send(3).unwrap();
1249        
1250        drop(tx); // Close the channel
1251        
1252        let mut dest = [0u32; 10];
1253        let received = rx.recv_slice(&mut dest).await;
1254        
1255        // Should receive the 3 available messages, then stop
1256        assert_eq!(received, 3);
1257        assert_eq!(&dest[0..3], &[1, 2, 3]);
1258    }
1259    
1260    #[tokio::test]
1261    async fn test_combined_new_apis() {
1262        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1263        
1264        // Batch send
1265        let data = [1, 2, 3, 4, 5];
1266        tx.try_send_slice(&data);
1267        
1268        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1269        
1270        assert_eq!(tx.len(), 5);
1271        assert_eq!(rx.len(), 5);
1272        assert_eq!(rx.capacity(), 16);
1273        
1274        // Peek
1275        assert_eq!(rx.peek(), Some(&1));
1276        
1277        // Batch receive
1278        let mut dest = [0u32; 3];
1279        rx.try_recv_slice(&mut dest);
1280        assert_eq!(dest, [1, 2, 3]);
1281        
1282        assert_eq!(rx.len(), 2);
1283        assert_eq!(tx.free_slots(), 14);
1284        
1285        // Drain remaining
1286        let remaining: Vec<u32> = rx.drain().collect();
1287        assert_eq!(remaining, vec![4, 5]);
1288        
1289        assert!(rx.is_empty());
1290        assert!(!tx.is_full());
1291    }
1292}