lite_sync/
spsc.rs

1/// High-performance async SPSC (Single Producer Single Consumer) channel
2/// 
3/// Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6/// 
7/// 高性能异步 SPSC(单生产者单消费者)通道
8/// 
9/// 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11/// `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12///
13/// # 安全性说明 (Safety Notes)
14///
15/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16/// 这是安全的,基于以下保证:
17///
18/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23///
24/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25///
26/// # Safety Guarantees
27///
28/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29/// This is safe based on the following guarantees:
30///
31/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36///
37/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38use std::cell::UnsafeCell;
39use std::num::NonZeroUsize;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::Arc;
42use super::notify::SingleWaiterNotify;
43
44use smallring::spsc::{Producer, Consumer, new, PushError, PopError};
45
46/// SPSC channel creation function
47/// 
48/// Creates a bounded SPSC channel with the specified capacity.
49/// 
50/// # Type Parameters
51/// - `T`: The type of messages to be sent through the channel
52/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
53/// 
54/// # Parameters
55/// - `capacity`: Channel capacity (total number of elements the channel can hold)
56/// 
57/// # Returns
58/// A tuple of (Sender, Receiver)
59/// 
60/// # Examples
61/// 
62/// ```
63/// use lite_sync::spsc::channel;
64/// use std::num::NonZeroUsize;
65/// 
66/// #[tokio::main]
67/// async fn main() {
68///     // Create a channel with capacity 32 and inline buffer size 8
69///     let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
70///     
71///     tokio::spawn(async move {
72///         tx.send(42).await.unwrap();
73///     });
74///     
75///     let value = rx.recv().await.unwrap();
76///     assert_eq!(value, 42);
77/// }
78/// ```
79/// 
80/// SPSC 通道创建函数
81/// 
82/// 创建指定容量的有界 SPSC 通道。
83/// 
84/// # 类型参数
85/// - `T`: 通过通道发送的消息类型
86/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
87/// 
88/// # 参数
89/// - `capacity`: 通道容量(通道可以容纳的元素总数)
90/// 
91/// # 返回值
92/// 返回 (Sender, Receiver) 元组
93pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
94    let (producer, consumer) = new::<T, N>(capacity);
95    
96    let inner = Arc::new(Inner::<T, N> {
97        producer: UnsafeCell::new(producer),
98        consumer: UnsafeCell::new(consumer),
99        closed: AtomicBool::new(false),
100        recv_notify: SingleWaiterNotify::new(),
101        send_notify: SingleWaiterNotify::new(),
102    });
103    
104    let sender = Sender {
105        inner: inner.clone(),
106    };
107    
108    let receiver = Receiver {
109        inner,
110    };
111    
112    (sender, receiver)
113}
114
115/// Shared internal state for SPSC channel
116/// 
117/// Contains both shared state and the ring buffer halves.
118/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
119/// 
120/// SPSC 通道的共享内部状态
121/// 
122/// 包含共享状态和环形缓冲区的两端。
123/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
124struct Inner<T, const N: usize = 32> {
125    /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
126    /// 
127    /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
128    producer: UnsafeCell<Producer<T, N>>,
129    
130    /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
131    /// 
132    /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
133    consumer: UnsafeCell<Consumer<T, N>>,
134    
135    /// Channel closed flag
136    /// 
137    /// 通道关闭标志
138    closed: AtomicBool,
139    
140    /// Notifier for receiver waiting (lightweight single-waiter)
141    /// 
142    /// 接收者等待通知器(轻量级单等待者)
143    recv_notify: SingleWaiterNotify,
144    
145    /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
146    /// 
147    /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
148    send_notify: SingleWaiterNotify,
149}
150
151// SAFETY: Inner<T> 可以在线程间安全共享的原因:
152// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
153// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
154// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
155// 4. closed、recv_notify、send_notify 都已经是线程安全的
156// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
157unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
158
159impl<T, const N: usize> std::fmt::Debug for Inner<T, N> {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        f.debug_struct("Inner")
162            .field("closed", &self.closed.load(Ordering::Acquire))
163            .finish()
164    }
165}
166
167/// SPSC channel sender
168/// 
169/// SPSC 通道发送器
170pub struct Sender<T, const N: usize> {
171    inner: Arc<Inner<T, N>>,
172}
173
174impl<T, const N: usize> std::fmt::Debug for Sender<T, N> {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        f.debug_struct("Sender")
177            .field("closed", &self.is_closed())
178            .field("len", &self.len())
179            .field("capacity", &self.capacity())
180            .finish()
181    }
182}
183
184/// SPSC channel receiver
185/// 
186/// SPSC 通道接收器
187pub struct Receiver<T, const N: usize> {
188    inner: Arc<Inner<T, N>>,
189}
190
191impl<T, const N: usize> std::fmt::Debug for Receiver<T, N> {
192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193        f.debug_struct("Receiver")
194            .field("is_empty", &self.is_empty())
195            .field("len", &self.len())
196            .field("capacity", &self.capacity())
197            .finish()
198    }
199}
200
201/// Draining iterator for the SPSC channel
202/// 
203/// SPSC 通道的消费迭代器
204/// 
205/// This iterator removes and returns messages from the channel until it's empty.
206/// 
207/// 此迭代器从通道中移除并返回消息,直到通道为空。
208/// 
209/// # Type Parameters
210/// - `T`: Message type
211/// - `N`: Inline buffer size
212/// 
213/// # 类型参数
214/// - `T`: 消息类型
215/// - `N`: 内联缓冲区大小
216pub struct Drain<'a, T, const N: usize> {
217    receiver: &'a mut Receiver<T, N>,
218}
219
220impl<'a, T, const N: usize> std::fmt::Debug for Drain<'a, T, N> {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        f.debug_struct("Drain")
223            .field("len", &self.receiver.len())
224            .field("is_empty", &self.receiver.is_empty())
225            .finish()
226    }
227}
228
229impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
230    type Item = T;
231    
232    #[inline]
233    fn next(&mut self) -> Option<Self::Item> {
234        self.receiver.try_recv().ok()
235    }
236    
237    #[inline]
238    fn size_hint(&self) -> (usize, Option<usize>) {
239        let len = self.receiver.len();
240        (len, Some(len))
241    }
242}
243
/// Error returned by the asynchronous send operations.
///
/// Implements `Display` and `std::error::Error` so it can be propagated with
/// `?` into `Box<dyn Error>` and similar error containers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError<T> {
    /// The channel is closed; the value that could not be sent is handed back.
    Closed(T),
}

