lite_sync/
oneshot.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use super::atomic_waker::AtomicWaker;
8
/// A completion value that can be stored in a single atomic byte.
///
/// The channel keeps its whole state in one `AtomicU8`, so every value sent
/// through it has to be encodable as a `u8` and decodable again on the
/// receiving side. One byte value, [`State::pending_value`], is reserved to
/// mean "nothing has been sent yet".
///
/// # Contract
///
/// The receiver in this module only completes once the stored byte differs
/// from `pending_value()` *and* `from_u8` decodes it to `Some(_)`.
/// Implementations should therefore make sure that:
///
/// - `to_u8` never returns `pending_value()`, and
/// - `from_u8(x.to_u8())` is `Some(_)` for every value `x`.
///
/// # Built-in implementations
///
/// - `()`: plain completion signal that carries no extra information
///
/// # Example: custom state
///
/// ```
/// use lite_sync::oneshot::{State, Sender};
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum CustomState {
///     Success,
///     Failure,
///     Timeout,
/// }
///
/// impl State for CustomState {
///     fn to_u8(&self) -> u8 {
///         match self {
///             CustomState::Success => 1,
///             CustomState::Failure => 2,
///             CustomState::Timeout => 3,
///         }
///     }
///
///     fn from_u8(value: u8) -> Option<Self> {
///         match value {
///             1 => Some(CustomState::Success),
///             2 => Some(CustomState::Failure),
///             3 => Some(CustomState::Timeout),
///             _ => None,
///         }
///     }
///
///     fn pending_value() -> u8 {
///         0
///     }
/// }
///
/// # tokio_test::block_on(async {
/// let (notifier, receiver) = Sender::<CustomState>::new();
/// tokio::spawn(async move {
///     notifier.notify(CustomState::Success);
/// });
/// // The receiver is a future and can be awaited directly.
/// let result = receiver.await;
/// assert_eq!(result, CustomState::Success);
/// # });
/// ```
pub trait State: Sized + Send + Sync + 'static {
    /// Encodes this value as the byte that is written into the shared atomic.
    fn to_u8(&self) -> u8;

    /// Decodes a byte that was read back from the shared atomic.
    ///
    /// Returns `None` when `value` does not correspond to any valid state.
    fn from_u8(value: u8) -> Option<Self>;

    /// The byte that marks the channel as not yet completed.
    fn pending_value() -> u8;
}
88
89/// Implementation for unit type () - simple completion notification without state
90/// 
91/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
92impl State for () {
93    #[inline]
94    fn to_u8(&self) -> u8 {
95        1 // Completed
96    }
97    
98    #[inline]
99    fn from_u8(value: u8) -> Option<Self> {
100        match value {
101            1 => Some(()),
102            _ => None,
103        }
104    }
105    
106    #[inline]
107    fn pending_value() -> u8 {
108        0 // Pending
109    }
110}
111
112#[inline]
113pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
114    let (notifier, receiver) = Sender::<T>::new();
115    (notifier, receiver)
116}
117
118/// Inner state for one-shot completion notification
119/// 
120/// Uses AtomicWaker for zero Box allocation waker storage:
121/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
122/// - Atomic state machine ensures safe concurrent access
123/// - Reuses common AtomicWaker implementation
124/// 
125/// 一次性完成通知的内部状态
126/// 
127/// 使用 AtomicWaker 实现零 Box 分配的 waker 存储:
128/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
129/// - 原子状态机确保安全的并发访问
130/// - 复用通用的 AtomicWaker 实现
131pub(crate) struct Inner<T: State> {
132    waker: AtomicWaker,
133    state: AtomicU8,
134    _marker: std::marker::PhantomData<T>,
135}
136
137impl<T: State> Inner<T> {
138    /// Create a new oneshot inner state
139    /// 
140    /// Extremely fast: just initializes empty waker and pending state
141    /// 
142    /// 创建一个新的 oneshot 内部状态
143    /// 
144    /// 极快:仅初始化空 waker 和待处理状态
145    #[inline]
146    pub(crate) fn new() -> Arc<Self> {
147        Arc::new(Self {
148            waker: AtomicWaker::new(),
149            state: AtomicU8::new(T::pending_value()),
150            _marker: std::marker::PhantomData,
151        })
152    }
153    
154    /// Send a completion notification (set state and wake)
155    /// 
156    /// 发送完成通知(设置状态并唤醒)
157    #[inline]
158    pub(crate) fn send(&self, state: T) {
159        // Store completion state first with Release ordering
160        self.state.store(state.to_u8(), Ordering::Release);
161        
162        // Wake the registered waker if any
163        self.waker.wake();
164    }
165    
166    /// Register a waker to be notified on completion
167    /// 
168    /// 注册一个 waker 以在完成时收到通知
169    #[inline]
170    fn register_waker(&self, waker: &std::task::Waker) {
171        self.waker.register(waker);
172    }
173}
174
// PERFORMANCE NOTE: `Inner` deliberately has no `Drop` implementation.
// No manual waker cleanup is performed anywhere in this module:
// 1. In the common case (the sender notifies before the receiver is dropped), the registered
//    waker has already been consumed by the wake-up.
// 2. A waker that is still registered is owned by the `AtomicWaker` field, so it is released by
//    that field's own drop when the last `Arc<Inner<T>>` handle goes away.
// 3. `Inner` therefore needs no custom drop logic and pays no extra atomic load on drop.
186
187/// Completion notifier for one-shot tasks
188/// 
189/// 一次性任务完成通知器
190pub struct Sender<T: State> {
191    inner: Arc<Inner<T>>,
192}
193
194impl<T: State> Sender<T> {
195    /// Create a new oneshot completion notifier with receiver
196    /// 
197    /// 创建一个新的 oneshot 完成通知器和接收器
198    /// 
199    /// # Returns
200    /// Returns a tuple of (notifier, receiver)
201    /// 
202    /// 返回 (通知器, 接收器) 元组
203    #[inline]
204    pub fn new() -> (Self, Receiver<T>) {
205        let inner = Inner::new();
206        
207        let notifier = Sender {
208            inner: inner.clone(),
209        };
210        let receiver = Receiver {
211            inner,
212        };
213        
214        (notifier, receiver)
215    }
216    
217    /// Notify completion with the given state
218    /// 
219    /// 使用给定状态通知完成
220    #[inline]
221    pub fn notify(&self, state: T) {
222        self.inner.send(state);
223    }
224}
225
226/// Completion receiver for one-shot tasks
227/// 
228/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
229/// 
230/// 一次性任务完成通知接收器
231/// 
232/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
233/// 
234/// # Examples
235/// 
236/// ## Using unit type for simple completion
237/// 
238/// ```
239/// use lite_sync::oneshot::Sender;
240/// 
241/// # tokio_test::block_on(async {
242/// let (notifier, receiver) = Sender::<()>::new();
243/// 
244/// tokio::spawn(async move {
245///     // ... do work ...
246///     notifier.notify(());  // Signal completion
247/// });
248/// 
249/// // Two equivalent ways to await:
250/// let result = receiver.await;               // Direct await via Future impl
251/// assert_eq!(result, ());
252/// # });
253/// ```
254/// 
255/// ## Awaiting on mutable reference
256/// 
257/// ```
258/// use lite_sync::oneshot::Sender;
259/// 
260/// # tokio_test::block_on(async {
261/// let (notifier, mut receiver) = Sender::<()>::new();
262/// 
263/// tokio::spawn(async move {
264///     // ... do work ...
265///     notifier.notify(());
266/// });
267/// 
268/// // Can also await on &mut receiver
269/// let result = (&mut receiver).await;
270/// assert_eq!(result, ());
271/// # });
272/// ```
273/// 
274/// ## Using custom state
275/// 
276/// ```
277/// use lite_sync::oneshot::{State, Sender};
278/// 
279/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
280/// enum CustomState {
281///     Success,
282///     Failure,
283///     Timeout,
284/// }
285/// 
286/// impl State for CustomState {
287///     fn to_u8(&self) -> u8 {
288///         match self {
289///             CustomState::Success => 1,
290///             CustomState::Failure => 2,
291///             CustomState::Timeout => 3,
292///         }
293///     }
294///     
295///     fn from_u8(value: u8) -> Option<Self> {
296///         match value {
297///             1 => Some(CustomState::Success),
298///             2 => Some(CustomState::Failure),
299///             3 => Some(CustomState::Timeout),
300///             _ => None,
301///         }
302///     }
303///     
304///     fn pending_value() -> u8 {
305///         0
306///     }
307/// }
308/// 
309/// # tokio_test::block_on(async {
310/// let (notifier, receiver) = Sender::<CustomState>::new();
311/// 
312/// tokio::spawn(async move {
313///     notifier.notify(CustomState::Success);
314/// });
315/// 
316/// match receiver.await {
317///     CustomState::Success => { /* Success! */ },
318///     CustomState::Failure => { /* Failed */ },
319///     CustomState::Timeout => { /* Timed out */ },
320/// }
321/// # });
322/// ```
323pub struct Receiver<T: State> {
324    inner: Arc<Inner<T>>,
325}
326
327// Receiver is Unpin because all its fields are Unpin
328impl<T: State> Unpin for Receiver<T> {}
329
// `Receiver` needs no explicit `Drop` implementation: it only holds an `Arc<Inner<T>>`, and any
// waker it registered lives in the shared `AtomicWaker`, whose own drop implementation is relied
// on to release it once the shared state is freed.
335
336impl<T: State> Receiver<T> {
337    /// Wait for task completion asynchronously
338    /// 
339    /// This is equivalent to using `.await` directly on the receiver
340    /// 
341    /// 异步等待任务完成
342    /// 
343    /// 这等同于直接在 receiver 上使用 `.await`
344    /// 
345    /// # Returns
346    /// Returns the completion state
347    /// 
348    /// # 返回值
349    /// 返回完成状态
350    #[inline]
351    pub async fn wait(self) -> T {
352        self.await
353    }
354}
355
356/// Direct Future implementation for Receiver
357/// 
358/// This allows both `receiver.await` and `(&mut receiver).await` to work
359/// 
360/// Optimized implementation:
361/// - Fast path: Immediate return if already completed (no allocation)
362/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
363/// - No intermediate future state needed
364/// 
365/// 为 Receiver 直接实现 Future
366/// 
367/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
368/// 
369/// 优化实现:
370/// - 快速路径:如已完成则立即返回(无分配)
371/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
372/// - 无需中间 future 状态
373impl<T: State> Future for Receiver<T> {
374    type Output = T;
375    
376    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
377        // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
378        let this = self.get_mut();
379        
380        // Fast path: check if already completed
381        let current = this.inner.state.load(Ordering::Acquire);
382        if let Some(state) = T::from_u8(current)
383            && current != T::pending_value() {
384                return Poll::Ready(state);
385            }
386        
387        // Slow path: register waker for notification
388        this.inner.register_waker(cx.waker());
389        
390        // Check again after registering waker to avoid race condition
391        // The sender might have completed between our first check and waker registration
392        let current = this.inner.state.load(Ordering::Acquire);
393        if let Some(state) = T::from_u8(current)
394            && current != T::pending_value() {
395                return Poll::Ready(state);
396            }
397        
398        Poll::Pending
399    }
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Delay used by the background task before it notifies, giving the
    /// receiver a chance to be polled (and to register its waker) first.
    const NOTIFY_DELAY: Duration = Duration::from_millis(10);

    /// Spawns a task that sleeps for `NOTIFY_DELAY` and then publishes `state`.
    fn notify_later<T: State>(notifier: Sender<T>, state: T) {
        tokio::spawn(async move {
            tokio::time::sleep(NOTIFY_DELAY).await;
            notifier.notify(state);
        });
    }

    /// Test-only state type for completion notification.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum TestCompletion {
        /// Task was called/completed successfully.
        Called,

        /// Task was cancelled.
        Cancelled,
    }

    impl State for TestCompletion {
        fn to_u8(&self) -> u8 {
            match self {
                TestCompletion::Called => 1,
                TestCompletion::Cancelled => 2,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(TestCompletion::Called),
                2 => Some(TestCompletion::Cancelled),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }
    }

    /// Second test-only state type, with three variants.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum CustomState {
        Success,
        Failure,
        Timeout,
    }

    impl State for CustomState {
        fn to_u8(&self) -> u8 {
            match self {
                CustomState::Success => 1,
                CustomState::Failure => 2,
                CustomState::Timeout => 3,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(CustomState::Success),
                2 => Some(CustomState::Failure),
                3 => Some(CustomState::Timeout),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }
    }

    // ---- `wait()` with delayed notification ----

    #[tokio::test]
    async fn test_oneshot_called() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Called);

        assert_eq!(receiver.wait().await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_cancelled() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Cancelled);

        assert_eq!(receiver.wait().await, TestCompletion::Cancelled);
    }

    // ---- `wait()` with notification before the first poll (fast path) ----

    #[tokio::test]
    async fn test_oneshot_immediate_called() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notifier.notify(TestCompletion::Called);

        assert_eq!(receiver.wait().await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_immediate_cancelled() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notifier.notify(TestCompletion::Cancelled);

        assert_eq!(receiver.wait().await, TestCompletion::Cancelled);
    }

    // ---- custom state type ----

    #[tokio::test]
    async fn test_oneshot_custom_state() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notify_later(notifier, CustomState::Success);

        assert_eq!(receiver.wait().await, CustomState::Success);
    }

    #[tokio::test]
    async fn test_oneshot_custom_state_timeout() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notifier.notify(CustomState::Timeout);

        assert_eq!(receiver.wait().await, CustomState::Timeout);
    }

    // ---- unit type ----
    // The result is bound with `let () = …` instead of `assert_eq!(…, ())`:
    // comparing unit values is always true and is rejected by clippy's
    // `unit_cmp` lint, while the pattern still checks the output type.

    #[tokio::test]
    async fn test_oneshot_unit_type() {
        let (notifier, receiver) = Sender::<()>::new();
        notify_later(notifier, ());

        let () = receiver.wait().await;
    }

    #[tokio::test]
    async fn test_oneshot_unit_type_immediate() {
        let (notifier, receiver) = Sender::<()>::new();
        notifier.notify(());

        let () = receiver.wait().await;
    }

    // ---- direct `.await` through the `Future` impl ----

    #[tokio::test]
    async fn test_oneshot_into_future_called() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Called);

        assert_eq!(receiver.await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_immediate() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notifier.notify(TestCompletion::Cancelled);

        assert_eq!(receiver.await, TestCompletion::Cancelled);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_unit_type() {
        let (notifier, receiver) = Sender::<()>::new();
        notify_later(notifier, ());

        let () = receiver.await;
    }

    #[tokio::test]
    async fn test_oneshot_into_future_custom_state() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notify_later(notifier, CustomState::Failure);

        assert_eq!(receiver.await, CustomState::Failure);
    }

    // ---- awaiting through `&mut receiver` ----

    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (notifier, mut receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Called);

        let result = (&mut receiver).await;
        assert_eq!(result, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_await_mut_reference_unit_type() {
        let (notifier, mut receiver) = Sender::<()>::new();
        notifier.notify(());

        let () = (&mut receiver).await;
    }
}
652