thread_groups/
thread_groups.rs

//! Thread Groups is a simple tool for spawning several threads and waiting for all of them to complete - i.e.: join - at once.
//!
//! It provides the [`ThreadGroup`] struct which does all the job for
//! you so you can wait and enjoy the silence of your life in
//! the real world.
6#![feature(thread_id_value)]
7
8use std::collections::{BTreeMap, VecDeque};
9use std::fmt::Display;
10use std::thread::{Builder, JoinHandle, Thread};
11
12/// `thread_id` returns a deterministic name for instances of [`std::thread::Thread`].
13pub fn thread_id(thread: &Thread) -> String {
14    format!(
15        "{}:{}",
16        std::process::id(),
17        thread
18            .name()
19            .map(|a| a.to_string())
20            .unwrap_or_else(|| format!("{}", thread.id().as_u64()))
21            .to_string()
22    )
23}
24
/// `ThreadGroup` allows spawning several threads and waiting for
/// their completion through the specialized methods.
pub struct ThreadGroup<T> {
    /// Identifier of the group; used as the prefix of spawned thread names.
    id: String,
    /// Join handles of the spawned threads, in spawn order (oldest at the front).
    handles: VecDeque<JoinHandle<T>>,
    /// Counter used to build the numeric suffix of spawned thread names.
    count: usize,
    /// Errors recorded by failed joins, keyed by the id of the failing thread.
    errors: BTreeMap<String, Error>,
}
33impl<T: Send + Sync + 'static> ThreadGroup<T> {
34    /// `ThreadGroup::new` creates a new thread group
35    pub fn new() -> ThreadGroup<T> {
36        ThreadGroup::with_id(thread_id(&std::thread::current()))
37    }
38
39    /// `ThreadGroup::with_id` creates a new thread group with a specific id ([`String`])
40    pub fn with_id(id: String) -> ThreadGroup<T> {
41        ThreadGroup {
42            id,
43            handles: VecDeque::new(),
44            errors: BTreeMap::new(),
45            count: 0,
46        }
47    }
48
49    /// `ThreadGroup::spawn` spawns a thread
50    pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
51        self.count += 1;
52        let name = format!("{}:{}", &self.id, self.count);
53        self.handles.push_back(
54            Builder::new().name(name.clone()).spawn(func).map_err(|e| {
55                Error::ThreadJoinError(format!("spawning thread {}: {:#?}", name, e))
56            })?,
57        );
58        Ok(())
59    }
60
61    /// `ThreadGroup::join` waits for the first thread to join in
62    /// blocking fashion, returning the result of that threads
63    /// [`FnOnce`]
64    pub fn join(&mut self) -> Result<T> {
65        let handle = self
66            .handles
67            .pop_front()
68            .ok_or(Error::ThreadGroupError(format!(
69                "no threads in group {}",
70                &self
71            )))?;
72
73        let id = thread_id(&handle.thread());
74
75        let end = match handle.join() {
76            Ok(t) => Ok(t),
77            Err(e) => {
78                let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
79                self.errors.insert(id, e.clone());
80                Err(e)
81            }
82        };
83        self.count -= 1;
84        end
85    }
86
87    /// `ThreadGroup::results` waits for the all threads to join in
88    /// blocking fashion, returning all their results at once as a [`Vec<Result<T>>`]
89    pub fn results(&mut self) -> Vec<Result<T>> {
90        let mut val = Vec::<Result<T>>::new();
91        while !self.handles.is_empty() {
92            val.push(self.join());
93        }
94        val
95    }
96
97    /// `ThreadGroup::as_far_as_ok` waits for the all threads to join in
98    /// blocking fashion, returning all the OK results at once as a [`Vec<T>`] but ignoring all errors.
99    pub fn as_far_as_ok(&mut self) -> Vec<T> {
100        let mut val = Vec::<T>::new();
101        while !self.handles.is_empty() {
102            if let Ok(g) = self.join() {
103                val.push(g)
104            }
105        }
106        val
107    }
108
109    /// `ThreadGroup::all_ok` waits for the all threads to join in
110    /// blocking fashion, returning all the OK results at once as a [`Vec<T>`] if there are no errors.
111    pub fn all_ok(&mut self) -> Result<Vec<T>> {
112        let mut val = Vec::<T>::new();
113        while !self.handles.is_empty() {
114            val.push(self.join()?);
115        }
116        Ok(val)
117    }
118
119    /// `ThreadGroup::errors` returns a [`BTreeMap<String, Error>`] of errors whose keys are thread ids that panicked.
120    pub fn errors(&self) -> BTreeMap<String, Error> {
121        self.errors.clone()
122    }
123}
124
125impl<T> std::fmt::Display for ThreadGroup<T> {
126    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
127        write!(f, "{}::ThreadGroup {}", module_path!(), &self.id)
128    }
129}
130
131impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
132    fn default() -> ThreadGroup<T> {
133        Self::new()
134    }
135}
136
/// Errors reported by [`ThreadGroup`] operations; each variant carries a
/// human-readable message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Error {
    /// Produced by `ThreadGroup::join` when the group has no thread left to join.
    ThreadGroupError(String),
    /// Produced when spawning a thread fails or when joining a thread fails.
    ThreadJoinError(String),
}
142
143impl Display for Error {
144    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
145        write!(
146            f,
147            "{}{}",
148            self.prefix().unwrap_or_default(),
149            match self {
150                Self::ThreadGroupError(s) => format!("{}", s),
151                Self::ThreadJoinError(s) => format!("{}", s),
152            }
153        )
154    }
155}
156
157impl Error {
158    pub fn variant(&self) -> String {
159        match self {
160            Error::ThreadGroupError(_) => "ThreadGroupError",
161            Error::ThreadJoinError(_) => "ThreadJoinError",
162        }
163        .to_string()
164    }
165
166    fn prefix(&self) -> Option<String> {
167        match self {
168            _ => Some(format!("{}: ", self.variant())),
169        }
170    }
171}
172
// `Error` uses the default methods of `std::error::Error`; the trait's
// `Debug` and `Display` requirements are met by the derive and the impl above.
impl std::error::Error for Error {}
174
/// Result type whose error is always this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;