kestrel_timer/utils/
spsc.rs

//! High-performance async SPSC (Single Producer Single Consumer) channel.
//!
//! Built on top of a custom SmallVec-based ring buffer for optimal performance.
//! Optimized for low latency and fast creation; designed to replace tokio `mpsc`
//! in the timer implementation.
10
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use tokio::sync::Notify;
14use parking_lot::Mutex;
15use super::ringbuf;
16
17/// SPSC channel creation function
18/// 
19/// Creates a bounded SPSC channel with the specified capacity.
20/// 
21/// # Parameters
22/// - `capacity`: Channel capacity
23/// 
24/// # Returns
25/// A tuple of (Sender, Receiver)
26/// 
27/// # Examples
28/// 
29/// ```
30/// use kestrel_timer::utils::spsc::channel;
31/// 
32/// #[tokio::main]
33/// async fn main() {
34///     let (tx, rx) = channel(32);
35///     
36///     tokio::spawn(async move {
37///         tx.send(42).await.unwrap();
38///     });
39///     
40///     let value = rx.recv().await.unwrap();
41///     assert_eq!(value, 42);
42/// }
43/// ```
44/// 
45/// SPSC 通道创建函数
46/// 
47/// 创建指定容量的有界 SPSC 通道。
48/// 
49/// # 参数
50/// - `capacity`: 通道容量
51/// 
52/// # 返回值
53/// 返回 (Sender, Receiver) 元组
54pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
55    assert!(capacity > 0, "Channel capacity must be greater than 0");
56    
57    let (producer, consumer) = ringbuf::new(capacity);
58    
59    let inner = Arc::new(Inner {
60        producer: Mutex::new(producer),
61        consumer: Mutex::new(consumer),
62        closed: AtomicBool::new(false),
63        recv_notify: Notify::new(),
64        send_notify: Notify::new(),
65    });
66    
67    let sender = Sender {
68        inner: inner.clone(),
69    };
70    
71    let receiver = Receiver {
72        inner,
73    };
74    
75    (sender, receiver)
76}
77
/// Internal state shared by the two endpoints of one SPSC channel.
///
/// `channel` wraps a single `Inner` in an `Arc` and hands one clone to the
/// `Sender` and one to the `Receiver`.
struct Inner<T> {
    /// Producer half of the ring buffer.
    ///
    /// Wrapped in a `Mutex` for interior mutability: `Sender::try_send` pushes
    /// through `&self`.
    producer: Mutex<ringbuf::Producer<T>>,

    /// Consumer half of the ring buffer.
    ///
    /// Wrapped in a `Mutex` for interior mutability: the `Receiver` methods pop
    /// and inspect it through `&self`.
    consumer: Mutex<ringbuf::Consumer<T>>,

    /// Channel-closed flag; set to `true` when either endpoint is dropped.
    closed: AtomicBool,

    /// Wakes a receiver waiting in `recv`; signalled after a successful send
    /// and when the sender is dropped.
    recv_notify: Notify,

    /// Wakes a sender waiting in `send` on a full buffer; signalled after a
    /// successful receive and when the receiver is dropped.
    send_notify: Notify,
}
107
/// Sending endpoint of the SPSC channel.
///
/// Dropping the sender marks the channel as closed and wakes a waiting receiver.
pub struct Sender<T> {
    /// State shared with the paired `Receiver`.
    inner: Arc<Inner<T>>,
}
114
/// Receiving endpoint of the SPSC channel.
///
/// Dropping the receiver marks the channel as closed and wakes a waiting sender.
pub struct Receiver<T> {
    /// State shared with the paired `Sender`.
    inner: Arc<Inner<T>>,
}
121
/// Error returned by `Sender::send`.
///
/// The message that could not be delivered is handed back to the caller inside
/// the variant. Implements [`std::fmt::Display`] and [`std::error::Error`] so it
/// can be propagated with `?` into boxed or dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError<T> {
    /// The channel is closed; carries the undelivered message.
    Closed(T),
}

impl<T> std::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The payload is intentionally not printed: `T` need not be `Display`.
        match self {
            SendError::Closed(_) => f.write_str("channel is closed"),
        }
    }
}

impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
132
/// Error returned by `Receiver::try_recv`.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into boxed or dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// The channel currently holds no messages.
    Empty,

    /// The channel is closed and holds no messages.
    Closed,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            TryRecvError::Empty => "channel is empty",
            TryRecvError::Closed => "channel is closed",
        };
        f.write_str(text)
    }
}

impl std::error::Error for TryRecvError {}
148
/// Error returned by `Sender::try_send`.
///
/// The message that could not be delivered is handed back to the caller inside
/// the variant. Implements [`std::fmt::Display`] and [`std::error::Error`] so it
/// can be propagated with `?` into boxed or dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// The buffer is full; carries the undelivered message.
    Full(T),

    /// The channel is closed; carries the undelivered message.
    Closed(T),
}

