lite_sync/
oneshot.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use super::atomic_waker::AtomicWaker;
8
/// Contract for values that a oneshot channel can carry as its completion state.
///
/// The channel keeps its state in a single atomic `u8`, so an implementor must be
/// encodable to and decodable from one byte. This keeps state transitions lock-free
/// and allocation-free. One byte value, [`State::pending_value`], is reserved to
/// mean "nothing has been sent yet".
///
/// # Contract
///
/// - [`State::to_u8`] should never return [`State::pending_value`]: the receiver only
///   reports completion once the stored byte differs from the pending value.
/// - [`State::from_u8`] should return `Some` for every byte `to_u8` can produce: the
///   receiver keeps waiting while the stored byte decodes to `None`.
///
/// # Built-in Implementations
///
/// - `()`: plain completion signal that carries no extra information
///
/// # Example: Custom State
///
/// ```
/// use lite_sync::oneshot::{State, Sender};
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum CustomState {
///     Success,
///     Failure,
///     Timeout,
/// }
///
/// impl State for CustomState {
///     fn to_u8(&self) -> u8 {
///         match self {
///             CustomState::Success => 1,
///             CustomState::Failure => 2,
///             CustomState::Timeout => 3,
///         }
///     }
///
///     fn from_u8(value: u8) -> Option<Self> {
///         match value {
///             1 => Some(CustomState::Success),
///             2 => Some(CustomState::Failure),
///             3 => Some(CustomState::Timeout),
///             _ => None,
///         }
///     }
///
///     fn pending_value() -> u8 {
///         0
///     }
/// }
///
/// # tokio_test::block_on(async {
/// // Usage:
/// let (notifier, receiver) = Sender::<CustomState>::new();
/// tokio::spawn(async move {
///     notifier.notify(CustomState::Success);
/// });
/// let result = receiver.await; // Direct await
/// assert_eq!(result, CustomState::Success);
/// # });
/// ```
pub trait State: Sized + Send + Sync + 'static {
    /// Encode this state as the byte that is written to the atomic slot.
    fn to_u8(&self) -> u8;

    /// Decode a byte read from the atomic slot.
    ///
    /// Returns `None` when `value` does not correspond to a valid state.
    fn from_u8(value: u8) -> Option<Self>;

    /// The byte that marks the channel as still pending (no state sent yet).
    fn pending_value() -> u8;
}
88
89/// Implementation for unit type () - simple completion notification without state
90/// 
91/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
92impl State for () {
93    #[inline]
94    fn to_u8(&self) -> u8 {
95        1 // Completed
96    }
97    
98    #[inline]
99    fn from_u8(value: u8) -> Option<Self> {
100        match value {
101            1 => Some(()),
102            _ => None,
103        }
104    }
105    
106    #[inline]
107    fn pending_value() -> u8 {
108        0 // Pending
109    }
110}
111
112#[inline]
113pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
114    let (notifier, receiver) = Sender::<T>::new();
115    (notifier, receiver)
116}
117
118/// Inner state for one-shot completion notification
119/// 
120/// Uses AtomicWaker for zero Box allocation waker storage:
121/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
122/// - Atomic state machine ensures safe concurrent access
123/// - Reuses common AtomicWaker implementation
124/// 
125/// 一次性完成通知的内部状态
126/// 
127/// 使用 AtomicWaker 实现零 Box 分配的 waker 存储:
128/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
129/// - 原子状态机确保安全的并发访问
130/// - 复用通用的 AtomicWaker 实现
131pub(crate) struct Inner<T: State> {
132    waker: AtomicWaker,
133    state: AtomicU8,
134    _marker: std::marker::PhantomData<T>,
135}
136
137impl<T: State> Inner<T> {
138    /// Create a new oneshot inner state
139    /// 
140    /// Extremely fast: just initializes empty waker and pending state
141    /// 
142    /// 创建一个新的 oneshot 内部状态
143    /// 
144    /// 极快:仅初始化空 waker 和待处理状态
145    #[inline]
146    pub(crate) fn new() -> Arc<Self> {
147        Arc::new(Self {
148            waker: AtomicWaker::new(),
149            state: AtomicU8::new(T::pending_value()),
150            _marker: std::marker::PhantomData,
151        })
152    }
153    
154    /// Send a completion notification (set state and wake)
155    /// 
156    /// 发送完成通知(设置状态并唤醒)
157    #[inline]
158    pub(crate) fn send(&self, state: T) {
159        // Store completion state first with Release ordering
160        self.state.store(state.to_u8(), Ordering::Release);
161        
162        // Wake the registered waker if any
163        self.waker.wake();
164    }
165    
166    /// Register a waker to be notified on completion
167    /// 
168    /// 注册一个 waker 以在完成时收到通知
169    #[inline]
170    fn register_waker(&self, waker: &std::task::Waker) {
171        self.waker.register(waker);
172    }
173}
174
// PERFORMANCE OPTIMIZATION: `Inner` deliberately has no `Drop` implementation.
// Waker cleanup is left to the `AtomicWaker` field's own drop logic (`Receiver` has no
// explicit `Drop` impl either), which keeps teardown cheap because:
// 1. In the common case (the sender notifies before the receiver is dropped) the waker
//    has already been consumed.
// 2. Only the receiver registers wakers, so it is the only party that can leave one behind.
// 3. `Inner` itself performs no extra work on drop, avoiding an additional atomic load.
186
187/// Completion notifier for one-shot tasks
188/// 
189/// 一次性任务完成通知器
190pub struct Sender<T: State> {
191    inner: Arc<Inner<T>>,
192}
193
194impl<T: State> Sender<T> {
195    /// Create a new oneshot completion notifier with receiver
196    /// 
197    /// 创建一个新的 oneshot 完成通知器和接收器
198    /// 
199    /// # Returns
200    /// Returns a tuple of (notifier, receiver)
201    /// 
202    /// 返回 (通知器, 接收器) 元组
203    #[inline]
204    pub fn new() -> (Self, Receiver<T>) {
205        let inner = Inner::new();
206        
207        let notifier = Sender {
208            inner: inner.clone(),
209        };
210        let receiver = Receiver {
211            inner,
212        };
213        
214        (notifier, receiver)
215    }
216    
217    /// Notify completion with the given state
218    /// 
219    /// 使用给定状态通知完成
220    #[inline]
221    pub fn notify(&self, state: T) {
222        self.inner.send(state);
223    }
224}
225
226/// Completion receiver for one-shot tasks
227/// 
228/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
229/// 
230/// 一次性任务完成通知接收器
231/// 
232/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
233/// 
234/// # Examples
235/// 
236/// ## Using unit type for simple completion
237/// 
238/// ```
239/// use lite_sync::oneshot::Sender;
240/// 
241/// # tokio_test::block_on(async {
242/// let (notifier, receiver) = Sender::<()>::new();
243/// 
244/// tokio::spawn(async move {
245///     // ... do work ...
246///     notifier.notify(());  // Signal completion
247/// });
248/// 
249/// // Two equivalent ways to await:
250/// let result = receiver.await;               // Direct await via Future impl
251/// assert_eq!(result, ());
252/// # });
253/// ```
254/// 
255/// ## Awaiting on mutable reference
256/// 
257/// ```
258/// use lite_sync::oneshot::Sender;
259/// 
260/// # tokio_test::block_on(async {
261/// let (notifier, mut receiver) = Sender::<()>::new();
262/// 
263/// tokio::spawn(async move {
264///     // ... do work ...
265///     notifier.notify(());
266/// });
267/// 
268/// // Can also await on &mut receiver
269/// let result = (&mut receiver).await;
270/// assert_eq!(result, ());
271/// # });
272/// ```
273/// 
274/// ## Using custom state
275/// 
276/// ```
277/// use lite_sync::oneshot::{State, Sender};
278/// 
279/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
280/// enum CustomState {
281///     Success,
282///     Failure,
283///     Timeout,
284/// }
285/// 
286/// impl State for CustomState {
287///     fn to_u8(&self) -> u8 {
288///         match self {
289///             CustomState::Success => 1,
290///             CustomState::Failure => 2,
291///             CustomState::Timeout => 3,
292///         }
293///     }
294///     
295///     fn from_u8(value: u8) -> Option<Self> {
296///         match value {
297///             1 => Some(CustomState::Success),
298///             2 => Some(CustomState::Failure),
299///             3 => Some(CustomState::Timeout),
300///             _ => None,
301///         }
302///     }
303///     
304///     fn pending_value() -> u8 {
305///         0
306///     }
307/// }
308/// 
309/// # tokio_test::block_on(async {
310/// let (notifier, receiver) = Sender::<CustomState>::new();
311/// 
312/// tokio::spawn(async move {
313///     notifier.notify(CustomState::Success);
314/// });
315/// 
316/// match receiver.await {
317///     CustomState::Success => { /* Success! */ },
318///     CustomState::Failure => { /* Failed */ },
319///     CustomState::Timeout => { /* Timed out */ },
320/// }
321/// # });
322/// ```
323pub struct Receiver<T: State> {
324    inner: Arc<Inner<T>>,
325}
326
327// Receiver is Unpin because all its fields are Unpin
328impl<T: State> Unpin for Receiver<T> {}
329
// `Receiver` needs no explicit `Drop` implementation: cleanup of a registered waker
// is handled automatically by `AtomicWaker`'s own drop implementation.
335
336impl<T: State> Receiver<T> {
337    /// Wait for task completion asynchronously
338    /// 
339    /// This is equivalent to using `.await` directly on the receiver
340    /// 
341    /// 异步等待任务完成
342    /// 
343    /// 这等同于直接在 receiver 上使用 `.await`
344    /// 
345    /// # Returns
346    /// Returns the completion state
347    /// 
348    /// # 返回值
349    /// 返回完成状态
350    #[inline]
351    pub async fn wait(self) -> T {
352        self.await
353    }
354}
355
356/// Direct Future implementation for Receiver
357/// 
358/// This allows both `receiver.await` and `(&mut receiver).await` to work
359/// 
360/// Optimized implementation:
361/// - Fast path: Immediate return if already completed (no allocation)
362/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
363/// - No intermediate future state needed
364/// 
365/// 为 Receiver 直接实现 Future
366/// 
367/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
368/// 
369/// 优化实现:
370/// - 快速路径:如已完成则立即返回(无分配)
371/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
372/// - 无需中间 future 状态
373impl<T: State> Future for Receiver<T> {
374    type Output = T;
375    
376    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
377        // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
378        let this = self.get_mut();
379        
380        // Fast path: check if already completed
381        let current = this.inner.state.load(Ordering::Acquire);
382        if let Some(state) = T::from_u8(current) {
383            if current != T::pending_value() {
384                return Poll::Ready(state);
385            }
386        }
387        
388        // Slow path: register waker for notification
389        this.inner.register_waker(cx.waker());
390        
391        // Check again after registering waker to avoid race condition
392        // The sender might have completed between our first check and waker registration
393        let current = this.inner.state.load(Ordering::Acquire);
394        if let Some(state) = T::from_u8(current) {
395            if current != T::pending_value() {
396                return Poll::Ready(state);
397            }
398        }
399        
400        Poll::Pending
401    }
402}
403
#[cfg(test)]
mod tests {
    use super::*;

