1use super::AsyncGroup;
6
7use futures::future;
8use std::future::Future;
9use std::sync::Arc;
10
11impl AsyncGroup {
12 pub(crate) fn new() -> Self {
13 Self {
14 tasks: Vec::new(),
15 names: Vec::new(),
16 _name: "".into(),
17 }
18 }
19
20 #[allow(clippy::doc_overindented_list_items)]
29 pub fn add<Fut>(&mut self, future: Fut)
30 where
31 Fut: Future<Output = errs::Result<()>> + Send + 'static,
32 {
33 self.tasks.push(Box::pin(future));
34 self.names.push(self._name.clone());
35 }
36
37 pub(crate) async fn join_and_collect_errors_async(
38 self,
39 errors: &mut Vec<(Arc<str>, errs::Err)>,
40 ) {
41 if self.tasks.is_empty() {
42 return;
43 }
44
45 let result_all = future::join_all(self.tasks).await;
46 for (i, result) in result_all.into_iter().enumerate() {
47 if let Err(err) = result {
48 errors.push((self.names[i].clone(), err));
49 }
50 }
51 }
52
53 pub(crate) async fn join_and_ignore_errors_async(self) {
54 let _ = future::join_all(self.tasks).await;
55 }
56}
57
#[cfg(test)]
mod tests_of_async_group {
    use super::*;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tokio::time;

    // Fixture overview:
    // * `Reasons::BadString` is the failure reason produced by the fixture tasks; it carries
    //   the string the task found when it was told to fail.
    // * `MyStruct` owns a shared string. Its tasks sleep, then either fail with
    //   `Reasons::BadString` (leaving the string untouched) or upper-case the string.
    //
    // The tests expect the `line = ...` field in each error's `Debug` output to be the line
    // of the `errs::Err::new` call that produced it. Those lines are expressed as offsets
    // from `BASE_LINE`, so everything from `BASE_LINE` down to the end of `impl MyStruct`
    // must keep its exact line layout. Update the three constants below if it ever changes.
    // The three task bodies are intentionally written out separately so that each
    // `errs::Err::new` call sits on its own line.

    /// Line of the `errs::Err::new` call in `MyStruct::process`.
    const PROCESS_ERR_LINE: u32 = BASE_LINE + 30;
    /// Line of the `errs::Err::new` call in the first task of `MyStruct::process_multiple`.
    const MULTI_FIRST_ERR_LINE: u32 = BASE_LINE + 49;
    /// Line of the `errs::Err::new` call in the second task of `MyStruct::process_multiple`.
    const MULTI_SECOND_ERR_LINE: u32 = BASE_LINE + 66;

    const BASE_LINE: u32 = line!();

    #[derive(Debug, PartialEq)]
    enum Reasons {
        BadString(String),
    }

    struct MyStruct {
        string: Arc<Mutex<String>>,
        fail: bool,
    }

    impl MyStruct {
        fn new(s: String, fail: bool) -> Self {
            Self {
                string: Arc::new(Mutex::new(s)),
                fail,
            }
        }

        fn process(&self, ag: &mut AsyncGroup) {
            let s_clone = self.string.clone();
            let fail = self.fail;
            ag.add(async move {
                time::sleep(time::Duration::from_millis(100)).await;
                // The lock below is only taken once the sleep has completed.

                {
                    let mut s = s_clone.lock().await;
                    if fail {
                        return Err(errs::Err::new(Reasons::BadString((*s).to_string())));
                    }
                    *s = s.to_uppercase();
                }

                Ok(())
            });
        }

        fn process_multiple(&self, ag: &mut AsyncGroup) {
            let s_clone = self.string.clone();
            let fail = self.fail;
            ag.add(async move {
                time::sleep(time::Duration::from_millis(100)).await;
                // The lock below is only taken once the sleep has completed.

                {
                    let mut s = s_clone.lock().await;
                    if fail {
                        return Err(errs::Err::new(Reasons::BadString((*s).to_string())));
                    }
                    *s = s.to_uppercase();
                }

                Ok(())
            });

            let s_clone = self.string.clone();
            let fail = self.fail;
            ag.add(async move {
                time::sleep(time::Duration::from_millis(100)).await;
                // The lock below is only taken once the sleep has completed.

                {
                    let mut s = s_clone.lock().await;
                    if fail {
                        return Err(errs::Err::new(Reasons::BadString((*s).to_string())));
                    }
                    *s = s.to_uppercase();
                }

                Ok(())
            });
        }
    }

