Skip to main content

sabi/
async_group.rs

1// Copyright (C) 2024-2026 Takayuki Sato. All Rights Reserved.
2// This program is free software under MIT License.
3// See the file LICENSE in this distribution for more details.
4
5use crate::AsyncGroup;
6
7use std::sync::Arc;
8use std::{fmt, mem, thread};
9
/// Reasons for failures that originate in an [`AsyncGroup`] itself rather than
/// in the tasks it runs.
#[derive(Debug)]
pub enum AsyncGroupError {
    /// A thread spawned by [`AsyncGroup`] panicked instead of returning.
    ///
    /// The `String` holds the panic message when one could be extracted from
    /// the panic payload; otherwise it is empty.
    ThreadPanicked(String),
}
17
18impl AsyncGroup {
19    pub(crate) fn new() -> Self {
20        Self {
21            handlers: Vec::new(),
22            _name: "".into(),
23        }
24    }
25
26    /// Adds a task (a closure) to the group to be executed concurrently.
27    ///
28    /// This provided closure is executed in a new `std::thread` concurrently
29    /// with other added tasks.
30    ///
31    /// # Type Parameters
32    ///
33    /// * `F`: The type of the closure, which must be
34    ///   `FnOnce() -> errs::Result<()> + Send + 'static`.
35    ///
36    /// # Parameters
37    ///
38    /// * `f`: The closure to be executed in a separate thread.
39    pub fn add<F>(&mut self, f: F)
40    where
41        F: FnOnce() -> errs::Result<()> + Send + 'static,
42    {
43        self.handlers.push((self._name.clone(), thread::spawn(f)));
44    }
45
46    pub(crate) fn join_and_collect_errors(&mut self, errors: &mut Vec<(Arc<str>, errs::Err)>) {
47        if self.handlers.is_empty() {
48            return;
49        }
50
51        let vec = mem::take(&mut self.handlers);
52
53        for h in vec.into_iter() {
54            match h.1.join() {
55                Ok(r) => {
56                    if let Err(e) = r {
57                        errors.push((h.0.clone(), e));
58                    }
59                }
60                Err(e) => {
61                    let s = match e.downcast_ref::<Box<dyn fmt::Debug>>() {
62                        Some(d) => format!("{d:?}"),
63                        None => "".to_string(),
64                    };
65                    let e = errs::Err::new(AsyncGroupError::ThreadPanicked(s));
66                    errors.push((h.0.clone(), e));
67                }
68            }
69        }
70    }
71
72    pub(crate) fn join_and_ignore_errors(&mut self) {
73        if self.handlers.is_empty() {
74            return;
75        }
76
77        let vec = mem::take(&mut self.handlers);
78
79        for h in vec.into_iter() {
80            let _ = h.1.join();
81        }
82    }
83}
84
#[cfg(test)]
mod tests_of_async_group {
    use super::*;
    use std::{sync, time};

    /// Error reason returned by the test tasks; it carries the text the
    /// fixture held when the task failed.
    #[derive(Debug, PartialEq)]
    enum Reasons {
        BadString(String),
    }

    /// Line on which `err_at_first_site` calls `errs::Err::new`.
    ///
    /// The function must stay directly below this constant so that the `+ 2`
    /// offset keeps pointing at the call inside its body.
    const FIRST_ERR_LINE: u32 = line!() + 2;
    fn err_at_first_site(text: &str) -> errs::Err {
        errs::Err::new(Reasons::BadString(text.to_string()))
    }

    /// Line on which `err_at_second_site` calls `errs::Err::new`.
    ///
    /// The function must stay directly below this constant so that the `+ 2`
    /// offset keeps pointing at the call inside its body.
    const SECOND_ERR_LINE: u32 = line!() + 2;
    fn err_at_second_site(text: &str) -> errs::Err {
        errs::Err::new(Reasons::BadString(text.to_string()))
    }

    /// Builds the `Debug` text expected from an `errs::Err` whose reason is
    /// `Reasons::BadString(value)` and which was created on `line` of this
    /// file. Only the path separator differs between Windows and other
    /// targets.
    fn expected_debug(value: &str, line: u32) -> String {
        let file = if cfg!(windows) {
            "src\\async_group.rs"
        } else {
            "src/async_group.rs"
        };
        format!(
            "errs::Err {{ reason = sabi::async_group::tests_of_async_group::Reasons BadString({value:?}), file = {file}, line = {line} }}"
        )
    }

    /// Body shared by all test tasks: after a short delay, either fails with
    /// an error built by `make_err` from the current text (leaving the text
    /// untouched) or upper-cases the text in place.
    fn uppercase_or_fail(
        text: &sync::Mutex<String>,
        fail: bool,
        make_err: fn(&str) -> errs::Err,
    ) -> errs::Result<()> {
        // The delay gives the tasks a chance to actually overlap.
        thread::sleep(time::Duration::from_millis(100));

        let mut s = text.lock().unwrap();
        if fail {
            return Err(make_err(s.as_str()));
        }
        *s = s.to_uppercase();
        Ok(())
    }

