thread_groups/
thread_groups.rs

1//! Thread Groups is a simple tool for spawing several threads and waiting for all to complete - i.e.: join - at once.
2//!
3//! It provides the [`ThreadGroup`] struct which does all the job for
4//! you so you can wait and enjoy the silence of your life in
5//! the real world.
6
7use std::collections::{BTreeMap, VecDeque};
8use std::fmt::Display;
9use std::thread::{Builder, JoinHandle, Thread};
10
11/// `thread_id` returns a deterministic name for instances of [`std::thread::Thread`].
12pub fn thread_id(thread: &Thread) -> String {
13    format!(
14        "{}:{}",
15        std::process::id(),
16        thread
17            .name()
18            .map(|a| a.to_string())
19            .unwrap_or_else(|| format!("{:#?}", thread.id()))
20            .to_string()
21    )
22}
23
24/// `ThreadGroup` is allows spawning several threads and waiting for
25/// their completion through the specialized methods.
26pub struct ThreadGroup<T> {
27    id: String,
28    handles: VecDeque<JoinHandle<T>>,
29    count: usize,
30    errors: BTreeMap<String, Error>,
31}
32impl<T: Send + Sync + 'static> ThreadGroup<T> {
33    /// `ThreadGroup::new` creates a new thread group
34    pub fn new() -> ThreadGroup<T> {
35        ThreadGroup::with_id(thread_id(&std::thread::current()))
36    }
37
38    /// `ThreadGroup::with_id` creates a new thread group with a specific id ([`String`])
39    pub fn with_id(id: String) -> ThreadGroup<T> {
40        ThreadGroup {
41            id,
42            handles: VecDeque::new(),
43            errors: BTreeMap::new(),
44            count: 0,
45        }
46    }
47
48    /// `ThreadGroup::spawn` spawns a thread
49    pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
50        self.count += 1;
51        let name = format!("{}:{}", &self.id, self.count);
52        self.handles.push_back(
53            Builder::new().name(name.clone()).spawn(func).map_err(|e| {
54                Error::ThreadSpawnError(format!("spawning thread {}: {:#?}", name, e))
55            })?,
56        );
57        Ok(())
58    }
59
60    /// `ThreadGroup::join` waits for the first thread to join in
61    /// blocking fashion, returning the result of that threads
62    /// [`FnOnce`]
63    pub fn join(&mut self) -> Result<T> {
64        let handle = self
65            .handles
66            .pop_front()
67            .ok_or(Error::ThreadGroupError(format!("no threads in group {}", &self)))?;
68
69        let id = thread_id(&handle.thread());
70
71        let end = match handle.join() {
72            Ok(t) => Ok(t),
73            Err(e) => {
74                let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
75                self.errors.insert(id, e.clone());
76                Err(e)
77            },
78        };
79        self.count -= 1;
80        end
81    }
82
83    /// `ThreadGroup::results` waits for the all threads to join in
84    /// blocking fashion, returning all their results at once as a [`Vec<Result<T>>`]
85    pub fn results(&mut self) -> Vec<Result<T>> {
86        let mut val = Vec::<Result<T>>::new();
87        while !self.handles.is_empty() {
88            val.push(self.join());
89        }
90        val
91    }
92
93    /// `ThreadGroup::as_far_as_ok` waits for the all threads to join in
94    /// blocking fashion, returning all the OK results at once as a [`Vec<T>`] but ignoring all errors.
95    pub fn as_far_as_ok(&mut self) -> Vec<T> {
96        let mut val = Vec::<T>::new();
97        while !self.handles.is_empty() {
98            if let Ok(g) = self.join() {
99                val.push(g)
100            }
101        }
102        val
103    }
104
105    /// `ThreadGroup::all_ok` waits for the all threads to join in
106    /// blocking fashion, returning all the OK results at once as a [`Vec<T>`] if there are no errors.
107    pub fn all_ok(&mut self) -> Result<Vec<T>> {
108        let mut val = Vec::<T>::new();
109        while !self.handles.is_empty() {
110            val.push(self.join()?);
111        }
112        Ok(val)
113    }
114
115    /// `ThreadGroup::errors` returns a [`BTreeMap<String, Error>`] of errors whose keys are thread ids that panicked.
116    pub fn errors(&self) -> BTreeMap<String, Error> {
117        self.errors.clone()
118    }
119}
120
121impl<T> std::fmt::Display for ThreadGroup<T> {
122    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
123        write!(f, "{}::ThreadGroup {}", module_path!(), &self.id)
124    }
125}
126
127impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
128    fn default() -> ThreadGroup<T> {
129        Self::new()
130    }
131}
132
133#[derive(Debug, Clone, PartialEq, Eq)]
134pub enum Error {
135    ThreadGroupError(String),
136    ThreadJoinError(String),
137    ThreadSpawnError(String),
138}
139
140impl Display for Error {
141    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
142        write!(
143            f,
144            "{}{}",
145            self.prefix().unwrap_or_default(),
146            match self {
147                Self::ThreadGroupError(s)
148                | Self::ThreadJoinError(s)
149                | Self::ThreadSpawnError(s) => format!("{}", s),
150            }
151        )
152    }
153}
154
155impl Error {
156    pub fn variant(&self) -> String {
157        match self {
158            Error::ThreadGroupError(_) => "ThreadGroupError",
159            Error::ThreadJoinError(_) => "ThreadJoinError",
160            Error::ThreadSpawnError(_) => "ThreadSpawnError",
161        }
162        .to_string()
163    }
164
165    fn prefix(&self) -> Option<String> {
166        match self {
167            _ => Some(format!("{}: ", self.variant())),
168        }
169    }
170}
171
172impl std::error::Error for Error {}
173
174pub type Result<T> = std::result::Result<T, Error>;