thread_group/
thread_group.rs

//! Thread Groups is a simple tool for spawning several threads and waiting for all of them to complete - i.e. join - at once.
//!
//! It provides the [`ThreadGroup`] struct which does all the work for
//! you so you can wait and enjoy the silence of your life in
//! the real world.
6#![feature(thread_id_value)]
7
8use std::collections::{BTreeMap, VecDeque};
9use std::thread::{Builder, JoinHandle, Thread};
10
11/// `thread_id` returns a deterministic name for instances of [`std::thread::Thread`].
12pub fn thread_id(thread: &Thread) -> String {
13    format!(
14        "{}:{}",
15        std::process::id(),
16        thread
17            .name()
18            .map(|a| a.to_string())
19            .unwrap_or_else(|| format!("{}", thread.id().as_u64()))
20            .to_string()
21    )
22}
23
24/// `ThreadGroup` is allows spawning several threads and waiting for
25/// their completion through the specialized methods.
26pub struct ThreadGroup<T> {
27    id: String,
28    handles: VecDeque<JoinHandle<T>>,
29    count: usize,
30    errors: BTreeMap<String, Error>,
31}
32impl<T: Send + Sync + 'static> ThreadGroup<T> {
33    /// `ThreadGroup::new` creates a new thread group
34    pub fn new() -> ThreadGroup<T> {
35        ThreadGroup::with_id(thread_id(&std::thread::current()))
36    }
37
38    /// `ThreadGroup::with_id` creates a new thread group with a specific id ([`String`])
39    pub fn with_id(id: String) -> ThreadGroup<T> {
40        ThreadGroup {
41            id,
42            handles: VecDeque::new(),
43            errors: BTreeMap::new(),
44            count: 0,
45        }
46    }
47
48    /// `ThreadGroup::spawn` spawns a thread
49    pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
50        assert_eq!(self.handles.len(), self.count);
51        self.count += 1;
52        let name = format!("{}:{}", &self.id, self.count);
53        self.handles.push_front(
54            Builder::new().name(name.clone()).spawn(func).map_err(|e| {
55                Error::ThreadJoinError(format!("spawning thread {}: {:#?}", name, e))
56            })?,
57        );
58        assert_eq!(self.handles.len(), self.count);
59        Ok(())
60    }
61
62    /// `ThreadGroup::join` waits for the first thread to join in
63    /// blocking fashion, returning the result of that threads
64    /// [`FnOnce`]
65    pub fn join(&mut self) -> Result<T> {
66        assert_eq!(self.handles.len(), self.count);
67        let handle = self
68            .handles
69            .pop_front()
70            .ok_or(Error::ThreadGroupError(format!(
71                "no threads in group {}",
72                &self
73            )))?;
74
75        let id = thread_id(&handle.thread());
76
77        let end = match handle.join() {
78            Ok(t) => Ok(t),
79            Err(e) => {
80                self.count -= 1;
81                let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
82                self.errors.insert(id, e.clone());
83                Err(e)
84            }
85        };
86        self.count -= 1;
87        assert_eq!(self.handles.len(), self.count);
88        end
89    }
90
91    /// `ThreadGroup::results` waits for the all threads to join in
92    /// blocking fashion, returning all their results at once as a [`Vec<Result<T>>`]
93    pub fn results(&mut self) -> Vec<Result<T>> {
94        let mut val = Vec::<Result<T>>::new();
95        while !self.handles.is_empty() {
96            val.push(self.join());
97        }
98        val
99    }
100
101    /// `ThreadGroup::as_far_as_ok` waits for the all threads to join in
102    /// blocking fashion, returning all the OK results at once as a [`Vec<T>`] but ignoring all errors.
103    pub fn as_far_as_ok(&mut self) -> Vec<T> {
104        let mut val = Vec::<T>::new();
105        while !self.handles.is_empty() {
106            if let Ok(g) = self.join() {
107                val.push(g)
108            }
109        }
110        val
111    }
112
113    /// `ThreadGroup::all_ok` waits for the all threads to join in
114    /// blocking fashion, returning all the OK results at once as a [`Vec<T>`] if there are no errors.
115    pub fn all_ok(&mut self) -> Result<Vec<T>> {
116        let mut val = Vec::<T>::new();
117        while !self.handles.is_empty() {
118            val.push(self.join()?);
119        }
120        Ok(val)
121    }
122}
123
124impl<T> std::fmt::Display for ThreadGroup<T> {
125    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
126        write!(f, "{}::ThreadGroup {}", module_path!(), &self.id)
127    }
128}
129
130impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
131    fn default() -> ThreadGroup<T> {
132        Self::new()
133    }
134}
135
#[cfg(test)]
mod thread_group_test {
    use super::*;

    /// Spawns one thread per code point in `65..67`, each returning the
    /// matching character as a `String`, and checks that joining the whole
    /// group yields `"A"` and `"B"`.
    #[test]
    fn test_join() -> Result<()> {
        let group_id = format!("{}:{}", module_path!(), line!());
        let mut threads = ThreadGroup::<String>::with_id(group_id);
        for count in 65..67 {
            threads.spawn(move || match char::from_u32(count) {
                Some(val) => val.to_string(),
                None => count.to_string(),
            })?;
        }

        // The order in which results come back is not part of this check,
        // so sort before comparing.
        let mut data = threads.all_ok()?;
        data.sort();
        assert_eq!(data, vec!["A".to_string(), "B".to_string()]);
        Ok(())
    }
}
168use std::fmt::Display;
169
/// Errors reported by [`ThreadGroup`] operations.
///
/// Each variant carries a human-readable description of the failure.
/// Two errors compare equal when they are the same variant with the same
/// description.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Error {
    /// The group could not satisfy a request, e.g. joining when it holds no
    /// threads.
    ThreadGroupError(String),
    /// A thread could not be spawned, or joining it failed.
    ThreadJoinError(String),
}
175
176impl Display for Error {
177    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
178        write!(
179            f,
180            "{}{}",
181            self.prefix().unwrap_or_default(),
182            match self {
183                Self::ThreadGroupError(s) => format!("{}", s),
184                Self::ThreadJoinError(s) => format!("{}", s),
185            }
186        )
187    }
188}
189
190impl Error {
191    pub fn variant(&self) -> String {
192        match self {
193            Error::ThreadGroupError(_) => "ThreadGroupError",
194            Error::ThreadJoinError(_) => "ThreadJoinError",
195        }
196        .to_string()
197    }
198
199    fn prefix(&self) -> Option<String> {
200        match self {
201            _ => Some(format!("{}: ", self.variant())),
202        }
203    }
204}
205
// Marks `Error` as a standard error type. The body is empty, so every method
// of `std::error::Error` keeps its provided default.
impl std::error::Error for Error {}
207
208pub type Result<T> = std::result::Result<T, Error>;