lite_sync/
spsc.rs

1//! High-performance async SPSC (Single Producer Single Consumer) channel
2//!
3//! Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4//! Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5//! The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6//!
7//! 高性能异步 SPSC(单生产者单消费者)通道
8//!
9//! 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10//! 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11//! `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12//!
13//! # 安全性说明 (Safety Notes)
14//!
15//! 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16//! 这是安全的,基于以下保证:
17//!
18//! 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19//! 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20//! 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21//! 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22//! 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23//!
24//! 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25//!
26//! # Safety Guarantees
27//!
28//! This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29//! This is safe based on the following guarantees:
30//!
31//! 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32//! 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33//! 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34//! 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35//! 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36//!
37//! This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38use crate::shim::atomic::{AtomicBool, Ordering};
39use crate::shim::cell::UnsafeCell;
40use crate::shim::notify::SingleWaiterNotify;
41use crate::shim::sync::Arc;
42use core::num::NonZeroUsize;
43use smallring::spsc::{Consumer, PopError, Producer, PushError, new};
44
45/// SPSC channel creation function
46///
47/// Creates a bounded SPSC channel with the specified capacity.
48///
49/// # Type Parameters
50/// - `T`: The type of messages to be sent through the channel
51/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
52///
53/// # Parameters
54/// - `capacity`: Channel capacity (total number of elements the channel can hold)
55///
56/// # Returns
57/// A tuple of (Sender, Receiver)
58///
59/// # Examples
60///
61/// ```
62/// use lite_sync::spsc::channel;
63/// use core::num::NonZeroUsize;
64///
65/// #[tokio::main]
66/// async fn main() {
67///     #[cfg(not(feature = "loom"))]
68///     {
69///         // Create a channel with capacity 32 and inline buffer size 8
70///         let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
71///     
72///         tokio::spawn(async move {
73///             tx.send(42).await.unwrap();
74///         });
75///     
76///         let value = rx.recv().await.unwrap();
77///         assert_eq!(value, 42);
78///     }
79/// }
80/// ```
81///
82/// SPSC 通道创建函数
83///
84/// 创建指定容量的有界 SPSC 通道。
85///
86/// # 类型参数
87/// - `T`: 通过通道发送的消息类型
88/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
89///
90/// # 参数
91/// - `capacity`: 通道容量(通道可以容纳的元素总数)
92///
93/// # 返回值
94/// 返回 (Sender, Receiver) 元组
95pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
96    let (producer, consumer) = new::<T, N>(capacity);
97
98    let inner = Arc::new(Inner::<T, N> {
99        producer: UnsafeCell::new(producer),
100        consumer: UnsafeCell::new(consumer),
101        closed: AtomicBool::new(false),
102        recv_notify: SingleWaiterNotify::new(),
103        send_notify: SingleWaiterNotify::new(),
104    });
105
106    let sender = Sender {
107        inner: inner.clone(),
108    };
109
110    let receiver = Receiver { inner };
111
112    (sender, receiver)
113}
114
/// Shared internal state for the SPSC channel.
///
/// Holds both ring buffer halves alongside the closed flag and the notifiers.
/// `UnsafeCell` provides interior mutability for the `Producer`/`Consumer`
/// halves without going through a lock.
struct Inner<T, const N: usize = 32> {
    /// Producer half of the ring buffer (wrapped in `UnsafeCell` for interior mutability).
    producer: UnsafeCell<Producer<T, N>>,

    /// Consumer half of the ring buffer (wrapped in `UnsafeCell` for interior mutability).
    consumer: UnsafeCell<Consumer<T, N>>,

    /// Channel closed flag; set to `true` when either the `Sender` or the `Receiver` is dropped.
    closed: AtomicBool,

    /// Notifier the receiver waits on (lightweight single-waiter).
    recv_notify: SingleWaiterNotify,

