thread_groups/
thread_groups.rs1#![feature(thread_id_value)]
7
8use std::collections::{BTreeMap, VecDeque};
9use std::fmt::Display;
10use std::thread::{Builder, JoinHandle, Thread};
11
12pub fn thread_id(thread: &Thread) -> String {
14 format!(
15 "{}:{}",
16 std::process::id(),
17 thread
18 .name()
19 .map(|a| a.to_string())
20 .unwrap_or_else(|| format!("{}", thread.id().as_u64()))
21 .to_string()
22 )
23}
24
25pub struct ThreadGroup<T> {
28 id: String,
29 handles: VecDeque<JoinHandle<T>>,
30 count: usize,
31 errors: BTreeMap<String, Error>,
32}
33impl<T: Send + Sync + 'static> ThreadGroup<T> {
34 pub fn new() -> ThreadGroup<T> {
36 ThreadGroup::with_id(thread_id(&std::thread::current()))
37 }
38
39 pub fn with_id(id: String) -> ThreadGroup<T> {
41 ThreadGroup {
42 id,
43 handles: VecDeque::new(),
44 errors: BTreeMap::new(),
45 count: 0,
46 }
47 }
48
49 pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
51 self.count += 1;
52 let name = format!("{}:{}", &self.id, self.count);
53 self.handles.push_back(
54 Builder::new().name(name.clone()).spawn(func).map_err(|e| {
55 Error::ThreadJoinError(format!("spawning thread {}: {:#?}", name, e))
56 })?,
57 );
58 Ok(())
59 }
60
61 pub fn join(&mut self) -> Result<T> {
65 let handle = self
66 .handles
67 .pop_front()
68 .ok_or(Error::ThreadGroupError(format!(
69 "no threads in group {}",
70 &self
71 )))?;
72
73 let id = thread_id(&handle.thread());
74
75 let end = match handle.join() {
76 Ok(t) => Ok(t),
77 Err(e) => {
78 let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
79 self.errors.insert(id, e.clone());
80 Err(e)
81 }
82 };
83 self.count -= 1;
84 end
85 }
86
87 pub fn results(&mut self) -> Vec<Result<T>> {
90 let mut val = Vec::<Result<T>>::new();
91 while !self.handles.is_empty() {
92 val.push(self.join());
93 }
94 val
95 }
96
97 pub fn as_far_as_ok(&mut self) -> Vec<T> {
100 let mut val = Vec::<T>::new();
101 while !self.handles.is_empty() {
102 if let Ok(g) = self.join() {
103 val.push(g)
104 }
105 }
106 val
107 }
108
109 pub fn all_ok(&mut self) -> Result<Vec<T>> {
112 let mut val = Vec::<T>::new();
113 while !self.handles.is_empty() {
114 val.push(self.join()?);
115 }
116 Ok(val)
117 }
118
119 pub fn errors(&self) -> BTreeMap<String, Error> {
121 self.errors.clone()
122 }
123}
124
125impl<T> std::fmt::Display for ThreadGroup<T> {
126 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
127 write!(f, "{}::ThreadGroup {}", module_path!(), &self.id)
128 }
129}
130
131impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
132 fn default() -> ThreadGroup<T> {
133 Self::new()
134 }
135}
136
/// Failures reported by thread-group operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Error {
    /// The group could not satisfy a request, e.g. joining when no threads
    /// are left. Carries a human-readable message.
    ThreadGroupError(String),
    /// A thread could not be spawned or could not be joined. Carries a
    /// human-readable message.
    ThreadJoinError(String),
}

impl Display for Error {
    /// Writes the error as `"<VariantName>: <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let message = match self {
            Self::ThreadGroupError(message) | Self::ThreadJoinError(message) => message,
        };
        write!(f, "{}: {}", self.variant_name(), message)
    }
}

impl Error {
    /// Returns the name of the enum variant, e.g. `"ThreadJoinError"`.
    pub fn variant(&self) -> String {
        self.variant_name().to_string()
    }

    /// Name of the variant as a static string, so that formatting the error
    /// does not need a temporary allocation.
    fn variant_name(&self) -> &'static str {
        match self {
            Error::ThreadGroupError(_) => "ThreadGroupError",
            Error::ThreadJoinError(_) => "ThreadJoinError",
        }
    }
}

impl std::error::Error for Error {}

/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;