// async_foundation/common/ready_observable.rs
use crate::common::ready_future::ReadyFuture;
2
3pub struct ReadyObservable {
4 ready: bool,
5 observers: Vec<ReadyFuture<()>>,
6}
7
8impl ReadyObservable {
9 pub fn complete(&mut self) {
10 self.ready = true;
11 for observer in self.observers.drain(..) {
12 observer.get_state().complete(());
13 }
14 }
15
16 pub fn is_ready(&self) -> bool {
17 self.ready
18 }
19
20 pub fn wait(&mut self) -> ReadyFuture<()> {
21 if self.is_ready() {
22 ReadyFuture::new_completed(())
23 } else {
24 let future = ReadyFuture::new();
25 self.observers.push(future.clone());
26 future
27 }
28 }
29}
30
31impl Default for ReadyObservable {
32 fn default() -> Self {
33 Self {
34 ready: false,
35 observers: Vec::with_capacity(50),
36 }
37 }
38}
39
#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        thread,
        time::Duration,
    };

    use super::*;
    use crate::common::ready_future_state::ReadyFutureResult;

    /// Several threads each await their own future; all of them must finish
    /// once the observable is completed.
    #[test]
    fn test_multiple_awaits_on_observable() {
        const COUNT: usize = 5;
        let mut ready = ReadyObservable::default();

        let handles: Vec<_> = (0..COUNT)
            .map(|_| {
                let pending = ready.wait();
                thread::spawn(move || futures::executor::block_on(async move { pending.await }))
            })
            .collect();
        assert_eq!(handles.len(), COUNT);

        // Give the spawned threads time to start blocking before completing.
        thread::sleep(Duration::from_millis(100));
        ready.complete();

        let mut joined = 0usize;
        for (index, handle) in handles.into_iter().enumerate() {
            handle.join().expect("Thread panicked");
            eprintln!("handler ready {index}");
            joined = index + 1;
        }
        assert_eq!(joined, COUNT);
    }

    /// A default-constructed observable starts out not ready.
    #[test]
    fn test_ready_observable_default() {
        let observable = ReadyObservable::default();
        assert!(!observable.is_ready());
    }

    /// `complete` flips the ready flag.
    #[test]
    fn test_ready_observable_complete() {
        let mut observable = ReadyObservable::default();
        assert!(!observable.is_ready());

        observable.complete();
        assert!(observable.is_ready());
    }

    /// Waiting on an already-completed observable resolves immediately.
    #[test]
    fn test_wait_when_ready() {
        let mut observable = ReadyObservable::default();
        observable.complete();

        let outcome = futures::executor::block_on(observable.wait());
        assert!(matches!(outcome, ReadyFutureResult::Completed(())));
    }

    /// A future obtained before completion resolves once another thread
    /// completes the observable.
    #[test]
    fn test_wait_when_not_ready() {
        let mut observable = ReadyObservable::default();
        assert!(!observable.is_ready());

        let future = observable.wait();
        assert!(!observable.is_ready());

        let shared = Arc::new(Mutex::new(observable));
        let completer = Arc::clone(&shared);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            completer.lock().unwrap().complete();
        });

        let outcome = futures::executor::block_on(future);
        assert!(matches!(outcome, ReadyFutureResult::Completed(())));

        handle.join().expect("Thread panicked");
        assert!(shared.lock().unwrap().is_ready());
    }

    /// Every future handed out before completion is completed.
    #[test]
    fn test_multiple_waits_before_complete() {
        let mut observable = ReadyObservable::default();

        let first = observable.wait();
        let second = observable.wait();
        let third = observable.wait();

        assert!(!observable.is_ready());

        observable.complete();
        assert!(observable.is_ready());

        let first_outcome = futures::executor::block_on(first);
        let second_outcome = futures::executor::block_on(second);
        let third_outcome = futures::executor::block_on(third);

        assert!(matches!(first_outcome, ReadyFutureResult::Completed(())));
        assert!(matches!(second_outcome, ReadyFutureResult::Completed(())));
        assert!(matches!(third_outcome, ReadyFutureResult::Completed(())));
    }

    /// Every future handed out after completion is already completed.
    #[test]
    fn test_wait_after_complete() {
        let mut observable = ReadyObservable::default();
        observable.complete();
        assert!(observable.is_ready());

        let first = observable.wait();
        let second = observable.wait();
        let third = observable.wait();

        let first_outcome = futures::executor::block_on(first);
        let second_outcome = futures::executor::block_on(second);
        let third_outcome = futures::executor::block_on(third);

        assert!(matches!(first_outcome, ReadyFutureResult::Completed(())));
        assert!(matches!(second_outcome, ReadyFutureResult::Completed(())));
        assert!(matches!(third_outcome, ReadyFutureResult::Completed(())));
    }

    /// After completion, a new `wait` still yields a completed future even
    /// though earlier observers were registered and then completed.
    #[test]
    fn test_observers_cleared_after_complete() {
        let mut observable = ReadyObservable::default();

        let _first = observable.wait();
        let _second = observable.wait();

        observable.complete();

        let late = observable.wait();
        let outcome = futures::executor::block_on(late);
        assert!(matches!(outcome, ReadyFutureResult::Completed(())));
    }

    /// Ten threads register through a shared, mutex-protected observable and
    /// all of them resolve once it is completed.
    #[test]
    fn test_concurrent_waits_and_complete() {
        let shared = Arc::new(Mutex::new(ReadyObservable::default()));

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let observable = Arc::clone(&shared);
                thread::spawn(move || {
                    // Keep this as its own statement: the mutex guard must be
                    // released before blocking, otherwise `complete` below
                    // could never acquire the lock.
                    let future = observable.lock().unwrap().wait();
                    let result = futures::executor::block_on(future);
                    (i, result)
                })
            })
            .collect();

        thread::sleep(Duration::from_millis(100));
        shared.lock().unwrap().complete();

        for handle in handles {
            let (id, result) = handle.join().expect("Thread panicked");
            assert!(matches!(result, ReadyFutureResult::Completed(())));
            println!("Thread {} completed", id);
        }
    }
}
199
// `dead_code` is allowed for the whole module.
// NOTE(review): presumably because these items are unused when the tests are
// built for a non-wasm target — confirm.
#[cfg(test)]
#[allow(dead_code)]
mod web_tests {
    use super::*;
    use futures::join;
    use wasm_bindgen_test::wasm_bindgen_test;

    /// Runs the shared scenario with the harness configured for the browser.
    #[wasm_bindgen_test]
    async fn test_web_multiple_simultaneous_awaits() {
        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
        multiple_simultaneous_awaits().await;
    }

    /// Runs the shared scenario a second time.
    ///
    /// NOTE(review): despite the `node` in the name, this also configures
    /// `run_in_browser` — confirm whether a Node configuration was intended.
    #[wasm_bindgen_test]
    async fn test_node_multiple_simultaneous_awaits() {
        wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
        multiple_simultaneous_awaits().await;
    }

    /// Registers three waiters, completes the observable, then joins all
    /// three awaiting futures and checks that each one finished.
    async fn multiple_simultaneous_awaits() {
        let mut observable = ReadyObservable::default();
        let first = observable.wait();
        let second = observable.wait();
        let third = observable.wait();

        // Each wrapper discards the future's output so the joined result is
        // a tuple of units.
        let await_first = async {
            first.await;
        };
        let await_second = async {
            second.await;
        };
        let await_third = async {
            third.await;
        };

        observable.complete();

        let outcome = join!(await_first, await_second, await_third);
        assert_eq!(outcome, ((), (), ()));
    }
}