thread_group/
thread_group.rs1#![feature(thread_id_value)]
7
8use std::collections::{BTreeMap, VecDeque};
9use std::thread::{Builder, JoinHandle, Thread};
10
11pub fn thread_id(thread: &Thread) -> String {
13 format!(
14 "{}:{}",
15 std::process::id(),
16 thread
17 .name()
18 .map(|a| a.to_string())
19 .unwrap_or_else(|| format!("{}", thread.id().as_u64()))
20 .to_string()
21 )
22}
23
24pub struct ThreadGroup<T> {
27 id: String,
28 handles: VecDeque<JoinHandle<T>>,
29 count: usize,
30 errors: BTreeMap<String, Error>,
31}
32impl<T: Send + Sync + 'static> ThreadGroup<T> {
33 pub fn new() -> ThreadGroup<T> {
35 ThreadGroup::with_id(thread_id(&std::thread::current()))
36 }
37
38 pub fn with_id(id: String) -> ThreadGroup<T> {
40 ThreadGroup {
41 id,
42 handles: VecDeque::new(),
43 errors: BTreeMap::new(),
44 count: 0,
45 }
46 }
47
48 pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
50 assert_eq!(self.handles.len(), self.count);
51 self.count += 1;
52 let name = format!("{}:{}", &self.id, self.count);
53 self.handles.push_front(
54 Builder::new().name(name.clone()).spawn(func).map_err(|e| {
55 Error::ThreadJoinError(format!("spawning thread {}: {:#?}", name, e))
56 })?,
57 );
58 assert_eq!(self.handles.len(), self.count);
59 Ok(())
60 }
61
62 pub fn join(&mut self) -> Result<T> {
66 assert_eq!(self.handles.len(), self.count);
67 let handle = self
68 .handles
69 .pop_front()
70 .ok_or(Error::ThreadGroupError(format!(
71 "no threads in group {}",
72 &self
73 )))?;
74
75 let id = thread_id(&handle.thread());
76
77 let end = match handle.join() {
78 Ok(t) => Ok(t),
79 Err(e) => {
80 self.count -= 1;
81 let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
82 self.errors.insert(id, e.clone());
83 Err(e)
84 }
85 };
86 self.count -= 1;
87 assert_eq!(self.handles.len(), self.count);
88 end
89 }
90
91 pub fn results(&mut self) -> Vec<Result<T>> {
94 let mut val = Vec::<Result<T>>::new();
95 while !self.handles.is_empty() {
96 val.push(self.join());
97 }
98 val
99 }
100
101 pub fn as_far_as_ok(&mut self) -> Vec<T> {
104 let mut val = Vec::<T>::new();
105 while !self.handles.is_empty() {
106 if let Ok(g) = self.join() {
107 val.push(g)
108 }
109 }
110 val
111 }
112
113 pub fn all_ok(&mut self) -> Result<Vec<T>> {
116 let mut val = Vec::<T>::new();
117 while !self.handles.is_empty() {
118 val.push(self.join()?);
119 }
120 Ok(val)
121 }
122}
123
124impl<T> std::fmt::Display for ThreadGroup<T> {
125 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
126 write!(f, "{}::ThreadGroup {}", module_path!(), &self.id)
127 }
128}
129
130impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
131 fn default() -> ThreadGroup<T> {
132 Self::new()
133 }
134}
135
136#[cfg(test)]
137mod thread_group_test {
138 use super::*;
139
140 #[test]
141 fn test_join() -> Result<()> {
142 Ok({
143 let mut threads =
144 ThreadGroup::<String>::with_id(format!("{}:{}", module_path!(), line!()));
145 for count in 65..67 {
146 threads.spawn(move || {
147 format!(
148 "{}",
149 char::from_u32(count)
150 .map(|val| val.to_string())
151 .unwrap_or(format!("{}", count))
152 )
153 })?;
154 }
155 let mut data = threads.all_ok()?;
156 data.sort();
157
158 assert_eq!(
159 data,
160 ["A", "B"]
161 .iter()
162 .map(|val| val.to_string())
163 .collect::<Vec<String>>()
164 );
165 })
166 }
167}
168use std::fmt::Display;
169
170#[derive(Debug, Clone)]
171pub enum Error {
172 ThreadGroupError(String),
173 ThreadJoinError(String),
174}
175
176impl Display for Error {
177 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
178 write!(
179 f,
180 "{}{}",
181 self.prefix().unwrap_or_default(),
182 match self {
183 Self::ThreadGroupError(s) => format!("{}", s),
184 Self::ThreadJoinError(s) => format!("{}", s),
185 }
186 )
187 }
188}
189
190impl Error {
191 pub fn variant(&self) -> String {
192 match self {
193 Error::ThreadGroupError(_) => "ThreadGroupError",
194 Error::ThreadJoinError(_) => "ThreadJoinError",
195 }
196 .to_string()
197 }
198
199 fn prefix(&self) -> Option<String> {
200 match self {
201 _ => Some(format!("{}: ", self.variant())),
202 }
203 }
204}
205
206impl std::error::Error for Error {}
207
208pub type Result<T> = std::result::Result<T, Error>;