lite_sync/spsc.rs
1/// High-performance async SPSC (Single Producer Single Consumer) channel
2///
3/// Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6///
7/// 高性能异步 SPSC(单生产者单消费者)通道
8///
9/// 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11/// `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12///
13/// # 安全性说明 (Safety Notes)
14///
15/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16/// 这是安全的,基于以下保证:
17///
18/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23///
24/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25///
26/// # Safety Guarantees
27///
28/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29/// This is safe based on the following guarantees:
30///
31/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36///
37/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38use std::cell::UnsafeCell;
39use std::num::NonZeroUsize;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::Arc;
42use super::notify::SingleWaiterNotify;
43
44use smallring::spsc::{Producer, Consumer, new, PushError, PopError};
45
46/// SPSC channel creation function
47///
48/// Creates a bounded SPSC channel with the specified capacity.
49///
50/// # Type Parameters
51/// - `T`: The type of messages to be sent through the channel
52/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
53///
54/// # Parameters
55/// - `capacity`: Channel capacity (total number of elements the channel can hold)
56///
57/// # Returns
58/// A tuple of (Sender, Receiver)
59///
60/// # Examples
61///
62/// ```
63/// use lite_sync::spsc::channel;
64/// use std::num::NonZeroUsize;
65///
66/// #[tokio::main]
67/// async fn main() {
68/// // Create a channel with capacity 32 and inline buffer size 8
69/// let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
70///
71/// tokio::spawn(async move {
72/// tx.send(42).await.unwrap();
73/// });
74///
75/// let value = rx.recv().await.unwrap();
76/// assert_eq!(value, 42);
77/// }
78/// ```
79///
80/// SPSC 通道创建函数
81///
82/// 创建指定容量的有界 SPSC 通道。
83///
84/// # 类型参数
85/// - `T`: 通过通道发送的消息类型
86/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
87///
88/// # 参数
89/// - `capacity`: 通道容量(通道可以容纳的元素总数)
90///
91/// # 返回值
92/// 返回 (Sender, Receiver) 元组
93pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
94 let (producer, consumer) = new::<T, N>(capacity);
95
96 let inner = Arc::new(Inner::<T, N> {
97 producer: UnsafeCell::new(producer),
98 consumer: UnsafeCell::new(consumer),
99 closed: AtomicBool::new(false),
100 recv_notify: SingleWaiterNotify::new(),
101 send_notify: SingleWaiterNotify::new(),
102 });
103
104 let sender = Sender {
105 inner: inner.clone(),
106 };
107
108 let receiver = Receiver {
109 inner,
110 };
111
112 (sender, receiver)
113}
114
115/// Shared internal state for SPSC channel
116///
117/// Contains both shared state and the ring buffer halves.
118/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
119///
120/// SPSC 通道的共享内部状态
121///
122/// 包含共享状态和环形缓冲区的两端。
123/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
124struct Inner<T, const N: usize = 32> {
125 /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
126 ///
127 /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
128 producer: UnsafeCell<Producer<T, N>>,
129
130 /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
131 ///
132 /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
133 consumer: UnsafeCell<Consumer<T, N>>,
134
135 /// Channel closed flag
136 ///
137 /// 通道关闭标志
138 closed: AtomicBool,
139
140 /// Notifier for receiver waiting (lightweight single-waiter)
141 ///
142 /// 接收者等待通知器(轻量级单等待者)
143 recv_notify: SingleWaiterNotify,
144
145 /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
146 ///
147 /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
148 send_notify: SingleWaiterNotify,
149}
150
151// SAFETY: Inner<T> 可以在线程间安全共享的原因:
152// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
153// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
154// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
155// 4. closed、recv_notify、send_notify 都已经是线程安全的
156// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
157unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
158
159/// SPSC channel sender
160///
161/// SPSC 通道发送器
162pub struct Sender<T, const N: usize> {
163 inner: Arc<Inner<T, N>>,
164}
165
166/// SPSC channel receiver
167///
168/// SPSC 通道接收器
169pub struct Receiver<T, const N: usize> {
170 inner: Arc<Inner<T, N>>,
171}
172
173/// Draining iterator for the SPSC channel
174///
175/// SPSC 通道的消费迭代器
176///
177/// This iterator removes and returns messages from the channel until it's empty.
178///
179/// 此迭代器从通道中移除并返回消息,直到通道为空。
180///
181/// # Type Parameters
182/// - `T`: Message type
183/// - `N`: Inline buffer size
184///
185/// # 类型参数
186/// - `T`: 消息类型
187/// - `N`: 内联缓冲区大小
188pub struct Drain<'a, T, const N: usize> {
189 receiver: &'a mut Receiver<T, N>,
190}
191
192impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
193 type Item = T;
194
195 #[inline]
196 fn next(&mut self) -> Option<Self::Item> {
197 self.receiver.try_recv().ok()
198 }
199
200 #[inline]
201 fn size_hint(&self) -> (usize, Option<usize>) {
202 let len = self.receiver.len();
203 (len, Some(len))
204 }
205}
206
207/// Send error type
208///
209/// 发送错误类型
210#[derive(Debug, Clone, Copy, PartialEq, Eq)]
211pub enum SendError<T> {
212 /// Channel is closed
213 ///
214 /// 通道已关闭
215 Closed(T),
216}
217
218/// Try-receive error type
219///
220/// 尝试接收错误类型
221#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub enum TryRecvError {
223 /// Channel is empty
224 ///
225 /// 通道为空
226 Empty,
227
228 /// Channel is closed
229 ///
230 /// 通道已关闭
231 Closed,
232}
233
234/// Try-send error type
235///
236/// 尝试发送错误类型
237#[derive(Debug, Clone, Copy, PartialEq, Eq)]
238pub enum TrySendError<T> {
239 /// Buffer is full
240 ///
241 /// 缓冲区已满
242 Full(T),
243
244 /// Channel is closed
245 ///
246 /// 通道已关闭
247 Closed(T),
248}
249
250impl<T, const N: usize> Sender<T, N> {
251 /// Send a message to the channel (async, waits if buffer is full)
252 ///
253 /// # Errors
254 /// Returns `SendError::Closed` if the receiver has been dropped
255 ///
256 /// 向通道发送消息(异步,如果缓冲区满则等待)
257 ///
258 /// # 错误
259 /// 如果接收器已被丢弃,返回 `SendError::Closed`
260 pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
261 loop {
262 match self.try_send(value) {
263 Ok(()) => return Ok(()),
264 Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
265 Err(TrySendError::Full(v)) => {
266 // Store the value to retry
267 // 存储值以便重试
268 value = v;
269
270 // Wait for space to become available
271 // 等待空间可用
272 self.inner.send_notify.notified().await;
273
274 // Check if channel was closed while waiting
275 // 检查等待时通道是否已关闭
276 if self.inner.closed.load(Ordering::Acquire) {
277 return Err(SendError::Closed(value));
278 }
279
280 // Retry with the value in next loop iteration
281 // 在下一次循环迭代中使用该值重试
282 }
283 }
284 }
285 }
286
287 /// Try to send a message without blocking
288 ///
289 /// # Errors
290 /// - Returns `TrySendError::Full` if the buffer is full
291 /// - Returns `TrySendError::Closed` if the receiver has been dropped
292 ///
293 /// 尝试非阻塞地发送消息
294 ///
295 /// # 错误
296 /// - 如果缓冲区满,返回 `TrySendError::Full`
297 /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
298 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
299 // Check if channel is closed first
300 // 首先检查通道是否已关闭
301 if self.inner.closed.load(Ordering::Acquire) {
302 return Err(TrySendError::Closed(value));
303 }
304
305 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
306 // 不会有多个线程同时访问 producer
307 let producer = unsafe { &mut *self.inner.producer.get() };
308
309 match producer.push(value) {
310 Ok(()) => {
311 // Successfully sent, notify receiver
312 // 成功发送,通知接收者
313 self.inner.recv_notify.notify_one();
314 Ok(())
315 }
316 Err(PushError::Full(v)) => {
317 Err(TrySendError::Full(v))
318 }
319 }
320 }
321
322 /// Check if the channel is closed
323 ///
324 /// 检查通道是否已关闭
325 #[inline]
326 pub fn is_closed(&self) -> bool {
327 self.inner.closed.load(Ordering::Acquire)
328 }
329
330 /// Get the capacity of the channel
331 ///
332 /// 获取通道的容量
333 #[inline]
334 pub fn capacity(&self) -> usize {
335 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
336 // capacity 只读取数据,不需要可变访问
337 let producer = unsafe { &*self.inner.producer.get() };
338 producer.capacity()
339 }
340
341 /// Get the number of messages currently in the channel
342 ///
343 /// 获取通道中当前的消息数量
344 #[inline]
345 pub fn len(&self) -> usize {
346 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
347 // slots 只读取数据,不需要可变访问
348 let producer = unsafe { &*self.inner.producer.get() };
349 producer.slots()
350 }
351
352 #[inline]
353 pub fn is_empty(&self) -> bool {
354 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
355 // is_empty 只读取数据,不需要可变访问
356 let producer = unsafe { &*self.inner.producer.get() };
357 producer.is_empty()
358 }
359
360 /// Get the number of free slots in the channel
361 ///
362 /// 获取通道中的空闲空间数量
363 #[inline]
364 pub fn free_slots(&self) -> usize {
365 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
366 // free_slots 只读取数据,不需要可变访问
367 let producer = unsafe { &*self.inner.producer.get() };
368 producer.free_slots()
369 }
370
371 /// Check if the channel is full
372 ///
373 /// 检查通道是否已满
374 #[inline]
375 pub fn is_full(&self) -> bool {
376 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
377 // is_full 只读取数据,不需要可变访问
378 let producer = unsafe { &*self.inner.producer.get() };
379 producer.is_full()
380 }
381}
382
383impl<T: Copy, const N: usize> Sender<T, N> {
384 /// Try to send multiple values from a slice without blocking
385 ///
386 /// 尝试非阻塞地从切片发送多个值
387 ///
388 /// This method attempts to send as many elements as possible from the slice.
389 /// It returns the number of elements successfully sent.
390 ///
391 /// 此方法尝试从切片中发送尽可能多的元素。
392 /// 返回成功发送的元素数量。
393 ///
394 /// # Parameters
395 /// - `values`: Slice of values to send
396 ///
397 /// # Returns
398 /// Number of elements successfully sent (0 to values.len())
399 ///
400 /// # 参数
401 /// - `values`: 要发送的值的切片
402 ///
403 /// # 返回值
404 /// 成功发送的元素数量(0 到 values.len())
405 pub fn try_send_slice(&self, values: &[T]) -> usize {
406 // Check if channel is closed first
407 // 首先检查通道是否已关闭
408 if self.inner.closed.load(Ordering::Acquire) {
409 return 0;
410 }
411
412 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
413 // 不会有多个线程同时访问 producer
414 let producer = unsafe { &mut *self.inner.producer.get() };
415
416 let sent = producer.push_slice(values);
417
418 if sent > 0 {
419 // Successfully sent some messages, notify receiver
420 // 成功发送一些消息,通知接收者
421 self.inner.recv_notify.notify_one();
422 }
423
424 sent
425 }
426
427 /// Send multiple values from a slice (async, waits if buffer is full)
428 ///
429 /// 从切片发送多个值(异步,如果缓冲区满则等待)
430 ///
431 /// This method will send all elements from the slice, waiting if necessary
432 /// when the buffer becomes full. Returns the number of elements sent, or
433 /// an error if the channel is closed.
434 ///
435 /// 此方法将发送切片中的所有元素,必要时在缓冲区满时等待。
436 /// 返回发送的元素数量,如果通道关闭则返回错误。
437 ///
438 /// # Parameters
439 /// - `values`: Slice of values to send
440 ///
441 /// # Returns
442 /// - `Ok(usize)`: Number of elements successfully sent
443 /// - `Err(SendError)`: Channel is closed
444 ///
445 /// # 参数
446 /// - `values`: 要发送的值的切片
447 ///
448 /// # 返回值
449 /// - `Ok(usize)`: 成功发送的元素数量
450 /// - `Err(SendError)`: 通道已关闭
451 pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
452 let mut total_sent = 0;
453
454 while total_sent < values.len() {
455 // Check if channel is closed
456 // 检查通道是否已关闭
457 if self.inner.closed.load(Ordering::Acquire) {
458 return Err(SendError::Closed(()));
459 }
460
461 let sent = self.try_send_slice(&values[total_sent..]);
462 total_sent += sent;
463
464 if total_sent < values.len() {
465 // Need to wait for space
466 // 需要等待空间
467 self.inner.send_notify.notified().await;
468
469 // Check if channel was closed while waiting
470 // 检查等待时通道是否已关闭
471 if self.inner.closed.load(Ordering::Acquire) {
472 return Err(SendError::Closed(()));
473 }
474 }
475 }
476
477 Ok(total_sent)
478 }
479}
480
481impl<T, const N: usize> Receiver<T, N> {
482 /// Receive a message from the channel (async, waits if buffer is empty)
483 ///
484 /// Returns `None` if the channel is closed and empty
485 ///
486 /// 从通道接收消息(异步,如果缓冲区空则等待)
487 ///
488 /// 如果通道已关闭且为空,返回 `None`
489 pub async fn recv(&self) -> Option<T> {
490 loop {
491 match self.try_recv() {
492 Ok(value) => return Some(value),
493 Err(TryRecvError::Closed) => return None,
494 Err(TryRecvError::Empty) => {
495 // Check if channel is closed before waiting
496 // 等待前检查通道是否已关闭
497 if self.inner.closed.load(Ordering::Acquire) {
498 // Double check if there are any remaining items
499 // 再次检查是否有剩余项
500 if let Ok(value) = self.try_recv() {
501 return Some(value);
502 }
503 return None;
504 }
505
506 // Wait for data to become available
507 // 等待数据可用
508 self.inner.recv_notify.notified().await;
509 }
510 }
511 }
512 }
513
514 /// Try to receive a message without blocking
515 ///
516 /// # Errors
517 /// - Returns `TryRecvError::Empty` if the buffer is empty
518 /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
519 ///
520 /// 尝试非阻塞地接收消息
521 ///
522 /// # 错误
523 /// - 如果缓冲区空,返回 `TryRecvError::Empty`
524 /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
525 pub fn try_recv(&self) -> Result<T, TryRecvError> {
526 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
527 // 不会有多个线程同时访问 consumer
528 let consumer = unsafe { &mut *self.inner.consumer.get() };
529
530 match consumer.pop() {
531 Ok(value) => {
532 // Successfully received, notify sender
533 // 成功接收,通知发送者
534 self.inner.send_notify.notify_one();
535 Ok(value)
536 }
537 Err(PopError::Empty) => {
538 // Check if channel is closed
539 // 检查通道是否已关闭
540 if self.inner.closed.load(Ordering::Acquire) {
541 // Double-check for remaining items to avoid race condition
542 // where sender drops after push but before we check closed flag
543 // 再次检查是否有剩余项,以避免发送方在 push 后但在我们检查 closed 标志前 drop 的竞态条件
544 match consumer.pop() {
545 Ok(value) => {
546 self.inner.send_notify.notify_one();
547 Ok(value)
548 }
549 Err(PopError::Empty) => {
550 Err(TryRecvError::Closed)
551 }
552 }
553 } else {
554 Err(TryRecvError::Empty)
555 }
556 }
557 }
558 }
559
560 /// Check if the channel is empty
561 ///
562 /// 检查通道是否为空
563 #[inline]
564 pub fn is_empty(&self) -> bool {
565 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
566 // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
567 let consumer = unsafe { &*self.inner.consumer.get() };
568 consumer.is_empty()
569 }
570
571 /// Get the number of messages currently in the channel
572 ///
573 /// 获取通道中当前的消息数量
574 #[inline]
575 pub fn len(&self) -> usize {
576 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
577 // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
578 let consumer = unsafe { &*self.inner.consumer.get() };
579 consumer.slots()
580 }
581
582 /// Get the capacity of the channel
583 ///
584 /// 获取通道的容量
585 #[inline]
586 pub fn capacity(&self) -> usize {
587 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
588 // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
589 let consumer = unsafe { &*self.inner.consumer.get() };
590 consumer.buffer().capacity()
591 }
592
593 /// Peek at the first message without removing it
594 ///
595 /// 查看第一个消息但不移除它
596 ///
597 /// # Returns
598 /// `Some(&T)` if there is a message, `None` if the channel is empty
599 ///
600 /// # 返回值
601 /// 如果有消息则返回 `Some(&T)`,如果通道为空则返回 `None`
602 ///
603 /// # Safety
604 /// The returned reference is valid only as long as no other operations
605 /// are performed on the Receiver that might modify the buffer.
606 ///
607 /// # 安全性
608 /// 返回的引用仅在未对 Receiver 执行可能修改缓冲区的其他操作时有效。
609 #[inline]
610 pub fn peek(&self) -> Option<&T> {
611 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
612 // peek 只读取数据,不需要可变访问
613 let consumer = unsafe { &*self.inner.consumer.get() };
614 consumer.peek()
615 }
616
617 /// Clear all messages from the channel
618 ///
619 /// 清空通道中的所有消息
620 ///
621 /// This method pops and drops all messages currently in the channel.
622 ///
623 /// 此方法弹出并 drop 通道中当前的所有消息。
624 pub fn clear(&mut self) {
625 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
626 // 我们有可变引用,因此可以安全地访问 consumer
627 let consumer = unsafe { &mut *self.inner.consumer.get() };
628 consumer.clear();
629
630 // Notify sender that space is available
631 // 通知发送者空间可用
632 self.inner.send_notify.notify_one();
633 }
634
635 /// Create a draining iterator
636 ///
637 /// 创建一个消费迭代器
638 ///
639 /// Returns an iterator that removes and returns messages from the channel.
640 /// The iterator will continue until the channel is empty.
641 ///
642 /// 返回一个从通道中移除并返回消息的迭代器。
643 /// 迭代器将持续运行直到通道为空。
644 ///
645 /// # Examples
646 ///
647 /// ```
648 /// use lite_sync::spsc::channel;
649 /// use std::num::NonZeroUsize;
650 ///
651 /// # #[tokio::main]
652 /// # async fn main() {
653 /// let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
654 /// tx.try_send(1).unwrap();
655 /// tx.try_send(2).unwrap();
656 /// tx.try_send(3).unwrap();
657 ///
658 /// let items: Vec<i32> = rx.drain().collect();
659 /// assert_eq!(items, vec![1, 2, 3]);
660 /// assert!(rx.is_empty());
661 /// # }
662 /// ```
663 #[inline]
664 pub fn drain(&mut self) -> Drain<'_, T, N> {
665 Drain { receiver: self }
666 }
667}
668
669impl<T: Copy, const N: usize> Receiver<T, N> {
670 /// Try to receive multiple values into a slice without blocking
671 ///
672 /// 尝试非阻塞地将多个值接收到切片
673 ///
674 /// This method attempts to receive as many messages as possible into the provided slice.
675 /// It returns the number of messages successfully received.
676 ///
677 /// 此方法尝试将尽可能多的消息接收到提供的切片中。
678 /// 返回成功接收的消息数量。
679 ///
680 /// # Parameters
681 /// - `dest`: Destination slice to receive values into
682 ///
683 /// # Returns
684 /// Number of messages successfully received (0 to dest.len())
685 ///
686 /// # 参数
687 /// - `dest`: 用于接收值的目标切片
688 ///
689 /// # 返回值
690 /// 成功接收的消息数量(0 到 dest.len())
691 pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
692 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
693 // 我们有可变引用,因此可以安全地访问 consumer
694 let consumer = unsafe { &mut *self.inner.consumer.get() };
695
696 let received = consumer.pop_slice(dest);
697
698 if received > 0 {
699 // Successfully received some messages, notify sender
700 // 成功接收一些消息,通知发送者
701 self.inner.send_notify.notify_one();
702 }
703
704 received
705 }
706
707 /// Receive multiple values into a slice (async, waits if buffer is empty)
708 ///
709 /// 将多个值接收到切片(异步,如果缓冲区空则等待)
710 ///
711 /// This method will fill the destination slice as much as possible, waiting if necessary
712 /// when the buffer becomes empty. Returns the number of messages received.
713 ///
714 /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
715 /// 返回接收的消息数量。
716 ///
717 /// # Parameters
718 /// - `dest`: Destination slice to receive values into
719 ///
720 /// # Returns
721 /// Number of messages successfully received (0 to dest.len())
722 /// Returns 0 if the channel is closed and empty
723 ///
724 /// # 参数
725 /// - `dest`: 用于接收值的目标切片
726 ///
727 /// # 返回值
728 /// 成功接收的消息数量(0 到 dest.len())
729 /// 如果通道已关闭且为空,返回 0
730 pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
731 let mut total_received = 0;
732
733 while total_received < dest.len() {
734 let received = self.try_recv_slice(&mut dest[total_received..]);
735 total_received += received;
736
737 if total_received < dest.len() {
738 // Check if channel is closed
739 // 检查通道是否已关闭
740 if self.inner.closed.load(Ordering::Acquire) {
741 // Channel closed, return what we have
742 // 通道已关闭,返回我们已有的内容
743 return total_received;
744 }
745
746 // Need to wait for data
747 // 需要等待数据
748 self.inner.recv_notify.notified().await;
749
750 // Check again after waking up
751 // 唤醒后再次检查
752 if self.inner.closed.load(Ordering::Acquire) {
753 // Try one more time to get remaining messages
754 // 再尝试一次获取剩余消息
755 let final_received = self.try_recv_slice(&mut dest[total_received..]);
756 total_received += final_received;
757 return total_received;
758 }
759 }
760 }
761
762 total_received
763 }
764}
765
766impl<T, const N: usize> Drop for Receiver<T, N> {
767 fn drop(&mut self) {
768 // Mark channel as closed when receiver is dropped
769 // 当接收器被丢弃时标记通道为已关闭
770 self.inner.closed.store(true, Ordering::Release);
771
772 // Notify sender in case it's waiting
773 // 通知发送者以防它正在等待
774 self.inner.send_notify.notify_one();
775 }
776}
777
778impl<T, const N: usize> Drop for Sender<T, N> {
779 fn drop(&mut self) {
780 // Mark channel as closed when sender is dropped
781 // 当发送器被丢弃时标记通道为已关闭
782 self.inner.closed.store(true, Ordering::Release);
783
784 // Notify receiver in case it's waiting
785 // 通知接收器以防它正在等待
786 self.inner.recv_notify.notify_one();
787 }
788}
789
790#[cfg(test)]
791mod tests {
792 use super::*;
793
794 #[tokio::test]
795 async fn test_basic_send_recv() {
796 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
797
798 tx.send(1).await.unwrap();
799 tx.send(2).await.unwrap();
800 tx.send(3).await.unwrap();
801
802 assert_eq!(rx.recv().await, Some(1));
803 assert_eq!(rx.recv().await, Some(2));
804 assert_eq!(rx.recv().await, Some(3));
805 }
806
807 #[tokio::test]
808 async fn test_try_send_recv() {
809 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
810
811 tx.try_send(1).unwrap();
812 tx.try_send(2).unwrap();
813
814 assert_eq!(rx.try_recv().unwrap(), 1);
815 assert_eq!(rx.try_recv().unwrap(), 2);
816 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
817 }
818
819 #[tokio::test]
820 async fn test_channel_closed_on_sender_drop() {
821 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
822
823 tx.send(1).await.unwrap();
824 drop(tx);
825
826 assert_eq!(rx.recv().await, Some(1));
827 assert_eq!(rx.recv().await, None);
828 }
829
830 #[tokio::test]
831 async fn test_channel_closed_on_receiver_drop() {
832 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
833
834 drop(rx);
835
836 assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
837 }
838
839 #[tokio::test]
840 async fn test_cross_task_communication() {
841 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
842
843 let sender_handle = tokio::spawn(async move {
844 for i in 0..10 {
845 tx.send(i).await.unwrap();
846 }
847 });
848
849 let receiver_handle = tokio::spawn(async move {
850 let mut sum = 0;
851 while let Some(value) = rx.recv().await {
852 sum += value;
853 }
854 sum
855 });
856
857 sender_handle.await.unwrap();
858 let sum = receiver_handle.await.unwrap();
859 assert_eq!(sum, 45); // 0+1+2+...+9 = 45
860 }
861
862 #[tokio::test]
863 async fn test_backpressure() {
864 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
865
866 // Fill the buffer
867 tx.try_send(1).unwrap();
868 tx.try_send(2).unwrap();
869 tx.try_send(3).unwrap();
870 tx.try_send(4).unwrap();
871
872 // Buffer should be full now
873 assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
874
875 // This should block and then succeed when we consume
876 let send_handle = tokio::spawn(async move {
877 tx.send(5).await.unwrap();
878 tx.send(6).await.unwrap();
879 });
880
881 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
882
883 assert_eq!(rx.recv().await, Some(1));
884 assert_eq!(rx.recv().await, Some(2));
885 assert_eq!(rx.recv().await, Some(3));
886 assert_eq!(rx.recv().await, Some(4));
887 assert_eq!(rx.recv().await, Some(5));
888 assert_eq!(rx.recv().await, Some(6));
889
890 send_handle.await.unwrap();
891 }
892
893 #[tokio::test]
894 async fn test_capacity_and_len() {
895 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
896
897 assert_eq!(rx.capacity(), 8);
898 assert_eq!(rx.len(), 0);
899 assert!(rx.is_empty());
900
901 tx.try_send(1).unwrap();
902 tx.try_send(2).unwrap();
903
904 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
905 assert_eq!(rx.len(), 2);
906 assert!(!rx.is_empty());
907 }
908
909 // ==================== New API Tests ====================
910
911 #[tokio::test]
912 async fn test_sender_capacity_queries() {
913 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
914
915 assert_eq!(tx.capacity(), 8);
916 assert_eq!(tx.len(), 0);
917 assert_eq!(tx.free_slots(), 8);
918 assert!(!tx.is_full());
919
920 tx.try_send(1).unwrap();
921 tx.try_send(2).unwrap();
922 tx.try_send(3).unwrap();
923
924 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
925 assert_eq!(tx.len(), 3);
926 assert_eq!(tx.free_slots(), 5);
927 assert!(!tx.is_full());
928
929 // Fill the buffer
930 tx.try_send(4).unwrap();
931 tx.try_send(5).unwrap();
932 tx.try_send(6).unwrap();
933 tx.try_send(7).unwrap();
934 tx.try_send(8).unwrap();
935
936 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
937 assert_eq!(tx.len(), 8);
938 assert_eq!(tx.free_slots(), 0);
939 assert!(tx.is_full());
940
941 // Pop one and check again
942 rx.recv().await;
943 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
944
945 assert_eq!(tx.len(), 7);
946 assert_eq!(tx.free_slots(), 1);
947 assert!(!tx.is_full());
948 }
949
950 #[tokio::test]
951 async fn test_try_send_slice() {
952 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
953
954 let data = [1, 2, 3, 4, 5];
955 let sent = tx.try_send_slice(&data);
956
957 assert_eq!(sent, 5);
958 assert_eq!(rx.len(), 5);
959
960 for i in 0..5 {
961 assert_eq!(rx.recv().await.unwrap(), data[i]);
962 }
963 }
964
965 #[tokio::test]
966 async fn test_try_send_slice_partial() {
967 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
968
969 // Fill with 5 elements, leaving room for 3
970 let initial = [1, 2, 3, 4, 5];
971 tx.try_send_slice(&initial);
972
973 // Try to send 10 more, should only send 3
974 let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
975 let sent = tx.try_send_slice(&more);
976
977 assert_eq!(sent, 3);
978 assert_eq!(rx.len(), 8);
979 assert!(tx.is_full());
980
981 // Verify values
982 for i in 1..=8 {
983 assert_eq!(rx.recv().await.unwrap(), i);
984 }
985 }
986
987 #[tokio::test]
988 async fn test_send_slice() {
989 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
990
991 let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
992 let result = tx.send_slice(&data).await;
993
994 assert_eq!(result.unwrap(), 10);
995 assert_eq!(rx.len(), 10);
996
997 for i in 0..10 {
998 assert_eq!(rx.recv().await.unwrap(), data[i]);
999 }
1000 }
1001
1002 #[tokio::test]
1003 async fn test_send_slice_with_backpressure() {
1004 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1005
1006 let data = [1, 2, 3, 4, 5, 6, 7, 8];
1007
1008 let send_handle = tokio::spawn(async move {
1009 tx.send_slice(&data).await.unwrap()
1010 });
1011
1012 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1013
1014 // Consume some messages to make room
1015 for i in 1..=4 {
1016 assert_eq!(rx.recv().await.unwrap(), i);
1017 }
1018
1019 let sent = send_handle.await.unwrap();
1020 assert_eq!(sent, 8);
1021
1022 // Verify remaining messages
1023 for i in 5..=8 {
1024 assert_eq!(rx.recv().await.unwrap(), i);
1025 }
1026 }
1027
1028 #[tokio::test]
1029 async fn test_peek() {
1030 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1031
1032 // Peek empty buffer
1033 assert!(rx.peek().is_none());
1034
1035 tx.try_send(42).unwrap();
1036 tx.try_send(100).unwrap();
1037 tx.try_send(200).unwrap();
1038
1039 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1040
1041 // Peek should return first element without removing it
1042 assert_eq!(rx.peek(), Some(&42));
1043 assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
1044 assert_eq!(rx.len(), 3); // Length unchanged
1045 }
1046
1047 #[tokio::test]
1048 async fn test_peek_after_recv() {
1049 let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1050
1051 tx.try_send("first".to_string()).unwrap();
1052 tx.try_send("second".to_string()).unwrap();
1053 tx.try_send("third".to_string()).unwrap();
1054
1055 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1056
1057 assert_eq!(rx.peek(), Some(&"first".to_string()));
1058 rx.recv().await.unwrap();
1059
1060 assert_eq!(rx.peek(), Some(&"second".to_string()));
1061 rx.recv().await.unwrap();
1062
1063 assert_eq!(rx.peek(), Some(&"third".to_string()));
1064 rx.recv().await.unwrap();
1065
1066 assert!(rx.peek().is_none());
1067 }
1068
1069 #[tokio::test]
1070 async fn test_clear() {
1071 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1072
1073 for i in 0..10 {
1074 tx.try_send(i).unwrap();
1075 }
1076
1077 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1078 assert_eq!(rx.len(), 10);
1079
1080 rx.clear();
1081
1082 assert_eq!(rx.len(), 0);
1083 assert!(rx.is_empty());
1084 }
1085
1086 #[tokio::test]
1087 async fn test_clear_with_drop() {
1088 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1089 use std::sync::Arc;
1090
1091 #[derive(Debug)]
1092 struct DropCounter {
1093 counter: Arc<AtomicUsize>,
1094 }
1095
1096 impl Drop for DropCounter {
1097 fn drop(&mut self) {
1098 self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1099 }
1100 }
1101
1102 let counter = Arc::new(AtomicUsize::new(0));
1103
1104 {
1105 let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1106
1107 for _ in 0..8 {
1108 tx.try_send(DropCounter { counter: counter.clone() }).unwrap();
1109 }
1110
1111 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1112 assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1113
1114 rx.clear();
1115
1116 assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1117 }
1118 }
1119
1120 #[tokio::test]
1121 async fn test_drain() {
1122 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1123
1124 for i in 0..10 {
1125 tx.try_send(i).unwrap();
1126 }
1127
1128 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1129
1130 let collected: Vec<i32> = rx.drain().collect();
1131
1132 assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1133 assert!(rx.is_empty());
1134 }
1135
1136 #[tokio::test]
1137 async fn test_drain_empty() {
1138 let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1139
1140 let collected: Vec<i32> = rx.drain().collect();
1141
1142 assert!(collected.is_empty());
1143 }
1144
1145 #[tokio::test]
1146 async fn test_drain_size_hint() {
1147 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1148
1149 for i in 0..5 {
1150 tx.try_send(i).unwrap();
1151 }
1152
1153 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1154
1155 let mut drain = rx.drain();
1156
1157 assert_eq!(drain.size_hint(), (5, Some(5)));
1158
1159 drain.next();
1160 assert_eq!(drain.size_hint(), (4, Some(4)));
1161
1162 drain.next();
1163 assert_eq!(drain.size_hint(), (3, Some(3)));
1164 }
1165
1166 #[tokio::test]
1167 async fn test_try_recv_slice() {
1168 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1169
1170 // Send some data
1171 for i in 0..10 {
1172 tx.try_send(i).unwrap();
1173 }
1174
1175 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1176
1177 let mut dest = [0u32; 5];
1178 let received = rx.try_recv_slice(&mut dest);
1179
1180 assert_eq!(received, 5);
1181 assert_eq!(dest, [0, 1, 2, 3, 4]);
1182 assert_eq!(rx.len(), 5);
1183 }
1184
1185 #[tokio::test]
1186 async fn test_try_recv_slice_partial() {
1187 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1188
1189 tx.try_send(1).unwrap();
1190 tx.try_send(2).unwrap();
1191 tx.try_send(3).unwrap();
1192
1193 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1194
1195 let mut dest = [0u32; 10];
1196 let received = rx.try_recv_slice(&mut dest);
1197
1198 assert_eq!(received, 3);
1199 assert_eq!(&dest[0..3], &[1, 2, 3]);
1200 assert!(rx.is_empty());
1201 }
1202
1203 #[tokio::test]
1204 async fn test_recv_slice() {
1205 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1206
1207 for i in 1..=10 {
1208 tx.try_send(i).unwrap();
1209 }
1210
1211 let mut dest = [0u32; 10];
1212 let received = rx.recv_slice(&mut dest).await;
1213
1214 assert_eq!(received, 10);
1215 assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1216 assert!(rx.is_empty());
1217 }
1218
1219 #[tokio::test]
1220 async fn test_recv_slice_with_wait() {
1221 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1222
1223 let recv_handle = tokio::spawn(async move {
1224 let mut dest = [0u32; 8];
1225 let received = rx.recv_slice(&mut dest).await;
1226 (received, dest)
1227 });
1228
1229 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1230
1231 // Send data gradually
1232 for i in 1..=8 {
1233 tx.send(i).await.unwrap();
1234 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1235 }
1236
1237 let (received, dest) = recv_handle.await.unwrap();
1238 assert_eq!(received, 8);
1239 assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1240 }
1241
1242 #[tokio::test]
1243 async fn test_recv_slice_channel_closed() {
1244 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1245
1246 tx.try_send(1).unwrap();
1247 tx.try_send(2).unwrap();
1248 tx.try_send(3).unwrap();
1249
1250 drop(tx); // Close the channel
1251
1252 let mut dest = [0u32; 10];
1253 let received = rx.recv_slice(&mut dest).await;
1254
1255 // Should receive the 3 available messages, then stop
1256 assert_eq!(received, 3);
1257 assert_eq!(&dest[0..3], &[1, 2, 3]);
1258 }
1259
1260 #[tokio::test]
1261 async fn test_combined_new_apis() {
1262 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1263
1264 // Batch send
1265 let data = [1, 2, 3, 4, 5];
1266 tx.try_send_slice(&data);
1267
1268 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1269
1270 assert_eq!(tx.len(), 5);
1271 assert_eq!(rx.len(), 5);
1272 assert_eq!(rx.capacity(), 16);
1273
1274 // Peek
1275 assert_eq!(rx.peek(), Some(&1));
1276
1277 // Batch receive
1278 let mut dest = [0u32; 3];
1279 rx.try_recv_slice(&mut dest);
1280 assert_eq!(dest, [1, 2, 3]);
1281
1282 assert_eq!(rx.len(), 2);
1283 assert_eq!(tx.free_slots(), 14);
1284
1285 // Drain remaining
1286 let remaining: Vec<u32> = rx.drain().collect();
1287 assert_eq!(remaining, vec![4, 5]);
1288
1289 assert!(rx.is_empty());
1290 assert!(!tx.is_full());
1291 }
1292}