lite_sync/notify.rs
1/// Lightweight single-waiter notification primitive
2///
3/// Optimized for SPSC (Single Producer Single Consumer) pattern where
4/// only one task waits at a time. Much lighter than tokio::sync::Notify.
5///
6/// 轻量级单等待者通知原语
7///
8/// 为 SPSC(单生产者单消费者)模式优化,其中每次只有一个任务等待。
9/// 比 tokio::sync::Notify 更轻量。
10
11use std::sync::atomic::{AtomicU8, Ordering};
12use std::future::Future;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16use super::atomic_waker::AtomicWaker;
17
18// States for the notification
19const EMPTY: u8 = 0; // No waiter, no notification
20const WAITING: u8 = 1; // Waiter registered
21const NOTIFIED: u8 = 2; // Notification sent
22
23/// Lightweight single-waiter notifier optimized for SPSC pattern
24///
25/// 为 SPSC 模式优化的轻量级单等待者通知器
26pub struct SingleWaiterNotify {
27 state: AtomicU8,
28 waker: AtomicWaker,
29}
30
31impl SingleWaiterNotify {
32 /// Create a new single-waiter notifier
33 ///
34 /// 创建一个新的单等待者通知器
35 #[inline]
36 pub fn new() -> Self {
37 Self {
38 state: AtomicU8::new(EMPTY),
39 waker: AtomicWaker::new(),
40 }
41 }
42
43 /// Returns a future that completes when notified
44 ///
45 /// 返回一个在收到通知时完成的 future
46 #[inline]
47 pub fn notified(&self) -> Notified<'_> {
48 Notified {
49 notify: self,
50 registered: false,
51 }
52 }
53
54 /// Wake the waiting task (if any)
55 ///
56 /// If called before wait, the next wait will complete immediately.
57 ///
58 /// 唤醒等待的任务(如果有)
59 ///
60 /// 如果在等待之前调用,下一次等待将立即完成。
61 #[inline]
62 pub fn notify_one(&self) {
63 // Mark as notified
64 let prev_state = self.state.swap(NOTIFIED, Ordering::AcqRel);
65
66 // If there was a waiter, wake it
67 if prev_state == WAITING {
68 self.waker.wake();
69 }
70 }
71
72 /// Register a waker to be notified
73 ///
74 /// Returns true if already notified (fast path)
75 ///
76 /// 注册一个 waker 以接收通知
77 ///
78 /// 如果已经被通知则返回 true(快速路径)
79 #[inline]
80 fn register_waker(&self, waker: &std::task::Waker) -> bool {
81 // Try to transition from EMPTY to WAITING
82 match self.state.compare_exchange(
83 EMPTY,
84 WAITING,
85 Ordering::AcqRel,
86 Ordering::Acquire,
87 ) {
88 Ok(_) => {
89 // Successfully transitioned, store the waker
90 self.waker.register(waker);
91 false // Not notified yet
92 }
93 Err(state) => {
94 // Already notified or waiting
95 if state == NOTIFIED {
96 // Reset to EMPTY for next wait
97 self.state.store(EMPTY, Ordering::Release);
98 true // Already notified
99 } else {
100 // State is WAITING, update the waker
101 self.waker.register(waker);
102
103 // Check if notified while we were updating waker
104 if self.state.load(Ordering::Acquire) == NOTIFIED {
105 self.state.store(EMPTY, Ordering::Release);
106 true
107 } else {
108 false
109 }
110 }
111 }
112 }
113 }
114}
115
116// Drop is automatically handled by AtomicWaker's drop implementation
117// No need for explicit drop implementation
118//
119// Drop 由 AtomicWaker 的 drop 实现自动处理
120// 无需显式的 drop 实现
121
122/// Future returned by `SingleWaiterNotify::notified()`
123///
124/// `SingleWaiterNotify::notified()` 返回的 Future
125pub struct Notified<'a> {
126 notify: &'a SingleWaiterNotify,
127 registered: bool,
128}
129
130impl Future for Notified<'_> {
131 type Output = ();
132
133 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
134 // On first poll, register the waker
135 if !self.registered {
136 self.registered = true;
137 if self.notify.register_waker(cx.waker()) {
138 // Already notified (fast path)
139 return Poll::Ready(());
140 }
141 } else {
142 // On subsequent polls, check if notified
143 if self.notify.state.load(Ordering::Acquire) == NOTIFIED {
144 self.notify.state.store(EMPTY, Ordering::Release);
145 return Poll::Ready(());
146 }
147 // Update waker in case it changed
148 self.notify.register_waker(cx.waker());
149 }
150
151 Poll::Pending
152 }
153}
154
155impl Drop for Notified<'_> {
156 fn drop(&mut self) {
157 if self.registered {
158 // If we registered but are being dropped, try to clean up
159 // PERFORMANCE: Direct compare_exchange without pre-check:
160 // - Single atomic operation instead of two (load + compare_exchange)
161 // - Relaxed ordering is sufficient - just cleaning up state
162 // - CAS will fail harmlessly if state is not WAITING
163 //
164 // 如果我们注册了但正在被 drop,尝试清理
165 // 性能优化:直接 compare_exchange 无需预检查:
166 // - 单次原子操作而不是两次(load + compare_exchange)
167 // - Relaxed ordering 就足够了 - 只是清理状态
168 // - 如果状态不是 WAITING,CAS 会无害地失败
169 let _ = self.notify.state.compare_exchange(
170 WAITING,
171 EMPTY,
172 Ordering::Relaxed,
173 Ordering::Relaxed,
174 );
175 }
176 }
177}
178
179#[cfg(test)]
180mod tests {
181 use super::*;
182 use std::sync::Arc;
183 use tokio::time::{sleep, Duration};
184
185 #[tokio::test]
186 async fn test_notify_before_wait() {
187 let notify = Arc::new(SingleWaiterNotify::new());
188
189 // Notify before waiting
190 notify.notify_one();
191
192 // Should complete immediately
193 notify.notified().await;
194 }
195
196 #[tokio::test]
197 async fn test_notify_after_wait() {
198 let notify = Arc::new(SingleWaiterNotify::new());
199 let notify_clone = notify.clone();
200
201 // Spawn a task that notifies after a delay
202 tokio::spawn(async move {
203 sleep(Duration::from_millis(10)).await;
204 notify_clone.notify_one();
205 });
206
207 // Wait for notification
208 notify.notified().await;
209 }
210
211 #[tokio::test]
212 async fn test_multiple_notify_cycles() {
213 let notify = Arc::new(SingleWaiterNotify::new());
214
215 for _ in 0..10 {
216 let notify_clone = notify.clone();
217 tokio::spawn(async move {
218 sleep(Duration::from_millis(5)).await;
219 notify_clone.notify_one();
220 });
221
222 notify.notified().await;
223 }
224 }
225
226 #[tokio::test]
227 async fn test_concurrent_notify() {
228 let notify = Arc::new(SingleWaiterNotify::new());
229 let notify_clone = notify.clone();
230
231 // Multiple notifiers (only one should wake the waiter)
232 for _ in 0..5 {
233 let n = notify_clone.clone();
234 tokio::spawn(async move {
235 sleep(Duration::from_millis(10)).await;
236 n.notify_one();
237 });
238 }
239
240 notify.notified().await;
241 }
242
243 #[tokio::test]
244 async fn test_notify_no_waiter() {
245 let notify = SingleWaiterNotify::new();
246
247 // Notify with no waiter should not panic
248 notify.notify_one();
249 notify.notify_one();
250
251 // Next wait should complete immediately
252 notify.notified().await;
253 }
254
255 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
256 async fn test_stress_test() {
257 let notify = Arc::new(SingleWaiterNotify::new());
258
259 for i in 0..100 {
260 let notify_clone = notify.clone();
261 tokio::spawn(async move {
262 sleep(Duration::from_micros(i % 10)).await;
263 notify_clone.notify_one();
264 });
265
266 notify.notified().await;
267 }
268 }
269
270 #[tokio::test]
271 async fn test_immediate_notification_race() {
272 // Test the race between notification and registration
273 for _ in 0..100 {
274 let notify = Arc::new(SingleWaiterNotify::new());
275 let notify_clone = notify.clone();
276
277 let waiter = tokio::spawn(async move {
278 notify.notified().await;
279 });
280
281 // Notify immediately (might happen before or after registration)
282 notify_clone.notify_one();
283
284 // Should complete without timeout
285 tokio::time::timeout(Duration::from_millis(100), waiter)
286 .await
287 .expect("Should not timeout")
288 .expect("Task should complete");
289 }
290 }
291}
292