lite_sync/oneshot.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use super::atomic_waker::AtomicWaker;
8
9/// Trait for types that can be used as oneshot state
10///
11/// Types implementing this trait can be converted to/from u8 for atomic storage.
12/// This allows for zero-allocation, lock-free state transitions.
13///
14/// 可用作 oneshot 状态的类型的 trait
15///
16/// 实现此 trait 的类型可以与 u8 互相转换以进行原子存储。
17/// 这允许零分配、无锁的状态转换。
18///
19/// # Built-in Implementations
20///
21/// - `()`: Simple completion notification without state
22///
23/// # Example: Custom State
24///
25/// ```
26/// use lite_sync::oneshot::{State, Sender};
27///
28/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
29/// enum CustomState {
30/// Success,
31/// Failure,
32/// Timeout,
33/// }
34///
35/// impl State for CustomState {
36/// fn to_u8(&self) -> u8 {
37/// match self {
38/// CustomState::Success => 1,
39/// CustomState::Failure => 2,
40/// CustomState::Timeout => 3,
41/// }
42/// }
43///
44/// fn from_u8(value: u8) -> Option<Self> {
45/// match value {
46/// 1 => Some(CustomState::Success),
47/// 2 => Some(CustomState::Failure),
48/// 3 => Some(CustomState::Timeout),
49/// _ => None,
50/// }
51/// }
52///
53/// fn pending_value() -> u8 {
54/// 0
55/// }
56/// }
57///
58/// # tokio_test::block_on(async {
59/// // Usage:
60/// let (notifier, receiver) = Sender::<CustomState>::new();
61/// tokio::spawn(async move {
62/// notifier.notify(CustomState::Success);
63/// });
64/// let result = receiver.await; // Direct await
65/// assert_eq!(result, CustomState::Success);
66/// # });
67/// ```
68pub trait State: Sized + Send + Sync + 'static {
69 /// Convert the state to u8 for atomic storage
70 ///
71 /// 将状态转换为 u8 以进行原子存储
72 fn to_u8(&self) -> u8;
73
74 /// Convert u8 back to the state type
75 ///
76 /// Returns None if the value doesn't represent a valid state
77 ///
78 /// 将 u8 转换回状态类型
79 ///
80 /// 如果值不代表有效状态则返回 None
81 fn from_u8(value: u8) -> Option<Self>;
82
83 /// The pending state value (before completion)
84 ///
85 /// 待处理状态值(完成前)
86 fn pending_value() -> u8;
87}
88
89/// Implementation for unit type () - simple completion notification without state
90///
91/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
92impl State for () {
93 #[inline]
94 fn to_u8(&self) -> u8 {
95 1 // Completed
96 }
97
98 #[inline]
99 fn from_u8(value: u8) -> Option<Self> {
100 match value {
101 1 => Some(()),
102 _ => None,
103 }
104 }
105
106 #[inline]
107 fn pending_value() -> u8 {
108 0 // Pending
109 }
110}
111
112#[inline]
113pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
114 let (notifier, receiver) = Sender::<T>::new();
115 (notifier, receiver)
116}
117
118/// Inner state for one-shot completion notification
119///
120/// Uses AtomicWaker for zero Box allocation waker storage:
121/// - Waker itself is just 2 pointers (16 bytes on 64-bit), no additional heap allocation
122/// - Atomic state machine ensures safe concurrent access
123/// - Reuses common AtomicWaker implementation
124///
125/// 一次性完成通知的内部状态
126///
127/// 使用 AtomicWaker 实现零 Box 分配的 waker 存储:
128/// - Waker 本身只是 2 个指针(64 位系统上 16 字节),无额外堆分配
129/// - 原子状态机确保安全的并发访问
130/// - 复用通用的 AtomicWaker 实现
131pub(crate) struct Inner<T: State> {
132 waker: AtomicWaker,
133 state: AtomicU8,
134 _marker: std::marker::PhantomData<T>,
135}
136
137impl<T: State> Inner<T> {
138 /// Create a new oneshot inner state
139 ///
140 /// Extremely fast: just initializes empty waker and pending state
141 ///
142 /// 创建一个新的 oneshot 内部状态
143 ///
144 /// 极快:仅初始化空 waker 和待处理状态
145 #[inline]
146 pub(crate) fn new() -> Arc<Self> {
147 Arc::new(Self {
148 waker: AtomicWaker::new(),
149 state: AtomicU8::new(T::pending_value()),
150 _marker: std::marker::PhantomData,
151 })
152 }
153
154 /// Send a completion notification (set state and wake)
155 ///
156 /// 发送完成通知(设置状态并唤醒)
157 #[inline]
158 pub(crate) fn send(&self, state: T) {
159 // Store completion state first with Release ordering
160 self.state.store(state.to_u8(), Ordering::Release);
161
162 // Wake the registered waker if any
163 self.waker.wake();
164 }
165
166 /// Register a waker to be notified on completion
167 ///
168 /// 注册一个 waker 以在完成时收到通知
169 #[inline]
170 fn register_waker(&self, waker: &std::task::Waker) {
171 self.waker.register(waker);
172 }
173}
174
175// PERFORMANCE OPTIMIZATION: No Drop implementation for Inner
176// Waker cleanup is handled by Receiver::drop instead, which is more efficient because:
177// 1. In the common case (sender notifies before receiver drops), waker is already consumed
178// 2. Only Receiver creates wakers, so only Receiver needs to clean them up
179// 3. This makes Inner::drop a complete no-op, eliminating atomic load overhead
180//
181// 性能优化:Inner 不实现 Drop
182// Waker 清理由 Receiver::drop 处理,这更高效因为:
183// 1. 在常见情况下(发送方在接收方 drop 前通知),waker 已被消费
184// 2. 只有 Receiver 创建 waker,所以只有 Receiver 需要清理它们
185// 3. 这使得 Inner::drop 完全成为 no-op,消除了原子加载开销
186
187/// Completion notifier for one-shot tasks
188///
189/// 一次性任务完成通知器
190pub struct Sender<T: State> {
191 inner: Arc<Inner<T>>,
192}
193
194impl<T: State> Sender<T> {
195 /// Create a new oneshot completion notifier with receiver
196 ///
197 /// 创建一个新的 oneshot 完成通知器和接收器
198 ///
199 /// # Returns
200 /// Returns a tuple of (notifier, receiver)
201 ///
202 /// 返回 (通知器, 接收器) 元组
203 #[inline]
204 pub fn new() -> (Self, Receiver<T>) {
205 let inner = Inner::new();
206
207 let notifier = Sender {
208 inner: inner.clone(),
209 };
210 let receiver = Receiver {
211 inner,
212 };
213
214 (notifier, receiver)
215 }
216
217 /// Notify completion with the given state
218 ///
219 /// 使用给定状态通知完成
220 #[inline]
221 pub fn notify(&self, state: T) {
222 self.inner.send(state);
223 }
224}
225
226/// Completion receiver for one-shot tasks
227///
228/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
229///
230/// 一次性任务完成通知接收器
231///
232/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
233///
234/// # Examples
235///
236/// ## Using unit type for simple completion
237///
238/// ```
239/// use lite_sync::oneshot::Sender;
240///
241/// # tokio_test::block_on(async {
242/// let (notifier, receiver) = Sender::<()>::new();
243///
244/// tokio::spawn(async move {
245/// // ... do work ...
246/// notifier.notify(()); // Signal completion
247/// });
248///
249/// // Two equivalent ways to await:
250/// let result = receiver.await; // Direct await via Future impl
251/// assert_eq!(result, ());
252/// # });
253/// ```
254///
255/// ## Awaiting on mutable reference
256///
257/// ```
258/// use lite_sync::oneshot::Sender;
259///
260/// # tokio_test::block_on(async {
261/// let (notifier, mut receiver) = Sender::<()>::new();
262///
263/// tokio::spawn(async move {
264/// // ... do work ...
265/// notifier.notify(());
266/// });
267///
268/// // Can also await on &mut receiver
269/// let result = (&mut receiver).await;
270/// assert_eq!(result, ());
271/// # });
272/// ```
273///
274/// ## Using custom state
275///
276/// ```
277/// use lite_sync::oneshot::{State, Sender};
278///
279/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
280/// enum CustomState {
281/// Success,
282/// Failure,
283/// Timeout,
284/// }
285///
286/// impl State for CustomState {
287/// fn to_u8(&self) -> u8 {
288/// match self {
289/// CustomState::Success => 1,
290/// CustomState::Failure => 2,
291/// CustomState::Timeout => 3,
292/// }
293/// }
294///
295/// fn from_u8(value: u8) -> Option<Self> {
296/// match value {
297/// 1 => Some(CustomState::Success),
298/// 2 => Some(CustomState::Failure),
299/// 3 => Some(CustomState::Timeout),
300/// _ => None,
301/// }
302/// }
303///
304/// fn pending_value() -> u8 {
305/// 0
306/// }
307/// }
308///
309/// # tokio_test::block_on(async {
310/// let (notifier, receiver) = Sender::<CustomState>::new();
311///
312/// tokio::spawn(async move {
313/// notifier.notify(CustomState::Success);
314/// });
315///
316/// match receiver.await {
317/// CustomState::Success => { /* Success! */ },
318/// CustomState::Failure => { /* Failed */ },
319/// CustomState::Timeout => { /* Timed out */ },
320/// }
321/// # });
322/// ```
323pub struct Receiver<T: State> {
324 inner: Arc<Inner<T>>,
325}
326
327// Receiver is Unpin because all its fields are Unpin
328impl<T: State> Unpin for Receiver<T> {}
329
330// Receiver drop is automatically handled by AtomicWaker's drop implementation
331// No need for explicit drop implementation
332//
333// Receiver 的 drop 由 AtomicWaker 的 drop 实现自动处理
334// 无需显式的 drop 实现
335
336impl<T: State> Receiver<T> {
337 /// Wait for task completion asynchronously
338 ///
339 /// This is equivalent to using `.await` directly on the receiver
340 ///
341 /// 异步等待任务完成
342 ///
343 /// 这等同于直接在 receiver 上使用 `.await`
344 ///
345 /// # Returns
346 /// Returns the completion state
347 ///
348 /// # 返回值
349 /// 返回完成状态
350 #[inline]
351 pub async fn wait(self) -> T {
352 self.await
353 }
354}
355
356/// Direct Future implementation for Receiver
357///
358/// This allows both `receiver.await` and `(&mut receiver).await` to work
359///
360/// Optimized implementation:
361/// - Fast path: Immediate return if already completed (no allocation)
362/// - Slow path: Direct waker registration (no Box allocation, just copy two pointers)
363/// - No intermediate future state needed
364///
365/// 为 Receiver 直接实现 Future
366///
367/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
368///
369/// 优化实现:
370/// - 快速路径:如已完成则立即返回(无分配)
371/// - 慢速路径:直接注册 waker(无 Box 分配,只复制两个指针)
372/// - 无需中间 future 状态
373impl<T: State> Future for Receiver<T> {
374 type Output = T;
375
376 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
377 // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
378 let this = self.get_mut();
379
380 // Fast path: check if already completed
381 let current = this.inner.state.load(Ordering::Acquire);
382 if let Some(state) = T::from_u8(current) {
383 if current != T::pending_value() {
384 return Poll::Ready(state);
385 }
386 }
387
388 // Slow path: register waker for notification
389 this.inner.register_waker(cx.waker());
390
391 // Check again after registering waker to avoid race condition
392 // The sender might have completed between our first check and waker registration
393 let current = this.inner.state.load(Ordering::Acquire);
394 if let Some(state) = T::from_u8(current) {
395 if current != T::pending_value() {
396 return Poll::Ready(state);
397 }
398 }
399
400 Poll::Pending
401 }
402}
403
404#[cfg(test)]
405mod tests {
406 use super::*;
407
408 /// Test-only state type for completion notification
409 ///
410 /// 测试专用的完成通知状态类型
411 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
412 enum TestCompletion {
413 /// Task was called/completed successfully
414 ///
415 /// 任务被调用/成功完成
416 Called,
417
418 /// Task was cancelled
419 ///
420 /// 任务被取消
421 Cancelled,
422 }
423
424 impl State for TestCompletion {
425 fn to_u8(&self) -> u8 {
426 match self {
427 TestCompletion::Called => 1,
428 TestCompletion::Cancelled => 2,
429 }
430 }
431
432 fn from_u8(value: u8) -> Option<Self> {
433 match value {
434 1 => Some(TestCompletion::Called),
435 2 => Some(TestCompletion::Cancelled),
436 _ => None,
437 }
438 }
439
440 fn pending_value() -> u8 {
441 0
442 }
443 }
444
445 #[tokio::test]
446 async fn test_oneshot_called() {
447 let (notifier, receiver) = Sender::<TestCompletion>::new();
448
449 tokio::spawn(async move {
450 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
451 notifier.notify(TestCompletion::Called);
452 });
453
454 let result = receiver.wait().await;
455 assert_eq!(result, TestCompletion::Called);
456 }
457
458 #[tokio::test]
459 async fn test_oneshot_cancelled() {
460 let (notifier, receiver) = Sender::<TestCompletion>::new();
461
462 tokio::spawn(async move {
463 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
464 notifier.notify(TestCompletion::Cancelled);
465 });
466
467 let result = receiver.wait().await;
468 assert_eq!(result, TestCompletion::Cancelled);
469 }
470
471 #[tokio::test]
472 async fn test_oneshot_immediate_called() {
473 let (notifier, receiver) = Sender::<TestCompletion>::new();
474
475 // Notify before waiting (fast path)
476 notifier.notify(TestCompletion::Called);
477
478 let result = receiver.wait().await;
479 assert_eq!(result, TestCompletion::Called);
480 }
481
482 #[tokio::test]
483 async fn test_oneshot_immediate_cancelled() {
484 let (notifier, receiver) = Sender::<TestCompletion>::new();
485
486 // Notify before waiting (fast path)
487 notifier.notify(TestCompletion::Cancelled);
488
489 let result = receiver.wait().await;
490 assert_eq!(result, TestCompletion::Cancelled);
491 }
492
493 // Test with custom state type
494 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
495 enum CustomState {
496 Success,
497 Failure,
498 Timeout,
499 }
500
501 impl State for CustomState {
502 fn to_u8(&self) -> u8 {
503 match self {
504 CustomState::Success => 1,
505 CustomState::Failure => 2,
506 CustomState::Timeout => 3,
507 }
508 }
509
510 fn from_u8(value: u8) -> Option<Self> {
511 match value {
512 1 => Some(CustomState::Success),
513 2 => Some(CustomState::Failure),
514 3 => Some(CustomState::Timeout),
515 _ => None,
516 }
517 }
518
519 fn pending_value() -> u8 {
520 0
521 }
522 }
523
524 #[tokio::test]
525 async fn test_oneshot_custom_state() {
526 let (notifier, receiver) = Sender::<CustomState>::new();
527
528 tokio::spawn(async move {
529 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
530 notifier.notify(CustomState::Success);
531 });
532
533 let result = receiver.wait().await;
534 assert_eq!(result, CustomState::Success);
535 }
536
537 #[tokio::test]
538 async fn test_oneshot_custom_state_timeout() {
539 let (notifier, receiver) = Sender::<CustomState>::new();
540
541 // Immediate notification
542 notifier.notify(CustomState::Timeout);
543
544 let result = receiver.wait().await;
545 assert_eq!(result, CustomState::Timeout);
546 }
547
548 #[tokio::test]
549 async fn test_oneshot_unit_type() {
550 let (notifier, receiver) = Sender::<()>::new();
551
552 tokio::spawn(async move {
553 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
554 notifier.notify(());
555 });
556
557 let result = receiver.wait().await;
558 assert_eq!(result, ());
559 }
560
561 #[tokio::test]
562 async fn test_oneshot_unit_type_immediate() {
563 let (notifier, receiver) = Sender::<()>::new();
564
565 // Immediate notification (fast path)
566 notifier.notify(());
567
568 let result = receiver.wait().await;
569 assert_eq!(result, ());
570 }
571
572 // Tests for Future implementation (direct await)
573 #[tokio::test]
574 async fn test_oneshot_into_future_called() {
575 let (notifier, receiver) = Sender::<TestCompletion>::new();
576
577 tokio::spawn(async move {
578 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
579 notifier.notify(TestCompletion::Called);
580 });
581
582 // Direct await without .wait()
583 let result = receiver.await;
584 assert_eq!(result, TestCompletion::Called);
585 }
586
587 #[tokio::test]
588 async fn test_oneshot_into_future_immediate() {
589 let (notifier, receiver) = Sender::<TestCompletion>::new();
590
591 // Notify before awaiting (fast path)
592 notifier.notify(TestCompletion::Cancelled);
593
594 // Direct await
595 let result = receiver.await;
596 assert_eq!(result, TestCompletion::Cancelled);
597 }
598
599 #[tokio::test]
600 async fn test_oneshot_into_future_unit_type() {
601 let (notifier, receiver) = Sender::<()>::new();
602
603 tokio::spawn(async move {
604 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
605 notifier.notify(());
606 });
607
608 // Direct await with unit type
609 let result = receiver.await;
610 assert_eq!(result, ());
611 }
612
613 #[tokio::test]
614 async fn test_oneshot_into_future_custom_state() {
615 let (notifier, receiver) = Sender::<CustomState>::new();
616
617 tokio::spawn(async move {
618 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
619 notifier.notify(CustomState::Failure);
620 });
621
622 // Direct await with custom state
623 let result = receiver.await;
624 assert_eq!(result, CustomState::Failure);
625 }
626
627 // Test awaiting on &mut receiver
628 #[tokio::test]
629 async fn test_oneshot_await_mut_reference() {
630 let (notifier, mut receiver) = Sender::<TestCompletion>::new();
631
632 tokio::spawn(async move {
633 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
634 notifier.notify(TestCompletion::Called);
635 });
636
637 // Await on mutable reference
638 let result = (&mut receiver).await;
639 assert_eq!(result, TestCompletion::Called);
640 }
641
642 #[tokio::test]
643 async fn test_oneshot_await_mut_reference_unit_type() {
644 let (notifier, mut receiver) = Sender::<()>::new();
645
646 // Immediate notification
647 notifier.notify(());
648
649 // Await on mutable reference (fast path)
650 let result = (&mut receiver).await;
651 assert_eq!(result, ());
652 }
653}
654