kestrel_timer/utils/spsc.rs
//! High-performance async SPSC (Single Producer Single Consumer) channel.
//!
//! Built on top of a custom SmallVec-based ring buffer for optimal performance.
//! Optimized for low latency and fast creation; designed to replace tokio mpsc in the timer implementation.
//!
//! 高性能异步 SPSC(单生产者单消费者)通道
//!
//! 基于自定义 SmallVec 环形缓冲区构建以获得最佳性能。
//! 针对低延迟和快速创建进行优化,用于替代定时器实现中的 tokio mpsc。
10
11use std::sync::atomic::{AtomicBool, Ordering};
12use std::sync::Arc;
13use tokio::sync::Notify;
14use parking_lot::Mutex;
15use super::ringbuf;
16
17/// SPSC channel creation function
18///
19/// Creates a bounded SPSC channel with the specified capacity.
20///
21/// # Parameters
22/// - `capacity`: Channel capacity
23///
24/// # Returns
25/// A tuple of (Sender, Receiver)
26///
27/// # Examples
28///
29/// ```
30/// use kestrel_timer::utils::spsc::channel;
31///
32/// #[tokio::main]
33/// async fn main() {
34/// let (tx, rx) = channel(32);
35///
36/// tokio::spawn(async move {
37/// tx.send(42).await.unwrap();
38/// });
39///
40/// let value = rx.recv().await.unwrap();
41/// assert_eq!(value, 42);
42/// }
43/// ```
44///
45/// SPSC 通道创建函数
46///
47/// 创建指定容量的有界 SPSC 通道。
48///
49/// # 参数
50/// - `capacity`: 通道容量
51///
52/// # 返回值
53/// 返回 (Sender, Receiver) 元组
54pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
55 assert!(capacity > 0, "Channel capacity must be greater than 0");
56
57 let (producer, consumer) = ringbuf::new(capacity);
58
59 let inner = Arc::new(Inner {
60 producer: Mutex::new(producer),
61 consumer: Mutex::new(consumer),
62 closed: AtomicBool::new(false),
63 recv_notify: Notify::new(),
64 send_notify: Notify::new(),
65 });
66
67 let sender = Sender {
68 inner: inner.clone(),
69 };
70
71 let receiver = Receiver {
72 inner,
73 };
74
75 (sender, receiver)
76}
77
/// Shared internal state for the SPSC channel.
///
/// A single instance is created by `channel` and shared between the `Sender`
/// and the `Receiver` through an `Arc`.
struct Inner<T> {
    /// Producer half of the ring buffer (wrapped in a Mutex for interior
    /// mutability, so it can be used through `&self`).
    producer: Mutex<ringbuf::Producer<T>>,

    /// Consumer half of the ring buffer (wrapped in a Mutex for interior
    /// mutability, so it can be used through `&self`).
    consumer: Mutex<ringbuf::Consumer<T>>,

    /// Channel closed flag; set to `true` when either endpoint is dropped.
    closed: AtomicBool,

    /// Notifier the receiver waits on; signalled after a successful push and
    /// when the sender is dropped.
    recv_notify: Notify,

    /// Notifier the sender waits on when the buffer is full; signalled after a
    /// successful pop and when the receiver is dropped.
    send_notify: Notify,
}
107
/// SPSC channel sender.
///
/// Holds one handle to the shared channel state. Dropping it marks the channel
/// as closed and wakes the receiver.
pub struct Sender<T> {
    /// Handle to the state shared with the `Receiver`.
    inner: Arc<Inner<T>>,
}
114
/// SPSC channel receiver.
///
/// Holds one handle to the shared channel state. Dropping it marks the channel
/// as closed and wakes the sender.
pub struct Receiver<T> {
    /// Handle to the state shared with the `Sender`.
    inner: Arc<Inner<T>>,
}
121
/// Error returned by the asynchronous `send` operation.
///
/// The value that could not be delivered is carried inside the variant, so the
/// caller gets it back. Implements [`std::fmt::Display`] and
/// [`std::error::Error`] so it can be propagated with `?` into boxed or
/// dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendError<T> {
    /// Channel is closed; contains the value that was not sent.
    Closed(T),
}

