lite_sync/oneshot/
generic.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::cell::UnsafeCell;
7use std::mem::MaybeUninit;
8
9use crate::atomic_waker::AtomicWaker;
10
11// States for the value cell
12const EMPTY: u8 = 0;   // No value stored
13const READY: u8 = 1;   // Value is ready
14
15/// Inner state for one-shot value transfer
16/// 
17/// Uses UnsafeCell<MaybeUninit<T>> for direct value storage without any overhead:
18/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
19/// - Value is stored directly with zero overhead (no Option discriminant)
20/// - Atomic state machine (AtomicU8) tracks initialization state
21/// - UnsafeCell allows interior mutability with proper synchronization
22/// 
23/// 一次性值传递的内部状态
24/// 
25/// 使用 UnsafeCell<MaybeUninit<T>> 实现零开销的直接值存储:
26/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
27/// - 值直接存储,零开销(无 Option 判别标记)
28/// - 原子状态机(AtomicU8)跟踪初始化状态
29/// - UnsafeCell 允许通过适当同步实现内部可变性
30pub(crate) struct Inner<T> {
31    waker: AtomicWaker,
32    state: AtomicU8,
33    value: UnsafeCell<MaybeUninit<T>>,
34}
35
36impl<T> Inner<T> {
37    /// Create a new oneshot inner state
38    /// 
39    /// Extremely fast: just initializes empty waker and uninitialized memory
40    /// 
41    /// 创建一个新的 oneshot 内部状态
42    /// 
43    /// 极快:仅初始化空 waker 和未初始化内存
44    #[inline]
45    pub(crate) fn new() -> Arc<Self> {
46        Arc::new(Self {
47            waker: AtomicWaker::new(),
48            state: AtomicU8::new(EMPTY),
49            value: UnsafeCell::new(MaybeUninit::uninit()),
50        })
51    }
52    
53    /// Send a value (store value and wake)
54    /// 
55    /// 发送一个值(存储值并唤醒)
56    #[inline]
57    pub(crate) fn send(&self, value: T) -> Result<(), T> {
58        // Try to transition from EMPTY to READY
59        match self.state.compare_exchange(
60            EMPTY,
61            READY,
62            Ordering::AcqRel,
63            Ordering::Acquire,
64        ) {
65            Ok(_) => {
66                // Successfully transitioned, store the value
67                // SAFETY: We have exclusive access via the state transition
68                // No other thread can access the value until state is READY
69                // We're initializing previously uninitialized memory
70                unsafe {
71                    (*self.value.get()).write(value);
72                }
73                
74                // Wake the registered waker if any
75                self.waker.wake();
76                Ok(())
77            }
78            Err(_) => {
79                // State was not EMPTY, value already sent
80                Err(value)
81            }
82        }
83    }
84    
85    /// Try to receive the value without blocking
86    /// 
87    /// 尝试接收值而不阻塞
88    #[inline]
89    fn try_recv(&self) -> Option<T> {
90        // Check if value is ready
91        if self.state.load(Ordering::Acquire) == READY {
92            // Try to transition from READY back to EMPTY to claim the value
93            match self.state.compare_exchange(
94                READY,
95                EMPTY,
96                Ordering::AcqRel,
97                Ordering::Acquire,
98            ) {
99                Ok(_) => {
100                    // Successfully claimed the value
101                    // SAFETY: We have exclusive access via the state transition
102                    // State was READY, so value must be initialized
103                    unsafe {
104                        Some((*self.value.get()).assume_init_read())
105                    }
106                }
107                Err(_) => {
108                    // Someone else already claimed it
109                    None
110                }
111            }
112        } else {
113            None
114        }
115    }
116    
117    /// Register a waker to be notified on completion
118    /// 
119    /// 注册一个 waker 以在完成时收到通知
120    #[inline]
121    fn register_waker(&self, waker: &std::task::Waker) {
122        self.waker.register(waker);
123    }
124}
125
126// SAFETY: Inner<T> is Send + Sync as long as T is Send
127// - UnsafeCell<MaybeUninit<T>> is protected by atomic state transitions
128// - Only one thread can access the value at a time (enforced by state machine)
129// - Value is only accessed when state is READY (initialized)
130// - Waker is properly synchronized via AtomicWaker
131//
132// SAFETY: 只要 T 是 Send,Inner<T> 就是 Send + Sync
133// - UnsafeCell<MaybeUninit<T>> 由原子状态转换保护
134// - 每次只有一个线程可以访问值(由状态机强制执行)
135// - 只有在状态为 READY(已初始化)时才访问值
136// - Waker 通过 AtomicWaker 正确同步
137unsafe impl<T: Send> Send for Inner<T> {}
138unsafe impl<T: Send> Sync for Inner<T> {}
139
140impl<T> Drop for Inner<T> {
141    fn drop(&mut self) {
142        // Clean up the value if it was sent but not received
143        // SAFETY: We have exclusive access in drop (&mut self)
144        // Only drop if state is READY (value was initialized)
145        if *self.state.get_mut() == READY {
146            unsafe {
147                // Value is initialized, must drop it
148                (*self.value.get()).assume_init_drop();
149            }
150        }
151        // If state is EMPTY, value is uninitialized, nothing to drop
152    }
153}
154
155/// Error returned when the receiver is dropped before receiving a value
156/// 
157/// 当接收器在接收值之前被丢弃时返回的错误
158#[derive(Debug, Clone, Copy, PartialEq, Eq)]
159pub struct RecvError;
160
161impl std::fmt::Display for RecvError {
162    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163        write!(f, "channel closed")
164    }
165}
166
167impl std::error::Error for RecvError {}
168
169#[inline]
170pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
171    Sender::new()
172}
173
174/// Sender for one-shot value transfer
175/// 
176/// 一次性值传递的发送器
177pub struct Sender<T> {
178    inner: Arc<Inner<T>>,
179}
180
181impl<T> Sender<T> {
182    /// Create a new oneshot sender with receiver
183    /// 
184    /// 创建一个新的 oneshot 发送器和接收器
185    /// 
186    /// # Returns
187    /// Returns a tuple of (sender, receiver)
188    /// 
189    /// 返回 (发送器, 接收器) 元组
190    #[inline]
191    pub fn new() -> (Self, Receiver<T>) {
192        let inner = Inner::new();
193        
194        let sender = Sender {
195            inner: inner.clone(),
196        };
197        let receiver = Receiver {
198            inner,
199        };
200        
201        (sender, receiver)
202    }
203    
204    /// Send a value through the channel
205    /// 
206    /// Returns `Err(value)` if the receiver was already dropped
207    /// 
208    /// 通过通道发送一个值
209    /// 
210    /// 如果接收器已被丢弃则返回 `Err(value)`
211    #[inline]
212    pub fn send(self, value: T) -> Result<(), T> {
213        self.inner.send(value)
214    }
215}
216
217/// Receiver for one-shot value transfer
218/// 
219/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
220/// 
221/// 一次性值传递的接收器
222/// 
223/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
224/// 
225/// # Examples
226/// 
227/// ## Basic usage
228/// 
229/// ```
230/// use lite_sync::oneshot::generic::Sender;
231/// 
232/// # tokio_test::block_on(async {
233/// let (sender, receiver) = Sender::<String>::new();
234/// 
235/// tokio::spawn(async move {
236///     sender.send("Hello, World!".to_string()).unwrap();
237/// });
238/// 
239/// // Direct await via Future impl
240/// let message = receiver.await;
241/// assert_eq!(message, "Hello, World!");
242/// # });
243/// ```
244/// 
245/// ## Awaiting on mutable reference
246/// 
247/// ```
248/// use lite_sync::oneshot::generic::Sender;
249/// 
250/// # tokio_test::block_on(async {
251/// let (sender, mut receiver) = Sender::<i32>::new();
252/// 
253/// tokio::spawn(async move {
254///     sender.send(42).unwrap();
255/// });
256/// 
257/// // Can also await on &mut receiver
258/// let value = (&mut receiver).await;
259/// assert_eq!(value, 42);
260/// # });
261/// ```
262pub struct Receiver<T> {
263    inner: Arc<Inner<T>>,
264}
265
266// Receiver is Unpin because all its fields are Unpin
267impl<T> Unpin for Receiver<T> {}
268
269impl<T> Receiver<T> {
270    /// Wait for value asynchronously
271    /// 
272    /// This is equivalent to using `.await` directly on the receiver
273    /// 
274    /// 异步等待值
275    /// 
276    /// 这等同于直接在 receiver 上使用 `.await`
277    /// 
278    /// # Returns
279    /// Returns the received value
280    /// 
281    /// # 返回值
282    /// 返回接收到的值
283    #[inline]
284    pub async fn wait(self) -> T {
285        self.await
286    }
287    
288    /// Try to receive a value without blocking
289    /// 
290    /// Returns `None` if no value has been sent yet
291    /// 
292    /// 尝试接收值而不阻塞
293    /// 
294    /// 如果还没有发送值则返回 `None`
295    #[inline]
296    pub fn try_recv(&mut self) -> Option<T> {
297        self.inner.try_recv()
298    }
299}
300
301/// Direct Future implementation for Receiver
302/// 
303/// This allows both `receiver.await` and `(&mut receiver).await` to work
304/// 
305/// Optimized implementation:
306/// - Fast path: Immediate return if value already sent (no allocation)
307/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
308/// - No intermediate future state needed
309/// 
310/// 为 Receiver 直接实现 Future
311/// 
312/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
313/// 
314/// 优化实现:
315/// - 快速路径:如值已发送则立即返回(无分配)
316/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
317/// - 无需中间 future 状态
318impl<T> Future for Receiver<T> {
319    type Output = T;
320    
321    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
322        // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
323        let this = self.get_mut();
324        
325        // Fast path: check if value already sent
326        if let Some(value) = this.inner.try_recv() {
327            return Poll::Ready(value);
328        }
329        
330        // Slow path: register waker for notification
331        this.inner.register_waker(cx.waker());
332        
333        // Check again after registering waker to avoid race condition
334        // The sender might have sent between our first check and waker registration
335        if let Some(value) = this.inner.try_recv() {
336            return Poll::Ready(value);
337        }
338        
339        Poll::Pending
340    }
341}
342
343#[cfg(test)]
344mod tests {
345    use super::*;
346    
347    #[tokio::test]
348    async fn test_oneshot_string() {
349        let (sender, receiver) = Sender::<String>::new();
350        
351        tokio::spawn(async move {
352            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
353            sender.send("Hello".to_string()).unwrap();
354        });
355        
356        let result = receiver.wait().await;
357        assert_eq!(result, "Hello");
358    }
359    
360    #[tokio::test]
361    async fn test_oneshot_integer() {
362        let (sender, receiver) = Sender::<i32>::new();
363        
364        tokio::spawn(async move {
365            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
366            sender.send(42).unwrap();
367        });
368        
369        let result = receiver.wait().await;
370        assert_eq!(result, 42);
371    }
372    
373    #[tokio::test]
374    async fn test_oneshot_immediate() {
375        let (sender, receiver) = Sender::<String>::new();
376        
377        // Send before waiting (fast path)
378        sender.send("Immediate".to_string()).unwrap();
379        
380        let result = receiver.wait().await;
381        assert_eq!(result, "Immediate");
382    }
383    
384    #[tokio::test]
385    async fn test_oneshot_custom_struct() {
386        #[derive(Debug, Clone, PartialEq)]
387        struct CustomData {
388            id: u64,
389            name: String,
390        }
391        
392        let (sender, receiver) = Sender::<CustomData>::new();
393        
394        let data = CustomData {
395            id: 123,
396            name: "Test".to_string(),
397        };
398        
399        tokio::spawn(async move {
400            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
401            sender.send(data).unwrap();
402        });
403        
404        let result = receiver.wait().await;
405        assert_eq!(result.id, 123);
406        assert_eq!(result.name, "Test");
407    }
408    
409    #[tokio::test]
410    async fn test_oneshot_direct_await() {
411        let (sender, receiver) = Sender::<i32>::new();
412        
413        tokio::spawn(async move {
414            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
415            sender.send(99).unwrap();
416        });
417        
418        // Direct await without .wait()
419        let result = receiver.await;
420        assert_eq!(result, 99);
421    }
422    
423    #[tokio::test]
424    async fn test_oneshot_await_mut_reference() {
425        let (sender, mut receiver) = Sender::<String>::new();
426        
427        tokio::spawn(async move {
428            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
429            sender.send("Mutable".to_string()).unwrap();
430        });
431        
432        // Await on mutable reference
433        let result = (&mut receiver).await;
434        assert_eq!(result, "Mutable");
435    }
436    
437    #[tokio::test]
438    async fn test_oneshot_immediate_await() {
439        let (sender, receiver) = Sender::<Vec<u8>>::new();
440        
441        // Immediate send (fast path)
442        sender.send(vec![1, 2, 3]).unwrap();
443        
444        // Direct await
445        let result = receiver.await;
446        assert_eq!(result, vec![1, 2, 3]);
447    }
448    
449    #[tokio::test]
450    async fn test_oneshot_try_recv() {
451        let (sender, mut receiver) = Sender::<i32>::new();
452        
453        // Try receive before sending
454        assert_eq!(receiver.try_recv(), None);
455        
456        // Send value
457        sender.send(42).unwrap();
458        
459        // Try receive after sending
460        assert_eq!(receiver.try_recv(), Some(42));
461    }
462    
463    #[tokio::test]
464    async fn test_oneshot_large_data() {
465        let (sender, receiver) = Sender::<Vec<u8>>::new();
466        
467        let large_vec = vec![0u8; 1024 * 1024]; // 1MB
468        
469        tokio::spawn(async move {
470            sender.send(large_vec).unwrap();
471        });
472        
473        let result = receiver.await;
474        assert_eq!(result.len(), 1024 * 1024);
475    }
476}