thread_groups/
thread_groups.rs

1//! Thread Groups is a simple tool for spawing several threads and waiting for all to complete - i.e.: join - at once.
2//!
3//! It provides the [`ThreadGroup`] struct which does all the job for
4//! you so you can wait and enjoy the silence of your life in
5//! the real world.
6#![feature(thread_id_value)]
7
8use std::collections::{BTreeMap, VecDeque};
9use std::thread::{Builder, JoinHandle, Thread};
10
11/// `thread_id` returns a deterministic name for instances of [`std::thread::Thread`].
12pub fn thread_id(thread: &Thread) -> String {
13    format!(
14        "{}:{}",
15        std::process::id(),
16        thread
17            .name()
18            .map(|a| a.to_string())
19            .unwrap_or_else(|| format!("{}", thread.id().as_u64()))
20            .to_string()
21    )
22}
23
24/// `ThreadGroup` is allows spawning several threads and waiting for
25/// their completion through the specialized methods.
26pub struct ThreadGroup<T> {
27    id: String,
28    handles: VecDeque<JoinHandle<T>>,
29    count: usize,
30    errors: BTreeMap<String, Error>,
31}
32impl<T: Send + Sync + 'static> ThreadGroup<T> {
33    /// `ThreadGroup::new` creates a new thread group
34    pub fn new() -> ThreadGroup<T> {
35        ThreadGroup::with_id(thread_id(&std::thread::current()))
36    }
37
38    /// `ThreadGroup::with_id` creates a new thread group with a specific id ([`String`])
39    pub fn with_id(id: String) -> ThreadGroup<T> {
40        ThreadGroup {
41            id,
42            handles: VecDeque::new(),
43            errors: BTreeMap::new(),
44            count: 0,
45        }
46    }
47
48    /// `ThreadGroup::spawn` spawns a thread
49    pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
50        self.count += 1;
51        let name = format!("{}:{}", &self.id, self.count);
52        self.handles.push_front(
53            Builder::new().name(name.clone()).spawn(func).map_err(|e| {
54                Error::ThreadJoinError(format!("spawning thread {}: {:#?}", name, e))
55            })?,
56        );
57        Ok(())
58    }
59
60    /// `ThreadGroup::join` waits for the first thread to join in
61    /// blocking fashion, returning the result of that threads
62    /// [`FnOnce`]
63    pub fn join(&mut self) -> Result<T> {
64        let handle = self
65            .handles
66            .pop_front()
67            .ok_or(Error::ThreadGroupError(format!(
68                "no threads in group {}",
69                &self
70            )))?;
71
72        let id = thread_id(&handle.thread());
73
74        let end = match handle.join() {
75            Ok(t) => Ok(t),
76            Err(e) => {
77                self.count -= 1;
78                let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
79                self.errors.insert(id, e.clone());
80                Err(e)
81            }
82        };
83        self.count -= 1;
84        end
85    }
86
87    /// `ThreadGroup::results` waits for the all threads to join in
88    /// blocking fashion, returning all their results at once as a [`Vec<Result<T>>`]
89    pub fn results(&mut self) -> Vec<Result<T>> {
90        let mut val = Vec::<Result<T>>::new();
91        while !self.handles.is_empty() {
92            val.push(self.join());
93        }
94        val
95    }
96
97    /// `ThreadGroup::as_far_as_ok` waits for the all threads to join in
98    /// blocking fashion, returning all the OK results at once as a [`Vec<T>`] but ignoring all errors.
99    pub fn as_far_as_ok(&mut self) -> Vec<T> {
100        let mut val = Vec::<T>::new();
101        while !self.handles.is_empty() {
102            if let Ok(g) = self.join() {
103                val.push(g)
104            }
105        }
106        val
107    }
108
109    /// `ThreadGroup::all_ok` waits for the all threads to join in
110    /// blocking fashion, returning all the OK results at once as a [`Vec<T>`] if there are no errors.
111    pub fn all_ok(&mut self) -> Result<Vec<T>> {
112        let mut val = Vec::<T>::new();
113        while !self.handles.is_empty() {
114            val.push(self.join()?);
115        }
116        Ok(val)
117    }
118}
119
120impl<T> std::fmt::Display for ThreadGroup<T> {
121    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
122        write!(f, "{}::ThreadGroup {}", module_path!(), &self.id)
123    }
124}
125
126impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
127    fn default() -> ThreadGroup<T> {
128        Self::new()
129    }
130}
131
132#[cfg(test)]
133mod thread_group_test {
134    use super::*;
135
136    #[test]
137    fn test_join() -> Result<()> {
138        Ok({
139            let mut threads =
140                ThreadGroup::<String>::with_id(format!("{}:{}", module_path!(), line!()));
141            for count in 65..67 {
142                threads.spawn(move || {
143                    format!(
144                        "{}",
145                        char::from_u32(count)
146                            .map(|val| val.to_string())
147                            .unwrap_or(format!("{}", count))
148                    )
149                })?;
150            }
151            let mut data = threads.all_ok()?;
152            data.sort();
153
154            assert_eq!(
155                data,
156                ["A", "B"]
157                    .iter()
158                    .map(|val| val.to_string())
159                    .collect::<Vec<String>>()
160            );
161        })
162    }
163}
164use std::fmt::Display;
165
166#[derive(Debug, Clone)]
167pub enum Error {
168    ThreadGroupError(String),
169    ThreadJoinError(String),
170}
171
172impl Display for Error {
173    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
174        write!(
175            f,
176            "{}{}",
177            self.prefix().unwrap_or_default(),
178            match self {
179                Self::ThreadGroupError(s) => format!("{}", s),
180                Self::ThreadJoinError(s) => format!("{}", s),
181            }
182        )
183    }
184}
185
186impl Error {
187    pub fn variant(&self) -> String {
188        match self {
189            Error::ThreadGroupError(_) => "ThreadGroupError",
190            Error::ThreadJoinError(_) => "ThreadJoinError",
191        }
192        .to_string()
193    }
194
195    fn prefix(&self) -> Option<String> {
196        match self {
197            _ => Some(format!("{}: ", self.variant())),
198        }
199    }
200}
201
202impl std::error::Error for Error {}
203
204pub type Result<T> = std::result::Result<T, Error>;