lite_sync/oneshot/
common.rs

1//! Common types and traits shared between oneshot implementations.
2//!
3//! 一次性通道实现之间共享的公共类型和 trait。
4
5use std::fmt;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use crate::atomic_waker::AtomicWaker;
12
13// ============================================================================
14// Error Types
15// ============================================================================
16
pub mod error {
    //! Error types produced by the receiving half of a oneshot channel.

    use std::fmt;

    /// Error returned by a receive operation when the sender was dropped
    /// before it sent a value.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct RecvError;

    impl fmt::Display for RecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("channel closed")
        }
    }

    impl std::error::Error for RecvError {}

    /// Error returned by `try_recv` when no value can be handed out: either
    /// nothing has been sent yet or the channel is closed.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum TryRecvError {
        /// The channel is empty: no value has been sent yet.
        Empty,
        /// The sender was dropped without sending a value.
        Closed,
    }

    impl fmt::Display for TryRecvError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            // Pick the message first so there is a single write call.
            let message = match *self {
                Self::Empty => "channel empty",
                Self::Closed => "channel closed",
            };
            f.write_str(message)
        }
    }

    impl std::error::Error for TryRecvError {}
}
62
63pub use self::error::RecvError;
64pub use self::error::TryRecvError;
65
66// ============================================================================
67// Storage Trait
68// ============================================================================
69
/// Outcome of trying to take the value out of a storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TakeResult<T> {
    /// The value is ready.
    Ready(T),
    /// The value is not ready yet.
    Pending,
    /// The sender was dropped without sending.
    Closed,
}

impl<T> TakeResult<T> {
    /// Converts into an `Option`: `Ready(v)` becomes `Some(v)`, while both
    /// `Pending` and `Closed` become `None`.
    #[inline]
    pub fn ok(self) -> Option<T> {
        match self {
            Self::Ready(value) => Some(value),
            Self::Pending | Self::Closed => None,
        }
    }

