lite_sync/oneshot.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use super::atomic_waker::AtomicWaker;
8
/// Contract for values a oneshot channel can carry as its completion state.
///
/// The channel keeps its state in a single `AtomicU8`, so an implementor maps
/// each of its values to a `u8` code and back. One code, reported by
/// [`State::pending_value`], is reserved to mean "not completed yet": the
/// receiver only resolves once it observes a code that differs from it and
/// that [`State::from_u8`] accepts.
///
/// # Built-in Implementations
///
/// - `()`: plain completion notification that carries no extra information
///
/// # Example: Custom State
///
/// ```
/// use lite_sync::oneshot::{State, Sender};
///
/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// enum CustomState {
///     Success,
///     Failure,
///     Timeout,
/// }
///
/// impl State for CustomState {
///     fn to_u8(&self) -> u8 {
///         match self {
///             CustomState::Success => 1,
///             CustomState::Failure => 2,
///             CustomState::Timeout => 3,
///         }
///     }
///
///     fn from_u8(value: u8) -> Option<Self> {
///         match value {
///             1 => Some(CustomState::Success),
///             2 => Some(CustomState::Failure),
///             3 => Some(CustomState::Timeout),
///             _ => None,
///         }
///     }
///
///     fn pending_value() -> u8 {
///         0
///     }
/// }
///
/// # tokio_test::block_on(async {
/// // Usage:
/// let (notifier, receiver) = Sender::<CustomState>::new();
/// tokio::spawn(async move {
///     notifier.notify(CustomState::Success);
/// });
/// let result = receiver.await; // Direct await
/// assert_eq!(result, CustomState::Success);
/// # });
/// ```
pub trait State: Sized + Send + Sync + 'static {
    /// Encodes this state as the `u8` code that is stored atomically.
    fn to_u8(&self) -> u8;

    /// Decodes a `u8` code back into a state.
    ///
    /// Returns `None` when `value` does not correspond to a valid state.
    fn from_u8(value: u8) -> Option<Self>;

    /// Returns the code that represents the pending (not yet completed) state.
    fn pending_value() -> u8;
}
88
89/// Implementation for unit type () - simple completion notification without state
90///
91/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
92impl State for () {
93 #[inline]
94 fn to_u8(&self) -> u8 {
95 1 // Completed
96 }
97
98 #[inline]
99 fn from_u8(value: u8) -> Option<Self> {
100 match value {
101 1 => Some(()),
102 _ => None,
103 }
104 }
105
106 #[inline]
107 fn pending_value() -> u8 {
108 0 // Pending
109 }
110}
111
112#[inline]
113pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
114 let (notifier, receiver) = Sender::<T>::new();
115 (notifier, receiver)
116}
117
118/// Inner state for one-shot completion notification
119///
120/// Uses AtomicWaker for zero Box allocation waker storage:
121/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
122/// - Atomic state machine ensures safe concurrent access
123/// - Reuses common AtomicWaker implementation
124///
125/// 一次性完成通知的内部状态
126///
127/// 使用 AtomicWaker 实现零 Box 分配的 waker 存储:
128/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
129/// - 原子状态机确保安全的并发访问
130/// - 复用通用的 AtomicWaker 实现
131pub(crate) struct Inner<T: State> {
132 waker: AtomicWaker,
133 state: AtomicU8,
134 _marker: std::marker::PhantomData<T>,
135}
136
137impl<T: State> Inner<T> {
138 /// Create a new oneshot inner state
139 ///
140 /// Extremely fast: just initializes empty waker and pending state
141 ///
142 /// 创建一个新的 oneshot 内部状态
143 ///
144 /// 极快:仅初始化空 waker 和待处理状态
145 #[inline]
146 pub(crate) fn new() -> Arc<Self> {
147 Arc::new(Self {
148 waker: AtomicWaker::new(),
149 state: AtomicU8::new(T::pending_value()),
150 _marker: std::marker::PhantomData,
151 })
152 }
153
154 /// Send a completion notification (set state and wake)
155 ///
156 /// 发送完成通知(设置状态并唤醒)
157 #[inline]
158 pub(crate) fn send(&self, state: T) {
159 // Store completion state first with Release ordering
160 self.state.store(state.to_u8(), Ordering::Release);
161
162 // Wake the registered waker if any
163 self.waker.wake();
164 }
165
166 /// Register a waker to be notified on completion
167 ///
168 /// 注册一个 waker 以在完成时收到通知
169 #[inline]
170 fn register_waker(&self, waker: &std::task::Waker) {
171 self.waker.register(waker);
172 }
173}
174
// PERFORMANCE NOTE: `Inner` deliberately has no `Drop` implementation.
// Nothing in this file implements `Drop` for `Inner` or for `Receiver`, so
// dropping `Inner` adds no bookkeeping of its own:
// 1. In the common case (the sender notifies before the receiver is dropped)
//    the registered waker has already been consumed by the wake-up.
// 2. A waker that is still registered when the channel goes away is released
//    when the `AtomicWaker` field itself is dropped. This relies on the drop
//    behaviour of `AtomicWaker`, which is defined in `atomic_waker` - confirm
//    there before changing it.
// 3. `Inner` therefore needs no extra atomic load on drop.
186
187/// Completion notifier for one-shot tasks
188///
189/// 一次性任务完成通知器
190pub struct Sender<T: State> {
191 inner: Arc<Inner<T>>,
192}
193
194impl<T: State> Sender<T> {
195 /// Create a new oneshot completion notifier with receiver
196 ///
197 /// 创建一个新的 oneshot 完成通知器和接收器
198 ///
199 /// # Returns
200 /// Returns a tuple of (notifier, receiver)
201 ///
202 /// 返回 (通知器, 接收器) 元组
203 #[inline]
204 pub fn new() -> (Self, Receiver<T>) {
205 let inner = Inner::new();
206
207 let notifier = Sender {
208 inner: inner.clone(),
209 };
210 let receiver = Receiver {
211 inner,
212 };
213
214 (notifier, receiver)
215 }
216
217 /// Notify completion with the given state
218 ///
219 /// 使用给定状态通知完成
220 #[inline]
221 pub fn notify(&self, state: T) {
222 self.inner.send(state);
223 }
224}
225
226/// Completion receiver for one-shot tasks
227///
228/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
229///
230/// 一次性任务完成通知接收器
231///
232/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
233///
234/// # Examples
235///
236/// ## Using unit type for simple completion
237///
238/// ```
239/// use lite_sync::oneshot::Sender;
240///
241/// # tokio_test::block_on(async {
242/// let (notifier, receiver) = Sender::<()>::new();
243///
244/// tokio::spawn(async move {
245/// // ... do work ...
246/// notifier.notify(()); // Signal completion
247/// });
248///
249/// // Two equivalent ways to await:
250/// let result = receiver.await; // Direct await via Future impl
251/// assert_eq!(result, ());
252/// # });
253/// ```
254///
255/// ## Awaiting on mutable reference
256///
257/// ```
258/// use lite_sync::oneshot::Sender;
259///
260/// # tokio_test::block_on(async {
261/// let (notifier, mut receiver) = Sender::<()>::new();
262///
263/// tokio::spawn(async move {
264/// // ... do work ...
265/// notifier.notify(());
266/// });
267///
268/// // Can also await on &mut receiver
269/// let result = (&mut receiver).await;
270/// assert_eq!(result, ());
271/// # });
272/// ```
273///
274/// ## Using custom state
275///
276/// ```
277/// use lite_sync::oneshot::{State, Sender};
278///
279/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
280/// enum CustomState {
281/// Success,
282/// Failure,
283/// Timeout,
284/// }
285///
286/// impl State for CustomState {
287/// fn to_u8(&self) -> u8 {
288/// match self {
289/// CustomState::Success => 1,
290/// CustomState::Failure => 2,
291/// CustomState::Timeout => 3,
292/// }
293/// }
294///
295/// fn from_u8(value: u8) -> Option<Self> {
296/// match value {
297/// 1 => Some(CustomState::Success),
298/// 2 => Some(CustomState::Failure),
299/// 3 => Some(CustomState::Timeout),
300/// _ => None,
301/// }
302/// }
303///
304/// fn pending_value() -> u8 {
305/// 0
306/// }
307/// }
308///
309/// # tokio_test::block_on(async {
310/// let (notifier, receiver) = Sender::<CustomState>::new();
311///
312/// tokio::spawn(async move {
313/// notifier.notify(CustomState::Success);
314/// });
315///
316/// match receiver.await {
317/// CustomState::Success => { /* Success! */ },
318/// CustomState::Failure => { /* Failed */ },
319/// CustomState::Timeout => { /* Timed out */ },
320/// }
321/// # });
322/// ```
323pub struct Receiver<T: State> {
324 inner: Arc<Inner<T>>,
325}
326
327// Receiver is Unpin because all its fields are Unpin
328impl<T: State> Unpin for Receiver<T> {}
329
330// Receiver drop is automatically handled by AtomicWaker's drop implementation
331// No need for explicit drop implementation
332//
333// Receiver 的 drop 由 AtomicWaker 的 drop 实现自动处理
334// 无需显式的 drop 实现
335
336impl<T: State> Receiver<T> {
337 /// Wait for task completion asynchronously
338 ///
339 /// This is equivalent to using `.await` directly on the receiver
340 ///
341 /// 异步等待任务完成
342 ///
343 /// 这等同于直接在 receiver 上使用 `.await`
344 ///
345 /// # Returns
346 /// Returns the completion state
347 ///
348 /// # 返回值
349 /// 返回完成状态
350 #[inline]
351 pub async fn wait(self) -> T {
352 self.await
353 }
354}
355
356/// Direct Future implementation for Receiver
357///
358/// This allows both `receiver.await` and `(&mut receiver).await` to work
359///
360/// Optimized implementation:
361/// - Fast path: Immediate return if already completed (no allocation)
362/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
363/// - No intermediate future state needed
364///
365/// 为 Receiver 直接实现 Future
366///
367/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
368///
369/// 优化实现:
370/// - 快速路径:如已完成则立即返回(无分配)
371/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
372/// - 无需中间 future 状态
373impl<T: State> Future for Receiver<T> {
374 type Output = T;
375
376 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
377 // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
378 let this = self.get_mut();
379
380 // Fast path: check if already completed
381 let current = this.inner.state.load(Ordering::Acquire);
382 if let Some(state) = T::from_u8(current)
383 && current != T::pending_value() {
384 return Poll::Ready(state);
385 }
386
387 // Slow path: register waker for notification
388 this.inner.register_waker(cx.waker());
389
390 // Check again after registering waker to avoid race condition
391 // The sender might have completed between our first check and waker registration
392 let current = this.inner.state.load(Ordering::Acquire);
393 if let Some(state) = T::from_u8(current)
394 && current != T::pending_value() {
395 return Poll::Ready(state);
396 }
397
398 Poll::Pending
399 }
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// How long the background task waits before it notifies.
    const NOTIFY_DELAY: Duration = Duration::from_millis(10);

    /// Spawns a task that sleeps for `NOTIFY_DELAY` and then sends `state`.
    fn notify_later<T: State>(notifier: Sender<T>, state: T) {
        tokio::spawn(async move {
            tokio::time::sleep(NOTIFY_DELAY).await;
            notifier.notify(state);
        });
    }

    /// Two-variant completion state used by most tests.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum TestCompletion {
        /// The task was called / completed successfully.
        Called,

        /// The task was cancelled.
        Cancelled,
    }

    impl State for TestCompletion {
        fn to_u8(&self) -> u8 {
            match *self {
                Self::Called => 1,
                Self::Cancelled => 2,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(Self::Called),
                2 => Some(Self::Cancelled),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }
    }

    /// Three-variant state used to exercise a user-defined state type.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum CustomState {
        Success,
        Failure,
        Timeout,
    }

    impl State for CustomState {
        fn to_u8(&self) -> u8 {
            match *self {
                Self::Success => 1,
                Self::Failure => 2,
                Self::Timeout => 3,
            }
        }

        fn from_u8(value: u8) -> Option<Self> {
            match value {
                1 => Some(Self::Success),
                2 => Some(Self::Failure),
                3 => Some(Self::Timeout),
                _ => None,
            }
        }

        fn pending_value() -> u8 {
            0
        }
    }

    // ---- `wait()` with a delayed notification ----

    #[tokio::test]
    async fn test_oneshot_called() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Called);

        assert_eq!(receiver.wait().await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_cancelled() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Cancelled);

        assert_eq!(receiver.wait().await, TestCompletion::Cancelled);
    }

    // ---- `wait()` when the notification happened first (fast path) ----

    #[tokio::test]
    async fn test_oneshot_immediate_called() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notifier.notify(TestCompletion::Called);

        assert_eq!(receiver.wait().await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_immediate_cancelled() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notifier.notify(TestCompletion::Cancelled);

        assert_eq!(receiver.wait().await, TestCompletion::Cancelled);
    }

    // ---- custom state type ----

    #[tokio::test]
    async fn test_oneshot_custom_state() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notify_later(notifier, CustomState::Success);

        assert_eq!(receiver.wait().await, CustomState::Success);
    }

    #[tokio::test]
    async fn test_oneshot_custom_state_timeout() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        // Notified before anyone waits.
        notifier.notify(CustomState::Timeout);

        assert_eq!(receiver.wait().await, CustomState::Timeout);
    }

    // ---- unit state ----

    #[tokio::test]
    async fn test_oneshot_unit_type() {
        let (notifier, receiver) = Sender::<()>::new();
        notify_later(notifier, ());

        assert_eq!(receiver.wait().await, ());
    }

    #[tokio::test]
    async fn test_oneshot_unit_type_immediate() {
        let (notifier, receiver) = Sender::<()>::new();
        // Notified before anyone waits (fast path).
        notifier.notify(());

        assert_eq!(receiver.wait().await, ());
    }

    // ---- awaiting the receiver directly through its `Future` impl ----

    #[tokio::test]
    async fn test_oneshot_into_future_called() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Called);

        // No `.wait()`: the receiver itself is the future.
        assert_eq!(receiver.await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_immediate() {
        let (notifier, receiver) = Sender::<TestCompletion>::new();
        // Notified before awaiting (fast path).
        notifier.notify(TestCompletion::Cancelled);

        assert_eq!(receiver.await, TestCompletion::Cancelled);
    }

    #[tokio::test]
    async fn test_oneshot_into_future_unit_type() {
        let (notifier, receiver) = Sender::<()>::new();
        notify_later(notifier, ());

        assert_eq!(receiver.await, ());
    }

    #[tokio::test]
    async fn test_oneshot_into_future_custom_state() {
        let (notifier, receiver) = Sender::<CustomState>::new();
        notify_later(notifier, CustomState::Failure);

        assert_eq!(receiver.await, CustomState::Failure);
    }

    // ---- awaiting through `&mut receiver` ----

    #[tokio::test]
    async fn test_oneshot_await_mut_reference() {
        let (notifier, mut receiver) = Sender::<TestCompletion>::new();
        notify_later(notifier, TestCompletion::Called);

        assert_eq!((&mut receiver).await, TestCompletion::Called);
    }

    #[tokio::test]
    async fn test_oneshot_await_mut_reference_unit_type() {
        let (notifier, mut receiver) = Sender::<()>::new();
        // Notified before awaiting (fast path).
        notifier.notify(());

        assert_eq!((&mut receiver).await, ());
    }
}
652