lite_sync/
notify.rs

//! Lightweight single-waiter notification primitive.
//!
//! Optimized for the SPSC (Single Producer Single Consumer) pattern where
//! only one task waits at a time. Much lighter than `tokio::sync::Notify`.
//!
//! 轻量级单等待者通知原语
//!
//! 为 SPSC(单生产者单消费者)模式优化,其中每次只有一个任务等待。
//! 比 tokio::sync::Notify 更轻量。
10
11use std::sync::atomic::{AtomicU8, Ordering};
12use std::future::Future;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16use super::atomic_waker::AtomicWaker;
17
// States for the notification, stored in `SingleWaiterNotify::state`.
//
// Transitions performed in this file:
// - `EMPTY -> WAITING`: a waiter registers itself.
// - any state -> `NOTIFIED`: `notify_one` is called.
// - `NOTIFIED -> EMPTY`: the waiter consumes the notification.
// - `WAITING -> EMPTY`: a registered `Notified` future is dropped.
const EMPTY: u8 = 0;      // No waiter, no notification
const WAITING: u8 = 1;    // Waiter registered
const NOTIFIED: u8 = 2;   // Notification sent
22
/// Lightweight single-waiter notifier optimized for the SPSC pattern.
///
/// At most one task is expected to wait at a time. A notification sent while
/// nobody is waiting is remembered and completes the next wait immediately;
/// repeated notifications without an intervening wait collapse into one.
pub struct SingleWaiterNotify {
    /// Current state: one of `EMPTY`, `WAITING` or `NOTIFIED`.
    state: AtomicU8,
    /// Waker of the waiting task; woken by `notify_one` when it observes `WAITING`.
    waker: AtomicWaker,
}
30
31impl SingleWaiterNotify {
32    /// Create a new single-waiter notifier
33    /// 
34    /// 创建一个新的单等待者通知器
35    #[inline]
36    pub fn new() -> Self {
37        Self {
38            state: AtomicU8::new(EMPTY),
39            waker: AtomicWaker::new(),
40        }
41    }
42    
43    /// Returns a future that completes when notified
44    /// 
45    /// 返回一个在收到通知时完成的 future
46    #[inline]
47    pub fn notified(&self) -> Notified<'_> {
48        Notified { 
49            notify: self,
50            registered: false,
51        }
52    }
53    
54    /// Wake the waiting task (if any)
55    /// 
56    /// If called before wait, the next wait will complete immediately.
57    /// 
58    /// 唤醒等待的任务(如果有)
59    /// 
60    /// 如果在等待之前调用,下一次等待将立即完成。
61    #[inline]
62    pub fn notify_one(&self) {
63        // Mark as notified
64        let prev_state = self.state.swap(NOTIFIED, Ordering::AcqRel);
65        
66        // If there was a waiter, wake it
67        if prev_state == WAITING {
68            self.waker.wake();
69        }
70    }
71    
72    /// Register a waker to be notified
73    /// 
74    /// Returns true if already notified (fast path)
75    /// 
76    /// 注册一个 waker 以接收通知
77    /// 
78    /// 如果已经被通知则返回 true(快速路径)
79    #[inline]
80    fn register_waker(&self, waker: &std::task::Waker) -> bool {
81        // Try to transition from EMPTY to WAITING
82        match self.state.compare_exchange(
83            EMPTY,
84            WAITING,
85            Ordering::AcqRel,
86            Ordering::Acquire,
87        ) {
88            Ok(_) => {
89                // Successfully transitioned, store the waker
90                self.waker.register(waker);
91                false // Not notified yet
92            }
93            Err(state) => {
94                // Already notified or waiting
95                if state == NOTIFIED {
96                    // Reset to EMPTY for next wait
97                    self.state.store(EMPTY, Ordering::Release);
98                    true // Already notified
99                } else {
100                    // State is WAITING, update the waker
101                    self.waker.register(waker);
102                    
103                    // Check if notified while we were updating waker
104                    if self.state.load(Ordering::Acquire) == NOTIFIED {
105                        self.state.store(EMPTY, Ordering::Release);
106                        true
107                    } else {
108                        false
109                    }
110                }
111            }
112        }
113    }
114}
115
116// Drop is automatically handled by AtomicWaker's drop implementation
117// No need for explicit drop implementation
118//
119// Drop 由 AtomicWaker 的 drop 实现自动处理
120// 无需显式的 drop 实现
121
122/// Future returned by `SingleWaiterNotify::notified()`
123/// 
124/// `SingleWaiterNotify::notified()` 返回的 Future
125pub struct Notified<'a> {
126    notify: &'a SingleWaiterNotify,
127    registered: bool,
128}
129
130impl Future for Notified<'_> {
131    type Output = ();
132    
133    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
134        // On first poll, register the waker
135        if !self.registered {
136            self.registered = true;
137            if self.notify.register_waker(cx.waker()) {
138                // Already notified (fast path)
139                return Poll::Ready(());
140            }
141        } else {
142            // On subsequent polls, check if notified
143            if self.notify.state.load(Ordering::Acquire) == NOTIFIED {
144                self.notify.state.store(EMPTY, Ordering::Release);
145                return Poll::Ready(());
146            }
147            // Update waker in case it changed
148            self.notify.register_waker(cx.waker());
149        }
150        
151        Poll::Pending
152    }
153}
154
155impl Drop for Notified<'_> {
156    fn drop(&mut self) {
157        if self.registered {
158            // If we registered but are being dropped, try to clean up
159            // PERFORMANCE: Direct compare_exchange without pre-check:
160            // - Single atomic operation instead of two (load + compare_exchange)
161            // - Relaxed ordering is sufficient - just cleaning up state
162            // - CAS will fail harmlessly if state is not WAITING
163            //
164            // 如果我们注册了但正在被 drop,尝试清理
165            // 性能优化:直接 compare_exchange 无需预检查:
166            // - 单次原子操作而不是两次(load + compare_exchange)
167            // - Relaxed ordering 就足够了 - 只是清理状态
168            // - 如果状态不是 WAITING,CAS 会无害地失败
169            let _ = self.notify.state.compare_exchange(
170                WAITING,
171                EMPTY,
172                Ordering::Relaxed,
173                Ordering::Relaxed,
174            );
175        }
176    }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use tokio::time::{sleep, timeout, Duration};

    /// A notification sent before anyone waits completes the next wait immediately.
    #[tokio::test]
    async fn test_notify_before_wait() {
        let notify = Arc::new(SingleWaiterNotify::new());

        // Notify before waiting
        notify.notify_one();

        // Should complete immediately
        notify.notified().await;
    }

    /// A waiter that registered first is woken by a later notification.
    #[tokio::test]
    async fn test_notify_after_wait() {
        let notify = Arc::new(SingleWaiterNotify::new());
        let notify_clone = Arc::clone(&notify);

        // Spawn a task that notifies after a delay
        tokio::spawn(async move {
            sleep(Duration::from_millis(10)).await;
            notify_clone.notify_one();
        });

        // Wait for notification
        notify.notified().await;
    }

    /// The notifier can be reused for many wait/notify rounds.
    #[tokio::test]
    async fn test_multiple_notify_cycles() {
        let notify = Arc::new(SingleWaiterNotify::new());

        for _ in 0..10 {
            let notify_clone = Arc::clone(&notify);
            tokio::spawn(async move {
                sleep(Duration::from_millis(5)).await;
                notify_clone.notify_one();
            });

            notify.notified().await;
        }
    }

    /// Several notifiers firing around the same time still complete the single waiter.
    #[tokio::test]
    async fn test_concurrent_notify() {
        let notify = Arc::new(SingleWaiterNotify::new());

        // Multiple notifiers (only one should wake the waiter)
        for _ in 0..5 {
            let n = Arc::clone(&notify);
            tokio::spawn(async move {
                sleep(Duration::from_millis(10)).await;
                n.notify_one();
            });
        }

        notify.notified().await;
    }

    /// Notifying without a waiter must not panic, and repeated notifications coalesce.
    #[tokio::test]
    async fn test_notify_no_waiter() {
        let notify = SingleWaiterNotify::new();

        // Notify with no waiter should not panic
        notify.notify_one();
        notify.notify_one();

        // Next wait should complete immediately
        notify.notified().await;
    }

    /// Many short wait/notify rounds with the notifier running on other worker threads.
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn test_stress_test() {
        let notify = Arc::new(SingleWaiterNotify::new());

        for i in 0..100 {
            let notify_clone = Arc::clone(&notify);
            tokio::spawn(async move {
                sleep(Duration::from_micros(i % 10)).await;
                notify_clone.notify_one();
            });

            notify.notified().await;
        }
    }

    /// Races `notify_one` against the waiter's registration.
    ///
    /// This needs a multi-threaded runtime: on the default current-thread
    /// runtime the spawned waiter cannot run until this task yields, so the
    /// notification would always land before registration and no race would
    /// ever be exercised.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_immediate_notification_race() {
        for _ in 0..100 {
            let notify = Arc::new(SingleWaiterNotify::new());
            let notify_clone = Arc::clone(&notify);

            let waiter = tokio::spawn(async move {
                notify.notified().await;
            });

            // Notify immediately (might happen before or after registration)
            notify_clone.notify_one();

            // Should complete without timeout
            timeout(Duration::from_millis(100), waiter)
                .await
                .expect("Should not timeout")
                .expect("Task should complete");
        }
    }

    /// Dropping a pending waiter resets the state, and a later notification is still delivered.
    #[tokio::test]
    async fn test_drop_pending_waiter_resets_state() {
        let notify = SingleWaiterNotify::new();

        // The wait is polled (registering as waiting) and then dropped when
        // the timeout elapses.
        assert!(timeout(Duration::from_millis(5), notify.notified())
            .await
            .is_err());
        assert_eq!(notify.state.load(Ordering::Acquire), EMPTY);

        notify.notify_one();

        timeout(Duration::from_millis(100), notify.notified())
            .await
            .expect("notification after a dropped wait must still be delivered");
    }
}
292