kestrel_timer/utils/oneshot.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicU8, AtomicPtr, Ordering};
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll, Waker};
6use std::ptr;
7
8use crate::task::{ONESHOT_PENDING, ONESHOT_CALLED, ONESHOT_CANCELLED, TaskCompletion};
9
10/// Trait for types that can be used as oneshot state
11///
12/// Types implementing this trait can be converted to/from u8 for atomic storage.
13/// This allows for zero-allocation, lock-free state transitions.
14///
15/// 可用作 oneshot 状态的类型的 trait
16///
17/// 实现此 trait 的类型可以与 u8 互相转换以进行原子存储。
18/// 这允许零分配、无锁的状态转换。
19///
20/// # Built-in Implementations
21///
22/// - `TaskCompletion`: Called or Cancelled states
23/// - `()`: Simple completion notification without state
24///
25/// # Example: Custom State
26///
27/// ```rust,ignore
28/// #[derive(Debug, Clone, Copy, PartialEq, Eq)]
29/// enum CustomState {
30/// Success,
31/// Failure,
32/// Timeout,
33/// }
34///
35/// impl State for CustomState {
36/// fn to_u8(&self) -> u8 {
37/// match self {
38/// CustomState::Success => 1,
39/// CustomState::Failure => 2,
40/// CustomState::Timeout => 3,
41/// }
42/// }
43///
44/// fn from_u8(value: u8) -> Option<Self> {
45/// match value {
46/// 1 => Some(CustomState::Success),
47/// 2 => Some(CustomState::Failure),
48/// 3 => Some(CustomState::Timeout),
49/// _ => None,
50/// }
51/// }
52///
53/// fn pending_value() -> u8 {
54/// 0
55/// }
56/// }
57///
58/// // Usage:
59/// let (notifier, receiver) = Sender::<CustomState>::new();
60/// tokio::spawn(async move {
61/// notifier.notify(CustomState::Success);
62/// });
63/// let result = receiver.await; // Direct await
64/// ```
65pub trait State: Sized + Send + Sync + 'static {
66 /// Convert the state to u8 for atomic storage
67 ///
68 /// 将状态转换为 u8 以进行原子存储
69 fn to_u8(&self) -> u8;
70
71 /// Convert u8 back to the state type
72 ///
73 /// Returns None if the value doesn't represent a valid state
74 ///
75 /// 将 u8 转换回状态类型
76 ///
77 /// 如果值不代表有效状态则返回 None
78 fn from_u8(value: u8) -> Option<Self>;
79
80 /// The pending state value (before completion)
81 ///
82 /// 待处理状态值(完成前)
83 fn pending_value() -> u8;
84}
85
86impl State for TaskCompletion {
87 #[inline]
88 fn to_u8(&self) -> u8 {
89 match self {
90 TaskCompletion::Called => ONESHOT_CALLED,
91 TaskCompletion::Cancelled => ONESHOT_CANCELLED,
92 }
93 }
94
95 #[inline]
96 fn from_u8(value: u8) -> Option<Self> {
97 match value {
98 ONESHOT_CALLED => Some(TaskCompletion::Called),
99 ONESHOT_CANCELLED => Some(TaskCompletion::Cancelled),
100 _ => None,
101 }
102 }
103
104 #[inline]
105 fn pending_value() -> u8 {
106 ONESHOT_PENDING
107 }
108}
109
110/// Implementation for unit type () - simple completion notification without state
111///
112/// 为单元类型 () 实现 - 简单的完成通知,无需状态信息
113impl State for () {
114 #[inline]
115 fn to_u8(&self) -> u8 {
116 1 // Completed
117 }
118
119 #[inline]
120 fn from_u8(value: u8) -> Option<Self> {
121 match value {
122 1 => Some(()),
123 _ => None,
124 }
125 }
126
127 #[inline]
128 fn pending_value() -> u8 {
129 0 // Pending
130 }
131}
132
133#[inline]
134pub fn channel<T: State>() -> (Sender<T>, Receiver<T>) {
135 let (notifier, receiver) = Sender::<T>::new();
136 (notifier, receiver)
137}
138
139/// Inner state for one-shot completion notification
140///
141/// Uses AtomicPtr<Waker> for minimal overhead:
142/// - Creation: Just initialize null pointer (no allocation)
143/// - Send: Atomic swap and wake (minimal operations)
144/// - Recv: Atomic store waker pointer (no intermediate futures)
145///
146/// Much lighter than both Notify (has waitlist) and AtomicWaker (has state machine)
147///
148/// 一次性完成通知的内部状态
149///
150/// 使用 AtomicPtr<Waker> 实现最小开销:
151/// - 创建:仅初始化空指针(无分配)
152/// - 发送:原子交换并唤醒(最少操作)
153/// - 接收:原子存储 waker 指针(无中间 future)
154///
155/// 比 Notify(有等待列表)和 AtomicWaker(有状态机)都更轻量
156pub(crate) struct Inner<T: State> {
157 waker: AtomicPtr<Waker>,
158 state: AtomicU8,
159 _marker: std::marker::PhantomData<T>,
160}
161
162impl<T: State> Inner<T> {
163 /// Create a new oneshot inner state
164 ///
165 /// Extremely fast: just initializes null pointer and pending state
166 ///
167 /// 创建一个新的 oneshot 内部状态
168 ///
169 /// 极快:仅初始化空指针和待处理状态
170 #[inline]
171 pub(crate) fn new() -> Arc<Self> {
172 Arc::new(Self {
173 waker: AtomicPtr::new(ptr::null_mut()),
174 state: AtomicU8::new(T::pending_value()),
175 _marker: std::marker::PhantomData,
176 })
177 }
178
179 /// Send a completion notification (set state and wake)
180 ///
181 /// Optimized: atomically swap waker and wake if exists
182 ///
183 /// 发送完成通知(设置状态并唤醒)
184 ///
185 /// 优化:原子交换 waker 并在存在时唤醒
186 #[inline]
187 pub(crate) fn send(&self, state: T) {
188 // Store state first with Release ordering
189 self.state.store(state.to_u8(), Ordering::Release);
190
191 // Atomically take the waker (if any) and wake it
192 let waker_ptr = self.waker.swap(ptr::null_mut(), Ordering::AcqRel);
193 if !waker_ptr.is_null() {
194 // SAFETY: We know this pointer was created by Box::into_raw in register_waker
195 // and we're the only one who can swap it out
196 unsafe {
197 let waker = Box::from_raw(waker_ptr);
198 waker.wake();
199 }
200 }
201 }
202
203 /// Register a waker to be notified on completion
204 ///
205 /// 注册一个 waker 以在完成时收到通知
206 #[inline]
207 fn register_waker(&self, waker: &Waker) {
208 // Create a boxed waker and convert to raw pointer
209 let new_waker = Box::into_raw(Box::new(waker.clone()));
210
211 // Atomically swap the old waker with the new one
212 let old_waker = self.waker.swap(new_waker, Ordering::AcqRel);
213
214 // Drop the old waker if it exists
215 if !old_waker.is_null() {
216 // SAFETY: This pointer was created by Box::into_raw
217 unsafe {
218 let _ = Box::from_raw(old_waker);
219 }
220 }
221 }
222}
223
224impl<T: State> Drop for Inner<T> {
225 fn drop(&mut self) {
226 // Clean up any remaining waker
227 let waker_ptr = self.waker.load(Ordering::Acquire);
228 if !waker_ptr.is_null() {
229 // SAFETY: This pointer was created by Box::into_raw and we're dropping
230 unsafe {
231 let _ = Box::from_raw(waker_ptr);
232 }
233 }
234 }
235}
236
237/// Completion notifier for one-shot tasks
238///
239/// Optimized implementation using AtomicPtr<Waker> + AtomicU8:
240/// - Faster creation than Notify (no waitlist allocation) or AtomicWaker (no state machine)
241/// - Lower memory footprint (just two atomic fields)
242/// - Direct waker management without intermediate futures
243/// - Single Arc allocation for both sender and receiver
244///
245/// 一次性任务完成通知器
246///
247/// 使用 AtomicPtr<Waker> + AtomicU8 的优化实现:
248/// - 比 Notify(无等待列表分配)或 AtomicWaker(无状态机)创建更快
249/// - 更小的内存占用(仅两个原子字段)
250/// - 直接管理 waker,无中间 future
251/// - 发送器和接收器共享单个 Arc 分配
252pub struct Sender<T: State = TaskCompletion> {
253 inner: Arc<Inner<T>>,
254}
255
256impl<T: State> Sender<T> {
257 /// Create a new oneshot completion notifier with receiver
258 ///
259 /// 创建一个新的 oneshot 完成通知器和接收器
260 ///
261 /// # Returns
262 /// Returns a tuple of (notifier, receiver)
263 ///
264 /// 返回 (通知器, 接收器) 元组
265 #[inline]
266 pub fn new() -> (Self, Receiver<T>) {
267 let inner = Inner::new();
268
269 let notifier = Sender {
270 inner: inner.clone(),
271 };
272 let receiver = Receiver {
273 inner,
274 };
275
276 (notifier, receiver)
277 }
278
279 /// Notify completion with the given state
280 ///
281 /// 使用给定状态通知完成
282 #[inline]
283 pub fn notify(&self, state: T) {
284 self.inner.send(state);
285 }
286}
287
288/// Completion receiver for one-shot tasks
289///
290/// Implements `Future` directly, allowing direct `.await` usage on both owned values and mutable references
291///
292/// 一次性任务完成通知接收器
293///
294/// 直接实现了 `Future`,允许对拥有的值和可变引用都直接使用 `.await`
295///
296/// # Examples
297///
298/// ## Using default TaskCompletion type
299///
300/// ```rust,ignore
301/// let (notifier, receiver) = Sender::new();
302///
303/// tokio::spawn(async move {
304/// notifier.notify(TaskCompletion::Called);
305/// });
306///
307/// // Two equivalent ways to await:
308/// let result = receiver.await; // Direct await via Future impl
309/// // or
310/// let result = receiver.wait().await; // Explicit wait method
311/// ```
312///
313/// ## Awaiting on mutable reference
314///
315/// ```rust,ignore
316/// let (notifier, mut receiver) = Sender::new();
317///
318/// tokio::spawn(async move {
319/// notifier.notify(TaskCompletion::Called);
320/// });
321///
322/// // Can also await on &mut receiver
323/// let result = (&mut receiver).await;
324/// ```
325///
326/// ## Using unit type for simple completion
327///
328/// ```rust,ignore
329/// let (notifier, receiver) = Sender::<()>::new();
330///
331/// tokio::spawn(async move {
332/// // ... do work ...
333/// notifier.notify(()); // Signal completion
334/// });
335///
336/// receiver.await; // Wait for completion
337/// ```
338///
339/// ## Using custom state
340///
341/// ```rust,ignore
342/// let (notifier, receiver) = Sender::<CustomState>::new();
343///
344/// tokio::spawn(async move {
345/// notifier.notify(CustomState::Success);
346/// });
347///
348/// match receiver.await {
349/// CustomState::Success => println!("Success!"),
350/// CustomState::Failure => println!("Failed"),
351/// CustomState::Timeout => println!("Timed out"),
352/// }
353/// ```
354pub struct Receiver<T: State = TaskCompletion> {
355 inner: Arc<Inner<T>>,
356}
357
358// Receiver is Unpin because all its fields are Unpin
359impl<T: State> Unpin for Receiver<T> {}
360
361impl<T: State> Receiver<T> {
362 /// Wait for task completion asynchronously
363 ///
364 /// This is equivalent to using `.await` directly on the receiver
365 ///
366 /// 异步等待任务完成
367 ///
368 /// 这等同于直接在 receiver 上使用 `.await`
369 ///
370 /// # Returns
371 /// Returns the completion state
372 ///
373 /// # 返回值
374 /// 返回完成状态
375 #[inline]
376 pub async fn wait(self) -> T {
377 self.await
378 }
379}
380
381/// Direct Future implementation for Receiver
382///
383/// This allows both `receiver.await` and `(&mut receiver).await` to work
384///
385/// Optimized implementation:
386/// - Fast path: Immediate return if already completed (no allocation)
387/// - Slow path: Direct waker registration (simpler than Notify's waitlist)
388/// - No intermediate future state needed
389///
390/// 为 Receiver 直接实现 Future
391///
392/// 这允许 `receiver.await` 和 `(&mut receiver).await` 都能工作
393///
394/// 优化实现:
395/// - 快速路径:如已完成则立即返回(无分配)
396/// - 慢速路径:直接注册 waker(比 Notify 的等待列表更简单)
397/// - 无需中间 future 状态
398impl<T: State> Future for Receiver<T> {
399 type Output = T;
400
401 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
402 // SAFETY: Receiver is Unpin, so we can safely get a mutable reference
403 let this = self.get_mut();
404
405 // Fast path: check if already completed
406 let current = this.inner.state.load(Ordering::Acquire);
407 if let Some(state) = T::from_u8(current) {
408 if current != T::pending_value() {
409 return Poll::Ready(state);
410 }
411 }
412
413 // Slow path: register waker for notification
414 // This is much lighter than Notify's waitlist management
415 this.inner.register_waker(cx.waker());
416
417 // Check again after registering waker to avoid race condition
418 // The sender might have completed between our first check and waker registration
419 let current = this.inner.state.load(Ordering::Acquire);
420 if let Some(state) = T::from_u8(current) {
421 if current != T::pending_value() {
422 return Poll::Ready(state);
423 }
424 }
425
426 Poll::Pending
427 }
428}
429
430#[cfg(test)]
431mod tests {
432 use super::*;
433
434 #[tokio::test]
435 async fn test_oneshot_called() {
436 let (notifier, receiver) = Sender::new();
437
438 tokio::spawn(async move {
439 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
440 notifier.notify(TaskCompletion::Called);
441 });
442
443 let result = receiver.wait().await;
444 assert_eq!(result, TaskCompletion::Called);
445 }
446
447 #[tokio::test]
448 async fn test_oneshot_cancelled() {
449 let (notifier, receiver) = Sender::new();
450
451 tokio::spawn(async move {
452 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
453 notifier.notify(TaskCompletion::Cancelled);
454 });
455
456 let result = receiver.wait().await;
457 assert_eq!(result, TaskCompletion::Cancelled);
458 }
459
460 #[tokio::test]
461 async fn test_oneshot_immediate_called() {
462 let (notifier, receiver) = Sender::new();
463
464 // Notify before waiting (fast path)
465 notifier.notify(TaskCompletion::Called);
466
467 let result = receiver.wait().await;
468 assert_eq!(result, TaskCompletion::Called);
469 }
470
471 #[tokio::test]
472 async fn test_oneshot_immediate_cancelled() {
473 let (notifier, receiver) = Sender::new();
474
475 // Notify before waiting (fast path)
476 notifier.notify(TaskCompletion::Cancelled);
477
478 let result = receiver.wait().await;
479 assert_eq!(result, TaskCompletion::Cancelled);
480 }
481
482 // Test with custom state type
483 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
484 enum CustomState {
485 Success,
486 Failure,
487 Timeout,
488 }
489
490 impl State for CustomState {
491 fn to_u8(&self) -> u8 {
492 match self {
493 CustomState::Success => 1,
494 CustomState::Failure => 2,
495 CustomState::Timeout => 3,
496 }
497 }
498
499 fn from_u8(value: u8) -> Option<Self> {
500 match value {
501 1 => Some(CustomState::Success),
502 2 => Some(CustomState::Failure),
503 3 => Some(CustomState::Timeout),
504 _ => None,
505 }
506 }
507
508 fn pending_value() -> u8 {
509 0
510 }
511 }
512
513 #[tokio::test]
514 async fn test_oneshot_custom_state() {
515 let (notifier, receiver) = Sender::<CustomState>::new();
516
517 tokio::spawn(async move {
518 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
519 notifier.notify(CustomState::Success);
520 });
521
522 let result = receiver.wait().await;
523 assert_eq!(result, CustomState::Success);
524 }
525
526 #[tokio::test]
527 async fn test_oneshot_custom_state_timeout() {
528 let (notifier, receiver) = Sender::<CustomState>::new();
529
530 // Immediate notification
531 notifier.notify(CustomState::Timeout);
532
533 let result = receiver.wait().await;
534 assert_eq!(result, CustomState::Timeout);
535 }
536
537 #[tokio::test]
538 async fn test_oneshot_unit_type() {
539 let (notifier, receiver) = Sender::<()>::new();
540
541 tokio::spawn(async move {
542 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
543 notifier.notify(());
544 });
545
546 let result = receiver.wait().await;
547 assert_eq!(result, ());
548 }
549
550 #[tokio::test]
551 async fn test_oneshot_unit_type_immediate() {
552 let (notifier, receiver) = Sender::<()>::new();
553
554 // Immediate notification (fast path)
555 notifier.notify(());
556
557 let result = receiver.wait().await;
558 assert_eq!(result, ());
559 }
560
561 // Tests for IntoFuture implementation
562 #[tokio::test]
563 async fn test_oneshot_into_future_called() {
564 let (notifier, receiver) = Sender::new();
565
566 tokio::spawn(async move {
567 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
568 notifier.notify(TaskCompletion::Called);
569 });
570
571 // Direct await without .wait()
572 let result = receiver.await;
573 assert_eq!(result, TaskCompletion::Called);
574 }
575
576 #[tokio::test]
577 async fn test_oneshot_into_future_immediate() {
578 let (notifier, receiver) = Sender::new();
579
580 // Notify before awaiting (fast path)
581 notifier.notify(TaskCompletion::Cancelled);
582
583 // Direct await
584 let result = receiver.await;
585 assert_eq!(result, TaskCompletion::Cancelled);
586 }
587
588 #[tokio::test]
589 async fn test_oneshot_into_future_unit_type() {
590 let (notifier, receiver) = Sender::<()>::new();
591
592 tokio::spawn(async move {
593 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
594 notifier.notify(());
595 });
596
597 // Direct await with unit type
598 let result = receiver.await;
599 assert_eq!(result, ());
600 }
601
602 #[tokio::test]
603 async fn test_oneshot_into_future_custom_state() {
604 let (notifier, receiver) = Sender::<CustomState>::new();
605
606 tokio::spawn(async move {
607 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
608 notifier.notify(CustomState::Failure);
609 });
610
611 // Direct await with custom state
612 let result = receiver.await;
613 assert_eq!(result, CustomState::Failure);
614 }
615
616 // Test awaiting on &mut receiver
617 #[tokio::test]
618 async fn test_oneshot_await_mut_reference() {
619 let (notifier, mut receiver) = Sender::new();
620
621 tokio::spawn(async move {
622 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
623 notifier.notify(TaskCompletion::Called);
624 });
625
626 // Await on mutable reference
627 let result = (&mut receiver).await;
628 assert_eq!(result, TaskCompletion::Called);
629 }
630
631 #[tokio::test]
632 async fn test_oneshot_await_mut_reference_unit_type() {
633 let (notifier, mut receiver) = Sender::<()>::new();
634
635 // Immediate notification
636 notifier.notify(());
637
638 // Await on mutable reference (fast path)
639 let result = (&mut receiver).await;
640 assert_eq!(result, ());
641 }
642}
643