af_core/task/
try_join.rs

1// Copyright © 2021 Alexandra Frydl
2//
3// This Source Code Form is subject to the terms of the Mozilla Public
4// License, v. 2.0. If a copy of the MPL was not distributed with this
5// file, You can obtain one at http://mozilla.org/MPL/2.0/.
6
7//! Wait for multiple fallible tasks concurrently.
8
9use crate::prelude::*;
10use crate::string::SharedString;
11use crate::task;
12
/// The index that identifies a task added to a [`TryJoin`].
pub type Index = usize;
15
16/// Concurrently waits for the results of multiple tasks that may return an
17/// error.
18#[derive(Deref, DerefMut)]
19pub struct TryJoin<T, E> {
20  #[deref]
21  #[deref_mut]
22  join: task::Join<Result<T, E>>,
23}
24
25impl<T, E> TryJoin<T, E>
26where
27  T: Send + 'static,
28  E: From<task::Panic> + Send + 'static,
29{
30  /// Creates an empty join.
31  pub fn new() -> Self {
32    Self { join: task::Join::new() }
33  }
34
35  /// Adds a task to the join, returning its index.
36  pub fn add(&mut self, task: impl task::Start<Result<T, E>>) -> Index {
37    self.join.add(task)
38  }
39
40  /// Adds a named task to the join, returning its index.
41  pub fn add_as(
42    &mut self,
43    name: impl Into<SharedString>,
44    task: impl task::Start<Result<T, E>>,
45  ) -> Index {
46    self.join.add_as(name, task)
47  }
48
49  /// Waits for the next stopped task.
50  ///
51  /// If all tasks have stopped, this function returns `None`.
52  pub async fn next(&mut self) -> Option<StoppedTask<T, E>> {
53    let task = self.join.next().await?;
54
55    Some(StoppedTask {
56      index: task.index,
57      name: task.name,
58      result: task.result.map_err(E::from).and_then(|res| res),
59    })
60  }
61
62  /// Waits for the next stopped task and returns its information as a
63  /// [`Result`].
64  ///
65  /// If all tasks have stopped, this function returns `None`.
66  pub async fn try_next(&mut self) -> Option<Result<FinishedTask<T>, FailedTask<E>>> {
67    let task = self.next().await?;
68
69    Some(match task.result {
70      Ok(output) => Ok(FinishedTask { index: task.index, name: task.name, output }),
71      Err(error) => Err(FailedTask { index: task.index, name: task.name, error }),
72    })
73  }
74
75  /// Waits for all tasks to stop, dropping their results.
76  pub async fn drain(&mut self) {
77    self.join.drain().await
78  }
79
80  /// Waits for all tasks to stop, dropping their results, until a task fails.
81  pub async fn try_drain(&mut self) -> Result<(), FailedTask<E>> {
82    while self.try_next().await.transpose()?.is_some() {}
83
84    Ok(())
85  }
86}
87
88impl<T, E> Default for TryJoin<T, E>
89where
90  T: Send + 'static,
91  E: From<Panic> + Send + 'static,
92{
93  fn default() -> Self {
94    Self::new()
95  }
96}
97
98/// Information about a stopped task.
99#[derive(Debug)]
100pub struct StoppedTask<T, E> {
101  /// The index of the task.
102  pub index: Index,
103  /// The name of the task, if any.
104  pub name: SharedString,
105  /// The result of the task.
106  pub result: Result<T, E>,
107}
108
109/// Information about a finished task.
110#[derive(Debug)]
111pub struct FinishedTask<T> {
112  /// The index of the task.
113  pub index: Index,
114  /// The name of the task, if any.
115  pub name: SharedString,
116  /// The output of the task.
117  pub output: T,
118}
119
120/// Information about a failed task.
121#[derive(Debug)]
122pub struct FailedTask<E> {
123  /// The index of the task.
124  pub index: Index,
125  /// The name of the task, if any.
126  pub name: SharedString,
127  /// The error of the task.
128  pub error: E,
129}
130
131impl<E> Display for FailedTask<E>
132where
133  E: Display,
134{
135  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
136    match self.name.as_str() {
137      "" => write!(f, "Task #{} ", self.index)?,
138      name => write!(f, "Task `{}`", name)?,
139    }
140
141    write!(f, "failed. {}", self.error)
142  }
143}
144
145impl<E> Error for FailedTask<E> where E: Debug + Display {}