    /// Builds the `Debug` text expected for a fixture error that carries `bad_string` and
    /// was created on `line` of this file.
    ///
    /// `file!()` supplies the path exactly as the compiler sees it, so one assertion covers
    /// every platform's path separator.
    fn expected_err_debug(bad_string: &str, line: u32) -> String {
        format!(
            "errs::Err {{ reason = sabi::tokio::async_group::tests_of_async_group::Reasons \
             BadString({:?}), file = {}, line = {} }}",
            bad_string,
            file!(),
            line,
        )
    }

    /// Asserts that a collected `(name, error)` entry has the given name and that the
    /// error's `Debug` output matches a fixture failure for `bad_string` raised at `line`.
    #[track_caller]
    fn assert_failure(entry: &(Arc<str>, errs::Err), name: &str, bad_string: &str, line: u32) {
        let (actual_name, err) = entry;
        assert_eq!(&**actual_name, name);
        assert_eq!(format!("{:?}", err), expected_err_debug(bad_string, line));
    }

    /// Creates a fixture holding `text`, checks its initial value, and registers its
    /// `process` task in `ag` under `name`.
    async fn register(ag: &mut AsyncGroup, name: &str, text: &str, fail: bool) -> MyStruct {
        let fixture = MyStruct::new(text.to_string(), fail);
        assert_eq!(*fixture.string.lock().await, text);

        ag._name = name.into();
        fixture.process(ag);
        fixture
    }

    mod tests_of_join_and_collect_errors {
        use super::*;

        #[tokio::test]
        async fn zero() {
            let ag = AsyncGroup::new();

            let mut err_vec = Vec::new();
            ag.join_and_collect_errors_async(&mut err_vec).await;

            assert!(err_vec.is_empty());
        }

        #[tokio::test]
        async fn single_ok() {
            let mut ag = AsyncGroup::new();
            let struct_a = register(&mut ag, "foo", "a", false).await;

            let mut errors = Vec::new();
            ag.join_and_collect_errors_async(&mut errors).await;

            assert!(errors.is_empty());
            assert_eq!(*struct_a.string.lock().await, "A");
        }

        #[tokio::test]
        async fn single_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = register(&mut ag, "foo", "a", true).await;

            let mut errors = Vec::new();
            ag.join_and_collect_errors_async(&mut errors).await;

            assert_eq!(errors.len(), 1);
            assert_eq!(*struct_a.string.lock().await, "a");

            assert_failure(&errors[0], "foo", "a", PROCESS_ERR_LINE);
        }

        #[tokio::test]
        async fn multiple_ok() {
            let mut ag = AsyncGroup::new();
            let struct_a = register(&mut ag, "foo", "a", false).await;
            let struct_b = register(&mut ag, "bar", "b", false).await;
            let struct_c = register(&mut ag, "baz", "c", false).await;

            let mut err_vec = Vec::new();
            ag.join_and_collect_errors_async(&mut err_vec).await;

            assert_eq!(err_vec.len(), 0);

            assert_eq!(*struct_a.string.lock().await, "A");
            assert_eq!(*struct_b.string.lock().await, "B");
            assert_eq!(*struct_c.string.lock().await, "C");
        }

        #[tokio::test]
        async fn multiple_processes_and_single_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = register(&mut ag, "foo", "a", false).await;
            let struct_b = register(&mut ag, "bar", "b", true).await;
            let struct_c = register(&mut ag, "baz", "c", false).await;

            let mut err_vec = Vec::new();
            ag.join_and_collect_errors_async(&mut err_vec).await;

            assert_eq!(err_vec.len(), 1);
            assert_failure(&err_vec[0], "bar", "b", PROCESS_ERR_LINE);

