1use crate::channel;
10use crate::prelude::*;
11use crate::string::SharedString;
12use crate::task::{self, Task};
13use fnv::FnvHashMap;
14
/// Identifier handed out by a [`Join`] for each task added to it.
pub type Index = usize;
17
18pub struct Join<T> {
20 children: FnvHashMap<Index, Child>,
21 next_index: Index,
22 rx: channel::Receiver<Stopped<T>>,
23 tx: channel::Sender<Stopped<T>>,
24}
25
26struct Child {
28 name: SharedString,
29 _monitor: Task<()>,
30}
31
32struct Stopped<T> {
34 index: usize,
35 result: Result<T, Panic>,
36}
37
38impl<T> Join<T>
39where
40 T: Send + 'static,
41{
42 pub fn new() -> Self {
44 let (tx, rx) = channel::unbounded();
45
46 Self { children: default(), next_index: 0, rx, tx }
47 }
48
49 pub fn add(&mut self, task: impl task::Start<T>) -> Index {
51 self.add_as("", task)
52 }
53
54 pub fn add_as(&mut self, name: impl Into<SharedString>, task: impl task::Start<T>) -> Index {
56 let index = self.next_index;
59
60 self.next_index += 1;
61
62 let task = task.start();
65
66 let tx = self.tx.clone();
70
71 let _monitor = task::start(async move {
72 let result = task.join().await;
73
74 tx.send(Stopped { index, result }).await.ok();
75 });
76
77 self.children.insert(index, Child { name: name.into(), _monitor });
78
79 index
80 }
81
82 pub async fn next(&mut self) -> Option<StoppedTask<Result<T, task::Panic>>> {
86 if self.children.is_empty() {
87 return None;
88 }
89
90 let Stopped { index, result } = self.rx.recv().await.ok()?;
91 let child = self.children.remove(&index).expect("Received result from unknown child.");
92
93 Some(StoppedTask { index, name: child.name, result })
94 }
95
96 pub async fn try_next(&mut self) -> Option<Result<StoppedTask<T>, PanickedTask>> {
101 if self.children.is_empty() {
102 return None;
103 }
104
105 let Stopped { index, result } = self.rx.recv().await.ok()?;
106 let child = self.children.remove(&index).expect("Received result from unknown child.");
107
108 Some(match result {
109 Ok(result) => Ok(StoppedTask { index, name: child.name, result }),
110 Err(panic) => Err(PanickedTask { index, name: child.name, panic }),
111 })
112 }
113
114 pub async fn drain(&mut self) {
116 while self.next().await.is_some() {}
117 }
118
119 pub async fn try_drain(&mut self) -> Result<(), PanickedTask> {
121 while self.try_next().await.transpose()?.is_some() {}
122
123 Ok(())
124 }
125}
126
127impl<T> Default for Join<T>
128where
129 T: Send + 'static,
130{
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
136#[derive(Debug)]
138pub struct StoppedTask<T> {
139 pub index: Index,
141 pub name: SharedString,
143 pub result: T,
145}
146
147#[derive(Debug, Error)]
149pub struct PanickedTask {
150 pub index: Index,
152 pub name: SharedString,
154 pub panic: task::Panic,
156}
157
158impl Display for PanickedTask {
159 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160 match self.name.as_str() {
161 "" => write!(f, "Task #{} ", self.index)?,
162 name => write!(f, "Task `{}`", name)?,
163 }
164
165 write!(f, "panicked")?;
166
167 if let Some(value) = self.panic.value_str() {
168 write!(f, " with `{}`", value)?;
169 }
170
171 write!(f, ".")
172 }
173}