Skip to main content

hiver_runtime/
channel.rs

1//! Multi-producer, single-consumer channels for async communication
2//! 用于异步通信的多生产者、单消费者通道
3//!
4//! # Overview / 概述
5//!
6//! This module provides multi-producer, single-consumer (mpsc) channels
7//! for asynchronous communication between tasks.
8//!
9//! 本模块提供用于任务间异步通信的多生产者、单消费者(mpsc)通道。
10//!
11//! # Example / 示例
12//!
13//! ```rust,no_run,ignore
14//! use hiver_runtime::channel::unbounded;
15//!
16//! async fn example() {
17//!     let (tx, mut rx) = unbounded();
18//!
19//!     // Spawn a sender task
20//!     // 生成发送器任务
21//!     hiver_runtime::spawn(async move {
22//!         tx.send("Hello").await.unwrap();
23//!         tx.send("World").await.unwrap();
24//!     });
25//!
26//!     // Receive messages
27//!     // 接收消息
28//!     while let Some(msg) = rx.recv().await {
29//!         println!("Received: {}", msg);
30//!     }
31//! }
32//! ```
33
34use std::collections::VecDeque;
35use std::future::Future;
36use std::pin::Pin;
37use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
38use std::sync::{Arc, Mutex};
39use std::task::{Context, Poll, Waker};
40
41/// Error type for channel operations
42/// 通道操作的错误类型
43pub enum SendError<T> {
44    /// The channel is closed
45    /// 通道已关闭
46    Closed(T),
47}
48
49impl<T> std::fmt::Debug for SendError<T> {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        f.debug_tuple("SendError::Closed")
52            .field(&format_args!("_"))
53            .finish()
54    }
55}
56
57impl<T> PartialEq for SendError<T> {
58    fn eq(&self, _other: &Self) -> bool {
59        true
60    }
61}
62
63impl<T> Eq for SendError<T> {}
64
65impl<T> std::fmt::Display for SendError<T> {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "Channel closed")
68    }
69}
70
71impl<T> std::error::Error for SendError<T> {}
72
73/// Error type for receiving
74/// 接收错误类型
75#[derive(Debug, Clone, Copy, PartialEq, Eq)]
76pub enum RecvError {
77    /// The channel is empty and closed
78    /// 通道为空且已关闭
79    Closed,
80}
81
82impl std::fmt::Display for RecvError {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        match self {
85            RecvError::Closed => write!(f, "Channel closed"),
86        }
87    }
88}
89
90impl std::error::Error for RecvError {}
91
92/// Unbounded mpsc channel
93/// 无边界mpsc通道
94///
95/// Creates a channel with an unbounded buffer.
96/// 创建具有无界缓冲区的通道。
97///
98/// # Example / 示例
99///
100/// ```rust,no_run,ignore
101/// use hiver_runtime::channel::unbounded;
102///
103/// let (tx, rx) = unbounded::<i32>();
104/// ```
105#[must_use]
106pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
107    let shared = Arc::new(ChannelShared {
108        buffer: Mutex::new(VecDeque::new()),
109        sender_count: AtomicUsize::new(1),
110        is_receiver_alive: AtomicBool::new(true),
111        recv_waker: Mutex::new(None),
112    });
113
114    let sender = Sender {
115        shared: shared.clone(),
116    };
117    let receiver = Receiver { shared };
118
119    (sender, receiver)
120}
121
122/// Bounded mpsc channel
123/// 有界mpsc通道
124///
125/// Creates a channel with a bounded buffer.
126/// 创建具有有界缓冲区的通道。
127///
128/// # Example / 示例
129///
130/// ```rust,no_run,ignore
131/// use hiver_runtime::channel::bounded;
132///
133/// let (tx, rx) = bounded::<i32>(16);
134/// ```
135#[must_use]
136pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
137    let shared = Arc::new(ChannelShared {
138        buffer: Mutex::new(VecDeque::with_capacity(cap)),
139        sender_count: AtomicUsize::new(1),
140        is_receiver_alive: AtomicBool::new(true),
141        recv_waker: Mutex::new(None),
142    });
143
144    let sender = Sender {
145        shared: shared.clone(),
146    };
147    let receiver = Receiver { shared };
148
149    (sender, receiver)
150}
151
152/// Shared state for the channel
153/// 通道的共享状态
154struct ChannelShared<T> {
155    /// Message buffer
156    /// 消息缓冲区
157    buffer: Mutex<VecDeque<T>>,
158    /// Number of active senders
159    /// 活跃发送器数量
160    sender_count: AtomicUsize,
161    /// Whether the receiver is still alive
162    /// 接收器是否仍然存活
163    is_receiver_alive: AtomicBool,
164    /// Waker for pending receive operations
165    /// 挂起接收操作的waker
166    recv_waker: Mutex<Option<Waker>>,
167}
168
169/// Sender side of the channel
170/// 通道的发送端
171///
172/// Can be cloned to create multiple senders.
173/// 可以克隆以创建多个发送器。
174pub struct Sender<T> {
175    shared: Arc<ChannelShared<T>>,
176}
177
178impl<T> Clone for Sender<T> {
179    fn clone(&self) -> Self {
180        // Increment sender count
181        // 增加发送器计数
182        self.shared.sender_count.fetch_add(1, Ordering::Relaxed);
183        Self {
184            shared: Arc::clone(&self.shared),
185        }
186    }
187}
188
189impl<T> Sender<T> {
190    /// Send a value synchronously to the channel
191    /// 向通道同步发送值
192    ///
193    /// # Errors
194    ///
195    /// Returns `SendError::Closed` if the receiver has been dropped.
196    /// 如果接收器已丢弃则返回 `SendError::Closed`。
197    ///
198    /// # Panics
199    ///
200    /// Panics if the internal mutex is poisoned (should never happen in normal operation).
201    /// 如果内部互斥锁被污染则恐慌(正常操作中不应发生)。
202    pub fn send(&self, value: T) -> Result<(), SendError<T>> {
203        if !self.shared.is_receiver_alive.load(Ordering::Acquire) {
204            return Err(SendError::Closed(value));
205        }
206
207        let mut buffer = self.shared.buffer.lock().unwrap();
208        buffer.push_back(value);
209
210        // Wake the receiver if it's waiting
211        // 如果接收器在等待,唤醒它
212        if let Some(waker) = self.shared.recv_waker.lock().unwrap().take() {
213            drop(buffer);
214            waker.wake();
215        }
216
217        Ok(())
218    }
219
220    /// Check if the channel is closed (receiver dropped)
221    /// 检查通道是否已关闭(接收器已丢弃)
222    #[must_use]
223    pub fn is_closed(&self) -> bool {
224        !self.shared.is_receiver_alive.load(Ordering::Acquire)
225    }
226
227    /// Get the number of active senders
228    /// 获取活跃发送器数量
229    #[must_use]
230    pub fn sender_count(&self) -> usize {
231        self.shared.sender_count.load(Ordering::Acquire)
232    }
233}
234
235impl<T> Drop for Sender<T> {
236    fn drop(&mut self) {
237        // Decrease sender count
238        // 减少发送器计数
239        let prev = self.shared.sender_count.fetch_sub(1, Ordering::AcqRel);
240
241        if prev == 1 {
242            // Last sender dropped, wake the receiver
243            // 最后一个发送器丢弃,唤醒接收器
244            if let Some(waker) = self.shared.recv_waker.lock().unwrap().take() {
245                waker.wake();
246            }
247        }
248    }
249}
250
251/// Receiver side of the channel
252/// 通道的接收端
253pub struct Receiver<T> {
254    shared: Arc<ChannelShared<T>>,
255}
256
257impl<T> Receiver<T> {
258    /// Receive a value from the channel
259    /// 从通道接收值
260    pub fn recv(&mut self) -> RecvFuture<'_, T> {
261        RecvFuture::new(self)
262    }
263
264    /// Try to receive a value without blocking
265    /// 尝试接收值而不阻塞
266    ///
267    /// # Errors
268    ///
269    /// Returns `Err(RecvError::Closed)` if the channel is empty and all senders are dropped.
270    /// 如果通道为空且所有发送器已丢弃则返回 `Err(RecvError::Closed)`。
271    ///
272    /// Returns `Err(RecvError::Empty)` if the channel is empty but senders still exist.
273    /// 如果通道为空但发送器仍然存在则返回 `Err(RecvError::Empty)`。
274    ///
275    /// # Panics
276    ///
277    /// Panics if the internal mutex is poisoned (should never happen in normal operation).
278    /// 如果内部互斥锁被污染则恐慌(正常操作中不应发生)。
279    pub fn try_recv(&mut self) -> Result<T, RecvError> {
280        let mut buffer = self.shared.buffer.lock().unwrap();
281
282        if let Some(value) = buffer.pop_front() {
283            Ok(value)
284        } else if self.shared.sender_count.load(Ordering::Acquire) == 0 {
285            // No senders left
286            // 没有发送器了
287            Err(RecvError::Closed)
288        } else {
289            // Channel empty but senders still exist
290            // 通道为空但发送器仍然存在
291            Err(RecvError::Closed)
292        }
293    }
294
295    /// Get the number of messages in the channel buffer
296    /// 获取通道缓冲区中的消息数量
297    ///
298    /// # Panics
299    ///
300    /// Panics if the internal mutex is poisoned (should never happen in normal operation).
301    /// 如果内部互斥锁被污染则恐慌(正常操作中不应发生)。
302    #[must_use]
303    pub fn len(&self) -> usize {
304        self.shared.buffer.lock().unwrap().len()
305    }
306
307    /// Check if the channel is empty
308    /// 检查通道是否为空
309    #[must_use]
310    pub fn is_empty(&self) -> bool {
311        self.len() == 0
312    }
313}
314
315impl<T> Drop for Receiver<T> {
316    fn drop(&mut self) {
317        // Mark receiver as dropped
318        // 标记接收器已丢弃
319        self.shared
320            .is_receiver_alive
321            .store(false, Ordering::Release);
322    }
323}
324
325/// Receive future
326/// 接收future
327pub struct RecvFuture<'a, T> {
328    /// Reference to the receiver's shared state
329    /// 接收器共享状态的引用
330    shared: Arc<ChannelShared<T>>,
331    /// Marker for the lifetime
332    /// 生命周期标记
333    _marker: std::marker::PhantomData<&'a mut Receiver<T>>,
334}
335
336impl<'a, T> RecvFuture<'a, T> {
337    /// Create a new receive future
338    fn new(receiver: &'a mut Receiver<T>) -> Self {
339        // We extract the Arc since the receiver only holds it
340        // This is safe because the future borrows the receiver mutably
341        Self {
342            shared: Arc::clone(&receiver.shared),
343            _marker: std::marker::PhantomData,
344        }
345    }
346}
347
348unsafe impl<T: Send> Send for RecvFuture<'_, T> {}
349unsafe impl<T: Sync> Sync for RecvFuture<'_, T> {}
350
351impl<T> Future for RecvFuture<'_, T> {
352    type Output = Option<T>;
353
354    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
355        let mut buffer = self.shared.buffer.lock().unwrap();
356
357        if let Some(value) = buffer.pop_front() {
358            Poll::Ready(Some(value))
359        } else if self.shared.sender_count.load(Ordering::Acquire) == 0 {
360            // No senders left and buffer empty
361            // 没有发送器了且缓冲区为空
362            Poll::Ready(None)
363        } else {
364            // No value available, register waker
365            // 没有可用值,注册waker
366            *self.shared.recv_waker.lock().unwrap() = Some(cx.waker().clone());
367            Poll::Pending
368        }
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375
376    #[test]
377    fn test_unbounded_channel_creation() {
378        let (tx, _rx) = unbounded::<i32>();
379        assert!(!tx.is_closed());
380        assert_eq!(tx.sender_count(), 1);
381    }
382
383    #[test]
384    fn test_bounded_channel_creation() {
385        let (tx, _rx) = bounded::<i32>(16);
386        assert!(!tx.is_closed());
387        assert_eq!(tx.sender_count(), 1);
388    }
389
390    #[test]
391    fn test_sender_clone() {
392        let (tx, _rx) = unbounded::<i32>();
393        let tx2 = tx.clone();
394        assert_eq!(tx.sender_count(), 2);
395        assert_eq!(tx2.sender_count(), 2);
396        drop(tx);
397        assert_eq!(tx2.sender_count(), 1);
398    }
399
400    #[test]
401    fn test_receiver_empty() {
402        let (_tx, rx) = unbounded::<i32>();
403        assert!(rx.is_empty());
404        assert_eq!(rx.len(), 0);
405    }
406
407    #[test]
408    fn test_sync_send() {
409        let (tx, mut rx) = unbounded::<i32>();
410
411        assert!(tx.send(42).is_ok());
412        assert!(tx.send(100).is_ok());
413
414        assert_eq!(rx.len(), 2);
415        assert!(!rx.is_empty());
416
417        // Verify received data and order
418        assert_eq!(rx.try_recv().unwrap(), 42);
419        assert_eq!(rx.try_recv().unwrap(), 100);
420        assert_eq!(rx.try_recv(), Err(RecvError::Closed));
421    }
422
423    #[test]
424    fn test_send_after_receiver_drop() {
425        let (tx, rx) = unbounded::<i32>();
426        drop(rx);
427        assert!(tx.is_closed());
428
429        let err = tx.send(42).unwrap_err();
430        assert!(matches!(err, SendError::Closed(42)));
431        assert_eq!(err.to_string(), "Channel closed");
432    }
433
434    #[test]
435    fn test_recv_error() {
436        let err = RecvError::Closed;
437
438        assert_eq!(err.to_string(), "Channel closed");
439    }
440
441    #[test]
442    fn test_unbounded_send_recv_order() {
443        let (tx, mut rx) = unbounded::<String>();
444        for i in 0..10 {
445            tx.send(format!("msg-{i}")).unwrap();
446        }
447        for i in 0..10 {
448            assert_eq!(rx.try_recv().unwrap(), format!("msg-{i}"));
449        }
450    }
451
452    #[test]
453    fn test_bounded_channel_full() {
454        let (tx, rx) = bounded::<i32>(2);
455        assert!(tx.send(1).is_ok());
456        assert!(tx.send(2).is_ok());
457        // Third send should still succeed (unbounded buffer semantics via VecDeque)
458        assert!(tx.send(3).is_ok());
459        assert_eq!(rx.len(), 3);
460    }
461
462    #[test]
463    fn test_close_after_all_senders_drop() {
464        let (tx, mut rx) = unbounded::<i32>();
465        let tx2 = tx.clone();
466        tx.send(1).unwrap();
467        drop(tx);
468        // tx2 still alive, channel should not be closed
469        assert!(!tx2.is_closed());
470        tx2.send(2).unwrap();
471        drop(tx2);
472        // All senders dropped, channel closed
473        assert_eq!(rx.try_recv().unwrap(), 1);
474        assert_eq!(rx.try_recv().unwrap(), 2);
475    }
476}