    /// Notifier the sender waits on when the buffer is full (lightweight single-waiter).
    send_notify: SingleWaiterNotify,
}
150
// SAFETY: `Inner<T, N>` is shared between threads on the following grounds:
// 1. Neither `Sender` nor `Receiver` implements `Clone`, so each half has a single owner.
// 2. `producer` is only touched through the unique `Sender`.
// 3. `consumer` is only touched through the unique `Receiver`.
// 4. `closed`, `recv_notify` and `send_notify` are treated as thread-safe on their own.
// 5. `Producer` and `Consumer` are expected to use atomic operations internally for
//    cross-thread communication.
//
// NOTE(review): most `Sender`/`Receiver` methods take `&self`, so points 2 and 3 hold by
// convention if a `&Sender` or `&Receiver` is shared across threads — confirm that callers
// never invoke these methods concurrently on the same half.
unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
158
159impl<T, const N: usize> core::fmt::Debug for Inner<T, N> {
160    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
161        f.debug_struct("Inner")
162            .field("closed", &self.closed.load(Ordering::Acquire))
163            .finish()
164    }
165}
166
/// SPSC channel sender.
///
/// Holds a reference-counted handle to the channel's shared state.
pub struct Sender<T, const N: usize> {
    inner: Arc<Inner<T, N>>,
}
173
174impl<T, const N: usize> core::fmt::Debug for Sender<T, N> {
175    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
176        f.debug_struct("Sender")
177            .field("closed", &self.is_closed())
178            .field("len", &self.len())
179            .field("capacity", &self.capacity())
180            .finish()
181    }
182}
183
/// SPSC channel receiver.
///
/// Holds a reference-counted handle to the channel's shared state.
pub struct Receiver<T, const N: usize> {
    inner: Arc<Inner<T, N>>,
}
190
191impl<T, const N: usize> core::fmt::Debug for Receiver<T, N> {
192    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
193        f.debug_struct("Receiver")
194            .field("is_empty", &self.is_empty())
195            .field("len", &self.len())
196            .field("capacity", &self.capacity())
197            .finish()
198    }
199}
200
/// Draining iterator for the SPSC channel.
///
/// This iterator removes and returns messages from the channel until it's empty.
/// It borrows the `Receiver` mutably for its whole lifetime.
///
/// # Type Parameters
/// - `T`: Message type
/// - `N`: Inline buffer size
pub struct Drain<'a, T, const N: usize> {
    receiver: &'a mut Receiver<T, N>,
}
219
220impl<'a, T, const N: usize> core::fmt::Debug for Drain<'a, T, N> {
221    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
222        f.debug_struct("Drain")
223            .field("len", &self.receiver.len())
224            .field("is_empty", &self.receiver.is_empty())
225            .finish()
226    }
227}
228
229impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
230    type Item = T;
231
232    #[inline]
233    fn next(&mut self) -> Option<Self::Item> {
234        self.receiver.try_recv().ok()
235    }
236
237    #[inline]
238    fn size_hint(&self) -> (usize, Option<usize>) {
239        let len = self.receiver.len();
240        (len, Some(len))
241    }
242}
243
/// Send error type.
///
/// Carries the value that could not be sent so the caller can recover it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError<T> {
    /// Channel is closed
    Closed(T),
}

/// Human-readable description of the error; the payload itself is not printed,
/// so no `Display` bound on `T` is required.
impl<T> core::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            SendError::Closed(_) => f.write_str("sending on a closed channel"),
        }
    }
}
254
/// Try-receive error type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// Channel is empty
    Empty,

    /// Channel is closed
    Closed,
}

/// Human-readable description of the error.
impl core::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let msg = match self {
            TryRecvError::Empty => "receiving on an empty channel",
            TryRecvError::Closed => "receiving on a closed channel",
        };
        f.write_str(msg)
    }
}
270
/// Try-send error type.
///
/// Both variants carry the value that could not be sent so the caller can recover it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// Buffer is full
    Full(T),

    /// Channel is closed
    Closed(T),
}

