kestrel_timer/utils/
oneshot.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, AtomicPtr, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll, Waker};
6use std::ptr;
7
8use crate::task::{ONESHOT_PENDING, ONESHOT_CALLED, ONESHOT_CANCELLED, TaskCompletion};
9
/// A completion value that can travel through the oneshot channel as one byte.
///
/// The channel keeps its entire state in a single `AtomicU8`, so every value a
/// sender may deliver must be encodable as a `u8` and decodable again on the
/// receiving side. One additional byte, [`State::pending_value`], is reserved
/// to mean "nothing has been sent yet".
///
/// # Contract
///
/// The receiver in this module only completes once the stored byte differs
/// from `pending_value()` **and** `from_u8` returns `Some` for it. An
/// implementation should therefore guarantee that:
///
/// - `to_u8` never returns `pending_value()`, and
/// - `from_u8(x.to_u8())` is `Some` for every value `x`.
///
/// Breaking either rule leaves the receiver pending even though a value was
/// sent.
///
/// # Built-in implementations
///
/// - `TaskCompletion`: the `Called` and `Cancelled` outcomes
/// - `()`: a bare completion signal that carries no payload
///
/// # Example: custom state
///
/// ```rust,ignore
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum CustomState {
///     Success,
///     Failure,
///     Timeout,
/// }
///
/// impl State for CustomState {
///     fn to_u8(&self) -> u8 {
///         match self {
///             CustomState::Success => 1,
///             CustomState::Failure => 2,
///             CustomState::Timeout => 3,
///         }
///     }
///
///     fn from_u8(value: u8) -> Option<Self> {
///         match value {
///             1 => Some(CustomState::Success),
///             2 => Some(CustomState::Failure),
///             3 => Some(CustomState::Timeout),
///             _ => None,
///         }
///     }
///
///     fn pending_value() -> u8 {
///         0
///     }
/// }
///
/// // Usage:
/// let (notifier, receiver) = Sender::<CustomState>::new();
/// tokio::spawn(async move {
///     notifier.notify(CustomState::Success);
/// });
/// let result = receiver.await; // Direct await
/// ```
pub trait State: Sized + Send + Sync + 'static {
    /// Encodes this value as the byte that is written to the shared atomic.
    ///
    /// The result should differ from [`State::pending_value`]; see the
    /// trait-level contract.
    fn to_u8(&self) -> u8;

    /// Decodes a byte read from the shared atomic.
    ///
    /// Returns `None` when `value` does not correspond to any state of this
    /// type.
    fn from_u8(value: u8) -> Option<Self>;

    /// The byte stored while no value has been sent yet.
    fn pending_value() -> u8;
}
85
86impl State for TaskCompletion {
87    #[inline]
88    fn to_u8(&self) -> u8 {
89        match self {
90            TaskCompletion::Called => ONESHOT_CALLED,
91            TaskCompletion::Cancelled => ONESHOT_CANCELLED,
92        }
93    }
94    
95    #[inline]
96    fn from_u8(value: u8) -> Option<Self> {
97        match value {
98            ONESHOT_CALLED => Some(TaskCompletion::Called),
99            ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
100            _ => None,
101        }
102    }
103    
104    #[inline]
105    fn pending_value() -> u8 {
106        ONESHOT_PENDING
107    }
108}
109
110/// Implementation for unit type () - simple completion notification without state
111/// 
112/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
113impl State for () {
114    #[inline]
115    fn to_u8(&self) -> u8 {
116        1 // Completed
117    }
118    
119    #[inline]
120    fn from_u8(value: u8) -> Option<Self> {
121        match value {
122            1 => Some(()),
123            _ => None,
124        }
125    }
126    
127    #[inline]
128    fn pending_value() -> u8 {
129        0 // Pending
130    }
131}
132
133#[inline]
134pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
135    let (notifier, receiver) = Sender::<T>::new();
136    (notifier, receiver)
137}
138
139/// Inner state for one-shot completion notification
140/// 
141/// Uses AtomicPtr<Waker> for minimal overhead:
142/// - Creation: Just initialize null pointer (no allocation)
143/// - Send: Atomic swap and wake (minimal operations)
144/// - Recv: Atomic store waker pointer (no intermediate futures)
145/// 
146/// Much lighter than both Notify (has waitlist) and AtomicWaker (has state machine)
147/// 
148/// 一次性完成通知的内部状态
149/// 
150/// 使用 AtomicPtr<Waker> 实现最小开销:
151/// - 创建:仅初始化空指针(无分配)
152/// - 发送:原子交换并唤醒(最少操作)
153/// - 接收:原子存储 waker 指针(无中间 future)
154/// 
155/// 比 Notify(有等待列表)和 AtomicWaker(有状态机)都更轻量
156pub(crate) struct Inner<T: State> {
157    waker: AtomicPtr<Waker>,
158    state: AtomicU8,
159    _marker: std::marker::PhantomData<T>,
160}
161
162impl<T: State> Inner<T> {
163    /// Create a new oneshot inner state
164    /// 
165    /// Extremely fast: just initializes null pointer and pending state
166    /// 
167    /// 创建一个新的 oneshot 内部状态
168    /// 
169    /// 极快:仅初始化空指针和待处理状态
170    #[inline]
171    pub(crate) fn new() -> Arc<Self> {
172        Arc::new(Self {
173            waker: AtomicPtr::new(ptr::null_mut()),
174            state: AtomicU8::new(T::pending_value()),
175            _marker: std::marker::PhantomData,
176        })
177    }
178    
179    /// Send a completion notification (set state and wake)
180    /// 
181    /// Optimized: atomically swap waker and wake if exists
182    /// 
183    /// 发送完成通知(设置状态并唤醒)
184    /// 
185    /// 优化:原子交换 waker 并在存在时唤醒
186    #[inline]
187    pub(crate) fn send(&self, state: T) {
188        // Store state first with Release ordering
189        self.state.store(state.to_u8(), Ordering::Release);
190        
191        // Atomically take the waker (if any) and wake it
192        let waker_ptr = self.waker.swap(ptr::null_mut(), Ordering::AcqRel);
193        if !waker_ptr.is_null() {
194            // SAFETY: We know this pointer was created by Box::into_raw in register_waker
195            // and we're the only one who can swap it out
196            unsafe {
197                let waker = Box::from_raw(waker_ptr);
198                waker.wake();
199            }
200        }
201    }
202    
203    /// Register a waker to be notified on completion
204    /// 
205    /// 注册一个 waker 以在完成时收到通知
206    #[inline]
207    fn register_waker(&self, waker: &Waker) {
208        // Create a boxed waker and convert to raw pointer
209        let new_waker = Box::into_raw(Box::new(waker.clone()));
210        
211        // Atomically swap the old waker with the new one
212        let old_waker = self.waker.swap(new_waker, Ordering::AcqRel);
213        
214        // Drop the old waker if it exists
215        if !old_waker.is_null() {
216            // SAFETY: This pointer was created by Box::into_raw
217            unsafe {
218                let _ = Box::from_raw(old_waker);
219            }
220        }
221    }
222}
223
224impl<T: State> Drop for Inner<T> {
225    fn drop(&mut self) {
226        // Clean up any remaining waker
227        let waker_ptr = self.waker.load(Ordering::Acquire);
228        if !waker_ptr.is_null() {
229            // SAFETY: This pointer was created by Box::into_raw and we're dropping
230            unsafe {
231                let _ = Box::from_raw(waker_ptr);
232            }
233        }
234    }
235}
236
237/// Completion notifier for one-shot tasks
238/// 
239/// Optimized implementation using AtomicPtr<Waker> + AtomicU8:
240/// - Faster creation than Notify (no waitlist allocation) or AtomicWaker (no state machine)
241/// - Lower memory footprint (just two atomic fields)
242/// - Direct waker management without intermediate futures
243/// - Single Arc allocation for both sender and receiver
244/// 
245/// 一次性任务完成通知器
246/// 
247/// 使用 AtomicPtr<Waker> + AtomicU8 的优化实现:
248/// - 比 Notify(无等待列表分配)或 AtomicWaker(无状态机)创建更快
249/// - 更小的内存占用(仅两个原子字段)
250/// - 直接管理 waker,无中间 future
251/// - 发送器和接收器共享单个 Arc 分配
252pub struct Sender<T: State = TaskCompletion> {
253    inner: Arc<Inner<T>>,
254}
255
256impl<T: State> Sender<T> {
257    /// Create a new oneshot completion notifier with receiver
258    /// 
259    /// 创建一个新的 oneshot 完成通知器和接收器
260    /// 
261    /// # Returns
262    /// Returns a tuple of (notifier, receiver)
263    /// 
264    /// 返回 (通知器, 接收器) 元组
265    #[inline]
266    pub fn new() -> (Self, Receiver<T>) {
267        let inner = Inner::new();
268        
269        let notifier = Sender {
270            inner: inner.clone(),
271        };
272        let receiver = Receiver {
273            inner,
274        };
275        
276        (notifier, receiver)
277    }
278    
279    /// Notify completion with the given state
280    /// 
281    /// 使用给定状态通知完成
282    #[inline]
283    pub fn notify(&self, state: T) {
284        self.inner.send(state);
285    }
286}
287
288/// Completion receiver for one-shot tasks
289/// 
290/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
291/// 
292/// 一次性任务完成通知接收器
293/// 
294/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
295/// 
296/// # Examples
297/// 
298/// ## Using default TaskCompletion type
299/// 
300/// ```rust,ignore
301/// let (notifier, receiver) = Sender::new();
302/// 
303/// tokio::spawn(async move {
304///     notifier.notify(TaskCompletion::Called);
305/// });
306/// 
307/// // Two equivalent ways to await:
308/// let result = receiver.await;               // Direct await via Future impl
309/// // or
310/// let result = receiver.wait().await;        // Explicit wait method
311/// ```
312/// 
313/// ## Awaiting on mutable reference
314/// 
315/// ```rust,ignore
316/// let (notifier, mut receiver) = Sender::new();
317/// 
318/// tokio::spawn(async move {
319///     notifier.notify(TaskCompletion::Called);
320/// });
321/// 
322/// // Can also await on &mut receiver
323/// let result = (&mut receiver).await;
324/// ```
325/// 
326/// ## Using unit type for simple completion
327/// 
328/// ```rust,ignore
329/// let (notifier, receiver) = Sender::<()>::new();
330/// 
331/// tokio::spawn(async move {
332///     // ... do work ...
333///     notifier.notify(());  // Signal completion
334/// });
335/// 
336/// receiver.await;  // Wait for completion
337/// ```
338/// 
339/// ## Using custom state
340/// 
341/// ```rust,ignore
342/// let (notifier, receiver) = Sender::<CustomState>::new();
343/// 
344/// tokio::spawn(async move {
345///     notifier.notify(CustomState::Success);
346/// });
347/// 
348/// match receiver.await {
349///     CustomState::Success => println!("Success!"),
350///     CustomState::Failure => println!("Failed"),
351///     CustomState::Timeout => println!("Timed out"),
352/// }
353/// ```
354pub struct Receiver<T: State = TaskCompletion> {
355    inner: Arc<Inner<T>>,
356}
357
358// Receiver is Unpin because all its fields are Unpin
359impl<T: State> Unpin for Receiver<T> {}
360
361impl<T: State> Receiver<T> {
362    /// Wait for task completion asynchronously
363    /// 
364    /// This is equivalent to using `.await` directly on the receiver
365    /// 
366    /// 异步等待任务完成
367    /// 
368    /// 这等同于直接在 receiver 上使用 `.await`
369    /// 
370    /// # Returns
371    /// Returns the completion state
372    /// 
373    /// # 返回值
374    /// 返回完成状态
375    #[inline]
376    pub async fn wait(self) -> T {
377        self.await
378    }
379}
380
381/// Direct Future implementation for Receiver
382/// 
383/// This allows both `receiver.await` and `(&mut receiver).await` to work
384/// 
385/// Optimized implementation:
386/// - Fast path: Immediate return if already completed (no allocation)
387/// - Slow path: Direct waker registration (simpler than Notify's waitlist)
388/// - No intermediate future state needed
389/// 
390/// 为 Receiver 直接实现 Future
391/// 
392/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
393/// 
394/// 优化实现:
395/// - 快速路径:如已完成则立即返回(无分配)
396/// - 慢速路径:直接注册 waker(比 Notify 的等待列表更简单)
397/// - 无需中间 future 状态
398impl<T: State> Future for Receiver<T> {
399    type Output = T;
400    
401    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
402        // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
403        let this = self.get_mut();
404        
405        // Fast path: check if already completed
406        let current = this.inner.state.load(Ordering::Acquire);
407        if let Some(state) = T::from_u8(current) {
408            if current != T::pending_value() {
409                return Poll::Ready(state);
410            }
411        }
412        
413        // Slow path: register waker for notification
414        // This is much lighter than Notify's waitlist management
415        this.inner.register_waker(cx.waker());
416        
417        // Check again after registering waker to avoid race condition
418        // The sender might have completed between our first check and waker registration
419        let current = this.inner.state.load(Ordering::Acquire);
420        if let Some(state) = T::from_u8(current) {
421            if current != T::pending_value() {
422                return Poll::Ready(state);
423            }
424        }
425        
426        Poll::Pending
427    }
428}
429
#[cfg(test)]
mod tests {
    use super::*;