impl<T> std::fmt::Display for SendError<T> {
    /// Writes a short description; the payload is not included in the message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SendError::Closed(_) => f.write_str("sending on a closed channel"),
        }
    }
}

// `Error` requires `Debug`, which the derive provides only when `T: Debug`.
impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
254
/// Error returned by the non-blocking receive operation.
///
/// Implements `Display` and `std::error::Error` so it can be propagated with
/// `?` into `Box<dyn Error>` and similar error containers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// The channel is currently empty but still open.
    Empty,

    /// The channel is closed and no message is left in the buffer.
    Closed,
}

impl std::fmt::Display for TryRecvError {
    /// Writes a short, variant-specific description.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            TryRecvError::Empty => "receiving on an empty channel",
            TryRecvError::Closed => "receiving on a closed channel",
        };
        f.write_str(message)
    }
}

impl std::error::Error for TryRecvError {}
270
/// Error returned by the non-blocking send operation.
///
/// Both variants hand the rejected value back to the caller. Implements
/// `Display` and `std::error::Error` so it can be propagated with `?` into
/// `Box<dyn Error>` and similar error containers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The buffer is full; the value was not queued.
    Full(T),

    /// The channel is closed; the value was not queued.
    Closed(T),
}

impl<T> std::fmt::Display for TrySendError<T> {
    /// Writes a short, variant-specific description; the payload is not included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            TrySendError::Full(_) => "sending on a full channel",
            TrySendError::Closed(_) => "sending on a closed channel",
        };
        f.write_str(message)
    }
}