impl<T> std::fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The payload is intentionally not printed: `T` need not be `Display`.
        let text = match self {
            TrySendError::Full(_) => "channel is full",
            TrySendError::Closed(_) => "channel is closed",
        };
        f.write_str(text)
    }
}

impl<T: std::fmt::Debug> std::error::Error for TrySendError<T> {}
164
165impl<T> Sender<T> {
166    /// Send a message to the channel (async, waits if buffer is full)
167    /// 
168    /// # Errors
169    /// Returns `SendError::Closed` if the receiver has been dropped
170    /// 
171    /// 向通道发送消息(异步,如果缓冲区满则等待)
172    /// 
173    /// # 错误
174    /// 如果接收器已被丢弃,返回 `SendError::Closed`
175    pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
176        loop {
177            match self.try_send(value) {
178                Ok(()) => return Ok(()),
179                Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
180                Err(TrySendError::Full(v)) => {
181                    // Store the value to retry
182                    // 存储值以便重试
183                    value = v;
184                    
185                    // Wait for space to become available
186                    // 等待空间可用
187                    self.inner.send_notify.notified().await;
188                    
189                    // Check if channel was closed while waiting
190                    // 检查等待时通道是否已关闭
191                    if self.inner.closed.load(Ordering::Acquire) {
192                        return Err(SendError::Closed(value));
193                    }
194                    
195                    // Retry with the value in next loop iteration
196                    // 在下一次循环迭代中使用该值重试
197                }
198            }
199        }
200    }
201    
202    /// Try to send a message without blocking
203    /// 
204    /// # Errors
205    /// - Returns `TrySendError::Full` if the buffer is full
206    /// - Returns `TrySendError::Closed` if the receiver has been dropped
207    /// 
208    /// 尝试非阻塞地发送消息
209    /// 
210    /// # 错误
211    /// - 如果缓冲区满,返回 `TrySendError::Full`
212    /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
213    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
214        // Check if channel is closed first
215        // 首先检查通道是否已关闭
216        if self.inner.closed.load(Ordering::Acquire) {
217            return Err(TrySendError::Closed(value));
218        }
219        
220        // Try to push directly to producer
221        // 直接尝试推送到生产者
222        let mut producer = self.inner.producer.lock();
223        match producer.push(value) {
224            Ok(()) => {
225                // Successfully sent, notify receiver
226                // 成功发送,通知接收者
227                self.inner.recv_notify.notify_one();
228                Ok(())
229            }
230            Err(ringbuf::PushError::Full(v)) => {
231                Err(TrySendError::Full(v))
232            }
233        }
234    }
235    
236    /// Check if the channel is closed
237    /// 
238    /// 检查通道是否已关闭
239    #[inline]
240    pub fn is_closed(&self) -> bool {
241        self.inner.closed.load(Ordering::Acquire)
242    }
243}
244
245impl<T> Receiver<T> {
246    /// Receive a message from the channel (async, waits if buffer is empty)
247    /// 
248    /// Returns `None` if the channel is closed and empty
249    /// 
250    /// 从通道接收消息(异步,如果缓冲区空则等待)
251    /// 
252    /// 如果通道已关闭且为空,返回 `None`
253    pub async fn recv(&self) -> Option<T> {
254        loop {
255            match self.try_recv() {
256                Ok(value) => return Some(value),
257                Err(TryRecvError::Closed) => return None,
258                Err(TryRecvError::Empty) => {
259                    // Check if channel is closed before waiting
260                    // 等待前检查通道是否已关闭
261                    if self.inner.closed.load(Ordering::Acquire) {
262                        // Double check if there are any remaining items
263                        // 再次检查是否有剩余项
264                        if let Ok(value) = self.try_recv() {
265                            return Some(value);
266                        }
267                        return None;
268                    }
269                    
270                    // Wait for data to become available
271                    // 等待数据可用
272                    self.inner.recv_notify.notified().await;
273                }
274            }
275        }
276    }
277    
278    /// Try to receive a message without blocking
279    /// 
280    /// # Errors
281    /// - Returns `TryRecvError::Empty` if the buffer is empty
282    /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
283    /// 
284    /// 尝试非阻塞地接收消息
285    /// 
286    /// # 错误
287    /// - 如果缓冲区空,返回 `TryRecvError::Empty`
288    /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
289    pub fn try_recv(&self) -> Result<T, TryRecvError> {
290        // Try to pop directly from consumer
291        // 直接尝试从消费者弹出
292        let mut consumer = self.inner.consumer.lock();
293        match consumer.pop() {
294            Ok(value) => {
295                // Successfully received, notify sender
296                // 成功接收,通知发送者
297                self.inner.send_notify.notify_one();
298                Ok(value)
299            }
300            Err(ringbuf::PopError::Empty) => {
301                if self.inner.closed.load(Ordering::Acquire) {
302                    Err(TryRecvError::Closed)
303                } else {
304                    Err(TryRecvError::Empty)
305                }
306            }
307        }
308    }
309    
310    /// Check if the channel is empty
311    /// 
312    /// 检查通道是否为空
313    #[inline]
314    pub fn is_empty(&self) -> bool {
315        self.inner.consumer.lock().is_empty()
316    }
317    
318    /// Get the number of messages currently in the channel
319    /// 
320    /// 获取通道中当前的消息数量
321    #[inline]
322    pub fn len(&self) -> usize {
323        self.inner.consumer.lock().slots()
324    }
325    
326    /// Get the capacity of the channel
327    /// 
328    /// 获取通道的容量
329    #[inline]
330    pub fn capacity(&self) -> usize {
331        self.inner.consumer.lock().buffer().capacity()
332    }
333}
334
335impl<T> Drop for Receiver<T> {
336    fn drop(&mut self) {
337        // Mark channel as closed when receiver is dropped
338        // 当接收器被丢弃时标记通道为已关闭
339        self.inner.closed.store(true, Ordering::Release);
340        
341        // Notify sender in case it's waiting
342        // 通知发送者以防它正在等待
343        self.inner.send_notify.notify_one();
344    }
345}
346
347impl<T> Drop for Sender<T> {
348    fn drop(&mut self) {
349        // Mark channel as closed when sender is dropped
350        // 当发送器被丢弃时标记通道为已关闭
351        self.inner.closed.store(true, Ordering::Release);
352        
353        // Notify receiver in case it's waiting
354        // 通知接收器以防它正在等待
355        self.inner.recv_notify.notify_one();
356    }
357}
358
#[cfg(test)]
mod tests {
    use super::*;