            assert_eq!(*struct_a.string.lock().await, "A");
            assert_eq!(*struct_b.string.lock().await, "b");
            assert_eq!(*struct_c.string.lock().await, "C");
        }

        #[tokio::test]
        async fn multiple_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = register(&mut ag, "foo", "a", true).await;
            let struct_b = register(&mut ag, "bar", "b", true).await;
            let struct_c = register(&mut ag, "baz", "c", true).await;

            let mut err_vec = Vec::new();
            ag.join_and_collect_errors_async(&mut err_vec).await;

            assert_eq!(err_vec.len(), 3);

            // Failures are reported in registration order.
            assert_failure(&err_vec[0], "foo", "a", PROCESS_ERR_LINE);
            assert_failure(&err_vec[1], "bar", "b", PROCESS_ERR_LINE);
            assert_failure(&err_vec[2], "baz", "c", PROCESS_ERR_LINE);

            assert_eq!(*struct_a.string.lock().await, "a");
            assert_eq!(*struct_b.string.lock().await, "b");
            assert_eq!(*struct_c.string.lock().await, "c");
        }

        #[tokio::test]
        async fn data_src_execute_multiple_setup_handles() {
            let mut ag = AsyncGroup::new();
            let struct_d = register(&mut ag, "foo", "d", false).await;

            let mut err_vec = Vec::new();
            ag.join_and_collect_errors_async(&mut err_vec).await;

            assert_eq!(err_vec.len(), 0);

            assert_eq!(*struct_d.string.lock().await, "D");
        }

        #[tokio::test]
        async fn collect_all_errors_if_data_src_executes_multiple_setup_handles() {
            let mut ag = AsyncGroup::new();

            let struct_d = MyStruct::new("d".to_string(), true);
            assert_eq!(*struct_d.string.lock().await, "d");

            ag._name = "foo".into();
            struct_d.process_multiple(&mut ag);

            let mut err_vec = Vec::new();
            ag.join_and_collect_errors_async(&mut err_vec).await;

            assert_eq!(err_vec.len(), 2);

            // Both tasks were added under the same name but fail at different call sites.
            assert_failure(&err_vec[0], "foo", "d", MULTI_FIRST_ERR_LINE);
            assert_failure(&err_vec[1], "foo", "d", MULTI_SECOND_ERR_LINE);

            assert_eq!(*struct_d.string.lock().await, "d");
        }
    }

    mod tests_join_and_ignore_errors_async {
        use super::*;

        #[tokio::test]
        async fn zero() {
            let ag = AsyncGroup::new();

            ag.join_and_ignore_errors_async().await;
        }

        #[tokio::test]
        async fn single_ok() {
            let mut ag = AsyncGroup::new();
            let struct_a = register(&mut ag, "foo", "a", false).await;

            ag.join_and_ignore_errors_async().await;

            assert_eq!(*struct_a.string.lock().await, "A");
        }

        #[tokio::test]
        async fn single_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = register(&mut ag, "foo", "a", true).await;

            ag.join_and_ignore_errors_async().await;

            assert_eq!(*struct_a.string.lock().await, "a");
        }

        #[tokio::test]
        async fn multiple_ok() {
            let mut ag = AsyncGroup::new();
            let struct_a = register(&mut ag, "foo", "a", false).await;
            let struct_b = register(&mut ag, "bar", "b", false).await;
            let struct_c = register(&mut ag, "baz", "c", false).await;

            ag.join_and_ignore_errors_async().await;

            assert_eq!(*struct_a.string.lock().await, "A");
            assert_eq!(*struct_b.string.lock().await, "B");
            assert_eq!(*struct_c.string.lock().await, "C");
        }

        #[tokio::test]
        async fn multiple_processes_and_single_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = register(&mut ag, "foo", "a", false).await;
            let struct_b = register(&mut ag, "bar", "b", true).await;
            let struct_c = register(&mut ag, "baz", "c", false).await;

            ag.join_and_ignore_errors_async().await;

            assert_eq!(*struct_a.string.lock().await, "A");
            assert_eq!(*struct_b.string.lock().await, "b");
            assert_eq!(*struct_c.string.lock().await, "C");
        }

        #[tokio::test]
        async fn multiple_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = register(&mut ag, "foo", "a", true).await;
            let struct_b = register(&mut ag, "bar", "b", true).await;
            let struct_c = register(&mut ag, "baz", "c", true).await;

            ag.join_and_ignore_errors_async().await;

            assert_eq!(*struct_a.string.lock().await, "a");
            assert_eq!(*struct_b.string.lock().await, "b");
            assert_eq!(*struct_c.string.lock().await, "c");
        }
    }
}