Skip to main content

sabi/
async_group.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use crate::AsyncGroup;
6
7use std::sync::Arc;
8use std::{fmt, mem, thread};
9
/// Describes why an operation driven by an [`AsyncGroup`] failed.
#[derive(Debug)]
pub enum AsyncGroupError {
    /// A thread spawned by the [`AsyncGroup`] panicked instead of returning a result.
    ///
    /// The payload is the panic message when one could be extracted from the panic,
    /// and an empty string otherwise.
    ThreadPanicked(String),
}
17
18impl AsyncGroup {
19    #[allow(clippy::new_without_default)]
20    pub fn new() -> Self {
21        Self {
22            handlers: Vec::new(),
23            _name: "".into(),
24        }
25    }
26
27    /// Adds a task (a closure) to the group to be executed concurrently.
28    ///
29    /// This provided closure is executed in a new `std::thread` concurrently
30    /// with other added tasks.
31    ///
32    /// # Type Parameters
33    ///
34    /// * `F`: The type of the closure, which must be
35    ///   `FnOnce() -> errs::Result<()> + Send + 'static`.
36    ///
37    /// # Parameters
38    ///
39    /// * `f`: The closure to be executed in a separate thread.
40    pub fn add<F>(&mut self, f: F)
41    where
42        F: FnOnce() -> errs::Result<()> + Send + 'static,
43    {
44        self.handlers.push((self._name.clone(), thread::spawn(f)));
45    }
46
47    pub(crate) fn join_and_collect_errors(mut self, errors: &mut Vec<(Arc<str>, errs::Err)>) {
48        if self.handlers.is_empty() {
49            return;
50        }
51
52        let vec = mem::take(&mut self.handlers);
53
54        for h in vec.into_iter() {
55            match h.1.join() {
56                Ok(r) => {
57                    if let Err(e) = r {
58                        errors.push((h.0.clone(), e));
59                    }
60                }
61                Err(e) => {
62                    let s = match e.downcast_ref::<Box<dyn fmt::Debug>>() {
63                        Some(d) => format!("{d:?}"),
64                        None => "".to_string(),
65                    };
66                    let e = errs::Err::new(AsyncGroupError::ThreadPanicked(s));
67                    errors.push((h.0.clone(), e));
68                }
69            }
70        }
71    }
72
73    pub(crate) fn join_and_ignore_errors(mut self) {
74        if self.handlers.is_empty() {
75            return;
76        }
77
78        let vec = mem::take(&mut self.handlers);
79
80        for h in vec.into_iter() {
81            let _ = h.1.join();
82        }
83    }
84
85    #[inline]
86    pub fn join(self) -> Vec<(Arc<str>, errs::Err)> {
87        let mut vec = Vec::new();
88        self.join_and_collect_errors(&mut vec);
89        vec
90    }
91}
92
93#[cfg(test)]
94mod tests_of_async_group {
95    use super::*;
96    use std::{sync, time};
97
    // Anchor for the line numbers expected in the `errs::Err` debug output. The tests
    // below compare against `BASE_LINE + 28`, `+ 44` and `+ 58`, which are the lines of
    // the `errs::Err::new` calls inside `MyStruct::process` and
    // `MyStruct::process_multiple`. Adding or removing lines between this constant and
    // those calls requires updating the offsets in the assertions.
    const BASE_LINE: u32 = line!();
99
    #[derive(Debug, PartialEq)]
    enum Reasons {
        BadString(String), // test-only error reason; holds the string a failing task read
    }
104
    struct MyStruct {
        string: sync::Arc<sync::Mutex<String>>, // shared with the tasks added to the group
        fail: bool, // when true, tasks return `Reasons::BadString` and leave `string` as is
    }
109
110    impl MyStruct {
111        fn new(s: String, fail: bool) -> Self {
112            Self {
113                string: sync::Arc::new(sync::Mutex::new(s)),
114                fail,
115            }
116        }
117
118        fn process(&self, ag: &mut AsyncGroup) {
119            let s_mutex = self.string.clone();
120            let fail = self.fail;
121            ag.add(move || {
122                let _ = thread::sleep(time::Duration::from_millis(100));
123                {
124                    let mut s = s_mutex.lock().unwrap();
125                    if fail {
126                        return Err(errs::Err::new(Reasons::BadString(s.to_string())));
127                    }
128                    *s = s.to_uppercase();
129                }
130                Ok(())
131            });
132        }
133
134        fn process_multiple(&self, ag: &mut AsyncGroup) {
135            let s_mutex = self.string.clone();
136            let fail = self.fail;
137            ag.add(move || {
138                let _ = thread::sleep(time::Duration::from_millis(100));
139                {
140                    let mut s = s_mutex.lock().unwrap();
141                    if fail {
142                        return Err(errs::Err::new(Reasons::BadString(s.to_string())));
143                    }
144                    *s = s.to_uppercase();
145                }
146                Ok(())
147            });
148
149            let s_mutex = self.string.clone();
150            let fail = self.fail;
151            ag.add(move || {
152                let _ = thread::sleep(time::Duration::from_millis(100));
153                {
154                    let mut s = s_mutex.lock().unwrap();
155                    if fail {
156                        return Err(errs::Err::new(Reasons::BadString(s.to_string())));
157                    }
158                    *s = s.to_uppercase();
159                }
160                Ok(())
161            });
162        }
163    }
164
165    mod tests_of_join_and_collect_errors {
166        use super::*;
167
168        #[test]
169        fn zero() {
170            let ag = AsyncGroup::new();
171
172            let mut err_vec = Vec::new();
173            ag.join_and_collect_errors(&mut err_vec);
174
175            assert!(err_vec.is_empty());
176        }
177
178        #[test]
179        fn single_ok() {
180            let mut ag = AsyncGroup::new();
181
182            let struct_a = MyStruct::new("a".to_string(), false);
183            assert_eq!(*struct_a.string.lock().unwrap(), "a");
184
185            ag._name = "foo".into();
186            struct_a.process(&mut ag);
187
188            let mut errors = Vec::new();
189            ag.join_and_collect_errors(&mut errors);
190
191            assert!(errors.is_empty());
192            assert_eq!(*struct_a.string.lock().unwrap(), "A");
193        }
194
195        #[test]
196        fn single_fail() {
197            let mut ag = AsyncGroup::new();
198
199            let struct_a = MyStruct::new("a".to_string(), true);
200            assert_eq!(*struct_a.string.lock().unwrap(), "a");
201
202            ag._name = "foo".into();
203            struct_a.process(&mut ag);
204
205            let mut errors = Vec::new();
206            ag.join_and_collect_errors(&mut errors);
207
208            assert_eq!(errors.len(), 1);
209            assert_eq!(*struct_a.string.lock().unwrap(), "a");
210
211            assert_eq!(errors[0].0, "foo".into());
212            #[cfg(unix)]
213            assert_eq!(
214                format!("{:?}", errors[0].1),
215                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"a\"), file = src/async_group.rs, line = ".to_string() + &(BASE_LINE + 28).to_string() + " }"
216            );
217            #[cfg(windows)]
218            assert_eq!(
219                format!("{:?}", errors[0].1),
220                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"a\"), file = src\\async_group.rs, line = ".to_string() + &(BASE_LINE + 28).to_string() + " }"
221            );
222        }
223
224        #[test]
225        fn multiple_ok() {
226            let mut ag = AsyncGroup::new();
227
228            let struct_a = MyStruct::new("a".to_string(), false);
229            assert_eq!(*struct_a.string.lock().unwrap(), "a".to_string());
230
231            let struct_b = MyStruct::new("b".to_string(), false);
232            assert_eq!(*struct_b.string.lock().unwrap(), "b".to_string());
233
234            let struct_c = MyStruct::new("c".to_string(), false);
235            assert_eq!(*struct_c.string.lock().unwrap(), "c".to_string());
236
237            ag._name = "foo".into();
238            struct_a.process(&mut ag);
239
240            ag._name = "bar".into();
241            struct_b.process(&mut ag);
242
243            ag._name = "baz".into();
244            struct_c.process(&mut ag);
245
246            let mut err_vec = Vec::new();
247            ag.join_and_collect_errors(&mut err_vec);
248
249            assert_eq!(err_vec.len(), 0);
250
251            assert_eq!(*struct_a.string.lock().unwrap(), "A");
252            assert_eq!(*struct_b.string.lock().unwrap(), "B");
253            assert_eq!(*struct_c.string.lock().unwrap(), "C");
254        }
255
256        #[test]
257        fn multiple_processes_and_single_fail() {
258            let mut ag = AsyncGroup::new();
259
260            let struct_a = MyStruct::new("a".to_string(), false);
261            assert_eq!(*struct_a.string.lock().unwrap(), "a");
262
263            let struct_b = MyStruct::new("b".to_string(), true);
264            assert_eq!(*struct_b.string.lock().unwrap(), "b");
265
266            let struct_c = MyStruct::new("c".to_string(), false);
267            assert_eq!(*struct_c.string.lock().unwrap(), "c");
268
269            ag._name = "foo".into();
270            struct_a.process(&mut ag);
271
272            ag._name = "bar".into();
273            struct_b.process(&mut ag);
274
275            ag._name = "baz".into();
276            struct_c.process(&mut ag);
277
278            let mut err_vec = Vec::new();
279            ag.join_and_collect_errors(&mut err_vec);
280
281            assert_eq!(err_vec.len(), 1);
282            assert_eq!(err_vec[0].0, "bar".into());
283            #[cfg(unix)]
284            assert_eq!(
285                format!("{:?}", err_vec[0].1),
286                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"b\"), file = src/async_group.rs, line = ".to_string() + &(BASE_LINE + 28).to_string() + " }",
287            );
288            #[cfg(windows)]
289            assert_eq!(
290                format!("{:?}", err_vec[0].1),
291                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"b\"), file = src\\async_group.rs, line = ".to_string() + &(BASE_LINE + 28).to_string() + " }",
292            );
293
294            assert_eq!(*struct_a.string.lock().unwrap(), "A");
295            assert_eq!(*struct_b.string.lock().unwrap(), "b");
296            assert_eq!(*struct_c.string.lock().unwrap(), "C");
297        }
298
299        #[test]
300        fn multiple_fail() {
301            let mut ag = AsyncGroup::new();
302
303            let struct_a = MyStruct::new("a".to_string(), true);
304            assert_eq!(*struct_a.string.lock().unwrap(), "a");
305
306            let struct_b = MyStruct::new("b".to_string(), true);
307            assert_eq!(*struct_b.string.lock().unwrap(), "b");
308
309            let struct_c = MyStruct::new("c".to_string(), true);
310            assert_eq!(*struct_c.string.lock().unwrap(), "c");
311
312            ag._name = "foo".into();
313            struct_a.process(&mut ag);
314
315            ag._name = "bar".into();
316            struct_b.process(&mut ag);
317
318            ag._name = "baz".into();
319            struct_c.process(&mut ag);
320
321            let mut err_vec = Vec::new();
322            ag.join_and_collect_errors(&mut err_vec);
323
324            assert_eq!(err_vec.len(), 3);
325
326            assert_eq!(err_vec[0].0, "foo".into());
327            assert_eq!(err_vec[1].0, "bar".into());
328            assert_eq!(err_vec[2].0, "baz".into());
329
330            #[cfg(unix)]
331            assert_eq!(
332                format!("{:?}", err_vec[0].1),
333                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"a\"), file = src/async_group.rs, line = ".to_string() + &(BASE_LINE + 28).to_string() + " }",
334            );
335            #[cfg(windows)]
336            assert_eq!(
337                format!("{:?}", err_vec[0].1),
338                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"a\"), file = src\\async_group.rs, line = ".to_string() + &(BASE_LINE + 28).to_string() + " }",
339            );
340            #[cfg(unix)]
341            assert_eq!(
342                format!("{:?}", err_vec[1].1),
343                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"b\"), file = src/async_group.rs, line = ".to_string() + &(BASE_LINE + 28).to_string() + " }",
344            );
345            #[cfg(windows)]
346            assert_eq!(
347                format!("{:?}", err_vec[1].1),
348                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"b\"), file = src\\async_group.rs, line = ".to_string() + &(BASE_LINE + 28).to_string() + " }",
349            );
350            #[cfg(unix)]
351            assert_eq!(
352                format!("{:?}", err_vec[2].1),
353                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"c\"), file = src/async_group.rs, line = ".to_string() + &(BASE_LINE + 28).to_string() + " }",
354            );
355            #[cfg(windows)]
356            assert_eq!(
357                format!("{:?}", err_vec[2].1),
358                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"c\"), file = src\\async_group.rs, line = ".to_string() + &(BASE_LINE + 28).to_string() + " }",
359            );
360
361            assert_eq!(*struct_a.string.lock().unwrap(), "a");
362            assert_eq!(*struct_b.string.lock().unwrap(), "b");
363            assert_eq!(*struct_c.string.lock().unwrap(), "c");
364        }
365
366        #[test]
367        fn data_src_execute_multiple_setup_handles() {
368            let mut ag = AsyncGroup::new();
369
370            let struct_d = MyStruct::new("d".to_string(), false);
371            assert_eq!(*struct_d.string.lock().unwrap(), "d");
372
373            ag._name = "foo".into();
374            struct_d.process(&mut ag);
375
376            let mut err_vec = Vec::new();
377            ag.join_and_collect_errors(&mut err_vec);
378
379            assert_eq!(err_vec.len(), 0);
380
381            assert_eq!(*struct_d.string.lock().unwrap(), "D");
382        }
383
384        #[test]
385        fn collect_all_errors_if_data_src_executes_multiple_setup_handles() {
386            let mut ag = AsyncGroup::new();
387
388            let struct_d = MyStruct::new("d".to_string(), true);
389            assert_eq!(*struct_d.string.lock().unwrap(), "d");
390
391            ag._name = "foo".into();
392            struct_d.process_multiple(&mut ag);
393
394            let mut err_vec = Vec::new();
395            ag.join_and_collect_errors(&mut err_vec);
396
397            assert_eq!(err_vec.len(), 2);
398
399            assert_eq!(err_vec[0].0, "foo".into());
400            assert_eq!(err_vec[1].0, "foo".into());
401
402            #[cfg(unix)]
403            assert_eq!(
404                format!("{:?}", err_vec[0].1),
405                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"d\"), file = src/async_group.rs, line = ".to_string() + &(BASE_LINE + 44).to_string() + " }",
406            );
407            #[cfg(windows)]
408            assert_eq!(
409                format!("{:?}", err_vec[0].1),
410                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"d\"), file = src\\async_group.rs, line = ".to_string() + &(BASE_LINE + 44).to_string() + " }"
411            );
412
413            #[cfg(unix)]
414            assert_eq!(
415                format!("{:?}", err_vec[1].1),
416                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"d\"), file = src/async_group.rs, line = ".to_string() + &(BASE_LINE + 58).to_string() + " }",
417            );
418            #[cfg(windows)]
419            assert_eq!(
420                format!("{:?}", err_vec[1].1),
421                "errs::Err { reason = sabi::async_group::tests_of_async_group::Reasons BadString(\"d\"), file = src\\async_group.rs, line = ".to_string() + &(BASE_LINE + 58).to_string() + " }"
422            );
423
424            assert_eq!(*struct_d.string.lock().unwrap(), "d");
425        }
426    }
427
428    mod tests_of_join_and_ignore_errors {
429        use super::*;
430
431        #[test]
432        fn zero() {
433            let ag = AsyncGroup::new();
434
435            ag.join_and_ignore_errors();
436        }
437
438        #[test]
439        fn single_ok() {
440            let mut ag = AsyncGroup::new();
441
442            let struct_a = MyStruct::new("a".to_string(), false);
443            assert_eq!(*struct_a.string.lock().unwrap(), "a");
444
445            ag._name = "foo".into();
446            struct_a.process(&mut ag);
447
448            ag.join_and_ignore_errors();
449            assert_eq!(*struct_a.string.lock().unwrap(), "A");
450        }
451
452        #[test]
453        fn single_fail() {
454            let mut ag = AsyncGroup::new();
455
456            let struct_a = MyStruct::new("a".to_string(), true);
457            assert_eq!(*struct_a.string.lock().unwrap(), "a");
458
459            ag._name = "foo".into();
460            struct_a.process(&mut ag);
461
462            ag.join_and_ignore_errors();
463            assert_eq!(*struct_a.string.lock().unwrap(), "a");
464        }
465
466        #[test]
467        fn multiple_ok() {
468            let mut ag = AsyncGroup::new();
469
470            let struct_a = MyStruct::new("a".to_string(), false);
471            assert_eq!(*struct_a.string.lock().unwrap(), "a");
472
473            let struct_b = MyStruct::new("b".to_string(), false);
474            assert_eq!(*struct_b.string.lock().unwrap(), "b");
475
476            let struct_c = MyStruct::new("c".to_string(), false);
477            assert_eq!(*struct_c.string.lock().unwrap(), "c");
478
479            ag._name = "foo".into();
480            struct_a.process(&mut ag);
481
482            ag._name = "bar".into();
483            struct_b.process(&mut ag);
484
485            ag._name = "baz".into();
486            struct_c.process(&mut ag);
487
488            ag.join_and_ignore_errors();
489
490            assert_eq!(*struct_a.string.lock().unwrap(), "A");
491            assert_eq!(*struct_b.string.lock().unwrap(), "B");
492            assert_eq!(*struct_c.string.lock().unwrap(), "C");
493        }
494
495        #[test]
496        fn multiple_processes_and_single_fail() {
497            let mut ag = AsyncGroup::new();
498
499            let struct_a = MyStruct::new("a".to_string(), false);
500            assert_eq!(*struct_a.string.lock().unwrap(), "a");
501
502            let struct_b = MyStruct::new("b".to_string(), true);
503            assert_eq!(*struct_b.string.lock().unwrap(), "b");
504
505            let struct_c = MyStruct::new("c".to_string(), false);
506            assert_eq!(*struct_c.string.lock().unwrap(), "c");
507
508            ag._name = "foo".into();
509            struct_a.process(&mut ag);
510
511            ag._name = "bar".into();
512            struct_b.process(&mut ag);
513
514            ag._name = "baz".into();
515            struct_c.process(&mut ag);
516
517            ag.join_and_ignore_errors();
518
519            assert_eq!(*struct_a.string.lock().unwrap(), "A");
520            assert_eq!(*struct_b.string.lock().unwrap(), "b");
521            assert_eq!(*struct_c.string.lock().unwrap(), "C");
522        }
523
524        #[test]
525        fn multiple_fail() {
526            let mut ag = AsyncGroup::new();
527
528            let struct_a = MyStruct::new("a".to_string(), true);
529            assert_eq!(*struct_a.string.lock().unwrap(), "a");
530
531            let struct_b = MyStruct::new("b".to_string(), true);
532            assert_eq!(*struct_b.string.lock().unwrap(), "b");
533
534            let struct_c = MyStruct::new("c".to_string(), true);
535            assert_eq!(*struct_c.string.lock().unwrap(), "c");
536
537            ag._name = "foo".into();
538            struct_a.process(&mut ag);
539
540            ag._name = "bar".into();
541            struct_b.process(&mut ag);
542
543            ag._name = "baz".into();
544            struct_c.process(&mut ag);
545
546            ag.join_and_ignore_errors();
547
548            assert_eq!(*struct_a.string.lock().unwrap(), "a");
549            assert_eq!(*struct_b.string.lock().unwrap(), "b");
550            assert_eq!(*struct_c.string.lock().unwrap(), "c");
551        }
552    }
553}