lite_sync/oneshot/
generic.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use std::cell::UnsafeCell;
7use std::mem::MaybeUninit;
8
9use crate::atomic_waker::AtomicWaker;
10
11// States for the value cell
12const EMPTY: u8 = 0;   // No value stored
13const READY: u8 = 1;   // Value is ready
14
15/// Inner state for one-shot value transfer
16/// 
17/// Uses UnsafeCell<MaybeUninit<T>> for direct value storage without any overhead:
18/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
19/// - Value is stored directly with zero overhead (no Option discriminant)
20/// - Atomic state machine (AtomicU8) tracks initialization state
21/// - UnsafeCell allows interior mutability with proper synchronization
22/// 
23/// 一次性值传递的内部状态
24/// 
25/// 使用 UnsafeCell<MaybeUninit<T>> 实现零开销的直接值存储:
26/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
27/// - 值直接存储,零开销(无 Option 判别标记)
28/// - 原子状态机(AtomicU8)跟踪初始化状态
29/// - UnsafeCell 允许通过适当同步实现内部可变性
30pub(crate) struct Inner<T> {
31    waker: AtomicWaker,
32    state: AtomicU8,
33    value: UnsafeCell<MaybeUninit<T>>,
34}
35
36impl<T> Inner<T> {
37    /// Create a new oneshot inner state
38    /// 
39    /// Extremely fast: just initializes empty waker and uninitialized memory
40    /// 
41    /// 创建一个新的 oneshot 内部状态
42    /// 
43    /// 极快:仅初始化空 waker 和未初始化内存
44    #[inline]
45    pub(crate) fn new() -> Arc<Self> {
46        Arc::new(Self {
47            waker: AtomicWaker::new(),
48            state: AtomicU8::new(EMPTY),
49            value: UnsafeCell::new(MaybeUninit::uninit()),
50        })
51    }
52    
53    /// Send a value (store value and wake)
54    /// 
55    /// 发送一个值(存储值并唤醒)
56    #[inline]
57    pub(crate) fn send(&self, value: T) -> Result<(), T> {
58        // Try to transition from EMPTY to READY
59        match self.state.compare_exchange(
60            EMPTY,
61            READY,
62            Ordering::AcqRel,
63            Ordering::Acquire,
64        ) {
65            Ok(_) => {
66                // Successfully transitioned, store the value
67                // SAFETY: We have exclusive access via the state transition
68                // No other thread can access the value until state is READY
69                // We're initializing previously uninitialized memory
70                unsafe {
71                    (*self.value.get()).write(value);
72                }
73                
74                // Wake the registered waker if any
75                self.waker.wake();
76                Ok(())
77            }
78            Err(_) => {
79                // State was not EMPTY, value already sent
80                Err(value)
81            }
82        }
83    }
84    
85    /// Try to receive the value without blocking
86    /// 
87    /// 尝试接收值而不阻塞
88    #[inline]
89    fn try_recv(&self) -> Option<T> {
90        // Check if value is ready
91        if self.state.load(Ordering::Acquire) == READY {
92            // Try to transition from READY back to EMPTY to claim the value
93            match self.state.compare_exchange(
94                READY,
95                EMPTY,
96                Ordering::AcqRel,
97                Ordering::Acquire,
98            ) {
99                Ok(_) => {
100                    // Successfully claimed the value
101                    // SAFETY: We have exclusive access via the state transition
102                    // State was READY, so value must be initialized
103                    unsafe {
104                        Some((*self.value.get()).assume_init_read())
105                    }
106                }
107                Err(_) => {
108                    // Someone else already claimed it
109                    None
110                }
111            }
112        } else {
113            None
114        }
115    }
116    
117    /// Register a waker to be notified on completion
118    /// 
119    /// 注册一个 waker 以在完成时收到通知
120    #[inline]
121    fn register_waker(&self, waker: &std::task::Waker) {
122        self.waker.register(waker);
123    }
124}
125
126// SAFETY: Inner<T> is Send + Sync as long as T is Send
127// - UnsafeCell<MaybeUninit<T>> is protected by atomic state transitions
128// - Only one thread can access the value at a time (enforced by state machine)
129// - Value is only accessed when state is READY (initialized)
130// - Waker is properly synchronized via AtomicWaker
131//
132// SAFETY: 只要 T 是 Send,Inner<T> 就是 Send + Sync
133// - UnsafeCell<MaybeUninit<T>> 由原子状态转换保护
134// - 每次只有一个线程可以访问值(由状态机强制执行)
135// - 只有在状态为 READY(已初始化)时才访问值
136// - Waker 通过 AtomicWaker 正确同步
137unsafe impl<T: Send> Send for Inner<T> {}
138unsafe impl<T: Send> Sync for Inner<T> {}
139
140impl<T> Drop for Inner<T> {
141    fn drop(&mut self) {
142        // Clean up the value if it was sent but not received
143        // SAFETY: We have exclusive access in drop (&mut self)
144        // Only drop if state is READY (value was initialized)
145        if *self.state.get_mut() == READY {
146            unsafe {
147                // Value is initialized, must drop it
148                (*self.value.get()).assume_init_drop();
149            }
150        }
151        // If state is EMPTY, value is uninitialized, nothing to drop
152    }
153}
154
155impl<T> std::fmt::Debug for Inner<T> {
156    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
157        let state = self.state.load(Ordering::Acquire);
158        let state_str = match state {
159            EMPTY => "Empty",
160            READY => "Ready",
161            _ => "Unknown",
162        };
163        f.debug_struct("Inner")
164            .field("state", &state_str)
165            .finish()
166    }
167}
168
169/// Error returned when the receiver is dropped before receiving a value
170/// 
171/// 当接收器在接收值之前被丢弃时返回的错误
172#[derive(Debug, Clone, Copy, PartialEq, Eq)]
173pub struct RecvError;
174
175impl std::fmt::Display for RecvError {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        write!(f, "channel closed")
178    }
179}
180
181impl std::error::Error for RecvError {}
182
183#[inline]
184pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
185    Sender::new()
186}
187
188/// Sender for one-shot value transfer
189/// 
190/// 一次性值传递的发送器
191pub struct Sender<T> {
192    inner: Arc<Inner<T>>,
193}
194
195impl<T> std::fmt::Debug for Sender<T> {
196    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
197        let state = self.inner.state.load(Ordering::Acquire);
198        let state_str = match state {
199            EMPTY => "Empty",
200            READY => "Ready",
201            _ => "Unknown",
202        };
203        f.debug_struct("Sender")
204            .field("state", &state_str)
205            .finish()
206    }
207}
208
209impl<T> Sender<T> {
210    /// Create a new oneshot sender with receiver
211    /// 
212    /// 创建一个新的 oneshot 发送器和接收器
213    /// 
214    /// # Returns
215    /// Returns a tuple of (sender, receiver)
216    /// 
217    /// 返回 (发送器, 接收器) 元组
218    #[inline]
219    pub fn new() -> (Self, Receiver<T>) {
220        let inner = Inner::new();
221        
222        let sender = Sender {
223            inner: inner.clone(),
224        };
225        let receiver = Receiver {
226            inner,
227        };
228        
229        (sender, receiver)
230    }
231    
232    /// Send a value through the channel
233    /// 
234    /// Returns `Err(value)` if the receiver was already dropped
235    /// 
236    /// 通过通道发送一个值
237    /// 
238    /// 如果接收器已被丢弃则返回 `Err(value)`
239    #[inline]
240    pub fn send(self, value: T) -> Result<(), T> {
241        self.inner.send(value)
242    }
243}
244
245/// Receiver for one-shot value transfer
246/// 
247/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
248/// 
249/// 一次性值传递的接收器
250/// 
251/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
252/// 
253/// # Examples
254/// 
255/// ## Basic usage
256/// 
257/// ```
258/// use lite_sync::oneshot::generic::Sender;
259/// 
260/// # tokio_test::block_on(async {
261/// let (sender, receiver) = Sender::<String>::new();
262/// 
263/// tokio::spawn(async move {
264///     sender.send("Hello, World!".to_string()).unwrap();
265/// });
266/// 
267/// // Direct await via Future impl
268/// let message = receiver.await;
269/// assert_eq!(message, "Hello, World!");
270/// # });
271/// ```
272/// 
273/// ## Awaiting on mutable reference
274/// 
275/// ```
276/// use lite_sync::oneshot::generic::Sender;
277/// 
278/// # tokio_test::block_on(async {
279/// let (sender, mut receiver) = Sender::<i32>::new();
280/// 
281/// tokio::spawn(async move {
282///     sender.send(42).unwrap();
283/// });
284/// 
285/// // Can also await on &mut receiver
286/// let value = (&mut receiver).await;
287/// assert_eq!(value, 42);
288/// # });
289/// ```
290pub struct Receiver<T> {
291    inner: Arc<Inner<T>>,
292}
293
294impl<T> std::fmt::Debug for Receiver<T> {
295    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296        let state = self.inner.state.load(Ordering::Acquire);
297        let state_str = match state {
298            EMPTY => "Empty",
299            READY => "Ready",
300            _ => "Unknown",
301        };
302        f.debug_struct("Receiver")
303            .field("state", &state_str)
304            .finish()
305    }
306}
307
308// Receiver is Unpin because all its fields are Unpin
309impl<T> Unpin for Receiver<T> {}
310
311impl<T> Receiver<T> {
312    /// Wait for value asynchronously
313    /// 
314    /// This is equivalent to using `.await` directly on the receiver
315    /// 
316    /// 异步等待值
317    /// 
318    /// 这等同于直接在 receiver 上使用 `.await`
319    /// 
320    /// # Returns
321    /// Returns the received value
322    /// 
323    /// # 返回值
324    /// 返回接收到的值
325    #[inline]
326    pub async fn wait(self) -> T {
327        self.await
328    }
329    
330    /// Try to receive a value without blocking
331    /// 
332    /// Returns `None` if no value has been sent yet
333    /// 
334    /// 尝试接收值而不阻塞
335    /// 
336    /// 如果还没有发送值则返回 `None`
337    #[inline]
338    pub fn try_recv(&mut self) -> Option<T> {
339        self.inner.try_recv()
340    }
341}
342
343/// Direct Future implementation for Receiver
344/// 
345/// This allows both `receiver.await` and `(&mut receiver).await` to work
346/// 
347/// Optimized implementation:
348/// - Fast path: Immediate return if value already sent (no allocation)
349/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
350/// - No intermediate future state needed
351/// 
352/// 为 Receiver 直接实现 Future
353/// 
354/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
355/// 
356/// 优化实现:
357/// - 快速路径:如值已发送则立即返回(无分配)
358/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
359/// - 无需中间 future 状态
360impl<T> Future for Receiver<T> {
361    type Output = T;
362    
363    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
364        // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
365        let this = self.get_mut();
366        
367        // Fast path: check if value already sent
368        if let Some(value) = this.inner.try_recv() {
369            return Poll::Ready(value);
370        }
371        
372        // Slow path: register waker for notification
373        this.inner.register_waker(cx.waker());
374        
375        // Check again after registering waker to avoid race condition
376        // The sender might have sent between our first check and waker registration
377        if let Some(value) = this.inner.try_recv() {
378            return Poll::Ready(value);
379        }
380        
381        Poll::Pending
382    }
383}
384
385#[cfg(test)]
386mod tests {
387    use super::*;
388    
389    #[tokio::test]
390    async fn test_oneshot_string() {
391        let (sender, receiver) = Sender::<String>::new();
392        
393        tokio::spawn(async move {
394            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
395            sender.send("Hello".to_string()).unwrap();
396        });
397        
398        let result = receiver.wait().await;
399        assert_eq!(result, "Hello");
400    }
401    
402    #[tokio::test]
403    async fn test_oneshot_integer() {
404        let (sender, receiver) = Sender::<i32>::new();
405        
406        tokio::spawn(async move {
407            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
408            sender.send(42).unwrap();
409        });
410        
411        let result = receiver.wait().await;
412        assert_eq!(result, 42);
413    }
414    
415    #[tokio::test]
416    async fn test_oneshot_immediate() {
417        let (sender, receiver) = Sender::<String>::new();
418        
419        // Send before waiting (fast path)
420        sender.send("Immediate".to_string()).unwrap();
421        
422        let result = receiver.wait().await;
423        assert_eq!(result, "Immediate");
424    }
425    
426    #[tokio::test]
427    async fn test_oneshot_custom_struct() {
428        #[derive(Debug, Clone, PartialEq)]
429        struct CustomData {
430            id: u64,
431            name: String,
432        }
433        
434        let (sender, receiver) = Sender::<CustomData>::new();
435        
436        let data = CustomData {
437            id: 123,
438            name: "Test".to_string(),
439        };
440        
441        tokio::spawn(async move {
442            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
443            sender.send(data).unwrap();
444        });
445        
446        let result = receiver.wait().await;
447        assert_eq!(result.id, 123);
448        assert_eq!(result.name, "Test");
449    }
450    
451    #[tokio::test]
452    async fn test_oneshot_direct_await() {
453        let (sender, receiver) = Sender::<i32>::new();
454        
455        tokio::spawn(async move {
456            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
457            sender.send(99).unwrap();
458        });
459        
460        // Direct await without .wait()
461        let result = receiver.await;
462        assert_eq!(result, 99);
463    }
464    
465    #[tokio::test]
466    async fn test_oneshot_await_mut_reference() {
467        let (sender, mut receiver) = Sender::<String>::new();
468        
469        tokio::spawn(async move {
470            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
471            sender.send("Mutable".to_string()).unwrap();
472        });
473        
474        // Await on mutable reference
475        let result = (&mut receiver).await;
476        assert_eq!(result, "Mutable");
477    }
478    
479    #[tokio::test]
480    async fn test_oneshot_immediate_await() {
481        let (sender, receiver) = Sender::<Vec<u8>>::new();
482        
483        // Immediate send (fast path)
484        sender.send(vec![1, 2, 3]).unwrap();
485        
486        // Direct await
487        let result = receiver.await;
488        assert_eq!(result, vec![1, 2, 3]);
489    }
490    
491    #[tokio::test]
492    async fn test_oneshot_try_recv() {
493        let (sender, mut receiver) = Sender::<i32>::new();
494        
495        // Try receive before sending
496        assert_eq!(receiver.try_recv(), None);
497        
498        // Send value
499        sender.send(42).unwrap();
500        
501        // Try receive after sending
502        assert_eq!(receiver.try_recv(), Some(42));
503    }
504    
505    #[tokio::test]
506    async fn test_oneshot_large_data() {
507        let (sender, receiver) = Sender::<Vec<u8>>::new();
508        
509        let large_vec = vec![0u8; 1024 * 1024]; // 1MB
510        
511        tokio::spawn(async move {
512            sender.send(large_vec).unwrap();
513        });
514        
515        let result = receiver.await;
516        assert_eq!(result.len(), 1024 * 1024);
517    }
518}