use crate::channel;
use crate::prelude::*;
use crate::string::SharedString;
use crate::task::{self, Task};
use fnv::FnvHashMap;
/// Identifier of a task added to a [`Join`].
///
/// Indices are handed out sequentially, starting at zero, in the order tasks are added.
pub type Index = usize;
/// A set of child tasks whose completions can be awaited one at a time.
///
/// Each added task gets a monitor task that awaits it and reports the outcome through an
/// internal channel; `next` / `try_next` read those reports.
pub struct Join<T> {
/// Children that have been added but whose outcome has not yet been returned to the caller,
/// keyed by their index.
children: FnvHashMap<Index, Child>,
/// Index that will be assigned to the next added task.
next_index: Index,
/// Receiving end of the channel on which monitors report stopped children.
rx: channel::Receiver<Stopped<T>>,
/// Sending end of the same channel; cloned into every monitor task.
tx: channel::Sender<Stopped<T>>,
}
/// Bookkeeping entry for a child task that has not been reported to the caller yet.
struct Child {
/// Name given when the task was added; empty when it was added without a name.
name: SharedString,
/// Handle of the task that awaits the child and sends its outcome over the channel.
// NOTE(review): presumably held only to keep the monitor alive while the entry exists —
// confirm against `Task`'s drop behavior.
_monitor: Task<()>,
}
/// Message sent by a monitor task once the child it watches has stopped.
struct Stopped<T> {
/// Index of the child that stopped.
index: usize,
/// Outcome of joining the child: its value, or the panic it stopped with.
result: Result<T, Panic>,
}
impl<T> Join<T>
where
    T: Send + 'static,
{
    /// Creates an empty `Join`.
    ///
    /// The reporting channel is unbounded and created up front; the first task added will
    /// receive index `0`.
    pub fn new() -> Self {
        let (sender, receiver) = channel::unbounded();
        Self { tx: sender, rx: receiver, next_index: 0, children: default() }
    }

    /// Starts `task` as an unnamed child (its name is the empty string) and returns its index.
    pub fn add(&mut self, task: impl task::Start<T>) -> Index {
        self.add_as("", task)
    }

    /// Starts `task` as a child called `name` and returns the index assigned to it.
    ///
    /// A monitor task is started alongside the child; it awaits the child's completion and
    /// sends the outcome, tagged with the child's index, over the internal channel.
    pub fn add_as(&mut self, name: impl Into<SharedString>, task: impl task::Start<T>) -> Index {
        let id = self.next_index;
        self.next_index += 1;

        let running = task.start();
        let reporter = self.tx.clone();
        let monitor = task::start(async move {
            let outcome = running.join().await;
            // A failed send is deliberately ignored.
            let _ = reporter.send(Stopped { index: id, result: outcome }).await;
        });

        let child = Child { name: name.into(), _monitor: monitor };
        self.children.insert(id, child);
        id
    }

    /// Waits for the next child to stop and returns it together with its raw join result.
    ///
    /// Returns `None` immediately when there are no outstanding children, and also when
    /// receiving from the channel fails.
    ///
    /// # Panics
    ///
    /// Panics if a report arrives for an index that is not among the outstanding children.
    pub async fn next(&mut self) -> Option<StoppedTask<Result<T, task::Panic>>> {
        if self.children.is_empty() {
            return None;
        }
        let stopped = match self.rx.recv().await {
            Ok(message) => message,
            Err(_) => return None,
        };
        let index = stopped.index;
        let child = self.children.remove(&index).expect("Received result from unknown child.");
        Some(StoppedTask { index, name: child.name, result: stopped.result })
    }

    /// Like [`next`](Self::next), but separates panics from normal completions.
    ///
    /// A child that finished normally yields `Some(Ok(..))` carrying its value; a child that
    /// panicked yields `Some(Err(..))` carrying the panic. `None` is returned under the same
    /// conditions as for `next`, and the same panic condition applies.
    pub async fn try_next(&mut self) -> Option<Result<StoppedTask<T>, PanickedTask>> {
        self.next().await.map(|StoppedTask { index, name, result }| match result {
            Ok(result) => Ok(StoppedTask { index, name, result }),
            Err(panic) => Err(PanickedTask { index, name, panic }),
        })
    }

    /// Waits until [`next`](Self::next) reports nothing more, discarding every outcome
    /// (including panics).
    pub async fn drain(&mut self) {
        loop {
            if self.next().await.is_none() {
                return;
            }
        }
    }

    /// Waits for children to stop, discarding values, until none are left or one has panicked.
    ///
    /// # Errors
    ///
    /// Returns the first panicked task reported. Children that have not been reported by then
    /// remain in the set.
    pub async fn try_drain(&mut self) -> Result<(), PanickedTask> {
        while let Some(outcome) = self.try_next().await {
            outcome?;
        }
        Ok(())
    }
}
impl<T> Default for Join<T>
where
T: Send + 'static,
{
fn default() -> Self {
Self::new()
}
}
/// A child task that has stopped, together with what it produced.
///
/// `T` is the raw join result (`Result<_, task::Panic>`) when returned from `Join::next`, and
/// the task's plain value when returned from `Join::try_next`.
#[derive(Debug)]
pub struct StoppedTask<T> {
/// Index that was returned when the task was added.
pub index: Index,
/// Name the task was added with; empty for tasks added without a name.
pub name: SharedString,
/// What the task produced.
pub result: T,
}
/// Error describing a child task that stopped by panicking.
#[derive(Debug, Error)]
pub struct PanickedTask {
/// Index that was returned when the task was added.
pub index: Index,
/// Name the task was added with; empty for tasks added without a name.
pub name: SharedString,
/// The panic obtained when joining the task.
pub panic: task::Panic,
}
impl Display for PanickedTask {
    /// Formats the error as a single sentence.
    ///
    /// Tasks with an empty name are identified by index (`Task #3 panicked.`); named tasks by
    /// name (``Task `worker` panicked.``). When the panic exposes a string value it is
    /// appended, e.g. ``Task `worker` panicked with `boom`.``.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self.name.as_str() {
            "" => write!(f, "Task #{}", self.index)?,
            name => write!(f, "Task `{}`", name)?,
        }
        // The separating space belongs to the shared suffix so both branches above are spaced
        // identically; previously only the unnamed branch emitted it, which glued the name of
        // a named task directly onto "panicked".
        write!(f, " panicked")?;
        if let Some(value) = self.panic.value_str() {
            write!(f, " with `{}`", value)?;
        }
        write!(f, ".")
    }
}