lite_sync/oneshot/
common.rs

1//! Common types and traits shared between oneshot implementations.
2//!
3//! 一次性通道实现之间共享的公共类型和 trait。
4
5use crate::shim::sync::Arc;
6use core::fmt;
7use core::future::Future;
8use core::pin::Pin;
9use core::task::{Context, Poll};
10
11use crate::atomic_waker::AtomicWaker;
12
13// ============================================================================
14// Error Types
15// ============================================================================
16
17pub mod error {
18    //! Oneshot error types.
19
20    use core::fmt;
21
22    /// Error returned when the sender is dropped before sending a value
23    ///
24    /// 当发送器在发送值之前被丢弃时返回的错误
25    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
26    pub struct RecvError;
27
28    impl fmt::Display for RecvError {
29        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30            write!(f, "channel closed")
31        }
32    }
33
34    impl core::error::Error for RecvError {}
35
36    /// Error returned from `try_recv` when no value has been sent yet or channel is closed
37    ///
38    /// 当尚未发送值或通道已关闭时,`try_recv` 返回的错误
39    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
40    pub enum TryRecvError {
41        /// The channel is empty (no value sent yet)
42        ///
43        /// 通道为空(尚未发送值)
44        Empty,
45        /// The sender was dropped without sending a value
46        ///
47        /// 发送器在发送值之前被丢弃
48        Closed,
49    }
50
51    impl fmt::Display for TryRecvError {
52        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53            match self {
54                TryRecvError::Empty => write!(f, "channel empty"),
55                TryRecvError::Closed => write!(f, "channel closed"),
56            }
57        }
58    }
59
60    impl core::error::Error for TryRecvError {}
61}
62
63pub use self::error::RecvError;
64pub use self::error::TryRecvError;
65
66// ============================================================================
67// Storage Trait
68// ============================================================================
69
70/// Result of trying to take a value from storage
71#[derive(Debug, Clone, Copy, PartialEq, Eq)]
72pub enum TakeResult<T> {
73    /// Value is ready
74    Ready(T),
75    /// Value is not ready yet (pending)
76    Pending,
77    /// Sender was dropped without sending
78    Closed,
79}
80
81impl<T> TakeResult<T> {
82    #[inline]
83    pub fn ok(self) -> Option<T> {
84        match self {
85            TakeResult::Ready(v) => Some(v),
86            _ => None,
87        }
88    }
89
90    #[inline]
91    pub fn is_closed(&self) -> bool {
92        matches!(self, TakeResult::Closed)
93    }
94}
95
96/// Trait for oneshot value storage mechanisms
97///
98/// 一次性值存储机制的 trait
99pub trait OneshotStorage: Send + Sync + Sized {
100    /// The value type that can be stored
101    type Value: Send;
102
103    /// Create a new storage instance
104    fn new() -> Self;
105
106    /// Store a value (called by sender)
107    fn store(&self, value: Self::Value);
108
109    /// Try to take the stored value (called by receiver)
110    fn try_take(&self) -> TakeResult<Self::Value>;
111
112    /// Check if the sender was dropped without sending
113    fn is_sender_dropped(&self) -> bool;
114
115    /// Mark as sender dropped (called in Sender::drop)
116    fn mark_sender_dropped(&self);
117
118    /// Check if the receiver was closed
119    ///
120    /// 检查接收器是否已关闭
121    fn is_receiver_closed(&self) -> bool;
122
123    /// Mark as receiver closed (called in Receiver::close)
124    ///
125    /// 标记接收器已关闭
126    fn mark_receiver_closed(&self);
127}
128
129// ============================================================================
130// Inner State
131// ============================================================================
132
133/// Inner state for oneshot channel
134pub struct Inner<S: OneshotStorage> {
135    pub(crate) waker: AtomicWaker,
136    pub(crate) storage: S,
137}
138
139impl<S: OneshotStorage> Inner<S> {
140    #[inline]
141    pub fn new() -> Arc<Self> {
142        Arc::new(Self {
143            waker: AtomicWaker::new(),
144            storage: S::new(),
145        })
146    }
147
148    #[inline]
149    pub fn send(&self, value: S::Value) {
150        self.storage.store(value);
151        self.waker.wake();
152    }
153
154    #[inline]
155    pub fn try_recv(&self) -> TakeResult<S::Value> {
156        self.storage.try_take()
157    }
158
159    #[inline]
160    pub fn register_waker(&self, waker: &core::task::Waker) {
161        self.waker.register(waker);
162    }
163
164    #[inline]
165    pub fn is_sender_dropped(&self) -> bool {
166        self.storage.is_sender_dropped()
167    }
168}
169
170impl<S: OneshotStorage> fmt::Debug for Inner<S> {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        f.debug_struct("Inner").finish_non_exhaustive()
173    }
174}
175
176// ============================================================================
177// Sender
178// ============================================================================
179
180/// Sender for one-shot value transfer
181///
182/// 一次性值传递的发送器
183pub struct Sender<S: OneshotStorage> {
184    pub(crate) inner: Arc<Inner<S>>,
185}
186
187impl<S: OneshotStorage> fmt::Debug for Sender<S> {
188    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189        f.debug_struct("Sender").finish_non_exhaustive()
190    }
191}
192
193impl<S: OneshotStorage> Sender<S> {
194    /// Create a new oneshot sender with receiver
195    #[inline]
196    pub fn new() -> (Self, Receiver<S>) {
197        let inner = Inner::new();
198        let sender = Sender {
199            inner: inner.clone(),
200        };
201        let receiver = Receiver { inner };
202        (sender, receiver)
203    }
204
205    /// Send a value through the channel
206    ///
207    /// Returns `Err(value)` if the receiver was already dropped or closed.
208    #[inline]
209    pub fn send(self, value: S::Value) -> Result<(), S::Value> {
210        if self.is_closed() {
211            return Err(value);
212        }
213        self.send_unchecked(value);
214        Ok(())
215    }
216
217    /// Send a value without checking if receiver is dropped
218    ///
219    /// This is faster than `send()` as it skips the Arc reference count check.
220    #[inline]
221    pub fn send_unchecked(self, value: S::Value) {
222        self.inner.send(value);
223        core::mem::forget(self);
224    }
225
226    /// Check if the receiver has been closed or dropped
227    ///
228    /// 检查接收器是否已关闭或丢弃
229    #[inline]
230    pub fn is_closed(&self) -> bool {
231        // Receiver is closed if it was explicitly closed or dropped (Arc count == 1)
232        self.inner.storage.is_receiver_closed() || Arc::strong_count(&self.inner) == 1
233    }
234}
235
236impl<S: OneshotStorage> Drop for Sender<S> {
237    fn drop(&mut self) {
238        self.inner.storage.mark_sender_dropped();
239        self.inner.waker.wake();
240    }
241}
242
243// ============================================================================
244// Receiver
245// ============================================================================
246
247/// Receiver for one-shot value transfer
248///
249/// Implements `Future` directly for `.await` usage.
250///
251/// 一次性值传递的接收器
252///
253/// 直接实现了 `Future`,可直接使用 `.await`
254pub struct Receiver<S: OneshotStorage> {
255    pub(crate) inner: Arc<Inner<S>>,
256}
257
258impl<S: OneshotStorage> fmt::Debug for Receiver<S> {
259    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
260        f.debug_struct("Receiver").finish_non_exhaustive()
261    }
262}
263
264impl<S: OneshotStorage> Unpin for Receiver<S> {}
265
266impl<S: OneshotStorage> Receiver<S> {
267    /// Wait for value asynchronously
268    #[inline]
269    pub async fn wait(self) -> Result<S::Value, RecvError> {
270        self.await
271    }
272
273    /// Close the receiver, preventing any future messages from being sent.
274    ///
275    /// Any `send` operation which happens after this method returns is guaranteed
276    /// to fail. After calling `close`, `try_recv` should be called to receive
277    /// a value if one was sent before the call to `close` completed.
278    ///
279    /// 关闭接收器,阻止任何将来的消息发送。
280    ///
281    /// 在此方法返回后发生的任何 `send` 操作都保证失败。
282    /// 调用 `close` 后,应调用 `try_recv` 来接收在 `close` 完成之前发送的值。
283    #[inline]
284    pub fn close(&mut self) {
285        self.inner.storage.mark_receiver_closed();
286    }
287
288    /// Blocking receive, waiting for a value to be sent.
289    ///
290    /// This method is intended for use in synchronous code.
291    ///
292    /// # Panics
293    ///
294    /// This function panics if called within an asynchronous execution context.
295    ///
296    /// 阻塞接收,等待值被发送。
297    ///
298    /// 此方法用于同步代码中。
299    ///
300    /// # Panics
301    ///
302    /// 如果在异步执行上下文中调用此函数,则会 panic。
303    #[inline]
304    #[cfg(any(feature = "std", feature = "loom", test))]
305    pub fn blocking_recv(self) -> Result<S::Value, RecvError> {
306        use crate::shim::atomic::{AtomicBool, Ordering};
307        use core::task::{RawWaker, RawWakerVTable, Waker};
308
309        // Fast path: check if value is already ready
310        match self.inner.storage.try_take() {
311            TakeResult::Ready(value) => return Ok(value),
312            TakeResult::Closed => return Err(RecvError),
313            TakeResult::Pending => {}
314        }
315
316        // Create a thread-parker waker
317        struct ThreadParker {
318            thread: crate::shim::thread::Thread,
319            notified: AtomicBool,
320        }
321
322        const VTABLE: RawWakerVTable = RawWakerVTable::new(
323            |ptr| unsafe {
324                Arc::increment_strong_count(ptr as *const ThreadParker);
325                RawWaker::new(ptr, &VTABLE)
326            },
327            |ptr| unsafe {
328                let parker = Arc::from_raw(ptr as *const ThreadParker);
329                parker.notified.store(true, Ordering::Release);
330                parker.thread.unpark();
331            },
332            |ptr| unsafe {
333                let parker = &*(ptr as *const ThreadParker);
334                parker.notified.store(true, Ordering::Release);
335                parker.thread.unpark();
336            },
337            |ptr| unsafe {
338                Arc::decrement_strong_count(ptr as *const ThreadParker);
339            },
340        );
341
342        let parker = Arc::new(ThreadParker {
343            thread: crate::shim::thread::current(),
344            notified: AtomicBool::new(false),
345        });
346
347        let raw_waker = RawWaker::new(Arc::into_raw(parker.clone()) as *const (), &VTABLE);
348        let waker = unsafe { Waker::from_raw(raw_waker) };
349
350        // Register waker and poll
351        self.inner.register_waker(&waker);
352
353        loop {
354            match self.inner.storage.try_take() {
355                TakeResult::Ready(value) => return Ok(value),
356                TakeResult::Closed => return Err(RecvError),
357                TakeResult::Pending => {}
358            }
359
360            // Check if sender dropped
361            if Arc::strong_count(&self.inner) == 1 && self.inner.is_sender_dropped() {
362                return Err(RecvError);
363            }
364
365            // Park if not notified
366            if !parker.notified.swap(false, Ordering::Acquire) {
367                crate::shim::thread::park();
368            }
369        }
370    }
371}
372
373impl<S: OneshotStorage> Future for Receiver<S> {
374    type Output = Result<S::Value, RecvError>;
375
376    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
377        let this = self.get_mut();
378
379        // Fast path: check if value ready or closed
380        match this.inner.try_recv() {
381            TakeResult::Ready(value) => return Poll::Ready(Ok(value)),
382            TakeResult::Closed => return Poll::Ready(Err(RecvError)),
383            TakeResult::Pending => {}
384        }
385
386        // Slow path: register waker
387        this.inner.register_waker(cx.waker());
388
389        // Check again after registering waker
390        match this.inner.try_recv() {
391            TakeResult::Ready(value) => return Poll::Ready(Ok(value)),
392            TakeResult::Closed => return Poll::Ready(Err(RecvError)),
393            TakeResult::Pending => {}
394        }
395
396        // Also check via Arc count (for generic storage)
397        if Arc::strong_count(&this.inner) == 1 && this.inner.is_sender_dropped() {
398            return Poll::Ready(Err(RecvError));
399        }
400
401        Poll::Pending
402    }
403}
404
405/// Create a oneshot channel with the given storage type
406#[inline]
407pub fn channel<S: OneshotStorage>() -> (Sender<S>, Receiver<S>) {
408    Sender::new()
409}