Skip to main content

sabi/tokio/
async_group.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use super::AsyncGroup;
6
7use futures::future;
8use std::future::Future;
9use std::sync::Arc;
10
11impl AsyncGroup {
12    pub(crate) fn new() -> Self {
13        Self {
14            tasks: Vec::new(),
15            names: Vec::new(),
16            _name: "".into(),
17        }
18    }
19
20    /// Adds a future to the AsyncGroup to be executed concurrently.
21    ///
22    /// The provided future will be polled along with others in this group.
23    ///
24    /// # Arguments
25    ///
26    /// * `future` - The future to add. It must implement `Future<Output = errs::Result<()>>`,
27    ///              `Send`, and have a `'static` lifetime.
28    #[allow(clippy::doc_overindented_list_items)]
29    pub fn add<Fut>(&mut self, future: Fut)
30    where
31        Fut: Future<Output = errs::Result<()>> + Send + 'static,
32    {
33        self.tasks.push(Box::pin(future));
34        self.names.push(self._name.clone());
35    }
36
37    pub(crate) async fn join_and_collect_errors_async(
38        self,
39        errors: &mut Vec<(Arc<str>, errs::Err)>,
40    ) {
41        if self.tasks.is_empty() {
42            return;
43        }
44
45        let result_all = future::join_all(self.tasks).await;
46        for (i, result) in result_all.into_iter().enumerate() {
47            if let Err(err) = result {
48                errors.push((self.names[i].clone(), err));
49            }
50        }
51    }
52
53    pub(crate) async fn join_and_ignore_errors_async(self) {
54        let _ = future::join_all(self.tasks).await;
55    }
56}
57
58#[cfg(test)]
59mod tests_of_async_group {
60    use super::*;
61    use std::sync::Arc;
62    use tokio::sync::Mutex;
63    use tokio::time;
64
    // Anchor for the `line = ...` values asserted by the tests in this module.
    // Each expectation is written as `BASE_LINE + <offset>`, where the offset is the number of
    // lines from this declaration down to the matching `errs::Err::new` call in `MyStruct`
    // (30 for `process`, 49 and 66 for `process_multiple`).
    // Adding or removing lines between this declaration and those calls changes the offsets.
    const BASE_LINE: u32 = line!();
66
    #[derive(Debug, PartialEq)]
    enum Reasons { // error reasons used by the test tasks (same-line comments keep BASE_LINE offsets)
        BadString(String), // carries the string read under the lock when a task fails
    }
71
    struct MyStruct { // test fixture (same-line comments keep BASE_LINE offsets)
        string: Arc<Mutex<String>>, // shared text; a successful task upper-cases it
        fail: bool, // when true, the queued task returns `Err` and leaves `string` as is
    }
76
    impl MyStruct { // NOTE: keep this block's line count fixed; tests assert BASE_LINE offsets
        fn new(s: String, fail: bool) -> Self { // wraps `s` in a fresh `Arc<Mutex<_>>`
            Self {
                string: Arc::new(Mutex::new(s)),
                fail,
            }
        }

        fn process(&self, ag: &mut AsyncGroup) { // queues one task on `ag`
            let s_clone = self.string.clone();
            let fail = self.fail;
            ag.add(async move {
                // Sleep first, so no lock guard exists yet while this `.await` is pending.
                let _ = time::sleep(time::Duration::from_millis(100)).await;

                {
                    let mut s = s_clone.lock().await;
                    if fail { // next line is asserted by tests as BASE_LINE + 30
                        return Err(errs::Err::new(Reasons::BadString((*s).to_string())));
                    }
                    *s = s.to_uppercase();
                }

                Ok(())
            });
        }

        fn process_multiple(&self, ag: &mut AsyncGroup) { // queues two identical tasks on `ag`
            let s_clone = self.string.clone();
            let fail = self.fail;
            ag.add(async move {
                // Sleep first, so no lock guard exists yet while this `.await` is pending.
                let _ = time::sleep(time::Duration::from_millis(100)).await;

                {
                    let mut s = s_clone.lock().await;
                    if fail { // next line is asserted by tests as BASE_LINE + 49
                        return Err(errs::Err::new(Reasons::BadString((*s).to_string())));
                    }
                    *s = s.to_uppercase();
                }

                Ok(())
            });

            let s_clone = self.string.clone();
            let fail = self.fail;
            ag.add(async move {
                // Sleep first, so no lock guard exists yet while this `.await` is pending.
                let _ = time::sleep(time::Duration::from_millis(100)).await;

                {
                    let mut s = s_clone.lock().await;
                    if fail { // next line is asserted by tests as BASE_LINE + 66
                        return Err(errs::Err::new(Reasons::BadString((*s).to_string())));
                    }
                    *s = s.to_uppercase();
                }

                Ok(())
            });
        }
    }
140
141    mod tests_of_join_and_collect_errors {
142        use super::*;
143
144        #[tokio::test]
145        async fn zero() {
146            let ag = AsyncGroup::new();
147
148            let mut err_vec = Vec::new();
149            ag.join_and_collect_errors_async(&mut err_vec).await;
150
151            assert!(err_vec.is_empty());
152        }
153
154        #[tokio::test]
155        async fn single_ok() {
156            let mut ag = AsyncGroup::new();
157
158            let struct_a = MyStruct::new("a".to_string(), false);
159            assert_eq!(*struct_a.string.lock().await, "a");
160
161            ag._name = "foo".into();
162            struct_a.process(&mut ag);
163
164            let mut errors = Vec::new();
165            ag.join_and_collect_errors_async(&mut errors).await;
166
167            assert!(errors.is_empty());
168            assert_eq!(*struct_a.string.lock().await, "A");
169        }
170
171        #[tokio::test]
172        async fn single_fail() {
173            let mut ag = AsyncGroup::new();
174
175            let struct_a = MyStruct::new("a".to_string(), true);
176            assert_eq!(*struct_a.string.lock().await, "a");
177
178            ag._name = "foo".into();
179            struct_a.process(&mut ag);
180
181            let mut errors = Vec::new();
182            ag.join_and_collect_errors_async(&mut errors).await;
183
184            assert_eq!(errors.len(), 1);
185            assert_eq!(*struct_a.string.lock().await, "a");
186
187            assert_eq!(errors[0].0, "foo".into());
188            #[cfg(unix)]
189            assert_eq!(
190                format!("{:?}", errors[0].1),
191                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"a\"), file = src/tokio/async_group.rs, line = ".to_string() + &(BASE_LINE + 30).to_string() + " }"
192            );
193            #[cfg(windows)]
194            assert_eq!(
195                format!("{:?}", errors[0].1),
196                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"a\"), file = src\\tokio\\async_group.rs, line = ".to_string() + &(BASE_LINE + 30).to_string() + " }"
197            );
198        }
199
200        #[tokio::test]
201        async fn multiple_ok() {
202            let mut ag = AsyncGroup::new();
203
204            let struct_a = MyStruct::new("a".to_string(), false);
205            assert_eq!(*struct_a.string.lock().await, "a".to_string());
206
207            let struct_b = MyStruct::new("b".to_string(), false);
208            assert_eq!(*struct_b.string.lock().await, "b".to_string());
209
210            let struct_c = MyStruct::new("c".to_string(), false);
211            assert_eq!(*struct_c.string.lock().await, "c".to_string());
212
213            ag._name = "foo".into();
214            struct_a.process(&mut ag);
215
216            ag._name = "bar".into();
217            struct_b.process(&mut ag);
218
219            ag._name = "baz".into();
220            struct_c.process(&mut ag);
221
222            let mut err_vec = Vec::new();
223            ag.join_and_collect_errors_async(&mut err_vec).await;
224
225            assert_eq!(err_vec.len(), 0);
226
227            assert_eq!(*struct_a.string.lock().await, "A");
228            assert_eq!(*struct_b.string.lock().await, "B");
229            assert_eq!(*struct_c.string.lock().await, "C");
230        }
231
232        #[tokio::test]
233        async fn multiple_processes_and_single_fail() {
234            let mut ag = AsyncGroup::new();
235
236            let struct_a = MyStruct::new("a".to_string(), false);
237            assert_eq!(*struct_a.string.lock().await, "a");
238
239            let struct_b = MyStruct::new("b".to_string(), true);
240            assert_eq!(*struct_b.string.lock().await, "b");
241
242            let struct_c = MyStruct::new("c".to_string(), false);
243            assert_eq!(*struct_c.string.lock().await, "c");
244
245            ag._name = "foo".into();
246            struct_a.process(&mut ag);
247
248            ag._name = "bar".into();
249            struct_b.process(&mut ag);
250
251            ag._name = "baz".into();
252            struct_c.process(&mut ag);
253
254            let mut err_vec = Vec::new();
255            ag.join_and_collect_errors_async(&mut err_vec).await;
256
257            assert_eq!(err_vec.len(), 1);
258            assert_eq!(err_vec[0].0, "bar".into());
259            #[cfg(unix)]
260            assert_eq!(
261                format!("{:?}", err_vec[0].1),
262                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"b\"), file = src/tokio/async_group.rs, line = ".to_string() + &(BASE_LINE + 30).to_string() + " }",
263            );
264            #[cfg(windows)]
265            assert_eq!(
266                format!("{:?}", err_vec[0].1),
267                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"b\"), file = src\\tokio\\async_group.rs, line = ".to_string() + &(BASE_LINE + 30).to_string() + " }",
268            );
269
270            assert_eq!(*struct_a.string.lock().await, "A");
271            assert_eq!(*struct_b.string.lock().await, "b");
272            assert_eq!(*struct_c.string.lock().await, "C");
273        }
274
275        #[tokio::test]
276        async fn multiple_fail() {
277            let mut ag = AsyncGroup::new();
278
279            let struct_a = MyStruct::new("a".to_string(), true);
280            assert_eq!(*struct_a.string.lock().await, "a");
281
282            let struct_b = MyStruct::new("b".to_string(), true);
283            assert_eq!(*struct_b.string.lock().await, "b");
284
285            let struct_c = MyStruct::new("c".to_string(), true);
286            assert_eq!(*struct_c.string.lock().await, "c");
287
288            ag._name = "foo".into();
289            struct_a.process(&mut ag);
290
291            ag._name = "bar".into();
292            struct_b.process(&mut ag);
293
294            ag._name = "baz".into();
295            struct_c.process(&mut ag);
296
297            let mut err_vec = Vec::new();
298            ag.join_and_collect_errors_async(&mut err_vec).await;
299
300            assert_eq!(err_vec.len(), 3);
301
302            assert_eq!(err_vec[0].0, "foo".into());
303            assert_eq!(err_vec[1].0, "bar".into());
304            assert_eq!(err_vec[2].0, "baz".into());
305
306            #[cfg(unix)]
307            assert_eq!(
308                format!("{:?}", err_vec[0].1),
309                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"a\"), file = src/tokio/async_group.rs, line = ".to_string() + &(BASE_LINE + 30).to_string() + " }",
310            );
311            #[cfg(windows)]
312            assert_eq!(
313                format!("{:?}", err_vec[0].1),
314                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"a\"), file = src\\tokio\\async_group.rs, line = ".to_string() + &(BASE_LINE + 30).to_string() + " }",
315            );
316            #[cfg(unix)]
317            assert_eq!(
318                format!("{:?}", err_vec[1].1),
319                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"b\"), file = src/tokio/async_group.rs, line = ".to_string() + &(BASE_LINE + 30).to_string() + " }",
320            );
321            #[cfg(windows)]
322            assert_eq!(
323                format!("{:?}", err_vec[1].1),
324                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"b\"), file = src\\tokio\\async_group.rs, line = ".to_string() + &(BASE_LINE + 30).to_string() + " }",
325            );
326            #[cfg(unix)]
327            assert_eq!(
328                format!("{:?}", err_vec[2].1),
329                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"c\"), file = src/tokio/async_group.rs, line = ".to_string() + &(BASE_LINE + 30).to_string() + " }",
330            );
331            #[cfg(windows)]
332            assert_eq!(
333                format!("{:?}", err_vec[2].1),
334                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"c\"), file = src\\tokio\\async_group.rs, line = ".to_string() + &(BASE_LINE + 30).to_string() + " }",
335            );
336
337            assert_eq!(*struct_a.string.lock().await, "a");
338            assert_eq!(*struct_b.string.lock().await, "b");
339            assert_eq!(*struct_c.string.lock().await, "c");
340        }
341
342        #[tokio::test]
343        async fn data_src_execute_multiple_setup_handles() {
344            let mut ag = AsyncGroup::new();
345
346            let struct_d = MyStruct::new("d".to_string(), false);
347            assert_eq!(*struct_d.string.lock().await, "d");
348
349            ag._name = "foo".into();
350            struct_d.process(&mut ag);
351
352            let mut err_vec = Vec::new();
353            ag.join_and_collect_errors_async(&mut err_vec).await;
354
355            assert_eq!(err_vec.len(), 0);
356
357            assert_eq!(*struct_d.string.lock().await, "D");
358        }
359
360        #[tokio::test]
361        async fn collect_all_errors_if_data_src_executes_multiple_setup_handles() {
362            let mut ag = AsyncGroup::new();
363
364            let struct_d = MyStruct::new("d".to_string(), true);
365            assert_eq!(*struct_d.string.lock().await, "d");
366
367            ag._name = "foo".into();
368            struct_d.process_multiple(&mut ag);
369
370            let mut err_vec = Vec::new();
371            ag.join_and_collect_errors_async(&mut err_vec).await;
372
373            assert_eq!(err_vec.len(), 2);
374
375            assert_eq!(err_vec[0].0, "foo".into());
376            assert_eq!(err_vec[1].0, "foo".into());
377
378            #[cfg(unix)]
379            assert_eq!(
380                format!("{:?}", err_vec[0].1),
381                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"d\"), file = src/tokio/async_group.rs, line = ".to_string() + &(BASE_LINE + 49).to_string() + " }",
382            );
383            #[cfg(windows)]
384            assert_eq!(
385                format!("{:?}", err_vec[0].1),
386                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"d\"), file = src\\tokio\\async_group.rs, line = ".to_string() + &(BASE_LINE + 49).to_string() + " }"
387            );
388
389            #[cfg(unix)]
390            assert_eq!(
391                format!("{:?}", err_vec[1].1),
392                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"d\"), file = src/tokio/async_group.rs, line = ".to_string() + &(BASE_LINE + 66).to_string() + " }",
393            );
394            #[cfg(windows)]
395            assert_eq!(
396                format!("{:?}", err_vec[1].1),
397                "errs::Err { reason = sabi::tokio::async_group::tests_of_async_group::Reasons BadString(\"d\"), file = src\\tokio\\async_group.rs, line = ".to_string() + &(BASE_LINE + 66).to_string() + " }"
398            );
399
400            assert_eq!(*struct_d.string.lock().await, "d");
401        }
402    }
403
404    mod tests_join_and_ignore_errors_async {
405        use super::*;
406
407        #[tokio::test]
408        async fn zero() {
409            let ag = AsyncGroup::new();
410
411            ag.join_and_ignore_errors_async().await;
412        }
413
414        #[tokio::test]
415        async fn single_ok() {
416            let mut ag = AsyncGroup::new();
417
418            let struct_a = MyStruct::new("a".to_string(), false);
419            assert_eq!(*struct_a.string.lock().await, "a".to_string());
420
421            ag._name = "foo".into();
422            struct_a.process(&mut ag);
423
424            ag.join_and_ignore_errors_async().await;
425            assert_eq!(*struct_a.string.lock().await, "A".to_string());
426        }
427
428        #[tokio::test]
429        async fn single_fail() {
430            let mut ag = AsyncGroup::new();
431
432            let struct_a = MyStruct::new("a".to_string(), true);
433            assert_eq!(*struct_a.string.lock().await, "a".to_string());
434
435            ag._name = "foo".into();
436            struct_a.process(&mut ag);
437
438            ag.join_and_ignore_errors_async().await;
439            assert_eq!(*struct_a.string.lock().await, "a".to_string());
440        }
441
442        #[tokio::test]
443        async fn multiple_ok() {
444            let mut ag = AsyncGroup::new();
445
446            let struct_a = MyStruct::new("a".to_string(), false);
447            assert_eq!(*struct_a.string.lock().await, "a".to_string());
448
449            let struct_b = MyStruct::new("b".to_string(), false);
450            assert_eq!(*struct_b.string.lock().await, "b".to_string());
451
452            let struct_c = MyStruct::new("c".to_string(), false);
453            assert_eq!(*struct_c.string.lock().await, "c".to_string());
454
455            ag._name = "foo".into();
456            struct_a.process(&mut ag);
457
458            ag._name = "bar".into();
459            struct_b.process(&mut ag);
460
461            ag._name = "baz".into();
462            struct_c.process(&mut ag);
463
464            ag.join_and_ignore_errors_async().await;
465
466            assert_eq!(*struct_a.string.lock().await, "A");
467            assert_eq!(*struct_b.string.lock().await, "B");
468            assert_eq!(*struct_c.string.lock().await, "C");
469        }
470
471        #[tokio::test]
472        async fn multiple_processes_and_single_fail() {
473            let mut ag = AsyncGroup::new();
474
475            let struct_a = MyStruct::new("a".to_string(), false);
476            assert_eq!(*struct_a.string.lock().await, "a".to_string());
477
478            let struct_b = MyStruct::new("b".to_string(), true);
479            assert_eq!(*struct_b.string.lock().await, "b".to_string());
480
481            let struct_c = MyStruct::new("c".to_string(), false);
482            assert_eq!(*struct_c.string.lock().await, "c".to_string());
483
484            ag._name = "foo".into();
485            struct_a.process(&mut ag);
486
487            ag._name = "bar".into();
488            struct_b.process(&mut ag);
489
490            ag._name = "baz".into();
491            struct_c.process(&mut ag);
492
493            ag.join_and_ignore_errors_async().await;
494
495            assert_eq!(*struct_a.string.lock().await, "A");
496            assert_eq!(*struct_b.string.lock().await, "b");
497            assert_eq!(*struct_c.string.lock().await, "C");
498        }
499
500        #[tokio::test]
501        async fn multiple_fail() {
502            let mut ag = AsyncGroup::new();
503
504            let struct_a = MyStruct::new("a".to_string(), true);
505            assert_eq!(*struct_a.string.lock().await, "a".to_string());
506
507            let struct_b = MyStruct::new("b".to_string(), true);
508            assert_eq!(*struct_b.string.lock().await, "b".to_string());
509
510            let struct_c = MyStruct::new("c".to_string(), true);
511            assert_eq!(*struct_c.string.lock().await, "c".to_string());
512
513            ag._name = "foo".into();
514            struct_a.process(&mut ag);
515
516            ag._name = "bar".into();
517            struct_b.process(&mut ag);
518
519            ag._name = "baz".into();
520            struct_c.process(&mut ag);
521
522            ag.join_and_ignore_errors_async().await;
523
524            assert_eq!(*struct_a.string.lock().await, "a");
525            assert_eq!(*struct_b.string.lock().await, "b");
526            assert_eq!(*struct_c.string.lock().await, "c");
527        }
528    }
529}