    /// Test fixture: a shared string plus a flag telling the spawned tasks
    /// whether to fail.
    struct MyStruct {
        string: sync::Arc<sync::Mutex<String>>,
        fail: bool,
    }

    impl MyStruct {
        fn new(s: String, fail: bool) -> Self {
            Self {
                string: sync::Arc::new(sync::Mutex::new(s)),
                fail,
            }
        }

        /// Returns a copy of the text currently held by the fixture.
        fn text(&self) -> String {
            self.string.lock().unwrap().clone()
        }

        /// Adds one task to `ag` that reports failures through `make_err`.
        fn spawn(&self, ag: &mut AsyncGroup, make_err: fn(&str) -> errs::Err) {
            let text = sync::Arc::clone(&self.string);
            let fail = self.fail;
            ag.add(move || uppercase_or_fail(&text, fail, make_err));
        }

        /// Adds a single task; its error is created on `FIRST_ERR_LINE`.
        fn process(&self, ag: &mut AsyncGroup) {
            self.spawn(ag, err_at_first_site);
        }

        /// Adds two tasks working on the same string. Their errors are created
        /// on `FIRST_ERR_LINE` and `SECOND_ERR_LINE` respectively, which lets
        /// the tests check the order in which errors are collected.
        fn process_multiple(&self, ag: &mut AsyncGroup) {
            self.spawn(ag, err_at_first_site);
            self.spawn(ag, err_at_second_site);
        }
    }

    /// Creates a fixture holding `text` and checks its initial state.
    fn fixture(text: &str, fail: bool) -> MyStruct {
        let st = MyStruct::new(text.to_string(), fail);
        assert_eq!(st.text(), text);
        st
    }

    /// Sets the group's current name to `name` and adds the fixture's single
    /// task under that name.
    fn process_as(ag: &mut AsyncGroup, name: &str, st: &MyStruct) {
        ag._name = name.into();
        st.process(ag);
    }

    /// Joins the group and returns the collected `(name, error)` pairs.
    fn collect_errors(ag: &mut AsyncGroup) -> Vec<(sync::Arc<str>, errs::Err)> {
        let mut errors = Vec::new();
        ag.join_and_collect_errors(&mut errors);
        errors
    }

    mod tests_of_join_and_collect_errors {
        use super::*;

        #[test]
        fn zero() {
            let mut ag = AsyncGroup::new();

            let errors = collect_errors(&mut ag);

            assert!(errors.is_empty());
        }

        #[test]
        fn single_ok() {
            let mut ag = AsyncGroup::new();
            let struct_a = fixture("a", false);

            process_as(&mut ag, "foo", &struct_a);
            let errors = collect_errors(&mut ag);

            assert!(errors.is_empty());
            assert_eq!(struct_a.text(), "A");
        }

        #[test]
        fn single_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = fixture("a", true);

            process_as(&mut ag, "foo", &struct_a);
            let errors = collect_errors(&mut ag);

            assert_eq!(errors.len(), 1);
            assert_eq!(struct_a.text(), "a");

            assert_eq!(&*errors[0].0, "foo");
            assert_eq!(
                format!("{:?}", errors[0].1),
                expected_debug("a", FIRST_ERR_LINE)
            );
        }

        #[test]
        fn multiple_ok() {
            let mut ag = AsyncGroup::new();
            let struct_a = fixture("a", false);
            let struct_b = fixture("b", false);
            let struct_c = fixture("c", false);

            process_as(&mut ag, "foo", &struct_a);
            process_as(&mut ag, "bar", &struct_b);
            process_as(&mut ag, "baz", &struct_c);
            let errors = collect_errors(&mut ag);

            assert_eq!(errors.len(), 0);

            assert_eq!(struct_a.text(), "A");
            assert_eq!(struct_b.text(), "B");
            assert_eq!(struct_c.text(), "C");
        }

        #[test]
        fn multiple_processes_and_single_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = fixture("a", false);
            let struct_b = fixture("b", true);
            let struct_c = fixture("c", false);

            process_as(&mut ag, "foo", &struct_a);
            process_as(&mut ag, "bar", &struct_b);
            process_as(&mut ag, "baz", &struct_c);
            let errors = collect_errors(&mut ag);

            assert_eq!(errors.len(), 1);
            assert_eq!(&*errors[0].0, "bar");
            assert_eq!(
                format!("{:?}", errors[0].1),
                expected_debug("b", FIRST_ERR_LINE)
            );

