use std::collections::{BTreeMap, VecDeque};
use std::fmt::Display;
use std::thread::{Builder, JoinHandle, Thread};
/// Builds a process-qualified identifier string for `thread`.
///
/// The result has the form `<process id>:<label>`, where `<label>` is the
/// thread's name when it has one and the pretty-printed (`{:#?}`) form of its
/// [`std::thread::ThreadId`] otherwise.
pub fn thread_id(thread: &Thread) -> String {
    // Prefer the human-assigned name; unnamed threads fall back to their id.
    let label = match thread.name() {
        Some(name) => name.to_owned(),
        None => format!("{:#?}", thread.id()),
    };
    format!("{}:{}", std::process::id(), label)
}
/// A group of spawned threads whose handles are queued and joined oldest-first.
///
/// `T` is the value returned by each thread's closure.
pub struct ThreadGroup<T> {
/// Identifier of the group; used as the prefix of every spawned thread's name.
id: String,
/// Handles of spawned threads that have not been joined yet, oldest at the front.
handles: VecDeque<JoinHandle<T>>,
/// Counter raised by `spawn` and lowered by `join`; `spawn` derives the numeric
/// suffix of the new thread's name from it.
count: usize,
/// Join failures recorded by `join`, keyed by the failed thread's id string.
errors: BTreeMap<String, Error>,
}
impl<T: Send + Sync + 'static> ThreadGroup<T> {
pub fn new() -> ThreadGroup<T> {
ThreadGroup::with_id(thread_id(&std::thread::current()))
}
pub fn with_id(id: String) -> ThreadGroup<T> {
ThreadGroup {
id,
handles: VecDeque::new(),
errors: BTreeMap::new(),
count: 0,
}
}
pub fn spawn<F: FnOnce() -> T + Send + 'static>(&mut self, func: F) -> Result<()> {
self.count += 1;
let name = format!("{}:{}", &self.id, self.count);
self.handles.push_back(
Builder::new().name(name.clone()).spawn(func).map_err(|e| {
Error::ThreadSpawnError(format!("spawning thread {}: {:#?}", name, e))
})?,
);
Ok(())
}
pub fn join(&mut self) -> Result<T> {
let handle = self
.handles
.pop_front()
.ok_or(Error::ThreadGroupError(format!("no threads in group {}", &self)))?;
let id = thread_id(&handle.thread());
let end = match handle.join() {
Ok(t) => Ok(t),
Err(e) => {
let e = Error::ThreadJoinError(format!("joining thread {}: {:#?}", id, e));
self.errors.insert(id, e.clone());
Err(e)
},
};
self.count -= 1;
end
}
pub fn results(&mut self) -> Vec<Result<T>> {
let mut val = Vec::<Result<T>>::new();
while !self.handles.is_empty() {
val.push(self.join());
}
val
}
pub fn as_far_as_ok(&mut self) -> Vec<T> {
let mut val = Vec::<T>::new();
while !self.handles.is_empty() {
if let Ok(g) = self.join() {
val.push(g)
}
}
val
}
pub fn all_ok(&mut self) -> Result<Vec<T>> {
let mut val = Vec::<T>::new();
while !self.handles.is_empty() {
val.push(self.join()?);
}
Ok(val)
}
pub fn errors(&self) -> BTreeMap<String, Error> {
self.errors.clone()
}
}
impl<T> std::fmt::Display for ThreadGroup<T> {
    /// Writes `<module path>::ThreadGroup <group id>`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        f.write_str(module_path!())?;
        f.write_str("::ThreadGroup ")?;
        f.write_str(&self.id)
    }
}
impl<T: Send + Sync + 'static> Default for ThreadGroup<T> {
fn default() -> ThreadGroup<T> {
Self::new()
}
}
/// Failures reported by thread-group operations.
///
/// Every variant carries a human-readable description of what went wrong.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Error {
    /// The group itself could not serve the request.
    ThreadGroupError(String),
    /// Joining a thread failed.
    ThreadJoinError(String),
    /// Spawning a thread failed.
    ThreadSpawnError(String),
}

impl Display for Error {
    /// Writes the variant prefix (`"<VariantName>: "`) followed by the message.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let detail = match self {
            Self::ThreadGroupError(text) => text,
            Self::ThreadJoinError(text) => text,
            Self::ThreadSpawnError(text) => text,
        };
        if let Some(prefix) = self.prefix() {
            f.write_str(&prefix)?;
        }
        f.write_str(detail)
    }
}

impl Error {
    /// Returns the name of the variant, e.g. `"ThreadJoinError"`.
    pub fn variant(&self) -> String {
        let name = match self {
            Error::ThreadGroupError(_) => "ThreadGroupError",
            Error::ThreadJoinError(_) => "ThreadJoinError",
            Error::ThreadSpawnError(_) => "ThreadSpawnError",
        };
        String::from(name)
    }

    /// Returns the text placed in front of the message by `Display`:
    /// the variant name followed by `": "`.
    fn prefix(&self) -> Option<String> {
        let mut label = self.variant();
        label.push_str(": ");
        Some(label)
    }
}

impl std::error::Error for Error {}
/// Shorthand for `std::result::Result` with [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;