impl<T> std::fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The payload is deliberately not printed: `T` need not be `Display`.
        match self {
            SendError::Closed(_) => f.write_str("channel closed"),
        }
    }
}

impl<T: std::fmt::Debug> std::error::Error for SendError<T> {}
132
/// Error returned by the non-blocking `try_recv` operation.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// propagated with `?` into boxed or dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// Channel is currently empty, but still open.
    Empty,

    /// Channel is closed and no messages remain.
    Closed,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            TryRecvError::Empty => "channel empty",
            TryRecvError::Closed => "channel closed",
        };
        f.write_str(message)
    }
}

impl std::error::Error for TryRecvError {}
148
/// Error returned by the non-blocking `try_send` operation.
///
/// Both variants carry the value that could not be sent, so the caller gets it
/// back. Implements [`std::fmt::Display`] and [`std::error::Error`] so it can
/// be propagated with `?` into boxed or dynamic error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TrySendError<T> {
    /// Buffer is full; contains the value that was not sent.
    Full(T),

    /// Channel is closed; contains the value that was not sent.
    Closed(T),
}

impl<T> std::fmt::Display for TrySendError<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The payload is deliberately not printed: `T` need not be `Display`.
        let message = match self {
            TrySendError::Full(_) => "channel full",
            TrySendError::Closed(_) => "channel closed",
        };
        f.write_str(message)
    }
}

