async_foundation/common/
ready_observable.rs

1use crate::common::ready_future::ReadyFuture;
2
3/// A simple observable that lets multiple waiters be notified when an event occurs.
4///
5/// `ReadyObservable` starts in a non-ready state; callers can obtain a
6/// [`ReadyFuture<()>`] via [`ReadyObservable::wait`]. When [`ReadyObservable::complete`]
7/// is called, all pending waiters are woken and subsequent calls to `wait` return
8/// an already-completed future.
9pub struct ReadyObservable {
10    ready: bool,
11    observers: Vec<ReadyFuture<()>>,
12}
13
14impl ReadyObservable {
15    pub fn complete(&mut self) {
16        self.ready = true;
17        for observer in self.observers.drain(..) {
18            observer.get_state().complete(());
19        }
20    }
21
22    pub fn is_ready(&self) -> bool {
23        self.ready
24    }
25
26    pub fn wait(&mut self) -> ReadyFuture<()> {
27        if self.is_ready() {
28            ReadyFuture::new_completed(())
29        } else {
30            let future = ReadyFuture::new();
31            self.observers.push(future.clone());
32            future
33        }
34    }
35}
36
37impl Default for ReadyObservable {
38    fn default() -> Self {
39        Self {
40            ready: false,
41            observers: Vec::with_capacity(50),
42        }
43    }
44}
45
#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        thread,
        time::Duration,
    };

    use super::*;
    use crate::common::ready_future_state::ReadyFutureResult;

    /// Futures handed out before completion are each awaited on their own thread and
    /// must all resolve to `Completed` once `complete` is called.
    #[test]
    fn test_multiple_awaits_on_observable() {
        const COUNT: usize = 5;
        let mut ready = ReadyObservable::default();

        let handles: Vec<_> = (0..COUNT)
            .map(|_| {
                let waiter = ready.wait();
                thread::spawn(move || futures::executor::block_on(waiter))
            })
            .collect();
        assert_eq!(handles.len(), COUNT);

        // Presumably gives the spawned threads time to start blocking on their futures
        // so that the wake-up path is exercised rather than the already-completed path.
        thread::sleep(Duration::from_millis(100));
        ready.complete();

        let mut counter = 0usize;
        for handle in handles {
            let result = handle.join().expect("Thread panicked");
            // Joining alone only proves the thread ended; also check what it resolved to.
            assert!(matches!(result, ReadyFutureResult::Completed(())));
            eprintln!("handler ready {counter}");
            counter += 1;
        }
        assert_eq!(counter, COUNT);
    }

    /// A freshly constructed observable is not ready.
    #[test]
    fn test_ready_observable_default() {
        let ready = ReadyObservable::default();
        assert!(!ready.is_ready());
    }

    /// `complete` flips the observable from not ready to ready.
    #[test]
    fn test_ready_observable_complete() {
        let mut ready = ReadyObservable::default();
        assert!(!ready.is_ready());

        ready.complete();
        assert!(ready.is_ready());
    }

    /// Waiting on an already-completed observable resolves immediately.
    #[test]
    fn test_wait_when_ready() {
        let mut ready = ReadyObservable::default();
        ready.complete();

        let future = ready.wait();
        let result = futures::executor::block_on(future);
        assert!(matches!(result, ReadyFutureResult::Completed(())));
    }

    /// A future obtained before completion resolves once another thread calls `complete`.
    #[test]
    fn test_wait_when_not_ready() {
        let mut ready = ReadyObservable::default();
        assert!(!ready.is_ready());

        let future = ready.wait();
        // Handing out a future must not change the ready state.
        assert!(!ready.is_ready());

        let shared = Arc::new(Mutex::new(ready));
        let completer = Arc::clone(&shared);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            completer.lock().unwrap().complete();
        });

        let result = futures::executor::block_on(future);
        assert!(matches!(result, ReadyFutureResult::Completed(())));

        handle.join().expect("Thread panicked");
        assert!(shared.lock().unwrap().is_ready());
    }

    /// Several futures obtained before completion all resolve after a single `complete`.
    #[test]
    fn test_multiple_waits_before_complete() {
        let mut ready = ReadyObservable::default();

        let future1 = ready.wait();
        let future2 = ready.wait();
        let future3 = ready.wait();

        assert!(!ready.is_ready());

        ready.complete();
        assert!(ready.is_ready());

        let result1 = futures::executor::block_on(future1);
        let result2 = futures::executor::block_on(future2);
        let result3 = futures::executor::block_on(future3);

        assert!(matches!(result1, ReadyFutureResult::Completed(())));
        assert!(matches!(result2, ReadyFutureResult::Completed(())));
        assert!(matches!(result3, ReadyFutureResult::Completed(())));
    }

    /// Every future obtained after completion is already completed.
    #[test]
    fn test_wait_after_complete() {
        let mut ready = ReadyObservable::default();
        ready.complete();
        assert!(ready.is_ready());

        let future1 = ready.wait();
        let future2 = ready.wait();
        let future3 = ready.wait();

        let result1 = futures::executor::block_on(future1);
        let result2 = futures::executor::block_on(future2);
        let result3 = futures::executor::block_on(future3);

        assert!(matches!(result1, ReadyFutureResult::Completed(())));
        assert!(matches!(result2, ReadyFutureResult::Completed(())));
        assert!(matches!(result3, ReadyFutureResult::Completed(())));
    }

    /// After `complete` has run with pending waiters, a new `wait` still resolves.
    #[test]
    fn test_observers_cleared_after_complete() {
        let mut ready = ReadyObservable::default();

        let _future1 = ready.wait();
        let _future2 = ready.wait();

        ready.complete();

        let future3 = ready.wait();
        let result = futures::executor::block_on(future3);
        assert!(matches!(result, ReadyFutureResult::Completed(())));
    }

    /// Threads that call `wait` through a shared mutex all resolve to `Completed`,
    /// whether they registered before or after `complete` ran.
    #[test]
    fn test_concurrent_waits_and_complete() {
        let ready_arc = Arc::new(Mutex::new(ReadyObservable::default()));

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let ready_clone = Arc::clone(&ready_arc);
                thread::spawn(move || {
                    // The lock guard is a temporary, so it is released before blocking.
                    let future = ready_clone.lock().unwrap().wait();
                    let result = futures::executor::block_on(future);
                    (i, result)
                })
            })
            .collect();

        thread::sleep(Duration::from_millis(100));
        ready_arc.lock().unwrap().complete();

        for handle in handles {
            let (id, result) = handle.join().expect("Thread panicked");
            assert!(matches!(result, ReadyFutureResult::Completed(())));
            println!("Thread {} completed", id);
        }
    }
}
205
// NOTE(review): `dead_code` is presumably allowed because these items are unused when the
// tests are built for a non-wasm target — confirm.
#[cfg(test)]
#[allow(dead_code)]
mod web_tests {
    use super::*;
    use futures::join;
    use wasm_bindgen_test::wasm_bindgen_test;

