kestrel_timer/utils/
oneshot.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::task::{ONESHOT_PENDING, ONESHOT_CALLED, ONESHOT_CANCELLED, TaskCompletion};
8use super::atomic_waker::AtomicWaker;
9
10/// Trait for types that can be used as oneshot state
11/// 
12/// Types implementing this trait can be converted to/from u8 for atomic storage.
13/// This allows for zero-allocation, lock-free state transitions.
14/// 
15/// 可用作 oneshot 状态的类型的 trait
16/// 
17/// 实现此 trait 的类型可以与 u8 互相转换以进行原子存储。
18/// 这允许零分配、无锁的状态转换。
19/// 
20/// # Built-in Implementations
21/// 
22/// - `TaskCompletion`: Called or Cancelled states
23/// - `()`: Simple completion notification without state
24/// 
25/// # Example: Custom State
26/// 
27/// ```rust,ignore
28/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
29/// enum CustomState {
30///     Success,
31///     Failure,
32///     Timeout,
33/// }
34/// 
35/// impl State for CustomState {
36///     fn to_u8(&self) -> u8 {
37///         match self {
38///             CustomState::Success => 1,
39///             CustomState::Failure => 2,
40///             CustomState::Timeout => 3,
41///         }
42///     }
43///     
44///     fn from_u8(value: u8) -> Option<Self> {
45///         match value {
46///             1 => Some(CustomState::Success),
47///             2 => Some(CustomState::Failure),
48///             3 => Some(CustomState::Timeout),
49///             _ => None,
50///         }
51///     }
52///     
53///     fn pending_value() -> u8 {
54///         0
55///     }
56/// }
57/// 
58/// // Usage:
59/// let (notifier, receiver) = Sender::<CustomState>::new();
60/// tokio::spawn(async move {
61///     notifier.notify(CustomState::Success);
62/// });
63/// let result = receiver.await; // Direct await
64/// ```
65pub trait State: Sized + Send + Sync + 'static {
66    /// Convert the state to u8 for atomic storage
67    /// 
68    /// 将状态转换为 u8 以进行原子存储
69    fn to_u8(&self) -> u8;
70    
71    /// Convert u8 back to the state type
72    /// 
73    /// Returns None if the value doesn't represent a valid state
74    /// 
75    /// 将 u8 转换回状态类型
76    /// 
77    /// 如果值不代表有效状态则返回 None
78    fn from_u8(value: u8) -> Option<Self>;
79    
80    /// The pending state value (before completion)
81    /// 
82    /// 待处理状态值(完成前)
83    fn pending_value() -> u8;
84}
85
86impl State for TaskCompletion {
87    #[inline]
88    fn to_u8(&self) -> u8 {
89        match self {
90            TaskCompletion::Called => ONESHOT_CALLED,
91            TaskCompletion::Cancelled => ONESHOT_CANCELLED,
92        }
93    }
94    
95    #[inline]
96    fn from_u8(value: u8) -> Option<Self> {
97        match value {
98            ONESHOT_CALLED => Some(TaskCompletion::Called),
99            ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
100            _ => None,
101        }
102    }
103    
104    #[inline]
105    fn pending_value() -> u8 {
106        ONESHOT_PENDING
107    }
108}
109
110/// Implementation for unit type () - simple completion notification without state
111/// 
112/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
113impl State for () {
114    #[inline]
115    fn to_u8(&self) -> u8 {
116        1 // Completed
117    }
118    
119    #[inline]
120    fn from_u8(value: u8) -> Option<Self> {
121        match value {
122            1 => Some(()),
123            _ => None,
124        }
125    }
126    
127    #[inline]
128    fn pending_value() -> u8 {
129        0 // Pending
130    }
131}
132
133#[inline]
134pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
135    let (notifier, receiver) = Sender::<T>::new();
136    (notifier, receiver)
137}
138
139/// Inner state for one-shot completion notification
140/// 
141/// Uses AtomicWaker for zero Box allocation waker storage:
142/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
143/// - Atomic state machine ensures safe concurrent access
144/// - Reuses common AtomicWaker implementation
145/// 
146/// 一次性完成通知的内部状态
147/// 
148/// 使用 AtomicWaker 实现零 Box 分配的 waker 存储:
149/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
150/// - 原子状态机确保安全的并发访问
151/// - 复用通用的 AtomicWaker 实现
152pub(crate) struct Inner<T: State> {
153    waker: AtomicWaker,
154    state: AtomicU8,
155    _marker: std::marker::PhantomData<T>,
156}
157
158impl<T: State> Inner<T> {
159    /// Create a new oneshot inner state
160    /// 
161    /// Extremely fast: just initializes empty waker and pending state
162    /// 
163    /// 创建一个新的 oneshot 内部状态
164    /// 
165    /// 极快:仅初始化空 waker 和待处理状态
166    #[inline]
167    pub(crate) fn new() -> Arc<Self> {
168        Arc::new(Self {
169            waker: AtomicWaker::new(),
170            state: AtomicU8::new(T::pending_value()),
171            _marker: std::marker::PhantomData,
172        })
173    }
174    
175    /// Send a completion notification (set state and wake)
176    /// 
177    /// 发送完成通知(设置状态并唤醒)
178    #[inline]
179    pub(crate) fn send(&self, state: T) {
180        // Store completion state first with Release ordering
181        self.state.store(state.to_u8(), Ordering::Release);
182        
183        // Wake the registered waker if any
184        self.waker.wake();
185    }
186    
187    /// Register a waker to be notified on completion
188    /// 
189    /// 注册一个 waker 以在完成时收到通知
190    #[inline]
191    fn register_waker(&self, waker: &std::task::Waker) {
192        self.waker.register(waker);
193    }
194}
195
196// PERFORMANCE OPTIMIZATION: No Drop implementation for Inner
197// Waker cleanup is handled by Receiver::drop instead, which is more efficient because:
198// 1. In the common case (sender notifies before receiver drops), waker is already consumed
199// 2. Only Receiver creates wakers, so only Receiver needs to clean them up
200// 3. This makes Inner::drop a complete no-op, eliminating atomic load overhead
201//
202// 性能优化:Inner 不实现 Drop
203// Waker 清理由 Receiver::drop 处理,这更高效因为:
204// 1. 在常见情况下(发送方在接收方 drop 前通知),waker 已被消费
205// 2. 只有 Receiver 创建 waker,所以只有 Receiver 需要清理它们
206// 3. 这使得 Inner::drop 完全成为 no-op,消除了原子加载开销
207
208/// Completion notifier for one-shot tasks
209/// 
210/// Optimized implementation using direct RawWaker storage + AtomicU8:
211/// - Zero Box allocation: stores waker components directly in atomic fields
212/// - Faster creation than Notify (no waitlist) or AtomicWaker (no state machine)
213/// - Lower memory footprint (three atomic fields: data ptr, vtable ptr, state)
214/// - Direct waker management without intermediate futures or heap allocation
215/// - Single Arc allocation for both sender and receiver
216/// 
217/// 一次性任务完成通知器
218/// 
219/// 使用直接 RawWaker 存储 + AtomicU8 的优化实现:
220/// - 零 Box 分配:直接在原子字段中存储 waker 组件
221/// - 比 Notify(无等待列表)或 AtomicWaker(无状态机)创建更快
222/// - 更小的内存占用(三个原子字段:data 指针、vtable 指针、状态)
223/// - 直接管理 waker,无中间 future 或堆分配
224/// - 发送器和接收器共享单个 Arc 分配
225pub struct Sender<T: State = TaskCompletion> {
226    inner: Arc<Inner<T>>,
227}
228
229impl<T: State> Sender<T> {
230    /// Create a new oneshot completion notifier with receiver
231    /// 
232    /// 创建一个新的 oneshot 完成通知器和接收器
233    /// 
234    /// # Returns
235    /// Returns a tuple of (notifier, receiver)
236    /// 
237    /// 返回 (通知器, 接收器) 元组
238    #[inline]
239    pub fn new() -> (Self, Receiver<T>) {
240        let inner = Inner::new();
241        
242        let notifier = Sender {
243            inner: inner.clone(),
244        };
245        let receiver = Receiver {
246            inner,
247        };
248        
249        (notifier, receiver)
250    }
251    
252    /// Notify completion with the given state
253    /// 
254    /// 使用给定状态通知完成
255    #[inline]
256    pub fn notify(&self, state: T) {
257        self.inner.send(state);
258    }
259}
260
261/// Completion receiver for one-shot tasks
262/// 
263/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
264/// 
265/// 一次性任务完成通知接收器
266/// 
267/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
268/// 
269/// # Examples
270/// 
271/// ## Using default TaskCompletion type
272/// 
273/// ```rust,ignore
274/// let (notifier, receiver) = Sender::new();
275/// 
276/// tokio::spawn(async move {
277///     notifier.notify(TaskCompletion::Called);
278/// });
279/// 
280/// // Two equivalent ways to await:
281/// let result = receiver.await;               // Direct await via Future impl
282/// // or
283/// let result = receiver.wait().await;        // Explicit wait method
284/// ```
285/// 
286/// ## Awaiting on mutable reference
287/// 
288/// ```rust,ignore
289/// let (notifier, mut receiver) = Sender::new();
290/// 
291/// tokio::spawn(async move {
292///     notifier.notify(TaskCompletion::Called);
293/// });
294/// 
295/// // Can also await on &mut receiver
296/// let result = (&mut receiver).await;
297/// ```
298/// 
299/// ## Using unit type for simple completion
300/// 
301/// ```rust,ignore
302/// let (notifier, receiver) = Sender::<()>::new();
303/// 
304/// tokio::spawn(async move {
305///     // ... do work ...
306///     notifier.notify(());  // Signal completion
307/// });
308/// 
309/// receiver.await;  // Wait for completion
310/// ```
311/// 
312/// ## Using custom state
313/// 
314/// ```rust,ignore
315/// let (notifier, receiver) = Sender::<CustomState>::new();
316/// 
317/// tokio::spawn(async move {
318///     notifier.notify(CustomState::Success);
319/// });
320/// 
321/// match receiver.await {
322///     CustomState::Success => println!("Success!"),
323///     CustomState::Failure => println!("Failed"),
324///     CustomState::Timeout => println!("Timed out"),
325/// }
326/// ```
327pub struct Receiver<T: State = TaskCompletion> {
328    inner: Arc<Inner<T>>,
329}
330
331// Receiver is Unpin because all its fields are Unpin
332impl<T: State> Unpin for Receiver<T> {}
333
334// Receiver drop is automatically handled by AtomicWaker's drop implementation
335// No need for explicit drop implementation
336//
337// Receiver 的 drop 由 AtomicWaker 的 drop 实现自动处理
338// 无需显式的 drop 实现
339
340impl<T: State> Receiver<T> {
341    /// Wait for task completion asynchronously
342    /// 
343    /// This is equivalent to using `.await` directly on the receiver
344    /// 
345    /// 异步等待任务完成
346    /// 
347    /// 这等同于直接在 receiver 上使用 `.await`
348    /// 
349    /// # Returns
350    /// Returns the completion state
351    /// 
352    /// # 返回值
353    /// 返回完成状态
354    #[inline]
355    pub async fn wait(self) -> T {
356        self.await
357    }
358}
359
360/// Direct Future implementation for Receiver
361/// 
362/// This allows both `receiver.await` and `(&mut receiver).await` to work
363/// 
364/// Optimized implementation:
365/// - Fast path: Immediate return if already completed (no allocation)
366/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
367/// - No intermediate future state needed
368/// 
369/// 为 Receiver 直接实现 Future
370/// 
371/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
372/// 
373/// 优化实现:
374/// - 快速路径:如已完成则立即返回(无分配)
375/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
376/// - 无需中间 future 状态
377impl<T: State> Future for Receiver<T> {
378    type Output = T;
379    
380    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
381        // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
382        let this = self.get_mut();
383        
384        // Fast path: check if already completed
385        let current = this.inner.state.load(Ordering::Acquire);
386        if let Some(state) = T::from_u8(current) {
387            if current != T::pending_value() {
388                return Poll::Ready(state);
389            }
390        }
391        
392        // Slow path: register waker for notification
393        this.inner.register_waker(cx.waker());
394        
395        // Check again after registering waker to avoid race condition
396        // The sender might have completed between our first check and waker registration
397        let current = this.inner.state.load(Ordering::Acquire);
398        if let Some(state) = T::from_u8(current) {
399            if current != T::pending_value() {
400                return Poll::Ready(state);
401            }
402        }
403        
404        Poll::Pending
405    }
406}
407
408#[cfg(test)]
409mod tests {
410    use super::*;
411    
412    #[tokio::test]
413    async fn test_oneshot_called() {
414        let (notifier, receiver) = Sender::new();
415        
416        tokio::spawn(async move {
417            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
418            notifier.notify(TaskCompletion::Called);
419        });
420        
421        let result = receiver.wait().await;
422        assert_eq!(result, TaskCompletion::Called);
423    }
424    
425    #[tokio::test]
426    async fn test_oneshot_cancelled() {
427        let (notifier, receiver) = Sender::new();
428        
429        tokio::spawn(async move {
430            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
431            notifier.notify(TaskCompletion::Cancelled);
432        });
433        
434        let result = receiver.wait().await;
435        assert_eq!(result, TaskCompletion::Cancelled);
436    }
437    
438    #[tokio::test]
439    async fn test_oneshot_immediate_called() {
440        let (notifier, receiver) = Sender::new();
441        
442        // Notify before waiting (fast path)
443        notifier.notify(TaskCompletion::Called);
444        
445        let result = receiver.wait().await;
446        assert_eq!(result, TaskCompletion::Called);
447    }
448    
449    #[tokio::test]
450    async fn test_oneshot_immediate_cancelled() {
451        let (notifier, receiver) = Sender::new();
452        
453        // Notify before waiting (fast path)
454        notifier.notify(TaskCompletion::Cancelled);
455        
456        let result = receiver.wait().await;
457        assert_eq!(result, TaskCompletion::Cancelled);
458    }
459    
460    // Test with custom state type
461    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
462    enum CustomState {
463        Success,
464        Failure,
465        Timeout,
466    }
467    
468    impl State for CustomState {
469        fn to_u8(&self) -> u8 {
470            match self {
471                CustomState::Success => 1,
472                CustomState::Failure => 2,
473                CustomState::Timeout => 3,
474            }
475        }
476        
477        fn from_u8(value: u8) -> Option<Self> {
478            match value {
479                1 => Some(CustomState::Success),
480                2 => Some(CustomState::Failure),
481                3 => Some(CustomState::Timeout),
482                _ => None,
483            }
484        }
485        
486        fn pending_value() -> u8 {
487            0
488        }
489    }
490    
491    #[tokio::test]
492    async fn test_oneshot_custom_state() {
493        let (notifier, receiver) = Sender::<CustomState>::new();
494        
495        tokio::spawn(async move {
496            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
497            notifier.notify(CustomState::Success);
498        });
499        
500        let result = receiver.wait().await;
501        assert_eq!(result, CustomState::Success);
502    }
503    
504    #[tokio::test]
505    async fn test_oneshot_custom_state_timeout() {
506        let (notifier, receiver) = Sender::<CustomState>::new();
507        
508        // Immediate notification
509        notifier.notify(CustomState::Timeout);
510        
511        let result = receiver.wait().await;
512        assert_eq!(result, CustomState::Timeout);
513    }
514    
515    #[tokio::test]
516    async fn test_oneshot_unit_type() {
517        let (notifier, receiver) = Sender::<()>::new();
518        
519        tokio::spawn(async move {
520            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
521            notifier.notify(());
522        });
523        
524        let result = receiver.wait().await;
525        assert_eq!(result, ());
526    }
527    
528    #[tokio::test]
529    async fn test_oneshot_unit_type_immediate() {
530        let (notifier, receiver) = Sender::<()>::new();
531        
532        // Immediate notification (fast path)
533        notifier.notify(());
534        
535        let result = receiver.wait().await;
536        assert_eq!(result, ());
537    }
538    
539    // Tests for IntoFuture implementation
540    #[tokio::test]
541    async fn test_oneshot_into_future_called() {
542        let (notifier, receiver) = Sender::new();
543        
544        tokio::spawn(async move {
545            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
546            notifier.notify(TaskCompletion::Called);
547        });
548        
549        // Direct await without .wait()
550        let result = receiver.await;
551        assert_eq!(result, TaskCompletion::Called);
552    }
553    
554    #[tokio::test]
555    async fn test_oneshot_into_future_immediate() {
556        let (notifier, receiver) = Sender::new();
557        
558        // Notify before awaiting (fast path)
559        notifier.notify(TaskCompletion::Cancelled);
560        
561        // Direct await
562        let result = receiver.await;
563        assert_eq!(result, TaskCompletion::Cancelled);
564    }
565    
566    #[tokio::test]
567    async fn test_oneshot_into_future_unit_type() {
568        let (notifier, receiver) = Sender::<()>::new();
569        
570        tokio::spawn(async move {
571            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
572            notifier.notify(());
573        });
574        
575        // Direct await with unit type
576        let result = receiver.await;
577        assert_eq!(result, ());
578    }
579    
580    #[tokio::test]
581    async fn test_oneshot_into_future_custom_state() {
582        let (notifier, receiver) = Sender::<CustomState>::new();
583        
584        tokio::spawn(async move {
585            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
586            notifier.notify(CustomState::Failure);
587        });
588        
589        // Direct await with custom state
590        let result = receiver.await;
591        assert_eq!(result, CustomState::Failure);
592    }
593    
594    // Test awaiting on &mut receiver
595    #[tokio::test]
596    async fn test_oneshot_await_mut_reference() {
597        let (notifier, mut receiver) = Sender::new();
598        
599        tokio::spawn(async move {
600            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
601            notifier.notify(TaskCompletion::Called);
602        });
603        
604        // Await on mutable reference
605        let result = (&mut receiver).await;
606        assert_eq!(result, TaskCompletion::Called);
607    }
608    
609    #[tokio::test]
610    async fn test_oneshot_await_mut_reference_unit_type() {
611        let (notifier, mut receiver) = Sender::<()>::new();
612        
613        // Immediate notification
614        notifier.notify(());
615        
616        // Await on mutable reference (fast path)
617        let result = (&mut receiver).await;
618        assert_eq!(result, ());
619    }
620}
621