lite_sync/oneshot/
lite.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::atomic_waker::AtomicWaker;
8
/// Error reported on the receiving side when the sender was dropped before it
/// delivered a value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SendError;

impl std::fmt::Display for SendError {
    /// Writes the fixed, human-readable message `sender dropped`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("sender dropped")
    }
}

// The default `Error` methods are enough: this error wraps no underlying source.
impl std::error::Error for SendError {}
22
/// A value that a oneshot channel can carry by encoding it in a single `u8`.
///
/// The channel keeps its entire status in one atomic byte, so an implementor
/// has to map itself to a `u8` and back. No payload is stored anywhere else.
///
/// # Encoding rules
///
/// Two byte values are reserved by every implementation: `pending_value()`
/// (nothing has been sent yet) and `closed_value()` (the sender was dropped
/// without sending). The receiver in this module depends on the following:
///
/// - `to_u8` must not return the pending or the closed value; the receiver
///   would read the former as "still waiting" and the latter as
///   "sender dropped".
/// - `from_u8(x.to_u8())` should yield `Some(..)`; the receiver only completes
///   successfully when `from_u8` returns `Some` for the stored byte.
///
/// # Built-in Implementations
///
/// - `()`: plain completion notification without any extra information
///
/// # Example: Custom State
///
/// ```
/// use lite_sync::oneshot::lite::{State, Sender};
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum CustomState {
///     Success,
///     Failure,
///     Timeout,
/// }
///
/// impl State for CustomState {
///     fn to_u8(&self) -> u8 {
///         match self {
///             CustomState::Success => 1,
///             CustomState::Failure => 2,
///             CustomState::Timeout => 3,
///         }
///     }
///
///     fn from_u8(value: u8) -> Option<Self> {
///         match value {
///             1 => Some(CustomState::Success),
///             2 => Some(CustomState::Failure),
///             3 => Some(CustomState::Timeout),
///             _ => None,
///         }
///     }
///
///     fn pending_value() -> u8 {
///         0
///     }
///
///     fn closed_value() -> u8 {
///         255
///     }
/// }
///
/// # tokio_test::block_on(async {
/// // Usage:
/// let (notifier, receiver) = Sender::<CustomState>::new();
/// tokio::spawn(async move {
///     notifier.notify(CustomState::Success);
/// });
/// let result = receiver.await; // Direct await
/// assert_eq!(result, Ok(CustomState::Success));
/// # });
/// ```
pub trait State: Sized + Send + Sync + 'static {
    /// Encodes this state as the byte that is written into the channel's
    /// atomic cell.
    fn to_u8(&self) -> u8;

    /// Decodes a byte read from the channel's atomic cell.
    ///
    /// Returns `None` when `value` does not correspond to any valid state.
    fn from_u8(value: u8) -> Option<Self>;

    /// Byte that marks the channel as "nothing sent yet".
    fn pending_value() -> u8;

    /// Byte that marks the channel as "sender dropped without sending".
    fn closed_value() -> u8;
}
111
112/// Implementation for unit type () - simple completion notification without state
113/// 
114/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
115impl State for () {
116    #[inline]
117    fn to_u8(&self) -> u8 {
118        1 // Completed
119    }
120    
121    #[inline]
122    fn from_u8(value: u8) -> Option<Self> {
123        match value {
124            1 => Some(()),
125            _ => None,
126        }
127    }
128    
129    #[inline]
130    fn pending_value() -> u8 {
131        0 // Pending
132    }
133    
134    #[inline]
135    fn closed_value() -> u8 {
136        255 // Closed
137    }
138}
139
140#[inline]
141pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
142    let (notifier, receiver) = Sender::<T>::new();
143    (notifier, receiver)
144}
145
146/// Inner state for one-shot completion notification
147/// 
148/// Uses AtomicWaker for zero Box allocation waker storage:
149/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
150/// - Atomic state machine ensures safe concurrent access
151/// - Reuses common AtomicWaker implementation
152/// 
153/// 一次性完成通知的内部状态
154/// 
155/// 使用 AtomicWaker 实现零 Box 分配的 waker 存储:
156/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
157/// - 原子状态机确保安全的并发访问
158/// - 复用通用的 AtomicWaker 实现
159pub(crate) struct Inner<T: State> {
160    waker: AtomicWaker,
161    state: AtomicU8,
162    _marker: std::marker::PhantomData<T>,
163}
164
165impl<T: State> std::fmt::Debug for Inner<T> {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        let state = self.state.load(std::sync::atomic::Ordering::Acquire);
168        let is_pending = state == T::pending_value();
169        f.debug_struct("Inner")
170            .field("state", &state)
171            .field("is_pending", &is_pending)
172            .finish()
173    }
174}
175
176impl<T: State> Inner<T> {
177    /// Create a new oneshot inner state
178    /// 
179    /// Extremely fast: just initializes empty waker and pending state
180    /// 
181    /// 创建一个新的 oneshot 内部状态
182    /// 
183    /// 极快:仅初始化空 waker 和待处理状态
184    #[inline]
185    pub(crate) fn new() -> Arc<Self> {
186        Arc::new(Self {
187            waker: AtomicWaker::new(),
188            state: AtomicU8::new(T::pending_value()),
189            _marker: std::marker::PhantomData,
190        })
191    }
192    
193    /// Send a completion notification (set state and wake)
194    /// 
195    /// 发送完成通知(设置状态并唤醒)
196    #[inline]
197    pub(crate) fn send(&self, state: T) {
198        // Store completion state first with Release ordering
199        self.state.store(state.to_u8(), Ordering::Release);
200        
201        // Wake the registered waker if any
202        self.waker.wake();
203    }
204    
205    /// Register a waker to be notified on completion
206    /// 
207    /// 注册一个 waker 以在完成时收到通知
208    #[inline]
209    fn register_waker(&self, waker: &std::task::Waker) {
210        self.waker.register(waker);
211    }
212}
213
// PERFORMANCE NOTE: `Inner` has no `Drop` implementation.
// No explicit waker cleanup is performed anywhere in this module: `Receiver`
// has no `Drop` impl either, and `Sender::drop` only marks the channel closed
// and wakes the receiver. A waker that is still registered is dropped together
// with the `AtomicWaker` field once the last `Arc<Inner<T>>` is released:
// 1. In the common case (the sender notifies before the receiver goes away) the
//    waker has presumably already been consumed by `wake()`.
// 2. Only `Receiver` registers wakers (through its `Future::poll`).
// 3. `Inner` therefore needs no hand-written drop logic of its own.
225
226/// Completion notifier for one-shot tasks
227/// 
228/// 一次性任务完成通知器
229pub struct Sender<T: State> {
230    inner: Arc<Inner<T>>,
231}
232
233impl<T: State> std::fmt::Debug for Sender<T> {
234    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235        let state = self.inner.state.load(std::sync::atomic::Ordering::Acquire);
236        let is_pending = state == T::pending_value();
237        f.debug_struct("Sender")
238            .field("state", &state)
239            .field("is_pending", &is_pending)
240            .finish()
241    }
242}
243
244impl<T: State> Sender<T> {
245    /// Create a new oneshot completion notifier with receiver
246    /// 
247    /// 创建一个新的 oneshot 完成通知器和接收器
248    /// 
249    /// # Returns
250    /// Returns a tuple of (notifier, receiver)
251    /// 
252    /// 返回 (通知器, 接收器) 元组
253    #[inline]
254    pub fn new() -> (Self, Receiver<T>) {
255        let inner = Inner::new();
256        
257        let notifier = Sender {
258            inner: inner.clone(),
259        };
260        let receiver = Receiver {
261            inner,
262        };
263        
264        (notifier, receiver)
265    }
266    
267    /// Notify completion with the given state
268    /// 
269    /// 使用给定状态通知完成
270    #[inline]
271    pub fn notify(&self, state: T) {
272        self.inner.send(state);
273    }
274}
275
276impl<T: State> Drop for Sender<T> {
277    fn drop(&mut self) {
278        // Mark the channel as closed when sender is dropped, but only if no value was sent
279        // This allows receivers to detect that the sender is gone
280        // and return an error instead of waiting forever
281        //
282        // 当发送器被丢弃时标记通道为已关闭,但仅在没有发送值时
283        // 这允许接收器检测到发送器已消失
284        // 并返回错误而不是永远等待
285        
286        // Try to transition from PENDING to CLOSED
287        // If this fails, it means a value was already sent, so we don't need to do anything
288        if self.inner.state.compare_exchange(
289            T::pending_value(),
290            T::closed_value(),
291            Ordering::Release,
292            Ordering::Acquire,
293        ).is_ok() {
294            // Successfully marked as closed, wake any waiting receiver
295            self.inner.waker.wake();
296        }
297    }
298}
299
300/// Completion receiver for one-shot tasks
301/// 
302/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
303/// 
304/// 一次性任务完成通知接收器
305/// 
306/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
307/// 
308/// # Examples
309/// 
310/// ## Using unit type for simple completion
311/// 
312/// ```
313/// use lite_sync::oneshot::lite::Sender;
314/// 
315/// # tokio_test::block_on(async {
316/// let (notifier, receiver) = Sender::<()>::new();
317/// 
318/// tokio::spawn(async move {
319///     // ... do work ...
320///     notifier.notify(());  // Signal completion
321/// });
322/// 
323/// // Two equivalent ways to await:
324/// let result = receiver.await;               // Direct await via Future impl
325/// assert_eq!(result, Ok(()));
326/// # });
327/// ```
328/// 
329/// ## Awaiting on mutable reference
330/// 
331/// ```
332/// use lite_sync::oneshot::lite::Sender;
333/// 
334/// # tokio_test::block_on(async {
335/// let (notifier, mut receiver) = Sender::<()>::new();
336/// 
337/// tokio::spawn(async move {
338///     // ... do work ...
339///     notifier.notify(());
340/// });
341/// 
342/// // Can also await on &mut receiver
343/// let result = (&mut receiver).await;
344/// assert_eq!(result, Ok(()));
345/// # });
346/// ```
347/// 
348/// ## Using custom state
349/// 
350/// ```
351/// use lite_sync::oneshot::lite::{State, Sender};
352/// 
353/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
354/// enum CustomState {
355///     Success,
356///     Failure,
357///     Timeout,
358/// }
359/// 
360/// impl State for CustomState {
361///     fn to_u8(&self) -> u8 {
362///         match self {
363///             CustomState::Success => 1,
364///             CustomState::Failure => 2,
365///             CustomState::Timeout => 3,
366///         }
367///     }
368///     
369///     fn from_u8(value: u8) -> Option<Self> {
370///         match value {
371///             1 => Some(CustomState::Success),
372///             2 => Some(CustomState::Failure),
373///             3 => Some(CustomState::Timeout),
374///             _ => None,
375///         }
376///     }
377///     
378///     fn pending_value() -> u8 {
379///         0
380///     }
381///     
382///     fn closed_value() -> u8 {
383///         255
384///     }
385/// }
386/// 
387/// # tokio_test::block_on(async {
388/// let (notifier, receiver) = Sender::<CustomState>::new();
389/// 
390/// tokio::spawn(async move {
391///     notifier.notify(CustomState::Success);
392/// });
393/// 
394/// match receiver.await {
395///     Ok(CustomState::Success) => { /* Success! */ },
396///     Ok(CustomState::Failure) => { /* Failed */ },
397///     Ok(CustomState::Timeout) => { /* Timed out */ },
398///     Err(_) => { /* Sender dropped */ },
399/// }
400/// # });
401/// ```
402pub struct Receiver<T: State> {
403    inner: Arc<Inner<T>>,
404}
405
406impl<T: State> std::fmt::Debug for Receiver<T> {
407    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
408        let state = self.inner.state.load(std::sync::atomic::Ordering::Acquire);
409        let is_pending = state == T::pending_value();
410        f.debug_struct("Receiver")
411            .field("state", &state)
412            .field("is_pending", &is_pending)
413            .finish()
414    }
415}
416
417// Receiver is Unpin because all its fields are Unpin
418impl<T: State> Unpin for Receiver<T> {}
419
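// `Receiver` has no explicit `Drop` implementation: dropping it only releases
// its `Arc<Inner<T>>`. A waker it registered stays inside `Inner`'s
// `AtomicWaker` and is dropped with that field once the last `Arc` is gone.
// NOTE(review): this relies on `AtomicWaker` releasing its stored waker when
// dropped; confirm in `crate::atomic_waker`.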
425
426impl<T: State> Receiver<T> {
427    /// Receive a value asynchronously
428    /// 
429    /// This is equivalent to using `.await` directly on the receiver
430    /// 
431    /// Returns `Err(SendError)` if the sender was dropped before sending a value
432    /// 
433    /// 异步接收一个值
434    /// 
435    /// 这等同于直接在 receiver 上使用 `.await`
436    /// 
437    /// 如果发送器在发送值之前被丢弃则返回 `Err(SendError)`
438    /// 
439    /// # Returns
440    /// Returns the completion state or error if sender was dropped
441    /// 
442    /// # 返回值
443    /// 返回完成状态或发送器被丢弃时的错误
444    #[inline]
445    pub async fn recv(self) -> Result<T, SendError> {
446        self.await
447    }
448    
449    /// Try to receive a value without blocking
450    /// 
451    /// Returns `None` if no value has been sent yet
452    /// Returns `Err(SendError)` if the sender was dropped
453    /// 
454    /// 尝试接收值而不阻塞
455    /// 
456    /// 如果还没有发送值则返回 `None`
457    /// 如果发送器被丢弃则返回 `Err(SendError)`
458    /// 
459    /// # Returns
460    /// Returns `Some(value)` if value is ready, `None` if pending, or `Err(SendError)` if sender dropped
461    /// 
462    /// # 返回值
463    /// 如果值已就绪返回 `Some(value)`,如果待处理返回 `None`,如果发送器被丢弃返回 `Err(SendError)`
464    #[inline]
465    pub fn try_recv(&mut self) -> Result<Option<T>, SendError> {
466        let current = self.inner.state.load(Ordering::Acquire);
467        
468        // Check if sender was dropped
469        if current == T::closed_value() {
470            return Err(SendError);
471        }
472        
473        // Check if value is ready
474        if let Some(state) = T::from_u8(current)
475            && current != T::pending_value() {
476                return Ok(Some(state));
477            }
478        
479        Ok(None)
480    }
481}
482
483/// Direct Future implementation for Receiver
484/// 
485/// This allows both `receiver.await` and `(&mut receiver).await` to work
486/// 
487/// Optimized implementation:
488/// - Fast path: Immediate return if already completed (no allocation)
489/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
490/// - No intermediate future state needed
491/// - Detects when sender is dropped and returns error
492/// 
493/// 为 Receiver 直接实现 Future
494/// 
495/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
496/// 
497/// 优化实现:
498/// - 快速路径:如已完成则立即返回(无分配)
499/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
500/// - 无需中间 future 状态
501/// - 检测发送器何时被丢弃并返回错误
502impl<T: State> Future for Receiver<T> {
503    type Output = Result<T, SendError>;
504    
505    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
506        // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
507        let this = self.get_mut();
508        
509        // Fast path: check if already completed or closed
510        let current = this.inner.state.load(Ordering::Acquire);
511        
512        // Check if sender was dropped
513        if current == T::closed_value() {
514            return Poll::Ready(Err(SendError));
515        }
516        
517        if let Some(state) = T::from_u8(current)
518            && current != T::pending_value() {
519                return Poll::Ready(Ok(state));
520            }
521        
522        // Slow path: register waker for notification
523        this.inner.register_waker(cx.waker());
524        
525        // Check again after registering waker to avoid race condition
526        // The sender might have completed between our first check and waker registration
527        let current = this.inner.state.load(Ordering::Acquire);
528        
529        // Check if sender was dropped
530        if current == T::closed_value() {
531            return Poll::Ready(Err(SendError));
532        }
533        
534        if let Some(state) = T::from_u8(current)
535            && current != T::pending_value() {
536                return Poll::Ready(Ok(state));
537            }
538        
539        Poll::Pending
540    }
541}
542
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// How long the delayed-notification tests wait before the sender fires.
    const NOTIFY_DELAY: Duration = Duration::from_millis(10);

    /// Spawns a task that sleeps for `NOTIFY_DELAY` and then notifies with
    /// `state`, so the receiver is exercised on its waiting path.
    fn notify_later<T: State>(sender: Sender<T>, state: T) {
        tokio::spawn(async move {
            tokio::time::sleep(NOTIFY_DELAY).await;
            sender.notify(state);
        });
    }

    /// Test-only completion state.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum TestCompletion {
        /// The task was called / completed successfully.
        Called,

        /// The task was cancelled.
        Cancelled,
    }

    impl State for TestCompletion {
        fn to_u8(&self) -> u8 {
            match self {
                Self::Called => 1,
                Self::Cancelled => 2,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(Self::Called),
                2 => Some(Self::Cancelled),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }

        fn closed_value() -> u8 {
            255
        }
    }

    /// Second test-only state type with three outcomes.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum CustomState {
        Success,
        Failure,
        Timeout,
    }

    impl State for CustomState {
        fn to_u8(&self) -> u8 {
            match self {
                Self::Success => 1,
                Self::Failure => 2,
                Self::Timeout => 3,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(Self::Success),
                2 => Some(Self::Failure),
                3 => Some(Self::Timeout),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }

        fn closed_value() -> u8 {
            255
        }
    }

    // ---- recv(): delayed and immediate notification ----

    #[tokio::test]
    async fn test_oneshot_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Called);

        assert_eq!(rx.recv().await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_cancelled() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Cancelled);

        assert_eq!(rx.recv().await, Ok(TestCompletion::Cancelled));
    }

    #[tokio::test]
    async fn test_oneshot_immediate_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        // Notify before waiting (fast path).
        tx.notify(TestCompletion::Called);

        assert_eq!(rx.recv().await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_immediate_cancelled() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        // Notify before waiting (fast path).
        tx.notify(TestCompletion::Cancelled);

        assert_eq!(rx.recv().await, Ok(TestCompletion::Cancelled));
    }

    // ---- recv(): custom state and unit type ----

    #[tokio::test]
    async fn test_oneshot_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();
        notify_later(tx, CustomState::Success);

        assert_eq!(rx.recv().await, Ok(CustomState::Success));
    }

    #[tokio::test]
    async fn test_oneshot_custom_state_timeout() {
        let (tx, rx) = Sender::<CustomState>::new();

        // Immediate notification.
        tx.notify(CustomState::Timeout);

        assert_eq!(rx.recv().await, Ok(CustomState::Timeout));
    }

    #[tokio::test]
    async fn test_oneshot_unit_type() {
        let (tx, rx) = Sender::<()>::new();
        notify_later(tx, ());

        assert_eq!(rx.recv().await, Ok(()));
    }

    #[tokio::test]
    async fn test_oneshot_unit_type_immediate() {
        let (tx, rx) = Sender::<()>::new();

        // Immediate notification (fast path).
        tx.notify(());

        assert_eq!(rx.recv().await, Ok(()));
    }

    // ---- Future implementation (direct await) ----

    #[tokio::test]
    async fn test_oneshot_into_future_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Called);

        // Await the receiver itself instead of calling `recv()`.
        assert_eq!(rx.await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_into_future_immediate() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        // Notify before awaiting (fast path).
        tx.notify(TestCompletion::Cancelled);

        assert_eq!(rx.await, Ok(TestCompletion::Cancelled));
    }

    #[tokio::test]
    async fn test_oneshot_into_future_unit_type() {
        let (tx, rx) = Sender::<()>::new();
        notify_later(tx, ());

        assert_eq!(rx.await, Ok(()));
    }

    #[tokio::test]
    async fn test_oneshot_into_future_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();
        notify_later(tx, CustomState::Failure);

        assert_eq!(rx.await, Ok(CustomState::Failure));
    }

    // ---- awaiting through `&mut Receiver` ----

    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Called);

        assert_eq!((&mut rx).await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_await_mut_reference_unit_type() {
        let (tx, mut rx) = Sender::<()>::new();

        // Immediate notification, then await by reference (fast path).
        tx.notify(());

        assert_eq!((&mut rx).await, Ok(()));
    }

    // ---- try_recv() ----

    #[tokio::test]
    async fn test_oneshot_try_recv_pending() {
        // The sender is kept alive (but unused) so the channel stays pending.
        let (_tx, mut rx) = Sender::<TestCompletion>::new();

        assert_eq!(rx.try_recv(), Ok(None));
    }

    #[tokio::test]
    async fn test_oneshot_try_recv_ready() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();

        tx.notify(TestCompletion::Called);

        assert_eq!(rx.try_recv(), Ok(Some(TestCompletion::Called)));
    }

    #[tokio::test]
    async fn test_oneshot_try_recv_sender_dropped() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();

        // Drop the sender without sending.
        drop(tx);

        assert_eq!(rx.try_recv(), Err(SendError));
    }

    // ---- sender dropped before anything was sent ----

    #[tokio::test]
    async fn test_oneshot_sender_dropped_before_recv() {
        let (tx, rx) = Sender::<TestCompletion>::new();

        drop(tx);

        assert_eq!(rx.recv().await, Err(SendError));
    }

    #[tokio::test]
    async fn test_oneshot_sender_dropped_unit_type() {
        let (tx, rx) = Sender::<()>::new();

        drop(tx);

        assert_eq!(rx.recv().await, Err(SendError));
    }

    #[tokio::test]
    async fn test_oneshot_sender_dropped_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();

        drop(tx);

        assert_eq!(rx.recv().await, Err(SendError));
    }
}
872