lite_sync/spsc.rs
1/// High-performance async SPSC (Single Producer Single Consumer) channel
2///
3/// Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6///
7/// 高性能异步 SPSC(单生产者单消费者)通道
8///
9/// 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11/// `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12///
13/// # 安全性说明 (Safety Notes)
14///
15/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16/// 这是安全的,基于以下保证:
17///
18/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23///
24/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25///
26/// # Safety Guarantees
27///
28/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29/// This is safe based on the following guarantees:
30///
31/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36///
37/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38use std::cell::UnsafeCell;
39use std::num::NonZeroUsize;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::Arc;
42use super::notify::SingleWaiterNotify;
43
44/// SPSC channel creation function
45///
46/// Creates a bounded SPSC channel with the specified capacity.
47///
48/// # Type Parameters
49/// - `T`: The type of messages to be sent through the channel
50/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
51///
52/// # Parameters
53/// - `capacity`: Channel capacity (total number of elements the channel can hold)
54///
55/// # Returns
56/// A tuple of (Sender, Receiver)
57///
58/// # Examples
59///
60/// ```
61/// use lite_sync::spsc::channel;
62/// use std::num::NonZeroUsize;
63///
64/// #[tokio::main]
65/// async fn main() {
66/// // Create a channel with capacity 32 and inline buffer size 8
67/// let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
68///
69/// tokio::spawn(async move {
70/// tx.send(42).await.unwrap();
71/// });
72///
73/// let value = rx.recv().await.unwrap();
74/// assert_eq!(value, 42);
75/// }
76/// ```
77///
78/// SPSC 通道创建函数
79///
80/// 创建指定容量的有界 SPSC 通道。
81///
82/// # 类型参数
83/// - `T`: 通过通道发送的消息类型
84/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
85///
86/// # 参数
87/// - `capacity`: 通道容量(通道可以容纳的元素总数)
88///
89/// # 返回值
90/// 返回 (Sender, Receiver) 元组
91pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
92 let (producer, consumer) = smallring::new::<T, N>(capacity);
93
94 let inner = Arc::new(Inner::<T, N> {
95 producer: UnsafeCell::new(producer),
96 consumer: UnsafeCell::new(consumer),
97 closed: AtomicBool::new(false),
98 recv_notify: SingleWaiterNotify::new(),
99 send_notify: SingleWaiterNotify::new(),
100 });
101
102 let sender = Sender {
103 inner: inner.clone(),
104 };
105
106 let receiver = Receiver {
107 inner,
108 };
109
110 (sender, receiver)
111}
112
113/// Shared internal state for SPSC channel
114///
115/// Contains both shared state and the ring buffer halves.
116/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
117///
118/// SPSC 通道的共享内部状态
119///
120/// 包含共享状态和环形缓冲区的两端。
121/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
122struct Inner<T, const N: usize = 32> {
123 /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
124 ///
125 /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
126 producer: UnsafeCell<smallring::Producer<T, N>>,
127
128 /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
129 ///
130 /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
131 consumer: UnsafeCell<smallring::Consumer<T, N>>,
132
133 /// Channel closed flag
134 ///
135 /// 通道关闭标志
136 closed: AtomicBool,
137
138 /// Notifier for receiver waiting (lightweight single-waiter)
139 ///
140 /// 接收者等待通知器(轻量级单等待者)
141 recv_notify: SingleWaiterNotify,
142
143 /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
144 ///
145 /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
146 send_notify: SingleWaiterNotify,
147}
148
149// SAFETY: Inner<T> 可以在线程间安全共享的原因:
150// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
151// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
152// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
153// 4. closed、recv_notify、send_notify 都已经是线程安全的
154// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
155unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
156
157/// SPSC channel sender
158///
159/// SPSC 通道发送器
160pub struct Sender<T, const N: usize> {
161 inner: Arc<Inner<T, N>>,
162}
163
164/// SPSC channel receiver
165///
166/// SPSC 通道接收器
167pub struct Receiver<T, const N: usize> {
168 inner: Arc<Inner<T, N>>,
169}
170
171/// Draining iterator for the SPSC channel
172///
173/// SPSC 通道的消费迭代器
174///
175/// This iterator removes and returns messages from the channel until it's empty.
176///
177/// 此迭代器从通道中移除并返回消息,直到通道为空。
178///
179/// # Type Parameters
180/// - `T`: Message type
181/// - `N`: Inline buffer size
182///
183/// # 类型参数
184/// - `T`: 消息类型
185/// - `N`: 内联缓冲区大小
186pub struct Drain<'a, T, const N: usize> {
187 receiver: &'a mut Receiver<T, N>,
188}
189
190impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
191 type Item = T;
192
193 #[inline]
194 fn next(&mut self) -> Option<Self::Item> {
195 self.receiver.try_recv().ok()
196 }
197
198 #[inline]
199 fn size_hint(&self) -> (usize, Option<usize>) {
200 let len = self.receiver.len();
201 (len, Some(len))
202 }
203}
204
/// Error returned by the asynchronous send operations.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be propagated
/// with `?` into boxed or dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError<T> {
    /// The channel is closed; the payload that could not be sent is handed back.
    Closed(T),
}