    /// Delay before a spawned task notifies, so that the receiver has normally
    /// been polled (and parked its waker) before the notification arrives.
    const NOTIFY_DELAY: std::time::Duration = std::time::Duration::from_millis(10);

    /// Spawns a task that sends `state` through `notifier` after `NOTIFY_DELAY`.
    ///
    /// Must be called from inside a Tokio runtime.
    fn notify_later<T: State>(notifier: Sender<T>, state: T) {
        tokio::spawn(async move {
            tokio::time::sleep(NOTIFY_DELAY).await;
            notifier.notify(state);
        });
    }

    // ---- `wait()` with the default `TaskCompletion` state ----

    #[tokio::test]
    async fn test_oneshot_called() {
        let (notifier, receiver) = Sender::new();
        notify_later(notifier, TaskCompletion::Called);

        assert_eq!(receiver.wait().await, TaskCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_cancelled() {
        let (notifier, receiver) = Sender::new();
        notify_later(notifier, TaskCompletion::Cancelled);

        assert_eq!(receiver.wait().await, TaskCompletion::Cancelled);
    }

    #[tokio::test]
    async fn test_oneshot_immediate_called() {
        let (notifier, receiver) = Sender::new();

        // Notify before waiting (fast path)
        notifier.notify(TaskCompletion::Called);

        assert_eq!(receiver.wait().await, TaskCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_immediate_cancelled() {
        let (notifier, receiver) = Sender::new();

        // Notify before waiting (fast path)
        notifier.notify(TaskCompletion::Cancelled);

        assert_eq!(receiver.wait().await, TaskCompletion::Cancelled);
    }

    // ---- custom state type ----

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum CustomState {
        Success,
        Failure,
        Timeout,
    }

    impl State for CustomState {
        fn to_u8(&self) -> u8 {
            match self {
                CustomState::Success => 1,
                CustomState::Failure => 2,
                CustomState::Timeout => 3,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(CustomState::Success),
                2 => Some(CustomState::Failure),
                3 => Some(CustomState::Timeout),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }
    }

    #[tokio::test]
    async fn test_oneshot_custom_state() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notify_later(notifier, CustomState::Success);

        assert_eq!(receiver.wait().await, CustomState::Success);
    }

    #[tokio::test]
    async fn test_oneshot_custom_state_timeout() {
        let (notifier, receiver) = Sender::<CustomState>::new();

        // Immediate notification
        notifier.notify(CustomState::Timeout);

        assert_eq!(receiver.wait().await, CustomState::Timeout);
    }

    // ---- unit state ----
    //
    // `()` carries no information, so for these tests the assertion is simply
    // that the await completes; comparing the result with `()` would assert
    // nothing.

    #[tokio::test]
    async fn test_oneshot_unit_type() {
        let (notifier, receiver) = Sender::<()>::new();
        notify_later(notifier, ());

        receiver.wait().await;
    }

    #[tokio::test]
    async fn test_oneshot_unit_type_immediate() {
        let (notifier, receiver) = Sender::<()>::new();

        // Immediate notification (fast path)
        notifier.notify(());

        receiver.wait().await;
    }

    // ---- awaiting the receiver directly (its `Future` impl), without `wait()` ----

    #[tokio::test]
    async fn test_oneshot_into_future_called() {
        let (notifier, receiver) = Sender::new();
        notify_later(notifier, TaskCompletion::Called);

        assert_eq!(receiver.await, TaskCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_immediate() {
        let (notifier, receiver) = Sender::new();

        // Notify before awaiting (fast path)
        notifier.notify(TaskCompletion::Cancelled);

        assert_eq!(receiver.await, TaskCompletion::Cancelled);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_unit_type() {
        let (notifier, receiver) = Sender::<()>::new();
        notify_later(notifier, ());

        receiver.await;
    }

    #[tokio::test]
    async fn test_oneshot_into_future_custom_state() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notify_later(notifier, CustomState::Failure);

        assert_eq!(receiver.await, CustomState::Failure);
    }

    // ---- awaiting through `&mut receiver` ----

    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (notifier, mut receiver) = Sender::new();
        notify_later(notifier, TaskCompletion::Called);

        assert_eq!((&mut receiver).await, TaskCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_await_mut_reference_unit_type() {
        let (notifier, mut receiver) = Sender::<()>::new();

        // Immediate notification
        notifier.notify(());

        // Await on mutable reference (fast path)
        (&mut receiver).await;
    }

    // ---- manual polling, no runtime involved ----

    /// Waker that counts its wake-ups so a test can drive `poll` by hand.
    struct WakeCounter(std::sync::atomic::AtomicUsize);

    impl std::task::Wake for WakeCounter {
        fn wake(self: Arc<Self>) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_oneshot_repoll_wakes_latest_waker_only() {
        let (notifier, mut receiver) = Sender::<()>::new();

        let stale = Arc::new(WakeCounter(std::sync::atomic::AtomicUsize::new(0)));
        let latest = Arc::new(WakeCounter(std::sync::atomic::AtomicUsize::new(0)));
        let stale_waker = Waker::from(Arc::clone(&stale));
        let latest_waker = Waker::from(Arc::clone(&latest));

        // Two pending polls with different wakers: the second one replaces the
        // first in the waker slot.
        assert!(Pin::new(&mut receiver)
            .poll(&mut Context::from_waker(&stale_waker))
            .is_pending());
        assert!(Pin::new(&mut receiver)
            .poll(&mut Context::from_waker(&latest_waker))
            .is_pending());

        notifier.notify(());

        assert_eq!(stale.0.load(Ordering::SeqCst), 0);
        assert_eq!(latest.0.load(Ordering::SeqCst), 1);
        assert!(Pin::new(&mut receiver)
            .poll(&mut Context::from_waker(&latest_waker))
            .is_ready());
    }
}
643