kestrel_timer/utils/spsc.rs
1/// High-performance async SPSC (Single Producer Single Consumer) channel
2///
3/// Built on top of custom SmallVec-based ring buffer for optimal performance.
4/// Optimized for low latency and fast creation, designed to replace tokio mpsc in timer implementation.
5///
6/// 高性能异步 SPSC(单生产者单消费者)通道
7///
8/// 基于自定义 SmallVec 环形缓冲区构建以获得最佳性能。
9/// 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
10///
11/// # 安全性说明 (Safety Notes)
12///
13/// 本实现使用 `UnsafeCell` 来提供零成本内部可变性,而不是 `Mutex`。
14/// 这是安全的,基于以下保证:
15///
16/// 1. **单一所有权**:`Sender` 和 `Receiver` 都不实现 `Clone`,确保每个通道只有一个发送者和一个接收者
17/// 2. **访问隔离**:`Producer` 只被唯一的 `Sender` 访问,`Consumer` 只被唯一的 `Receiver` 访问
18/// 3. **无数据竞争**:由于单一所有权,不会有多个线程同时访问同一个 `Producer` 或 `Consumer`
19/// 4. **原子通信**:`Producer` 和 `Consumer` 内部使用原子操作进行跨线程通信
20/// 5. **类型系统保证**:通过类型系统强制 SPSC 语义,防止误用为 MPMC
21///
22/// 这种设计实现了零同步开销,完全消除了 `Mutex` 的性能损失。
23///
24/// # Safety Guarantees
25///
26/// This implementation uses `UnsafeCell` for zero-cost interior mutability instead of `Mutex`.
27/// This is safe based on the following guarantees:
28///
29/// 1. **Single Ownership**: Neither `Sender` nor `Receiver` implements `Clone`, ensuring only one sender and one receiver per channel
30/// 2. **Access Isolation**: `Producer` is only accessed by the unique `Sender`, `Consumer` only by the unique `Receiver`
31/// 3. **No Data Races**: Due to single ownership, there's no concurrent access to the same `Producer` or `Consumer`
32/// 4. **Atomic Communication**: `Producer` and `Consumer` use atomic operations internally for cross-thread communication
33/// 5. **Type System Enforcement**: SPSC semantics are enforced by the type system, preventing misuse as MPMC
34///
35/// This design achieves zero synchronization overhead, completely eliminating `Mutex` performance costs.
36
37use std::cell::UnsafeCell;
38use std::sync::atomic::{AtomicBool, Ordering};
39use std::sync::Arc;
40use super::ringbuf;
41use super::notify::SingleWaiterNotify;
42
43/// SPSC channel creation function
44///
45/// Creates a bounded SPSC channel with the specified capacity.
46///
47/// # Parameters
48/// - `capacity`: Channel capacity
49///
50/// # Returns
51/// A tuple of (Sender, Receiver)
52///
53/// # Examples
54///
55/// ```
56/// use kestrel_timer::utils::spsc::channel;
57///
58/// #[tokio::main]
59/// async fn main() {
60/// let (tx, rx) = channel(32);
61///
62/// tokio::spawn(async move {
63/// tx.send(42).await.unwrap();
64/// });
65///
66/// let value = rx.recv().await.unwrap();
67/// assert_eq!(value, 42);
68/// }
69/// ```
70///
71/// SPSC 通道创建函数
72///
73/// 创建指定容量的有界 SPSC 通道。
74///
75/// # 参数
76/// - `capacity`: 通道容量
77///
78/// # 返回值
79/// 返回 (Sender, Receiver) 元组
80pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
81 assert!(capacity > 0, "Channel capacity must be greater than 0");
82
83 let (producer, consumer) = ringbuf::new(capacity);
84
85 let inner = Arc::new(Inner {
86 producer: UnsafeCell::new(producer),
87 consumer: UnsafeCell::new(consumer),
88 closed: AtomicBool::new(false),
89 recv_notify: SingleWaiterNotify::new(),
90 send_notify: SingleWaiterNotify::new(),
91 });
92
93 let sender = Sender {
94 inner: inner.clone(),
95 };
96
97 let receiver = Receiver {
98 inner,
99 };
100
101 (sender, receiver)
102}
103
104/// Shared internal state for SPSC channel
105///
106/// Contains both shared state and the ring buffer halves.
107/// Uses UnsafeCell for zero-cost interior mutability of Producer/Consumer.
108///
109/// SPSC 通道的共享内部状态
110///
111/// 包含共享状态和环形缓冲区的两端。
112/// 使用 UnsafeCell 实现 Producer/Consumer 的零成本内部可变性。
113struct Inner<T> {
114 /// Producer (wrapped in UnsafeCell for zero-cost interior mutability)
115 ///
116 /// 生产者(用 UnsafeCell 包装以实现零成本内部可变性)
117 producer: UnsafeCell<ringbuf::Producer<T>>,
118
119 /// Consumer (wrapped in UnsafeCell for zero-cost interior mutability)
120 ///
121 /// 消费者(用 UnsafeCell 包装以实现零成本内部可变性)
122 consumer: UnsafeCell<ringbuf::Consumer<T>>,
123
124 /// Channel closed flag
125 ///
126 /// 通道关闭标志
127 closed: AtomicBool,
128
129 /// Notifier for receiver waiting (lightweight single-waiter)
130 ///
131 /// 接收者等待通知器(轻量级单等待者)
132 recv_notify: SingleWaiterNotify,
133
134 /// Notifier for sender waiting when buffer is full (lightweight single-waiter)
135 ///
136 /// 发送者等待通知器,当缓冲区满时使用(轻量级单等待者)
137 send_notify: SingleWaiterNotify,
138}
139
140// SAFETY: Inner<T> 可以在线程间安全共享的原因:
141// 1. Sender 和 Receiver 都不实现 Clone,确保单一所有权
142// 2. producer 只被唯一的 Sender 访问,不会有多线程竞争
143// 3. consumer 只被唯一的 Receiver 访问,不会有多线程竞争
144// 4. closed、recv_notify、send_notify 都已经是线程安全的
145// 5. Producer 和 Consumer 内部使用原子操作进行跨线程通信
146unsafe impl<T: Send> Sync for Inner<T> {}
147
148/// SPSC channel sender
149///
150/// SPSC 通道发送器
151pub struct Sender<T> {
152 inner: Arc<Inner<T>>,
153}
154
155/// SPSC channel receiver
156///
157/// SPSC 通道接收器
158pub struct Receiver<T> {
159 inner: Arc<Inner<T>>,
160}
161
162/// Send error type
163///
164/// 发送错误类型
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
166pub enum SendError<T> {
167 /// Channel is closed
168 ///
169 /// 通道已关闭
170 Closed(T),
171}
172
173/// Try-receive error type
174///
175/// 尝试接收错误类型
176#[derive(Debug, Clone, Copy, PartialEq, Eq)]
177pub enum TryRecvError {
178 /// Channel is empty
179 ///
180 /// 通道为空
181 Empty,
182
183 /// Channel is closed
184 ///
185 /// 通道已关闭
186 Closed,
187}
188
189/// Try-send error type
190///
191/// 尝试发送错误类型
192#[derive(Debug, Clone, Copy, PartialEq, Eq)]
193pub enum TrySendError<T> {
194 /// Buffer is full
195 ///
196 /// 缓冲区已满
197 Full(T),
198
199 /// Channel is closed
200 ///
201 /// 通道已关闭
202 Closed(T),
203}
204
205impl<T> Sender<T> {
206 /// Send a message to the channel (async, waits if buffer is full)
207 ///
208 /// # Errors
209 /// Returns `SendError::Closed` if the receiver has been dropped
210 ///
211 /// 向通道发送消息(异步,如果缓冲区满则等待)
212 ///
213 /// # 错误
214 /// 如果接收器已被丢弃,返回 `SendError::Closed`
215 pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
216 loop {
217 match self.try_send(value) {
218 Ok(()) => return Ok(()),
219 Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
220 Err(TrySendError::Full(v)) => {
221 // Store the value to retry
222 // 存储值以便重试
223 value = v;
224
225 // Wait for space to become available
226 // 等待空间可用
227 self.inner.send_notify.notified().await;
228
229 // Check if channel was closed while waiting
230 // 检查等待时通道是否已关闭
231 if self.inner.closed.load(Ordering::Acquire) {
232 return Err(SendError::Closed(value));
233 }
234
235 // Retry with the value in next loop iteration
236 // 在下一次循环迭代中使用该值重试
237 }
238 }
239 }
240 }
241
242 /// Try to send a message without blocking
243 ///
244 /// # Errors
245 /// - Returns `TrySendError::Full` if the buffer is full
246 /// - Returns `TrySendError::Closed` if the receiver has been dropped
247 ///
248 /// 尝试非阻塞地发送消息
249 ///
250 /// # 错误
251 /// - 如果缓冲区满,返回 `TrySendError::Full`
252 /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
253 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
254 // Check if channel is closed first
255 // 首先检查通道是否已关闭
256 if self.inner.closed.load(Ordering::Acquire) {
257 return Err(TrySendError::Closed(value));
258 }
259
260 // SAFETY: Sender 不实现 Clone,因此只有一个 Sender 实例
261 // 不会有多个线程同时访问 producer
262 let producer = unsafe { &mut *self.inner.producer.get() };
263
264 match producer.push(value) {
265 Ok(()) => {
266 // Successfully sent, notify receiver
267 // 成功发送,通知接收者
268 self.inner.recv_notify.notify_one();
269 Ok(())
270 }
271 Err(ringbuf::PushError::Full(v)) => {
272 Err(TrySendError::Full(v))
273 }
274 }
275 }
276
277 /// Check if the channel is closed
278 ///
279 /// 检查通道是否已关闭
280 #[inline]
281 pub fn is_closed(&self) -> bool {
282 self.inner.closed.load(Ordering::Acquire)
283 }
284}
285
286impl<T> Receiver<T> {
287 /// Receive a message from the channel (async, waits if buffer is empty)
288 ///
289 /// Returns `None` if the channel is closed and empty
290 ///
291 /// 从通道接收消息(异步,如果缓冲区空则等待)
292 ///
293 /// 如果通道已关闭且为空,返回 `None`
294 pub async fn recv(&self) -> Option<T> {
295 loop {
296 match self.try_recv() {
297 Ok(value) => return Some(value),
298 Err(TryRecvError::Closed) => return None,
299 Err(TryRecvError::Empty) => {
300 // Check if channel is closed before waiting
301 // 等待前检查通道是否已关闭
302 if self.inner.closed.load(Ordering::Acquire) {
303 // Double check if there are any remaining items
304 // 再次检查是否有剩余项
305 if let Ok(value) = self.try_recv() {
306 return Some(value);
307 }
308 return None;
309 }
310
311 // Wait for data to become available
312 // 等待数据可用
313 self.inner.recv_notify.notified().await;
314 }
315 }
316 }
317 }
318
319 /// Try to receive a message without blocking
320 ///
321 /// # Errors
322 /// - Returns `TryRecvError::Empty` if the buffer is empty
323 /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
324 ///
325 /// 尝试非阻塞地接收消息
326 ///
327 /// # 错误
328 /// - 如果缓冲区空,返回 `TryRecvError::Empty`
329 /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
330 pub fn try_recv(&self) -> Result<T, TryRecvError> {
331 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
332 // 不会有多个线程同时访问 consumer
333 let consumer = unsafe { &mut *self.inner.consumer.get() };
334
335 match consumer.pop() {
336 Ok(value) => {
337 // Successfully received, notify sender
338 // 成功接收,通知发送者
339 self.inner.send_notify.notify_one();
340 Ok(value)
341 }
342 Err(ringbuf::PopError::Empty) => {
343 if self.inner.closed.load(Ordering::Acquire) {
344 Err(TryRecvError::Closed)
345 } else {
346 Err(TryRecvError::Empty)
347 }
348 }
349 }
350 }
351
352 /// Check if the channel is empty
353 ///
354 /// 检查通道是否为空
355 #[inline]
356 pub fn is_empty(&self) -> bool {
357 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
358 // is_empty 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
359 let consumer = unsafe { &*self.inner.consumer.get() };
360 consumer.is_empty()
361 }
362
363 /// Get the number of messages currently in the channel
364 ///
365 /// 获取通道中当前的消息数量
366 #[inline]
367 pub fn len(&self) -> usize {
368 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
369 // slots 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
370 let consumer = unsafe { &*self.inner.consumer.get() };
371 consumer.slots()
372 }
373
374 /// Get the capacity of the channel
375 ///
376 /// 获取通道的容量
377 #[inline]
378 pub fn capacity(&self) -> usize {
379 // SAFETY: Receiver 不实现 Clone,因此只有一个 Receiver 实例
380 // capacity 只读取数据,不需要可变访问,但我们通过 UnsafeCell 访问以保持一致性
381 let consumer = unsafe { &*self.inner.consumer.get() };
382 consumer.buffer().capacity()
383 }
384}
385
386impl<T> Drop for Receiver<T> {
387 fn drop(&mut self) {
388 // Mark channel as closed when receiver is dropped
389 // 当接收器被丢弃时标记通道为已关闭
390 self.inner.closed.store(true, Ordering::Release);
391
392 // Notify sender in case it's waiting
393 // 通知发送者以防它正在等待
394 self.inner.send_notify.notify_one();
395 }
396}
397
398impl<T> Drop for Sender<T> {
399 fn drop(&mut self) {
400 // Mark channel as closed when sender is dropped
401 // 当发送器被丢弃时标记通道为已关闭
402 self.inner.closed.store(true, Ordering::Release);
403
404 // Notify receiver in case it's waiting
405 // 通知接收器以防它正在等待
406 self.inner.recv_notify.notify_one();
407 }
408}
409
410#[cfg(test)]
411mod tests {
412 use super::*;
413
414 #[tokio::test]
415 async fn test_basic_send_recv() {
416 let (tx, rx) = channel(4);
417
418 tx.send(1).await.unwrap();
419 tx.send(2).await.unwrap();
420 tx.send(3).await.unwrap();
421
422 assert_eq!(rx.recv().await, Some(1));
423 assert_eq!(rx.recv().await, Some(2));
424 assert_eq!(rx.recv().await, Some(3));
425 }
426
427 #[tokio::test]
428 async fn test_try_send_recv() {
429 let (tx, rx) = channel(4);
430
431 tx.try_send(1).unwrap();
432 tx.try_send(2).unwrap();
433
434 assert_eq!(rx.try_recv().unwrap(), 1);
435 assert_eq!(rx.try_recv().unwrap(), 2);
436 assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
437 }
438
439 #[tokio::test]
440 async fn test_channel_closed_on_sender_drop() {
441 let (tx, rx) = channel(4);
442
443 tx.send(1).await.unwrap();
444 drop(tx);
445
446 assert_eq!(rx.recv().await, Some(1));
447 assert_eq!(rx.recv().await, None);
448 }
449
450 #[tokio::test]
451 async fn test_channel_closed_on_receiver_drop() {
452 let (tx, rx) = channel::<i32>(4);
453
454 drop(rx);
455
456 assert!(matches!(tx.send(1).await, Err(SendError::Closed(1))));
457 }
458
459 #[tokio::test]
460 async fn test_cross_task_communication() {
461 let (tx, rx) = channel(4);
462
463 let sender_handle = tokio::spawn(async move {
464 for i in 0..10 {
465 tx.send(i).await.unwrap();
466 }
467 });
468
469 let receiver_handle = tokio::spawn(async move {
470 let mut sum = 0;
471 while let Some(value) = rx.recv().await {
472 sum += value;
473 }
474 sum
475 });
476
477 sender_handle.await.unwrap();
478 let sum = receiver_handle.await.unwrap();
479 assert_eq!(sum, 45); // 0+1+2+...+9 = 45
480 }
481
482 #[tokio::test]
483 async fn test_backpressure() {
484 let (tx, rx) = channel(4);
485
486 // Fill the buffer
487 tx.try_send(1).unwrap();
488 tx.try_send(2).unwrap();
489 tx.try_send(3).unwrap();
490 tx.try_send(4).unwrap();
491
492 // Buffer should be full now
493 assert!(matches!(tx.try_send(5), Err(TrySendError::Full(5))));
494
495 // This should block and then succeed when we consume
496 let send_handle = tokio::spawn(async move {
497 tx.send(5).await.unwrap();
498 tx.send(6).await.unwrap();
499 });
500
501 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
502
503 assert_eq!(rx.recv().await, Some(1));
504 assert_eq!(rx.recv().await, Some(2));
505 assert_eq!(rx.recv().await, Some(3));
506 assert_eq!(rx.recv().await, Some(4));
507 assert_eq!(rx.recv().await, Some(5));
508 assert_eq!(rx.recv().await, Some(6));
509
510 send_handle.await.unwrap();
511 }
512
513 #[tokio::test]
514 async fn test_capacity_and_len() {
515 let (tx, rx) = channel::<i32>(8);
516
517 assert_eq!(rx.capacity(), 8);
518 assert_eq!(rx.len(), 0);
519 assert!(rx.is_empty());
520
521 tx.try_send(1).unwrap();
522 tx.try_send(2).unwrap();
523
524 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
525 assert_eq!(rx.len(), 2);
526 assert!(!rx.is_empty());
527 }
528}