/// Human-readable description of the error; the payload itself is not printed,
/// so no `Display` bound on `T` is required.
impl<T> core::fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let msg = match self {
            TrySendError::Full(_) => "sending on a full channel",
            TrySendError::Closed(_) => "sending on a closed channel",
        };
        f.write_str(msg)
    }
}
286
impl<T, const N: usize> Sender<T, N> {
    /// Send a message to the channel (async, waits if buffer is full)
    ///
    /// Repeatedly calls `try_send`; whenever the buffer is full it awaits
    /// `send_notify` and then retries with the same value.
    ///
    /// # Errors
    /// Returns `SendError::Closed` (carrying the unsent value) if the channel's
    /// closed flag is set, which `Receiver`'s `Drop` does.
    pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
        loop {
            match self.try_send(value) {
                Ok(()) => return Ok(()),
                Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
                Err(TrySendError::Full(v)) => {
                    // Take the value back so it can be retried
                    value = v;

                    // Wait for space to become available
                    self.inner.send_notify.notified().await;

                    // Check if channel was closed while waiting
                    if self.inner.closed.load(Ordering::Acquire) {
                        return Err(SendError::Closed(value));
                    }

                    // Retry with the value in next loop iteration
                }
            }
        }
    }

    /// Try to send a message without blocking
    ///
    /// On success the receiver-side notifier is signalled.
    ///
    /// # Errors
    /// - Returns `TrySendError::Full` if the buffer is full
    /// - Returns `TrySendError::Closed` if the channel's closed flag is set
    ///   (the receiver has been dropped)
    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
        // Check if channel is closed first
        if self.inner.closed.load(Ordering::Acquire) {
            return Err(TrySendError::Closed(value));
        }

        // SAFETY: relies on there being a single `Sender` (it does not implement `Clone`),
        // so the producer is not expected to be accessed from several threads at once.
        self.inner.producer.with_mut(|producer| unsafe {
            match (*producer).push(value) {
                Ok(()) => {
                    // Successfully sent, notify receiver
                    self.inner.recv_notify.notify_one();
                    Ok(())
                }
                Err(PushError::Full(v)) => Err(TrySendError::Full(v)),
            }
        })
    }

    /// Check if the channel is closed
    ///
    /// Reads the shared closed flag.
    #[inline]
    pub fn is_closed(&self) -> bool {
        self.inner.closed.load(Ordering::Acquire)
    }

    /// Get the capacity of the channel
    #[inline]
    pub fn capacity(&self) -> usize {
        // SAFETY: relies on there being a single `Sender` (it does not implement `Clone`);
        // `capacity` only reads from the producer.
        self.inner
            .producer
            .with(|producer| unsafe { (*producer).capacity() })
    }

    /// Get the number of messages currently in the channel
    ///
    /// Reported by the producer's `slots()`.
    #[inline]
    pub fn len(&self) -> usize {
        // SAFETY: relies on there being a single `Sender` (it does not implement `Clone`);
        // `slots` only reads from the producer.
        self.inner
            .producer
            .with(|producer| unsafe { (*producer).slots() })
    }

    /// Check if the channel is empty, as seen from the producer side
    #[inline]
    pub fn is_empty(&self) -> bool {
        // SAFETY: relies on there being a single `Sender` (it does not implement `Clone`);
        // `is_empty` only reads from the producer.
        self.inner
            .producer
            .with(|producer| unsafe { (*producer).is_empty() })
    }

    /// Get the number of free slots in the channel
    #[inline]
    pub fn free_slots(&self) -> usize {
        // SAFETY: relies on there being a single `Sender` (it does not implement `Clone`);
        // `free_slots` only reads from the producer.
        self.inner
            .producer
            .with(|producer| unsafe { (*producer).free_slots() })
    }

    /// Check if the channel is full
    #[inline]
    pub fn is_full(&self) -> bool {
        // SAFETY: relies on there being a single `Sender` (it does not implement `Clone`);
        // `is_full` only reads from the producer.
        self.inner
            .producer
            .with(|producer| unsafe { (*producer).is_full() })
    }
}
422
impl<T: Copy, const N: usize> Sender<T, N> {
    /// Try to send multiple values from a slice without blocking
    ///
    /// This method attempts to send as many elements as possible from the slice.
    /// It returns the number of elements successfully sent. If at least one
    /// element was pushed, the receiver-side notifier is signalled.
    ///
    /// # Parameters
    /// - `values`: Slice of values to send
    ///
    /// # Returns
    /// Number of elements successfully sent (0 to values.len()); 0 is also
    /// returned when the channel's closed flag is set.
    pub fn try_send_slice(&self, values: &[T]) -> usize {
        // Check if channel is closed first
        if self.inner.closed.load(Ordering::Acquire) {
            return 0;
        }

        // SAFETY: relies on there being a single `Sender` (it does not implement `Clone`),
        // so the producer is not expected to be accessed from several threads at once.
        self.inner.producer.with_mut(|producer| unsafe {
            let sent = (*producer).push_slice(values);

            if sent > 0 {
                // Successfully sent some messages, notify receiver
                self.inner.recv_notify.notify_one();
            }

            sent
        })
    }

    /// Send multiple values from a slice (async, waits if buffer is full)
    ///
    /// This method will send all elements from the slice, waiting if necessary
    /// when the buffer becomes full. Returns the number of elements sent, or
    /// an error if the channel is closed.
    ///
    /// Note that when the channel closes part-way through, the error carries no
    /// count, so the caller cannot tell how many elements were already pushed.
    ///
    /// # Parameters
    /// - `values`: Slice of values to send
    ///
    /// # Returns
    /// - `Ok(usize)`: Number of elements successfully sent
    /// - `Err(SendError)`: Channel is closed
    pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
        let mut total_sent = 0;

        while total_sent < values.len() {
            // Check if channel is closed
            if self.inner.closed.load(Ordering::Acquire) {
                return Err(SendError::Closed(()));
            }

            // Push as much of the remaining tail as currently fits
            let sent = self.try_send_slice(&values[total_sent..]);
            total_sent += sent;

            if total_sent < values.len() {
                // Need to wait for space
                self.inner.send_notify.notified().await;

                // Check if channel was closed while waiting
                if self.inner.closed.load(Ordering::Acquire) {
                    return Err(SendError::Closed(()));
                }
            }
        }