    /// Returns `true` only for the `Closed` variant.
    #[inline]
    pub fn is_closed(&self) -> bool {
        match self {
            Self::Closed => true,
            Self::Ready(_) | Self::Pending => false,
        }
    }
}
95
96/// Trait for oneshot value storage mechanisms
97/// 
98/// 一次性值存储机制的 trait
99pub trait OneshotStorage: Send + Sync + Sized {
100    /// The value type that can be stored
101    type Value: Send;
102    
103    /// Create a new storage instance
104    fn new() -> Self;
105    
106    /// Store a value (called by sender)
107    fn store(&self, value: Self::Value);
108    
109    /// Try to take the stored value (called by receiver)
110    fn try_take(&self) -> TakeResult<Self::Value>;
111    
112    /// Check if the sender was dropped without sending
113    fn is_sender_dropped(&self) -> bool;
114    
115    /// Mark as sender dropped (called in Sender::drop)
116    fn mark_sender_dropped(&self);
117    
118    /// Check if the receiver was closed
119    /// 
120    /// 检查接收器是否已关闭
121    fn is_receiver_closed(&self) -> bool;
122    
123    /// Mark as receiver closed (called in Receiver::close)
124    /// 
125    /// 标记接收器已关闭
126    fn mark_receiver_closed(&self);
127}
128
129// ============================================================================
130// Inner State
131// ============================================================================
132
133/// Inner state for oneshot channel
134pub struct Inner<S: OneshotStorage> {
135    pub(crate) waker: AtomicWaker,
136    pub(crate) storage: S,
137}
138
139impl<S: OneshotStorage> Inner<S> {
140    #[inline]
141    pub fn new() -> Arc<Self> {
142        Arc::new(Self {
143            waker: AtomicWaker::new(),
144            storage: S::new(),
145        })
146    }
147    
148    #[inline]
149    pub fn send(&self, value: S::Value) {
150        self.storage.store(value);
151        self.waker.wake();
152    }
153    
154    #[inline]
155    pub fn try_recv(&self) -> TakeResult<S::Value> {
156        self.storage.try_take()
157    }
158    
159    #[inline]
160    pub fn register_waker(&self, waker: &std::task::Waker) {
161        self.waker.register(waker);
162    }
163    
164    #[inline]
165    pub fn is_sender_dropped(&self) -> bool {
166        self.storage.is_sender_dropped()
167    }
168}
169
170impl<S: OneshotStorage> fmt::Debug for Inner<S> {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        f.debug_struct("Inner").finish_non_exhaustive()
173    }
174}
175
176// ============================================================================
177// Sender
178// ============================================================================
179
180/// Sender for one-shot value transfer
181/// 
182/// 一次性值传递的发送器
183pub struct Sender<S: OneshotStorage> {
184    pub(crate) inner: Arc<Inner<S>>,
185}
186
187impl<S: OneshotStorage> fmt::Debug for Sender<S> {
188    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189        f.debug_struct("Sender").finish_non_exhaustive()
190    }
191}
192
193impl<S: OneshotStorage> Sender<S> {
194    /// Create a new oneshot sender with receiver
195    #[inline]
196    pub fn new() -> (Self, Receiver<S>) {
197        let inner = Inner::new();
198        let sender = Sender { inner: inner.clone() };
199        let receiver = Receiver { inner };
200        (sender, receiver)
201    }
202    
203    /// Send a value through the channel
204    /// 
205    /// Returns `Err(value)` if the receiver was already dropped or closed.
206    #[inline]
207    pub fn send(self, value: S::Value) -> Result<(), S::Value> {
208        if self.is_closed() {
209            return Err(value);
210        }
211        self.send_unchecked(value);
212        Ok(())
213    }
214    
215    /// Send a value without checking if receiver is dropped
216    /// 
217    /// This is faster than `send()` as it skips the Arc reference count check.
218    #[inline]
219    pub fn send_unchecked(self, value: S::Value) {
220        self.inner.send(value);
221        std::mem::forget(self);
222    }
223    
224    /// Check if the receiver has been closed or dropped
225    /// 
226    /// 检查接收器是否已关闭或丢弃
227    #[inline]
228    pub fn is_closed(&self) -> bool {
229        // Receiver is closed if it was explicitly closed or dropped (Arc count == 1)
230        self.inner.storage.is_receiver_closed() || Arc::strong_count(&self.inner) == 1
231    }
232}
233
234impl<S: OneshotStorage> Drop for Sender<S> {
235    fn drop(&mut self) {
236        self.inner.storage.mark_sender_dropped();
237        self.inner.waker.wake();
238    }
239}
240
241// ============================================================================
242// Receiver
243// ============================================================================
244
245/// Receiver for one-shot value transfer
246/// 
247/// Implements `Future` directly for `.await` usage.
248/// 
249/// 一次性值传递的接收器
250/// 
251/// 直接实现了 `Future`,可直接使用 `.await`
252pub struct Receiver<S: OneshotStorage> {
253    pub(crate) inner: Arc<Inner<S>>,
254}
255
256impl<S: OneshotStorage> fmt::Debug for Receiver<S> {
257    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258        f.debug_struct("Receiver").finish_non_exhaustive()
259    }
260}
261
262impl<S: OneshotStorage> Unpin for Receiver<S> {}
263
264impl<S: OneshotStorage> Receiver<S> {
265    /// Wait for value asynchronously
266    #[inline]
267    pub async fn wait(self) -> Result<S::Value, RecvError> {
268        self.await
269    }
270    
271    /// Close the receiver, preventing any future messages from being sent.
272    /// 
273    /// Any `send` operation which happens after this method returns is guaranteed
274    /// to fail. After calling `close`, `try_recv` should be called to receive
275    /// a value if one was sent before the call to `close` completed.
276    /// 
277    /// 关闭接收器,阻止任何将来的消息发送。
278    /// 
279    /// 在此方法返回后发生的任何 `send` 操作都保证失败。
280    /// 调用 `close` 后,应调用 `try_recv` 来接收在 `close` 完成之前发送的值。
281    #[inline]
282    pub fn close(&mut self) {
283        self.inner.storage.mark_receiver_closed();
284    }
285    
286    /// Blocking receive, waiting for a value to be sent.
287    /// 
288    /// This method is intended for use in synchronous code.
289    /// 
290    /// # Panics
291    /// 
292    /// This function panics if called within an asynchronous execution context.
293    /// 
294    /// 阻塞接收,等待值被发送。
295    /// 
296    /// 此方法用于同步代码中。
297    /// 
298    /// # Panics
299    /// 
300    /// 如果在异步执行上下文中调用此函数,则会 panic。
301    #[inline]
302    pub fn blocking_recv(self) -> Result<S::Value, RecvError> {
303        use std::sync::atomic::{AtomicBool, Ordering};
304        use std::task::{RawWaker, RawWakerVTable, Waker};
305        
306        // Fast path: check if value is already ready
307        match self.inner.storage.try_take() {
308            TakeResult::Ready(value) => return Ok(value),
309            TakeResult::Closed => return Err(RecvError),
310            TakeResult::Pending => {}
311        }
312        
313        // Create a thread-parker waker
314        struct ThreadParker {
315            thread: std::thread::Thread,
316            notified: AtomicBool,
317        }
318        
319        const VTABLE: RawWakerVTable = RawWakerVTable::new(
320            |ptr| unsafe { 
321                Arc::increment_strong_count(ptr as *const ThreadParker);
322                RawWaker::new(ptr, &VTABLE)
323            },
324            |ptr| unsafe {
325                let parker = Arc::from_raw(ptr as *const ThreadParker);
326                parker.notified.store(true, Ordering::Release);
327                parker.thread.unpark();
328            },
329            |ptr| unsafe {
330                let parker = &*(ptr as *const ThreadParker);
331                parker.notified.store(true, Ordering::Release);
332                parker.thread.unpark();
333            },
334            |ptr| unsafe { Arc::decrement_strong_count(ptr as *const ThreadParker); },
335        );
336        
337        let parker = Arc::new(ThreadParker {
338            thread: std::thread::current(),
339            notified: AtomicBool::new(false),
340        });
341        
342        let raw_waker = RawWaker::new(Arc::into_raw(parker.clone()) as *const (), &VTABLE);
343        let waker = unsafe { Waker::from_raw(raw_waker) };
344        
345        // Register waker and poll
346        self.inner.register_waker(&waker);
347        
348        loop {
349            match self.inner.storage.try_take() {
350                TakeResult::Ready(value) => return Ok(value),
351                TakeResult::Closed => return Err(RecvError),
352                TakeResult::Pending => {}
353            }
354            
355            // Check if sender dropped
356            if Arc::strong_count(&self.inner) == 1 && self.inner.is_sender_dropped() {
357                return Err(RecvError);
358            }
359            
360            // Park if not notified
361            if !parker.notified.swap(false, Ordering::Acquire) {
362                std::thread::park();
363            }
364        }
365    }
366}
367
368impl<S: OneshotStorage> Future for Receiver<S> {
369    type Output = Result<S::Value, RecvError>;
370    
371    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
372        let this = self.get_mut();
373        
374        // Fast path: check if value ready or closed
375        match this.inner.try_recv() {
376            TakeResult::Ready(value) => return Poll::Ready(Ok(value)),
377            TakeResult::Closed => return Poll::Ready(Err(RecvError)),
378            TakeResult::Pending => {}
379        }
380        
381        // Slow path: register waker
382        this.inner.register_waker(cx.waker());
383        
384        // Check again after registering waker
385        match this.inner.try_recv() {
386            TakeResult::Ready(value) => return Poll::Ready(Ok(value)),
387            TakeResult::Closed => return Poll::Ready(Err(RecvError)),
388            TakeResult::Pending => {}
389        }
390        
391        // Also check via Arc count (for generic storage)
392        if Arc::strong_count(&this.inner) == 1 && this.inner.is_sender_dropped() {
393            return Poll::Ready(Err(RecvError));
394        }
395        
396        Poll::Pending
397    }
398}
399
400/// Create a oneshot channel with the given storage type
401#[inline]
402pub fn channel<S: OneshotStorage>() -> (Sender<S>, Receiver<S>) {
403    Sender::new()
404}