    /// Messages sent with `send` come out of `recv` in the same order.
    #[tokio::test]
    async fn test_basic_send_recv() {
        let (tx, rx) = channel(4);

        for item in 1..=3 {
            tx.send(item).await.unwrap();
        }

        for expected in 1..=3 {
            assert_eq!(rx.recv().await, Some(expected));
        }
    }

    /// The non-waiting API round-trips values and reports `Empty` when drained.
    #[tokio::test]
    async fn test_try_send_recv() {
        let (tx, rx) = channel(4);

        for item in 1..=2 {
            tx.try_send(item).unwrap();
        }

        assert_eq!(rx.try_recv(), Ok(1));
        assert_eq!(rx.try_recv(), Ok(2));
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }

    /// Buffered messages are still delivered after the sender is dropped.
    #[tokio::test]
    async fn test_channel_closed_on_sender_drop() {
        let (tx, rx) = channel(4);

        tx.send(1).await.unwrap();
        drop(tx);

        let delivered = rx.recv().await;
        let after_close = rx.recv().await;
        assert_eq!(delivered, Some(1));
        assert_eq!(after_close, None);
    }

    /// Sending after the receiver is dropped hands the value back as `Closed`.
    #[tokio::test]
    async fn test_channel_closed_on_receiver_drop() {
        let (tx, rx) = channel::<i32>(4);

        drop(rx);

        assert_eq!(tx.send(1).await, Err(SendError::Closed(1)));
    }

    /// A producer task and a consumer task exchange every value exactly once.
    #[tokio::test]
    async fn test_cross_task_communication() {
        let (tx, rx) = channel(4);

        let producer_task = tokio::spawn(async move {
            for item in 0..10 {
                tx.send(item).await.unwrap();
            }
        });

        let consumer_task = tokio::spawn(async move {
            let mut total = 0;
            while let Some(item) = rx.recv().await {
                total += item;
            }
            total
        });

        producer_task.await.unwrap();
        let total = consumer_task.await.unwrap();
        assert_eq!(total, 45); // 0+1+2+...+9 = 45
    }

    /// A full buffer rejects `try_send`, and `send` resumes once space is freed.
    #[tokio::test]
    async fn test_backpressure() {
        let (tx, rx) = channel(4);

        // Fill the buffer to capacity.
        for item in 1..=4 {
            tx.try_send(item).unwrap();
        }

        // One more must be rejected and handed back.
        assert_eq!(tx.try_send(5), Err(TrySendError::Full(5)));

        // These sends wait until the consumer below frees space.
        let producer_task = tokio::spawn(async move {
            for item in 5..=6 {
                tx.send(item).await.unwrap();
            }
        });

        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        for expected in 1..=6 {
            assert_eq!(rx.recv().await, Some(expected));
        }

        producer_task.await.unwrap();
    }

    /// `capacity`, `len` and `is_empty` reflect the buffer state.
    #[tokio::test]
    async fn test_capacity_and_len() {
        let (tx, rx) = channel::<i32>(8);

        assert_eq!(rx.capacity(), 8);
        assert_eq!(rx.len(), 0);
        assert!(rx.is_empty());

        for item in 1..=2 {
            tx.try_send(item).unwrap();
        }

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert_eq!(rx.len(), 2);
        assert!(!rx.is_empty());
    }
}
477}