impl<T> std::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The payload is deliberately not printed so that `T` needs no `Display` bound.
        match self {
            SendError::Closed(_) => f.write_str("sending on a closed channel"),
        }
    }
}

impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
215
/// Error returned by the non-blocking receive operation.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be propagated
/// with `?` into boxed or dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No message is queued right now.
    Empty,

    /// The channel is closed and no message is queued.
    Closed,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            TryRecvError::Empty => "receiving on an empty channel",
            TryRecvError::Closed => "receiving on a closed channel",
        };
        f.write_str(text)
    }
}

impl std::error::Error for TryRecvError {}
231
/// Error returned by the non-blocking send operation.
///
/// Both variants hand the rejected payload back to the caller. Implements
/// [`std::fmt::Display`] and [`std::error::Error`] so it can be propagated with `?`
/// into boxed or dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The buffer has no free slot.
    Full(T),

    /// The channel is closed.
    Closed(T),
}

impl<T> std::fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The payload is deliberately not printed so that `T` needs no `Display` bound.
        let text = match self {
            TrySendError::Full(_) => "sending on a full channel",
            TrySendError::Closed(_) => "sending on a closed channel",
        };
        f.write_str(text)
    }
}

impl<T: std::fmt::Debug> std::error::Error for TrySendError<T> {}
247
248impl<T, const N: usize> Sender<T, N> {
249 /// Send a message to the channel (async, waits if buffer is full)
250 ///
251 /// # Errors
252 /// Returns `SendError::Closed` if the receiver has been dropped
253 ///
254 /// 向通道发送消息(异步,如果缓冲区满则等待)
255 ///
256 /// # 错误
257 /// 如果接收器已被丢弃,返回 `SendError::Closed`
258 pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
259 loop {
260 match self.try_send(value) {
261 Ok(()) => return Ok(()),
262 Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
263 Err(TrySendError::Full(v)) => {
264 // Store the value to retry
265 // 存储值以便重试
266 value = v;
267
268 // Wait for space to become available
269 // 等待空间可用
270 self.inner.send_notify.notified().await;
271
272 // Check if channel was closed while waiting
273 // 检查等待时通道是否已关闭
274 if self.inner.closed.load(Ordering::Acquire) {
275 return Err(SendError::Closed(value));
276 }
277
278 // Retry with the value in next loop iteration
279 // 在下一次循环迭代中使用该值重试
280 }
281 }
282 }
283 }
284
285 /// Try to send a message without blocking
286 ///
287 /// # Errors
288 /// - Returns `TrySendError::Full` if the buffer is full
289 /// - Returns `TrySendError::Closed` if the receiver has been dropped
290 ///
291 /// 尝试非阻塞地发送消息
292 ///
293 /// # 错误
294 /// - 如果缓冲区满,返回 `TrySendError::Full`
295 /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
296 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
297 // Check if channel is closed first
298 // 首先检查通道是否已关闭
299 if self.inner.closed.load(Ordering::Acquire) {
300 return Err(TrySendError::Closed(value));
301 }
302
303 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
304 // 不会有多个线程同时访问 producer
305 let producer = unsafe { &mut *self.inner.producer.get() };
306
307 match producer.push(value) {
308 Ok(()) => {
309 // Successfully sent, notify receiver
310 // 成功发送,通知接收者
311 self.inner.recv_notify.notify_one();
312 Ok(())
313 }
314 Err(smallring::PushError::Full(v)) => {
315 Err(TrySendError::Full(v))
316 }
317 }
318 }
319
320 /// Check if the channel is closed
321 ///
322 /// 检查通道是否已关闭
323 #[inline]
324 pub fn is_closed(&self) -> bool {
325 self.inner.closed.load(Ordering::Acquire)
326 }
327
328 /// Get the capacity of the channel
329 ///
330 /// 获取通道的容量
331 #[inline]
332 pub fn capacity(&self) -> usize {
333 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
334 // capacity 只读取数据,不需要可变访问
335 let producer = unsafe { &*self.inner.producer.get() };
336 producer.capacity()
337 }
338
339 /// Get the number of messages currently in the channel
340 ///
341 /// 获取通道中当前的消息数量
342 #[inline]
343 pub fn len(&self) -> usize {
344 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
345 // slots 只读取数据,不需要可变访问
346 let producer = unsafe { &*self.inner.producer.get() };
347 producer.slots()
348 }
349
350 #[inline]
351 pub fn is_empty(&self) -> bool {
352 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
353 // is_empty 只读取数据,不需要可变访问
354 let producer = unsafe { &*self.inner.producer.get() };
355 producer.is_empty()
356 }
357
358 /// Get the number of free slots in the channel
359 ///
360 /// 获取通道中的空闲空间数量
361 #[inline]
362 pub fn free_slots(&self) -> usize {
363 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
364 // free_slots 只读取数据,不需要可变访问
365 let producer = unsafe { &*self.inner.producer.get() };
366 producer.free_slots()
367 }
368
369 /// Check if the channel is full
370 ///
371 /// 检查通道是否已满
372 #[inline]
373 pub fn is_full(&self) -> bool {
374 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
375 // is_full 只读取数据,不需要可变访问
376 let producer = unsafe { &*self.inner.producer.get() };
377 producer.is_full()
378 }
379}
380
381impl<T: Copy, const N: usize> Sender<T, N> {
382 /// Try to send multiple values from a slice without blocking
383 ///
384 /// 尝试非阻塞地从切片发送多个值
385 ///
386 /// This method attempts to send as many elements as possible from the slice.
387 /// It returns the number of elements successfully sent.
388 ///
389 /// 此方法尝试从切片中发送尽可能多的元素。
390 /// 返回成功发送的元素数量。
391 ///
392 /// # Parameters
393 /// - `values`: Slice of values to send
394 ///
395 /// # Returns
396 /// Number of elements successfully sent (0 to values.len())
397 ///
398 /// # 参数
399 /// - `values`: 要发送的值的切片
400 ///
401 /// # 返回值
402 /// 成功发送的元素数量(0 到 values.len())
403 pub fn try_send_slice(&self, values: &[T]) -> usize {
404 // Check if channel is closed first
405 // 首先检查通道是否已关闭
406 if self.inner.closed.load(Ordering::Acquire) {
407 return 0;
408 }
409
410 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
411 // 不会有多个线程同时访问 producer
412 let producer = unsafe { &mut *self.inner.producer.get() };
413
414 let sent = producer.push_slice(values);
415
416 if sent > 0 {
417 // Successfully sent some messages, notify receiver
418 // 成功发送一些消息,通知接收者
419 self.inner.recv_notify.notify_one();
420 }
421
422 sent
423 }
424
425 /// Send multiple values from a slice (async, waits if buffer is full)
426 ///
427 /// 从切片发送多个值(异步,如果缓冲区满则等待)
428 ///
429 /// This method will send all elements from the slice, waiting if necessary
430 /// when the buffer becomes full. Returns the number of elements sent, or
431 /// an error if the channel is closed.
432 ///
433 /// 此方法将发送切片中的所有元素,必要时在缓冲区满时等待。
434 /// 返回发送的元素数量,如果通道关闭则返回错误。
435 ///
436 /// # Parameters
437 /// - `values`: Slice of values to send
438 ///
439 /// # Returns
440 /// - `Ok(usize)`: Number of elements successfully sent
441 /// - `Err(SendError)`: Channel is closed
442 ///
443 /// # 参数
444 /// - `values`: 要发送的值的切片
445 ///
446 /// # 返回值
447 /// - `Ok(usize)`: 成功发送的元素数量
448 /// - `Err(SendError)`: 通道已关闭
449 pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
450 let mut total_sent = 0;
451
452 while total_sent < values.len() {
453 // Check if channel is closed
454 // 检查通道是否已关闭
455 if self.inner.closed.load(Ordering::Acquire) {
456 return Err(SendError::Closed(()));
457 }
458
459 let sent = self.try_send_slice(&values[total_sent..]);
460 total_sent += sent;
461
462 if total_sent < values.len() {
463 // Need to wait for space
464 // 需要等待空间
465 self.inner.send_notify.notified().await;
466
467 // Check if channel was closed while waiting
468 // 检查等待时通道是否已关闭
469 if self.inner.closed.load(Ordering::Acquire) {
470 return Err(SendError::Closed(()));
471 }
472 }
473 }
474
475 Ok(total_sent)
476 }
477}
478
479impl<T, const N: usize> Receiver<T, N> {
480 /// Receive a message from the channel (async, waits if buffer is empty)
481 ///
482 /// Returns `None` if the channel is closed and empty
483 ///
484 /// 从通道接收消息(异步,如果缓冲区空则等待)
485 ///
486 /// 如果通道已关闭且为空,返回 `None`
487 pub async fn recv(&self) -> Option<T> {
488 loop {
489 match self.try_recv() {
490 Ok(value) => return Some(value),
491 Err(TryRecvError::Closed) => return None,
492 Err(TryRecvError::Empty) => {
493 // Check if channel is closed before waiting
494 // 等待前检查通道是否已关闭
495 if self.inner.closed.load(Ordering::Acquire) {
496 // Double check if there are any remaining items
497 // 再次检查是否有剩余项
498 if let Ok(value) = self.try_recv() {
499 return Some(value);
500 }
501 return None;
502 }
503
504 // Wait for data to become available
505 // 等待数据可用
506 self.inner.recv_notify.notified().await;
507 }
508 }
509 }
510 }
511
512 /// Try to receive a message without blocking
513 ///
514 /// # Errors
515 /// - Returns `TryRecvError::Empty` if the buffer is empty
516 /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
517 ///
518 /// 尝试非阻塞地接收消息
519 ///
520 /// # 错误
521 /// - 如果缓冲区空,返回 `TryRecvError::Empty`
522 /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
523 pub fn try_recv(&self) -> Result<T, TryRecvError> {
524 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
525 // 不会有多个线程同时访问 consumer
526 let consumer = unsafe { &mut *self.inner.consumer.get() };
527
528 match consumer.pop() {
529 Ok(value) => {
530 // Successfully received, notify sender
531 // 成功接收,通知发送者
532 self.inner.send_notify.notify_one();
533 Ok(value)
534 }
535 Err(smallring::PopError::Empty) => {
536 if self.inner.closed.load(Ordering::Acquire) {
537 Err(TryRecvError::Closed)
538 } else {
539 Err(TryRecvError::Empty)
540 }
541 }
542 }
543 }
544
545 /// Check if the channel is empty
546 ///
547 /// 检查通道是否为空
548 #[inline]
549 pub fn is_empty(&self) -> bool {
550 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
551 // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
552 let consumer = unsafe { &*self.inner.consumer.get() };
553 consumer.is_empty()
554 }
555
556 /// Get the number of messages currently in the channel
557 ///
558 /// 获取通道中当前的消息数量
559 #[inline]
560 pub fn len(&self) -> usize {
561 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
562 // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
563 let consumer = unsafe { &*self.inner.consumer.get() };
564 consumer.slots()
565 }
566
567 /// Get the capacity of the channel
568 ///
569 /// 获取通道的容量
570 #[inline]
571 pub fn capacity(&self) -> usize {
572 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
573 // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
574 let consumer = unsafe { &*self.inner.consumer.get() };
575 consumer.buffer().capacity()
576 }
577
578 /// Peek at the first message without removing it
579 ///
580 /// 查看第一个消息但不移除它
581 ///
582 /// # Returns
583 /// `Some(&T)` if there is a message, `None` if the channel is empty
584 ///
585 /// # 返回值
586 /// 如果有消息则返回 `Some(&T)`,如果通道为空则返回 `None`
587 ///
588 /// # Safety
589 /// The returned reference is valid only as long as no other operations
590 /// are performed on the Receiver that might modify the buffer.
591 ///
592 /// # 安全性
593 /// 返回的引用仅在未对 Receiver 执行可能修改缓冲区的其他操作时有效。
594 #[inline]
595 pub fn peek(&self) -> Option<&T> {
596 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
597 // peek 只读取数据,不需要可变访问
598 let consumer = unsafe { &*self.inner.consumer.get() };
599 consumer.peek()
600 }
601
602 /// Clear all messages from the channel
603 ///
604 /// 清空通道中的所有消息
605 ///
606 /// This method pops and drops all messages currently in the channel.
607 ///
608 /// 此方法弹出并 drop 通道中当前的所有消息。
609 pub fn clear(&mut self) {
610 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
611 // 我们有可变引用,因此可以安全地访问 consumer
612 let consumer = unsafe { &mut *self.inner.consumer.get() };
613 consumer.clear();
614
615 // Notify sender that space is available
616 // 通知发送者空间可用
617 self.inner.send_notify.notify_one();
618 }
619
620 /// Create a draining iterator
621 ///
622 /// 创建一个消费迭代器
623 ///
624 /// Returns an iterator that removes and returns messages from the channel.
625 /// The iterator will continue until the channel is empty.
626 ///
627 /// 返回一个从通道中移除并返回消息的迭代器。
628 /// 迭代器将持续运行直到通道为空。
629 ///
630 /// # Examples
631 ///
632 /// ```
633 /// use lite_sync::spsc::channel;
634 /// use std::num::NonZeroUsize;
635 ///
636 /// # #[tokio::main]
637 /// # async fn main() {
638 /// let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
639 /// tx.try_send(1).unwrap();
640 /// tx.try_send(2).unwrap();
641 /// tx.try_send(3).unwrap();
642 ///
643 /// let items: Vec<i32> = rx.drain().collect();
644 /// assert_eq!(items, vec![1, 2, 3]);
645 /// assert!(rx.is_empty());
646 /// # }
647 /// ```
648 #[inline]
649 pub fn drain(&mut self) -> Drain<'_, T, N> {
650 Drain { receiver: self }
651 }
652}
653
654impl<T: Copy, const N: usize> Receiver<T, N> {
655 /// Try to receive multiple values into a slice without blocking
656 ///
657 /// 尝试非阻塞地将多个值接收到切片
658 ///
659 /// This method attempts to receive as many messages as possible into the provided slice.
660 /// It returns the number of messages successfully received.
661 ///
662 /// 此方法尝试将尽可能多的消息接收到提供的切片中。
663 /// 返回成功接收的消息数量。
664 ///
665 /// # Parameters
666 /// - `dest`: Destination slice to receive values into
667 ///
668 /// # Returns
669 /// Number of messages successfully received (0 to dest.len())
670 ///
671 /// # 参数
672 /// - `dest`: 用于接收值的目标切片
673 ///
674 /// # 返回值
675 /// 成功接收的消息数量(0 到 dest.len())
676 pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
677 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
678 // 我们有可变引用,因此可以安全地访问 consumer
679 let consumer = unsafe { &mut *self.inner.consumer.get() };
680
681 let received = consumer.pop_slice(dest);
682
683 if received > 0 {
684 // Successfully received some messages, notify sender
685 // 成功接收一些消息,通知发送者
686 self.inner.send_notify.notify_one();
687 }
688
689 received
690 }
691
692 /// Receive multiple values into a slice (async, waits if buffer is empty)
693 ///
694 /// 将多个值接收到切片(异步,如果缓冲区空则等待)
695 ///
696 /// This method will fill the destination slice as much as possible, waiting if necessary
697 /// when the buffer becomes empty. Returns the number of messages received.
698 ///
699 /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
700 /// 返回接收的消息数量。
701 ///
702 /// # Parameters
703 /// - `dest`: Destination slice to receive values into
704 ///
705 /// # Returns
706 /// Number of messages successfully received (0 to dest.len())
707 /// Returns 0 if the channel is closed and empty
708 ///
709 /// # 参数
710 /// - `dest`: 用于接收值的目标切片
711 ///
712 /// # 返回值
713 /// 成功接收的消息数量(0 到 dest.len())
714 /// 如果通道已关闭且为空,返回 0
715 pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
716 let mut total_received = 0;
717
718 while total_received < dest.len() {
719 let received = self.try_recv_slice(&mut dest[total_received..]);
720 total_received += received;
721
722 if total_received < dest.len() {
723 // Check if channel is closed
724 // 检查通道是否已关闭
725 if self.inner.closed.load(Ordering::Acquire) {
726 // Channel closed, return what we have
727 // 通道已关闭,返回我们已有的内容
728 return total_received;
729 }
730
731 // Need to wait for data
732 // 需要等待数据
733 self.inner.recv_notify.notified().await;
734
735 // Check again after waking up
736 // 唤醒后再次检查
737 if self.inner.closed.load(Ordering::Acquire) {
738 // Try one more time to get remaining messages
739 // 再尝试一次获取剩余消息
740 let final_received = self.try_recv_slice(&mut dest[total_received..]);
741 total_received += final_received;
742 return total_received;
743 }
744 }
745 }
746
747 total_received
748 }
749}
750
751impl<T, const N: usize> Drop for Receiver<T, N> {
752 fn drop(&mut self) {
753 // Mark channel as closed when receiver is dropped
754 // 当接收器被丢弃时标记通道为已关闭
755 self.inner.closed.store(true, Ordering::Release);
756
757 // Notify sender in case it's waiting
758 // 通知发送者以防它正在等待
759 self.inner.send_notify.notify_one();
760 }
761}
762
763impl<T, const N: usize> Drop for Sender<T, N> {
764 fn drop(&mut self) {
765 // Mark channel as closed when sender is dropped
766 // 当发送器被丢弃时标记通道为已关闭
767 self.inner.closed.store(true, Ordering::Release);
768
769 // Notify receiver in case it's waiting
770 // 通知接收器以防它正在等待
771 self.inner.recv_notify.notify_one();
772 }
773}
774
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::{sleep, Duration};

    /// Shorthand for building the `NonZeroUsize` capacity argument.
    fn cap(n: usize) -> NonZeroUsize {
        NonZeroUsize::new(n).unwrap()
    }

    /// Suspends the current test task for `ms` milliseconds.
    async fn pause_ms(ms: u64) {
        sleep(Duration::from_millis(ms)).await;
    }

    #[tokio::test]
    async fn test_basic_send_recv() {
        let (tx, rx) = channel::<i32, 32>(cap(4));

        for value in 1..=3 {
            tx.send(value).await.unwrap();
        }

        for expected in 1..=3 {
            assert_eq!(rx.recv().await, Some(expected));
        }
    }

    #[tokio::test]
    async fn test_try_send_recv() {
        let (tx, rx) = channel::<i32, 32>(cap(4));

        for value in 1..=2 {
            tx.try_send(value).unwrap();
        }

        for expected in 1..=2 {
            assert_eq!(rx.try_recv().unwrap(), expected);
        }
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }

    #[tokio::test]
    async fn test_channel_closed_on_sender_drop() {
        let (tx, rx) = channel::<i32, 32>(cap(4));

        tx.send(1).await.unwrap();
        drop(tx);

        // The queued message is still delivered, then the channel reports closure.
        assert_eq!(rx.recv().await, Some(1));
        assert_eq!(rx.recv().await, None);
    }

    #[tokio::test]
    async fn test_channel_closed_on_receiver_drop() {
        let (tx, rx) = channel::<i32, 32>(cap(4));

        drop(rx);

        assert_eq!(tx.send(1).await, Err(SendError::Closed(1)));
    }

    #[tokio::test]
    async fn test_cross_task_communication() {
        let (tx, rx) = channel::<i32, 32>(cap(4));

        let producer_task = tokio::spawn(async move {
            for i in 0..10 {
                tx.send(i).await.unwrap();
            }
        });

        let consumer_task = tokio::spawn(async move {
            let mut total = 0;
            while let Some(value) = rx.recv().await {
                total += value;
            }
            total
        });

        producer_task.await.unwrap();
        // 0 + 1 + 2 + ... + 9 = 45
        assert_eq!(consumer_task.await.unwrap(), 45);
    }

    #[tokio::test]
    async fn test_backpressure() {
        let (tx, rx) = channel::<i32, 32>(cap(4));

        // Fill the buffer
        for value in 1..=4 {
            tx.try_send(value).unwrap();
        }

        // Buffer should be full now
        assert_eq!(tx.try_send(5), Err(TrySendError::Full(5)));

        // This should block and then succeed when we consume
        let send_handle = tokio::spawn(async move {
            tx.send(5).await.unwrap();
            tx.send(6).await.unwrap();
        });

        pause_ms(10).await;

        for expected in 1..=6 {
            assert_eq!(rx.recv().await, Some(expected));
        }

        send_handle.await.unwrap();
    }

    #[tokio::test]
    async fn test_capacity_and_len() {
        let (tx, rx) = channel::<i32, 32>(cap(8));

        assert_eq!(rx.capacity(), 8);
        assert_eq!(rx.len(), 0);
        assert!(rx.is_empty());

        for value in 1..=2 {
            tx.try_send(value).unwrap();
        }

        pause_ms(1).await;
        assert_eq!(rx.len(), 2);
        assert!(!rx.is_empty());
    }

    // ==================== New API Tests ====================

    #[tokio::test]
    async fn test_sender_capacity_queries() {
        let (tx, rx) = channel::<i32, 32>(cap(8));

        assert_eq!(tx.capacity(), 8);
        assert_eq!(tx.len(), 0);
        assert_eq!(tx.free_slots(), 8);
        assert!(!tx.is_full());

        for value in 1..=3 {
            tx.try_send(value).unwrap();
        }

        pause_ms(1).await;
        assert_eq!(tx.len(), 3);
        assert_eq!(tx.free_slots(), 5);
        assert!(!tx.is_full());

        // Fill the buffer
        for value in 4..=8 {
            tx.try_send(value).unwrap();
        }

        pause_ms(1).await;
        assert_eq!(tx.len(), 8);
        assert_eq!(tx.free_slots(), 0);
        assert!(tx.is_full());

        // Pop one and check again
        rx.recv().await;
        pause_ms(1).await;

        assert_eq!(tx.len(), 7);
        assert_eq!(tx.free_slots(), 1);
        assert!(!tx.is_full());
    }

    #[tokio::test]
    async fn test_try_send_slice() {
        let (tx, rx) = channel::<u32, 32>(cap(16));

        let data = [1, 2, 3, 4, 5];
        let sent = tx.try_send_slice(&data);

        assert_eq!(sent, 5);
        assert_eq!(rx.len(), 5);

        for &expected in data.iter() {
            assert_eq!(rx.recv().await.unwrap(), expected);
        }
    }

    #[tokio::test]
    async fn test_try_send_slice_partial() {
        let (tx, rx) = channel::<u32, 32>(cap(8));

        // Fill with 5 elements, leaving room for 3
        let initial = [1, 2, 3, 4, 5];
        tx.try_send_slice(&initial);

        // Try to send 10 more, should only send 3
        let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
        let sent = tx.try_send_slice(&more);

        assert_eq!(sent, 3);
        assert_eq!(rx.len(), 8);
        assert!(tx.is_full());

        // Verify values
        for expected in 1..=8 {
            assert_eq!(rx.recv().await.unwrap(), expected);
        }
    }

    #[tokio::test]
    async fn test_send_slice() {
        let (tx, rx) = channel::<u32, 32>(cap(16));

        let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let result = tx.send_slice(&data).await;

        assert_eq!(result.unwrap(), 10);
        assert_eq!(rx.len(), 10);

        for &expected in data.iter() {
            assert_eq!(rx.recv().await.unwrap(), expected);
        }
    }

    #[tokio::test]
    async fn test_send_slice_with_backpressure() {
        let (tx, rx) = channel::<u32, 32>(cap(4));

        let data = [1, 2, 3, 4, 5, 6, 7, 8];

        let send_handle = tokio::spawn(async move { tx.send_slice(&data).await.unwrap() });

        pause_ms(10).await;

        // Consume some messages to make room
        for expected in 1..=4 {
            assert_eq!(rx.recv().await.unwrap(), expected);
        }

        let sent = send_handle.await.unwrap();
        assert_eq!(sent, 8);

        // Verify remaining messages
        for expected in 5..=8 {
            assert_eq!(rx.recv().await.unwrap(), expected);
        }
    }

    #[tokio::test]
    async fn test_peek() {
        let (tx, rx) = channel::<i32, 32>(cap(8));

        // Peek empty buffer
        assert!(rx.peek().is_none());

        for &value in &[42, 100, 200] {
            tx.try_send(value).unwrap();
        }

        pause_ms(1).await;

        // Peek should return first element without removing it
        assert_eq!(rx.peek(), Some(&42));
        assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
        assert_eq!(rx.len(), 3); // Length unchanged
    }

    #[tokio::test]
    async fn test_peek_after_recv() {
        let (tx, rx) = channel::<String, 32>(cap(8));

        let words = ["first", "second", "third"];
        for word in words.iter() {
            tx.try_send(word.to_string()).unwrap();
        }

        pause_ms(1).await;

        // Each message is visible through `peek` until it has been received.
        for word in words.iter() {
            assert_eq!(rx.peek(), Some(&word.to_string()));
            rx.recv().await.unwrap();
        }

        assert!(rx.peek().is_none());
    }

    #[tokio::test]
    async fn test_clear() {
        let (tx, mut rx) = channel::<i32, 32>(cap(16));

        for i in 0..10 {
            tx.try_send(i).unwrap();
        }

        pause_ms(1).await;
        assert_eq!(rx.len(), 10);

        rx.clear();

        assert_eq!(rx.len(), 0);
        assert!(rx.is_empty());
    }

    #[tokio::test]
    async fn test_clear_with_drop() {
        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
        use std::sync::Arc;

        /// Increments the shared counter every time an instance is dropped.
        #[derive(Debug)]
        struct DropCounter {
            counter: Arc<AtomicUsize>,
        }

        impl Drop for DropCounter {
            fn drop(&mut self) {
                self.counter.fetch_add(1, AtomicOrdering::SeqCst);
            }
        }

        let counter = Arc::new(AtomicUsize::new(0));

        {
            let (tx, mut rx) = channel::<DropCounter, 32>(cap(16));

            for _ in 0..8 {
                let item = DropCounter {
                    counter: counter.clone(),
                };
                tx.try_send(item).unwrap();
            }

            pause_ms(1).await;
            assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);

            rx.clear();

            assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
        }
    }

    #[tokio::test]
    async fn test_drain() {
        let (tx, mut rx) = channel::<i32, 32>(cap(16));

        for i in 0..10 {
            tx.try_send(i).unwrap();
        }

        pause_ms(1).await;

        let collected: Vec<i32> = rx.drain().collect();

        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert!(rx.is_empty());
    }

    #[tokio::test]
    async fn test_drain_empty() {
        let (_tx, mut rx) = channel::<i32, 32>(cap(8));

        let collected: Vec<i32> = rx.drain().collect();

        assert!(collected.is_empty());
    }

    #[tokio::test]
    async fn test_drain_size_hint() {
        let (tx, mut rx) = channel::<i32, 32>(cap(16));

        for i in 0..5 {
            tx.try_send(i).unwrap();
        }

        pause_ms(1).await;

        let mut drain = rx.drain();

        assert_eq!(drain.size_hint(), (5, Some(5)));

        // Each consumed element lowers both bounds by one.
        for remaining in [4usize, 3].iter().copied() {
            drain.next();
            assert_eq!(drain.size_hint(), (remaining, Some(remaining)));
        }
    }

    #[tokio::test]
    async fn test_try_recv_slice() {
        let (tx, mut rx) = channel::<u32, 32>(cap(16));

        // Send some data
        for i in 0..10 {
            tx.try_send(i).unwrap();
        }

        pause_ms(1).await;

        let mut dest = [0u32; 5];
        let received = rx.try_recv_slice(&mut dest);

        assert_eq!(received, 5);
        assert_eq!(dest, [0, 1, 2, 3, 4]);
        assert_eq!(rx.len(), 5);
    }

    #[tokio::test]
    async fn test_try_recv_slice_partial() {
        let (tx, mut rx) = channel::<u32, 32>(cap(16));

        for value in 1..=3 {
            tx.try_send(value).unwrap();
        }

        pause_ms(1).await;

        let mut dest = [0u32; 10];
        let received = rx.try_recv_slice(&mut dest);

        assert_eq!(received, 3);
        assert_eq!(&dest[0..3], &[1, 2, 3]);
        assert!(rx.is_empty());
    }

    #[tokio::test]
    async fn test_recv_slice() {
        let (tx, mut rx) = channel::<u32, 32>(cap(16));

        for i in 1..=10 {
            tx.try_send(i).unwrap();
        }

        let mut dest = [0u32; 10];
        let received = rx.recv_slice(&mut dest).await;

        assert_eq!(received, 10);
        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
        assert!(rx.is_empty());
    }

    #[tokio::test]
    async fn test_recv_slice_with_wait() {
        let (tx, mut rx) = channel::<u32, 32>(cap(4));

        let recv_handle = tokio::spawn(async move {
            let mut dest = [0u32; 8];
            let received = rx.recv_slice(&mut dest).await;
            (received, dest)
        });

        pause_ms(10).await;

        // Send data gradually
        for i in 1..=8 {
            tx.send(i).await.unwrap();
            pause_ms(1).await;
        }

        let (received, dest) = recv_handle.await.unwrap();
        assert_eq!(received, 8);
        assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
    }

    #[tokio::test]
    async fn test_recv_slice_channel_closed() {
        let (tx, mut rx) = channel::<u32, 32>(cap(8));

        for value in 1..=3 {
            tx.try_send(value).unwrap();
        }

        drop(tx); // Close the channel

        let mut dest = [0u32; 10];
        let received = rx.recv_slice(&mut dest).await;

        // Should receive the 3 available messages, then stop
        assert_eq!(received, 3);
        assert_eq!(&dest[0..3], &[1, 2, 3]);
    }

    #[tokio::test]
    async fn test_combined_new_apis() {
        let (tx, mut rx) = channel::<u32, 32>(cap(16));

        // Batch send
        let data = [1, 2, 3, 4, 5];
        tx.try_send_slice(&data);

        pause_ms(1).await;

        assert_eq!(tx.len(), 5);
        assert_eq!(rx.len(), 5);
        assert_eq!(rx.capacity(), 16);

        // Peek
        assert_eq!(rx.peek(), Some(&1));

        // Batch receive
        let mut dest = [0u32; 3];
        rx.try_recv_slice(&mut dest);
        assert_eq!(dest, [1, 2, 3]);

        assert_eq!(rx.len(), 2);
        assert_eq!(tx.free_slots(), 14);

        // Drain remaining
        let remaining: Vec<u32> = rx.drain().collect();
        assert_eq!(remaining, vec![4, 5]);

        assert!(rx.is_empty());
        assert!(!tx.is_full());
    }
}