    /// How long the deferred tests wait before the sender fires.
    const NOTIFY_DELAY: std::time::Duration = std::time::Duration::from_millis(10);

    /// Spawn a task that sleeps for `NOTIFY_DELAY` and then delivers `state` through `notifier`.
    fn notify_later<T: State>(notifier: Sender<T>, state: T) {
        tokio::spawn(async move {
            tokio::time::sleep(NOTIFY_DELAY).await;
            notifier.notify(state);
        });
    }

    /// Test-only completion state.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum TestCompletion {
        /// The task was called / finished successfully.
        Called,

        /// The task was cancelled.
        Cancelled,
    }

    impl State for TestCompletion {
        fn to_u8(&self) -> u8 {
            match self {
                TestCompletion::Called => 1,
                TestCompletion::Cancelled => 2,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(TestCompletion::Called),
                2 => Some(TestCompletion::Cancelled),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }
    }

    /// Second test-only state type with three outcomes.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum CustomState {
        Success,
        Failure,
        Timeout,
    }

    impl State for CustomState {
        fn to_u8(&self) -> u8 {
            match self {
                CustomState::Success => 1,
                CustomState::Failure => 2,
                CustomState::Timeout => 3,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(CustomState::Success),
                2 => Some(CustomState::Failure),
                3 => Some(CustomState::Timeout),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }
    }

    // ---- `wait()` with the test completion type ----

    #[tokio::test]
    async fn test_oneshot_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Called);

        assert_eq!(rx.wait().await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_cancelled() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Cancelled);

        assert_eq!(rx.wait().await, TestCompletion::Cancelled);
    }

    #[tokio::test]
    async fn test_oneshot_immediate_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        // Notifying before waiting exercises the fast path.
        tx.notify(TestCompletion::Called);

        assert_eq!(rx.wait().await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_immediate_cancelled() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        // Notifying before waiting exercises the fast path.
        tx.notify(TestCompletion::Cancelled);

        assert_eq!(rx.wait().await, TestCompletion::Cancelled);
    }

    // ---- `wait()` with a custom state type ----

    #[tokio::test]
    async fn test_oneshot_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();
        notify_later(tx, CustomState::Success);

        assert_eq!(rx.wait().await, CustomState::Success);
    }

    #[tokio::test]
    async fn test_oneshot_custom_state_timeout() {
        let (tx, rx) = Sender::<CustomState>::new();

        // Immediate notification.
        tx.notify(CustomState::Timeout);

        assert_eq!(rx.wait().await, CustomState::Timeout);
    }

    // ---- `wait()` with the unit type ----

    #[tokio::test]
    async fn test_oneshot_unit_type() {
        let (tx, rx) = Sender::<()>::new();
        notify_later(tx, ());

        assert_eq!(rx.wait().await, ());
    }

    #[tokio::test]
    async fn test_oneshot_unit_type_immediate() {
        let (tx, rx) = Sender::<()>::new();

        // Immediate notification (fast path).
        tx.notify(());

        assert_eq!(rx.wait().await, ());
    }

    // ---- direct `.await` through the `Future` implementation ----

    #[tokio::test]
    async fn test_oneshot_into_future_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Called);

        // Awaited directly, without going through `wait()`.
        assert_eq!(rx.await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_immediate() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        // Notifying before awaiting exercises the fast path.
        tx.notify(TestCompletion::Cancelled);

        assert_eq!(rx.await, TestCompletion::Cancelled);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_unit_type() {
        let (tx, rx) = Sender::<()>::new();
        notify_later(tx, ());

        assert_eq!(rx.await, ());
    }

    #[tokio::test]
    async fn test_oneshot_into_future_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();
        notify_later(tx, CustomState::Failure);

        assert_eq!(rx.await, CustomState::Failure);
    }

    // ---- awaiting through `&mut receiver` ----

    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Called);

        assert_eq!((&mut rx).await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_await_mut_reference_unit_type() {
        let (tx, mut rx) = Sender::<()>::new();

        // Immediate notification, so the `&mut` await takes the fast path.
        tx.notify(());

        assert_eq!((&mut rx).await, ());
    }
}
654