    // Declared once at module scope, which is how wasm-bindgen-test documents this macro:
    // it selects the browser runner for the test suite rather than for a single test.
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

    /// Browser variant of the simultaneous-await scenario.
    #[wasm_bindgen_test]
    async fn test_web_multiple_simultaneous_awaits() {
        multiple_simultaneous_awaits().await;
    }

    /// Second entry point for the same scenario, named for Node.
    ///
    /// NOTE(review): this test previously requested `run_in_browser` itself, and that
    /// setting applies to the whole test binary, so it presumably still runs in the
    /// browser. A real Node run would need a test target without `run_in_browser` —
    /// confirm the intent.
    #[wasm_bindgen_test]
    async fn test_node_multiple_simultaneous_awaits() {
        multiple_simultaneous_awaits().await;
    }

    /// Obtains three futures from one observable, completes it, then awaits all three
    /// together and checks that each await finished.
    async fn multiple_simultaneous_awaits() {
        let mut ready = ReadyObservable::default();
        let waiter1 = ready.wait();
        let waiter2 = ready.wait();
        let waiter3 = ready.wait();

        // Each block discards the future's result, so it evaluates to `()`.
        let fut1 = async {
            waiter1.await;
        };
        let fut2 = async {
            waiter2.await;
        };
        let fut3 = async {
            waiter3.await;
        };

        // The async blocks are lazy: nothing is polled until `join!` runs below,
        // which happens after the observable has been completed.
        ready.complete();
        assert_eq!(join!(fut1, fut2, fut3), ((), (), ()));
    }
}