lite_sync/oneshot/lite.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use crate::atomic_waker::AtomicWaker;
8
9/// Trait for types that can be used as oneshot state
10///
11/// Types implementing this trait can be converted to/from u8 for atomic storage.
12/// This allows for zero-allocation, lock-free state transitions.
13///
14/// 可用作 oneshot 状态的类型的 trait
15///
16/// 实现此 trait 的类型可以与 u8 互相转换以进行原子存储。
17/// 这允许零分配、无锁的状态转换。
18///
19/// # Built-in Implementations
20///
21/// - `()`: Simple completion notification without state
22///
23/// # Example: Custom State
24///
25/// ```
26/// use lite_sync::oneshot::lite::{State, Sender};
27///
28/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
29/// enum CustomState {
30/// Success,
31/// Failure,
32/// Timeout,
33/// }
34///
35/// impl State for CustomState {
36/// fn to_u8(&self) -> u8 {
37/// match self {
38/// CustomState::Success => 1,
39/// CustomState::Failure => 2,
40/// CustomState::Timeout => 3,
41/// }
42/// }
43///
44/// fn from_u8(value: u8) -> Option<Self> {
45/// match value {
46/// 1 => Some(CustomState::Success),
47/// 2 => Some(CustomState::Failure),
48/// 3 => Some(CustomState::Timeout),
49/// _ => None,
50/// }
51/// }
52///
53/// fn pending_value() -> u8 {
54/// 0
55/// }
56/// }
57///
58/// # tokio_test::block_on(async {
59/// // Usage:
60/// let (notifier, receiver) = Sender::<CustomState>::new();
61/// tokio::spawn(async move {
62/// notifier.notify(CustomState::Success);
63/// });
64/// let result = receiver.await; // Direct await
65/// assert_eq!(result, CustomState::Success);
66/// # });
67/// ```
68pub trait State: Sized + Send + Sync + 'static {
69 /// Convert the state to u8 for atomic storage
70 ///
71 /// 将状态转换为 u8 以进行原子存储
72 fn to_u8(&self) -> u8;
73
74 /// Convert u8 back to the state type
75 ///
76 /// Returns None if the value doesn't represent a valid state
77 ///
78 /// 将 u8 转换回状态类型
79 ///
80 /// 如果值不代表有效状态则返回 None
81 fn from_u8(value: u8) -> Option<Self>;
82
83 /// The pending state value (before completion)
84 ///
85 /// 待处理状态值(完成前)
86 fn pending_value() -> u8;
87}
88
89/// Implementation for unit type () - simple completion notification without state
90///
91/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
92impl State for () {
93 #[inline]
94 fn to_u8(&self) -> u8 {
95 1 // Completed
96 }
97
98 #[inline]
99 fn from_u8(value: u8) -> Option<Self> {
100 match value {
101 1 => Some(()),
102 _ => None,
103 }
104 }
105
106 #[inline]
107 fn pending_value() -> u8 {
108 0 // Pending
109 }
110}
111
112#[inline]
113pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
114 let (notifier, receiver) = Sender::<T>::new();
115 (notifier, receiver)
116}
117
118/// Inner state for one-shot completion notification
119///
120/// Uses AtomicWaker for zero Box allocation waker storage:
121/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
122/// - Atomic state machine ensures safe concurrent access
123/// - Reuses common AtomicWaker implementation
124///
125/// 一次性完成通知的内部状态
126///
127/// 使用 AtomicWaker 实现零 Box 分配的 waker 存储:
128/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
129/// - 原子状态机确保安全的并发访问
130/// - 复用通用的 AtomicWaker 实现
131pub(crate) struct Inner<T: State> {
132 waker: AtomicWaker,
133 state: AtomicU8,
134 _marker: std::marker::PhantomData<T>,
135}
136
137impl<T: State> std::fmt::Debug for Inner<T> {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 let state = self.state.load(std::sync::atomic::Ordering::Acquire);
140 let is_pending = state == T::pending_value();
141 f.debug_struct("Inner")
142 .field("state", &state)
143 .field("is_pending", &is_pending)
144 .finish()
145 }
146}
147
148impl<T: State> Inner<T> {
149 /// Create a new oneshot inner state
150 ///
151 /// Extremely fast: just initializes empty waker and pending state
152 ///
153 /// 创建一个新的 oneshot 内部状态
154 ///
155 /// 极快:仅初始化空 waker 和待处理状态
156 #[inline]
157 pub(crate) fn new() -> Arc<Self> {
158 Arc::new(Self {
159 waker: AtomicWaker::new(),
160 state: AtomicU8::new(T::pending_value()),
161 _marker: std::marker::PhantomData,
162 })
163 }
164
165 /// Send a completion notification (set state and wake)
166 ///
167 /// 发送完成通知(设置状态并唤醒)
168 #[inline]
169 pub(crate) fn send(&self, state: T) {
170 // Store completion state first with Release ordering
171 self.state.store(state.to_u8(), Ordering::Release);
172
173 // Wake the registered waker if any
174 self.waker.wake();
175 }
176
177 /// Register a waker to be notified on completion
178 ///
179 /// 注册一个 waker 以在完成时收到通知
180 #[inline]
181 fn register_waker(&self, waker: &std::task::Waker) {
182 self.waker.register(waker);
183 }
184}
185
186// PERFORMANCE OPTIMIZATION: No Drop implementation for Inner
187// Waker cleanup is handled by Receiver::drop instead, which is more efficient because:
188// 1. In the common case (sender notifies before receiver drops), waker is already consumed
189// 2. Only Receiver creates wakers, so only Receiver needs to clean them up
190// 3. This makes Inner::drop a complete no-op, eliminating atomic load overhead
191//
192// 性能优化:Inner 不实现 Drop
193// Waker 清理由 Receiver::drop 处理,这更高效因为:
194// 1. 在常见情况下(发送方在接收方 drop 前通知),waker 已被消费
195// 2. 只有 Receiver 创建 waker,所以只有 Receiver 需要清理它们
196// 3. 这使得 Inner::drop 完全成为 no-op,消除了原子加载开销
197
198/// Completion notifier for one-shot tasks
199///
200/// 一次性任务完成通知器
201pub struct Sender<T: State> {
202 inner: Arc<Inner<T>>,
203}
204
205impl<T: State> std::fmt::Debug for Sender<T> {
206 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 let state = self.inner.state.load(std::sync::atomic::Ordering::Acquire);
208 let is_pending = state == T::pending_value();
209 f.debug_struct("Sender")
210 .field("state", &state)
211 .field("is_pending", &is_pending)
212 .finish()
213 }
214}
215
216impl<T: State> Sender<T> {
217 /// Create a new oneshot completion notifier with receiver
218 ///
219 /// 创建一个新的 oneshot 完成通知器和接收器
220 ///
221 /// # Returns
222 /// Returns a tuple of (notifier, receiver)
223 ///
224 /// 返回 (通知器, 接收器) 元组
225 #[inline]
226 pub fn new() -> (Self, Receiver<T>) {
227 let inner = Inner::new();
228
229 let notifier = Sender {
230 inner: inner.clone(),
231 };
232 let receiver = Receiver {
233 inner,
234 };
235
236 (notifier, receiver)
237 }
238
239 /// Notify completion with the given state
240 ///
241 /// 使用给定状态通知完成
242 #[inline]
243 pub fn notify(&self, state: T) {
244 self.inner.send(state);
245 }
246}
247
248/// Completion receiver for one-shot tasks
249///
250/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
251///
252/// 一次性任务完成通知接收器
253///
254/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
255///
256/// # Examples
257///
258/// ## Using unit type for simple completion
259///
260/// ```
261/// use lite_sync::oneshot::lite::Sender;
262///
263/// # tokio_test::block_on(async {
264/// let (notifier, receiver) = Sender::<()>::new();
265///
266/// tokio::spawn(async move {
267/// // ... do work ...
268/// notifier.notify(()); // Signal completion
269/// });
270///
271/// // Two equivalent ways to await:
272/// let result = receiver.await; // Direct await via Future impl
273/// assert_eq!(result, ());
274/// # });
275/// ```
276///
277/// ## Awaiting on mutable reference
278///
279/// ```
280/// use lite_sync::oneshot::lite::Sender;
281///
282/// # tokio_test::block_on(async {
283/// let (notifier, mut receiver) = Sender::<()>::new();
284///
285/// tokio::spawn(async move {
286/// // ... do work ...
287/// notifier.notify(());
288/// });
289///
290/// // Can also await on &mut receiver
291/// let result = (&mut receiver).await;
292/// assert_eq!(result, ());
293/// # });
294/// ```
295///
296/// ## Using custom state
297///
298/// ```
299/// use lite_sync::oneshot::lite::{State, Sender};
300///
301/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
302/// enum CustomState {
303/// Success,
304/// Failure,
305/// Timeout,
306/// }
307///
308/// impl State for CustomState {
309/// fn to_u8(&self) -> u8 {
310/// match self {
311/// CustomState::Success => 1,
312/// CustomState::Failure => 2,
313/// CustomState::Timeout => 3,
314/// }
315/// }
316///
317/// fn from_u8(value: u8) -> Option<Self> {
318/// match value {
319/// 1 => Some(CustomState::Success),
320/// 2 => Some(CustomState::Failure),
321/// 3 => Some(CustomState::Timeout),
322/// _ => None,
323/// }
324/// }
325///
326/// fn pending_value() -> u8 {
327/// 0
328/// }
329/// }
330///
331/// # tokio_test::block_on(async {
332/// let (notifier, receiver) = Sender::<CustomState>::new();
333///
334/// tokio::spawn(async move {
335/// notifier.notify(CustomState::Success);
336/// });
337///
338/// match receiver.await {
339/// CustomState::Success => { /* Success! */ },
340/// CustomState::Failure => { /* Failed */ },
341/// CustomState::Timeout => { /* Timed out */ },
342/// }
343/// # });
344/// ```
345pub struct Receiver<T: State> {
346 inner: Arc<Inner<T>>,
347}
348
349impl<T: State> std::fmt::Debug for Receiver<T> {
350 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351 let state = self.inner.state.load(std::sync::atomic::Ordering::Acquire);
352 let is_pending = state == T::pending_value();
353 f.debug_struct("Receiver")
354 .field("state", &state)
355 .field("is_pending", &is_pending)
356 .finish()
357 }
358}
359
360// Receiver is Unpin because all its fields are Unpin
361impl<T: State> Unpin for Receiver<T> {}
362
363// Receiver drop is automatically handled by AtomicWaker's drop implementation
364// No need for explicit drop implementation
365//
366// Receiver 的 drop 由 AtomicWaker 的 drop 实现自动处理
367// 无需显式的 drop 实现
368
369impl<T: State> Receiver<T> {
370 /// Wait for task completion asynchronously
371 ///
372 /// This is equivalent to using `.await` directly on the receiver
373 ///
374 /// 异步等待任务完成
375 ///
376 /// 这等同于直接在 receiver 上使用 `.await`
377 ///
378 /// # Returns
379 /// Returns the completion state
380 ///
381 /// # 返回值
382 /// 返回完成状态
383 #[inline]
384 pub async fn wait(self) -> T {
385 self.await
386 }
387}
388
389/// Direct Future implementation for Receiver
390///
391/// This allows both `receiver.await` and `(&mut receiver).await` to work
392///
393/// Optimized implementation:
394/// - Fast path: Immediate return if already completed (no allocation)
395/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
396/// - No intermediate future state needed
397///
398/// 为 Receiver 直接实现 Future
399///
400/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
401///
402/// 优化实现:
403/// - 快速路径:如已完成则立即返回(无分配)
404/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
405/// - 无需中间 future 状态
406impl<T: State> Future for Receiver<T> {
407 type Output = T;
408
409 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
410 // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
411 let this = self.get_mut();
412
413 // Fast path: check if already completed
414 let current = this.inner.state.load(Ordering::Acquire);
415 if let Some(state) = T::from_u8(current)
416 && current != T::pending_value() {
417 return Poll::Ready(state);
418 }
419
420 // Slow path: register waker for notification
421 this.inner.register_waker(cx.waker());
422
423 // Check again after registering waker to avoid race condition
424 // The sender might have completed between our first check and waker registration
425 let current = this.inner.state.load(Ordering::Acquire);
426 if let Some(state) = T::from_u8(current)
427 && current != T::pending_value() {
428 return Poll::Ready(state);
429 }
430
431 Poll::Pending
432 }
433}
434
435#[cfg(test)]
436mod tests {
437 use super::*;
438
439 /// Test-only state type for completion notification
440 ///
441 /// 测试专用的完成通知状态类型
442 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
443 enum TestCompletion {
444 /// Task was called/completed successfully
445 ///
446 /// 任务被调用/成功完成
447 Called,
448
449 /// Task was cancelled
450 ///
451 /// 任务被取消
452 Cancelled,
453 }
454
455 impl State for TestCompletion {
456 fn to_u8(&self) -> u8 {
457 match self {
458 TestCompletion::Called => 1,
459 TestCompletion::Cancelled => 2,
460 }
461 }
462
463 fn from_u8(value: u8) -> Option<Self> {
464 match value {
465 1 => Some(TestCompletion::Called),
466 2 => Some(TestCompletion::Cancelled),
467 _ => None,
468 }
469 }
470
471 fn pending_value() -> u8 {
472 0
473 }
474 }
475
476 #[tokio::test]
477 async fn test_oneshot_called() {
478 let (notifier, receiver) = Sender::<TestCompletion>::new();
479
480 tokio::spawn(async move {
481 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
482 notifier.notify(TestCompletion::Called);
483 });
484
485 let result = receiver.wait().await;
486 assert_eq!(result, TestCompletion::Called);
487 }
488
489 #[tokio::test]
490 async fn test_oneshot_cancelled() {
491 let (notifier, receiver) = Sender::<TestCompletion>::new();
492
493 tokio::spawn(async move {
494 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
495 notifier.notify(TestCompletion::Cancelled);
496 });
497
498 let result = receiver.wait().await;
499 assert_eq!(result, TestCompletion::Cancelled);
500 }
501
502 #[tokio::test]
503 async fn test_oneshot_immediate_called() {
504 let (notifier, receiver) = Sender::<TestCompletion>::new();
505
506 // Notify before waiting (fast path)
507 notifier.notify(TestCompletion::Called);
508
509 let result = receiver.wait().await;
510 assert_eq!(result, TestCompletion::Called);
511 }
512
513 #[tokio::test]
514 async fn test_oneshot_immediate_cancelled() {
515 let (notifier, receiver) = Sender::<TestCompletion>::new();
516
517 // Notify before waiting (fast path)
518 notifier.notify(TestCompletion::Cancelled);
519
520 let result = receiver.wait().await;
521 assert_eq!(result, TestCompletion::Cancelled);
522 }
523
524 // Test with custom state type
525 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
526 enum CustomState {
527 Success,
528 Failure,
529 Timeout,
530 }
531
532 impl State for CustomState {
533 fn to_u8(&self) -> u8 {
534 match self {
535 CustomState::Success => 1,
536 CustomState::Failure => 2,
537 CustomState::Timeout => 3,
538 }
539 }
540
541 fn from_u8(value: u8) -> Option<Self> {
542 match value {
543 1 => Some(CustomState::Success),
544 2 => Some(CustomState::Failure),
545 3 => Some(CustomState::Timeout),
546 _ => None,
547 }
548 }
549
550 fn pending_value() -> u8 {
551 0
552 }
553 }
554
555 #[tokio::test]
556 async fn test_oneshot_custom_state() {
557 let (notifier, receiver) = Sender::<CustomState>::new();
558
559 tokio::spawn(async move {
560 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
561 notifier.notify(CustomState::Success);
562 });
563
564 let result = receiver.wait().await;
565 assert_eq!(result, CustomState::Success);
566 }
567
568 #[tokio::test]
569 async fn test_oneshot_custom_state_timeout() {
570 let (notifier, receiver) = Sender::<CustomState>::new();
571
572 // Immediate notification
573 notifier.notify(CustomState::Timeout);
574
575 let result = receiver.wait().await;
576 assert_eq!(result, CustomState::Timeout);
577 }
578
579 #[tokio::test]
580 async fn test_oneshot_unit_type() {
581 let (notifier, receiver) = Sender::<()>::new();
582
583 tokio::spawn(async move {
584 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
585 notifier.notify(());
586 });
587
588 let result = receiver.wait().await;
589 assert_eq!(result, ());
590 }
591
592 #[tokio::test]
593 async fn test_oneshot_unit_type_immediate() {
594 let (notifier, receiver) = Sender::<()>::new();
595
596 // Immediate notification (fast path)
597 notifier.notify(());
598
599 let result = receiver.wait().await;
600 assert_eq!(result, ());
601 }
602
603 // Tests for Future implementation (direct await)
604 #[tokio::test]
605 async fn test_oneshot_into_future_called() {
606 let (notifier, receiver) = Sender::<TestCompletion>::new();
607
608 tokio::spawn(async move {
609 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
610 notifier.notify(TestCompletion::Called);
611 });
612
613 // Direct await without .wait()
614 let result = receiver.await;
615 assert_eq!(result, TestCompletion::Called);
616 }
617
618 #[tokio::test]
619 async fn test_oneshot_into_future_immediate() {
620 let (notifier, receiver) = Sender::<TestCompletion>::new();
621
622 // Notify before awaiting (fast path)
623 notifier.notify(TestCompletion::Cancelled);
624
625 // Direct await
626 let result = receiver.await;
627 assert_eq!(result, TestCompletion::Cancelled);
628 }
629
630 #[tokio::test]
631 async fn test_oneshot_into_future_unit_type() {
632 let (notifier, receiver) = Sender::<()>::new();
633
634 tokio::spawn(async move {
635 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
636 notifier.notify(());
637 });
638
639 // Direct await with unit type
640 let result = receiver.await;
641 assert_eq!(result, ());
642 }
643
644 #[tokio::test]
645 async fn test_oneshot_into_future_custom_state() {
646 let (notifier, receiver) = Sender::<CustomState>::new();
647
648 tokio::spawn(async move {
649 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
650 notifier.notify(CustomState::Failure);
651 });
652
653 // Direct await with custom state
654 let result = receiver.await;
655 assert_eq!(result, CustomState::Failure);
656 }
657
658 // Test awaiting on &mut receiver
659 #[tokio::test]
660 async fn test_oneshot_await_mut_reference() {
661 let (notifier, mut receiver) = Sender::<TestCompletion>::new();
662
663 tokio::spawn(async move {
664 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
665 notifier.notify(TestCompletion::Called);
666 });
667
668 // Await on mutable reference
669 let result = (&mut receiver).await;
670 assert_eq!(result, TestCompletion::Called);
671 }
672
673 #[tokio::test]
674 async fn test_oneshot_await_mut_reference_unit_type() {
675 let (notifier, mut receiver) = Sender::<()>::new();
676
677 // Immediate notification
678 notifier.notify(());
679
680 // Await on mutable reference (fast path)
681 let result = (&mut receiver).await;
682 assert_eq!(result, ());
683 }
684}
685