lite_sync/spsc.rs
1/// High-performance async SPSC (Single Producer Single Consumer) channel
2///
3/// Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6///
7/// 高性能异步 SPSC(单生产者单消费者)通道
8///
9/// 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11/// `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12///
13/// # 安全性说明 (Safety Notes)
14///
15/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16/// 这是安全的,基于以下保证:
17///
18/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23///
24/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25///
26/// # Safety Guarantees
27///
28/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29/// This is safe based on the following guarantees:
30///
31/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36///
37/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38use std::cell::UnsafeCell;
39use std::num::NonZeroUsize;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::Arc;
42use super::notify::SingleWaiterNotify;
43
44use smallring::spsc::{Producer, Consumer, new, PushError, PopError};
45
46/// SPSC channel creation function
47///
48/// Creates a bounded SPSC channel with the specified capacity.
49///
50/// # Type Parameters
51/// - `T`: The type of messages to be sent through the channel
52/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
53///
54/// # Parameters
55/// - `capacity`: Channel capacity (total number of elements the channel can hold)
56///
57/// # Returns
58/// A tuple of (Sender, Receiver)
59///
60/// # Examples
61///
62/// ```
63/// use lite_sync::spsc::channel;
64/// use std::num::NonZeroUsize;
65///
66/// #[tokio::main]
67/// async fn main() {
68/// // Create a channel with capacity 32 and inline buffer size 8
69/// let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
70///
71/// tokio::spawn(async move {
72/// tx.send(42).await.unwrap();
73/// });
74///
75/// let value = rx.recv().await.unwrap();
76/// assert_eq!(value, 42);
77/// }
78/// ```
79///
80/// SPSC 通道创建函数
81///
82/// 创建指定容量的有界 SPSC 通道。
83///
84/// # 类型参数
85/// - `T`: 通过通道发送的消息类型
86/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
87///
88/// # 参数
89/// - `capacity`: 通道容量(通道可以容纳的元素总数)
90///
91/// # 返回值
92/// 返回 (Sender, Receiver) 元组
93pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
94 let (producer, consumer) = new::<T, N>(capacity);
95
96 let inner = Arc::new(Inner::<T, N> {
97 producer: UnsafeCell::new(producer),
98 consumer: UnsafeCell::new(consumer),
99 closed: AtomicBool::new(false),
100 recv_notify: SingleWaiterNotify::new(),
101 send_notify: SingleWaiterNotify::new(),
102 });
103
104 let sender = Sender {
105 inner: inner.clone(),
106 };
107
108 let receiver = Receiver {
109 inner,
110 };
111
112 (sender, receiver)
113}
114
115/// Shared internal state for SPSC channel
116///
117/// Contains both shared state and the ring buffer halves.
118/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
119///
120/// SPSC 通道的共享内部状态
121///
122/// 包含共享状态和环形缓冲区的两端。
123/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
124struct Inner<T, const N: usize = 32> {
125 /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
126 ///
127 /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
128 producer: UnsafeCell<Producer<T, N>>,
129
130 /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
131 ///
132 /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
133 consumer: UnsafeCell<Consumer<T, N>>,
134
135 /// Channel closed flag
136 ///
137 /// 通道关闭标志
138 closed: AtomicBool,
139
140 /// Notifier for receiver waiting (lightweight single-waiter)
141 ///
142 /// 接收者等待通知器(轻量级单等待者)
143 recv_notify: SingleWaiterNotify,
144
145 /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
146 ///
147 /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
148 send_notify: SingleWaiterNotify,
149}
150
151// SAFETY: Inner<T> 可以在线程间安全共享的原因:
152// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
153// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
154// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
155// 4. closed、recv_notify、send_notify 都已经是线程安全的
156// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
157unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
158
159impl<T, const N: usize> std::fmt::Debug for Inner<T, N> {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 f.debug_struct("Inner")
162 .field("closed", &self.closed.load(Ordering::Acquire))
163 .finish()
164 }
165}
166
167/// SPSC channel sender
168///
169/// SPSC 通道发送器
170pub struct Sender<T, const N: usize> {
171 inner: Arc<Inner<T, N>>,
172}
173
174impl<T, const N: usize> std::fmt::Debug for Sender<T, N> {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 f.debug_struct("Sender")
177 .field("closed", &self.is_closed())
178 .field("len", &self.len())
179 .field("capacity", &self.capacity())
180 .finish()
181 }
182}
183
184/// SPSC channel receiver
185///
186/// SPSC 通道接收器
187pub struct Receiver<T, const N: usize> {
188 inner: Arc<Inner<T, N>>,
189}
190
191impl<T, const N: usize> std::fmt::Debug for Receiver<T, N> {
192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
193 f.debug_struct("Receiver")
194 .field("is_empty", &self.is_empty())
195 .field("len", &self.len())
196 .field("capacity", &self.capacity())
197 .finish()
198 }
199}
200
201/// Draining iterator for the SPSC channel
202///
203/// SPSC 通道的消费迭代器
204///
205/// This iterator removes and returns messages from the channel until it's empty.
206///
207/// 此迭代器从通道中移除并返回消息,直到通道为空。
208///
209/// # Type Parameters
210/// - `T`: Message type
211/// - `N`: Inline buffer size
212///
213/// # 类型参数
214/// - `T`: 消息类型
215/// - `N`: 内联缓冲区大小
216pub struct Drain<'a, T, const N: usize> {
217 receiver: &'a mut Receiver<T, N>,
218}
219
220impl<'a, T, const N: usize> std::fmt::Debug for Drain<'a, T, N> {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 f.debug_struct("Drain")
223 .field("len", &self.receiver.len())
224 .field("is_empty", &self.receiver.is_empty())
225 .finish()
226 }
227}
228
229impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
230 type Item = T;
231
232 #[inline]
233 fn next(&mut self) -> Option<Self::Item> {
234 self.receiver.try_recv().ok()
235 }
236
237 #[inline]
238 fn size_hint(&self) -> (usize, Option<usize>) {
239 let len = self.receiver.len();
240 (len, Some(len))
241 }
242}
243
244/// Send error type
245///
246/// 发送错误类型
247#[derive(Debug, Clone, Copy, PartialEq, Eq)]
248pub enum SendError<T> {
249 /// Channel is closed
250 ///
251 /// 通道已关闭
252 Closed(T),
253}
254
255/// Try-receive error type
256///
257/// 尝试接收错误类型
258#[derive(Debug, Clone, Copy, PartialEq, Eq)]
259pub enum TryRecvError {
260 /// Channel is empty
261 ///
262 /// 通道为空
263 Empty,
264
265 /// Channel is closed
266 ///
267 /// 通道已关闭
268 Closed,
269}
270
271/// Try-send error type
272///
273/// 尝试发送错误类型
274#[derive(Debug, Clone, Copy, PartialEq, Eq)]
275pub enum TrySendError<T> {
276 /// Buffer is full
277 ///
278 /// 缓冲区已满
279 Full(T),
280
281 /// Channel is closed
282 ///
283 /// 通道已关闭
284 Closed(T),
285}
286
287impl<T, const N: usize> Sender<T, N> {
288 /// Send a message to the channel (async, waits if buffer is full)
289 ///
290 /// # Errors
291 /// Returns `SendError::Closed` if the receiver has been dropped
292 ///
293 /// 向通道发送消息(异步,如果缓冲区满则等待)
294 ///
295 /// # 错误
296 /// 如果接收器已被丢弃,返回 `SendError::Closed`
297 pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
298 loop {
299 match self.try_send(value) {
300 Ok(()) => return Ok(()),
301 Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
302 Err(TrySendError::Full(v)) => {
303 // Store the value to retry
304 // 存储值以便重试
305 value = v;
306
307 // Wait for space to become available
308 // 等待空间可用
309 self.inner.send_notify.notified().await;
310
311 // Check if channel was closed while waiting
312 // 检查等待时通道是否已关闭
313 if self.inner.closed.load(Ordering::Acquire) {
314 return Err(SendError::Closed(value));
315 }
316
317 // Retry with the value in next loop iteration
318 // 在下一次循环迭代中使用该值重试
319 }
320 }
321 }
322 }
323
324 /// Try to send a message without blocking
325 ///
326 /// # Errors
327 /// - Returns `TrySendError::Full` if the buffer is full
328 /// - Returns `TrySendError::Closed` if the receiver has been dropped
329 ///
330 /// 尝试非阻塞地发送消息
331 ///
332 /// # 错误
333 /// - 如果缓冲区满,返回 `TrySendError::Full`
334 /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
335 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
336 // Check if channel is closed first
337 // 首先检查通道是否已关闭
338 if self.inner.closed.load(Ordering::Acquire) {
339 return Err(TrySendError::Closed(value));
340 }
341
342 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
343 // 不会有多个线程同时访问 producer
344 let producer = unsafe { &mut *self.inner.producer.get() };
345
346 match producer.push(value) {
347 Ok(()) => {
348 // Successfully sent, notify receiver
349 // 成功发送,通知接收者
350 self.inner.recv_notify.notify_one();
351 Ok(())
352 }
353 Err(PushError::Full(v)) => {
354 Err(TrySendError::Full(v))
355 }
356 }
357 }
358
359 /// Check if the channel is closed
360 ///
361 /// 检查通道是否已关闭
362 #[inline]
363 pub fn is_closed(&self) -> bool {
364 self.inner.closed.load(Ordering::Acquire)
365 }
366
367 /// Get the capacity of the channel
368 ///
369 /// 获取通道的容量
370 #[inline]
371 pub fn capacity(&self) -> usize {
372 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
373 // capacity 只读取数据,不需要可变访问
374 let producer = unsafe { &*self.inner.producer.get() };
375 producer.capacity()
376 }
377
378 /// Get the number of messages currently in the channel
379 ///
380 /// 获取通道中当前的消息数量
381 #[inline]
382 pub fn len(&self) -> usize {
383 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
384 // slots 只读取数据,不需要可变访问
385 let producer = unsafe { &*self.inner.producer.get() };
386 producer.slots()
387 }
388
389 #[inline]
390 pub fn is_empty(&self) -> bool {
391 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
392 // is_empty 只读取数据,不需要可变访问
393 let producer = unsafe { &*self.inner.producer.get() };
394 producer.is_empty()
395 }
396
397 /// Get the number of free slots in the channel
398 ///
399 /// 获取通道中的空闲空间数量
400 #[inline]
401 pub fn free_slots(&self) -> usize {
402 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
403 // free_slots 只读取数据,不需要可变访问
404 let producer = unsafe { &*self.inner.producer.get() };
405 producer.free_slots()
406 }
407
408 /// Check if the channel is full
409 ///
410 /// 检查通道是否已满
411 #[inline]
412 pub fn is_full(&self) -> bool {
413 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
414 // is_full 只读取数据,不需要可变访问
415 let producer = unsafe { &*self.inner.producer.get() };
416 producer.is_full()
417 }
418}
419
420impl<T: Copy, const N: usize> Sender<T, N> {
421 /// Try to send multiple values from a slice without blocking
422 ///
423 /// 尝试非阻塞地从切片发送多个值
424 ///
425 /// This method attempts to send as many elements as possible from the slice.
426 /// It returns the number of elements successfully sent.
427 ///
428 /// 此方法尝试从切片中发送尽可能多的元素。
429 /// 返回成功发送的元素数量。
430 ///
431 /// # Parameters
432 /// - `values`: Slice of values to send
433 ///
434 /// # Returns
435 /// Number of elements successfully sent (0 to values.len())
436 ///
437 /// # 参数
438 /// - `values`: 要发送的值的切片
439 ///
440 /// # 返回值
441 /// 成功发送的元素数量(0 到 values.len())
442 pub fn try_send_slice(&self, values: &[T]) -> usize {
443 // Check if channel is closed first
444 // 首先检查通道是否已关闭
445 if self.inner.closed.load(Ordering::Acquire) {
446 return 0;
447 }
448
449 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
450 // 不会有多个线程同时访问 producer
451 let producer = unsafe { &mut *self.inner.producer.get() };
452
453 let sent = producer.push_slice(values);
454
455 if sent > 0 {
456 // Successfully sent some messages, notify receiver
457 // 成功发送一些消息,通知接收者
458 self.inner.recv_notify.notify_one();
459 }
460
461 sent
462 }
463
464 /// Send multiple values from a slice (async, waits if buffer is full)
465 ///
466 /// 从切片发送多个值(异步,如果缓冲区满则等待)
467 ///
468 /// This method will send all elements from the slice, waiting if necessary
469 /// when the buffer becomes full. Returns the number of elements sent, or
470 /// an error if the channel is closed.
471 ///
472 /// 此方法将发送切片中的所有元素,必要时在缓冲区满时等待。
473 /// 返回发送的元素数量,如果通道关闭则返回错误。
474 ///
475 /// # Parameters
476 /// - `values`: Slice of values to send
477 ///
478 /// # Returns
479 /// - `Ok(usize)`: Number of elements successfully sent
480 /// - `Err(SendError)`: Channel is closed
481 ///
482 /// # 参数
483 /// - `values`: 要发送的值的切片
484 ///
485 /// # 返回值
486 /// - `Ok(usize)`: 成功发送的元素数量
487 /// - `Err(SendError)`: 通道已关闭
488 pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
489 let mut total_sent = 0;
490
491 while total_sent < values.len() {
492 // Check if channel is closed
493 // 检查通道是否已关闭
494 if self.inner.closed.load(Ordering::Acquire) {
495 return Err(SendError::Closed(()));
496 }
497
498 let sent = self.try_send_slice(&values[total_sent..]);
499 total_sent += sent;
500
501 if total_sent < values.len() {
502 // Need to wait for space
503 // 需要等待空间
504 self.inner.send_notify.notified().await;
505
506 // Check if channel was closed while waiting
507 // 检查等待时通道是否已关闭
508 if self.inner.closed.load(Ordering::Acquire) {
509 return Err(SendError::Closed(()));
510 }
511 }
512 }
513
514 Ok(total_sent)
515 }
516}
517
518impl<T, const N: usize> Receiver<T, N> {
519 /// Receive a message from the channel (async, waits if buffer is empty)
520 ///
521 /// Returns `None` if the channel is closed and empty
522 ///
523 /// 从通道接收消息(异步,如果缓冲区空则等待)
524 ///
525 /// 如果通道已关闭且为空,返回 `None`
526 pub async fn recv(&self) -> Option<T> {
527 loop {
528 match self.try_recv() {
529 Ok(value) => return Some(value),
530 Err(TryRecvError::Closed) => return None,
531 Err(TryRecvError::Empty) => {
532 // Check if channel is closed before waiting
533 // 等待前检查通道是否已关闭
534 if self.inner.closed.load(Ordering::Acquire) {
535 // Double check if there are any remaining items
536 // 再次检查是否有剩余项
537 if let Ok(value) = self.try_recv() {
538 return Some(value);
539 }
540 return None;
541 }
542
543 // Wait for data to become available
544 // 等待数据可用
545 self.inner.recv_notify.notified().await;
546 }
547 }
548 }
549 }
550
551 /// Try to receive a message without blocking
552 ///
553 /// # Errors
554 /// - Returns `TryRecvError::Empty` if the buffer is empty
555 /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
556 ///
557 /// 尝试非阻塞地接收消息
558 ///
559 /// # 错误
560 /// - 如果缓冲区空,返回 `TryRecvError::Empty`
561 /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
562 pub fn try_recv(&self) -> Result<T, TryRecvError> {
563 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
564 // 不会有多个线程同时访问 consumer
565 let consumer = unsafe { &mut *self.inner.consumer.get() };
566
567 match consumer.pop() {
568 Ok(value) => {
569 // Successfully received, notify sender
570 // 成功接收,通知发送者
571 self.inner.send_notify.notify_one();
572 Ok(value)
573 }
574 Err(PopError::Empty) => {
575 // Check if channel is closed
576 // 检查通道是否已关闭
577 if self.inner.closed.load(Ordering::Acquire) {
578 // Double-check for remaining items to avoid race condition
579 // where sender drops after push but before we check closed flag
580 // 再次检查是否有剩余项,以避免发送方在 push 后但在我们检查 closed 标志前 drop 的竞态条件
581 match consumer.pop() {
582 Ok(value) => {
583 self.inner.send_notify.notify_one();
584 Ok(value)
585 }
586 Err(PopError::Empty) => {
587 Err(TryRecvError::Closed)
588 }
589 }
590 } else {
591 Err(TryRecvError::Empty)
592 }
593 }
594 }
595 }
596
597 /// Check if the channel is empty
598 ///
599 /// 检查通道是否为空
600 #[inline]
601 pub fn is_empty(&self) -> bool {
602 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
603 // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
604 let consumer = unsafe { &*self.inner.consumer.get() };
605 consumer.is_empty()
606 }
607
608 /// Get the number of messages currently in the channel
609 ///
610 /// 获取通道中当前的消息数量
611 #[inline]
612 pub fn len(&self) -> usize {
613 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
614 // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
615 let consumer = unsafe { &*self.inner.consumer.get() };
616 consumer.slots()
617 }
618
619 /// Get the capacity of the channel
620 ///
621 /// 获取通道的容量
622 #[inline]
623 pub fn capacity(&self) -> usize {
624 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
625 // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
626 let consumer = unsafe { &*self.inner.consumer.get() };
627 consumer.buffer().capacity()
628 }
629
630 /// Peek at the first message without removing it
631 ///
632 /// 查看第一个消息但不移除它
633 ///
634 /// # Returns
635 /// `Some(&T)` if there is a message, `None` if the channel is empty
636 ///
637 /// # 返回值
638 /// 如果有消息则返回 `Some(&T)`,如果通道为空则返回 `None`
639 ///
640 /// # Safety
641 /// The returned reference is valid only as long as no other operations
642 /// are performed on the Receiver that might modify the buffer.
643 ///
644 /// # 安全性
645 /// 返回的引用仅在未对 Receiver 执行可能修改缓冲区的其他操作时有效。
646 #[inline]
647 pub fn peek(&self) -> Option<&T> {
648 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
649 // peek 只读取数据,不需要可变访问
650 let consumer = unsafe { &*self.inner.consumer.get() };
651 consumer.peek()
652 }
653
654 /// Clear all messages from the channel
655 ///
656 /// 清空通道中的所有消息
657 ///
658 /// This method pops and drops all messages currently in the channel.
659 ///
660 /// 此方法弹出并 drop 通道中当前的所有消息。
661 pub fn clear(&mut self) {
662 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
663 // 我们有可变引用,因此可以安全地访问 consumer
664 let consumer = unsafe { &mut *self.inner.consumer.get() };
665 consumer.clear();
666
667 // Notify sender that space is available
668 // 通知发送者空间可用
669 self.inner.send_notify.notify_one();
670 }
671
672 /// Create a draining iterator
673 ///
674 /// 创建一个消费迭代器
675 ///
676 /// Returns an iterator that removes and returns messages from the channel.
677 /// The iterator will continue until the channel is empty.
678 ///
679 /// 返回一个从通道中移除并返回消息的迭代器。
680 /// 迭代器将持续运行直到通道为空。
681 ///
682 /// # Examples
683 ///
684 /// ```
685 /// use lite_sync::spsc::channel;
686 /// use std::num::NonZeroUsize;
687 ///
688 /// # #[tokio::main]
689 /// # async fn main() {
690 /// let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
691 /// tx.try_send(1).unwrap();
692 /// tx.try_send(2).unwrap();
693 /// tx.try_send(3).unwrap();
694 ///
695 /// let items: Vec<i32> = rx.drain().collect();
696 /// assert_eq!(items, vec![1, 2, 3]);
697 /// assert!(rx.is_empty());
698 /// # }
699 /// ```
700 #[inline]
701 pub fn drain(&mut self) -> Drain<'_, T, N> {
702 Drain { receiver: self }
703 }
704}
705
706impl<T: Copy, const N: usize> Receiver<T, N> {
707 /// Try to receive multiple values into a slice without blocking
708 ///
709 /// 尝试非阻塞地将多个值接收到切片
710 ///
711 /// This method attempts to receive as many messages as possible into the provided slice.
712 /// It returns the number of messages successfully received.
713 ///
714 /// 此方法尝试将尽可能多的消息接收到提供的切片中。
715 /// 返回成功接收的消息数量。
716 ///
717 /// # Parameters
718 /// - `dest`: Destination slice to receive values into
719 ///
720 /// # Returns
721 /// Number of messages successfully received (0 to dest.len())
722 ///
723 /// # 参数
724 /// - `dest`: 用于接收值的目标切片
725 ///
726 /// # 返回值
727 /// 成功接收的消息数量(0 到 dest.len())
728 pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
729 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
730 // 我们有可变引用,因此可以安全地访问 consumer
731 let consumer = unsafe { &mut *self.inner.consumer.get() };
732
733 let received = consumer.pop_slice(dest);
734
735 if received > 0 {
736 // Successfully received some messages, notify sender
737 // 成功接收一些消息,通知发送者
738 self.inner.send_notify.notify_one();
739 }
740
741 received
742 }
743
744 /// Receive multiple values into a slice (async, waits if buffer is empty)
745 ///
746 /// 将多个值接收到切片(异步,如果缓冲区空则等待)
747 ///
748 /// This method will fill the destination slice as much as possible, waiting if necessary
749 /// when the buffer becomes empty. Returns the number of messages received.
750 ///
751 /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
752 /// 返回接收的消息数量。
753 ///
754 /// # Parameters
755 /// - `dest`: Destination slice to receive values into
756 ///
757 /// # Returns
758 /// Number of messages successfully received (0 to dest.len())
759 /// Returns 0 if the channel is closed and empty
760 ///
761 /// # 参数
762 /// - `dest`: 用于接收值的目标切片
763 ///
764 /// # 返回值
765 /// 成功接收的消息数量(0 到 dest.len())
766 /// 如果通道已关闭且为空,返回 0
767 pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
768 let mut total_received = 0;
769
770 while total_received < dest.len() {
771 let received = self.try_recv_slice(&mut dest[total_received..]);
772 total_received += received;
773
774 if total_received < dest.len() {
775 // Check if channel is closed
776 // 检查通道是否已关闭
777 if self.inner.closed.load(Ordering::Acquire) {
778 // Channel closed, return what we have
779 // 通道已关闭,返回我们已有的内容
780 return total_received;
781 }
782
783 // Need to wait for data
784 // 需要等待数据
785 self.inner.recv_notify.notified().await;
786
787 // Check again after waking up
788 // 唤醒后再次检查
789 if self.inner.closed.load(Ordering::Acquire) {
790 // Try one more time to get remaining messages
791 // 再尝试一次获取剩余消息
792 let final_received = self.try_recv_slice(&mut dest[total_received..]);
793 total_received += final_received;
794 return total_received;
795 }
796 }
797 }
798
799 total_received
800 }
801}
802
803impl<T, const N: usize> Drop for Receiver<T, N> {
804 fn drop(&mut self) {
805 // Mark channel as closed when receiver is dropped
806 // 当接收器被丢弃时标记通道为已关闭
807 self.inner.closed.store(true, Ordering::Release);
808
809 // Notify sender in case it's waiting
810 // 通知发送者以防它正在等待
811 self.inner.send_notify.notify_one();
812 }
813}
814
815impl<T, const N: usize> Drop for Sender<T, N> {
816 fn drop(&mut self) {
817 // Mark channel as closed when sender is dropped
818 // 当发送器被丢弃时标记通道为已关闭
819 self.inner.closed.store(true, Ordering::Release);
820
821 // Notify receiver in case it's waiting
822 // 通知接收器以防它正在等待
823 self.inner.recv_notify.notify_one();
824 }
825}
826
827#[cfg(test)]
828mod tests {
829 use super::*;
830
831 #[tokio::test]
832 async fn test_basic_send_recv() {
833 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
834
835 tx.send(1).await.unwrap();
836 tx.send(2).await.unwrap();
837 tx.send(3).await.unwrap();
838
839 assert_eq!(rx.recv().await, Some(1));
840 assert_eq!(rx.recv().await, Some(2));
841 assert_eq!(rx.recv().await, Some(3));
842 }
843
844 #[tokio::test]
845 async fn test_try_send_recv() {
846 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
847
848 tx.try_send(1).unwrap();
849 tx.try_send(2).unwrap();
850
851 assert_eq!(rx.try_recv().unwrap(), 1);
852 assert_eq!(rx.try_recv().unwrap(), 2);
853 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
854 }
855
856 #[tokio::test]
857 async fn test_channel_closed_on_sender_drop() {
858 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
859
860 tx.send(1).await.unwrap();
861 drop(tx);
862
863 assert_eq!(rx.recv().await, Some(1));
864 assert_eq!(rx.recv().await, None);
865 }
866
867 #[tokio::test]
868 async fn test_channel_closed_on_receiver_drop() {
869 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
870
871 drop(rx);
872
873 assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
874 }
875
876 #[tokio::test]
877 async fn test_cross_task_communication() {
878 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
879
880 let sender_handle = tokio::spawn(async move {
881 for i in 0..10 {
882 tx.send(i).await.unwrap();
883 }
884 });
885
886 let receiver_handle = tokio::spawn(async move {
887 let mut sum = 0;
888 while let Some(value) = rx.recv().await {
889 sum += value;
890 }
891 sum
892 });
893
894 sender_handle.await.unwrap();
895 let sum = receiver_handle.await.unwrap();
896 assert_eq!(sum, 45); // 0+1+2+...+9 = 45
897 }
898
899 #[tokio::test]
900 async fn test_backpressure() {
901 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
902
903 // Fill the buffer
904 tx.try_send(1).unwrap();
905 tx.try_send(2).unwrap();
906 tx.try_send(3).unwrap();
907 tx.try_send(4).unwrap();
908
909 // Buffer should be full now
910 assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
911
912 // This should block and then succeed when we consume
913 let send_handle = tokio::spawn(async move {
914 tx.send(5).await.unwrap();
915 tx.send(6).await.unwrap();
916 });
917
918 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
919
920 assert_eq!(rx.recv().await, Some(1));
921 assert_eq!(rx.recv().await, Some(2));
922 assert_eq!(rx.recv().await, Some(3));
923 assert_eq!(rx.recv().await, Some(4));
924 assert_eq!(rx.recv().await, Some(5));
925 assert_eq!(rx.recv().await, Some(6));
926
927 send_handle.await.unwrap();
928 }
929
930 #[tokio::test]
931 async fn test_capacity_and_len() {
932 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
933
934 assert_eq!(rx.capacity(), 8);
935 assert_eq!(rx.len(), 0);
936 assert!(rx.is_empty());
937
938 tx.try_send(1).unwrap();
939 tx.try_send(2).unwrap();
940
941 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
942 assert_eq!(rx.len(), 2);
943 assert!(!rx.is_empty());
944 }
945
946 // ==================== New API Tests ====================
947
948 #[tokio::test]
949 async fn test_sender_capacity_queries() {
950 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
951
952 assert_eq!(tx.capacity(), 8);
953 assert_eq!(tx.len(), 0);
954 assert_eq!(tx.free_slots(), 8);
955 assert!(!tx.is_full());
956
957 tx.try_send(1).unwrap();
958 tx.try_send(2).unwrap();
959 tx.try_send(3).unwrap();
960
961 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
962 assert_eq!(tx.len(), 3);
963 assert_eq!(tx.free_slots(), 5);
964 assert!(!tx.is_full());
965
966 // Fill the buffer
967 tx.try_send(4).unwrap();
968 tx.try_send(5).unwrap();
969 tx.try_send(6).unwrap();
970 tx.try_send(7).unwrap();
971 tx.try_send(8).unwrap();
972
973 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
974 assert_eq!(tx.len(), 8);
975 assert_eq!(tx.free_slots(), 0);
976 assert!(tx.is_full());
977
978 // Pop one and check again
979 rx.recv().await;
980 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
981
982 assert_eq!(tx.len(), 7);
983 assert_eq!(tx.free_slots(), 1);
984 assert!(!tx.is_full());
985 }
986
987 #[tokio::test]
988 async fn test_try_send_slice() {
989 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
990
991 let data = [1, 2, 3, 4, 5];
992 let sent = tx.try_send_slice(&data);
993
994 assert_eq!(sent, 5);
995 assert_eq!(rx.len(), 5);
996
997 for i in 0..5 {
998 assert_eq!(rx.recv().await.unwrap(), data[i]);
999 }
1000 }
1001
1002 #[tokio::test]
1003 async fn test_try_send_slice_partial() {
1004 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1005
1006 // Fill with 5 elements, leaving room for 3
1007 let initial = [1, 2, 3, 4, 5];
1008 tx.try_send_slice(&initial);
1009
1010 // Try to send 10 more, should only send 3
1011 let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
1012 let sent = tx.try_send_slice(&more);
1013
1014 assert_eq!(sent, 3);
1015 assert_eq!(rx.len(), 8);
1016 assert!(tx.is_full());
1017
1018 // Verify values
1019 for i in 1..=8 {
1020 assert_eq!(rx.recv().await.unwrap(), i);
1021 }
1022 }
1023
1024 #[tokio::test]
1025 async fn test_send_slice() {
1026 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1027
1028 let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
1029 let result = tx.send_slice(&data).await;
1030
1031 assert_eq!(result.unwrap(), 10);
1032 assert_eq!(rx.len(), 10);
1033
1034 for i in 0..10 {
1035 assert_eq!(rx.recv().await.unwrap(), data[i]);
1036 }
1037 }
1038
1039 #[tokio::test]
1040 async fn test_send_slice_with_backpressure() {
1041 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1042
1043 let data = [1, 2, 3, 4, 5, 6, 7, 8];
1044
1045 let send_handle = tokio::spawn(async move {
1046 tx.send_slice(&data).await.unwrap()
1047 });
1048
1049 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1050
1051 // Consume some messages to make room
1052 for i in 1..=4 {
1053 assert_eq!(rx.recv().await.unwrap(), i);
1054 }
1055
1056 let sent = send_handle.await.unwrap();
1057 assert_eq!(sent, 8);
1058
1059 // Verify remaining messages
1060 for i in 5..=8 {
1061 assert_eq!(rx.recv().await.unwrap(), i);
1062 }
1063 }
1064
1065 #[tokio::test]
1066 async fn test_peek() {
1067 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1068
1069 // Peek empty buffer
1070 assert!(rx.peek().is_none());
1071
1072 tx.try_send(42).unwrap();
1073 tx.try_send(100).unwrap();
1074 tx.try_send(200).unwrap();
1075
1076 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1077
1078 // Peek should return first element without removing it
1079 assert_eq!(rx.peek(), Some(&42));
1080 assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
1081 assert_eq!(rx.len(), 3); // Length unchanged
1082 }
1083
1084 #[tokio::test]
1085 async fn test_peek_after_recv() {
1086 let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1087
1088 tx.try_send("first".to_string()).unwrap();
1089 tx.try_send("second".to_string()).unwrap();
1090 tx.try_send("third".to_string()).unwrap();
1091
1092 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1093
1094 assert_eq!(rx.peek(), Some(&"first".to_string()));
1095 rx.recv().await.unwrap();
1096
1097 assert_eq!(rx.peek(), Some(&"second".to_string()));
1098 rx.recv().await.unwrap();
1099
1100 assert_eq!(rx.peek(), Some(&"third".to_string()));
1101 rx.recv().await.unwrap();
1102
1103 assert!(rx.peek().is_none());
1104 }
1105
1106 #[tokio::test]
1107 async fn test_clear() {
1108 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1109
1110 for i in 0..10 {
1111 tx.try_send(i).unwrap();
1112 }
1113
1114 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1115 assert_eq!(rx.len(), 10);
1116
1117 rx.clear();
1118
1119 assert_eq!(rx.len(), 0);
1120 assert!(rx.is_empty());
1121 }
1122
1123 #[tokio::test]
1124 async fn test_clear_with_drop() {
1125 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1126 use std::sync::Arc;
1127
1128 #[derive(Debug)]
1129 struct DropCounter {
1130 counter: Arc<AtomicUsize>,
1131 }
1132
1133 impl Drop for DropCounter {
1134 fn drop(&mut self) {
1135 self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1136 }
1137 }
1138
1139 let counter = Arc::new(AtomicUsize::new(0));
1140
1141 {
1142 let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1143
1144 for _ in 0..8 {
1145 tx.try_send(DropCounter { counter: counter.clone() }).unwrap();
1146 }
1147
1148 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1149 assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1150
1151 rx.clear();
1152
1153 assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1154 }
1155 }
1156
1157 #[tokio::test]
1158 async fn test_drain() {
1159 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1160
1161 for i in 0..10 {
1162 tx.try_send(i).unwrap();
1163 }
1164
1165 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1166
1167 let collected: Vec<i32> = rx.drain().collect();
1168
1169 assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1170 assert!(rx.is_empty());
1171 }
1172
1173 #[tokio::test]
1174 async fn test_drain_empty() {
1175 let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1176
1177 let collected: Vec<i32> = rx.drain().collect();
1178
1179 assert!(collected.is_empty());
1180 }
1181
1182 #[tokio::test]
1183 async fn test_drain_size_hint() {
1184 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1185
1186 for i in 0..5 {
1187 tx.try_send(i).unwrap();
1188 }
1189
1190 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1191
1192 let mut drain = rx.drain();
1193
1194 assert_eq!(drain.size_hint(), (5, Some(5)));
1195
1196 drain.next();
1197 assert_eq!(drain.size_hint(), (4, Some(4)));
1198
1199 drain.next();
1200 assert_eq!(drain.size_hint(), (3, Some(3)));
1201 }
1202
1203 #[tokio::test]
1204 async fn test_try_recv_slice() {
1205 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1206
1207 // Send some data
1208 for i in 0..10 {
1209 tx.try_send(i).unwrap();
1210 }
1211
1212 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1213
1214 let mut dest = [0u32; 5];
1215 let received = rx.try_recv_slice(&mut dest);
1216
1217 assert_eq!(received, 5);
1218 assert_eq!(dest, [0, 1, 2, 3, 4]);
1219 assert_eq!(rx.len(), 5);
1220 }
1221
1222 #[tokio::test]
1223 async fn test_try_recv_slice_partial() {
1224 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1225
1226 tx.try_send(1).unwrap();
1227 tx.try_send(2).unwrap();
1228 tx.try_send(3).unwrap();
1229
1230 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1231
1232 let mut dest = [0u32; 10];
1233 let received = rx.try_recv_slice(&mut dest);
1234
1235 assert_eq!(received, 3);
1236 assert_eq!(&dest[0..3], &[1, 2, 3]);
1237 assert!(rx.is_empty());
1238 }
1239
1240 #[tokio::test]
1241 async fn test_recv_slice() {
1242 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1243
1244 for i in 1..=10 {
1245 tx.try_send(i).unwrap();
1246 }
1247
1248 let mut dest = [0u32; 10];
1249 let received = rx.recv_slice(&mut dest).await;
1250
1251 assert_eq!(received, 10);
1252 assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1253 assert!(rx.is_empty());
1254 }
1255
1256 #[tokio::test]
1257 async fn test_recv_slice_with_wait() {
1258 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1259
1260 let recv_handle = tokio::spawn(async move {
1261 let mut dest = [0u32; 8];
1262 let received = rx.recv_slice(&mut dest).await;
1263 (received, dest)
1264 });
1265
1266 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1267
1268 // Send data gradually
1269 for i in 1..=8 {
1270 tx.send(i).await.unwrap();
1271 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1272 }
1273
1274 let (received, dest) = recv_handle.await.unwrap();
1275 assert_eq!(received, 8);
1276 assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1277 }
1278
1279 #[tokio::test]
1280 async fn test_recv_slice_channel_closed() {
1281 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1282
1283 tx.try_send(1).unwrap();
1284 tx.try_send(2).unwrap();
1285 tx.try_send(3).unwrap();
1286
1287 drop(tx); // Close the channel
1288
1289 let mut dest = [0u32; 10];
1290 let received = rx.recv_slice(&mut dest).await;
1291
1292 // Should receive the 3 available messages, then stop
1293 assert_eq!(received, 3);
1294 assert_eq!(&dest[0..3], &[1, 2, 3]);
1295 }
1296
1297 #[tokio::test]
1298 async fn test_combined_new_apis() {
1299 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1300
1301 // Batch send
1302 let data = [1, 2, 3, 4, 5];
1303 tx.try_send_slice(&data);
1304
1305 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1306
1307 assert_eq!(tx.len(), 5);
1308 assert_eq!(rx.len(), 5);
1309 assert_eq!(rx.capacity(), 16);
1310
1311 // Peek
1312 assert_eq!(rx.peek(), Some(&1));
1313
1314 // Batch receive
1315 let mut dest = [0u32; 3];
1316 rx.try_recv_slice(&mut dest);
1317 assert_eq!(dest, [1, 2, 3]);
1318
1319 assert_eq!(rx.len(), 2);
1320 assert_eq!(tx.free_slots(), 14);
1321
1322 // Drain remaining
1323 let remaining: Vec<u32> = rx.drain().collect();
1324 assert_eq!(remaining, vec![4, 5]);
1325
1326 assert!(rx.is_empty());
1327 assert!(!tx.is_full());
1328 }
1329}