thread_groups/
thread_groups.rs1#![feature(thread_id_value)]
7
8use std::collections::{BTreeMap, VecDeque};
9use std::thread::{Builder, JoinHandle, Thread};
10
11pub fn thread_id(thread: &Thread) -> String {
13 format!(
14 "{}:{}",
15 std::process::id(),
16 thread
17 .name()
18 .map(|a| a.to_string())
19 .unwrap_or_else(|| format!("{}", thread.id().as_u64()))
20 .to_string()
21 )
22}
23
24pub struct ThreadGroup<T> {
27 id: String,
28 handles: VecDeque<JoinHandle<T>>,
29 count: usize,
30 errors: BTreeMap<String, Error>,
31}
32impl<T: Send + Sync + 'static> ThreadGroup<T> {
33 pub fn new() -> ThreadGroup<T> {
35 ThreadGroup::with_id(thread_id(&std::thread::current()))
36 }
37
38 pub fn with_id(id: String) -> ThreadGroup<T> {
40 ThreadGroup {
41 id,
42 handles: VecDeque::new(),
43 errors: BTreeMap::new(),
44 count: 0,
45 }
46 }
47
48 pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
50 self.count += 1;
51 let name = format!("{}:{}", &self.id, self.count);
52 self.handles.push_front(
53 Builder::new().name(name.clone()).spawn(func).map_err(|e| {
54 Error::ThreadJoinError(format!("spawning thread {}: {:#?}", name, e))
55 })?,
56 );
57 Ok(())
58 }
59
60 pub fn join(&mut self) -> Result<T> {
64 let handle = self
65 .handles
66 .pop_front()
67 .ok_or(Error::ThreadGroupError(format!(
68 "no threads in group {}",
69 &self
70 )))?;
71
72 let id = thread_id(&handle.thread());
73
74 let end = match handle.join() {
75 Ok(t) => Ok(t),
76 Err(e) => {
77 self.count -= 1;
78 let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
79 self.errors.insert(id, e.clone());
80 Err(e)
81 }
82 };
83 self.count -= 1;
84 end
85 }
86
87 pub fn results(&mut self) -> Vec<Result<T>> {
90 let mut val = Vec::<Result<T>>::new();
91 while !self.handles.is_empty() {
92 val.push(self.join());
93 }
94 val
95 }
96
97 pub fn as_far_as_ok(&mut self) -> Vec<T> {
100 let mut val = Vec::<T>::new();
101 while !self.handles.is_empty() {
102 if let Ok(g) = self.join() {
103 val.push(g)
104 }
105 }
106 val
107 }
108
109 pub fn all_ok(&mut self) -> Result<Vec<T>> {
112 let mut val = Vec::<T>::new();
113 while !self.handles.is_empty() {
114 val.push(self.join()?);
115 }
116 Ok(val)
117 }
118}
119
120impl<T> std::fmt::Display for ThreadGroup<T> {
121 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
122 write!(f, "{}::ThreadGroup {}", module_path!(), &self.id)
123 }
124}
125
126impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
127 fn default() -> ThreadGroup<T> {
128 Self::new()
129 }
130}
131
132#[cfg(test)]
133mod thread_group_test {
134 use super::*;
135
136 #[test]
137 fn test_join() -> Result<()> {
138 Ok({
139 let mut threads =
140 ThreadGroup::<String>::with_id(format!("{}:{}", module_path!(), line!()));
141 for count in 65..67 {
142 threads.spawn(move || {
143 format!(
144 "{}",
145 char::from_u32(count)
146 .map(|val| val.to_string())
147 .unwrap_or(format!("{}", count))
148 )
149 })?;
150 }
151 let mut data = threads.all_ok()?;
152 data.sort();
153
154 assert_eq!(
155 data,
156 ["A", "B"]
157 .iter()
158 .map(|val| val.to_string())
159 .collect::<Vec<String>>()
160 );
161 })
162 }
163}
164use std::fmt::Display;
165
166#[derive(Debug, Clone)]
167pub enum Error {
168 ThreadGroupError(String),
169 ThreadJoinError(String),
170}
171
172impl Display for Error {
173 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
174 write!(
175 f,
176 "{}{}",
177 self.prefix().unwrap_or_default(),
178 match self {
179 Self::ThreadGroupError(s) => format!("{}", s),
180 Self::ThreadJoinError(s) => format!("{}", s),
181 }
182 )
183 }
184}
185
186impl Error {
187 pub fn variant(&self) -> String {
188 match self {
189 Error::ThreadGroupError(_) => "ThreadGroupError",
190 Error::ThreadJoinError(_) => "ThreadJoinError",
191 }
192 .to_string()
193 }
194
195 fn prefix(&self) -> Option<String> {
196 match self {
197 _ => Some(format!("{}: ", self.variant())),
198 }
199 }
200}
201
202impl std::error::Error for Error {}
203
204pub type Result<T> = std::result::Result<T, Error>;