lite_sync/oneshot/
lite.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::atomic_waker::AtomicWaker;
8
/// A completion value that can be stored in a single atomic byte.
///
/// Implementors map every value to a `u8` with [`State::to_u8`] and back with
/// [`State::from_u8`], and reserve one byte value, [`State::pending_value`], to mean
/// "not completed yet". This allows for zero-allocation, lock-free state transitions.
///
/// The byte returned by `pending_value()` must never be produced by `to_u8()` for a real
/// completion value: the receiver treats that byte as "still pending" and keeps waiting.
///
/// # Built-in Implementations
///
/// - `()`: Simple completion notification without state
///
/// # Example: Custom State
///
/// ```
/// use lite_sync::oneshot::lite::{State, Sender};
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum CustomState {
///     Success,
///     Failure,
///     Timeout,
/// }
///
/// impl State for CustomState {
///     fn to_u8(&self) -> u8 {
///         match self {
///             CustomState::Success => 1,
///             CustomState::Failure => 2,
///             CustomState::Timeout => 3,
///         }
///     }
///
///     fn from_u8(value: u8) -> Option<Self> {
///         match value {
///             1 => Some(CustomState::Success),
///             2 => Some(CustomState::Failure),
///             3 => Some(CustomState::Timeout),
///             _ => None,
///         }
///     }
///
///     fn pending_value() -> u8 {
///         0
///     }
/// }
///
/// # tokio_test::block_on(async {
/// // Usage:
/// let (notifier, receiver) = Sender::<CustomState>::new();
/// tokio::spawn(async move {
///     notifier.notify(CustomState::Success);
/// });
/// let result = receiver.await; // Direct await
/// assert_eq!(result, CustomState::Success);
/// # });
/// ```
pub trait State: Sized + Send + Sync + 'static {
    /// Encodes this value as the byte that is written to the atomic state.
    ///
    /// Must not return [`State::pending_value`] for a completed value.
    fn to_u8(&self) -> u8;

    /// Decodes a byte previously produced by [`State::to_u8`].
    ///
    /// Returns `None` if `value` does not represent a valid state.
    fn from_u8(value: u8) -> Option<Self>;

    /// The byte that marks the channel as still pending (before completion).
    fn pending_value() -> u8;
}
88
89/// Implementation for unit type () - simple completion notification without state
90/// 
91/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
92impl State for () {
93    #[inline]
94    fn to_u8(&self) -> u8 {
95        1 // Completed
96    }
97    
98    #[inline]
99    fn from_u8(value: u8) -> Option<Self> {
100        match value {
101            1 => Some(()),
102            _ => None,
103        }
104    }
105    
106    #[inline]
107    fn pending_value() -> u8 {
108        0 // Pending
109    }
110}
111
112#[inline]
113pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
114    let (notifier, receiver) = Sender::<T>::new();
115    (notifier, receiver)
116}
117
/// Shared state behind one [`Sender`] / [`Receiver`] pair.
///
/// Uses `AtomicWaker` for waker storage without any `Box` allocation:
/// - A `Waker` itself is just 2 pointers (16 bytes on 64-bit), so storing it needs no
///   additional heap allocation.
/// - The state lives in one `AtomicU8`, which starts at `T::pending_value()` and is
///   overwritten with the encoded completion value on send.
/// - The common crate-level `AtomicWaker` implementation is reused rather than re-implemented.
///
/// `_marker` only ties the untyped state byte to the state type `T`; no `T` is stored.
pub(crate) struct Inner<T: State> {
    waker: AtomicWaker,
    state: AtomicU8,
    _marker: std::marker::PhantomData<T>,
}
136
137impl<T: State> std::fmt::Debug for Inner<T> {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        let state = self.state.load(std::sync::atomic::Ordering::Acquire);
140        let is_pending = state == T::pending_value();
141        f.debug_struct("Inner")
142            .field("state", &state)
143            .field("is_pending", &is_pending)
144            .finish()
145    }
146}
147
148impl<T: State> Inner<T> {
149    /// Create a new oneshot inner state
150    /// 
151    /// Extremely fast: just initializes empty waker and pending state
152    /// 
153    /// 创建一个新的 oneshot 内部状态
154    /// 
155    /// 极快:仅初始化空 waker 和待处理状态
156    #[inline]
157    pub(crate) fn new() -> Arc<Self> {
158        Arc::new(Self {
159            waker: AtomicWaker::new(),
160            state: AtomicU8::new(T::pending_value()),
161            _marker: std::marker::PhantomData,
162        })
163    }
164    
165    /// Send a completion notification (set state and wake)
166    /// 
167    /// 发送完成通知(设置状态并唤醒)
168    #[inline]
169    pub(crate) fn send(&self, state: T) {
170        // Store completion state first with Release ordering
171        self.state.store(state.to_u8(), Ordering::Release);
172        
173        // Wake the registered waker if any
174        self.waker.wake();
175    }
176    
177    /// Register a waker to be notified on completion
178    /// 
179    /// 注册一个 waker 以在完成时收到通知
180    #[inline]
181    fn register_waker(&self, waker: &std::task::Waker) {
182        self.waker.register(waker);
183    }
184}
185
// PERFORMANCE OPTIMIZATION: Inner deliberately has no Drop implementation.
// No explicit waker cleanup is performed here (Receiver has no Drop impl in this file
// either); a waker that is still registered is released when the `waker` field itself is
// dropped together with Inner. Rationale:
// 1. In the common case (sender notifies before the receiver is dropped) the waker is
//    expected to have been consumed already by `wake()`.
// 2. Only Receiver registers wakers, so no other party can leave one behind.
// 3. Inner's drop therefore does no work of its own, eliminating atomic load overhead.
197
/// Completion notifier for one-shot tasks: the sending half of the channel.
///
/// Holds a shared reference to the channel state; calling `notify` stores the completion
/// value and wakes the waiting [`Receiver`].
///
/// NOTE(review): no `Drop` impl for `Sender` is visible in this file, so a `Sender` that is
/// dropped without calling `notify` appears to leave its `Receiver` pending forever — confirm
/// this is intended.
pub struct Sender<T: State> {
    inner: Arc<Inner<T>>,
}
204
205impl<T: State> std::fmt::Debug for Sender<T> {
206    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207        let state = self.inner.state.load(std::sync::atomic::Ordering::Acquire);
208        let is_pending = state == T::pending_value();
209        f.debug_struct("Sender")
210            .field("state", &state)
211            .field("is_pending", &is_pending)
212            .finish()
213    }
214}
215
216impl<T: State> Sender<T> {
217    /// Create a new oneshot completion notifier with receiver
218    /// 
219    /// 创建一个新的 oneshot 完成通知器和接收器
220    /// 
221    /// # Returns
222    /// Returns a tuple of (notifier, receiver)
223    /// 
224    /// 返回 (通知器, 接收器) 元组
225    #[inline]
226    pub fn new() -> (Self, Receiver<T>) {
227        let inner = Inner::new();
228        
229        let notifier = Sender {
230            inner: inner.clone(),
231        };
232        let receiver = Receiver {
233            inner,
234        };
235        
236        (notifier, receiver)
237    }
238    
239    /// Notify completion with the given state
240    /// 
241    /// 使用给定状态通知完成
242    #[inline]
243    pub fn notify(&self, state: T) {
244        self.inner.send(state);
245    }
246}
247
/// Completion receiver for one-shot tasks: the receiving half of the channel.
///
/// Implements `Future` directly, allowing direct `.await` usage on both owned values and
/// mutable references. The future resolves to the state passed to `Sender::notify`.
///
/// # Examples
///
/// ## Using unit type for simple completion
///
/// ```
/// use lite_sync::oneshot::lite::Sender;
///
/// # tokio_test::block_on(async {
/// let (notifier, receiver) = Sender::<()>::new();
///
/// tokio::spawn(async move {
///     // ... do work ...
///     notifier.notify(());  // Signal completion
/// });
///
/// // Await the receiver directly via its Future impl:
/// let result = receiver.await;
/// assert_eq!(result, ());
/// # });
/// ```
///
/// ## Awaiting on mutable reference
///
/// ```
/// use lite_sync::oneshot::lite::Sender;
///
/// # tokio_test::block_on(async {
/// let (notifier, mut receiver) = Sender::<()>::new();
///
/// tokio::spawn(async move {
///     // ... do work ...
///     notifier.notify(());
/// });
///
/// // Can also await on &mut receiver
/// let result = (&mut receiver).await;
/// assert_eq!(result, ());
/// # });
/// ```
///
/// ## Using custom state
///
/// ```
/// use lite_sync::oneshot::lite::{State, Sender};
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum CustomState {
///     Success,
///     Failure,
///     Timeout,
/// }
///
/// impl State for CustomState {
///     fn to_u8(&self) -> u8 {
///         match self {
///             CustomState::Success => 1,
///             CustomState::Failure => 2,
///             CustomState::Timeout => 3,
///         }
///     }
///
///     fn from_u8(value: u8) -> Option<Self> {
///         match value {
///             1 => Some(CustomState::Success),
///             2 => Some(CustomState::Failure),
///             3 => Some(CustomState::Timeout),
///             _ => None,
///         }
///     }
///
///     fn pending_value() -> u8 {
///         0
///     }
/// }
///
/// # tokio_test::block_on(async {
/// let (notifier, receiver) = Sender::<CustomState>::new();
///
/// tokio::spawn(async move {
///     notifier.notify(CustomState::Success);
/// });
///
/// match receiver.await {
///     CustomState::Success => { /* Success! */ },
///     CustomState::Failure => { /* Failed */ },
///     CustomState::Timeout => { /* Timed out */ },
/// }
/// # });
/// ```
pub struct Receiver<T: State> {
    inner: Arc<Inner<T>>,
}
348
349impl<T: State> std::fmt::Debug for Receiver<T> {
350    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351        let state = self.inner.state.load(std::sync::atomic::Ordering::Acquire);
352        let is_pending = state == T::pending_value();
353        f.debug_struct("Receiver")
354            .field("state", &state)
355            .field("is_pending", &is_pending)
356            .finish()
357    }
358}
359
// Explicit `Unpin` marker: Receiver's only field is an `Arc`, which is `Unpin`, so pinning
// imposes no constraints. This is what allows `Pin::new(&mut receiver)` and
// `(&mut receiver).await` to be used on a Receiver.
impl<T: State> Unpin for Receiver<T> {}
362
// Receiver needs no explicit Drop implementation: dropping it only releases its reference
// to the shared inner state, and any waker still stored there is cleaned up automatically
// by AtomicWaker's own drop handling.
368
impl<T: State> Receiver<T> {
    /// Waits for task completion asynchronously.
    ///
    /// Consumes the receiver; this is equivalent to using `.await` directly on it.
    ///
    /// # Returns
    /// The completion state.
    #[inline]
    pub async fn wait(self) -> T {
        self.await
    }
}
388
389/// Direct Future implementation for Receiver
390/// 
391/// This allows both `receiver.await` and `(&mut receiver).await` to work
392/// 
393/// Optimized implementation:
394/// - Fast path: Immediate return if already completed (no allocation)
395/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
396/// - No intermediate future state needed
397/// 
398/// 为 Receiver 直接实现 Future
399/// 
400/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
401/// 
402/// 优化实现:
403/// - 快速路径:如已完成则立即返回(无分配)
404/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
405/// - 无需中间 future 状态
406impl<T: State> Future for Receiver<T> {
407    type Output = T;
408    
409    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
410        // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
411        let this = self.get_mut();
412        
413        // Fast path: check if already completed
414        let current = this.inner.state.load(Ordering::Acquire);
415        if let Some(state) = T::from_u8(current)
416            && current != T::pending_value() {
417                return Poll::Ready(state);
418            }
419        
420        // Slow path: register waker for notification
421        this.inner.register_waker(cx.waker());
422        
423        // Check again after registering waker to avoid race condition
424        // The sender might have completed between our first check and waker registration
425        let current = this.inner.state.load(Ordering::Acquire);
426        if let Some(state) = T::from_u8(current)
427            && current != T::pending_value() {
428                return Poll::Ready(state);
429            }
430        
431        Poll::Pending
432    }
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Test-only state type for completion notification.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum TestCompletion {
        /// Task was called/completed successfully.
        Called,

        /// Task was cancelled.
        Cancelled,
    }

    impl State for TestCompletion {
        fn to_u8(&self) -> u8 {
            match self {
                TestCompletion::Called => 1,
                TestCompletion::Cancelled => 2,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(TestCompletion::Called),
                2 => Some(TestCompletion::Cancelled),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }
    }

    /// Second custom state type, with three outcomes.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum CustomState {
        Success,
        Failure,
        Timeout,
    }

    impl State for CustomState {
        fn to_u8(&self) -> u8 {
            match self {
                CustomState::Success => 1,
                CustomState::Failure => 2,
                CustomState::Timeout => 3,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(CustomState::Success),
                2 => Some(CustomState::Failure),
                3 => Some(CustomState::Timeout),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }
    }

    /// Spawns a task that sleeps for 10 ms and then notifies `state`.
    ///
    /// The delay means the receiver is normally awaited before the notification arrives,
    /// which exercises the waker-registration (slow) path of `poll`.
    fn notify_later<T: State>(notifier: Sender<T>, state: T) {
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            notifier.notify(state);
        });
    }

    // --- `wait()` with the test completion type ---

    #[tokio::test]
    async fn test_oneshot_called() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Called);

        let result = receiver.wait().await;
        assert_eq!(result, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_cancelled() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Cancelled);

        let result = receiver.wait().await;
        assert_eq!(result, TestCompletion::Cancelled);
    }

    #[tokio::test]
    async fn test_oneshot_immediate_called() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();

        // Notify before waiting (fast path)
        notifier.notify(TestCompletion::Called);

        let result = receiver.wait().await;
        assert_eq!(result, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_immediate_cancelled() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();

        // Notify before waiting (fast path)
        notifier.notify(TestCompletion::Cancelled);

        let result = receiver.wait().await;
        assert_eq!(result, TestCompletion::Cancelled);
    }

    // --- `wait()` with the custom state type ---

    #[tokio::test]
    async fn test_oneshot_custom_state() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notify_later(notifier, CustomState::Success);

        let result = receiver.wait().await;
        assert_eq!(result, CustomState::Success);
    }

    #[tokio::test]
    async fn test_oneshot_custom_state_timeout() {
        let (notifier, receiver) = Sender::<CustomState>::new();

        // Immediate notification
        notifier.notify(CustomState::Timeout);

        let result = receiver.wait().await;
        assert_eq!(result, CustomState::Timeout);
    }

    // --- `wait()` with the unit type ---
    // The `let () = ...` binding checks the output type at compile time; comparing two unit
    // values with `assert_eq!` can never fail and is rejected by clippy's `unit_cmp` lint.

    #[tokio::test]
    async fn test_oneshot_unit_type() {
        let (notifier, receiver) = Sender::<()>::new();
        notify_later(notifier, ());

        let () = receiver.wait().await;
    }

    #[tokio::test]
    async fn test_oneshot_unit_type_immediate() {
        let (notifier, receiver) = Sender::<()>::new();

        // Immediate notification (fast path)
        notifier.notify(());

        let () = receiver.wait().await;
    }

    // --- Future implementation (direct await) ---

    #[tokio::test]
    async fn test_oneshot_into_future_called() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Called);

        // Direct await without .wait()
        let result = receiver.await;
        assert_eq!(result, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_immediate() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();

        // Notify before awaiting (fast path)
        notifier.notify(TestCompletion::Cancelled);

        // Direct await
        let result = receiver.await;
        assert_eq!(result, TestCompletion::Cancelled);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_unit_type() {
        let (notifier, receiver) = Sender::<()>::new();
        notify_later(notifier, ());

        // Direct await with unit type
        let () = receiver.await;
    }

    #[tokio::test]
    async fn test_oneshot_into_future_custom_state() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notify_later(notifier, CustomState::Failure);

        // Direct await with custom state
        let result = receiver.await;
        assert_eq!(result, CustomState::Failure);
    }

    // --- Awaiting on `&mut receiver` ---

    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (notifier, mut receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Called);

        // Await on mutable reference
        let result = (&mut receiver).await;
        assert_eq!(result, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_await_mut_reference_unit_type() {
        let (notifier, mut receiver) = Sender::<()>::new();

        // Immediate notification
        notifier.notify(());

        // Await on mutable reference (fast path)
        let () = (&mut receiver).await;
    }
}
685