kestrel_timer/utils/oneshot.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::task::{ONESHOT_PENDING, ONESHOT_CALLED, ONESHOT_CANCELLED, TaskCompletion};
8use super::atomic_waker::AtomicWaker;
9
/// Conversion contract for values that can be sent through a oneshot channel.
///
/// The channel keeps its state in a single `AtomicU8`, so every state value
/// must be encodable as a `u8` and decodable from one.
///
/// # Contract
///
/// - [`State::pending_value`] is the byte held by the channel before anything
///   has been sent. It must differ from every byte [`State::to_u8`] can
///   return: the receiver treats a stored byte equal to the pending value as
///   "not completed yet".
/// - [`State::from_u8`] must return `Some` for every byte produced by
///   [`State::to_u8`]; the receiver only completes once the stored byte
///   decodes successfully.
///
/// # Built-in Implementations
///
/// - `TaskCompletion`: `Called` or `Cancelled`
/// - `()`: plain completion signal without a payload
///
/// # Example: Custom State
///
/// ```rust,ignore
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum CustomState {
///     Success,
///     Failure,
///     Timeout,
/// }
///
/// impl State for CustomState {
///     fn to_u8(&self) -> u8 {
///         match self {
///             CustomState::Success => 1,
///             CustomState::Failure => 2,
///             CustomState::Timeout => 3,
///         }
///     }
///
///     fn from_u8(value: u8) -> Option<Self> {
///         match value {
///             1 => Some(CustomState::Success),
///             2 => Some(CustomState::Failure),
///             3 => Some(CustomState::Timeout),
///             _ => None,
///         }
///     }
///
///     fn pending_value() -> u8 {
///         0
///     }
/// }
///
/// // Usage:
/// let (notifier, receiver) = Sender::<CustomState>::new();
/// tokio::spawn(async move {
///     notifier.notify(CustomState::Success);
/// });
/// let result = receiver.await; // Direct await
/// ```
pub trait State: Sized + Send + Sync + 'static {
    /// Encodes this state as the byte written to the channel's atomic slot.
    fn to_u8(&self) -> u8;

    /// Decodes a byte read back from the atomic slot.
    ///
    /// Returns `None` when `value` does not correspond to a valid state.
    fn from_u8(value: u8) -> Option<Self>;

    /// Byte stored in the atomic slot before any state has been sent.
    fn pending_value() -> u8;
}
85
86impl State for TaskCompletion {
87 #[inline]
88 fn to_u8(&self) -> u8 {
89 match self {
90 TaskCompletion::Called => ONESHOT_CALLED,
91 TaskCompletion::Cancelled => ONESHOT_CANCELLED,
92 }
93 }
94
95 #[inline]
96 fn from_u8(value: u8) -> Option<Self> {
97 match value {
98 ONESHOT_CALLED => Some(TaskCompletion::Called),
99 ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
100 _ => None,
101 }
102 }
103
104 #[inline]
105 fn pending_value() -> u8 {
106 ONESHOT_PENDING
107 }
108}
109
110/// Implementation for unit type () - simple completion notification without state
111///
112/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
113impl State for () {
114 #[inline]
115 fn to_u8(&self) -> u8 {
116 1 // Completed
117 }
118
119 #[inline]
120 fn from_u8(value: u8) -> Option<Self> {
121 match value {
122 1 => Some(()),
123 _ => None,
124 }
125 }
126
127 #[inline]
128 fn pending_value() -> u8 {
129 0 // Pending
130 }
131}
132
133#[inline]
134pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
135 let (notifier, receiver) = Sender::<T>::new();
136 (notifier, receiver)
137}
138
139/// Inner state for one-shot completion notification
140///
141/// Uses AtomicWaker for zero Box allocation waker storage:
142/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
143/// - Atomic state machine ensures safe concurrent access
144/// - Reuses common AtomicWaker implementation
145///
146/// 一次性完成通知的内部状态
147///
148/// 使用 AtomicWaker 实现零 Box 分配的 waker 存储:
149/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
150/// - 原子状态机确保安全的并发访问
151/// - 复用通用的 AtomicWaker 实现
152pub(crate) struct Inner<T: State> {
153 waker: AtomicWaker,
154 state: AtomicU8,
155 _marker: std::marker::PhantomData<T>,
156}
157
158impl<T: State> Inner<T> {
159 /// Create a new oneshot inner state
160 ///
161 /// Extremely fast: just initializes empty waker and pending state
162 ///
163 /// 创建一个新的 oneshot 内部状态
164 ///
165 /// 极快:仅初始化空 waker 和待处理状态
166 #[inline]
167 pub(crate) fn new() -> Arc<Self> {
168 Arc::new(Self {
169 waker: AtomicWaker::new(),
170 state: AtomicU8::new(T::pending_value()),
171 _marker: std::marker::PhantomData,
172 })
173 }
174
175 /// Send a completion notification (set state and wake)
176 ///
177 /// 发送完成通知(设置状态并唤醒)
178 #[inline]
179 pub(crate) fn send(&self, state: T) {
180 // Store completion state first with Release ordering
181 self.state.store(state.to_u8(), Ordering::Release);
182
183 // Wake the registered waker if any
184 self.waker.wake();
185 }
186
187 /// Register a waker to be notified on completion
188 ///
189 /// 注册一个 waker 以在完成时收到通知
190 #[inline]
191 fn register_waker(&self, waker: &std::task::Waker) {
192 self.waker.register(waker);
193 }
194}
195
// PERFORMANCE NOTE: Inner deliberately has no Drop implementation, so dropping
// it performs no extra atomic loads or manual waker handling.
// No type in this file implements Drop: a waker that is still registered when
// the last Arc<Inner> goes away is left to the AtomicWaker field's own drop.
// In the common case (the sender notifies before the receiver is dropped) the
// registered waker has already been consumed by the wake-up.
207
208/// Completion notifier for one-shot tasks
209///
210/// Optimized implementation using direct RawWaker storage + AtomicU8:
211/// - Zero Box allocation: stores waker components directly in atomic fields
212/// - Faster creation than Notify (no waitlist) or AtomicWaker (no state machine)
213/// - Lower memory footprint (three atomic fields: data ptr, vtable ptr, state)
214/// - Direct waker management without intermediate futures or heap allocation
215/// - Single Arc allocation for both sender and receiver
216///
217/// 一次性任务完成通知器
218///
219/// 使用直接 RawWaker 存储 + AtomicU8 的优化实现:
220/// - 零 Box 分配:直接在原子字段中存储 waker 组件
221/// - 比 Notify(无等待列表)或 AtomicWaker(无状态机)创建更快
222/// - 更小的内存占用(三个原子字段:data 指针、vtable 指针、状态)
223/// - 直接管理 waker,无中间 future 或堆分配
224/// - 发送器和接收器共享单个 Arc 分配
225pub struct Sender<T: State = TaskCompletion> {
226 inner: Arc<Inner<T>>,
227}
228
229impl<T: State> Sender<T> {
230 /// Create a new oneshot completion notifier with receiver
231 ///
232 /// 创建一个新的 oneshot 完成通知器和接收器
233 ///
234 /// # Returns
235 /// Returns a tuple of (notifier, receiver)
236 ///
237 /// 返回 (通知器, 接收器) 元组
238 #[inline]
239 pub fn new() -> (Self, Receiver<T>) {
240 let inner = Inner::new();
241
242 let notifier = Sender {
243 inner: inner.clone(),
244 };
245 let receiver = Receiver {
246 inner,
247 };
248
249 (notifier, receiver)
250 }
251
252 /// Notify completion with the given state
253 ///
254 /// 使用给定状态通知完成
255 #[inline]
256 pub fn notify(&self, state: T) {
257 self.inner.send(state);
258 }
259}
260
261/// Completion receiver for one-shot tasks
262///
263/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
264///
265/// 一次性任务完成通知接收器
266///
267/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
268///
269/// # Examples
270///
271/// ## Using default TaskCompletion type
272///
273/// ```rust,ignore
274/// let (notifier, receiver) = Sender::new();
275///
276/// tokio::spawn(async move {
277/// notifier.notify(TaskCompletion::Called);
278/// });
279///
280/// // Two equivalent ways to await:
281/// let result = receiver.await; // Direct await via Future impl
282/// // or
283/// let result = receiver.wait().await; // Explicit wait method
284/// ```
285///
286/// ## Awaiting on mutable reference
287///
288/// ```rust,ignore
289/// let (notifier, mut receiver) = Sender::new();
290///
291/// tokio::spawn(async move {
292/// notifier.notify(TaskCompletion::Called);
293/// });
294///
295/// // Can also await on &mut receiver
296/// let result = (&mut receiver).await;
297/// ```
298///
299/// ## Using unit type for simple completion
300///
301/// ```rust,ignore
302/// let (notifier, receiver) = Sender::<()>::new();
303///
304/// tokio::spawn(async move {
305/// // ... do work ...
306/// notifier.notify(()); // Signal completion
307/// });
308///
309/// receiver.await; // Wait for completion
310/// ```
311///
312/// ## Using custom state
313///
314/// ```rust,ignore
315/// let (notifier, receiver) = Sender::<CustomState>::new();
316///
317/// tokio::spawn(async move {
318/// notifier.notify(CustomState::Success);
319/// });
320///
321/// match receiver.await {
322/// CustomState::Success => println!("Success!"),
323/// CustomState::Failure => println!("Failed"),
324/// CustomState::Timeout => println!("Timed out"),
325/// }
326/// ```
327pub struct Receiver<T: State = TaskCompletion> {
328 inner: Arc<Inner<T>>,
329}
330
331// Receiver is Unpin because all its fields are Unpin
332impl<T: State> Unpin for Receiver<T> {}
333
// Receiver has no explicit Drop implementation: releasing a waker that is
// still registered is left to AtomicWaker's own drop.
339
340impl<T: State> Receiver<T> {
341 /// Wait for task completion asynchronously
342 ///
343 /// This is equivalent to using `.await` directly on the receiver
344 ///
345 /// 异步等待任务完成
346 ///
347 /// 这等同于直接在 receiver 上使用 `.await`
348 ///
349 /// # Returns
350 /// Returns the completion state
351 ///
352 /// # 返回值
353 /// 返回完成状态
354 #[inline]
355 pub async fn wait(self) -> T {
356 self.await
357 }
358}
359
360/// Direct Future implementation for Receiver
361///
362/// This allows both `receiver.await` and `(&mut receiver).await` to work
363///
364/// Optimized implementation:
365/// - Fast path: Immediate return if already completed (no allocation)
366/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
367/// - No intermediate future state needed
368///
369/// 为 Receiver 直接实现 Future
370///
371/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
372///
373/// 优化实现:
374/// - 快速路径:如已完成则立即返回(无分配)
375/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
376/// - 无需中间 future 状态
377impl<T: State> Future for Receiver<T> {
378 type Output = T;
379
380 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
381 // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
382 let this = self.get_mut();
383
384 // Fast path: check if already completed
385 let current = this.inner.state.load(Ordering::Acquire);
386 if let Some(state) = T::from_u8(current) {
387 if current != T::pending_value() {
388 return Poll::Ready(state);
389 }
390 }
391
392 // Slow path: register waker for notification
393 this.inner.register_waker(cx.waker());
394
395 // Check again after registering waker to avoid race condition
396 // The sender might have completed between our first check and waker registration
397 let current = this.inner.state.load(Ordering::Acquire);
398 if let Some(state) = T::from_u8(current) {
399 if current != T::pending_value() {
400 return Poll::Ready(state);
401 }
402 }
403
404 Poll::Pending
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Delay before a spawned task notifies, giving the awaiting side time to
    /// reach the waker-registration path first.
    const NOTIFY_DELAY: Duration = Duration::from_millis(10);

    /// Spawns a task that sends `state` through `notifier` after `NOTIFY_DELAY`.
    fn notify_later<T: State>(notifier: Sender<T>, state: T) {
        tokio::spawn(async move {
            tokio::time::sleep(NOTIFY_DELAY).await;
            notifier.notify(state);
        });
    }

    #[tokio::test]
    async fn test_oneshot_called() {
        let (notifier, receiver) = Sender::new();
        notify_later(notifier, TaskCompletion::Called);

        let result = receiver.wait().await;
        assert_eq!(result, TaskCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_cancelled() {
        let (notifier, receiver) = Sender::new();
        notify_later(notifier, TaskCompletion::Cancelled);

        let result = receiver.wait().await;
        assert_eq!(result, TaskCompletion::Cancelled);
    }

    #[tokio::test]
    async fn test_oneshot_immediate_called() {
        let (notifier, receiver) = Sender::new();

        // Notify before waiting (fast path)
        notifier.notify(TaskCompletion::Called);

        let result = receiver.wait().await;
        assert_eq!(result, TaskCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_immediate_cancelled() {
        let (notifier, receiver) = Sender::new();

        // Notify before waiting (fast path)
        notifier.notify(TaskCompletion::Cancelled);

        let result = receiver.wait().await;
        assert_eq!(result, TaskCompletion::Cancelled);
    }

    /// User-defined state used to exercise the generic `State` path.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum CustomState {
        Success,
        Failure,
        Timeout,
    }

    impl State for CustomState {
        fn to_u8(&self) -> u8 {
            match self {
                CustomState::Success => 1,
                CustomState::Failure => 2,
                CustomState::Timeout => 3,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(CustomState::Success),
                2 => Some(CustomState::Failure),
                3 => Some(CustomState::Timeout),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }
    }

    #[tokio::test]
    async fn test_oneshot_custom_state() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notify_later(notifier, CustomState::Success);

        let result = receiver.wait().await;
        assert_eq!(result, CustomState::Success);
    }

    #[tokio::test]
    async fn test_oneshot_custom_state_timeout() {
        let (notifier, receiver) = Sender::<CustomState>::new();

        // Immediate notification
        notifier.notify(CustomState::Timeout);

        let result = receiver.wait().await;
        assert_eq!(result, CustomState::Timeout);
    }

    // For the unit-type tests the assertion is simply that the await
    // completes: the output is `()`, and comparing unit values with
    // `assert_eq!` is rejected by `clippy::unit_cmp`.

    #[tokio::test]
    async fn test_oneshot_unit_type() {
        let (notifier, receiver) = Sender::<()>::new();
        notify_later(notifier, ());

        receiver.wait().await;
    }

    #[tokio::test]
    async fn test_oneshot_unit_type_immediate() {
        let (notifier, receiver) = Sender::<()>::new();

        // Immediate notification (fast path)
        notifier.notify(());

        receiver.wait().await;
    }

    // Tests for awaiting the receiver directly through its `Future`
    // implementation, without going through `.wait()`.

    #[tokio::test]
    async fn test_oneshot_into_future_called() {
        let (notifier, receiver) = Sender::new();
        notify_later(notifier, TaskCompletion::Called);

        // Direct await without .wait()
        let result = receiver.await;
        assert_eq!(result, TaskCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_immediate() {
        let (notifier, receiver) = Sender::new();

        // Notify before awaiting (fast path)
        notifier.notify(TaskCompletion::Cancelled);

        // Direct await
        let result = receiver.await;
        assert_eq!(result, TaskCompletion::Cancelled);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_unit_type() {
        let (notifier, receiver) = Sender::<()>::new();
        notify_later(notifier, ());

        // Direct await with unit type
        receiver.await;
    }

    #[tokio::test]
    async fn test_oneshot_into_future_custom_state() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notify_later(notifier, CustomState::Failure);

        // Direct await with custom state
        let result = receiver.await;
        assert_eq!(result, CustomState::Failure);
    }

    // Tests for awaiting through `&mut receiver`.

    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (notifier, mut receiver) = Sender::new();
        notify_later(notifier, TaskCompletion::Called);

        // Await on mutable reference
        let result = (&mut receiver).await;
        assert_eq!(result, TaskCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_await_mut_reference_unit_type() {
        let (notifier, mut receiver) = Sender::<()>::new();

        // Immediate notification
        notifier.notify(());

        // Await on mutable reference (fast path)
        (&mut receiver).await;
    }
}
621