kestrel_timer/utils/
notify.rs

//! Lightweight single-waiter notification primitive.
//!
//! Optimized for the SPSC (Single Producer Single Consumer) pattern, where
//! only one task waits at a time. Much lighter than `tokio::sync::Notify`.
10
11use std::sync::atomic::{AtomicU8, Ordering};
12use std::future::Future;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16use super::atomic_waker::AtomicWaker;
17
// Values stored in `SingleWaiterNotify::state`.
const EMPTY: u8 = 0;      // No waiter registered and no notification pending
const WAITING: u8 = 1;    // A waiter has registered and has not been notified yet
const NOTIFIED: u8 = 2;   // `notify_one` was called and the notification is not consumed yet
22
/// Lightweight single-waiter notifier optimized for the SPSC pattern.
///
/// Intended as a lighter alternative to `tokio::sync::Notify` for the case
/// where only one task waits at a time:
/// - No waiter list: the whole state is one `AtomicU8` plus one `AtomicWaker`.
/// - A notification sent before anyone waits is remembered, so the next wait
///   completes immediately.
pub struct SingleWaiterNotify {
    // One of `EMPTY`, `WAITING` or `NOTIFIED`.
    state: AtomicU8,
    // Waker of the waiting task; woken by `notify_one` when a waiter is registered.
    waker: AtomicWaker,
}
42
43impl SingleWaiterNotify {
44    /// Create a new single-waiter notifier
45    /// 
46    /// 创建一个新的单等待者通知器
47    #[inline]
48    pub fn new() -> Self {
49        Self {
50            state: AtomicU8::new(EMPTY),
51            waker: AtomicWaker::new(),
52        }
53    }
54    
55    /// Returns a future that completes when notified
56    /// 
57    /// 返回一个在收到通知时完成的 future
58    #[inline]
59    pub fn notified(&self) -> Notified<'_> {
60        Notified { 
61            notify: self,
62            registered: false,
63        }
64    }
65    
66    /// Wake the waiting task (if any)
67    /// 
68    /// If called before wait, the next wait will complete immediately.
69    /// 
70    /// 唤醒等待的任务(如果有)
71    /// 
72    /// 如果在等待之前调用,下一次等待将立即完成。
73    #[inline]
74    pub fn notify_one(&self) {
75        // Mark as notified
76        let prev_state = self.state.swap(NOTIFIED, Ordering::AcqRel);
77        
78        // If there was a waiter, wake it
79        if prev_state == WAITING {
80            self.waker.wake();
81        }
82    }
83    
84    /// Register a waker to be notified
85    /// 
86    /// Returns true if already notified (fast path)
87    /// 
88    /// 注册一个 waker 以接收通知
89    /// 
90    /// 如果已经被通知则返回 true(快速路径)
91    #[inline]
92    fn register_waker(&self, waker: &std::task::Waker) -> bool {
93        // Try to transition from EMPTY to WAITING
94        match self.state.compare_exchange(
95            EMPTY,
96            WAITING,
97            Ordering::AcqRel,
98            Ordering::Acquire,
99        ) {
100            Ok(_) => {
101                // Successfully transitioned, store the waker
102                self.waker.register(waker);
103                false // Not notified yet
104            }
105            Err(state) => {
106                // Already notified or waiting
107                if state == NOTIFIED {
108                    // Reset to EMPTY for next wait
109                    self.state.store(EMPTY, Ordering::Release);
110                    true // Already notified
111                } else {
112                    // State is WAITING, update the waker
113                    self.waker.register(waker);
114                    
115                    // Check if notified while we were updating waker
116                    if self.state.load(Ordering::Acquire) == NOTIFIED {
117                        self.state.store(EMPTY, Ordering::Release);
118                        true
119                    } else {
120                        false
121                    }
122                }
123            }
124        }
125    }
126}
127
// `SingleWaiterNotify` has no explicit `Drop` impl: releasing any stored
// waker is left to `AtomicWaker`'s own drop handling.
133
/// Future returned by `SingleWaiterNotify::notified()`.
///
/// Resolves to `()` once the notifier has been notified.
pub struct Notified<'a> {
    // Notifier this future waits on.
    notify: &'a SingleWaiterNotify,
    // Set when the future is polled; `Drop` checks it to decide whether a
    // `WAITING` registration may have to be rolled back.
    registered: bool,
}
141
142impl Future for Notified<'_> {
143    type Output = ();
144    
145    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
146        // On first poll, register the waker
147        if !self.registered {
148            self.registered = true;
149            if self.notify.register_waker(cx.waker()) {
150                // Already notified (fast path)
151                return Poll::Ready(());
152            }
153        } else {
154            // On subsequent polls, check if notified
155            if self.notify.state.load(Ordering::Acquire) == NOTIFIED {
156                self.notify.state.store(EMPTY, Ordering::Release);
157                return Poll::Ready(());
158            }
159            // Update waker in case it changed
160            self.notify.register_waker(cx.waker());
161        }
162        
163        Poll::Pending
164    }
165}
166
167impl Drop for Notified<'_> {
168    fn drop(&mut self) {
169        if self.registered {
170            // If we registered but are being dropped, try to clean up
171            // PERFORMANCE: Direct compare_exchange without pre-check:
172            // - Single atomic operation instead of two (load + compare_exchange)
173            // - Relaxed ordering is sufficient - just cleaning up state
174            // - CAS will fail harmlessly if state is not WAITING
175            //
176            // 如果我们注册了但正在被 drop,尝试清理
177            // 性能优化:直接 compare_exchange 无需预检查:
178            // - 单次原子操作而不是两次(load + compare_exchange)
179            // - Relaxed ordering 就足够了 - 只是清理状态
180            // - 如果状态不是 WAITING,CAS 会无害地失败
181            let _ = self.notify.state.compare_exchange(
182                WAITING,
183                EMPTY,
184                Ordering::Relaxed,
185                Ordering::Relaxed,
186            );
187        }
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tokio::time::{sleep, Duration};

    // A notification sent while nobody waits must be remembered.
    #[tokio::test]
    async fn test_notify_before_wait() {
        let notify = Arc::new(SingleWaiterNotify::new());
        
        // Notify before waiting
        notify.notify_one();
        
        // Should complete immediately
        notify.notified().await;
    }

    // A waiter that registers first is woken by a later notification.
    #[tokio::test]
    async fn test_notify_after_wait() {
        let notify = Arc::new(SingleWaiterNotify::new());
        let notify_clone = notify.clone();
        
        // Spawn a task that notifies after a delay
        tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            notify_clone.notify_one();
        });
        
        // Wait for notification
        notify.notified().await;
    }

    // The same notifier can be reused for several wait/notify rounds.
    #[tokio::test]
    async fn test_multiple_notify_cycles() {
        let notify = Arc::new(SingleWaiterNotify::new());
        
        for _ in 0..10 {
            let notify_clone = notify.clone();
            tokio::spawn(async move {
                sleep(Duration::from_millis(5)).await;
                notify_clone.notify_one();
            });
            
            notify.notified().await;
        }
    }

    // Several producers notifying around the same time still complete the
    // single wait.
    #[tokio::test]
    async fn test_concurrent_notify() {
        let notify = Arc::new(SingleWaiterNotify::new());
        let notify_clone = notify.clone();
        
        // Multiple notifiers; the single wait below needs only one of them
        for _ in 0..5 {
            let n = notify_clone.clone();
            tokio::spawn(async move {
                sleep(Duration::from_millis(10)).await;
                n.notify_one();
            });
        }
        
        notify.notified().await;
    }

    // Repeated notifications without a waiter must not panic and leave one
    // notification pending.
    #[tokio::test]
    async fn test_notify_no_waiter() {
        let notify = SingleWaiterNotify::new();
        
        // Notify with no waiter should not panic
        notify.notify_one();
        notify.notify_one();
        
        // Next wait should complete immediately
        notify.notified().await;
    }

    // Many short wait/notify rounds on a multi-threaded runtime, so that
    // notification and registration can genuinely overlap.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_stress_test() {
        let notify = Arc::new(SingleWaiterNotify::new());
        
        for i in 0..100 {
            let notify_clone = notify.clone();
            tokio::spawn(async move {
                sleep(Duration::from_micros(i % 10)).await;
                notify_clone.notify_one();
            });
            
            notify.notified().await;
        }
    }

    // NOTE(review): this test uses the default `#[tokio::test]` runtime. If
    // that is the current-thread flavor (per tokio docs - confirm), the
    // spawned waiter presumably cannot start before this task awaits, so
    // `notify_one` always runs before registration and no real race occurs.
    // Consider `flavor = "multi_thread"` to exercise the race.
    #[tokio::test]
    async fn test_immediate_notification_race() {
        // Test the race between notification and registration
        for _ in 0..100 {
            let notify = Arc::new(SingleWaiterNotify::new());
            let notify_clone = notify.clone();
            
            let waiter = tokio::spawn(async move {
                notify.notified().await;
            });
            
            // Notify immediately (intended to land before or after registration)
            notify_clone.notify_one();
            
            // Should complete without timeout
            tokio::time::timeout(Duration::from_millis(100), waiter)
                .await
                .expect("Should not timeout")
                .expect("Task should complete");
        }
    }
}
304