impl<T: std::fmt::Debug> std::error::Error for TrySendError<T> {}
164
165impl<T> Sender<T> {
166 /// Send a message to the channel (async, waits if buffer is full)
167 ///
168 /// # Errors
169 /// Returns `SendError::Closed` if the receiver has been dropped
170 ///
171 /// 向通道发送消息(异步,如果缓冲区满则等待)
172 ///
173 /// # 错误
174 /// 如果接收器已被丢弃,返回 `SendError::Closed`
175 pub async fn send(&self, mut value: T) -> Result<(), SendError<T>> {
176 loop {
177 match self.try_send(value) {
178 Ok(()) => return Ok(()),
179 Err(TrySendError::Closed(v)) => return Err(SendError::Closed(v)),
180 Err(TrySendError::Full(v)) => {
181 // Store the value to retry
182 // 存储值以便重试
183 value = v;
184
185 // Wait for space to become available
186 // 等待空间可用
187 self.inner.send_notify.notified().await;
188
189 // Check if channel was closed while waiting
190 // 检查等待时通道是否已关闭
191 if self.inner.closed.load(Ordering::Acquire) {
192 return Err(SendError::Closed(value));
193 }
194
195 // Retry with the value in next loop iteration
196 // 在下一次循环迭代中使用该值重试
197 }
198 }
199 }
200 }
201
202 /// Try to send a message without blocking
203 ///
204 /// # Errors
205 /// - Returns `TrySendError::Full` if the buffer is full
206 /// - Returns `TrySendError::Closed` if the receiver has been dropped
207 ///
208 /// 尝试非阻塞地发送消息
209 ///
210 /// # 错误
211 /// - 如果缓冲区满,返回 `TrySendError::Full`
212 /// - 如果接收器已被丢弃,返回 `TrySendError::Closed`
213 pub fn try_send(&self, value: T) -> Result<(), TrySendError<T>> {
214 // Check if channel is closed first
215 // 首先检查通道是否已关闭
216 if self.inner.closed.load(Ordering::Acquire) {
217 return Err(TrySendError::Closed(value));
218 }
219
220 // Try to push directly to producer
221 // 直接尝试推送到生产者
222 let mut producer = self.inner.producer.lock();
223 match producer.push(value) {
224 Ok(()) => {
225 // Successfully sent, notify receiver
226 // 成功发送,通知接收者
227 self.inner.recv_notify.notify_one();
228 Ok(())
229 }
230 Err(ringbuf::PushError::Full(v)) => {
231 Err(TrySendError::Full(v))
232 }
233 }
234 }
235
236 /// Check if the channel is closed
237 ///
238 /// 检查通道是否已关闭
239 #[inline]
240 pub fn is_closed(&self) -> bool {
241 self.inner.closed.load(Ordering::Acquire)
242 }
243}
244
245impl<T> Receiver<T> {
246 /// Receive a message from the channel (async, waits if buffer is empty)
247 ///
248 /// Returns `None` if the channel is closed and empty
249 ///
250 /// 从通道接收消息(异步,如果缓冲区空则等待)
251 ///
252 /// 如果通道已关闭且为空,返回 `None`
253 pub async fn recv(&self) -> Option<T> {
254 loop {
255 match self.try_recv() {
256 Ok(value) => return Some(value),
257 Err(TryRecvError::Closed) => return None,
258 Err(TryRecvError::Empty) => {
259 // Check if channel is closed before waiting
260 // 等待前检查通道是否已关闭
261 if self.inner.closed.load(Ordering::Acquire) {
262 // Double check if there are any remaining items
263 // 再次检查是否有剩余项
264 if let Ok(value) = self.try_recv() {
265 return Some(value);
266 }
267 return None;
268 }
269
270 // Wait for data to become available
271 // 等待数据可用
272 self.inner.recv_notify.notified().await;
273 }
274 }
275 }
276 }
277
278 /// Try to receive a message without blocking
279 ///
280 /// # Errors
281 /// - Returns `TryRecvError::Empty` if the buffer is empty
282 /// - Returns `TryRecvError::Closed` if the sender has been dropped and buffer is empty
283 ///
284 /// 尝试非阻塞地接收消息
285 ///
286 /// # 错误
287 /// - 如果缓冲区空,返回 `TryRecvError::Empty`
288 /// - 如果发送器已被丢弃且缓冲区空,返回 `TryRecvError::Closed`
289 pub fn try_recv(&self) -> Result<T, TryRecvError> {
290 // Try to pop directly from consumer
291 // 直接尝试从消费者弹出
292 let mut consumer = self.inner.consumer.lock();
293 match consumer.pop() {
294 Ok(value) => {
295 // Successfully received, notify sender
296 // 成功接收,通知发送者
297 self.inner.send_notify.notify_one();
298 Ok(value)
299 }
300 Err(ringbuf::PopError::Empty) => {
301 if self.inner.closed.load(Ordering::Acquire) {
302 Err(TryRecvError::Closed)
303 } else {
304 Err(TryRecvError::Empty)
305 }
306 }
307 }
308 }
309
310 /// Check if the channel is empty
311 ///
312 /// 检查通道是否为空
313 #[inline]
314 pub fn is_empty(&self) -> bool {
315 self.inner.consumer.lock().is_empty()
316 }
317
318 /// Get the number of messages currently in the channel
319 ///
320 /// 获取通道中当前的消息数量
321 #[inline]
322 pub fn len(&self) -> usize {
323 self.inner.consumer.lock().slots()
324 }
325
326 /// Get the capacity of the channel
327 ///
328 /// 获取通道的容量
329 #[inline]
330 pub fn capacity(&self) -> usize {
331 self.inner.consumer.lock().buffer().capacity()
332 }
333}
334
335impl<T> Drop for Receiver<T> {
336 fn drop(&mut self) {
337 // Mark channel as closed when receiver is dropped
338 // 当接收器被丢弃时标记通道为已关闭
339 self.inner.closed.store(true, Ordering::Release);
340
341 // Notify sender in case it's waiting
342 // 通知发送者以防它正在等待
343 self.inner.send_notify.notify_one();
344 }
345}
346
347impl<T> Drop for Sender<T> {
348 fn drop(&mut self) {
349 // Mark channel as closed when sender is dropped
350 // 当发送器被丢弃时标记通道为已关闭
351 self.inner.closed.store(true, Ordering::Release);
352
353 // Notify receiver in case it's waiting
354 // 通知接收器以防它正在等待
355 self.inner.recv_notify.notify_one();
356 }
357}
358
#[cfg(test)]
mod tests {
    use super::*;