            assert_eq!(struct_a.text(), "A");
            assert_eq!(struct_b.text(), "b");
            assert_eq!(struct_c.text(), "C");
        }

        #[test]
        fn multiple_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = fixture("a", true);
            let struct_b = fixture("b", true);
            let struct_c = fixture("c", true);

            process_as(&mut ag, "foo", &struct_a);
            process_as(&mut ag, "bar", &struct_b);
            process_as(&mut ag, "baz", &struct_c);
            let errors = collect_errors(&mut ag);

            assert_eq!(errors.len(), 3);

            // Errors must come back in the order the tasks were added.
            let expected = [("foo", "a"), ("bar", "b"), ("baz", "c")];
            for (i, (name, value)) in expected.iter().enumerate() {
                assert_eq!(&*errors[i].0, *name);
                assert_eq!(
                    format!("{:?}", errors[i].1),
                    expected_debug(value, FIRST_ERR_LINE)
                );
            }

            assert_eq!(struct_a.text(), "a");
            assert_eq!(struct_b.text(), "b");
            assert_eq!(struct_c.text(), "c");
        }

        #[test]
        fn data_src_execute_multiple_setup_handles() {
            let mut ag = AsyncGroup::new();
            let struct_d = fixture("d", false);

            // Two tasks are added under the same name, mirroring the failing
            // counterpart of this test below.
            ag._name = "foo".into();
            struct_d.process_multiple(&mut ag);
            let errors = collect_errors(&mut ag);

            assert_eq!(errors.len(), 0);

            assert_eq!(struct_d.text(), "D");
        }

        #[test]
        fn collect_all_errors_if_data_src_executes_multiple_setup_handles() {
            let mut ag = AsyncGroup::new();
            let struct_d = fixture("d", true);

            ag._name = "foo".into();
            struct_d.process_multiple(&mut ag);
            let errors = collect_errors(&mut ag);

            assert_eq!(errors.len(), 2);

            assert_eq!(&*errors[0].0, "foo");
            assert_eq!(&*errors[1].0, "foo");

            // The distinct lines prove that the errors are reported in the
            // order in which the two tasks were added.
            assert_eq!(
                format!("{:?}", errors[0].1),
                expected_debug("d", FIRST_ERR_LINE)
            );
            assert_eq!(
                format!("{:?}", errors[1].1),
                expected_debug("d", SECOND_ERR_LINE)
            );

            assert_eq!(struct_d.text(), "d");
        }

        #[test]
        fn panicked_task_is_reported() {
            let mut ag = AsyncGroup::new();

            ag._name = "foo".into();
            ag.add(|| panic!("boom"));
            let errors = collect_errors(&mut ag);

            assert_eq!(errors.len(), 1);
            assert_eq!(&*errors[0].0, "foo");
            assert!(format!("{:?}", errors[0].1).contains("ThreadPanicked"));
        }
    }

    mod tests_of_join_and_ignore_errors {
        use super::*;

        #[test]
        fn zero() {
            let mut ag = AsyncGroup::new();

            ag.join_and_ignore_errors();
        }

        #[test]
        fn single_ok() {
            let mut ag = AsyncGroup::new();
            let struct_a = fixture("a", false);

            process_as(&mut ag, "foo", &struct_a);
            ag.join_and_ignore_errors();

            assert_eq!(struct_a.text(), "A");
        }

        #[test]
        fn single_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = fixture("a", true);

            process_as(&mut ag, "foo", &struct_a);
            ag.join_and_ignore_errors();

            assert_eq!(struct_a.text(), "a");
        }

        #[test]
        fn multiple_ok() {
            let mut ag = AsyncGroup::new();
            let struct_a = fixture("a", false);
            let struct_b = fixture("b", false);
            let struct_c = fixture("c", false);

            process_as(&mut ag, "foo", &struct_a);
            process_as(&mut ag, "bar", &struct_b);
            process_as(&mut ag, "baz", &struct_c);
            ag.join_and_ignore_errors();

            assert_eq!(struct_a.text(), "A");
            assert_eq!(struct_b.text(), "B");
            assert_eq!(struct_c.text(), "C");
        }

        #[test]
        fn multiple_processes_and_single_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = fixture("a", false);
            let struct_b = fixture("b", true);
            let struct_c = fixture("c", false);

            process_as(&mut ag, "foo", &struct_a);
            process_as(&mut ag, "bar", &struct_b);
            process_as(&mut ag, "baz", &struct_c);
            ag.join_and_ignore_errors();

            assert_eq!(struct_a.text(), "A");
            assert_eq!(struct_b.text(), "b");
            assert_eq!(struct_c.text(), "C");
        }

        #[test]
        fn multiple_fail() {
            let mut ag = AsyncGroup::new();
            let struct_a = fixture("a", true);
            let struct_b = fixture("b", true);
            let struct_c = fixture("c", true);

            process_as(&mut ag, "foo", &struct_a);
            process_as(&mut ag, "bar", &struct_b);
            process_as(&mut ag, "baz", &struct_c);
            ag.join_and_ignore_errors();

            assert_eq!(struct_a.text(), "a");
            assert_eq!(struct_b.text(), "b");
            assert_eq!(struct_c.text(), "c");
        }
    }
}