async_foundation/common/
ready_observable.rs1use crate::common::ready_future::ReadyFuture;
2
3pub struct ReadyObservable {
10 ready: bool,
11 observers: Vec<ReadyFuture<()>>,
12}
13
14impl ReadyObservable {
15 pub fn complete(&mut self) {
16 self.ready = true;
17 for observer in self.observers.drain(..) {
18 observer.get_state().complete(());
19 }
20 }
21
22 pub fn is_ready(&self) -> bool {
23 self.ready
24 }
25
26 pub fn wait(&mut self) -> ReadyFuture<()> {
27 if self.is_ready() {
28 ReadyFuture::new_completed(())
29 } else {
30 let future = ReadyFuture::new();
31 self.observers.push(future.clone());
32 future
33 }
34 }
35}
36
37impl Default for ReadyObservable {
38 fn default() -> Self {
39 Self {
40 ready: false,
41 observers: Vec::with_capacity(50),
42 }
43 }
44}
45
46#[cfg(test)]
47mod tests {
48 use std::{thread, time::Duration};
49
50 use super::*;
51
52 #[test]
53 fn test_multiple_awaits_on_observable() {
54 let mut ready = ReadyObservable::default();
55 const COUNT: usize = 5;
56 let mut handles = vec![];
57
58 for _ in 0..COUNT {
59 let ready = ready.wait();
60 let handle =
61 thread::spawn(move || futures::executor::block_on(async move { ready.await }));
62 handles.push(handle);
63 }
64 assert_eq!(handles.len(), COUNT);
65 thread::sleep(Duration::from_millis(100));
66 ready.complete();
67
68 let mut counter = 0usize;
69 for handle in handles {
70 handle.join().expect("Thread panicked");
71 eprintln!("handler ready {counter}");
72 counter += 1;
73 }
74 assert_eq!(counter, COUNT);
75 }
76
77 #[test]
78 fn test_ready_observable_default() {
79 let ready = ReadyObservable::default();
80 assert!(!ready.is_ready());
81 }
82
83 #[test]
84 fn test_ready_observable_complete() {
85 let mut ready = ReadyObservable::default();
86 assert!(!ready.is_ready());
87
88 ready.complete();
89 assert!(ready.is_ready());
90 }
91
92 #[test]
93 fn test_wait_when_ready() {
94 let mut ready = ReadyObservable::default();
95 ready.complete();
96
97 let future = ready.wait();
98 let result = futures::executor::block_on(future);
99 assert!(matches!(result, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
100 }
101
102 #[test]
103 fn test_wait_when_not_ready() {
104 let mut ready = ReadyObservable::default();
105 assert!(!ready.is_ready());
106
107 let future = ready.wait();
108 assert!(!ready.is_ready()); let ready_clone = std::sync::Arc::new(std::sync::Mutex::new(ready));
111 let ready_for_complete = ready_clone.clone();
112 let handle = thread::spawn(move || {
113 thread::sleep(Duration::from_millis(50));
114 ready_for_complete.lock().unwrap().complete();
115 });
116
117 let result = futures::executor::block_on(future);
118 assert!(matches!(result, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
119
120 handle.join().expect("Thread panicked");
121 assert!(ready_clone.lock().unwrap().is_ready());
122 }
123
124 #[test]
125 fn test_multiple_waits_before_complete() {
126 let mut ready = ReadyObservable::default();
127
128 let future1 = ready.wait();
129 let future2 = ready.wait();
130 let future3 = ready.wait();
131
132 assert!(!ready.is_ready());
133
134 ready.complete();
135 assert!(ready.is_ready());
136
137 let result1 = futures::executor::block_on(future1);
138 let result2 = futures::executor::block_on(future2);
139 let result3 = futures::executor::block_on(future3);
140
141 assert!(matches!(result1, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
142 assert!(matches!(result2, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
143 assert!(matches!(result3, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
144 }
145
146 #[test]
147 fn test_wait_after_complete() {
148 let mut ready = ReadyObservable::default();
149 ready.complete();
150 assert!(ready.is_ready());
151
152 let future1 = ready.wait();
153 let future2 = ready.wait();
154 let future3 = ready.wait();
155
156 let result1 = futures::executor::block_on(future1);
157 let result2 = futures::executor::block_on(future2);
158 let result3 = futures::executor::block_on(future3);
159
160 assert!(matches!(result1, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
161 assert!(matches!(result2, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
162 assert!(matches!(result3, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
163 }
164
165 #[test]
166 fn test_observers_cleared_after_complete() {
167 let mut ready = ReadyObservable::default();
168
169 let _future1 = ready.wait();
170 let _future2 = ready.wait();
171
172 ready.complete();
173
174 let future3 = ready.wait();
175 let result = futures::executor::block_on(future3);
176 assert!(matches!(result, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
177 }
178
179 #[test]
180 fn test_concurrent_waits_and_complete() {
181 let ready = ReadyObservable::default();
182 let ready_arc = std::sync::Arc::new(std::sync::Mutex::new(ready));
183 let mut handles = vec![];
184
185 for i in 0..10 {
186 let ready_clone = ready_arc.clone();
187 let handle = thread::spawn(move || {
188 let future = ready_clone.lock().unwrap().wait();
189 let result = futures::executor::block_on(future);
190 (i, result)
191 });
192 handles.push(handle);
193 }
194
195 thread::sleep(Duration::from_millis(100));
196 ready_arc.lock().unwrap().complete();
197
198 for handle in handles {
199 let (id, result) = handle.join().expect("Thread panicked");
200 assert!(matches!(result, crate::common::ready_future_state::ReadyFutureResult::Completed(())));
201 println!("Thread {} completed", id);
202 }
203 }
204}
205
206#[cfg(test)]
207#[allow(dead_code)]
208mod web_tests {
209 use super::*;
210 use futures::join;
211 use wasm_bindgen_test::wasm_bindgen_test;
212
213 #[wasm_bindgen_test]
214 async fn test_web_multiple_simultaneous_awaits() {
215 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
216 multiple_simultaneous_awaits().await;
217 }
218
219 #[wasm_bindgen_test]
220 async fn test_node_multiple_simultaneous_awaits() {
221 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
222 multiple_simultaneous_awaits().await;
223 }
224
225 async fn multiple_simultaneous_awaits() {
226 let mut ready = ReadyObservable::default();
227 let ready_clone1 = ready.wait();
228 let ready_clone2 = ready.wait();
229 let ready_clone3 = ready.wait();
230
231 let fut1 = async {
232 ready_clone1.await;
233 };
234 let fut2 = async {
235 ready_clone2.await;
236 };
237 let fut3 = async {
238 ready_clone3.await;
239 };
240
241 ready.complete();
242 assert_eq!(join!(fut1, fut2, fut3), ((), (), ()));
243 }
244}