// `Error` requires `Debug`, which the derive provides only when `T: Debug`.
impl<T: std::fmt::Debug> std::error::Error for TrySendError<T> {}
286
287impl<T, const N: usize> Sender<T, N> {
288    /// Send a message to the channel (async, waits if buffer is full)
289    /// 
290    /// # Errors
291    /// Returns `SendError::Closed` if the receiver has been dropped
292    /// 
293    /// 向通道发送消息(异步,如果缓冲区满则等待)
294    /// 
295    /// # 错误
296    /// 如果接收器已被丢弃,返回 `SendError::Closed`
297    pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
298        loop {
299            match self.try_send(value) {
300                Ok(()) => return Ok(()),
301                Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
302                Err(TrySendError::Full(v)) => {
303                    // Store the value to retry
304                    // 存储值以便重试
305                    value = v;
306                    
307                    // Wait for space to become available
308                    // 等待空间可用
309                    self.inner.send_notify.notified().await;
310                    
311                    // Check if channel was closed while waiting
312                    // 检查等待时通道是否已关闭
313                    if self.inner.closed.load(Ordering::Acquire) {
314                        return Err(SendError::Closed(value));
315                    }
316                    
317                    // Retry with the value in next loop iteration
318                    // 在下一次循环迭代中使用该值重试
319                }
320            }
321        }
322    }
323    
324    /// Try to send a message without blocking
325    /// 
326    /// # Errors
327    /// - Returns `TrySendError::Full` if the buffer is full
328    /// - Returns `TrySendError::Closed` if the receiver has been dropped
329    /// 
330    /// 尝试非阻塞地发送消息
331    /// 
332    /// # 错误
333    /// - 如果缓冲区满,返回 `TrySendError::Full`
334    /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
335    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
336        // Check if channel is closed first
337        // 首先检查通道是否已关闭
338        if self.inner.closed.load(Ordering::Acquire) {
339            return Err(TrySendError::Closed(value));
340        }
341        
342        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
343        // 不会有多个线程同时访问 producer
344        let producer = unsafe { &mut *self.inner.producer.get() };
345        
346        match producer.push(value) {
347            Ok(()) => {
348                // Successfully sent, notify receiver
349                // 成功发送,通知接收者
350                self.inner.recv_notify.notify_one();
351                Ok(())
352            }
353            Err(PushError::Full(v)) => {
354                Err(TrySendError::Full(v))
355            }
356        }
357    }
358    
359    /// Check if the channel is closed
360    /// 
361    /// 检查通道是否已关闭
362    #[inline]
363    pub fn is_closed(&self) -> bool {
364        self.inner.closed.load(Ordering::Acquire)
365    }
366    
367    /// Get the capacity of the channel
368    /// 
369    /// 获取通道的容量
370    #[inline]
371    pub fn capacity(&self) -> usize {
372        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
373        // capacity 只读取数据,不需要可变访问
374        let producer = unsafe { &*self.inner.producer.get() };
375        producer.capacity()
376    }
377    
378    /// Get the number of messages currently in the channel
379    /// 
380    /// 获取通道中当前的消息数量
381    #[inline]
382    pub fn len(&self) -> usize {
383        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
384        // slots 只读取数据,不需要可变访问
385        let producer = unsafe { &*self.inner.producer.get() };
386        producer.slots()
387    }
388
389    #[inline]
390    pub fn is_empty(&self) -> bool {
391        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
392        // is_empty 只读取数据,不需要可变访问
393        let producer = unsafe { &*self.inner.producer.get() };
394        producer.is_empty()
395    }
396    
397    /// Get the number of free slots in the channel
398    /// 
399    /// 获取通道中的空闲空间数量
400    #[inline]
401    pub fn free_slots(&self) -> usize {
402        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
403        // free_slots 只读取数据,不需要可变访问
404        let producer = unsafe { &*self.inner.producer.get() };
405        producer.free_slots()
406    }
407    
408    /// Check if the channel is full
409    /// 
410    /// 检查通道是否已满
411    #[inline]
412    pub fn is_full(&self) -> bool {
413        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
414        // is_full 只读取数据,不需要可变访问
415        let producer = unsafe { &*self.inner.producer.get() };
416        producer.is_full()
417    }
418}
419
420impl<T: Copy, const N: usize> Sender<T, N> {
421    /// Try to send multiple values from a slice without blocking
422    /// 
423    /// 尝试非阻塞地从切片发送多个值
424    /// 
425    /// This method attempts to send as many elements as possible from the slice.
426    /// It returns the number of elements successfully sent.
427    /// 
428    /// 此方法尝试从切片中发送尽可能多的元素。
429    /// 返回成功发送的元素数量。
430    /// 
431    /// # Parameters
432    /// - `values`: Slice of values to send
433    /// 
434    /// # Returns
435    /// Number of elements successfully sent (0 to values.len())
436    /// 
437    /// # 参数
438    /// - `values`: 要发送的值的切片
439    /// 
440    /// # 返回值
441    /// 成功发送的元素数量(0 到 values.len())
442    pub fn try_send_slice(&self, values: &[T]) -> usize {
443        // Check if channel is closed first
444        // 首先检查通道是否已关闭
445        if self.inner.closed.load(Ordering::Acquire) {
446            return 0;
447        }
448        
449        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
450        // 不会有多个线程同时访问 producer
451        let producer = unsafe { &mut *self.inner.producer.get() };
452        
453        let sent = producer.push_slice(values);
454        
455        if sent > 0 {
456            // Successfully sent some messages, notify receiver
457            // 成功发送一些消息,通知接收者
458            self.inner.recv_notify.notify_one();
459        }
460        
461        sent
462    }
463    
464    /// Send multiple values from a slice (async, waits if buffer is full)
465    /// 
466    /// 从切片发送多个值(异步,如果缓冲区满则等待)
467    /// 
468    /// This method will send all elements from the slice, waiting if necessary
469    /// when the buffer becomes full. Returns the number of elements sent, or
470    /// an error if the channel is closed.
471    /// 
472    /// 此方法将发送切片中的所有元素,必要时在缓冲区满时等待。
473    /// 返回发送的元素数量,如果通道关闭则返回错误。
474    /// 
475    /// # Parameters
476    /// - `values`: Slice of values to send
477    /// 
478    /// # Returns
479    /// - `Ok(usize)`: Number of elements successfully sent
480    /// - `Err(SendError)`: Channel is closed
481    /// 
482    /// # 参数
483    /// - `values`: 要发送的值的切片
484    /// 
485    /// # 返回值
486    /// - `Ok(usize)`: 成功发送的元素数量
487    /// - `Err(SendError)`: 通道已关闭
488    pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
489        let mut total_sent = 0;
490        
491        while total_sent < values.len() {
492            // Check if channel is closed
493            // 检查通道是否已关闭
494            if self.inner.closed.load(Ordering::Acquire) {
495                return Err(SendError::Closed(()));
496            }
497            
498            let sent = self.try_send_slice(&values[total_sent..]);
499            total_sent += sent;
500            
501            if total_sent < values.len() {
502                // Need to wait for space
503                // 需要等待空间
504                self.inner.send_notify.notified().await;
505                
506                // Check if channel was closed while waiting
507                // 检查等待时通道是否已关闭
508                if self.inner.closed.load(Ordering::Acquire) {
509                    return Err(SendError::Closed(()));
510                }
511            }
512        }
513        
514        Ok(total_sent)
515    }
516}
517
518impl<T, const N: usize> Receiver<T, N> {
519    /// Receive a message from the channel (async, waits if buffer is empty)
520    /// 
521    /// Returns `None` if the channel is closed and empty
522    /// 
523    /// 从通道接收消息(异步,如果缓冲区空则等待)
524    /// 
525    /// 如果通道已关闭且为空,返回 `None`
526    pub async fn recv(&self) -> Option<T> {
527        loop {
528            match self.try_recv() {
529                Ok(value) => return Some(value),
530                Err(TryRecvError::Closed) => return None,
531                Err(TryRecvError::Empty) => {
532                    // Check if channel is closed before waiting
533                    // 等待前检查通道是否已关闭
534                    if self.inner.closed.load(Ordering::Acquire) {
535                        // Double check if there are any remaining items
536                        // 再次检查是否有剩余项
537                        if let Ok(value) = self.try_recv() {
538                            return Some(value);
539                        }
540                        return None;
541                    }
542                    
543                    // Wait for data to become available
544                    // 等待数据可用
545                    self.inner.recv_notify.notified().await;
546                }
547            }
548        }
549    }
550    
551    /// Try to receive a message without blocking
552    /// 
553    /// # Errors
554    /// - Returns `TryRecvError::Empty` if the buffer is empty
555    /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
556    /// 
557    /// 尝试非阻塞地接收消息
558    /// 
559    /// # 错误
560    /// - 如果缓冲区空,返回 `TryRecvError::Empty`
561    /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
562    pub fn try_recv(&self) -> Result<T, TryRecvError> {
563        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
564        // 不会有多个线程同时访问 consumer
565        let consumer = unsafe { &mut *self.inner.consumer.get() };
566        
567        match consumer.pop() {
568            Ok(value) => {
569                // Successfully received, notify sender
570                // 成功接收,通知发送者
571                self.inner.send_notify.notify_one();
572                Ok(value)
573            }
574            Err(PopError::Empty) => {
575                // Check if channel is closed
576                // 检查通道是否已关闭
577                if self.inner.closed.load(Ordering::Acquire) {
578                    // Double-check for remaining items to avoid race condition
579                    // where sender drops after push but before we check closed flag
580                    // 再次检查是否有剩余项,以避免发送方在 push 后但在我们检查 closed 标志前 drop 的竞态条件
581                    match consumer.pop() {
582                        Ok(value) => {
583                            self.inner.send_notify.notify_one();
584                            Ok(value)
585                        }
586                        Err(PopError::Empty) => {
587                            Err(TryRecvError::Closed)
588                        }
589                    }
590                } else {
591                    Err(TryRecvError::Empty)
592                }
593            }
594        }
595    }
596    
597    /// Check if the channel is empty
598    /// 
599    /// 检查通道是否为空
600    #[inline]
601    pub fn is_empty(&self) -> bool {
602        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
603        // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
604        let consumer = unsafe { &*self.inner.consumer.get() };
605        consumer.is_empty()
606    }
607    
608    /// Get the number of messages currently in the channel
609    /// 
610    /// 获取通道中当前的消息数量
611    #[inline]
612    pub fn len(&self) -> usize {
613        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
614        // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
615        let consumer = unsafe { &*self.inner.consumer.get() };
616        consumer.slots()
617    }
618    
619    /// Get the capacity of the channel
620    /// 
621    /// 获取通道的容量
622    #[inline]
623    pub fn capacity(&self) -> usize {
624        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
625        // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
626        let consumer = unsafe { &*self.inner.consumer.get() };
627        consumer.buffer().capacity()
628    }
629    
630    /// Peek at the first message without removing it
631    /// 
632    /// 查看第一个消息但不移除它
633    /// 
634    /// # Returns
635    /// `Some(&T)` if there is a message, `None` if the channel is empty
636    /// 
637    /// # 返回值
638    /// 如果有消息则返回 `Some(&T)`,如果通道为空则返回 `None`
639    /// 
640    /// # Safety
641    /// The returned reference is valid only as long as no other operations
642    /// are performed on the Receiver that might modify the buffer.
643    /// 
644    /// # 安全性
645    /// 返回的引用仅在未对 Receiver 执行可能修改缓冲区的其他操作时有效。
646    #[inline]
647    pub fn peek(&self) -> Option<&T> {
648        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
649        // peek 只读取数据,不需要可变访问
650        let consumer = unsafe { &*self.inner.consumer.get() };
651        consumer.peek()
652    }
653    
654    /// Clear all messages from the channel
655    /// 
656    /// 清空通道中的所有消息
657    /// 
658    /// This method pops and drops all messages currently in the channel.
659    /// 
660    /// 此方法弹出并 drop 通道中当前的所有消息。
661    pub fn clear(&mut self) {
662        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
663        // 我们有可变引用,因此可以安全地访问 consumer
664        let consumer = unsafe { &mut *self.inner.consumer.get() };
665        consumer.clear();
666        
667        // Notify sender that space is available
668        // 通知发送者空间可用
669        self.inner.send_notify.notify_one();
670    }
671    
672    /// Create a draining iterator
673    /// 
674    /// 创建一个消费迭代器
675    /// 
676    /// Returns an iterator that removes and returns messages from the channel.
677    /// The iterator will continue until the channel is empty.
678    /// 
679    /// 返回一个从通道中移除并返回消息的迭代器。
680    /// 迭代器将持续运行直到通道为空。
681    /// 
682    /// # Examples
683    /// 
684    /// ```
685    /// use lite_sync::spsc::channel;
686    /// use std::num::NonZeroUsize;
687    /// 
688    /// # #[tokio::main]
689    /// # async fn main() {
690    /// let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
691    /// tx.try_send(1).unwrap();
692    /// tx.try_send(2).unwrap();
693    /// tx.try_send(3).unwrap();
694    /// 
695    /// let items: Vec<i32> = rx.drain().collect();
696    /// assert_eq!(items, vec![1, 2, 3]);
697    /// assert!(rx.is_empty());
698    /// # }
699    /// ```
700    #[inline]
701    pub fn drain(&mut self) -> Drain<'_, T, N> {
702        Drain { receiver: self }
703    }
704}
705
706impl<T: Copy, const N: usize> Receiver<T, N> {
707    /// Try to receive multiple values into a slice without blocking
708    /// 
709    /// 尝试非阻塞地将多个值接收到切片
710    /// 
711    /// This method attempts to receive as many messages as possible into the provided slice.
712    /// It returns the number of messages successfully received.
713    /// 
714    /// 此方法尝试将尽可能多的消息接收到提供的切片中。
715    /// 返回成功接收的消息数量。
716    /// 
717    /// # Parameters
718    /// - `dest`: Destination slice to receive values into
719    /// 
720    /// # Returns
721    /// Number of messages successfully received (0 to dest.len())
722    /// 
723    /// # 参数
724    /// - `dest`: 用于接收值的目标切片
725    /// 
726    /// # 返回值
727    /// 成功接收的消息数量(0 到 dest.len())
728    pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
729        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
730        // 我们有可变引用,因此可以安全地访问 consumer
731        let consumer = unsafe { &mut *self.inner.consumer.get() };
732        
733        let received = consumer.pop_slice(dest);
734        
735        if received > 0 {
736            // Successfully received some messages, notify sender
737            // 成功接收一些消息,通知发送者
738            self.inner.send_notify.notify_one();
739        }
740        
741        received
742    }
743    
744    /// Receive multiple values into a slice (async, waits if buffer is empty)
745    /// 
746    /// 将多个值接收到切片(异步,如果缓冲区空则等待)
747    /// 
748    /// This method will fill the destination slice as much as possible, waiting if necessary
749    /// when the buffer becomes empty. Returns the number of messages received.
750    /// 
751    /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
752    /// 返回接收的消息数量。
753    /// 
754    /// # Parameters
755    /// - `dest`: Destination slice to receive values into
756    /// 
757    /// # Returns
758    /// Number of messages successfully received (0 to dest.len())
759    /// Returns 0 if the channel is closed and empty
760    /// 
761    /// # 参数
762    /// - `dest`: 用于接收值的目标切片
763    /// 
764    /// # 返回值
765    /// 成功接收的消息数量(0 到 dest.len())
766    /// 如果通道已关闭且为空,返回 0
767    pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
768        let mut total_received = 0;
769        
770        while total_received < dest.len() {
771            let received = self.try_recv_slice(&mut dest[total_received..]);
772            total_received += received;
773            
774            if total_received < dest.len() {
775                // Check if channel is closed
776                // 检查通道是否已关闭
777                if self.inner.closed.load(Ordering::Acquire) {
778                    // Channel closed, return what we have
779                    // 通道已关闭,返回我们已有的内容
780                    return total_received;
781                }
782                
783                // Need to wait for data
784                // 需要等待数据
785                self.inner.recv_notify.notified().await;
786                
787                // Check again after waking up
788                // 唤醒后再次检查
789                if self.inner.closed.load(Ordering::Acquire) {
790                    // Try one more time to get remaining messages
791                    // 再尝试一次获取剩余消息
792                    let final_received = self.try_recv_slice(&mut dest[total_received..]);
793                    total_received += final_received;
794                    return total_received;
795                }
796            }
797        }
798        
799        total_received
800    }
801}
802
803impl<T, const N: usize> Drop for Receiver<T, N> {
804    fn drop(&mut self) {
805        // Mark channel as closed when receiver is dropped
806        // 当接收器被丢弃时标记通道为已关闭
807        self.inner.closed.store(true, Ordering::Release);
808        
809        // Notify sender in case it's waiting
810        // 通知发送者以防它正在等待
811        self.inner.send_notify.notify_one();
812    }
813}
814
815impl<T, const N: usize> Drop for Sender<T, N> {
816    fn drop(&mut self) {
817        // Mark channel as closed when sender is dropped
818        // 当发送器被丢弃时标记通道为已关闭
819        self.inner.closed.store(true, Ordering::Release);
820        
821        // Notify receiver in case it's waiting
822        // 通知接收器以防它正在等待
823        self.inner.recv_notify.notify_one();
824    }
825}
826
827#[cfg(test)]
828mod tests {
829    use super::*;
830    
831    #[tokio::test]
832    async fn test_basic_send_recv() {
833        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
834        
835        tx.send(1).await.unwrap();
836        tx.send(2).await.unwrap();
837        tx.send(3).await.unwrap();
838        
839        assert_eq!(rx.recv().await, Some(1));
840        assert_eq!(rx.recv().await, Some(2));
841        assert_eq!(rx.recv().await, Some(3));
842    }
843    
844    #[tokio::test]
845    async fn test_try_send_recv() {
846        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
847        
848        tx.try_send(1).unwrap();
849        tx.try_send(2).unwrap();
850        
851        assert_eq!(rx.try_recv().unwrap(), 1);
852        assert_eq!(rx.try_recv().unwrap(), 2);
853        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
854    }
855    
856    #[tokio::test]
857    async fn test_channel_closed_on_sender_drop() {
858        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
859        
860        tx.send(1).await.unwrap();
861        drop(tx);
862        
863        assert_eq!(rx.recv().await, Some(1));
864        assert_eq!(rx.recv().await, None);
865    }
866    
867    #[tokio::test]
868    async fn test_channel_closed_on_receiver_drop() {
869        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
870        
871        drop(rx);
872        
873        assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
874    }
875    
876    #[tokio::test]
877    async fn test_cross_task_communication() {
878        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
879        
880        let sender_handle = tokio::spawn(async move {
881            for i in 0..10 {
882                tx.send(i).await.unwrap();
883            }
884        });
885        
886        let receiver_handle = tokio::spawn(async move {
887            let mut sum = 0;
888            while let Some(value) = rx.recv().await {
889                sum += value;
890            }
891            sum
892        });
893        
894        sender_handle.await.unwrap();
895        let sum = receiver_handle.await.unwrap();
896        assert_eq!(sum, 45); // 0+1+2+...+9 = 45
897    }
898    
899    #[tokio::test]
900    async fn test_backpressure() {
901        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
902        
903        // Fill the buffer
904        tx.try_send(1).unwrap();
905        tx.try_send(2).unwrap();
906        tx.try_send(3).unwrap();
907        tx.try_send(4).unwrap();
908        
909        // Buffer should be full now
910        assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
911        
912        // This should block and then succeed when we consume
913        let send_handle = tokio::spawn(async move {
914            tx.send(5).await.unwrap();
915            tx.send(6).await.unwrap();
916        });
917        
918        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
919        
920        assert_eq!(rx.recv().await, Some(1));
921        assert_eq!(rx.recv().await, Some(2));
922        assert_eq!(rx.recv().await, Some(3));
923        assert_eq!(rx.recv().await, Some(4));
924        assert_eq!(rx.recv().await, Some(5));
925        assert_eq!(rx.recv().await, Some(6));
926        
927        send_handle.await.unwrap();
928    }
929    
930    #[tokio::test]
931    async fn test_capacity_and_len() {
932        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
933        
934        assert_eq!(rx.capacity(), 8);
935        assert_eq!(rx.len(), 0);
936        assert!(rx.is_empty());
937        
938        tx.try_send(1).unwrap();
939        tx.try_send(2).unwrap();
940        
941        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
942        assert_eq!(rx.len(), 2);
943        assert!(!rx.is_empty());
944    }
945    
946    // ==================== New API Tests ====================
947    
948    #[tokio::test]
949    async fn test_sender_capacity_queries() {
950        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
951        
952        assert_eq!(tx.capacity(), 8);
953        assert_eq!(tx.len(), 0);
954        assert_eq!(tx.free_slots(), 8);
955        assert!(!tx.is_full());
956        
957        tx.try_send(1).unwrap();
958        tx.try_send(2).unwrap();
959        tx.try_send(3).unwrap();
960        
961        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
962        assert_eq!(tx.len(), 3);
963        assert_eq!(tx.free_slots(), 5);
964        assert!(!tx.is_full());
965        
966        // Fill the buffer
967        tx.try_send(4).unwrap();
968        tx.try_send(5).unwrap();
969        tx.try_send(6).unwrap();
970        tx.try_send(7).unwrap();
971        tx.try_send(8).unwrap();
972        
973        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
974        assert_eq!(tx.len(), 8);
975        assert_eq!(tx.free_slots(), 0);
976        assert!(tx.is_full());
977        
978        // Pop one and check again
979        rx.recv().await;
980        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
981        
982        assert_eq!(tx.len(), 7);
983        assert_eq!(tx.free_slots(), 1);
984        assert!(!tx.is_full());
985    }
986    
987    #[tokio::test]
988    async fn test_try_send_slice() {
989        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
990        
991        let data = [1, 2, 3, 4, 5];
992        let sent = tx.try_send_slice(&data);
993        
994        assert_eq!(sent, 5);
995        assert_eq!(rx.len(), 5);
996        
997        for i in 0..5 {
998            assert_eq!(rx.recv().await.unwrap(), data[i]);
999        }
1000    }
1001    
1002    #[tokio::test]
1003    async fn test_try_send_slice_partial() {
1004        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1005        
1006        // Fill with 5 elements, leaving room for 3
1007        let initial = [1, 2, 3, 4, 5];
1008        tx.try_send_slice(&initial);
1009        
1010        // Try to send 10 more, should only send 3
1011        let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
1012        let sent = tx.try_send_slice(&more);
1013        
1014        assert_eq!(sent, 3);
1015        assert_eq!(rx.len(), 8);
1016        assert!(tx.is_full());
1017        
1018        // Verify values
1019        for i in 1..=8 {
1020            assert_eq!(rx.recv().await.unwrap(), i);
1021        }
1022    }
1023    
1024    #[tokio::test]
1025    async fn test_send_slice() {
1026        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1027        
1028        let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
1029        let result = tx.send_slice(&data).await;
1030        
1031        assert_eq!(result.unwrap(), 10);
1032        assert_eq!(rx.len(), 10);
1033        
1034        for i in 0..10 {
1035            assert_eq!(rx.recv().await.unwrap(), data[i]);
1036        }
1037    }
1038    
1039    #[tokio::test]
1040    async fn test_send_slice_with_backpressure() {
1041        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1042        
1043        let data = [1, 2, 3, 4, 5, 6, 7, 8];
1044        
1045        let send_handle = tokio::spawn(async move {
1046            tx.send_slice(&data).await.unwrap()
1047        });
1048        
1049        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1050        
1051        // Consume some messages to make room
1052        for i in 1..=4 {
1053            assert_eq!(rx.recv().await.unwrap(), i);
1054        }
1055        
1056        let sent = send_handle.await.unwrap();
1057        assert_eq!(sent, 8);
1058        
1059        // Verify remaining messages
1060        for i in 5..=8 {
1061            assert_eq!(rx.recv().await.unwrap(), i);
1062        }
1063    }
1064    
1065    #[tokio::test]
1066    async fn test_peek() {
1067        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1068        
1069        // Peek empty buffer
1070        assert!(rx.peek().is_none());
1071        
1072        tx.try_send(42).unwrap();
1073        tx.try_send(100).unwrap();
1074        tx.try_send(200).unwrap();
1075        
1076        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1077        
1078        // Peek should return first element without removing it
1079        assert_eq!(rx.peek(), Some(&42));
1080        assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
1081        assert_eq!(rx.len(), 3); // Length unchanged
1082    }
1083    
1084    #[tokio::test]
1085    async fn test_peek_after_recv() {
1086        let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1087        
1088        tx.try_send("first".to_string()).unwrap();
1089        tx.try_send("second".to_string()).unwrap();
1090        tx.try_send("third".to_string()).unwrap();
1091        
1092        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1093        
1094        assert_eq!(rx.peek(), Some(&"first".to_string()));
1095        rx.recv().await.unwrap();
1096        
1097        assert_eq!(rx.peek(), Some(&"second".to_string()));
1098        rx.recv().await.unwrap();
1099        
1100        assert_eq!(rx.peek(), Some(&"third".to_string()));
1101        rx.recv().await.unwrap();
1102        
1103        assert!(rx.peek().is_none());
1104    }
1105    
1106    #[tokio::test]
1107    async fn test_clear() {
1108        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1109        
1110        for i in 0..10 {
1111            tx.try_send(i).unwrap();
1112        }
1113        
1114        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1115        assert_eq!(rx.len(), 10);
1116        
1117        rx.clear();
1118        
1119        assert_eq!(rx.len(), 0);
1120        assert!(rx.is_empty());
1121    }
1122    
1123    #[tokio::test]
1124    async fn test_clear_with_drop() {
1125        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1126        use std::sync::Arc;
1127        
1128        #[derive(Debug)]
1129        struct DropCounter {
1130            counter: Arc<AtomicUsize>,
1131        }
1132        
1133        impl Drop for DropCounter {
1134            fn drop(&mut self) {
1135                self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1136            }
1137        }
1138        
1139        let counter = Arc::new(AtomicUsize::new(0));
1140        
1141        {
1142            let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1143            
1144            for _ in 0..8 {
1145                tx.try_send(DropCounter { counter: counter.clone() }).unwrap();
1146            }
1147            
1148            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1149            assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1150            
1151            rx.clear();
1152            
1153            assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1154        }
1155    }
1156    
1157    #[tokio::test]
1158    async fn test_drain() {
1159        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1160        
1161        for i in 0..10 {
1162            tx.try_send(i).unwrap();
1163        }
1164        
1165        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1166        
1167        let collected: Vec<i32> = rx.drain().collect();
1168        
1169        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1170        assert!(rx.is_empty());
1171    }
1172    
1173    #[tokio::test]
1174    async fn test_drain_empty() {
1175        let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1176        
1177        let collected: Vec<i32> = rx.drain().collect();
1178        
1179        assert!(collected.is_empty());
1180    }
1181    
1182    #[tokio::test]
1183    async fn test_drain_size_hint() {
1184        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1185        
1186        for i in 0..5 {
1187            tx.try_send(i).unwrap();
1188        }
1189        
1190        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1191        
1192        let mut drain = rx.drain();
1193        
1194        assert_eq!(drain.size_hint(), (5, Some(5)));
1195        
1196        drain.next();
1197        assert_eq!(drain.size_hint(), (4, Some(4)));
1198        
1199        drain.next();
1200        assert_eq!(drain.size_hint(), (3, Some(3)));
1201    }
1202    
1203    #[tokio::test]
1204    async fn test_try_recv_slice() {
1205        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1206        
1207        // Send some data
1208        for i in 0..10 {
1209            tx.try_send(i).unwrap();
1210        }
1211        
1212        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1213        
1214        let mut dest = [0u32; 5];
1215        let received = rx.try_recv_slice(&mut dest);
1216        
1217        assert_eq!(received, 5);
1218        assert_eq!(dest, [0, 1, 2, 3, 4]);
1219        assert_eq!(rx.len(), 5);
1220    }
1221    
1222    #[tokio::test]
1223    async fn test_try_recv_slice_partial() {
1224        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1225        
1226        tx.try_send(1).unwrap();
1227        tx.try_send(2).unwrap();
1228        tx.try_send(3).unwrap();
1229        
1230        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1231        
1232        let mut dest = [0u32; 10];
1233        let received = rx.try_recv_slice(&mut dest);
1234        
1235        assert_eq!(received, 3);
1236        assert_eq!(&dest[0..3], &[1, 2, 3]);
1237        assert!(rx.is_empty());
1238    }
1239    
1240    #[tokio::test]
1241    async fn test_recv_slice() {
1242        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1243        
1244        for i in 1..=10 {
1245            tx.try_send(i).unwrap();
1246        }
1247        
1248        let mut dest = [0u32; 10];
1249        let received = rx.recv_slice(&mut dest).await;
1250        
1251        assert_eq!(received, 10);
1252        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1253        assert!(rx.is_empty());
1254    }
1255    
1256    #[tokio::test]
1257    async fn test_recv_slice_with_wait() {
1258        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1259        
1260        let recv_handle = tokio::spawn(async move {
1261            let mut dest = [0u32; 8];
1262            let received = rx.recv_slice(&mut dest).await;
1263            (received, dest)
1264        });
1265        
1266        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1267        
1268        // Send data gradually
1269        for i in 1..=8 {
1270            tx.send(i).await.unwrap();
1271            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1272        }
1273        
1274        let (received, dest) = recv_handle.await.unwrap();
1275        assert_eq!(received, 8);
1276        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1277    }
1278    
1279    #[tokio::test]
1280    async fn test_recv_slice_channel_closed() {
1281        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1282        
1283        tx.try_send(1).unwrap();
1284        tx.try_send(2).unwrap();
1285        tx.try_send(3).unwrap();
1286        
1287        drop(tx); // Close the channel
1288        
1289        let mut dest = [0u32; 10];
1290        let received = rx.recv_slice(&mut dest).await;
1291        
1292        // Should receive the 3 available messages, then stop
1293        assert_eq!(received, 3);
1294        assert_eq!(&dest[0..3], &[1, 2, 3]);
1295    }
1296    
1297    #[tokio::test]
1298    async fn test_combined_new_apis() {
1299        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1300        
1301        // Batch send
1302        let data = [1, 2, 3, 4, 5];
1303        tx.try_send_slice(&data);
1304        
1305        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1306        
1307        assert_eq!(tx.len(), 5);
1308        assert_eq!(rx.len(), 5);
1309        assert_eq!(rx.capacity(), 16);
1310        
1311        // Peek
1312        assert_eq!(rx.peek(), Some(&1));
1313        
1314        // Batch receive
1315        let mut dest = [0u32; 3];
1316        rx.try_recv_slice(&mut dest);
1317        assert_eq!(dest, [1, 2, 3]);
1318        
1319        assert_eq!(rx.len(), 2);
1320        assert_eq!(tx.free_slots(), 14);
1321        
1322        // Drain remaining
1323        let remaining: Vec<u32> = rx.drain().collect();
1324        assert_eq!(remaining, vec![4, 5]);
1325        
1326        assert!(rx.is_empty());
1327        assert!(!tx.is_full());
1328    }
1329}