async_foundation/common/
ready_observable.rs

1use crate::common::ready_future::ReadyFuture;
2
3pub struct ReadyObservable {
4    ready: bool,
5    observers: Vec<ReadyFuture<()>>,
6}
7
8impl ReadyObservable {
9    pub fn complete(&mut self) {
10        self.ready = true;
11        for observer in self.observers.drain(..) {
12            observer.get_state().complete(());
13        }
14    }
15
16    pub fn is_ready(&self) -> bool {
17        self.ready
18    }
19
20    pub fn wait(&mut self) -> ReadyFuture<()> {
21        if self.is_ready() {
22            ReadyFuture::new_completed(())
23        } else {
24            let future = ReadyFuture::new();
25            self.observers.push(future.clone());
26            future
27        }
28    }
29}
30
31impl Default for ReadyObservable {
32    fn default() -> Self {
33        Self {
34            ready: false,
35            observers: Vec::with_capacity(50),
36        }
37    }
38}
39
40#[cfg(test)]
41mod tests {
42    use std::{thread, time::Duration};
43
44    use super::*;
45
46    #[test]
47    fn test_multiple_awaits_on_observable() {
48        let mut ready = ReadyObservable::default();
49        const COUNT: usize = 5;
50        let mut handles = vec![];
51
52        for _ in 0..COUNT {
53            let ready = ready.wait();
54            let handle =
55                thread::spawn(move || futures::executor::block_on(async move { ready.await }));
56            handles.push(handle);
57        }
58        assert_eq!(handles.len(), COUNT);
59        thread::sleep(Duration::from_millis(100));
60        ready.complete();
61
62        let mut counter = 0usize;
63        for handle in handles {
64            handle.join().expect("Thread panicked");
65            eprintln!("handler ready {counter}");
66            counter += 1;
67        }
68        assert_eq!(counter, COUNT);
69    }
70
71    #[test]
72    fn test_ready_observable_default() {
73        let ready = ReadyObservable::default();
74        assert!(!ready.is_ready());
75    }
76
77    #[test]
78    fn test_ready_observable_complete() {
79        let mut ready = ReadyObservable::default();
80        assert!(!ready.is_ready());
81        
82        ready.complete();
83        assert!(ready.is_ready());
84    }
85
86    #[test]
87    fn test_wait_when_ready() {
88        let mut ready = ReadyObservable::default();
89        ready.complete();
90        
91        let future = ready.wait();
92        let result = futures::executor::block_on(future);
93        assert!(matches!(result, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
94    }
95
96    #[test]
97    fn test_wait_when_not_ready() {
98        let mut ready = ReadyObservable::default();
99        assert!(!ready.is_ready());
100        
101        let future = ready.wait();
102        assert!(!ready.is_ready()); // Should still not be ready
103        
104        let ready_clone = std::sync::Arc::new(std::sync::Mutex::new(ready));
105        let ready_for_complete = ready_clone.clone();
106        let handle = thread::spawn(move || {
107            thread::sleep(Duration::from_millis(50));
108            ready_for_complete.lock().unwrap().complete();
109        });
110        
111        let result = futures::executor::block_on(future);
112        assert!(matches!(result, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
113        
114        handle.join().expect("Thread panicked");
115        assert!(ready_clone.lock().unwrap().is_ready());
116    }
117
118    #[test]
119    fn test_multiple_waits_before_complete() {
120        let mut ready = ReadyObservable::default();
121        
122        let future1 = ready.wait();
123        let future2 = ready.wait();
124        let future3 = ready.wait();
125        
126        assert!(!ready.is_ready());
127        
128        ready.complete();
129        assert!(ready.is_ready());
130        
131        let result1 = futures::executor::block_on(future1);
132        let result2 = futures::executor::block_on(future2);
133        let result3 = futures::executor::block_on(future3);
134        
135        assert!(matches!(result1, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
136        assert!(matches!(result2, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
137        assert!(matches!(result3, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
138    }
139
140    #[test]
141    fn test_wait_after_complete() {
142        let mut ready = ReadyObservable::default();
143        ready.complete();
144        assert!(ready.is_ready());
145        
146        let future1 = ready.wait();
147        let future2 = ready.wait();
148        let future3 = ready.wait();
149        
150        let result1 = futures::executor::block_on(future1);
151        let result2 = futures::executor::block_on(future2);
152        let result3 = futures::executor::block_on(future3);
153        
154        assert!(matches!(result1, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
155        assert!(matches!(result2, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
156        assert!(matches!(result3, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
157    }
158
159    #[test]
160    fn test_observers_cleared_after_complete() {
161        let mut ready = ReadyObservable::default();
162        
163        let _future1 = ready.wait();
164        let _future2 = ready.wait();
165        
166        ready.complete();
167        
168        let future3 = ready.wait();
169        let result = futures::executor::block_on(future3);
170        assert!(matches!(result, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
171    }
172
173    #[test]
174    fn test_concurrent_waits_and_complete() {
175        let ready = ReadyObservable::default();
176        let ready_arc = std::sync::Arc::new(std::sync::Mutex::new(ready));
177        let mut handles = vec![];
178        
179        for i in 0..10 {
180            let ready_clone = ready_arc.clone();
181            let handle = thread::spawn(move || {
182                let future = ready_clone.lock().unwrap().wait();
183                let result = futures::executor::block_on(future);
184                (i, result)
185            });
186            handles.push(handle);
187        }
188        
189        thread::sleep(Duration::from_millis(100));
190        ready_arc.lock().unwrap().complete();
191        
192        for handle in handles {
193            let (id, result) = handle.join().expect("Thread panicked");
194            assert!(matches!(result, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
195            println!("Thread {} completed", id);
196        }
197    }
198}
199
200#[cfg(test)]
201#[allow(dead_code)]
202mod web_tests {
203    use super::*;
204    use futures::join;
205    use wasm_bindgen_test::wasm_bindgen_test;
206
207    #[wasm_bindgen_test]
208    async fn test_web_multiple_simultaneous_awaits() {
209        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
210        multiple_simultaneous_awaits().await;
211    }
212
213    #[wasm_bindgen_test]
214    async fn test_node_multiple_simultaneous_awaits() {
215        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
216        multiple_simultaneous_awaits().await;
217    }
218
219    async fn multiple_simultaneous_awaits() {
220        let mut ready = ReadyObservable::default();
221        let ready_clone1 = ready.wait();
222        let ready_clone2 = ready.wait();
223        let ready_clone3 = ready.wait();
224
225        let fut1 = async {
226            ready_clone1.await;
227        };
228        let fut2 = async {
229            ready_clone2.await;
230        };
231        let fut3 = async {
232            ready_clone3.await;
233        };
234
235        ready.complete();
236        assert_eq!(join!(fut1, fut2, fut3), ((), (), ()));
237    }
238}