    /// Messages sent with `send` arrive through `recv` in FIFO order.
    #[tokio::test]
    async fn test_basic_send_recv() {
        let (tx, rx) = channel(4);

        for item in 1..=3 {
            tx.send(item).await.unwrap();
        }

        for expected in 1..=3 {
            assert_eq!(rx.recv().await, Some(expected));
        }
    }

    /// The non-blocking API delivers in order and reports `Empty` afterwards.
    #[tokio::test]
    async fn test_try_send_recv() {
        let (tx, rx) = channel(4);

        tx.try_send(1).unwrap();
        tx.try_send(2).unwrap();

        assert_eq!(rx.try_recv(), Ok(1));
        assert_eq!(rx.try_recv(), Ok(2));
        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
    }

    /// Buffered messages are still delivered after the sender is dropped,
    /// then `recv` yields `None`.
    #[tokio::test]
    async fn test_channel_closed_on_sender_drop() {
        let (tx, rx) = channel(4);

        tx.send(1).await.unwrap();
        drop(tx);

        let first = rx.recv().await;
        let second = rx.recv().await;
        assert_eq!(first, Some(1));
        assert_eq!(second, None);
    }

    /// Sending after the receiver is dropped fails and returns the value.
    #[tokio::test]
    async fn test_channel_closed_on_receiver_drop() {
        let (tx, rx) = channel::<i32>(4);

        drop(rx);

        let outcome = tx.send(1).await;
        assert_eq!(outcome, Err(SendError::Closed(1)));
    }

    /// Producer and consumer running in separate tasks exchange every message.
    #[tokio::test]
    async fn test_cross_task_communication() {
        let (tx, rx) = channel(4);

        let producer_task = tokio::spawn(async move {
            for item in 0..10 {
                tx.send(item).await.unwrap();
            }
        });

        let consumer_task = tokio::spawn(async move {
            let mut total = 0;
            loop {
                match rx.recv().await {
                    Some(item) => total += item,
                    None => break total,
                }
            }
        });

        producer_task.await.unwrap();
        let total = consumer_task.await.unwrap();
        assert_eq!(total, 45); // 0+1+2+...+9 = 45
    }

    /// A full buffer rejects `try_send`, while `send` waits until space frees up.
    #[tokio::test]
    async fn test_backpressure() {
        let (tx, rx) = channel(4);

        // Fill the buffer to capacity.
        for item in 1..=4 {
            tx.try_send(item).unwrap();
        }

        // One more must be rejected and handed back.
        assert_eq!(tx.try_send(5), Err(TrySendError::Full(5)));

        // These sends wait until the receiver starts draining.
        let pending_sends = tokio::spawn(async move {
            tx.send(5).await.unwrap();
            tx.send(6).await.unwrap();
        });

        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

        for expected in 1..=6 {
            assert_eq!(rx.recv().await, Some(expected));
        }

        pending_sends.await.unwrap();
    }

    /// `capacity`, `len` and `is_empty` reflect the buffer contents.
    #[tokio::test]
    async fn test_capacity_and_len() {
        let (tx, rx) = channel::<i32>(8);

        assert_eq!(rx.capacity(), 8);
        assert_eq!(rx.len(), 0);
        assert!(rx.is_empty());

        for item in [1, 2] {
            tx.try_send(item).unwrap();
        }

        tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        assert_eq!(rx.len(), 2);
        assert!(!rx.is_empty());
    }
}