lite_sync/spsc.rs
1/// High-performance async SPSC (Single Producer Single Consumer) channel
2///
3/// Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6///
7/// 高性能异步 SPSC(单生产者单消费者)通道
8///
9/// 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11/// `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12///
13/// # 安全性说明 (Safety Notes)
14///
15/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16/// 这是安全的,基于以下保证:
17///
18/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23///
24/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25///
26/// # Safety Guarantees
27///
28/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29/// This is safe based on the following guarantees:
30///
31/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36///
37/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38
39use std::cell::UnsafeCell;
40use std::num::NonZeroUsize;
41use std::sync::atomic::{AtomicBool, Ordering};
42use std::sync::Arc;
43use super::notify::SingleWaiterNotify;
44
45/// SPSC channel creation function
46///
47/// Creates a bounded SPSC channel with the specified capacity.
48///
49/// # Type Parameters
50/// - `T`: The type of messages to be sent through the channel
51/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
52///
53/// # Parameters
54/// - `capacity`: Channel capacity (total number of elements the channel can hold)
55///
56/// # Returns
57/// A tuple of (Sender, Receiver)
58///
59/// # Examples
60///
61/// ```
62/// use lite_sync::spsc::channel;
63/// use std::num::NonZeroUsize;
64///
65/// #[tokio::main]
66/// async fn main() {
67/// // Create a channel with capacity 32 and inline buffer size 8
68/// let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
69///
70/// tokio::spawn(async move {
71/// tx.send(42).await.unwrap();
72/// });
73///
74/// let value = rx.recv().await.unwrap();
75/// assert_eq!(value, 42);
76/// }
77/// ```
78///
79/// SPSC 通道创建函数
80///
81/// 创建指定容量的有界 SPSC 通道。
82///
83/// # 类型参数
84/// - `T`: 通过通道发送的消息类型
85/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
86///
87/// # 参数
88/// - `capacity`: 通道容量(通道可以容纳的元素总数)
89///
90/// # 返回值
91/// 返回 (Sender, Receiver) 元组
92pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
93 let (producer, consumer) = smallring::new::<T, N>(capacity);
94
95 let inner = Arc::new(Inner::<T, N> {
96 producer: UnsafeCell::new(producer),
97 consumer: UnsafeCell::new(consumer),
98 closed: AtomicBool::new(false),
99 recv_notify: SingleWaiterNotify::new(),
100 send_notify: SingleWaiterNotify::new(),
101 });
102
103 let sender = Sender {
104 inner: inner.clone(),
105 };
106
107 let receiver = Receiver {
108 inner,
109 };
110
111 (sender, receiver)
112}
113
114/// Shared internal state for SPSC channel
115///
116/// Contains both shared state and the ring buffer halves.
117/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
118///
119/// SPSC 通道的共享内部状态
120///
121/// 包含共享状态和环形缓冲区的两端。
122/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
123struct Inner<T, const N: usize = 32> {
124 /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
125 ///
126 /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
127 producer: UnsafeCell<smallring::Producer<T, N>>,
128
129 /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
130 ///
131 /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
132 consumer: UnsafeCell<smallring::Consumer<T, N>>,
133
134 /// Channel closed flag
135 ///
136 /// 通道关闭标志
137 closed: AtomicBool,
138
139 /// Notifier for receiver waiting (lightweight single-waiter)
140 ///
141 /// 接收者等待通知器(轻量级单等待者)
142 recv_notify: SingleWaiterNotify,
143
144 /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
145 ///
146 /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
147 send_notify: SingleWaiterNotify,
148}
149
150// SAFETY: Inner<T> 可以在线程间安全共享的原因:
151// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
152// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
153// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
154// 4. closed、recv_notify、send_notify 都已经是线程安全的
155// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
156unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
157
158/// SPSC channel sender
159///
160/// SPSC 通道发送器
161pub struct Sender<T, const N: usize> {
162 inner: Arc<Inner<T, N>>,
163}
164
165/// SPSC channel receiver
166///
167/// SPSC 通道接收器
168pub struct Receiver<T, const N: usize> {
169 inner: Arc<Inner<T, N>>,
170}
171
172/// Draining iterator for the SPSC channel
173///
174/// SPSC 通道的消费迭代器
175///
176/// This iterator removes and returns messages from the channel until it's empty.
177///
178/// 此迭代器从通道中移除并返回消息,直到通道为空。
179///
180/// # Type Parameters
181/// - `T`: Message type
182/// - `N`: Inline buffer size
183///
184/// # 类型参数
185/// - `T`: 消息类型
186/// - `N`: 内联缓冲区大小
187pub struct Drain<'a, T, const N: usize> {
188 receiver: &'a mut Receiver<T, N>,
189}
190
191impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
192 type Item = T;
193
194 #[inline]
195 fn next(&mut self) -> Option<Self::Item> {
196 self.receiver.try_recv().ok()
197 }
198
199 #[inline]
200 fn size_hint(&self) -> (usize, Option<usize>) {
201 let len = self.receiver.len();
202 (len, Some(len))
203 }
204}
205
206/// Send error type
207///
208/// 发送错误类型
209#[derive(Debug, Clone, Copy, PartialEq, Eq)]
210pub enum SendError<T> {
211 /// Channel is closed
212 ///
213 /// 通道已关闭
214 Closed(T),
215}
216
217/// Try-receive error type
218///
219/// 尝试接收错误类型
220#[derive(Debug, Clone, Copy, PartialEq, Eq)]
221pub enum TryRecvError {
222 /// Channel is empty
223 ///
224 /// 通道为空
225 Empty,
226
227 /// Channel is closed
228 ///
229 /// 通道已关闭
230 Closed,
231}
232
233/// Try-send error type
234///
235/// 尝试发送错误类型
236#[derive(Debug, Clone, Copy, PartialEq, Eq)]
237pub enum TrySendError<T> {
238 /// Buffer is full
239 ///
240 /// 缓冲区已满
241 Full(T),
242
243 /// Channel is closed
244 ///
245 /// 通道已关闭
246 Closed(T),
247}
248
249impl<T, const N: usize> Sender<T, N> {
250 /// Send a message to the channel (async, waits if buffer is full)
251 ///
252 /// # Errors
253 /// Returns `SendError::Closed` if the receiver has been dropped
254 ///
255 /// 向通道发送消息(异步,如果缓冲区满则等待)
256 ///
257 /// # 错误
258 /// 如果接收器已被丢弃,返回 `SendError::Closed`
259 pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
260 loop {
261 match self.try_send(value) {
262 Ok(()) => return Ok(()),
263 Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
264 Err(TrySendError::Full(v)) => {
265 // Store the value to retry
266 // 存储值以便重试
267 value = v;
268
269 // Wait for space to become available
270 // 等待空间可用
271 self.inner.send_notify.notified().await;
272
273 // Check if channel was closed while waiting
274 // 检查等待时通道是否已关闭
275 if self.inner.closed.load(Ordering::Acquire) {
276 return Err(SendError::Closed(value));
277 }
278
279 // Retry with the value in next loop iteration
280 // 在下一次循环迭代中使用该值重试
281 }
282 }
283 }
284 }
285
286 /// Try to send a message without blocking
287 ///
288 /// # Errors
289 /// - Returns `TrySendError::Full` if the buffer is full
290 /// - Returns `TrySendError::Closed` if the receiver has been dropped
291 ///
292 /// 尝试非阻塞地发送消息
293 ///
294 /// # 错误
295 /// - 如果缓冲区满,返回 `TrySendError::Full`
296 /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
297 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
298 // Check if channel is closed first
299 // 首先检查通道是否已关闭
300 if self.inner.closed.load(Ordering::Acquire) {
301 return Err(TrySendError::Closed(value));
302 }
303
304 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
305 // 不会有多个线程同时访问 producer
306 let producer = unsafe { &mut *self.inner.producer.get() };
307
308 match producer.push(value) {
309 Ok(()) => {
310 // Successfully sent, notify receiver
311 // 成功发送,通知接收者
312 self.inner.recv_notify.notify_one();
313 Ok(())
314 }
315 Err(smallring::PushError::Full(v)) => {
316 Err(TrySendError::Full(v))
317 }
318 }
319 }
320
321 /// Check if the channel is closed
322 ///
323 /// 检查通道是否已关闭
324 #[inline]
325 pub fn is_closed(&self) -> bool {
326 self.inner.closed.load(Ordering::Acquire)
327 }
328
329 /// Get the capacity of the channel
330 ///
331 /// 获取通道的容量
332 #[inline]
333 pub fn capacity(&self) -> usize {
334 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
335 // capacity 只读取数据,不需要可变访问
336 let producer = unsafe { &*self.inner.producer.get() };
337 producer.capacity()
338 }
339
340 /// Get the number of messages currently in the channel
341 ///
342 /// 获取通道中当前的消息数量
343 #[inline]
344 pub fn len(&self) -> usize {
345 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
346 // slots 只读取数据,不需要可变访问
347 let producer = unsafe { &*self.inner.producer.get() };
348 producer.slots()
349 }
350
351 /// Get the number of free slots in the channel
352 ///
353 /// 获取通道中的空闲空间数量
354 #[inline]
355 pub fn free_slots(&self) -> usize {
356 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
357 // free_slots 只读取数据,不需要可变访问
358 let producer = unsafe { &*self.inner.producer.get() };
359 producer.free_slots()
360 }
361
362 /// Check if the channel is full
363 ///
364 /// 检查通道是否已满
365 #[inline]
366 pub fn is_full(&self) -> bool {
367 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
368 // is_full 只读取数据,不需要可变访问
369 let producer = unsafe { &*self.inner.producer.get() };
370 producer.is_full()
371 }
372}
373
374impl<T: Copy, const N: usize> Sender<T, N> {
375 /// Try to send multiple values from a slice without blocking
376 ///
377 /// 尝试非阻塞地从切片发送多个值
378 ///
379 /// This method attempts to send as many elements as possible from the slice.
380 /// It returns the number of elements successfully sent.
381 ///
382 /// 此方法尝试从切片中发送尽可能多的元素。
383 /// 返回成功发送的元素数量。
384 ///
385 /// # Parameters
386 /// - `values`: Slice of values to send
387 ///
388 /// # Returns
389 /// Number of elements successfully sent (0 to values.len())
390 ///
391 /// # 参数
392 /// - `values`: 要发送的值的切片
393 ///
394 /// # 返回值
395 /// 成功发送的元素数量(0 到 values.len())
396 pub fn try_send_slice(&self, values: &[T]) -> usize {
397 // Check if channel is closed first
398 // 首先检查通道是否已关闭
399 if self.inner.closed.load(Ordering::Acquire) {
400 return 0;
401 }
402
403 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
404 // 不会有多个线程同时访问 producer
405 let producer = unsafe { &mut *self.inner.producer.get() };
406
407 let sent = producer.push_slice(values);
408
409 if sent > 0 {
410 // Successfully sent some messages, notify receiver
411 // 成功发送一些消息,通知接收者
412 self.inner.recv_notify.notify_one();
413 }
414
415 sent
416 }
417
418 /// Send multiple values from a slice (async, waits if buffer is full)
419 ///
420 /// 从切片发送多个值(异步,如果缓冲区满则等待)
421 ///
422 /// This method will send all elements from the slice, waiting if necessary
423 /// when the buffer becomes full. Returns the number of elements sent, or
424 /// an error if the channel is closed.
425 ///
426 /// 此方法将发送切片中的所有元素,必要时在缓冲区满时等待。
427 /// 返回发送的元素数量,如果通道关闭则返回错误。
428 ///
429 /// # Parameters
430 /// - `values`: Slice of values to send
431 ///
432 /// # Returns
433 /// - `Ok(usize)`: Number of elements successfully sent
434 /// - `Err(SendError)`: Channel is closed
435 ///
436 /// # 参数
437 /// - `values`: 要发送的值的切片
438 ///
439 /// # 返回值
440 /// - `Ok(usize)`: 成功发送的元素数量
441 /// - `Err(SendError)`: 通道已关闭
442 pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
443 let mut total_sent = 0;
444
445 while total_sent < values.len() {
446 // Check if channel is closed
447 // 检查通道是否已关闭
448 if self.inner.closed.load(Ordering::Acquire) {
449 return Err(SendError::Closed(()));
450 }
451
452 let sent = self.try_send_slice(&values[total_sent..]);
453 total_sent += sent;
454
455 if total_sent < values.len() {
456 // Need to wait for space
457 // 需要等待空间
458 self.inner.send_notify.notified().await;
459
460 // Check if channel was closed while waiting
461 // 检查等待时通道是否已关闭
462 if self.inner.closed.load(Ordering::Acquire) {
463 return Err(SendError::Closed(()));
464 }
465 }
466 }
467
468 Ok(total_sent)
469 }
470}
471
472impl<T, const N: usize> Receiver<T, N> {
473 /// Receive a message from the channel (async, waits if buffer is empty)
474 ///
475 /// Returns `None` if the channel is closed and empty
476 ///
477 /// 从通道接收消息(异步,如果缓冲区空则等待)
478 ///
479 /// 如果通道已关闭且为空,返回 `None`
480 pub async fn recv(&self) -> Option<T> {
481 loop {
482 match self.try_recv() {
483 Ok(value) => return Some(value),
484 Err(TryRecvError::Closed) => return None,
485 Err(TryRecvError::Empty) => {
486 // Check if channel is closed before waiting
487 // 等待前检查通道是否已关闭
488 if self.inner.closed.load(Ordering::Acquire) {
489 // Double check if there are any remaining items
490 // 再次检查是否有剩余项
491 if let Ok(value) = self.try_recv() {
492 return Some(value);
493 }
494 return None;
495 }
496
497 // Wait for data to become available
498 // 等待数据可用
499 self.inner.recv_notify.notified().await;
500 }
501 }
502 }
503 }
504
505 /// Try to receive a message without blocking
506 ///
507 /// # Errors
508 /// - Returns `TryRecvError::Empty` if the buffer is empty
509 /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
510 ///
511 /// 尝试非阻塞地接收消息
512 ///
513 /// # 错误
514 /// - 如果缓冲区空,返回 `TryRecvError::Empty`
515 /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
516 pub fn try_recv(&self) -> Result<T, TryRecvError> {
517 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
518 // 不会有多个线程同时访问 consumer
519 let consumer = unsafe { &mut *self.inner.consumer.get() };
520
521 match consumer.pop() {
522 Ok(value) => {
523 // Successfully received, notify sender
524 // 成功接收,通知发送者
525 self.inner.send_notify.notify_one();
526 Ok(value)
527 }
528 Err(smallring::PopError::Empty) => {
529 if self.inner.closed.load(Ordering::Acquire) {
530 Err(TryRecvError::Closed)
531 } else {
532 Err(TryRecvError::Empty)
533 }
534 }
535 }
536 }
537
538 /// Check if the channel is empty
539 ///
540 /// 检查通道是否为空
541 #[inline]
542 pub fn is_empty(&self) -> bool {
543 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
544 // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
545 let consumer = unsafe { &*self.inner.consumer.get() };
546 consumer.is_empty()
547 }
548
549 /// Get the number of messages currently in the channel
550 ///
551 /// 获取通道中当前的消息数量
552 #[inline]
553 pub fn len(&self) -> usize {
554 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
555 // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
556 let consumer = unsafe { &*self.inner.consumer.get() };
557 consumer.slots()
558 }
559
560 /// Get the capacity of the channel
561 ///
562 /// 获取通道的容量
563 #[inline]
564 pub fn capacity(&self) -> usize {
565 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
566 // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
567 let consumer = unsafe { &*self.inner.consumer.get() };
568 consumer.buffer().capacity()
569 }
570
571 /// Peek at the first message without removing it
572 ///
573 /// 查看第一个消息但不移除它
574 ///
575 /// # Returns
576 /// `Some(&T)` if there is a message, `None` if the channel is empty
577 ///
578 /// # 返回值
579 /// 如果有消息则返回 `Some(&T)`,如果通道为空则返回 `None`
580 ///
581 /// # Safety
582 /// The returned reference is valid only as long as no other operations
583 /// are performed on the Receiver that might modify the buffer.
584 ///
585 /// # 安全性
586 /// 返回的引用仅在未对 Receiver 执行可能修改缓冲区的其他操作时有效。
587 #[inline]
588 pub fn peek(&self) -> Option<&T> {
589 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
590 // peek 只读取数据,不需要可变访问
591 let consumer = unsafe { &*self.inner.consumer.get() };
592 consumer.peek()
593 }
594
595 /// Clear all messages from the channel
596 ///
597 /// 清空通道中的所有消息
598 ///
599 /// This method pops and drops all messages currently in the channel.
600 ///
601 /// 此方法弹出并 drop 通道中当前的所有消息。
602 pub fn clear(&mut self) {
603 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
604 // 我们有可变引用,因此可以安全地访问 consumer
605 let consumer = unsafe { &mut *self.inner.consumer.get() };
606 consumer.clear();
607
608 // Notify sender that space is available
609 // 通知发送者空间可用
610 self.inner.send_notify.notify_one();
611 }
612
613 /// Create a draining iterator
614 ///
615 /// 创建一个消费迭代器
616 ///
617 /// Returns an iterator that removes and returns messages from the channel.
618 /// The iterator will continue until the channel is empty.
619 ///
620 /// 返回一个从通道中移除并返回消息的迭代器。
621 /// 迭代器将持续运行直到通道为空。
622 ///
623 /// # Examples
624 ///
625 /// ```
626 /// use lite_sync::spsc::channel;
627 /// use std::num::NonZeroUsize;
628 ///
629 /// # #[tokio::main]
630 /// # async fn main() {
631 /// let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
632 /// tx.try_send(1).unwrap();
633 /// tx.try_send(2).unwrap();
634 /// tx.try_send(3).unwrap();
635 ///
636 /// let items: Vec<i32> = rx.drain().collect();
637 /// assert_eq!(items, vec![1, 2, 3]);
638 /// assert!(rx.is_empty());
639 /// # }
640 /// ```
641 #[inline]
642 pub fn drain(&mut self) -> Drain<'_, T, N> {
643 Drain { receiver: self }
644 }
645}
646
647impl<T: Copy, const N: usize> Receiver<T, N> {
648 /// Try to receive multiple values into a slice without blocking
649 ///
650 /// 尝试非阻塞地将多个值接收到切片
651 ///
652 /// This method attempts to receive as many messages as possible into the provided slice.
653 /// It returns the number of messages successfully received.
654 ///
655 /// 此方法尝试将尽可能多的消息接收到提供的切片中。
656 /// 返回成功接收的消息数量。
657 ///
658 /// # Parameters
659 /// - `dest`: Destination slice to receive values into
660 ///
661 /// # Returns
662 /// Number of messages successfully received (0 to dest.len())
663 ///
664 /// # 参数
665 /// - `dest`: 用于接收值的目标切片
666 ///
667 /// # 返回值
668 /// 成功接收的消息数量(0 到 dest.len())
669 pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
670 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
671 // 我们有可变引用,因此可以安全地访问 consumer
672 let consumer = unsafe { &mut *self.inner.consumer.get() };
673
674 let received = consumer.pop_slice(dest);
675
676 if received > 0 {
677 // Successfully received some messages, notify sender
678 // 成功接收一些消息,通知发送者
679 self.inner.send_notify.notify_one();
680 }
681
682 received
683 }
684
685 /// Receive multiple values into a slice (async, waits if buffer is empty)
686 ///
687 /// 将多个值接收到切片(异步,如果缓冲区空则等待)
688 ///
689 /// This method will fill the destination slice as much as possible, waiting if necessary
690 /// when the buffer becomes empty. Returns the number of messages received.
691 ///
692 /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
693 /// 返回接收的消息数量。
694 ///
695 /// # Parameters
696 /// - `dest`: Destination slice to receive values into
697 ///
698 /// # Returns
699 /// Number of messages successfully received (0 to dest.len())
700 /// Returns 0 if the channel is closed and empty
701 ///
702 /// # 参数
703 /// - `dest`: 用于接收值的目标切片
704 ///
705 /// # 返回值
706 /// 成功接收的消息数量(0 到 dest.len())
707 /// 如果通道已关闭且为空,返回 0
708 pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
709 let mut total_received = 0;
710
711 while total_received < dest.len() {
712 let received = self.try_recv_slice(&mut dest[total_received..]);
713 total_received += received;
714
715 if total_received < dest.len() {
716 // Check if channel is closed
717 // 检查通道是否已关闭
718 if self.inner.closed.load(Ordering::Acquire) {
719 // Channel closed, return what we have
720 // 通道已关闭,返回我们已有的内容
721 return total_received;
722 }
723
724 // Need to wait for data
725 // 需要等待数据
726 self.inner.recv_notify.notified().await;
727
728 // Check again after waking up
729 // 唤醒后再次检查
730 if self.inner.closed.load(Ordering::Acquire) {
731 // Try one more time to get remaining messages
732 // 再尝试一次获取剩余消息
733 let final_received = self.try_recv_slice(&mut dest[total_received..]);
734 total_received += final_received;
735 return total_received;
736 }
737 }
738 }
739
740 total_received
741 }
742}
743
744impl<T, const N: usize> Drop for Receiver<T, N> {
745 fn drop(&mut self) {
746 // Mark channel as closed when receiver is dropped
747 // 当接收器被丢弃时标记通道为已关闭
748 self.inner.closed.store(true, Ordering::Release);
749
750 // Notify sender in case it's waiting
751 // 通知发送者以防它正在等待
752 self.inner.send_notify.notify_one();
753 }
754}
755
756impl<T, const N: usize> Drop for Sender<T, N> {
757 fn drop(&mut self) {
758 // Mark channel as closed when sender is dropped
759 // 当发送器被丢弃时标记通道为已关闭
760 self.inner.closed.store(true, Ordering::Release);
761
762 // Notify receiver in case it's waiting
763 // 通知接收器以防它正在等待
764 self.inner.recv_notify.notify_one();
765 }
766}
767
768#[cfg(test)]
769mod tests {
770 use super::*;
771
772 #[tokio::test]
773 async fn test_basic_send_recv() {
774 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
775
776 tx.send(1).await.unwrap();
777 tx.send(2).await.unwrap();
778 tx.send(3).await.unwrap();
779
780 assert_eq!(rx.recv().await, Some(1));
781 assert_eq!(rx.recv().await, Some(2));
782 assert_eq!(rx.recv().await, Some(3));
783 }
784
785 #[tokio::test]
786 async fn test_try_send_recv() {
787 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
788
789 tx.try_send(1).unwrap();
790 tx.try_send(2).unwrap();
791
792 assert_eq!(rx.try_recv().unwrap(), 1);
793 assert_eq!(rx.try_recv().unwrap(), 2);
794 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
795 }
796
797 #[tokio::test]
798 async fn test_channel_closed_on_sender_drop() {
799 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
800
801 tx.send(1).await.unwrap();
802 drop(tx);
803
804 assert_eq!(rx.recv().await, Some(1));
805 assert_eq!(rx.recv().await, None);
806 }
807
808 #[tokio::test]
809 async fn test_channel_closed_on_receiver_drop() {
810 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
811
812 drop(rx);
813
814 assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
815 }
816
817 #[tokio::test]
818 async fn test_cross_task_communication() {
819 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
820
821 let sender_handle = tokio::spawn(async move {
822 for i in 0..10 {
823 tx.send(i).await.unwrap();
824 }
825 });
826
827 let receiver_handle = tokio::spawn(async move {
828 let mut sum = 0;
829 while let Some(value) = rx.recv().await {
830 sum += value;
831 }
832 sum
833 });
834
835 sender_handle.await.unwrap();
836 let sum = receiver_handle.await.unwrap();
837 assert_eq!(sum, 45); // 0+1+2+...+9 = 45
838 }
839
840 #[tokio::test]
841 async fn test_backpressure() {
842 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
843
844 // Fill the buffer
845 tx.try_send(1).unwrap();
846 tx.try_send(2).unwrap();
847 tx.try_send(3).unwrap();
848 tx.try_send(4).unwrap();
849
850 // Buffer should be full now
851 assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
852
853 // This should block and then succeed when we consume
854 let send_handle = tokio::spawn(async move {
855 tx.send(5).await.unwrap();
856 tx.send(6).await.unwrap();
857 });
858
859 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
860
861 assert_eq!(rx.recv().await, Some(1));
862 assert_eq!(rx.recv().await, Some(2));
863 assert_eq!(rx.recv().await, Some(3));
864 assert_eq!(rx.recv().await, Some(4));
865 assert_eq!(rx.recv().await, Some(5));
866 assert_eq!(rx.recv().await, Some(6));
867
868 send_handle.await.unwrap();
869 }
870
871 #[tokio::test]
872 async fn test_capacity_and_len() {
873 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
874
875 assert_eq!(rx.capacity(), 8);
876 assert_eq!(rx.len(), 0);
877 assert!(rx.is_empty());
878
879 tx.try_send(1).unwrap();
880 tx.try_send(2).unwrap();
881
882 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
883 assert_eq!(rx.len(), 2);
884 assert!(!rx.is_empty());
885 }
886
887 // ==================== New API Tests ====================
888
889 #[tokio::test]
890 async fn test_sender_capacity_queries() {
891 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
892
893 assert_eq!(tx.capacity(), 8);
894 assert_eq!(tx.len(), 0);
895 assert_eq!(tx.free_slots(), 8);
896 assert!(!tx.is_full());
897
898 tx.try_send(1).unwrap();
899 tx.try_send(2).unwrap();
900 tx.try_send(3).unwrap();
901
902 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
903 assert_eq!(tx.len(), 3);
904 assert_eq!(tx.free_slots(), 5);
905 assert!(!tx.is_full());
906
907 // Fill the buffer
908 tx.try_send(4).unwrap();
909 tx.try_send(5).unwrap();
910 tx.try_send(6).unwrap();
911 tx.try_send(7).unwrap();
912 tx.try_send(8).unwrap();
913
914 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
915 assert_eq!(tx.len(), 8);
916 assert_eq!(tx.free_slots(), 0);
917 assert!(tx.is_full());
918
919 // Pop one and check again
920 rx.recv().await;
921 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
922
923 assert_eq!(tx.len(), 7);
924 assert_eq!(tx.free_slots(), 1);
925 assert!(!tx.is_full());
926 }
927
928 #[tokio::test]
929 async fn test_try_send_slice() {
930 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
931
932 let data = [1, 2, 3, 4, 5];
933 let sent = tx.try_send_slice(&data);
934
935 assert_eq!(sent, 5);
936 assert_eq!(rx.len(), 5);
937
938 for i in 0..5 {
939 assert_eq!(rx.recv().await.unwrap(), data[i]);
940 }
941 }
942
943 #[tokio::test]
944 async fn test_try_send_slice_partial() {
945 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
946
947 // Fill with 5 elements, leaving room for 3
948 let initial = [1, 2, 3, 4, 5];
949 tx.try_send_slice(&initial);
950
951 // Try to send 10 more, should only send 3
952 let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
953 let sent = tx.try_send_slice(&more);
954
955 assert_eq!(sent, 3);
956 assert_eq!(rx.len(), 8);
957 assert!(tx.is_full());
958
959 // Verify values
960 for i in 1..=8 {
961 assert_eq!(rx.recv().await.unwrap(), i);
962 }
963 }
964
965 #[tokio::test]
966 async fn test_send_slice() {
967 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
968
969 let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
970 let result = tx.send_slice(&data).await;
971
972 assert_eq!(result.unwrap(), 10);
973 assert_eq!(rx.len(), 10);
974
975 for i in 0..10 {
976 assert_eq!(rx.recv().await.unwrap(), data[i]);
977 }
978 }
979
980 #[tokio::test]
981 async fn test_send_slice_with_backpressure() {
982 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
983
984 let data = [1, 2, 3, 4, 5, 6, 7, 8];
985
986 let send_handle = tokio::spawn(async move {
987 tx.send_slice(&data).await.unwrap()
988 });
989
990 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
991
992 // Consume some messages to make room
993 for i in 1..=4 {
994 assert_eq!(rx.recv().await.unwrap(), i);
995 }
996
997 let sent = send_handle.await.unwrap();
998 assert_eq!(sent, 8);
999
1000 // Verify remaining messages
1001 for i in 5..=8 {
1002 assert_eq!(rx.recv().await.unwrap(), i);
1003 }
1004 }
1005
1006 #[tokio::test]
1007 async fn test_peek() {
1008 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1009
1010 // Peek empty buffer
1011 assert!(rx.peek().is_none());
1012
1013 tx.try_send(42).unwrap();
1014 tx.try_send(100).unwrap();
1015 tx.try_send(200).unwrap();
1016
1017 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1018
1019 // Peek should return first element without removing it
1020 assert_eq!(rx.peek(), Some(&42));
1021 assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
1022 assert_eq!(rx.len(), 3); // Length unchanged
1023 }
1024
1025 #[tokio::test]
1026 async fn test_peek_after_recv() {
1027 let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1028
1029 tx.try_send("first".to_string()).unwrap();
1030 tx.try_send("second".to_string()).unwrap();
1031 tx.try_send("third".to_string()).unwrap();
1032
1033 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1034
1035 assert_eq!(rx.peek(), Some(&"first".to_string()));
1036 rx.recv().await.unwrap();
1037
1038 assert_eq!(rx.peek(), Some(&"second".to_string()));
1039 rx.recv().await.unwrap();
1040
1041 assert_eq!(rx.peek(), Some(&"third".to_string()));
1042 rx.recv().await.unwrap();
1043
1044 assert!(rx.peek().is_none());
1045 }
1046
1047 #[tokio::test]
1048 async fn test_clear() {
1049 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1050
1051 for i in 0..10 {
1052 tx.try_send(i).unwrap();
1053 }
1054
1055 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1056 assert_eq!(rx.len(), 10);
1057
1058 rx.clear();
1059
1060 assert_eq!(rx.len(), 0);
1061 assert!(rx.is_empty());
1062 }
1063
1064 #[tokio::test]
1065 async fn test_clear_with_drop() {
1066 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1067 use std::sync::Arc;
1068
1069 #[derive(Debug)]
1070 struct DropCounter {
1071 counter: Arc<AtomicUsize>,
1072 }
1073
1074 impl Drop for DropCounter {
1075 fn drop(&mut self) {
1076 self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1077 }
1078 }
1079
1080 let counter = Arc::new(AtomicUsize::new(0));
1081
1082 {
1083 let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1084
1085 for _ in 0..8 {
1086 tx.try_send(DropCounter { counter: counter.clone() }).unwrap();
1087 }
1088
1089 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1090 assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1091
1092 rx.clear();
1093
1094 assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1095 }
1096 }
1097
1098 #[tokio::test]
1099 async fn test_drain() {
1100 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1101
1102 for i in 0..10 {
1103 tx.try_send(i).unwrap();
1104 }
1105
1106 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1107
1108 let collected: Vec<i32> = rx.drain().collect();
1109
1110 assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1111 assert!(rx.is_empty());
1112 }
1113
1114 #[tokio::test]
1115 async fn test_drain_empty() {
1116 let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1117
1118 let collected: Vec<i32> = rx.drain().collect();
1119
1120 assert!(collected.is_empty());
1121 }
1122
1123 #[tokio::test]
1124 async fn test_drain_size_hint() {
1125 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1126
1127 for i in 0..5 {
1128 tx.try_send(i).unwrap();
1129 }
1130
1131 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1132
1133 let mut drain = rx.drain();
1134
1135 assert_eq!(drain.size_hint(), (5, Some(5)));
1136
1137 drain.next();
1138 assert_eq!(drain.size_hint(), (4, Some(4)));
1139
1140 drain.next();
1141 assert_eq!(drain.size_hint(), (3, Some(3)));
1142 }
1143
1144 #[tokio::test]
1145 async fn test_try_recv_slice() {
1146 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1147
1148 // Send some data
1149 for i in 0..10 {
1150 tx.try_send(i).unwrap();
1151 }
1152
1153 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1154
1155 let mut dest = [0u32; 5];
1156 let received = rx.try_recv_slice(&mut dest);
1157
1158 assert_eq!(received, 5);
1159 assert_eq!(dest, [0, 1, 2, 3, 4]);
1160 assert_eq!(rx.len(), 5);
1161 }
1162
1163 #[tokio::test]
1164 async fn test_try_recv_slice_partial() {
1165 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1166
1167 tx.try_send(1).unwrap();
1168 tx.try_send(2).unwrap();
1169 tx.try_send(3).unwrap();
1170
1171 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1172
1173 let mut dest = [0u32; 10];
1174 let received = rx.try_recv_slice(&mut dest);
1175
1176 assert_eq!(received, 3);
1177 assert_eq!(&dest[0..3], &[1, 2, 3]);
1178 assert!(rx.is_empty());
1179 }
1180
1181 #[tokio::test]
1182 async fn test_recv_slice() {
1183 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1184
1185 for i in 1..=10 {
1186 tx.try_send(i).unwrap();
1187 }
1188
1189 let mut dest = [0u32; 10];
1190 let received = rx.recv_slice(&mut dest).await;
1191
1192 assert_eq!(received, 10);
1193 assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1194 assert!(rx.is_empty());
1195 }
1196
1197 #[tokio::test]
1198 async fn test_recv_slice_with_wait() {
1199 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1200
1201 let recv_handle = tokio::spawn(async move {
1202 let mut dest = [0u32; 8];
1203 let received = rx.recv_slice(&mut dest).await;
1204 (received, dest)
1205 });
1206
1207 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1208
1209 // Send data gradually
1210 for i in 1..=8 {
1211 tx.send(i).await.unwrap();
1212 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1213 }
1214
1215 let (received, dest) = recv_handle.await.unwrap();
1216 assert_eq!(received, 8);
1217 assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1218 }
1219
1220 #[tokio::test]
1221 async fn test_recv_slice_channel_closed() {
1222 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1223
1224 tx.try_send(1).unwrap();
1225 tx.try_send(2).unwrap();
1226 tx.try_send(3).unwrap();
1227
1228 drop(tx); // Close the channel
1229
1230 let mut dest = [0u32; 10];
1231 let received = rx.recv_slice(&mut dest).await;
1232
1233 // Should receive the 3 available messages, then stop
1234 assert_eq!(received, 3);
1235 assert_eq!(&dest[0..3], &[1, 2, 3]);
1236 }
1237
1238 #[tokio::test]
1239 async fn test_combined_new_apis() {
1240 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1241
1242 // Batch send
1243 let data = [1, 2, 3, 4, 5];
1244 tx.try_send_slice(&data);
1245
1246 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1247
1248 assert_eq!(tx.len(), 5);
1249 assert_eq!(rx.len(), 5);
1250 assert_eq!(rx.capacity(), 16);
1251
1252 // Peek
1253 assert_eq!(rx.peek(), Some(&1));
1254
1255 // Batch receive
1256 let mut dest = [0u32; 3];
1257 rx.try_recv_slice(&mut dest);
1258 assert_eq!(dest, [1, 2, 3]);
1259
1260 assert_eq!(rx.len(), 2);
1261 assert_eq!(tx.free_slots(), 14);
1262
1263 // Drain remaining
1264 let remaining: Vec<u32> = rx.drain().collect();
1265 assert_eq!(remaining, vec![4, 5]);
1266
1267 assert!(rx.is_empty());
1268 assert!(!tx.is_full());
1269 }
1270}