kestrel_timer/utils/notify.rs
1/// Lightweight single-waiter notification primitive
2///
3/// Optimized for SPSC (Single Producer Single Consumer) pattern where
4/// only one task waits at a time. Much lighter than tokio::sync::Notify.
5///
6/// 轻量级单等待者通知原语
7///
8/// 为 SPSC(单生产者单消费者)模式优化,其中每次只有一个任务等待。
9/// 比 tokio::sync::Notify 更轻量。
10
11use std::sync::atomic::{AtomicU8, Ordering};
12use std::future::Future;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16use super::atomic_waker::AtomicWaker;
17
18// States for the notification
19const EMPTY: u8 = 0; // No waiter, no notification
20const WAITING: u8 = 1; // Waiter registered
21const NOTIFIED: u8 = 2; // Notification sent
22
23/// Lightweight single-waiter notifier optimized for SPSC pattern
24///
25/// Much lighter than tokio::sync::Notify:
26/// - No waitlist allocation (just AtomicWaker + one atomic state)
27/// - Reuses common AtomicWaker implementation (no Box allocation)
28/// - Faster creation and notification
29/// - Handles notification-before-wait correctly
30///
31/// 为 SPSC 模式优化的轻量级单等待者通知器
32///
33/// 比 tokio::sync::Notify 更轻量:
34/// - 无需等待列表分配(仅 AtomicWaker + 一个原子状态)
35/// - 复用通用的 AtomicWaker 实现(无 Box 分配)
36/// - 更快的创建和通知速度
37/// - 正确处理通知先于等待的情况
38pub struct SingleWaiterNotify {
39 state: AtomicU8,
40 waker: AtomicWaker,
41}
42
43impl SingleWaiterNotify {
44 /// Create a new single-waiter notifier
45 ///
46 /// 创建一个新的单等待者通知器
47 #[inline]
48 pub fn new() -> Self {
49 Self {
50 state: AtomicU8::new(EMPTY),
51 waker: AtomicWaker::new(),
52 }
53 }
54
55 /// Returns a future that completes when notified
56 ///
57 /// 返回一个在收到通知时完成的 future
58 #[inline]
59 pub fn notified(&self) -> Notified<'_> {
60 Notified {
61 notify: self,
62 registered: false,
63 }
64 }
65
66 /// Wake the waiting task (if any)
67 ///
68 /// If called before wait, the next wait will complete immediately.
69 ///
70 /// 唤醒等待的任务(如果有)
71 ///
72 /// 如果在等待之前调用,下一次等待将立即完成。
73 #[inline]
74 pub fn notify_one(&self) {
75 // Mark as notified
76 let prev_state = self.state.swap(NOTIFIED, Ordering::AcqRel);
77
78 // If there was a waiter, wake it
79 if prev_state == WAITING {
80 self.waker.wake();
81 }
82 }
83
84 /// Register a waker to be notified
85 ///
86 /// Returns true if already notified (fast path)
87 ///
88 /// 注册一个 waker 以接收通知
89 ///
90 /// 如果已经被通知则返回 true(快速路径)
91 #[inline]
92 fn register_waker(&self, waker: &std::task::Waker) -> bool {
93 // Try to transition from EMPTY to WAITING
94 match self.state.compare_exchange(
95 EMPTY,
96 WAITING,
97 Ordering::AcqRel,
98 Ordering::Acquire,
99 ) {
100 Ok(_) => {
101 // Successfully transitioned, store the waker
102 self.waker.register(waker);
103 false // Not notified yet
104 }
105 Err(state) => {
106 // Already notified or waiting
107 if state == NOTIFIED {
108 // Reset to EMPTY for next wait
109 self.state.store(EMPTY, Ordering::Release);
110 true // Already notified
111 } else {
112 // State is WAITING, update the waker
113 self.waker.register(waker);
114
115 // Check if notified while we were updating waker
116 if self.state.load(Ordering::Acquire) == NOTIFIED {
117 self.state.store(EMPTY, Ordering::Release);
118 true
119 } else {
120 false
121 }
122 }
123 }
124 }
125 }
126}
127
128// Drop is automatically handled by AtomicWaker's drop implementation
129// No need for explicit drop implementation
130//
131// Drop 由 AtomicWaker 的 drop 实现自动处理
132// 无需显式的 drop 实现
133
134/// Future returned by `SingleWaiterNotify::notified()`
135///
136/// `SingleWaiterNotify::notified()` 返回的 Future
137pub struct Notified<'a> {
138 notify: &'a SingleWaiterNotify,
139 registered: bool,
140}
141
142impl Future for Notified<'_> {
143 type Output = ();
144
145 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
146 // On first poll, register the waker
147 if !self.registered {
148 self.registered = true;
149 if self.notify.register_waker(cx.waker()) {
150 // Already notified (fast path)
151 return Poll::Ready(());
152 }
153 } else {
154 // On subsequent polls, check if notified
155 if self.notify.state.load(Ordering::Acquire) == NOTIFIED {
156 self.notify.state.store(EMPTY, Ordering::Release);
157 return Poll::Ready(());
158 }
159 // Update waker in case it changed
160 self.notify.register_waker(cx.waker());
161 }
162
163 Poll::Pending
164 }
165}
166
167impl Drop for Notified<'_> {
168 fn drop(&mut self) {
169 if self.registered {
170 // If we registered but are being dropped, try to clean up
171 // PERFORMANCE: Direct compare_exchange without pre-check:
172 // - Single atomic operation instead of two (load + compare_exchange)
173 // - Relaxed ordering is sufficient - just cleaning up state
174 // - CAS will fail harmlessly if state is not WAITING
175 //
176 // 如果我们注册了但正在被 drop,尝试清理
177 // 性能优化:直接 compare_exchange 无需预检查:
178 // - 单次原子操作而不是两次(load + compare_exchange)
179 // - Relaxed ordering 就足够了 - 只是清理状态
180 // - 如果状态不是 WAITING,CAS 会无害地失败
181 let _ = self.notify.state.compare_exchange(
182 WAITING,
183 EMPTY,
184 Ordering::Relaxed,
185 Ordering::Relaxed,
186 );
187 }
188 }
189}
190
191#[cfg(test)]
192mod tests {
193 use super::*;
194 use std::sync::Arc;
195 use tokio::time::{sleep, Duration};
196
197 #[tokio::test]
198 async fn test_notify_before_wait() {
199 let notify = Arc::new(SingleWaiterNotify::new());
200
201 // Notify before waiting
202 notify.notify_one();
203
204 // Should complete immediately
205 notify.notified().await;
206 }
207
208 #[tokio::test]
209 async fn test_notify_after_wait() {
210 let notify = Arc::new(SingleWaiterNotify::new());
211 let notify_clone = notify.clone();
212
213 // Spawn a task that notifies after a delay
214 tokio::spawn(async move {
215 sleep(Duration::from_millis(10)).await;
216 notify_clone.notify_one();
217 });
218
219 // Wait for notification
220 notify.notified().await;
221 }
222
223 #[tokio::test]
224 async fn test_multiple_notify_cycles() {
225 let notify = Arc::new(SingleWaiterNotify::new());
226
227 for _ in 0..10 {
228 let notify_clone = notify.clone();
229 tokio::spawn(async move {
230 sleep(Duration::from_millis(5)).await;
231 notify_clone.notify_one();
232 });
233
234 notify.notified().await;
235 }
236 }
237
238 #[tokio::test]
239 async fn test_concurrent_notify() {
240 let notify = Arc::new(SingleWaiterNotify::new());
241 let notify_clone = notify.clone();
242
243 // Multiple notifiers (only one should wake the waiter)
244 for _ in 0..5 {
245 let n = notify_clone.clone();
246 tokio::spawn(async move {
247 sleep(Duration::from_millis(10)).await;
248 n.notify_one();
249 });
250 }
251
252 notify.notified().await;
253 }
254
255 #[tokio::test]
256 async fn test_notify_no_waiter() {
257 let notify = SingleWaiterNotify::new();
258
259 // Notify with no waiter should not panic
260 notify.notify_one();
261 notify.notify_one();
262
263 // Next wait should complete immediately
264 notify.notified().await;
265 }
266
267 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
268 async fn test_stress_test() {
269 let notify = Arc::new(SingleWaiterNotify::new());
270
271 for i in 0..100 {
272 let notify_clone = notify.clone();
273 tokio::spawn(async move {
274 sleep(Duration::from_micros(i % 10)).await;
275 notify_clone.notify_one();
276 });
277
278 notify.notified().await;
279 }
280 }
281
282 #[tokio::test]
283 async fn test_immediate_notification_race() {
284 // Test the race between notification and registration
285 for _ in 0..100 {
286 let notify = Arc::new(SingleWaiterNotify::new());
287 let notify_clone = notify.clone();
288
289 let waiter = tokio::spawn(async move {
290 notify.notified().await;
291 });
292
293 // Notify immediately (might happen before or after registration)
294 notify_clone.notify_one();
295
296 // Should complete without timeout
297 tokio::time::timeout(Duration::from_millis(100), waiter)
298 .await
299 .expect("Should not timeout")
300 .expect("Task should complete");
301 }
302 }
303}
304