lite_sync/oneshot/lite.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::atomic_waker::AtomicWaker;
8
/// Error reported to the receiving half when the sender was dropped
/// before it sent a value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct SendError;

impl std::fmt::Display for SendError {
    /// Writes the fixed message `sender dropped`.
    ///
    /// Width and padding flags on the formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("sender dropped")
    }
}

// No `source()` override: this error has no underlying cause.
impl std::error::Error for SendError {}
22
/// A value that can travel through the lite oneshot channel as a single byte.
///
/// The channel keeps its entire state in one `AtomicU8`, so an implementor
/// supplies a mapping between its values and `u8` codes. Two codes are reserved
/// as sentinels and must never be returned by [`State::to_u8`]:
///
/// - [`State::pending_value`] (default `0`): nothing has been sent yet
/// - [`State::closed_value`] (default `255`): the sender was dropped without sending
///
/// Both sentinels have default implementations, so most implementors only write
/// `to_u8` and `from_u8`. Override a sentinel only when `0` or `255` has to be
/// the code of a real state.
///
/// # Built-in Implementations
///
/// - `()`: plain completion notification without a payload
///
/// # Example: Custom State
///
/// ```
/// use lite_sync::oneshot::lite::{State, Sender};
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum CustomState {
///     Success,
///     Failure,
///     Timeout,
/// }
///
/// impl State for CustomState {
///     fn to_u8(&self) -> u8 {
///         match self {
///             CustomState::Success => 1,
///             CustomState::Failure => 2,
///             CustomState::Timeout => 3,
///         }
///     }
///
///     fn from_u8(value: u8) -> Option<Self> {
///         match value {
///             1 => Some(CustomState::Success),
///             2 => Some(CustomState::Failure),
///             3 => Some(CustomState::Timeout),
///             _ => None,
///         }
///     }
///
///     // `pending_value()` (0) and `closed_value()` (255) use the defaults.
/// }
///
/// # tokio_test::block_on(async {
/// // Usage:
/// let (notifier, receiver) = Sender::<CustomState>::new();
/// tokio::spawn(async move {
///     notifier.notify(CustomState::Success);
/// });
/// let result = receiver.await; // Direct await
/// assert_eq!(result, Ok(CustomState::Success));
/// # });
/// ```
pub trait State: Sized + Send + Sync + 'static {
    /// Encodes this state as the byte stored in the channel.
    ///
    /// Must not return [`State::pending_value`] or [`State::closed_value`]:
    /// the receiver interprets those bytes as "nothing sent yet" and
    /// "sender dropped" respectively.
    fn to_u8(&self) -> u8;

    /// Decodes a byte previously produced by [`State::to_u8`].
    ///
    /// Returns `None` when `value` does not represent a valid state.
    fn from_u8(value: u8) -> Option<Self>;

    /// Sentinel byte meaning "no value has been sent yet". Defaults to `0`.
    #[inline]
    fn pending_value() -> u8 {
        0
    }

    /// Sentinel byte meaning "the sender was dropped without sending".
    /// Defaults to `255`.
    #[inline]
    fn closed_value() -> u8 {
        255
    }
}
111
112/// Implementation for unit type () - simple completion notification without state
113///
114/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
115impl State for () {
116 #[inline]
117 fn to_u8(&self) -> u8 {
118 1 // Completed
119 }
120
121 #[inline]
122 fn from_u8(value: u8) -> Option<Self> {
123 match value {
124 1 => Some(()),
125 _ => None,
126 }
127 }
128
129 #[inline]
130 fn pending_value() -> u8 {
131 0 // Pending
132 }
133
134 #[inline]
135 fn closed_value() -> u8 {
136 255 // Closed
137 }
138}
139
140#[inline]
141pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
142 let (notifier, receiver) = Sender::<T>::new();
143 (notifier, receiver)
144}
145
146/// Inner state for one-shot completion notification
147///
148/// Uses AtomicWaker for zero Box allocation waker storage:
149/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
150/// - Atomic state machine ensures safe concurrent access
151/// - Reuses common AtomicWaker implementation
152///
153/// 一次性完成通知的内部状态
154///
155/// 使用 AtomicWaker 实现零 Box 分配的 waker 存储:
156/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
157/// - 原子状态机确保安全的并发访问
158/// - 复用通用的 AtomicWaker 实现
159pub(crate) struct Inner<T: State> {
160 waker: AtomicWaker,
161 state: AtomicU8,
162 _marker: std::marker::PhantomData<T>,
163}
164
165impl<T: State> std::fmt::Debug for Inner<T> {
166 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167 let state = self.state.load(std::sync::atomic::Ordering::Acquire);
168 let is_pending = state == T::pending_value();
169 f.debug_struct("Inner")
170 .field("state", &state)
171 .field("is_pending", &is_pending)
172 .finish()
173 }
174}
175
176impl<T: State> Inner<T> {
177 /// Create a new oneshot inner state
178 ///
179 /// Extremely fast: just initializes empty waker and pending state
180 ///
181 /// 创建一个新的 oneshot 内部状态
182 ///
183 /// 极快:仅初始化空 waker 和待处理状态
184 #[inline]
185 pub(crate) fn new() -> Arc<Self> {
186 Arc::new(Self {
187 waker: AtomicWaker::new(),
188 state: AtomicU8::new(T::pending_value()),
189 _marker: std::marker::PhantomData,
190 })
191 }
192
193 /// Send a completion notification (set state and wake)
194 ///
195 /// 发送完成通知(设置状态并唤醒)
196 #[inline]
197 pub(crate) fn send(&self, state: T) {
198 // Store completion state first with Release ordering
199 self.state.store(state.to_u8(), Ordering::Release);
200
201 // Wake the registered waker if any
202 self.waker.wake();
203 }
204
205 /// Register a waker to be notified on completion
206 ///
207 /// 注册一个 waker 以在完成时收到通知
208 #[inline]
209 fn register_waker(&self, waker: &std::task::Waker) {
210 self.waker.register(waker);
211 }
212}
213
// PERFORMANCE NOTE: `Inner` deliberately has no `Drop` implementation.
// No explicit waker cleanup is performed anywhere in this file (there is no
// `Receiver::drop` either). A waker that is still stored is released when the
// `waker` field is dropped together with `Inner`, i.e. when the last `Arc`
// reference goes away.
// 1. In the common case (the sender notifies before the receiver is dropped)
//    the waker has presumably already been consumed by `wake()` — confirm
//    against `AtomicWaker`.
// 2. Only `Receiver` registers wakers.
// 3. Dropping `Inner` therefore runs no custom code and needs no atomic load
//    of the state byte.
225
226/// Completion notifier for one-shot tasks
227///
228/// 一次性任务完成通知器
229pub struct Sender<T: State> {
230 inner: Arc<Inner<T>>,
231}
232
233impl<T: State> std::fmt::Debug for Sender<T> {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 let state = self.inner.state.load(std::sync::atomic::Ordering::Acquire);
236 let is_pending = state == T::pending_value();
237 f.debug_struct("Sender")
238 .field("state", &state)
239 .field("is_pending", &is_pending)
240 .finish()
241 }
242}
243
244impl<T: State> Sender<T> {
245 /// Create a new oneshot completion notifier with receiver
246 ///
247 /// 创建一个新的 oneshot 完成通知器和接收器
248 ///
249 /// # Returns
250 /// Returns a tuple of (notifier, receiver)
251 ///
252 /// 返回 (通知器, 接收器) 元组
253 #[inline]
254 pub fn new() -> (Self, Receiver<T>) {
255 let inner = Inner::new();
256
257 let notifier = Sender {
258 inner: inner.clone(),
259 };
260 let receiver = Receiver {
261 inner,
262 };
263
264 (notifier, receiver)
265 }
266
267 /// Notify completion with the given state
268 ///
269 /// 使用给定状态通知完成
270 #[inline]
271 pub fn notify(&self, state: T) {
272 self.inner.send(state);
273 }
274}
275
276impl<T: State> Drop for Sender<T> {
277 fn drop(&mut self) {
278 // Mark the channel as closed when sender is dropped, but only if no value was sent
279 // This allows receivers to detect that the sender is gone
280 // and return an error instead of waiting forever
281 //
282 // 当发送器被丢弃时标记通道为已关闭,但仅在没有发送值时
283 // 这允许接收器检测到发送器已消失
284 // 并返回错误而不是永远等待
285
286 // Try to transition from PENDING to CLOSED
287 // If this fails, it means a value was already sent, so we don't need to do anything
288 if self.inner.state.compare_exchange(
289 T::pending_value(),
290 T::closed_value(),
291 Ordering::Release,
292 Ordering::Acquire,
293 ).is_ok() {
294 // Successfully marked as closed, wake any waiting receiver
295 self.inner.waker.wake();
296 }
297 }
298}
299
300/// Completion receiver for one-shot tasks
301///
302/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
303///
304/// 一次性任务完成通知接收器
305///
306/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
307///
308/// # Examples
309///
310/// ## Using unit type for simple completion
311///
312/// ```
313/// use lite_sync::oneshot::lite::Sender;
314///
315/// # tokio_test::block_on(async {
316/// let (notifier, receiver) = Sender::<()>::new();
317///
318/// tokio::spawn(async move {
319/// // ... do work ...
320/// notifier.notify(()); // Signal completion
321/// });
322///
323/// // Two equivalent ways to await:
324/// let result = receiver.await; // Direct await via Future impl
325/// assert_eq!(result, Ok(()));
326/// # });
327/// ```
328///
329/// ## Awaiting on mutable reference
330///
331/// ```
332/// use lite_sync::oneshot::lite::Sender;
333///
334/// # tokio_test::block_on(async {
335/// let (notifier, mut receiver) = Sender::<()>::new();
336///
337/// tokio::spawn(async move {
338/// // ... do work ...
339/// notifier.notify(());
340/// });
341///
342/// // Can also await on &mut receiver
343/// let result = (&mut receiver).await;
344/// assert_eq!(result, Ok(()));
345/// # });
346/// ```
347///
348/// ## Using custom state
349///
350/// ```
351/// use lite_sync::oneshot::lite::{State, Sender};
352///
353/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
354/// enum CustomState {
355/// Success,
356/// Failure,
357/// Timeout,
358/// }
359///
360/// impl State for CustomState {
361/// fn to_u8(&self) -> u8 {
362/// match self {
363/// CustomState::Success => 1,
364/// CustomState::Failure => 2,
365/// CustomState::Timeout => 3,
366/// }
367/// }
368///
369/// fn from_u8(value: u8) -> Option<Self> {
370/// match value {
371/// 1 => Some(CustomState::Success),
372/// 2 => Some(CustomState::Failure),
373/// 3 => Some(CustomState::Timeout),
374/// _ => None,
375/// }
376/// }
377///
378/// fn pending_value() -> u8 {
379/// 0
380/// }
381///
382/// fn closed_value() -> u8 {
383/// 255
384/// }
385/// }
386///
387/// # tokio_test::block_on(async {
388/// let (notifier, receiver) = Sender::<CustomState>::new();
389///
390/// tokio::spawn(async move {
391/// notifier.notify(CustomState::Success);
392/// });
393///
394/// match receiver.await {
395/// Ok(CustomState::Success) => { /* Success! */ },
396/// Ok(CustomState::Failure) => { /* Failed */ },
397/// Ok(CustomState::Timeout) => { /* Timed out */ },
398/// Err(_) => { /* Sender dropped */ },
399/// }
400/// # });
401/// ```
402pub struct Receiver<T: State> {
403 inner: Arc<Inner<T>>,
404}
405
406impl<T: State> std::fmt::Debug for Receiver<T> {
407 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
408 let state = self.inner.state.load(std::sync::atomic::Ordering::Acquire);
409 let is_pending = state == T::pending_value();
410 f.debug_struct("Receiver")
411 .field("state", &state)
412 .field("is_pending", &is_pending)
413 .finish()
414 }
415}
416
417// Receiver is Unpin because all its fields are Unpin
418impl<T: State> Unpin for Receiver<T> {}
419
// `Receiver` has no explicit `Drop` implementation: dropping it only releases
// its `Arc<Inner<T>>` reference. A waker that is still registered is released
// when the `AtomicWaker` field is dropped along with `Inner`
// (presumably by `AtomicWaker`'s own drop logic — confirm in `crate::atomic_waker`).
425
426impl<T: State> Receiver<T> {
427 /// Receive a value asynchronously
428 ///
429 /// This is equivalent to using `.await` directly on the receiver
430 ///
431 /// Returns `Err(SendError)` if the sender was dropped before sending a value
432 ///
433 /// 异步接收一个值
434 ///
435 /// 这等同于直接在 receiver 上使用 `.await`
436 ///
437 /// 如果发送器在发送值之前被丢弃则返回 `Err(SendError)`
438 ///
439 /// # Returns
440 /// Returns the completion state or error if sender was dropped
441 ///
442 /// # 返回值
443 /// 返回完成状态或发送器被丢弃时的错误
444 #[inline]
445 pub async fn recv(self) -> Result<T, SendError> {
446 self.await
447 }
448
449 /// Try to receive a value without blocking
450 ///
451 /// Returns `None` if no value has been sent yet
452 /// Returns `Err(SendError)` if the sender was dropped
453 ///
454 /// 尝试接收值而不阻塞
455 ///
456 /// 如果还没有发送值则返回 `None`
457 /// 如果发送器被丢弃则返回 `Err(SendError)`
458 ///
459 /// # Returns
460 /// Returns `Some(value)` if value is ready, `None` if pending, or `Err(SendError)` if sender dropped
461 ///
462 /// # 返回值
463 /// 如果值已就绪返回 `Some(value)`,如果待处理返回 `None`,如果发送器被丢弃返回 `Err(SendError)`
464 #[inline]
465 pub fn try_recv(&mut self) -> Result<Option<T>, SendError> {
466 let current = self.inner.state.load(Ordering::Acquire);
467
468 // Check if sender was dropped
469 if current == T::closed_value() {
470 return Err(SendError);
471 }
472
473 // Check if value is ready
474 if let Some(state) = T::from_u8(current)
475 && current != T::pending_value() {
476 return Ok(Some(state));
477 }
478
479 Ok(None)
480 }
481}
482
483/// Direct Future implementation for Receiver
484///
485/// This allows both `receiver.await` and `(&mut receiver).await` to work
486///
487/// Optimized implementation:
488/// - Fast path: Immediate return if already completed (no allocation)
489/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
490/// - No intermediate future state needed
491/// - Detects when sender is dropped and returns error
492///
493/// 为 Receiver 直接实现 Future
494///
495/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
496///
497/// 优化实现:
498/// - 快速路径:如已完成则立即返回(无分配)
499/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
500/// - 无需中间 future 状态
501/// - 检测发送器何时被丢弃并返回错误
502impl<T: State> Future for Receiver<T> {
503 type Output = Result<T, SendError>;
504
505 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
506 // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
507 let this = self.get_mut();
508
509 // Fast path: check if already completed or closed
510 let current = this.inner.state.load(Ordering::Acquire);
511
512 // Check if sender was dropped
513 if current == T::closed_value() {
514 return Poll::Ready(Err(SendError));
515 }
516
517 if let Some(state) = T::from_u8(current)
518 && current != T::pending_value() {
519 return Poll::Ready(Ok(state));
520 }
521
522 // Slow path: register waker for notification
523 this.inner.register_waker(cx.waker());
524
525 // Check again after registering waker to avoid race condition
526 // The sender might have completed between our first check and waker registration
527 let current = this.inner.state.load(Ordering::Acquire);
528
529 // Check if sender was dropped
530 if current == T::closed_value() {
531 return Poll::Ready(Err(SendError));
532 }
533
534 if let Some(state) = T::from_u8(current)
535 && current != T::pending_value() {
536 return Poll::Ready(Ok(state));
537 }
538
539 Poll::Pending
540 }
541}
542
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Delay used by the background task before it notifies, giving the
    /// receiver time to start waiting first.
    const NOTIFY_DELAY: Duration = Duration::from_millis(10);

    /// Spawns a task that sleeps for `NOTIFY_DELAY` and then sends `state`.
    fn notify_later<T: State>(notifier: Sender<T>, state: T) {
        tokio::spawn(async move {
            tokio::time::sleep(NOTIFY_DELAY).await;
            notifier.notify(state);
        });
    }

    /// Test-only completion state.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum TestCompletion {
        /// Task was called / completed successfully.
        Called,

        /// Task was cancelled.
        Cancelled,
    }

    impl State for TestCompletion {
        fn to_u8(&self) -> u8 {
            match self {
                TestCompletion::Called => 1,
                TestCompletion::Cancelled => 2,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(TestCompletion::Called),
                2 => Some(TestCompletion::Cancelled),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }

        fn closed_value() -> u8 {
            255
        }
    }

    /// Test-only state with three outcomes.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum CustomState {
        Success,
        Failure,
        Timeout,
    }

    impl State for CustomState {
        fn to_u8(&self) -> u8 {
            match self {
                CustomState::Success => 1,
                CustomState::Failure => 2,
                CustomState::Timeout => 3,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(CustomState::Success),
                2 => Some(CustomState::Failure),
                3 => Some(CustomState::Timeout),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }

        fn closed_value() -> u8 {
            255
        }
    }

    // ---- recv(): delayed notification ----

    #[tokio::test]
    async fn test_oneshot_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Called);

        assert_eq!(rx.recv().await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_cancelled() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Cancelled);

        assert_eq!(rx.recv().await, Ok(TestCompletion::Cancelled));
    }

    // ---- recv(): notification before waiting (fast path) ----

    #[tokio::test]
    async fn test_oneshot_immediate_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        tx.notify(TestCompletion::Called);

        assert_eq!(rx.recv().await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_immediate_cancelled() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        tx.notify(TestCompletion::Cancelled);

        assert_eq!(rx.recv().await, Ok(TestCompletion::Cancelled));
    }

    // ---- recv(): custom and unit state types ----

    #[tokio::test]
    async fn test_oneshot_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();
        notify_later(tx, CustomState::Success);

        assert_eq!(rx.recv().await, Ok(CustomState::Success));
    }

    #[tokio::test]
    async fn test_oneshot_custom_state_timeout() {
        let (tx, rx) = Sender::<CustomState>::new();
        tx.notify(CustomState::Timeout);

        assert_eq!(rx.recv().await, Ok(CustomState::Timeout));
    }

    #[tokio::test]
    async fn test_oneshot_unit_type() {
        let (tx, rx) = Sender::<()>::new();
        notify_later(tx, ());

        assert_eq!(rx.recv().await, Ok(()));
    }

    #[tokio::test]
    async fn test_oneshot_unit_type_immediate() {
        let (tx, rx) = Sender::<()>::new();
        tx.notify(());

        assert_eq!(rx.recv().await, Ok(()));
    }

    // ---- Future impl: awaiting the receiver directly ----

    #[tokio::test]
    async fn test_oneshot_into_future_called() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Called);

        assert_eq!(rx.await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_into_future_immediate() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        tx.notify(TestCompletion::Cancelled);

        assert_eq!(rx.await, Ok(TestCompletion::Cancelled));
    }

    #[tokio::test]
    async fn test_oneshot_into_future_unit_type() {
        let (tx, rx) = Sender::<()>::new();
        notify_later(tx, ());

        assert_eq!(rx.await, Ok(()));
    }

    #[tokio::test]
    async fn test_oneshot_into_future_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();
        notify_later(tx, CustomState::Failure);

        assert_eq!(rx.await, Ok(CustomState::Failure));
    }

    // ---- Future impl: awaiting through `&mut` ----

    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();
        notify_later(tx, TestCompletion::Called);

        assert_eq!((&mut rx).await, Ok(TestCompletion::Called));
    }

    #[tokio::test]
    async fn test_oneshot_await_mut_reference_unit_type() {
        let (tx, mut rx) = Sender::<()>::new();
        tx.notify(());

        assert_eq!((&mut rx).await, Ok(()));
    }

    // ---- try_recv() ----

    #[tokio::test]
    async fn test_oneshot_try_recv_pending() {
        // The sender must stay alive, otherwise the channel would be closed.
        let (_tx, mut rx) = Sender::<TestCompletion>::new();

        assert_eq!(rx.try_recv(), Ok(None));
    }

    #[tokio::test]
    async fn test_oneshot_try_recv_ready() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();
        tx.notify(TestCompletion::Called);

        assert_eq!(rx.try_recv(), Ok(Some(TestCompletion::Called)));
    }

    #[tokio::test]
    async fn test_oneshot_try_recv_sender_dropped() {
        let (tx, mut rx) = Sender::<TestCompletion>::new();
        drop(tx);

        assert_eq!(rx.try_recv(), Err(SendError));
    }

    // ---- sender dropped without sending ----

    #[tokio::test]
    async fn test_oneshot_sender_dropped_before_recv() {
        let (tx, rx) = Sender::<TestCompletion>::new();
        drop(tx);

        assert_eq!(rx.recv().await, Err(SendError));
    }

    #[tokio::test]
    async fn test_oneshot_sender_dropped_unit_type() {
        let (tx, rx) = Sender::<()>::new();
        drop(tx);

        assert_eq!(rx.recv().await, Err(SendError));
    }

    #[tokio::test]
    async fn test_oneshot_sender_dropped_custom_state() {
        let (tx, rx) = Sender::<CustomState>::new();
        drop(tx);

        assert_eq!(rx.recv().await, Err(SendError));
    }
}
872