lite_sync/spsc.rs
1/// High-performance async SPSC (Single Producer Single Consumer) channel
2///
3/// Built on top of `smallring` - a high-performance ring buffer with inline storage support.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// The `N` const generic parameter allows specifying inline buffer size for zero-allocation small channels.
6///
7/// 高性能异步 SPSC(单生产者单消费者)通道
8///
9/// 基于 `smallring` 构建 - 一个支持内联存储的高性能环形缓冲区。
10/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
11/// `N` 常量泛型参数允许指定内联缓冲区大小,实现小容量通道的零分配。
12///
13/// # 安全性说明 (Safety Notes)
14///
15/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
16/// 这是安全的,基于以下保证:
17///
18/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
19/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
20/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
21/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
22/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
23///
24/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
25///
26/// # Safety Guarantees
27///
28/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
29/// This is safe based on the following guarantees:
30///
31/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
32/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
33/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
34/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
35/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
36///
37/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
38use std::cell::UnsafeCell;
39use std::num::NonZeroUsize;
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::Arc;
42use super::notify::SingleWaiterNotify;
43
44/// SPSC channel creation function
45///
46/// Creates a bounded SPSC channel with the specified capacity.
47///
48/// # Type Parameters
49/// - `T`: The type of messages to be sent through the channel
50/// - `N`: The size of the inline buffer (number of elements stored inline before heap allocation)
51///
52/// # Parameters
53/// - `capacity`: Channel capacity (total number of elements the channel can hold)
54///
55/// # Returns
56/// A tuple of (Sender, Receiver)
57///
58/// # Examples
59///
60/// ```
61/// use lite_sync::spsc::channel;
62/// use std::num::NonZeroUsize;
63///
64/// #[tokio::main]
65/// async fn main() {
66/// // Create a channel with capacity 32 and inline buffer size 8
67/// let (tx, rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
68///
69/// tokio::spawn(async move {
70/// tx.send(42).await.unwrap();
71/// });
72///
73/// let value = rx.recv().await.unwrap();
74/// assert_eq!(value, 42);
75/// }
76/// ```
77///
78/// SPSC 通道创建函数
79///
80/// 创建指定容量的有界 SPSC 通道。
81///
82/// # 类型参数
83/// - `T`: 通过通道发送的消息类型
84/// - `N`: 内联缓冲区大小(在堆分配之前内联存储的元素数量)
85///
86/// # 参数
87/// - `capacity`: 通道容量(通道可以容纳的元素总数)
88///
89/// # 返回值
90/// 返回 (Sender, Receiver) 元组
91pub fn channel<T, const N: usize>(capacity: NonZeroUsize) -> (Sender<T, N>, Receiver<T, N>) {
92 let (producer, consumer) = smallring::new::<T, N>(capacity);
93
94 let inner = Arc::new(Inner::<T, N> {
95 producer: UnsafeCell::new(producer),
96 consumer: UnsafeCell::new(consumer),
97 closed: AtomicBool::new(false),
98 recv_notify: SingleWaiterNotify::new(),
99 send_notify: SingleWaiterNotify::new(),
100 });
101
102 let sender = Sender {
103 inner: inner.clone(),
104 };
105
106 let receiver = Receiver {
107 inner,
108 };
109
110 (sender, receiver)
111}
112
113/// Shared internal state for SPSC channel
114///
115/// Contains both shared state and the ring buffer halves.
116/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
117///
118/// SPSC 通道的共享内部状态
119///
120/// 包含共享状态和环形缓冲区的两端。
121/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
122struct Inner<T, const N: usize = 32> {
123 /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
124 ///
125 /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
126 producer: UnsafeCell<smallring::Producer<T, N>>,
127
128 /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
129 ///
130 /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
131 consumer: UnsafeCell<smallring::Consumer<T, N>>,
132
133 /// Channel closed flag
134 ///
135 /// 通道关闭标志
136 closed: AtomicBool,
137
138 /// Notifier for receiver waiting (lightweight single-waiter)
139 ///
140 /// 接收者等待通知器(轻量级单等待者)
141 recv_notify: SingleWaiterNotify,
142
143 /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
144 ///
145 /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
146 send_notify: SingleWaiterNotify,
147}
148
149// SAFETY: Inner<T> 可以在线程间安全共享的原因:
150// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
151// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
152// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
153// 4. closed、recv_notify、send_notify 都已经是线程安全的
154// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
155unsafe impl<T: Send, const N: usize> Sync for Inner<T, N> {}
156
157/// SPSC channel sender
158///
159/// SPSC 通道发送器
160pub struct Sender<T, const N: usize> {
161 inner: Arc<Inner<T, N>>,
162}
163
164/// SPSC channel receiver
165///
166/// SPSC 通道接收器
167pub struct Receiver<T, const N: usize> {
168 inner: Arc<Inner<T, N>>,
169}
170
171/// Draining iterator for the SPSC channel
172///
173/// SPSC 通道的消费迭代器
174///
175/// This iterator removes and returns messages from the channel until it's empty.
176///
177/// 此迭代器从通道中移除并返回消息,直到通道为空。
178///
179/// # Type Parameters
180/// - `T`: Message type
181/// - `N`: Inline buffer size
182///
183/// # 类型参数
184/// - `T`: 消息类型
185/// - `N`: 内联缓冲区大小
186pub struct Drain<'a, T, const N: usize> {
187 receiver: &'a mut Receiver<T, N>,
188}
189
190impl<'a, T, const N: usize> Iterator for Drain<'a, T, N> {
191 type Item = T;
192
193 #[inline]
194 fn next(&mut self) -> Option<Self::Item> {
195 self.receiver.try_recv().ok()
196 }
197
198 #[inline]
199 fn size_hint(&self) -> (usize, Option<usize>) {
200 let len = self.receiver.len();
201 (len, Some(len))
202 }
203}
204
205/// Send error type
206///
207/// 发送错误类型
208#[derive(Debug, Clone, Copy, PartialEq, Eq)]
209pub enum SendError<T> {
210 /// Channel is closed
211 ///
212 /// 通道已关闭
213 Closed(T),
214}
215
216/// Try-receive error type
217///
218/// 尝试接收错误类型
219#[derive(Debug, Clone, Copy, PartialEq, Eq)]
220pub enum TryRecvError {
221 /// Channel is empty
222 ///
223 /// 通道为空
224 Empty,
225
226 /// Channel is closed
227 ///
228 /// 通道已关闭
229 Closed,
230}
231
232/// Try-send error type
233///
234/// 尝试发送错误类型
235#[derive(Debug, Clone, Copy, PartialEq, Eq)]
236pub enum TrySendError<T> {
237 /// Buffer is full
238 ///
239 /// 缓冲区已满
240 Full(T),
241
242 /// Channel is closed
243 ///
244 /// 通道已关闭
245 Closed(T),
246}
247
248impl<T, const N: usize> Sender<T, N> {
249 /// Send a message to the channel (async, waits if buffer is full)
250 ///
251 /// # Errors
252 /// Returns `SendError::Closed` if the receiver has been dropped
253 ///
254 /// 向通道发送消息(异步,如果缓冲区满则等待)
255 ///
256 /// # 错误
257 /// 如果接收器已被丢弃,返回 `SendError::Closed`
258 pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
259 loop {
260 match self.try_send(value) {
261 Ok(()) => return Ok(()),
262 Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
263 Err(TrySendError::Full(v)) => {
264 // Store the value to retry
265 // 存储值以便重试
266 value = v;
267
268 // Wait for space to become available
269 // 等待空间可用
270 self.inner.send_notify.notified().await;
271
272 // Check if channel was closed while waiting
273 // 检查等待时通道是否已关闭
274 if self.inner.closed.load(Ordering::Acquire) {
275 return Err(SendError::Closed(value));
276 }
277
278 // Retry with the value in next loop iteration
279 // 在下一次循环迭代中使用该值重试
280 }
281 }
282 }
283 }
284
285 /// Try to send a message without blocking
286 ///
287 /// # Errors
288 /// - Returns `TrySendError::Full` if the buffer is full
289 /// - Returns `TrySendError::Closed` if the receiver has been dropped
290 ///
291 /// 尝试非阻塞地发送消息
292 ///
293 /// # 错误
294 /// - 如果缓冲区满,返回 `TrySendError::Full`
295 /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
296 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
297 // Check if channel is closed first
298 // 首先检查通道是否已关闭
299 if self.inner.closed.load(Ordering::Acquire) {
300 return Err(TrySendError::Closed(value));
301 }
302
303 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
304 // 不会有多个线程同时访问 producer
305 let producer = unsafe { &mut *self.inner.producer.get() };
306
307 match producer.push(value) {
308 Ok(()) => {
309 // Successfully sent, notify receiver
310 // 成功发送,通知接收者
311 self.inner.recv_notify.notify_one();
312 Ok(())
313 }
314 Err(smallring::PushError::Full(v)) => {
315 Err(TrySendError::Full(v))
316 }
317 }
318 }
319
320 /// Check if the channel is closed
321 ///
322 /// 检查通道是否已关闭
323 #[inline]
324 pub fn is_closed(&self) -> bool {
325 self.inner.closed.load(Ordering::Acquire)
326 }
327
328 /// Get the capacity of the channel
329 ///
330 /// 获取通道的容量
331 #[inline]
332 pub fn capacity(&self) -> usize {
333 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
334 // capacity 只读取数据,不需要可变访问
335 let producer = unsafe { &*self.inner.producer.get() };
336 producer.capacity()
337 }
338
339 /// Get the number of messages currently in the channel
340 ///
341 /// 获取通道中当前的消息数量
342 #[inline]
343 pub fn len(&self) -> usize {
344 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
345 // slots 只读取数据,不需要可变访问
346 let producer = unsafe { &*self.inner.producer.get() };
347 producer.slots()
348 }
349
350 #[inline]
351 pub fn is_empty(&self) -> bool {
352 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
353 // is_empty 只读取数据,不需要可变访问
354 let producer = unsafe { &*self.inner.producer.get() };
355 producer.is_empty()
356 }
357
358 /// Get the number of free slots in the channel
359 ///
360 /// 获取通道中的空闲空间数量
361 #[inline]
362 pub fn free_slots(&self) -> usize {
363 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
364 // free_slots 只读取数据,不需要可变访问
365 let producer = unsafe { &*self.inner.producer.get() };
366 producer.free_slots()
367 }
368
369 /// Check if the channel is full
370 ///
371 /// 检查通道是否已满
372 #[inline]
373 pub fn is_full(&self) -> bool {
374 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
375 // is_full 只读取数据,不需要可变访问
376 let producer = unsafe { &*self.inner.producer.get() };
377 producer.is_full()
378 }
379}
380
381impl<T: Copy, const N: usize> Sender<T, N> {
382 /// Try to send multiple values from a slice without blocking
383 ///
384 /// 尝试非阻塞地从切片发送多个值
385 ///
386 /// This method attempts to send as many elements as possible from the slice.
387 /// It returns the number of elements successfully sent.
388 ///
389 /// 此方法尝试从切片中发送尽可能多的元素。
390 /// 返回成功发送的元素数量。
391 ///
392 /// # Parameters
393 /// - `values`: Slice of values to send
394 ///
395 /// # Returns
396 /// Number of elements successfully sent (0 to values.len())
397 ///
398 /// # 参数
399 /// - `values`: 要发送的值的切片
400 ///
401 /// # 返回值
402 /// 成功发送的元素数量(0 到 values.len())
403 pub fn try_send_slice(&self, values: &[T]) -> usize {
404 // Check if channel is closed first
405 // 首先检查通道是否已关闭
406 if self.inner.closed.load(Ordering::Acquire) {
407 return 0;
408 }
409
410 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
411 // 不会有多个线程同时访问 producer
412 let producer = unsafe { &mut *self.inner.producer.get() };
413
414 let sent = producer.push_slice(values);
415
416 if sent > 0 {
417 // Successfully sent some messages, notify receiver
418 // 成功发送一些消息,通知接收者
419 self.inner.recv_notify.notify_one();
420 }
421
422 sent
423 }
424
425 /// Send multiple values from a slice (async, waits if buffer is full)
426 ///
427 /// 从切片发送多个值(异步,如果缓冲区满则等待)
428 ///
429 /// This method will send all elements from the slice, waiting if necessary
430 /// when the buffer becomes full. Returns the number of elements sent, or
431 /// an error if the channel is closed.
432 ///
433 /// 此方法将发送切片中的所有元素,必要时在缓冲区满时等待。
434 /// 返回发送的元素数量,如果通道关闭则返回错误。
435 ///
436 /// # Parameters
437 /// - `values`: Slice of values to send
438 ///
439 /// # Returns
440 /// - `Ok(usize)`: Number of elements successfully sent
441 /// - `Err(SendError)`: Channel is closed
442 ///
443 /// # 参数
444 /// - `values`: 要发送的值的切片
445 ///
446 /// # 返回值
447 /// - `Ok(usize)`: 成功发送的元素数量
448 /// - `Err(SendError)`: 通道已关闭
449 pub async fn send_slice(&self, values: &[T]) -> Result<usize, SendError<()>> {
450 let mut total_sent = 0;
451
452 while total_sent < values.len() {
453 // Check if channel is closed
454 // 检查通道是否已关闭
455 if self.inner.closed.load(Ordering::Acquire) {
456 return Err(SendError::Closed(()));
457 }
458
459 let sent = self.try_send_slice(&values[total_sent..]);
460 total_sent += sent;
461
462 if total_sent < values.len() {
463 // Need to wait for space
464 // 需要等待空间
465 self.inner.send_notify.notified().await;
466
467 // Check if channel was closed while waiting
468 // 检查等待时通道是否已关闭
469 if self.inner.closed.load(Ordering::Acquire) {
470 return Err(SendError::Closed(()));
471 }
472 }
473 }
474
475 Ok(total_sent)
476 }
477}
478
479impl<T, const N: usize> Receiver<T, N> {
480 /// Receive a message from the channel (async, waits if buffer is empty)
481 ///
482 /// Returns `None` if the channel is closed and empty
483 ///
484 /// 从通道接收消息(异步,如果缓冲区空则等待)
485 ///
486 /// 如果通道已关闭且为空,返回 `None`
487 pub async fn recv(&self) -> Option<T> {
488 loop {
489 match self.try_recv() {
490 Ok(value) => return Some(value),
491 Err(TryRecvError::Closed) => return None,
492 Err(TryRecvError::Empty) => {
493 // Check if channel is closed before waiting
494 // 等待前检查通道是否已关闭
495 if self.inner.closed.load(Ordering::Acquire) {
496 // Double check if there are any remaining items
497 // 再次检查是否有剩余项
498 if let Ok(value) = self.try_recv() {
499 return Some(value);
500 }
501 return None;
502 }
503
504 // Wait for data to become available
505 // 等待数据可用
506 self.inner.recv_notify.notified().await;
507 }
508 }
509 }
510 }
511
512 /// Try to receive a message without blocking
513 ///
514 /// # Errors
515 /// - Returns `TryRecvError::Empty` if the buffer is empty
516 /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
517 ///
518 /// 尝试非阻塞地接收消息
519 ///
520 /// # 错误
521 /// - 如果缓冲区空,返回 `TryRecvError::Empty`
522 /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
523 pub fn try_recv(&self) -> Result<T, TryRecvError> {
524 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
525 // 不会有多个线程同时访问 consumer
526 let consumer = unsafe { &mut *self.inner.consumer.get() };
527
528 match consumer.pop() {
529 Ok(value) => {
530 // Successfully received, notify sender
531 // 成功接收,通知发送者
532 self.inner.send_notify.notify_one();
533 Ok(value)
534 }
535 Err(smallring::PopError::Empty) => {
536 // Check if channel is closed
537 // 检查通道是否已关闭
538 if self.inner.closed.load(Ordering::Acquire) {
539 // Double-check for remaining items to avoid race condition
540 // where sender drops after push but before we check closed flag
541 // 再次检查是否有剩余项,以避免发送方在 push 后但在我们检查 closed 标志前 drop 的竞态条件
542 match consumer.pop() {
543 Ok(value) => {
544 self.inner.send_notify.notify_one();
545 Ok(value)
546 }
547 Err(smallring::PopError::Empty) => {
548 Err(TryRecvError::Closed)
549 }
550 }
551 } else {
552 Err(TryRecvError::Empty)
553 }
554 }
555 }
556 }
557
558 /// Check if the channel is empty
559 ///
560 /// 检查通道是否为空
561 #[inline]
562 pub fn is_empty(&self) -> bool {
563 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
564 // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
565 let consumer = unsafe { &*self.inner.consumer.get() };
566 consumer.is_empty()
567 }
568
569 /// Get the number of messages currently in the channel
570 ///
571 /// 获取通道中当前的消息数量
572 #[inline]
573 pub fn len(&self) -> usize {
574 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
575 // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
576 let consumer = unsafe { &*self.inner.consumer.get() };
577 consumer.slots()
578 }
579
580 /// Get the capacity of the channel
581 ///
582 /// 获取通道的容量
583 #[inline]
584 pub fn capacity(&self) -> usize {
585 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
586 // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
587 let consumer = unsafe { &*self.inner.consumer.get() };
588 consumer.buffer().capacity()
589 }
590
591 /// Peek at the first message without removing it
592 ///
593 /// 查看第一个消息但不移除它
594 ///
595 /// # Returns
596 /// `Some(&T)` if there is a message, `None` if the channel is empty
597 ///
598 /// # 返回值
599 /// 如果有消息则返回 `Some(&T)`,如果通道为空则返回 `None`
600 ///
601 /// # Safety
602 /// The returned reference is valid only as long as no other operations
603 /// are performed on the Receiver that might modify the buffer.
604 ///
605 /// # 安全性
606 /// 返回的引用仅在未对 Receiver 执行可能修改缓冲区的其他操作时有效。
607 #[inline]
608 pub fn peek(&self) -> Option<&T> {
609 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
610 // peek 只读取数据,不需要可变访问
611 let consumer = unsafe { &*self.inner.consumer.get() };
612 consumer.peek()
613 }
614
615 /// Clear all messages from the channel
616 ///
617 /// 清空通道中的所有消息
618 ///
619 /// This method pops and drops all messages currently in the channel.
620 ///
621 /// 此方法弹出并 drop 通道中当前的所有消息。
622 pub fn clear(&mut self) {
623 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
624 // 我们有可变引用,因此可以安全地访问 consumer
625 let consumer = unsafe { &mut *self.inner.consumer.get() };
626 consumer.clear();
627
628 // Notify sender that space is available
629 // 通知发送者空间可用
630 self.inner.send_notify.notify_one();
631 }
632
633 /// Create a draining iterator
634 ///
635 /// 创建一个消费迭代器
636 ///
637 /// Returns an iterator that removes and returns messages from the channel.
638 /// The iterator will continue until the channel is empty.
639 ///
640 /// 返回一个从通道中移除并返回消息的迭代器。
641 /// 迭代器将持续运行直到通道为空。
642 ///
643 /// # Examples
644 ///
645 /// ```
646 /// use lite_sync::spsc::channel;
647 /// use std::num::NonZeroUsize;
648 ///
649 /// # #[tokio::main]
650 /// # async fn main() {
651 /// let (tx, mut rx) = channel::<i32, 8>(NonZeroUsize::new(32).unwrap());
652 /// tx.try_send(1).unwrap();
653 /// tx.try_send(2).unwrap();
654 /// tx.try_send(3).unwrap();
655 ///
656 /// let items: Vec<i32> = rx.drain().collect();
657 /// assert_eq!(items, vec![1, 2, 3]);
658 /// assert!(rx.is_empty());
659 /// # }
660 /// ```
661 #[inline]
662 pub fn drain(&mut self) -> Drain<'_, T, N> {
663 Drain { receiver: self }
664 }
665}
666
667impl<T: Copy, const N: usize> Receiver<T, N> {
668 /// Try to receive multiple values into a slice without blocking
669 ///
670 /// 尝试非阻塞地将多个值接收到切片
671 ///
672 /// This method attempts to receive as many messages as possible into the provided slice.
673 /// It returns the number of messages successfully received.
674 ///
675 /// 此方法尝试将尽可能多的消息接收到提供的切片中。
676 /// 返回成功接收的消息数量。
677 ///
678 /// # Parameters
679 /// - `dest`: Destination slice to receive values into
680 ///
681 /// # Returns
682 /// Number of messages successfully received (0 to dest.len())
683 ///
684 /// # 参数
685 /// - `dest`: 用于接收值的目标切片
686 ///
687 /// # 返回值
688 /// 成功接收的消息数量(0 到 dest.len())
689 pub fn try_recv_slice(&mut self, dest: &mut [T]) -> usize {
690 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
691 // 我们有可变引用,因此可以安全地访问 consumer
692 let consumer = unsafe { &mut *self.inner.consumer.get() };
693
694 let received = consumer.pop_slice(dest);
695
696 if received > 0 {
697 // Successfully received some messages, notify sender
698 // 成功接收一些消息,通知发送者
699 self.inner.send_notify.notify_one();
700 }
701
702 received
703 }
704
705 /// Receive multiple values into a slice (async, waits if buffer is empty)
706 ///
707 /// 将多个值接收到切片(异步,如果缓冲区空则等待)
708 ///
709 /// This method will fill the destination slice as much as possible, waiting if necessary
710 /// when the buffer becomes empty. Returns the number of messages received.
711 ///
712 /// 此方法将尽可能填充目标切片,必要时在缓冲区空时等待。
713 /// 返回接收的消息数量。
714 ///
715 /// # Parameters
716 /// - `dest`: Destination slice to receive values into
717 ///
718 /// # Returns
719 /// Number of messages successfully received (0 to dest.len())
720 /// Returns 0 if the channel is closed and empty
721 ///
722 /// # 参数
723 /// - `dest`: 用于接收值的目标切片
724 ///
725 /// # 返回值
726 /// 成功接收的消息数量(0 到 dest.len())
727 /// 如果通道已关闭且为空,返回 0
728 pub async fn recv_slice(&mut self, dest: &mut [T]) -> usize {
729 let mut total_received = 0;
730
731 while total_received < dest.len() {
732 let received = self.try_recv_slice(&mut dest[total_received..]);
733 total_received += received;
734
735 if total_received < dest.len() {
736 // Check if channel is closed
737 // 检查通道是否已关闭
738 if self.inner.closed.load(Ordering::Acquire) {
739 // Channel closed, return what we have
740 // 通道已关闭,返回我们已有的内容
741 return total_received;
742 }
743
744 // Need to wait for data
745 // 需要等待数据
746 self.inner.recv_notify.notified().await;
747
748 // Check again after waking up
749 // 唤醒后再次检查
750 if self.inner.closed.load(Ordering::Acquire) {
751 // Try one more time to get remaining messages
752 // 再尝试一次获取剩余消息
753 let final_received = self.try_recv_slice(&mut dest[total_received..]);
754 total_received += final_received;
755 return total_received;
756 }
757 }
758 }
759
760 total_received
761 }
762}
763
764impl<T, const N: usize> Drop for Receiver<T, N> {
765 fn drop(&mut self) {
766 // Mark channel as closed when receiver is dropped
767 // 当接收器被丢弃时标记通道为已关闭
768 self.inner.closed.store(true, Ordering::Release);
769
770 // Notify sender in case it's waiting
771 // 通知发送者以防它正在等待
772 self.inner.send_notify.notify_one();
773 }
774}
775
776impl<T, const N: usize> Drop for Sender<T, N> {
777 fn drop(&mut self) {
778 // Mark channel as closed when sender is dropped
779 // 当发送器被丢弃时标记通道为已关闭
780 self.inner.closed.store(true, Ordering::Release);
781
782 // Notify receiver in case it's waiting
783 // 通知接收器以防它正在等待
784 self.inner.recv_notify.notify_one();
785 }
786}
787
788#[cfg(test)]
789mod tests {
790 use super::*;
791
792 #[tokio::test]
793 async fn test_basic_send_recv() {
794 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
795
796 tx.send(1).await.unwrap();
797 tx.send(2).await.unwrap();
798 tx.send(3).await.unwrap();
799
800 assert_eq!(rx.recv().await, Some(1));
801 assert_eq!(rx.recv().await, Some(2));
802 assert_eq!(rx.recv().await, Some(3));
803 }
804
805 #[tokio::test]
806 async fn test_try_send_recv() {
807 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
808
809 tx.try_send(1).unwrap();
810 tx.try_send(2).unwrap();
811
812 assert_eq!(rx.try_recv().unwrap(), 1);
813 assert_eq!(rx.try_recv().unwrap(), 2);
814 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
815 }
816
817 #[tokio::test]
818 async fn test_channel_closed_on_sender_drop() {
819 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
820
821 tx.send(1).await.unwrap();
822 drop(tx);
823
824 assert_eq!(rx.recv().await, Some(1));
825 assert_eq!(rx.recv().await, None);
826 }
827
828 #[tokio::test]
829 async fn test_channel_closed_on_receiver_drop() {
830 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
831
832 drop(rx);
833
834 assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
835 }
836
837 #[tokio::test]
838 async fn test_cross_task_communication() {
839 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
840
841 let sender_handle = tokio::spawn(async move {
842 for i in 0..10 {
843 tx.send(i).await.unwrap();
844 }
845 });
846
847 let receiver_handle = tokio::spawn(async move {
848 let mut sum = 0;
849 while let Some(value) = rx.recv().await {
850 sum += value;
851 }
852 sum
853 });
854
855 sender_handle.await.unwrap();
856 let sum = receiver_handle.await.unwrap();
857 assert_eq!(sum, 45); // 0+1+2+...+9 = 45
858 }
859
860 #[tokio::test]
861 async fn test_backpressure() {
862 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(4).unwrap());
863
864 // Fill the buffer
865 tx.try_send(1).unwrap();
866 tx.try_send(2).unwrap();
867 tx.try_send(3).unwrap();
868 tx.try_send(4).unwrap();
869
870 // Buffer should be full now
871 assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
872
873 // This should block and then succeed when we consume
874 let send_handle = tokio::spawn(async move {
875 tx.send(5).await.unwrap();
876 tx.send(6).await.unwrap();
877 });
878
879 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
880
881 assert_eq!(rx.recv().await, Some(1));
882 assert_eq!(rx.recv().await, Some(2));
883 assert_eq!(rx.recv().await, Some(3));
884 assert_eq!(rx.recv().await, Some(4));
885 assert_eq!(rx.recv().await, Some(5));
886 assert_eq!(rx.recv().await, Some(6));
887
888 send_handle.await.unwrap();
889 }
890
891 #[tokio::test]
892 async fn test_capacity_and_len() {
893 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
894
895 assert_eq!(rx.capacity(), 8);
896 assert_eq!(rx.len(), 0);
897 assert!(rx.is_empty());
898
899 tx.try_send(1).unwrap();
900 tx.try_send(2).unwrap();
901
902 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
903 assert_eq!(rx.len(), 2);
904 assert!(!rx.is_empty());
905 }
906
907 // ==================== New API Tests ====================
908
909 #[tokio::test]
910 async fn test_sender_capacity_queries() {
911 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
912
913 assert_eq!(tx.capacity(), 8);
914 assert_eq!(tx.len(), 0);
915 assert_eq!(tx.free_slots(), 8);
916 assert!(!tx.is_full());
917
918 tx.try_send(1).unwrap();
919 tx.try_send(2).unwrap();
920 tx.try_send(3).unwrap();
921
922 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
923 assert_eq!(tx.len(), 3);
924 assert_eq!(tx.free_slots(), 5);
925 assert!(!tx.is_full());
926
927 // Fill the buffer
928 tx.try_send(4).unwrap();
929 tx.try_send(5).unwrap();
930 tx.try_send(6).unwrap();
931 tx.try_send(7).unwrap();
932 tx.try_send(8).unwrap();
933
934 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
935 assert_eq!(tx.len(), 8);
936 assert_eq!(tx.free_slots(), 0);
937 assert!(tx.is_full());
938
939 // Pop one and check again
940 rx.recv().await;
941 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
942
943 assert_eq!(tx.len(), 7);
944 assert_eq!(tx.free_slots(), 1);
945 assert!(!tx.is_full());
946 }
947
948 #[tokio::test]
949 async fn test_try_send_slice() {
950 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
951
952 let data = [1, 2, 3, 4, 5];
953 let sent = tx.try_send_slice(&data);
954
955 assert_eq!(sent, 5);
956 assert_eq!(rx.len(), 5);
957
958 for i in 0..5 {
959 assert_eq!(rx.recv().await.unwrap(), data[i]);
960 }
961 }
962
963 #[tokio::test]
964 async fn test_try_send_slice_partial() {
965 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
966
967 // Fill with 5 elements, leaving room for 3
968 let initial = [1, 2, 3, 4, 5];
969 tx.try_send_slice(&initial);
970
971 // Try to send 10 more, should only send 3
972 let more = [6, 7, 8, 9, 10, 11, 12, 13, 14, 15];
973 let sent = tx.try_send_slice(&more);
974
975 assert_eq!(sent, 3);
976 assert_eq!(rx.len(), 8);
977 assert!(tx.is_full());
978
979 // Verify values
980 for i in 1..=8 {
981 assert_eq!(rx.recv().await.unwrap(), i);
982 }
983 }
984
985 #[tokio::test]
986 async fn test_send_slice() {
987 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
988
989 let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
990 let result = tx.send_slice(&data).await;
991
992 assert_eq!(result.unwrap(), 10);
993 assert_eq!(rx.len(), 10);
994
995 for i in 0..10 {
996 assert_eq!(rx.recv().await.unwrap(), data[i]);
997 }
998 }
999
1000 #[tokio::test]
1001 async fn test_send_slice_with_backpressure() {
1002 let (tx, rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1003
1004 let data = [1, 2, 3, 4, 5, 6, 7, 8];
1005
1006 let send_handle = tokio::spawn(async move {
1007 tx.send_slice(&data).await.unwrap()
1008 });
1009
1010 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1011
1012 // Consume some messages to make room
1013 for i in 1..=4 {
1014 assert_eq!(rx.recv().await.unwrap(), i);
1015 }
1016
1017 let sent = send_handle.await.unwrap();
1018 assert_eq!(sent, 8);
1019
1020 // Verify remaining messages
1021 for i in 5..=8 {
1022 assert_eq!(rx.recv().await.unwrap(), i);
1023 }
1024 }
1025
1026 #[tokio::test]
1027 async fn test_peek() {
1028 let (tx, rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1029
1030 // Peek empty buffer
1031 assert!(rx.peek().is_none());
1032
1033 tx.try_send(42).unwrap();
1034 tx.try_send(100).unwrap();
1035 tx.try_send(200).unwrap();
1036
1037 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1038
1039 // Peek should return first element without removing it
1040 assert_eq!(rx.peek(), Some(&42));
1041 assert_eq!(rx.peek(), Some(&42)); // Peek again, should be same
1042 assert_eq!(rx.len(), 3); // Length unchanged
1043 }
1044
1045 #[tokio::test]
1046 async fn test_peek_after_recv() {
1047 let (tx, rx) = channel::<String, 32>(NonZeroUsize::new(8).unwrap());
1048
1049 tx.try_send("first".to_string()).unwrap();
1050 tx.try_send("second".to_string()).unwrap();
1051 tx.try_send("third".to_string()).unwrap();
1052
1053 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1054
1055 assert_eq!(rx.peek(), Some(&"first".to_string()));
1056 rx.recv().await.unwrap();
1057
1058 assert_eq!(rx.peek(), Some(&"second".to_string()));
1059 rx.recv().await.unwrap();
1060
1061 assert_eq!(rx.peek(), Some(&"third".to_string()));
1062 rx.recv().await.unwrap();
1063
1064 assert!(rx.peek().is_none());
1065 }
1066
1067 #[tokio::test]
1068 async fn test_clear() {
1069 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1070
1071 for i in 0..10 {
1072 tx.try_send(i).unwrap();
1073 }
1074
1075 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1076 assert_eq!(rx.len(), 10);
1077
1078 rx.clear();
1079
1080 assert_eq!(rx.len(), 0);
1081 assert!(rx.is_empty());
1082 }
1083
1084 #[tokio::test]
1085 async fn test_clear_with_drop() {
1086 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
1087 use std::sync::Arc;
1088
1089 #[derive(Debug)]
1090 struct DropCounter {
1091 counter: Arc<AtomicUsize>,
1092 }
1093
1094 impl Drop for DropCounter {
1095 fn drop(&mut self) {
1096 self.counter.fetch_add(1, AtomicOrdering::SeqCst);
1097 }
1098 }
1099
1100 let counter = Arc::new(AtomicUsize::new(0));
1101
1102 {
1103 let (tx, mut rx) = channel::<DropCounter, 32>(NonZeroUsize::new(16).unwrap());
1104
1105 for _ in 0..8 {
1106 tx.try_send(DropCounter { counter: counter.clone() }).unwrap();
1107 }
1108
1109 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1110 assert_eq!(counter.load(AtomicOrdering::SeqCst), 0);
1111
1112 rx.clear();
1113
1114 assert_eq!(counter.load(AtomicOrdering::SeqCst), 8);
1115 }
1116 }
1117
1118 #[tokio::test]
1119 async fn test_drain() {
1120 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1121
1122 for i in 0..10 {
1123 tx.try_send(i).unwrap();
1124 }
1125
1126 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1127
1128 let collected: Vec<i32> = rx.drain().collect();
1129
1130 assert_eq!(collected, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1131 assert!(rx.is_empty());
1132 }
1133
1134 #[tokio::test]
1135 async fn test_drain_empty() {
1136 let (_tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(8).unwrap());
1137
1138 let collected: Vec<i32> = rx.drain().collect();
1139
1140 assert!(collected.is_empty());
1141 }
1142
1143 #[tokio::test]
1144 async fn test_drain_size_hint() {
1145 let (tx, mut rx) = channel::<i32, 32>(NonZeroUsize::new(16).unwrap());
1146
1147 for i in 0..5 {
1148 tx.try_send(i).unwrap();
1149 }
1150
1151 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1152
1153 let mut drain = rx.drain();
1154
1155 assert_eq!(drain.size_hint(), (5, Some(5)));
1156
1157 drain.next();
1158 assert_eq!(drain.size_hint(), (4, Some(4)));
1159
1160 drain.next();
1161 assert_eq!(drain.size_hint(), (3, Some(3)));
1162 }
1163
1164 #[tokio::test]
1165 async fn test_try_recv_slice() {
1166 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1167
1168 // Send some data
1169 for i in 0..10 {
1170 tx.try_send(i).unwrap();
1171 }
1172
1173 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1174
1175 let mut dest = [0u32; 5];
1176 let received = rx.try_recv_slice(&mut dest);
1177
1178 assert_eq!(received, 5);
1179 assert_eq!(dest, [0, 1, 2, 3, 4]);
1180 assert_eq!(rx.len(), 5);
1181 }
1182
1183 #[tokio::test]
1184 async fn test_try_recv_slice_partial() {
1185 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1186
1187 tx.try_send(1).unwrap();
1188 tx.try_send(2).unwrap();
1189 tx.try_send(3).unwrap();
1190
1191 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1192
1193 let mut dest = [0u32; 10];
1194 let received = rx.try_recv_slice(&mut dest);
1195
1196 assert_eq!(received, 3);
1197 assert_eq!(&dest[0..3], &[1, 2, 3]);
1198 assert!(rx.is_empty());
1199 }
1200
1201 #[tokio::test]
1202 async fn test_recv_slice() {
1203 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1204
1205 for i in 1..=10 {
1206 tx.try_send(i).unwrap();
1207 }
1208
1209 let mut dest = [0u32; 10];
1210 let received = rx.recv_slice(&mut dest).await;
1211
1212 assert_eq!(received, 10);
1213 assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
1214 assert!(rx.is_empty());
1215 }
1216
1217 #[tokio::test]
1218 async fn test_recv_slice_with_wait() {
1219 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(4).unwrap());
1220
1221 let recv_handle = tokio::spawn(async move {
1222 let mut dest = [0u32; 8];
1223 let received = rx.recv_slice(&mut dest).await;
1224 (received, dest)
1225 });
1226
1227 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1228
1229 // Send data gradually
1230 for i in 1..=8 {
1231 tx.send(i).await.unwrap();
1232 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1233 }
1234
1235 let (received, dest) = recv_handle.await.unwrap();
1236 assert_eq!(received, 8);
1237 assert_eq!(dest, [1, 2, 3, 4, 5, 6, 7, 8]);
1238 }
1239
1240 #[tokio::test]
1241 async fn test_recv_slice_channel_closed() {
1242 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(8).unwrap());
1243
1244 tx.try_send(1).unwrap();
1245 tx.try_send(2).unwrap();
1246 tx.try_send(3).unwrap();
1247
1248 drop(tx); // Close the channel
1249
1250 let mut dest = [0u32; 10];
1251 let received = rx.recv_slice(&mut dest).await;
1252
1253 // Should receive the 3 available messages, then stop
1254 assert_eq!(received, 3);
1255 assert_eq!(&dest[0..3], &[1, 2, 3]);
1256 }
1257
1258 #[tokio::test]
1259 async fn test_combined_new_apis() {
1260 let (tx, mut rx) = channel::<u32, 32>(NonZeroUsize::new(16).unwrap());
1261
1262 // Batch send
1263 let data = [1, 2, 3, 4, 5];
1264 tx.try_send_slice(&data);
1265
1266 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
1267
1268 assert_eq!(tx.len(), 5);
1269 assert_eq!(rx.len(), 5);
1270 assert_eq!(rx.capacity(), 16);
1271
1272 // Peek
1273 assert_eq!(rx.peek(), Some(&1));
1274
1275 // Batch receive
1276 let mut dest = [0u32; 3];
1277 rx.try_recv_slice(&mut dest);
1278 assert_eq!(dest, [1, 2, 3]);
1279
1280 assert_eq!(rx.len(), 2);
1281 assert_eq!(tx.free_slots(), 14);
1282
1283 // Drain remaining
1284 let remaining: Vec<u32> = rx.drain().collect();
1285 assert_eq!(remaining, vec![4, 5]);
1286
1287 assert!(rx.is_empty());
1288 assert!(!tx.is_full());
1289 }
1290}