thread_groups/
thread_groups.rs1use std::collections::{BTreeMap, VecDeque};
8use std::fmt::Display;
9use std::thread::{Builder, JoinHandle, Thread};
10
11pub fn thread_id(thread: &Thread) -> String {
13 format!(
14 "{}:{}",
15 std::process::id(),
16 thread
17 .name()
18 .map(|a| a.to_string())
19 .unwrap_or_else(|| format!("{:#?}", thread.id()))
20 .to_string()
21 )
22}
23
24pub struct ThreadGroup<T> {
27 id: String,
28 handles: VecDeque<JoinHandle<T>>,
29 count: usize,
30 errors: BTreeMap<String, Error>,
31}
32impl<T: Send + Sync + 'static> ThreadGroup<T> {
33 pub fn new() -> ThreadGroup<T> {
35 ThreadGroup::with_id(thread_id(&std::thread::current()))
36 }
37
38 pub fn with_id(id: String) -> ThreadGroup<T> {
40 ThreadGroup {
41 id,
42 handles: VecDeque::new(),
43 errors: BTreeMap::new(),
44 count: 0,
45 }
46 }
47
48 pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
50 self.count += 1;
51 let name = format!("{}:{}", &self.id, self.count);
52 self.handles.push_back(
53 Builder::new().name(name.clone()).spawn(func).map_err(|e| {
54 Error::ThreadJoinError(format!("spawning thread {}: {:#?}", name, e))
55 })?,
56 );
57 Ok(())
58 }
59
60 pub fn join(&mut self) -> Result<T> {
64 let handle = self
65 .handles
66 .pop_front()
67 .ok_or(Error::ThreadGroupError(format!(
68 "no threads in group {}",
69 &self
70 )))?;
71
72 let id = thread_id(&handle.thread());
73
74 let end = match handle.join() {
75 Ok(t) => Ok(t),
76 Err(e) => {
77 let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
78 self.errors.insert(id, e.clone());
79 Err(e)
80 }
81 };
82 self.count -= 1;
83 end
84 }
85
86 pub fn results(&mut self) -> Vec<Result<T>> {
89 let mut val = Vec::<Result<T>>::new();
90 while !self.handles.is_empty() {
91 val.push(self.join());
92 }
93 val
94 }
95
96 pub fn as_far_as_ok(&mut self) -> Vec<T> {
99 let mut val = Vec::<T>::new();
100 while !self.handles.is_empty() {
101 if let Ok(g) = self.join() {
102 val.push(g)
103 }
104 }
105 val
106 }
107
108 pub fn all_ok(&mut self) -> Result<Vec<T>> {
111 let mut val = Vec::<T>::new();
112 while !self.handles.is_empty() {
113 val.push(self.join()?);
114 }
115 Ok(val)
116 }
117
118 pub fn errors(&self) -> BTreeMap<String, Error> {
120 self.errors.clone()
121 }
122}
123
124impl<T> std::fmt::Display for ThreadGroup<T> {
125 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
126 write!(f, "{}::ThreadGroup {}", module_path!(), &self.id)
127 }
128}
129
130impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
131 fn default() -> ThreadGroup<T> {
132 Self::new()
133 }
134}
135
136#[derive(Debug, Clone, PartialEq, Eq)]
137pub enum Error {
138 ThreadGroupError(String),
139 ThreadJoinError(String),
140}
141
142impl Display for Error {
143 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
144 write!(
145 f,
146 "{}{}",
147 self.prefix().unwrap_or_default(),
148 match self {
149 Self::ThreadGroupError(s) => format!("{}", s),
150 Self::ThreadJoinError(s) => format!("{}", s),
151 }
152 )
153 }
154}
155
156impl Error {
157 pub fn variant(&self) -> String {
158 match self {
159 Error::ThreadGroupError(_) => "ThreadGroupError",
160 Error::ThreadJoinError(_) => "ThreadJoinError",
161 }
162 .to_string()
163 }
164
165 fn prefix(&self) -> Option<String> {
166 match self {
167 _ => Some(format!("{}: ", self.variant())),
168 }
169 }
170}
171
172impl std::error::Error for Error {}
173
174pub type Result<T> = std::result::Result<T, Error>;