spawn_groups/spawn_group.rs
1use crate::shared::{priority::Priority, runtime::RuntimeEngine};
2use futures_lite::{Stream, StreamExt};
3use std::{
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7};
8
9/// Spawn Group
10///
11/// A kind of a spawn group that spawns asynchronous child tasks that returns a value of ValueType,
12/// that implicitly wait for the spawned tasks to return before being dropped unless by
13/// explicitly calling ``dont_wait_at_drop()``
14///
15/// Child tasks are spawned by calling either ``spawn_task()`` or ``spawn_task_unless_cancelled()`` methods.
16///
17/// Running child tasks can be cancelled by calling ``cancel_all()`` method.
18///
19/// Child tasks spawned to a spawn group execute concurrently, and may be scheduled in
20/// any order.
21///
22/// It dereferences into a ``futures`` crate ``Stream`` type where the results of each finished child task is stored and it pops out the result in First-In First-Out
23/// FIFO order whenever it is being used
24pub struct SpawnGroup<ValueType> {
25 runtime: RuntimeEngine<ValueType>,
26 /// A field that indicates if the spawn group had been cancelled
27 pub is_cancelled: bool,
28 wait_at_drop: bool,
29}
30
31impl<ValueType> SpawnGroup<ValueType> {
32 /// Instantiates `SpawnGroup` with a specific number of threads to use in the underlying threadpool when polling futures
33 ///
34 /// # Parameters
35 ///
36 /// * `num_of_threads`: number of threads to use
37 pub fn new(num_of_threads: usize) -> Self {
38 Self {
39 runtime: RuntimeEngine::new(num_of_threads),
40 is_cancelled: false,
41 wait_at_drop: true,
42 }
43 }
44}
45
46impl<ValueType> Default for SpawnGroup<ValueType> {
47 /// Instantiates `SpawnGroup` with the number of threads as the number of cores as the system to use in the underlying threadpool when polling futures
48 fn default() -> Self {
49 Self {
50 is_cancelled: false,
51 runtime: RuntimeEngine::default(),
52 wait_at_drop: true,
53 }
54 }
55}
56
57impl<ValueType> SpawnGroup<ValueType> {
58 /// Don't implicity wait for spawned child tasks to finish before being dropped
59 pub fn dont_wait_at_drop(&mut self) {
60 self.wait_at_drop = false;
61 }
62
63 /// Cancels all running task in the spawn group
64 pub fn cancel_all(&mut self) {
65 self.runtime.cancel();
66 self.is_cancelled = true;
67 }
68}
69
70impl<ValueType> SpawnGroup<ValueType> {
71 /// Spawns a new task into the spawn group
72 /// # Parameters
73 ///
74 /// * `priority`: priority to use
75 /// * `closure`: an async closure that return a value of type ``ValueType``
76 pub fn spawn_task(
77 &mut self,
78 priority: Priority,
79 closure: impl Future<Output = ValueType> + Send + 'static,
80 ) {
81 self.runtime.write_task(priority, closure);
82 }
83
84 /// Spawn a new task only if the group is not cancelled yet,
85 /// otherwise does nothing
86 ///
87 /// # Parameters
88 ///
89 /// * `priority`: priority to use
90 /// * `closure`: an async closure that return a value of type ``ValueType``
91 pub fn spawn_task_unlessed_cancelled(
92 &mut self,
93 priority: Priority,
94 closure: impl Future<Output = ValueType> + Send + 'static,
95 ) {
96 if !self.is_cancelled {
97 self.runtime.write_task(priority, closure)
98 }
99 }
100}
101
102impl<ValueType> SpawnGroup<ValueType> {
103 /// Returns the first element of the stream, or None if it is empty.
104 pub async fn first(&self) -> Option<ValueType> {
105 self.runtime.stream().first().await
106 }
107}
108
109impl<ValueType> SpawnGroup<ValueType> {
110 /// Waits for all remaining child tasks for finish.
111 pub async fn wait_for_all(&mut self) {
112 self.wait_non_async()
113 }
114
115 /// Waits for all remaining child tasks for finish in non async context.
116 pub fn wait_non_async(&mut self) {
117 self.runtime.wait_for_all_tasks();
118 }
119}
120
121impl<ValueType> SpawnGroup<ValueType> {
122 /// A Boolean value that indicates whether the group has any remaining tasks.
123 ///
124 /// At the start of the body of a ``with_spawn_group()`` call, , or before calling ``spawn_task`` or ``spawn_task_unless_cancelled`` methods
125 /// the spawn group is always empty.
126 ///
127 /// # Returns
128 /// - true: if there's no child task still running
129 /// - false: if any child task is still running
130 pub fn is_empty(&self) -> bool {
131 self.runtime.task_count() == 0
132 }
133}
134
135impl<ValueType> SpawnGroup<ValueType> {
136 /// Returns an instance of the `Stream` trait.
137 pub fn stream(&self) -> impl Stream<Item = ValueType> {
138 self.runtime.stream()
139 }
140}
141
142impl<ValueType> SpawnGroup<ValueType> {
143 /// Waits for a specific number of spawned child tasks to finish and returns their respectively result as a vector
144 ///
145 /// # Panics
146 /// If the `of_count` parameter is larger than the number of already spawned child tasks, this method panics
147 ///
148 /// Remember whenever you call either ``wait_for_all`` or ``cancel_all`` methods, the child tasks' count reverts back to zero
149 ///
150 /// # Parameter
151 /// * `of_count`: The number of running child tasks to wait for their results to return
152 ///
153 /// # Returns
154 /// Returns a vector of length `of_count` elements from the spawn group instance
155 #[deprecated(since = "2.0.0", note = "Buggy")]
156 pub async fn get_chunks(&self, of_count: usize) -> Vec<ValueType> {
157 if of_count == 0 {
158 return vec![];
159 }
160 let buffer_count = self.runtime.stream().buffer_count().await;
161 if buffer_count == of_count {
162 let mut count: usize = of_count;
163 let mut results: Vec<ValueType> = vec![];
164 while count != 0 {
165 if let Some(result) = self.runtime.stream().next().await {
166 results.push(result);
167 count -= 1;
168 }
169 }
170 return results;
171 }
172 if of_count > self.runtime.task_count() {
173 panic!("The argument supplied cannot be greater than the number of spawned child tasks")
174 }
175 let mut count: usize = of_count;
176 let mut results: Vec<ValueType> = vec![];
177 while count != 0 {
178 if let Some(result) = self.runtime.stream().next().await {
179 results.push(result);
180 count -= 1;
181 }
182 }
183 results
184 }
185}
186
187impl<ValueType> Drop for SpawnGroup<ValueType> {
188 fn drop(&mut self) {
189 if self.wait_at_drop {
190 self.runtime.wait_for_all_tasks();
191 }
192 self.runtime.end()
193 }
194}
195
196impl<ValueType> Stream for SpawnGroup<ValueType> {
197 type Item = ValueType;
198
199 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
200 self.runtime.stream().poll_next(cx)
201 }
202}