        Ok(total_sent)
    }
}
520
impl<T, const N: usize> Receiver<T, N> {
    /// Receive a message from the channel (async, waits if buffer is empty)
    ///
    /// Repeatedly calls `try_recv`; whenever the buffer is empty and the channel
    /// is still open it awaits `recv_notify` and then retries.
    ///
    /// Returns `None` if the channel is closed and empty
    pub async fn recv(&self) -> Option<T> {
        loop {
            match self.try_recv() {
                Ok(value) => return Some(value),
                Err(TryRecvError::Closed) => return None,
                Err(TryRecvError::Empty) => {
                    // Check if channel is closed before waiting
                    if self.inner.closed.load(Ordering::Acquire) {
                        // Double check if there are any remaining items
                        if let Ok(value) = self.try_recv() {
                            return Some(value);
                        }
                        return None;
                    }

                    // Wait for data to become available
                    self.inner.recv_notify.notified().await;
                }
            }
        }
    }

    /// Try to receive a message without blocking
    ///
    /// On success the sender-side notifier is signalled.
    ///
    /// # Errors
    /// - Returns `TryRecvError::Empty` if the buffer is empty
    /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
    pub fn try_recv(&self) -> Result<T, TryRecvError> {
        // SAFETY: relies on there being a single `Receiver` (it does not implement `Clone`),
        // so the consumer is not expected to be accessed from several threads at once.
        self.inner.consumer.with_mut(|consumer| unsafe {
            match (*consumer).pop() {
                Ok(value) => {
                    // Successfully received, notify sender
                    self.inner.send_notify.notify_one();
                    Ok(value)
                }
                Err(PopError::Empty) => {
                    // Check if channel is closed
                    if self.inner.closed.load(Ordering::Acquire) {
                        // Double-check for remaining items to avoid race condition
                        // where sender drops after push but before we check closed flag
                        match (*consumer).pop() {
                            Ok(value) => {
                                self.inner.send_notify.notify_one();
                                Ok(value)
                            }
                            Err(PopError::Empty) => Err(TryRecvError::Closed),
                        }
                    } else {
                        Err(TryRecvError::Empty)
                    }
                }
            }
        })
    }

    /// Check if the channel is empty
    #[inline]
    pub fn is_empty(&self) -> bool {
        // SAFETY: relies on there being a single `Receiver` (it does not implement `Clone`);
        // `is_empty` only reads, and goes through the `UnsafeCell` for consistency.
        self.inner
            .consumer
            .with(|consumer| unsafe { (*consumer).is_empty() })
    }

    /// Get the number of messages currently in the channel
    ///
    /// Reported by the consumer's `slots()`.
    #[inline]
    pub fn len(&self) -> usize {
        // SAFETY: relies on there being a single `Receiver` (it does not implement `Clone`);
        // `slots` only reads, and goes through the `UnsafeCell` for consistency.
        self.inner
            .consumer
            .with(|consumer| unsafe { (*consumer).slots() })
    }

    /// Get the capacity of the channel
    ///
    /// Read from the consumer's underlying buffer.
    #[inline]
    pub fn capacity(&self) -> usize {
        // SAFETY: relies on there being a single `Receiver` (it does not implement `Clone`);
        // `capacity` only reads, and goes through the `UnsafeCell` for consistency.
        self.inner
            .consumer
            .with(|consumer| unsafe { (*consumer).buffer().capacity() })
    }

    /// Peek at the first message without removing it
    ///
    /// # Returns
    /// `Some(&T)` if there is a message, `None` if the channel is empty
    ///
    /// # Safety
    /// The returned reference is valid only as long as no other operations
    /// are performed on the Receiver that might modify the buffer.
    ///
    /// NOTE(review): this is a safe `fn`, and `try_recv`/`recv` also take `&self`,
    /// so the borrow checker does not stop a caller from popping the peeked
    /// message while the returned reference is still alive. The `transmute`
    /// below re-types the result of the consumer's `peek()` to the `&self`
    /// lifetime; presumably the reference would dangle after such a pop —
    /// confirm against `smallring` and consider a `&mut self` variant.
    #[inline]
    pub fn peek(&self) -> Option<&T> {
        // SAFETY: relies on there being a single `Receiver` (it does not implement `Clone`);
        // `peek` only reads from the consumer.
        self.inner
            .consumer
            .with(|consumer| unsafe { core::mem::transmute((*consumer).peek()) })
    }

    /// Clear all messages from the channel
    ///
    /// This method pops and drops all messages currently in the channel,
    /// then signals the sender-side notifier.
    pub fn clear(&mut self) {
        // SAFETY: relies on there being a single `Receiver` (it does not implement `Clone`);
        // we hold a mutable reference, so the consumer can be accessed exclusively.
        self.inner
            .consumer
            .with_mut(|consumer| unsafe { (*consumer).clear() });

        // Notify sender that space is available
        self.inner.send_notify.notify_one();
    }

