af_core/task/
join.rs

1// Copyright © 2021 Alexandra Frydl
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! Wait for multiple tasks concurrently.
8
9use crate::channel;
10use crate::prelude::*;
11use crate::string::SharedString;
12use crate::task::{self, Task};
13use fnv::FnvHashMap;
14
/// The index of a [`Join`] task.
///
/// Indices are handed out by [`Join::add_as`] from an internal counter that
/// starts at zero and is incremented once for every added task.
pub type Index = usize;
17
/// Concurrently waits for the results of multiple tasks.
///
/// Every added task gets a companion monitor task that awaits it and sends a
/// [`Stopped`] message over an internal channel; `next` and `try_next` receive
/// those messages one at a time.
pub struct Join<T> {
  /// Tasks that were added and whose result has not yet been handed out, keyed
  /// by index. An entry is removed when its `Stopped` message is received.
  children: FnvHashMap<Index, Child>,
  /// The index that the next added task will receive.
  next_index: Index,
  /// Receiving end of the channel the monitors report on.
  rx: channel::Receiver<Stopped<T>>,
  /// Sending end of the same channel; cloned into each monitor task.
  tx: channel::Sender<Stopped<T>>,
}
25
/// A task in a [`Join`].
struct Child {
  /// The name the task was added with; empty for tasks added through
  /// `Join::add`.
  name: SharedString,
  /// The monitor task that awaits the child and reports its result. It is
  /// stored but never read.
  // NOTE(review): presumably held so the monitor stays alive while the child
  // is tracked — confirm against `Task`'s drop semantics.
  _monitor: Task<()>,
}
31
/// A message sent from a task monitor.
///
/// Built by the monitor spawned in `Join::add_as` once awaiting the monitored
/// task's `join()` has completed.
struct Stopped<T> {
  /// The index of the task that stopped.
  index: usize,
  /// The value produced by awaiting the task's `join()`: its output on `Ok`,
  /// or a `Panic` on `Err`.
  result: Result<T, Panic>,
}
37
38impl<T> Join<T>
39where
40  T: Send + 'static,
41{
42  /// Creates an empty join.
43  pub fn new() -> Self {
44    let (tx, rx) = channel::unbounded();
45
46    Self { children: default(), next_index: 0, rx, tx }
47  }
48
49  /// Adds a task to the join, returning its index.
50  pub fn add(&mut self, task: impl task::Start<T>) -> Index {
51    self.add_as("", task)
52  }
53
54  /// Adds a named task to the join, returning its index.
55  pub fn add_as(&mut self, name: impl Into<SharedString>, task: impl task::Start<T>) -> Index {
56    // Get next index.
57
58    let index = self.next_index;
59
60    self.next_index += 1;
61
62    // Start the task.
63
64    let task = task.start();
65
66    // Start a task to monitor when this task stops and send its result on the
67    // channel.
68
69    let tx = self.tx.clone();
70
71    let _monitor = task::start(async move {
72      let result = task.join().await;
73
74      tx.send(Stopped { index, result }).await.ok();
75    });
76
77    self.children.insert(index, Child { name: name.into(), _monitor });
78
79    index
80  }
81
82  /// Waits for the next stopped task.
83  ///
84  /// If all tasks have stopped, this function returns `None`.
85  pub async fn next(&mut self) -> Option<StoppedTask<Result<T, task::Panic>>> {
86    if self.children.is_empty() {
87      return None;
88    }
89
90    let Stopped { index, result } = self.rx.recv().await.ok()?;
91    let child = self.children.remove(&index).expect("Received result from unknown child.");
92
93    Some(StoppedTask { index, name: child.name, result })
94  }
95
96  /// Waits for the next stopped task and returns its information as a
97  /// [`Result`].
98  ///
99  /// If all tasks have stopped, this function returns `None`.
100  pub async fn try_next(&mut self) -> Option<Result<StoppedTask<T>, PanickedTask>> {
101    if self.children.is_empty() {
102      return None;
103    }
104
105    let Stopped { index, result } = self.rx.recv().await.ok()?;
106    let child = self.children.remove(&index).expect("Received result from unknown child.");
107
108    Some(match result {
109      Ok(result) => Ok(StoppedTask { index, name: child.name, result }),
110      Err(panic) => Err(PanickedTask { index, name: child.name, panic }),
111    })
112  }
113
114  /// Waits for all tasks to stop, dropping their results.
115  pub async fn drain(&mut self) {
116    while self.next().await.is_some() {}
117  }
118
119  /// Waits for all tasks to stop, dropping their results, until a task panics.
120  pub async fn try_drain(&mut self) -> Result<(), PanickedTask> {
121    while self.try_next().await.transpose()?.is_some() {}
122
123    Ok(())
124  }
125}
126
127impl<T> Default for Join<T>
128where
129  T: Send + 'static,
130{
131  fn default() -> Self {
132    Self::new()
133  }
134}
135
/// Information about a stopped task.
///
/// `T` is the type of the `result` field: [`Join::next`] yields
/// `StoppedTask<Result<T, task::Panic>>`, while [`Join::try_next`] yields a
/// plain `StoppedTask<T>` in its `Ok` case.
#[derive(Debug)]
pub struct StoppedTask<T> {
  /// The index of the task.
  pub index: Index,
  /// The name of the task, if any. Empty when the task was added without a
  /// name.
  pub name: SharedString,
  /// The result of the task.
  pub result: T,
}
146
/// Information about a panicked task.
///
/// Returned as the error value from [`Join::try_next`] and
/// [`Join::try_drain`].
#[derive(Debug, Error)]
pub struct PanickedTask {
  /// The index of the task.
  pub index: Index,
  /// The name of the task, if any. Empty when the task was added without a
  /// name.
  pub name: SharedString,
  /// The panic from the task.
  pub panic: task::Panic,
}
157
158impl Display for PanickedTask {
159  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
160    match self.name.as_str() {
161      "" => write!(f, "Task #{} ", self.index)?,
162      name => write!(f, "Task `{}`", name)?,
163    }
164
165    write!(f, "panicked")?;
166
167    if let Some(value) = self.panic.value_str() {
168      write!(f, " with `{}`", value)?;
169    }
170
171    write!(f, ".")
172  }
173}