lite_sync/oneshot/
lite.rs

1//! Lightweight oneshot channel for State-encodable types.
2//!
3//! 用于 State 可编码类型的轻量级一次性通道。
4
5use std::sync::atomic::{AtomicU8, Ordering};
6
7use super::common::{OneshotStorage, TakeResult};
8
9// Re-export common types
10pub use super::common::error;
11pub use super::common::RecvError;
12pub use super::common::TryRecvError;
13
14// ============================================================================
15// State Trait
16// ============================================================================
17
18/// Trait for types that can be used as oneshot state
19/// 
20/// Types implementing this trait can be converted to/from u8 for atomic storage.
21/// This allows for zero-allocation, lock-free state transitions.
22/// 
23/// 可用作 oneshot 状态的类型的 trait
24/// 
25/// 实现此 trait 的类型可以与 u8 互相转换以进行原子存储。
26/// 这允许零分配、无锁的状态转换。
27/// 
28/// # Built-in Implementations
29/// 
30/// - `()`: Simple completion notification without state
31/// 
32/// # Example: Custom State
33/// 
34/// ```
35/// use lite_sync::oneshot::lite::{State, Sender};
36/// 
37/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
38/// enum CustomState {
39///     Success,
40///     Failure,
41///     Timeout,
42/// }
43/// 
44/// impl State for CustomState {
45///     fn to_u8(&self) -> u8 {
46///         match self {
47///             CustomState::Success => 1,
48///             CustomState::Failure => 2,
49///             CustomState::Timeout => 3,
50///         }
51///     }
52///     
53///     fn from_u8(value: u8) -> Option<Self> {
54///         match value {
55///             1 => Some(CustomState::Success),
56///             2 => Some(CustomState::Failure),
57///             3 => Some(CustomState::Timeout),
58///             _ => None,
59///         }
60///     }
61///     
62///     fn pending_value() -> u8 {
63///         0
64///     }
65///     
66///     fn closed_value() -> u8 {
67///         255
68///     }
69///     
70///     fn receiver_closed_value() -> u8 {
71///         254
72///     }
73/// }
74/// 
75/// # tokio_test::block_on(async {
76/// // Usage:
77/// let (notifier, receiver) = Sender::<CustomState>::new();
78/// tokio::spawn(async move {
79///     notifier.send(CustomState::Success);
80/// });
81/// let result = receiver.await; // Direct await
82/// assert_eq!(result, Ok(CustomState::Success));
83/// # });
84/// ```
85pub trait State: Sized + Send + Sync + 'static {
86    /// Convert the state to u8 for atomic storage
87    /// 
88    /// 将状态转换为 u8 以进行原子存储
89    fn to_u8(&self) -> u8;
90    
91    /// Convert u8 back to the state type
92    /// 
93    /// Returns None if the value doesn't represent a valid state
94    /// 
95    /// 将 u8 转换回状态类型
96    /// 
97    /// 如果值不代表有效状态则返回 None
98    fn from_u8(value: u8) -> Option<Self>;
99    
100    /// The pending state value (before completion)
101    /// 
102    /// 待处理状态值(完成前)
103    fn pending_value() -> u8;
104    
105    /// The closed state value (sender was dropped without sending)
106    /// 
107    /// 已关闭状态值(发送器被丢弃而未发送)
108    fn closed_value() -> u8;
109    
110    /// The receiver closed state value
111    /// 
112    /// 接收器关闭状态值
113    fn receiver_closed_value() -> u8;
114}
115
116/// Implementation for unit type () - simple completion notification without state
117/// 
118/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
119impl State for () {
120    #[inline]
121    fn to_u8(&self) -> u8 {
122        1 // Completed
123    }
124    
125    #[inline]
126    fn from_u8(value: u8) -> Option<Self> {
127        match value {
128            1 => Some(()),
129            _ => None,
130        }
131    }
132    
133    #[inline]
134    fn pending_value() -> u8 {
135        0 // Pending
136    }
137    
138    #[inline]
139    fn closed_value() -> u8 {
140        255 // Sender closed
141    }
142    
143    #[inline]
144    fn receiver_closed_value() -> u8 {
145        254 // Receiver closed
146    }
147}
148
149// ============================================================================
150// Lite Storage
151// ============================================================================
152
153/// Storage for State-encodable types using only `AtomicU8`
154/// 
155/// 使用 `AtomicU8` 存储 State 可编码类型
156pub struct LiteStorage<S: State> {
157    state: AtomicU8,
158    _marker: std::marker::PhantomData<S>,
159}
160
161unsafe impl<S: State> Send for LiteStorage<S> {}
162unsafe impl<S: State> Sync for LiteStorage<S> {}
163
164impl<S: State> OneshotStorage for LiteStorage<S> {
165    type Value = S;
166    
167    #[inline]
168    fn new() -> Self {
169        Self {
170            state: AtomicU8::new(S::pending_value()),
171            _marker: std::marker::PhantomData,
172        }
173    }
174    
175    #[inline]
176    fn store(&self, value: S) {
177        self.state.store(value.to_u8(), Ordering::Release);
178    }
179    
180    #[inline]
181    fn try_take(&self) -> TakeResult<S> {
182        let current = self.state.load(Ordering::Acquire);
183        
184        if current == S::closed_value() || current == S::receiver_closed_value() {
185            return TakeResult::Closed;
186        }
187        
188        if current == S::pending_value() {
189            return TakeResult::Pending;
190        }
191        
192        // Value is ready
193        if let Some(state) = S::from_u8(current) {
194            TakeResult::Ready(state)
195        } else {
196            TakeResult::Pending
197        }
198    }
199    
200    #[inline]
201    fn is_sender_dropped(&self) -> bool {
202        self.state.load(Ordering::Acquire) == S::closed_value()
203    }
204    
205    #[inline]
206    fn mark_sender_dropped(&self) {
207        self.state.store(S::closed_value(), Ordering::Release);
208    }
209    
210    #[inline]
211    fn is_receiver_closed(&self) -> bool {
212        self.state.load(Ordering::Acquire) == S::receiver_closed_value()
213    }
214    
215    #[inline]
216    fn mark_receiver_closed(&self) {
217        self.state.store(S::receiver_closed_value(), Ordering::Release);
218    }
219}
220
221// ============================================================================
222// Type Aliases
223// ============================================================================
224
225/// Sender for one-shot state transfer
226/// 
227/// 用于一次性状态传递的发送器
228pub type Sender<S> = super::common::Sender<LiteStorage<S>>;
229
230/// Receiver for one-shot state transfer
231/// 
232/// 用于一次性状态传递的接收器
233pub type Receiver<S> = super::common::Receiver<LiteStorage<S>>;
234
235/// Create a new oneshot channel for State types
236/// 
237/// 创建一个用于 State 类型的新 oneshot 通道
238#[inline]
239pub fn channel<S: State>() -> (Sender<S>, Receiver<S>) {
240    Sender::new()
241}
242
243// ============================================================================
244// Receiver Extension Methods
245// ============================================================================
246
247impl<S: State> Receiver<S> {
248    /// Receive a value asynchronously
249    /// 
250    /// This is equivalent to using `.await` directly on the receiver
251    /// 
252    /// 异步接收一个值
253    /// 
254    /// 这等同于直接在 receiver 上使用 `.await`
255    #[inline]
256    pub async fn recv(self) -> Result<S, RecvError> {
257        self.await
258    }
259    
260    /// Try to receive a value without blocking
261    /// 
262    /// Returns `Ok(value)` if value is ready, `Err(TryRecvError::Empty)` if pending,
263    /// or `Err(TryRecvError::Closed)` if sender was dropped.
264    /// 
265    /// 尝试接收值而不阻塞
266    /// 
267    /// 如果值就绪返回 `Ok(value)`,如果待处理返回 `Err(TryRecvError::Empty)`,
268    /// 如果发送器被丢弃返回 `Err(TryRecvError::Closed)`
269    #[inline]
270    pub fn try_recv(&mut self) -> Result<S, TryRecvError> {
271        match self.inner.try_recv() {
272            TakeResult::Ready(v) => Ok(v),
273            TakeResult::Pending => Err(TryRecvError::Empty),
274            TakeResult::Closed => Err(TryRecvError::Closed),
275        }
276    }
277}
278
279#[cfg(test)]
280mod tests {
281    use super::*;
282    
283    /// Test-only state type for completion notification
284    /// 
285    /// 测试专用的完成通知状态类型
286    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
287    enum TestCompletion {
288        /// Task was called/completed successfully
289        /// 
290        /// 任务被调用/成功完成
291        Called,
292        
293        /// Task was cancelled
294        /// 
295        /// 任务被取消
296        Cancelled,
297    }
298    
299    impl State for TestCompletion {
300        fn to_u8(&self) -> u8 {
301            match self {
302                TestCompletion::Called => 1,
303                TestCompletion::Cancelled => 2,
304            }
305        }
306        
307        fn from_u8(value: u8) -> Option<Self> {
308            match value {
309                1 => Some(TestCompletion::Called),
310                2 => Some(TestCompletion::Cancelled),
311                _ => None,
312            }
313        }
314        
315        fn pending_value() -> u8 {
316            0
317        }
318        
319        fn closed_value() -> u8 {
320            255
321        }
322        
323        fn receiver_closed_value() -> u8 {
324            254
325        }
326    }
327    
328    #[tokio::test]
329    async fn test_oneshot_called() {
330        let (notifier, receiver) = Sender::<TestCompletion>::new();
331        
332        tokio::spawn(async move {
333            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
334            notifier.send(TestCompletion::Called).unwrap();
335        });
336        
337        let result = receiver.recv().await;
338        assert_eq!(result, Ok(TestCompletion::Called));
339    }
340    
341    #[tokio::test]
342    async fn test_oneshot_cancelled() {
343        let (notifier, receiver) = Sender::<TestCompletion>::new();
344        
345        tokio::spawn(async move {
346            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
347            notifier.send(TestCompletion::Cancelled).unwrap();
348        });
349        
350        let result = receiver.recv().await;
351        assert_eq!(result, Ok(TestCompletion::Cancelled));
352    }
353    
354    #[tokio::test]
355    async fn test_oneshot_immediate_called() {
356        let (notifier, receiver) = Sender::<TestCompletion>::new();
357        
358        // Notify before waiting (fast path)
359        notifier.send(TestCompletion::Called).unwrap();
360        
361        let result = receiver.recv().await;
362        assert_eq!(result, Ok(TestCompletion::Called));
363    }
364    
365    #[tokio::test]
366    async fn test_oneshot_immediate_cancelled() {
367        let (notifier, receiver) = Sender::<TestCompletion>::new();
368        
369        // Notify before waiting (fast path)
370        notifier.send(TestCompletion::Cancelled).unwrap();
371        
372        let result = receiver.recv().await;
373        assert_eq!(result, Ok(TestCompletion::Cancelled));
374    }
375    
376    // Test with custom state type
377    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
378    enum CustomState {
379        Success,
380        Failure,
381        Timeout,
382    }
383    
384    impl State for CustomState {
385        fn to_u8(&self) -> u8 {
386            match self {
387                CustomState::Success => 1,
388                CustomState::Failure => 2,
389                CustomState::Timeout => 3,
390            }
391        }
392        
393        fn from_u8(value: u8) -> Option<Self> {
394            match value {
395                1 => Some(CustomState::Success),
396                2 => Some(CustomState::Failure),
397                3 => Some(CustomState::Timeout),
398                _ => None,
399            }
400        }
401        
402        fn pending_value() -> u8 {
403            0
404        }
405        
406        fn closed_value() -> u8 {
407            255
408        }
409        
410        fn receiver_closed_value() -> u8 {
411            254
412        }
413    }
414    
415    #[tokio::test]
416    async fn test_oneshot_custom_state() {
417        let (notifier, receiver) = Sender::<CustomState>::new();
418        
419        tokio::spawn(async move {
420            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
421            notifier.send(CustomState::Success).unwrap();
422        });
423        
424        let result = receiver.recv().await;
425        assert_eq!(result, Ok(CustomState::Success));
426    }
427    
428    #[tokio::test]
429    async fn test_oneshot_custom_state_timeout() {
430        let (notifier, receiver) = Sender::<CustomState>::new();
431        
432        // Immediate notification
433        notifier.send(CustomState::Timeout).unwrap();
434        
435        let result = receiver.recv().await;
436        assert_eq!(result, Ok(CustomState::Timeout));
437    }
438    
439    #[tokio::test]
440    async fn test_oneshot_unit_type() {
441        let (notifier, receiver) = Sender::<()>::new();
442        
443        tokio::spawn(async move {
444            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
445            notifier.send(()).unwrap();
446        });
447        
448        let result = receiver.recv().await;
449        assert_eq!(result, Ok(()));
450    }
451    
452    #[tokio::test]
453    async fn test_oneshot_unit_type_immediate() {
454        let (notifier, receiver) = Sender::<()>::new();
455        
456        // Immediate notification (fast path)
457        notifier.send(()).unwrap();
458        
459        let result = receiver.recv().await;
460        assert_eq!(result, Ok(()));
461    }
462    
463    // Tests for Future implementation (direct await)
464    #[tokio::test]
465    async fn test_oneshot_into_future_called() {
466        let (notifier, receiver) = Sender::<TestCompletion>::new();
467        
468        tokio::spawn(async move {
469            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
470            notifier.send(TestCompletion::Called).unwrap();
471        });
472        
473        // Direct await without .wait()
474        let result = receiver.await;
475        assert_eq!(result, Ok(TestCompletion::Called));
476    }
477    
478    #[tokio::test]
479    async fn test_oneshot_into_future_immediate() {
480        let (notifier, receiver) = Sender::<TestCompletion>::new();
481        
482        // Notify before awaiting (fast path)
483        notifier.send(TestCompletion::Cancelled).unwrap();
484        
485        // Direct await
486        let result = receiver.await;
487        assert_eq!(result, Ok(TestCompletion::Cancelled));
488    }
489    
490    #[tokio::test]
491    async fn test_oneshot_into_future_unit_type() {
492        let (notifier, receiver) = Sender::<()>::new();
493        
494        tokio::spawn(async move {
495            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
496            notifier.send(()).unwrap();
497        });
498        
499        // Direct await with unit type
500        let result = receiver.await;
501        assert_eq!(result, Ok(()));
502    }
503    
504    #[tokio::test]
505    async fn test_oneshot_into_future_custom_state() {
506        let (notifier, receiver) = Sender::<CustomState>::new();
507        
508        tokio::spawn(async move {
509            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
510            notifier.send(CustomState::Failure).unwrap();
511        });
512        
513        // Direct await with custom state
514        let result = receiver.await;
515        assert_eq!(result, Ok(CustomState::Failure));
516    }
517    
518    // Test awaiting on &mut receiver
519    #[tokio::test]
520    async fn test_oneshot_await_mut_reference() {
521        let (notifier, mut receiver) = Sender::<TestCompletion>::new();
522        
523        tokio::spawn(async move {
524            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
525            notifier.send(TestCompletion::Called).unwrap();
526        });
527        
528        // Await on mutable reference
529        let result = (&mut receiver).await;
530        assert_eq!(result, Ok(TestCompletion::Called));
531    }
532    
533    #[tokio::test]
534    async fn test_oneshot_await_mut_reference_unit_type() {
535        let (notifier, mut receiver) = Sender::<()>::new();
536        
537        // Immediate notification
538        notifier.send(()).unwrap();
539        
540        // Await on mutable reference (fast path)
541        let result = (&mut receiver).await;
542        assert_eq!(result, Ok(()));
543    }
544    
545    // Tests for try_recv
546    #[tokio::test]
547    async fn test_oneshot_try_recv_pending() {
548        let (_notifier, mut receiver) = Sender::<TestCompletion>::new();
549        
550        // Try receive before sending
551        let result = receiver.try_recv();
552        assert_eq!(result, Err(TryRecvError::Empty));
553    }
554    
555    #[tokio::test]
556    async fn test_oneshot_try_recv_ready() {
557        let (notifier, mut receiver) = Sender::<TestCompletion>::new();
558        
559        // Send value
560        notifier.send(TestCompletion::Called).unwrap();
561        
562        // Try receive after sending
563        let result = receiver.try_recv();
564        assert_eq!(result, Ok(TestCompletion::Called));
565    }
566    
567    #[tokio::test]
568    async fn test_oneshot_try_recv_sender_dropped() {
569        let (notifier, mut receiver) = Sender::<TestCompletion>::new();
570        
571        // Drop sender without sending
572        drop(notifier);
573        
574        // Try receive should return error
575        let result = receiver.try_recv();
576        assert_eq!(result, Err(TryRecvError::Closed));
577    }
578    
579    // Tests for sender dropped behavior
580    #[tokio::test]
581    async fn test_oneshot_sender_dropped_before_recv() {
582        let (notifier, receiver) = Sender::<TestCompletion>::new();
583        
584        // Drop sender without sending
585        drop(notifier);
586        
587        // Recv should return error
588        let result = receiver.recv().await;
589        assert_eq!(result, Err(RecvError));
590    }
591    
592    #[tokio::test]
593    async fn test_oneshot_sender_dropped_unit_type() {
594        let (notifier, receiver) = Sender::<()>::new();
595        
596        // Drop sender without sending
597        drop(notifier);
598        
599        // Recv should return error
600        let result = receiver.recv().await;
601        assert_eq!(result, Err(RecvError));
602    }
603    
604    #[tokio::test]
605    async fn test_oneshot_sender_dropped_custom_state() {
606        let (notifier, receiver) = Sender::<CustomState>::new();
607        
608        // Drop sender without sending
609        drop(notifier);
610        
611        // Recv should return error
612        let result = receiver.recv().await;
613        assert_eq!(result, Err(RecvError));
614    }
615    
616    // Tests for is_closed
617    #[test]
618    fn test_sender_is_closed_initially_false() {
619        let (sender, _receiver) = Sender::<()>::new();
620        assert!(!sender.is_closed());
621    }
622    
623    #[test]
624    fn test_sender_is_closed_after_receiver_drop() {
625        let (sender, receiver) = Sender::<()>::new();
626        drop(receiver);
627        assert!(sender.is_closed());
628    }
629    
630    #[test]
631    fn test_sender_is_closed_after_receiver_close() {
632        let (sender, mut receiver) = Sender::<()>::new();
633        receiver.close();
634        assert!(sender.is_closed());
635    }
636    
637    // Tests for close
638    #[test]
639    fn test_receiver_close_prevents_send() {
640        let (sender, mut receiver) = Sender::<TestCompletion>::new();
641        receiver.close();
642        
643        // Send should fail after close
644        assert!(sender.send(TestCompletion::Called).is_err());
645    }
646    
647    // Tests for blocking_recv
648    #[test]
649    fn test_blocking_recv_immediate() {
650        let (sender, receiver) = Sender::<TestCompletion>::new();
651        
652        // Send before blocking_recv (fast path)
653        sender.send(TestCompletion::Called).unwrap();
654        
655        let result = receiver.blocking_recv();
656        assert_eq!(result, Ok(TestCompletion::Called));
657    }
658    
659    #[test]
660    fn test_blocking_recv_with_thread() {
661        let (sender, receiver) = Sender::<()>::new();
662        
663        std::thread::spawn(move || {
664            std::thread::sleep(std::time::Duration::from_millis(10));
665            sender.send(()).unwrap();
666        });
667        
668        let result = receiver.blocking_recv();
669        assert_eq!(result, Ok(()));
670    }
671    
672    #[test]
673    fn test_blocking_recv_sender_dropped() {
674        let (sender, receiver) = Sender::<()>::new();
675        
676        std::thread::spawn(move || {
677            std::thread::sleep(std::time::Duration::from_millis(10));
678            drop(sender);
679        });
680        
681        let result = receiver.blocking_recv();
682        assert_eq!(result, Err(RecvError));
683    }
684}
685