spawn_groups/
spawn_group.rs

1use crate::shared::{priority::Priority, runtime::RuntimeEngine};
2use futures_lite::{Stream, StreamExt};
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9/// Spawn Group
10///
11/// A kind of a spawn group that spawns asynchronous child tasks that returns a value of ValueType,
12/// that implicitly wait for the spawned tasks to return before being dropped unless by
13/// explicitly calling ``dont_wait_at_drop()``
14///
15/// Child tasks are spawned by calling either ``spawn_task()`` or ``spawn_task_unless_cancelled()`` methods.
16///
17/// Running child tasks can be cancelled by calling ``cancel_all()`` method.
18///
19/// Child tasks spawned to a spawn group execute concurrently, and may be scheduled in
20/// any order.
21///
22/// It dereferences into a ``futures`` crate ``Stream`` type where the results of each finished child task is stored and it pops out the result in First-In First-Out
23/// FIFO order whenever it is being used
24pub struct SpawnGroup<ValueType> {
25    runtime: RuntimeEngine<ValueType>,
26    /// A field that indicates if the spawn group had been cancelled
27    pub is_cancelled: bool,
28    wait_at_drop: bool,
29}
30
31impl<ValueType> SpawnGroup<ValueType> {
32    /// Instantiates `SpawnGroup` with a specific number of threads to use in the underlying threadpool when polling futures
33    ///
34    /// # Parameters
35    ///
36    /// * `num_of_threads`: number of threads to use
37    pub fn new(num_of_threads: usize) -> Self {
38        Self {
39            runtime: RuntimeEngine::new(num_of_threads),
40            is_cancelled: false,
41            wait_at_drop: true,
42        }
43    }
44}
45
46impl<ValueType> Default for SpawnGroup<ValueType> {
47    /// Instantiates `SpawnGroup` with the number of threads as the number of cores as the system to use in the underlying threadpool when polling futures
48    fn default() -> Self {
49        Self {
50            is_cancelled: false,
51            runtime: RuntimeEngine::default(),
52            wait_at_drop: true,
53        }
54    }
55}
56
57impl<ValueType> SpawnGroup<ValueType> {
58    /// Don't implicity wait for spawned child tasks to finish before being dropped
59    pub fn dont_wait_at_drop(&mut self) {
60        self.wait_at_drop = false;
61    }
62
63    /// Cancels all running task in the spawn group
64    pub fn cancel_all(&mut self) {
65        self.runtime.cancel();
66        self.is_cancelled = true;
67    }
68}
69
70impl<ValueType> SpawnGroup<ValueType> {
71    /// Spawns a new task into the spawn group
72    /// # Parameters
73    ///
74    /// * `priority`: priority to use
75    /// * `closure`: an async closure that return a value of type ``ValueType``
76    pub fn spawn_task(
77        &mut self,
78        priority: Priority,
79        closure: impl Future<Output = ValueType> + Send + 'static,
80    ) {
81        self.runtime.write_task(priority, closure);
82    }
83
84    /// Spawn a new task only if the group is not cancelled yet,
85    /// otherwise does nothing
86    ///
87    /// # Parameters
88    ///
89    /// * `priority`: priority to use
90    /// * `closure`: an async closure that return a value of type ``ValueType``
91    pub fn spawn_task_unlessed_cancelled(
92        &mut self,
93        priority: Priority,
94        closure: impl Future<Output = ValueType> + Send + 'static,
95    ) {
96        if !self.is_cancelled {
97            self.runtime.write_task(priority, closure)
98        }
99    }
100}
101
102impl<ValueType> SpawnGroup<ValueType> {
103    /// Returns the first element of the stream, or None if it is empty.
104    pub async fn first(&self) -> Option<ValueType> {
105        self.runtime.stream().first().await
106    }
107}
108
109impl<ValueType> SpawnGroup<ValueType> {
110    /// Waits for all remaining child tasks for finish.
111    pub async fn wait_for_all(&mut self) {
112        self.wait_non_async()
113    }
114
115    /// Waits for all remaining child tasks for finish in non async context.
116    pub fn wait_non_async(&mut self) {
117        self.runtime.wait_for_all_tasks();
118    }
119}
120
121impl<ValueType> SpawnGroup<ValueType> {
122    /// A Boolean value that indicates whether the group has any remaining tasks.
123    ///
124    /// At the start of the body of a ``with_spawn_group()`` call, , or before calling ``spawn_task`` or ``spawn_task_unless_cancelled`` methods
125    /// the spawn group is always empty.
126    ///  
127    /// # Returns
128    /// - true: if there's no child task still running
129    /// - false: if any child task is still running
130    pub fn is_empty(&self) -> bool {
131        self.runtime.task_count() == 0
132    }
133}
134
135impl<ValueType> SpawnGroup<ValueType> {
136    /// Returns an instance of the `Stream` trait.
137    pub fn stream(&self) -> impl Stream<Item = ValueType> {
138        self.runtime.stream()
139    }
140}
141
142impl<ValueType> SpawnGroup<ValueType> {
143    /// Waits for a specific number of spawned child tasks to finish and returns their respectively result as a vector  
144    ///
145    /// # Panics
146    /// If the `of_count` parameter is larger than the number of already spawned child tasks, this method panics
147    ///
148    /// Remember whenever you call either ``wait_for_all`` or ``cancel_all`` methods, the child tasks' count reverts back to zero
149    ///
150    /// # Parameter
151    /// * `of_count`: The number of running child tasks to wait for their results to return
152    ///
153    /// # Returns
154    /// Returns a vector of length `of_count` elements from the spawn group instance
155    #[deprecated(since = "2.0.0", note = "Buggy")]
156    pub async fn get_chunks(&self, of_count: usize) -> Vec<ValueType> {
157        if of_count == 0 {
158            return vec![];
159        }
160        let buffer_count = self.runtime.stream().buffer_count().await;
161        if buffer_count == of_count {
162            let mut count: usize = of_count;
163            let mut results: Vec<ValueType> = vec![];
164            while count != 0 {
165                if let Some(result) = self.runtime.stream().next().await {
166                    results.push(result);
167                    count -= 1;
168                }
169            }
170            return results;
171        }
172        if of_count > self.runtime.task_count() {
173            panic!("The argument supplied cannot be greater than the number of spawned child tasks")
174        }
175        let mut count: usize = of_count;
176        let mut results: Vec<ValueType> = vec![];
177        while count != 0 {
178            if let Some(result) = self.runtime.stream().next().await {
179                results.push(result);
180                count -= 1;
181            }
182        }
183        results
184    }
185}
186
187impl<ValueType> Drop for SpawnGroup<ValueType> {
188    fn drop(&mut self) {
189        if self.wait_at_drop {
190            self.runtime.wait_for_all_tasks();
191        }
192        self.runtime.end()
193    }
194}
195
196impl<ValueType> Stream for SpawnGroup<ValueType> {
197    type Item = ValueType;
198
199    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
200        self.runtime.stream().poll_next(cx)
201    }
202}