thread_groups/
thread_groups.rs1use std::collections::{BTreeMap, VecDeque};
8use std::fmt::Display;
9use std::thread::{Builder, JoinHandle, Thread};
10
/// Builds a printable identifier for `thread`, scoped to the current process.
///
/// The result is `"<pid>:<name>"` when the thread has a name, and
/// `"<pid>:<pretty-debug of ThreadId>"` otherwise.
pub fn thread_id(thread: &Thread) -> String {
    let pid = std::process::id();
    // Format straight from the borrowed name instead of first copying it into
    // an intermediate `String`.
    match thread.name() {
        Some(name) => format!("{}:{}", pid, name),
        None => format!("{}:{:#?}", pid, thread.id()),
    }
}
23
24pub struct ThreadGroup<T> {
27 id: String,
28 handles: VecDeque<JoinHandle<T>>,
29 count: usize,
30 errors: BTreeMap<String, Error>,
31}
32impl<T: Send + Sync + 'static> ThreadGroup<T> {
33 pub fn new() -> ThreadGroup<T> {
35 ThreadGroup::with_id(thread_id(&std::thread::current()))
36 }
37
38 pub fn with_id(id: String) -> ThreadGroup<T> {
40 ThreadGroup {
41 id,
42 handles: VecDeque::new(),
43 errors: BTreeMap::new(),
44 count: 0,
45 }
46 }
47
48 pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
50 self.count += 1;
51 let name = format!("{}:{}", &self.id, self.count);
52 self.handles.push_back(
53 Builder::new().name(name.clone()).spawn(func).map_err(|e| {
54 Error::ThreadSpawnError(format!("spawning thread {}: {:#?}", name, e))
55 })?,
56 );
57 Ok(())
58 }
59
60 pub fn join(&mut self) -> Result<T> {
64 let handle = self
65 .handles
66 .pop_front()
67 .ok_or(Error::ThreadGroupError(format!("no threads in group {}", &self)))?;
68
69 let id = thread_id(&handle.thread());
70
71 let end = match handle.join() {
72 Ok(t) => Ok(t),
73 Err(e) => {
74 let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
75 self.errors.insert(id, e.clone());
76 Err(e)
77 },
78 };
79 self.count -= 1;
80 end
81 }
82
83 pub fn results(&mut self) -> Vec<Result<T>> {
86 let mut val = Vec::<Result<T>>::new();
87 while !self.handles.is_empty() {
88 val.push(self.join());
89 }
90 val
91 }
92
93 pub fn as_far_as_ok(&mut self) -> Vec<T> {
96 let mut val = Vec::<T>::new();
97 while !self.handles.is_empty() {
98 if let Ok(g) = self.join() {
99 val.push(g)
100 }
101 }
102 val
103 }
104
105 pub fn all_ok(&mut self) -> Result<Vec<T>> {
108 let mut val = Vec::<T>::new();
109 while !self.handles.is_empty() {
110 val.push(self.join()?);
111 }
112 Ok(val)
113 }
114
115 pub fn errors(&self) -> BTreeMap<String, Error> {
117 self.errors.clone()
118 }
119}
120
121impl<T> std::fmt::Display for ThreadGroup<T> {
122 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
123 write!(f, "{}::ThreadGroup {}", module_path!(), &self.id)
124 }
125}
126
127impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
128 fn default() -> ThreadGroup<T> {
129 Self::new()
130 }
131}
132
/// Failures reported by [`ThreadGroup`] operations; every variant carries a
/// human-readable message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Error {
    /// Group-level failure, e.g. `join` called while no threads are queued.
    ThreadGroupError(String),
    /// Joining a thread returned an error.
    ThreadJoinError(String),
    /// A thread could not be spawned.
    ThreadSpawnError(String),
}
139
140impl Display for Error {
141 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
142 write!(
143 f,
144 "{}{}",
145 self.prefix().unwrap_or_default(),
146 match self {
147 Self::ThreadGroupError(s)
148 | Self::ThreadJoinError(s)
149 | Self::ThreadSpawnError(s) => format!("{}", s),
150 }
151 )
152 }
153}
154
155impl Error {
156 pub fn variant(&self) -> String {
157 match self {
158 Error::ThreadGroupError(_) => "ThreadGroupError",
159 Error::ThreadJoinError(_) => "ThreadJoinError",
160 Error::ThreadSpawnError(_) => "ThreadSpawnError",
161 }
162 .to_string()
163 }
164
165 fn prefix(&self) -> Option<String> {
166 match self {
167 _ => Some(format!("{}: ", self.variant())),
168 }
169 }
170}
171
/// Marker impl so `Error` can be used wherever a `std::error::Error` is
/// expected; no methods are overridden.
impl std::error::Error for Error {}
173
/// Shorthand for `std::result::Result` with the error type fixed to [`Error`].
pub type Result<T> = std::result::Result<T, Error>;