thread_groups/
thread_groups.rs

1//! Thread Groups is a simple tool for spawing several threads and waiting for all to complete - i.e.: join - at once.
2//!
3//! It provides the [`ThreadGroup`] struct which does all the job for
4//! you so you can wait and enjoy the silence of your life in
5//! the real world.
6
7use std::collections::{BTreeMap, VecDeque};
8use std::fmt::Display;
9use std::thread::{Builder, JoinHandle, Thread};
10
11/// `thread_id` returns a deterministic name for instances of [`std::thread::Thread`].
12pub fn thread_id(thread: &Thread) -> String {
13    format!(
14        "{}:{}",
15        std::process::id(),
16        thread
17            .name()
18            .map(|a| a.to_string())
19            .unwrap_or_else(|| format!("{:#?}", thread.id()))
20            .to_string()
21    )
22}
23
24/// `ThreadGroup` is allows spawning several threads and waiting for
25/// their completion through the specialized methods.
26pub struct ThreadGroup<T> {
27    id: String,
28    handles: VecDeque<JoinHandle<T>>,
29    count: usize,
30    errors: BTreeMap<String, Error>,
31}
32impl<T: Send + Sync + 'static> ThreadGroup<T> {
33    /// `ThreadGroup::new` creates a new thread group
34    pub fn new() -> ThreadGroup<T> {
35        ThreadGroup::with_id(thread_id(&std::thread::current()))
36    }
37
38    /// `ThreadGroup::with_id` creates a new thread group with a specific id ([`String`])
39    pub fn with_id(id: String) -> ThreadGroup<T> {
40        ThreadGroup {
41            id,
42            handles: VecDeque::new(),
43            errors: BTreeMap::new(),
44            count: 0,
45        }
46    }
47
48    /// `ThreadGroup::spawn` spawns a thread
49    pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
50        self.count += 1;
51        let name = format!("{}:{}", &self.id, self.count);
52        self.handles.push_back(
53            Builder::new().name(name.clone()).spawn(func).map_err(|e| {
54                Error::ThreadJoinError(format!("spawning thread {}: {:#?}", name, e))
55            })?,
56        );
57        Ok(())
58    }
59
60    /// `ThreadGroup::join` waits for the first thread to join in
61    /// blocking fashion, returning the result of that threads
62    /// [`FnOnce`]
63    pub fn join(&mut self) -> Result<T> {
64        let handle = self
65            .handles
66            .pop_front()
67            .ok_or(Error::ThreadGroupError(format!(
68                "no threads in group {}",
69                &self
70            )))?;
71
72        let id = thread_id(&handle.thread());
73
74        let end = match handle.join() {
75            Ok(t) => Ok(t),
76            Err(e) => {
77                let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
78                self.errors.insert(id, e.clone());
79                Err(e)
80            }
81        };
82        self.count -= 1;
83        end
84    }
85
86    /// `ThreadGroup::results` waits for the all threads to join in
87    /// blocking fashion, returning all their results at once as a [`Vec<Result<T>>`]
88    pub fn results(&mut self) -> Vec<Result<T>> {
89        let mut val = Vec::<Result<T>>::new();
90        while !self.handles.is_empty() {
91            val.push(self.join());
92        }
93        val
94    }
95
96    /// `ThreadGroup::as_far_as_ok` waits for the all threads to join in
97    /// blocking fashion, returning all the OK results at once as a [`Vec<T>`] but ignoring all errors.
98    pub fn as_far_as_ok(&mut self) -> Vec<T> {
99        let mut val = Vec::<T>::new();
100        while !self.handles.is_empty() {
101            if let Ok(g) = self.join() {
102                val.push(g)
103            }
104        }
105        val
106    }
107
108    /// `ThreadGroup::all_ok` waits for the all threads to join in
109    /// blocking fashion, returning all the OK results at once as a [`Vec<T>`] if there are no errors.
110    pub fn all_ok(&mut self) -> Result<Vec<T>> {
111        let mut val = Vec::<T>::new();
112        while !self.handles.is_empty() {
113            val.push(self.join()?);
114        }
115        Ok(val)
116    }
117
118    /// `ThreadGroup::errors` returns a [`BTreeMap<String, Error>`] of errors whose keys are thread ids that panicked.
119    pub fn errors(&self) -> BTreeMap<String, Error> {
120        self.errors.clone()
121    }
122}
123
124impl<T> std::fmt::Display for ThreadGroup<T> {
125    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
126        write!(f, "{}::ThreadGroup {}", module_path!(), &self.id)
127    }
128}
129
130impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
131    fn default() -> ThreadGroup<T> {
132        Self::new()
133    }
134}
135
136#[derive(Debug, Clone, PartialEq, Eq)]
137pub enum Error {
138    ThreadGroupError(String),
139    ThreadJoinError(String),
140}
141
142impl Display for Error {
143    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
144        write!(
145            f,
146            "{}{}",
147            self.prefix().unwrap_or_default(),
148            match self {
149                Self::ThreadGroupError(s) => format!("{}", s),
150                Self::ThreadJoinError(s) => format!("{}", s),
151            }
152        )
153    }
154}
155
156impl Error {
157    pub fn variant(&self) -> String {
158        match self {
159            Error::ThreadGroupError(_) => "ThreadGroupError",
160            Error::ThreadJoinError(_) => "ThreadJoinError",
161        }
162        .to_string()
163    }
164
165    fn prefix(&self) -> Option<String> {
166        match self {
167            _ => Some(format!("{}: ", self.variant())),
168        }
169    }
170}
171
172impl std::error::Error for Error {}
173
174pub type Result<T> = std::result::Result<T, Error>;