    /// Create a draining iterator
    ///
    /// Returns an iterator that removes and returns messages from the channel.
    /// The iterator will continue until the channel is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use lite_sync::spsc::channel;
    /// use core::num::NonZeroUsize;
    ///
    ///     # #[tokio::main]
    ///     # async fn main() {
    ///     #[cfg(not(feature = "loom"))]
    ///     {
    ///         let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
    ///         tx.try_send(1).unwrap();
    ///         tx.try_send(2).unwrap();
    ///         tx.try_send(3).unwrap();
    ///
    ///         let items: Vec<i32> = rx.drain().collect();
    ///         assert_eq!(items, vec![1, 2, 3]);
    ///         assert!(rx.is_empty());
    ///     }
    ///     # }
    /// ```
    #[inline]
    pub fn drain(&mut self) -> Drain<'_, T, N> {
        Drain { receiver: self }
    }
}
714
715impl<T: Copy, const N: usize> Receiver<T, N> {
716    /// Try to receive multiple values into a slice without blocking
717    ///
718    /// 尝试非阻塞地将多个值接收到切片
719    ///
720    /// This method attempts to receive as many messages as possible into the provided slice.
721    /// It returns the number of messages successfully received.
722    ///
723    /// 此方法尝试将尽可能多的消息接收到提供的切片中。
724    /// 返回成功接收的消息数量。
725    ///
726    /// # Parameters
727    /// - `dest`: Destination slice to receive values into
728    ///
729    /// # Returns
730    /// Number of messages successfully received (0 to dest.len())
731    ///
732    /// # 参数
733    /// - `dest`: 用于接收值的目标切片
734    ///
735    /// # 返回值
736    /// 成功接收的消息数量(0 到 dest.len())
737    pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
738        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
739        // 我们有可变引用,因此可以安全地访问 consumer
740        self.inner.consumer.with_mut(|consumer| unsafe {
741            let received = (*consumer).pop_slice(dest);
742
743            if received > 0 {
744                // Successfully received some messages, notify sender
745                // 成功接收一些消息,通知发送者
746                self.inner.send_notify.notify_one();
747            }
748
749            received
750        })
751    }
752
753    /// Receive multiple values into a slice (async, waits if buffer is empty)
754    ///
755    /// 将多个值接收到切片(异步,如果缓冲区空则等待)
756    ///
757    /// This method will fill the destination slice as much as possible, waiting if necessary
758    /// when the buffer becomes empty. Returns the number of messages received.
759    ///
760    /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
761    /// 返回接收的消息数量。
762    ///
763    /// # Parameters
764    /// - `dest`: Destination slice to receive values into
765    ///
766    /// # Returns
767    /// Number of messages successfully received (0 to dest.len())
768    /// Returns 0 if the channel is closed and empty
769    ///
770    /// # 参数
771    /// - `dest`: 用于接收值的目标切片
772    ///
773    /// # 返回值
774    /// 成功接收的消息数量(0 到 dest.len())
775    /// 如果通道已关闭且为空,返回 0
776    pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
777        let mut total_received = 0;
778
779        while total_received < dest.len() {
780            let received = self.try_recv_slice(&mut dest[total_received..]);
781            total_received += received;
782
783            if total_received < dest.len() {
784                // Check if channel is closed
785                // 检查通道是否已关闭
786                if self.inner.closed.load(Ordering::Acquire) {
787                    // Channel closed, return what we have
788                    // 通道已关闭,返回我们已有的内容
789                    return total_received;
790                }
791
792                // Need to wait for data
793                // 需要等待数据
794                self.inner.recv_notify.notified().await;
795
796                // Check again after waking up
797                // 唤醒后再次检查
798                if self.inner.closed.load(Ordering::Acquire) {
799                    // Try one more time to get remaining messages
800                    // 再尝试一次获取剩余消息
801                    let final_received = self.try_recv_slice(&mut dest[total_received..]);
802                    total_received += final_received;
803                    return total_received;
804                }
805            }
806        }
807
808        total_received
809    }
810}
811
812impl<T, const N: usize> Drop for Receiver<T, N> {
813    fn drop(&mut self) {
814        // Mark channel as closed when receiver is dropped
815        // 当接收器被丢弃时标记通道为已关闭
816        self.inner.closed.store(true, Ordering::Release);
817
818        // Notify sender in case it's waiting
819        // 通知发送者以防它正在等待
820        self.inner.send_notify.notify_one();
821    }
822}
823
824impl<T, const N: usize> Drop for Sender<T, N> {
825    fn drop(&mut self) {
826        // Mark channel as closed when sender is dropped
827        // 当发送器被丢弃时标记通道为已关闭
828        self.inner.closed.store(true, Ordering::Release);
829
830        // Notify receiver in case it's waiting
831        // 通知接收器以防它正在等待
832        self.inner.recv_notify.notify_one();
833    }
834}
835
836#[cfg(all(test, not(feature = "loom")))]
837mod tests {
838    use super::*;
839
840    #[tokio::test]
841    async fn test_basic_send_recv() {
842        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
843
844        tx.send(1).await.unwrap();
845        tx.send(2).await.unwrap();
846        tx.send(3).await.unwrap();
847
848        assert_eq!(rx.recv().await, Some(1));
849        assert_eq!(rx.recv().await, Some(2));
850        assert_eq!(rx.recv().await, Some(3));
851    }
852
853    #[tokio::test]
854    async fn test_try_send_recv() {
855        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
856
857        tx.try_send(1).unwrap();
858        tx.try_send(2).unwrap();
859
860        assert_eq!(rx.try_recv().unwrap(), 1);
861        assert_eq!(rx.try_recv().unwrap(), 2);
862        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
863    }
864
865    #[tokio::test]
866    async fn test_channel_closed_on_sender_drop() {
867        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
868
869        tx.send(1).await.unwrap();
870        drop(tx);
871
872        assert_eq!(rx.recv().await, Some(1));
873        assert_eq!(rx.recv().await, None);
874    }
875
876    #[tokio::test]
877    async fn test_channel_closed_on_receiver_drop() {
878        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
879
880        drop(rx);
881
882        assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
883    }
884
885    #[tokio::test]
886    async fn test_cross_task_communication() {
887        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
888
889        let sender_handle = tokio::spawn(async move {
890            for i in 0..10 {
891                tx.send(i).await.unwrap();
892            }
893        });
894
895        let receiver_handle = tokio::spawn(async move {
896            let mut sum = 0;
897            while let Some(value) = rx.recv().await {
898                sum += value;
899            }
900            sum
901        });
902
903        sender_handle.await.unwrap();
904        let sum = receiver_handle.await.unwrap();
905        assert_eq!(sum, 45); // 0+1+2+...+9 = 45
906    }
907
908    #[tokio::test]
909    async fn test_backpressure() {
910        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
911
912        // Fill the buffer
913        tx.try_send(1).unwrap();
914        tx.try_send(2).unwrap();
915        tx.try_send(3).unwrap();
916        tx.try_send(4).unwrap();
917
918        // Buffer should be full now
919        assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
920
921        // This should block and then succeed when we consume
922        let send_handle = tokio::spawn(async move {
923            tx.send(5).await.unwrap();
924            tx.send(6).await.unwrap();
925        });
926
927        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
928
929        assert_eq!(rx.recv().await, Some(1));
930        assert_eq!(rx.recv().await, Some(2));
931        assert_eq!(rx.recv().await, Some(3));
932        assert_eq!(rx.recv().await, Some(4));
933        assert_eq!(rx.recv().await, Some(5));
934        assert_eq!(rx.recv().await, Some(6));
935
936        send_handle.await.unwrap();
937    }
938
939    #[tokio::test]
940    async fn test_capacity_and_len() {
941        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
942
943        assert_eq!(rx.capacity(), 8);
944        assert_eq!(rx.len(), 0);
945        assert!(rx.is_empty());
946
947        tx.try_send(1).unwrap();
948        tx.try_send(2).unwrap();
949
950        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
951        assert_eq!(rx.len(), 2);
952        assert!(!rx.is_empty());
953    }
954
955    // ==================== New API Tests ====================
956
957    #[tokio::test]
958    async fn test_sender_capacity_queries() {
959        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
960
961        assert_eq!(tx.capacity(), 8);
962        assert_eq!(tx.len(), 0);
963        assert_eq!(tx.free_slots(), 8);
964        assert!(!tx.is_full());
965
966        tx.try_send(1).unwrap();
967        tx.try_send(2).unwrap();
968        tx.try_send(3).unwrap();
969
970        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
971        assert_eq!(tx.len(), 3);
972        assert_eq!(tx.free_slots(), 5);
973        assert!(!tx.is_full());
974
975        // Fill the buffer
976        tx.try_send(4).unwrap();
977        tx.try_send(5).unwrap();
978        tx.try_send(6).unwrap();
979        tx.try_send(7).unwrap();
980        tx.try_send(8).unwrap();
981
982        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
983        assert_eq!(tx.len(), 8);
984        assert_eq!(tx.free_slots(), 0);
985        assert!(tx.is_full());
986
987        // Pop one and check again
988        rx.recv().await;
989        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
990
991        assert_eq!(tx.len(), 7);
992        assert_eq!(tx.free_slots(), 1);
993        assert!(!tx.is_full());
994    }
995
996    #[tokio::test]
997    async fn test_try_send_slice() {
998        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
999
1000        let data = [1, 2, 3, 4, 5];
1001        let sent = tx.try_send_slice(&data);
1002
1003        assert_eq!(sent, 5);
1004        assert_eq!(rx.len(), 5);
1005
1006        for i in 0..5 {
1007            assert_eq!(rx.recv().await.unwrap(), data[i]);
1008        }
1009    }
1010
1011    #[tokio::test]
1012    async fn test_try_send_slice_partial() {
1013        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1014
1015        // Fill with 5 elements, leaving room for 3
1016        let initial = [1, 2, 3, 4, 5];
1017        tx.try_send_slice(&initial);
1018
1019        // Try to send 10 more, should only send 3
1020        let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
1021        let sent = tx.try_send_slice(&more);
1022
1023        assert_eq!(sent, 3);
1024        assert_eq!(rx.len(), 8);
1025        assert!(tx.is_full());
1026
1027        // Verify values
1028        for i in 1..=8 {
1029            assert_eq!(rx.recv().await.unwrap(), i);
1030        }
1031    }
1032
1033    #[tokio::test]
1034    async fn test_send_slice() {
1035        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1036
1037        let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
1038        let result = tx.send_slice(&data).await;
1039
1040        assert_eq!(result.unwrap(), 10);
1041        assert_eq!(rx.len(), 10);
1042
1043        for i in 0..10 {
1044            assert_eq!(rx.recv().await.unwrap(), data[i]);
1045        }
1046    }
1047
1048    #[tokio::test]
1049    async fn test_send_slice_with_backpressure() {
1050        let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1051
1052        let data = [1, 2, 3, 4, 5, 6, 7, 8];
1053
1054        let send_handle = tokio::spawn(async move { tx.send_slice(&data).await.unwrap() });
1055
1056        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1057
1058        // Consume some messages to make room
1059        for i in 1..=4 {
1060            assert_eq!(rx.recv().await.unwrap(), i);
1061        }
1062
1063        let sent = send_handle.await.unwrap();
1064        assert_eq!(sent, 8);
1065
1066        // Verify remaining messages
1067        for i in 5..=8 {
1068            assert_eq!(rx.recv().await.unwrap(), i);
1069        }
1070    }
1071
1072    #[tokio::test]
1073    async fn test_peek() {
1074        let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1075
1076        // Peek empty buffer
1077        assert!(rx.peek().is_none());
1078
1079        tx.try_send(42).unwrap();
1080        tx.try_send(100).unwrap();
1081        tx.try_send(200).unwrap();
1082
1083        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1084
1085        // Peek should return first element without removing it
1086        assert_eq!(rx.peek(), Some(&42));
1087        assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
1088        assert_eq!(rx.len(), 3); // Length unchanged
1089    }
1090
1091    #[tokio::test]
1092    async fn test_peek_after_recv() {
1093        let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1094
1095        tx.try_send("first".to_string()).unwrap();
1096        tx.try_send("second".to_string()).unwrap();
1097        tx.try_send("third".to_string()).unwrap();
1098
1099        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1100
1101        assert_eq!(rx.peek(), Some(&"first".to_string()));
1102        rx.recv().await.unwrap();
1103
1104        assert_eq!(rx.peek(), Some(&"second".to_string()));
1105        rx.recv().await.unwrap();
1106
1107        assert_eq!(rx.peek(), Some(&"third".to_string()));
1108        rx.recv().await.unwrap();
1109
1110        assert!(rx.peek().is_none());
1111    }
1112
1113    #[tokio::test]
1114    async fn test_clear() {
1115        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1116
1117        for i in 0..10 {
1118            tx.try_send(i).unwrap();
1119        }
1120
1121        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1122        assert_eq!(rx.len(), 10);
1123
1124        rx.clear();
1125
1126        assert_eq!(rx.len(), 0);
1127        assert!(rx.is_empty());
1128    }
1129
1130    #[tokio::test]
1131    async fn test_clear_with_drop() {
1132        use std::sync::Arc;
1133        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1134
1135        #[derive(Debug)]
1136        struct DropCounter {
1137            counter: Arc<AtomicUsize>,
1138        }
1139
1140        impl Drop for DropCounter {
1141            fn drop(&mut self) {
1142                self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1143            }
1144        }
1145
1146        let counter = Arc::new(AtomicUsize::new(0));
1147
1148        {
1149            let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1150
1151            for _ in 0..8 {
1152                tx.try_send(DropCounter {
1153                    counter: counter.clone(),
1154                })
1155                .unwrap();
1156            }
1157
1158            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1159            assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1160
1161            rx.clear();
1162
1163            assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1164        }
1165    }
1166
1167    #[tokio::test]
1168    async fn test_drain() {
1169        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1170
1171        for i in 0..10 {
1172            tx.try_send(i).unwrap();
1173        }
1174
1175        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1176
1177        let collected: Vec<i32> = rx.drain().collect();
1178
1179        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1180        assert!(rx.is_empty());
1181    }
1182
1183    #[tokio::test]
1184    async fn test_drain_empty() {
1185        let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1186
1187        let collected: Vec<i32> = rx.drain().collect();
1188
1189        assert!(collected.is_empty());
1190    }
1191
1192    #[tokio::test]
1193    async fn test_drain_size_hint() {
1194        let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1195
1196        for i in 0..5 {
1197            tx.try_send(i).unwrap();
1198        }
1199
1200        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1201
1202        let mut drain = rx.drain();
1203
1204        assert_eq!(drain.size_hint(), (5, Some(5)));
1205
1206        drain.next();
1207        assert_eq!(drain.size_hint(), (4, Some(4)));
1208
1209        drain.next();
1210        assert_eq!(drain.size_hint(), (3, Some(3)));
1211    }
1212
1213    #[tokio::test]
1214    async fn test_try_recv_slice() {
1215        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1216
1217        // Send some data
1218        for i in 0..10 {
1219            tx.try_send(i).unwrap();
1220        }
1221
1222        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1223
1224        let mut dest = [0u32; 5];
1225        let received = rx.try_recv_slice(&mut dest);
1226
1227        assert_eq!(received, 5);
1228        assert_eq!(dest, [0, 1, 2, 3, 4]);
1229        assert_eq!(rx.len(), 5);
1230    }
1231
1232    #[tokio::test]
1233    async fn test_try_recv_slice_partial() {
1234        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1235
1236        tx.try_send(1).unwrap();
1237        tx.try_send(2).unwrap();
1238        tx.try_send(3).unwrap();
1239
1240        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1241
1242        let mut dest = [0u32; 10];
1243        let received = rx.try_recv_slice(&mut dest);
1244
1245        assert_eq!(received, 3);
1246        assert_eq!(&dest[0..3], &[1, 2, 3]);
1247        assert!(rx.is_empty());
1248    }
1249
1250    #[tokio::test]
1251    async fn test_recv_slice() {
1252        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1253
1254        for i in 1..=10 {
1255            tx.try_send(i).unwrap();
1256        }
1257
1258        let mut dest = [0u32; 10];
1259        let received = rx.recv_slice(&mut dest).await;
1260
1261        assert_eq!(received, 10);
1262        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1263        assert!(rx.is_empty());
1264    }
1265
1266    #[tokio::test]
1267    async fn test_recv_slice_with_wait() {
1268        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1269
1270        let recv_handle = tokio::spawn(async move {
1271            let mut dest = [0u32; 8];
1272            let received = rx.recv_slice(&mut dest).await;
1273            (received, dest)
1274        });
1275
1276        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1277
1278        // Send data gradually
1279        for i in 1..=8 {
1280            tx.send(i).await.unwrap();
1281            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1282        }
1283
1284        let (received, dest) = recv_handle.await.unwrap();
1285        assert_eq!(received, 8);
1286        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1287    }
1288
1289    #[tokio::test]
1290    async fn test_recv_slice_channel_closed() {
1291        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1292
1293        tx.try_send(1).unwrap();
1294        tx.try_send(2).unwrap();
1295        tx.try_send(3).unwrap();
1296
1297        drop(tx); // Close the channel
1298
1299        let mut dest = [0u32; 10];
1300        let received = rx.recv_slice(&mut dest).await;
1301
1302        // Should receive the 3 available messages, then stop
1303        assert_eq!(received, 3);
1304        assert_eq!(&dest[0..3], &[1, 2, 3]);
1305    }
1306
1307    #[tokio::test]
1308    async fn test_combined_new_apis() {
1309        let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1310
1311        // Batch send
1312        let data = [1, 2, 3, 4, 5];
1313        tx.try_send_slice(&data);
1314
1315        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1316
1317        assert_eq!(tx.len(), 5);
1318        assert_eq!(rx.len(), 5);
1319        assert_eq!(rx.capacity(), 16);
1320
1321        // Peek
1322        assert_eq!(rx.peek(), Some(&1));
1323
1324        // Batch receive
1325        let mut dest = [0u32; 3];
1326        rx.try_recv_slice(&mut dest);
1327        assert_eq!(dest, [1, 2, 3]);
1328
1329        assert_eq!(rx.len(), 2);
1330        assert_eq!(tx.free_slots(), 14);
1331
1332        // Drain remaining
1333        let remaining: Vec<u32> = rx.drain().collect();
1334        assert_eq!(remaining, vec![4, 5]);
1335
1336        assert!(rx.is_empty());
1337        assert!(!tx.is_full());
1338    }
1339}