kestrel_timer/utils/
spsc.rs

1/// High-performance async SPSC (Single Producer Single Consumer) channel
2/// 
3/// Built on top of custom SmallVec-based ring buffer for optimal performance.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5/// 
6/// 高性能异步 SPSC(单生产者单消费者)通道
7/// 
8/// 基于自定义 SmallVec 环形缓冲区构建以获得最佳性能。
9/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
10///
11/// # 安全性说明 (Safety Notes)
12///
13/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
14/// 这是安全的,基于以下保证:
15///
16/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
17/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
18/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
19/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
20/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
21///
22/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
23///
24/// # Safety Guarantees
25///
26/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
27/// This is safe based on the following guarantees:
28///
29/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
30/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
31/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
32/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
33/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
34///
35/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
36
37use std::cell::UnsafeCell;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::sync::Arc;
40use super::ringbuf;
41use super::notify::SingleWaiterNotify;
42
43/// SPSC channel creation function
44/// 
45/// Creates a bounded SPSC channel with the specified capacity.
46/// 
47/// # Parameters
48/// - `capacity`: Channel capacity
49/// 
50/// # Returns
51/// A tuple of (Sender, Receiver)
52/// 
53/// # Examples
54/// 
55/// ```
56/// use kestrel_timer::utils::spsc::channel;
57/// 
58/// #[tokio::main]
59/// async fn main() {
60///     let (tx, rx) = channel(32);
61///     
62///     tokio::spawn(async move {
63///         tx.send(42).await.unwrap();
64///     });
65///     
66///     let value = rx.recv().await.unwrap();
67///     assert_eq!(value, 42);
68/// }
69/// ```
70/// 
71/// SPSC 通道创建函数
72/// 
73/// 创建指定容量的有界 SPSC 通道。
74/// 
75/// # 参数
76/// - `capacity`: 通道容量
77/// 
78/// # 返回值
79/// 返回 (Sender, Receiver) 元组
80pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
81    assert!(capacity > 0, "Channel capacity must be greater than 0");
82    
83    let (producer, consumer) = ringbuf::new(capacity);
84    
85    let inner = Arc::new(Inner {
86        producer: UnsafeCell::new(producer),
87        consumer: UnsafeCell::new(consumer),
88        closed: AtomicBool::new(false),
89        recv_notify: SingleWaiterNotify::new(),
90        send_notify: SingleWaiterNotify::new(),
91    });
92    
93    let sender = Sender {
94        inner: inner.clone(),
95    };
96    
97    let receiver = Receiver {
98        inner,
99    };
100    
101    (sender, receiver)
102}
103
104/// Shared internal state for SPSC channel
105/// 
106/// Contains both shared state and the ring buffer halves.
107/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
108/// 
109/// SPSC 通道的共享内部状态
110/// 
111/// 包含共享状态和环形缓冲区的两端。
112/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
113struct Inner<T> {
114    /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
115    /// 
116    /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
117    producer: UnsafeCell<ringbuf::Producer<T>>,
118    
119    /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
120    /// 
121    /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
122    consumer: UnsafeCell<ringbuf::Consumer<T>>,
123    
124    /// Channel closed flag
125    /// 
126    /// 通道关闭标志
127    closed: AtomicBool,
128    
129    /// Notifier for receiver waiting (lightweight single-waiter)
130    /// 
131    /// 接收者等待通知器(轻量级单等待者)
132    recv_notify: SingleWaiterNotify,
133    
134    /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
135    /// 
136    /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
137    send_notify: SingleWaiterNotify,
138}
139
140// SAFETY: Inner<T> 可以在线程间安全共享的原因:
141// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
142// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
143// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
144// 4. closed、recv_notify、send_notify 都已经是线程安全的
145// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
146unsafe impl<T: Send> Sync for Inner<T> {}
147
148/// SPSC channel sender
149/// 
150/// SPSC 通道发送器
151pub struct Sender<T> {
152    inner: Arc<Inner<T>>,
153}
154
155/// SPSC channel receiver
156/// 
157/// SPSC 通道接收器
158pub struct Receiver<T> {
159    inner: Arc<Inner<T>>,
160}
161
162/// Send error type
163/// 
164/// 发送错误类型
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
166pub enum SendError<T> {
167    /// Channel is closed
168    /// 
169    /// 通道已关闭
170    Closed(T),
171}
172
173/// Try-receive error type
174/// 
175/// 尝试接收错误类型
176#[derive(Debug, Clone, Copy, PartialEq, Eq)]
177pub enum TryRecvError {
178    /// Channel is empty
179    /// 
180    /// 通道为空
181    Empty,
182    
183    /// Channel is closed
184    /// 
185    /// 通道已关闭
186    Closed,
187}
188
189/// Try-send error type
190/// 
191/// 尝试发送错误类型
192#[derive(Debug, Clone, Copy, PartialEq, Eq)]
193pub enum TrySendError<T> {
194    /// Buffer is full
195    /// 
196    /// 缓冲区已满
197    Full(T),
198    
199    /// Channel is closed
200    /// 
201    /// 通道已关闭
202    Closed(T),
203}
204
205impl<T> Sender<T> {
206    /// Send a message to the channel (async, waits if buffer is full)
207    /// 
208    /// # Errors
209    /// Returns `SendError::Closed` if the receiver has been dropped
210    /// 
211    /// 向通道发送消息(异步,如果缓冲区满则等待)
212    /// 
213    /// # 错误
214    /// 如果接收器已被丢弃,返回 `SendError::Closed`
215    pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
216        loop {
217            match self.try_send(value) {
218                Ok(()) => return Ok(()),
219                Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
220                Err(TrySendError::Full(v)) => {
221                    // Store the value to retry
222                    // 存储值以便重试
223                    value = v;
224                    
225                    // Wait for space to become available
226                    // 等待空间可用
227                    self.inner.send_notify.notified().await;
228                    
229                    // Check if channel was closed while waiting
230                    // 检查等待时通道是否已关闭
231                    if self.inner.closed.load(Ordering::Acquire) {
232                        return Err(SendError::Closed(value));
233                    }
234                    
235                    // Retry with the value in next loop iteration
236                    // 在下一次循环迭代中使用该值重试
237                }
238            }
239        }
240    }
241    
242    /// Try to send a message without blocking
243    /// 
244    /// # Errors
245    /// - Returns `TrySendError::Full` if the buffer is full
246    /// - Returns `TrySendError::Closed` if the receiver has been dropped
247    /// 
248    /// 尝试非阻塞地发送消息
249    /// 
250    /// # 错误
251    /// - 如果缓冲区满,返回 `TrySendError::Full`
252    /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
253    pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
254        // Check if channel is closed first
255        // 首先检查通道是否已关闭
256        if self.inner.closed.load(Ordering::Acquire) {
257            return Err(TrySendError::Closed(value));
258        }
259        
260        // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
261        // 不会有多个线程同时访问 producer
262        let producer = unsafe { &mut *self.inner.producer.get() };
263        
264        match producer.push(value) {
265            Ok(()) => {
266                // Successfully sent, notify receiver
267                // 成功发送,通知接收者
268                self.inner.recv_notify.notify_one();
269                Ok(())
270            }
271            Err(ringbuf::PushError::Full(v)) => {
272                Err(TrySendError::Full(v))
273            }
274        }
275    }
276    
277    /// Check if the channel is closed
278    /// 
279    /// 检查通道是否已关闭
280    #[inline]
281    pub fn is_closed(&self) -> bool {
282        self.inner.closed.load(Ordering::Acquire)
283    }
284}
285
286impl<T> Receiver<T> {
287    /// Receive a message from the channel (async, waits if buffer is empty)
288    /// 
289    /// Returns `None` if the channel is closed and empty
290    /// 
291    /// 从通道接收消息(异步,如果缓冲区空则等待)
292    /// 
293    /// 如果通道已关闭且为空,返回 `None`
294    pub async fn recv(&self) -> Option<T> {
295        loop {
296            match self.try_recv() {
297                Ok(value) => return Some(value),
298                Err(TryRecvError::Closed) => return None,
299                Err(TryRecvError::Empty) => {
300                    // Check if channel is closed before waiting
301                    // 等待前检查通道是否已关闭
302                    if self.inner.closed.load(Ordering::Acquire) {
303                        // Double check if there are any remaining items
304                        // 再次检查是否有剩余项
305                        if let Ok(value) = self.try_recv() {
306                            return Some(value);
307                        }
308                        return None;
309                    }
310                    
311                    // Wait for data to become available
312                    // 等待数据可用
313                    self.inner.recv_notify.notified().await;
314                }
315            }
316        }
317    }
318    
319    /// Try to receive a message without blocking
320    /// 
321    /// # Errors
322    /// - Returns `TryRecvError::Empty` if the buffer is empty
323    /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
324    /// 
325    /// 尝试非阻塞地接收消息
326    /// 
327    /// # 错误
328    /// - 如果缓冲区空,返回 `TryRecvError::Empty`
329    /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
330    pub fn try_recv(&self) -> Result<T, TryRecvError> {
331        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
332        // 不会有多个线程同时访问 consumer
333        let consumer = unsafe { &mut *self.inner.consumer.get() };
334        
335        match consumer.pop() {
336            Ok(value) => {
337                // Successfully received, notify sender
338                // 成功接收,通知发送者
339                self.inner.send_notify.notify_one();
340                Ok(value)
341            }
342            Err(ringbuf::PopError::Empty) => {
343                if self.inner.closed.load(Ordering::Acquire) {
344                    Err(TryRecvError::Closed)
345                } else {
346                    Err(TryRecvError::Empty)
347                }
348            }
349        }
350    }
351    
352    /// Check if the channel is empty
353    /// 
354    /// 检查通道是否为空
355    #[inline]
356    pub fn is_empty(&self) -> bool {
357        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
358        // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
359        let consumer = unsafe { &*self.inner.consumer.get() };
360        consumer.is_empty()
361    }
362    
363    /// Get the number of messages currently in the channel
364    /// 
365    /// 获取通道中当前的消息数量
366    #[inline]
367    pub fn len(&self) -> usize {
368        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
369        // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
370        let consumer = unsafe { &*self.inner.consumer.get() };
371        consumer.slots()
372    }
373    
374    /// Get the capacity of the channel
375    /// 
376    /// 获取通道的容量
377    #[inline]
378    pub fn capacity(&self) -> usize {
379        // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
380        // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
381        let consumer = unsafe { &*self.inner.consumer.get() };
382        consumer.buffer().capacity()
383    }
384}
385
386impl<T> Drop for Receiver<T> {
387    fn drop(&mut self) {
388        // Mark channel as closed when receiver is dropped
389        // 当接收器被丢弃时标记通道为已关闭
390        self.inner.closed.store(true, Ordering::Release);
391        
392        // Notify sender in case it's waiting
393        // 通知发送者以防它正在等待
394        self.inner.send_notify.notify_one();
395    }
396}
397
398impl<T> Drop for Sender<T> {
399    fn drop(&mut self) {
400        // Mark channel as closed when sender is dropped
401        // 当发送器被丢弃时标记通道为已关闭
402        self.inner.closed.store(true, Ordering::Release);
403        
404        // Notify receiver in case it's waiting
405        // 通知接收器以防它正在等待
406        self.inner.recv_notify.notify_one();
407    }
408}
409
410#[cfg(test)]
411mod tests {
412    use super::*;
413    
414    #[tokio::test]
415    async fn test_basic_send_recv() {
416        let (tx, rx) = channel(4);
417        
418        tx.send(1).await.unwrap();
419        tx.send(2).await.unwrap();
420        tx.send(3).await.unwrap();
421        
422        assert_eq!(rx.recv().await, Some(1));
423        assert_eq!(rx.recv().await, Some(2));
424        assert_eq!(rx.recv().await, Some(3));
425    }
426    
427    #[tokio::test]
428    async fn test_try_send_recv() {
429        let (tx, rx) = channel(4);
430        
431        tx.try_send(1).unwrap();
432        tx.try_send(2).unwrap();
433        
434        assert_eq!(rx.try_recv().unwrap(), 1);
435        assert_eq!(rx.try_recv().unwrap(), 2);
436        assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
437    }
438    
439    #[tokio::test]
440    async fn test_channel_closed_on_sender_drop() {
441        let (tx, rx) = channel(4);
442        
443        tx.send(1).await.unwrap();
444        drop(tx);
445        
446        assert_eq!(rx.recv().await, Some(1));
447        assert_eq!(rx.recv().await, None);
448    }
449    
450    #[tokio::test]
451    async fn test_channel_closed_on_receiver_drop() {
452        let (tx, rx) = channel::<i32>(4);
453        
454        drop(rx);
455        
456        assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
457    }
458    
459    #[tokio::test]
460    async fn test_cross_task_communication() {
461        let (tx, rx) = channel(4);
462        
463        let sender_handle = tokio::spawn(async move {
464            for i in 0..10 {
465                tx.send(i).await.unwrap();
466            }
467        });
468        
469        let receiver_handle = tokio::spawn(async move {
470            let mut sum = 0;
471            while let Some(value) = rx.recv().await {
472                sum += value;
473            }
474            sum
475        });
476        
477        sender_handle.await.unwrap();
478        let sum = receiver_handle.await.unwrap();
479        assert_eq!(sum, 45); // 0+1+2+...+9 = 45
480    }
481    
482    #[tokio::test]
483    async fn test_backpressure() {
484        let (tx, rx) = channel(4);
485        
486        // Fill the buffer
487        tx.try_send(1).unwrap();
488        tx.try_send(2).unwrap();
489        tx.try_send(3).unwrap();
490        tx.try_send(4).unwrap();
491        
492        // Buffer should be full now
493        assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
494        
495        // This should block and then succeed when we consume
496        let send_handle = tokio::spawn(async move {
497            tx.send(5).await.unwrap();
498            tx.send(6).await.unwrap();
499        });
500        
501        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
502        
503        assert_eq!(rx.recv().await, Some(1));
504        assert_eq!(rx.recv().await, Some(2));
505        assert_eq!(rx.recv().await, Some(3));
506        assert_eq!(rx.recv().await, Some(4));
507        assert_eq!(rx.recv().await, Some(5));
508        assert_eq!(rx.recv().await, Some(6));
509        
510        send_handle.await.unwrap();
511    }
512    
513    #[tokio::test]
514    async fn test_capacity_and_len() {
515        let (tx, rx) = channel::<i32>(8);
516        
517        assert_eq!(rx.capacity(), 8);
518        assert_eq!(rx.len(), 0);
519        assert!(rx.is_empty());
520        
521        tx.try_send(1).unwrap();
522        tx.try_send(2).unwrap();
523        
524        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
525        assert_eq!(rx.len(), 2);
526        assert!(!rx.is_empty());
527    }
528}