spawn_groups/
spawn_group.rs1use crate::async_stream::stream::{AsyncIterator, AsyncStream};
2use crate::shared::{
3 initializible::Initializible, priority::Priority, runtime::RuntimeEngine, sharedfuncs::Shared,
4 wait::Waitable,
5};
6use async_trait::async_trait;
7use futures_lite::{Stream, StreamExt};
8use std::{
9 future::Future,
10 ops::{Deref, DerefMut},
11 pin::Pin,
12};
13
14pub struct SpawnGroup<ValueType: Send + 'static> {
31 pub is_cancelled: bool,
33 wait_at_drop: bool,
34 count: Box<usize>,
35 runtime: RuntimeEngine<ValueType>,
36}
37
38impl<ValueType: Send> SpawnGroup<ValueType> {
39 pub(crate) fn new() -> Self {
40 Self::init()
41 }
42}
43
44impl<ValueType: Send> SpawnGroup<ValueType> {
45 pub fn dont_wait_at_drop(&mut self) {
47 self.wait_at_drop = false;
48 }
49}
50
51impl<ValueType: Send + 'static> SpawnGroup<ValueType> {
52 pub fn spawn_task<F>(&mut self, priority: Priority, closure: F)
58 where
59 F: Future<Output = <SpawnGroup<ValueType> as Shared>::Result> + Send + 'static,
60 {
61 self.add_task(priority, closure);
62 }
63
64 pub fn spawn_task_unlessed_cancelled<F>(&mut self, priority: Priority, closure: F)
72 where
73 F: Future<Output = <SpawnGroup<ValueType> as Shared>::Result> + Send + 'static,
74 {
75 self.add_task_unlessed_cancelled(priority, closure);
76 }
77
78 pub fn cancel_all(&mut self) {
80 self.cancel_all_tasks();
81 }
82}
83
84impl<ValueType: Send> SpawnGroup<ValueType> {
85 pub async fn first(&self) -> Option<ValueType> {
87 self.runtime.stream.first().await
88 }
89}
90
91impl<ValueType: Send> SpawnGroup<ValueType> {
92 pub async fn wait_for_all(&mut self) {
94 self.wait().await;
95 }
96}
97
98impl<ValueType: Send> SpawnGroup<ValueType> {
99 pub async fn get_chunks(&self, of_count: usize) -> Vec<ValueType> {
112 if of_count == 0 {
113 return vec![];
114 }
115 let buffer_count = self.runtime.stream.buffer_count().await;
116 if buffer_count == of_count {
117 let mut count = of_count;
118 let mut results = vec![];
119 while count != 0 {
120 if let Some(result) = self.runtime.stream.clone().next().await {
121 results.push(result);
122 count -= 1;
123 }
124 }
125 return results;
126 }
127 if of_count > *self.count {
128 panic!("The argument supplied cannot be greater than the number of spawned child tasks")
129 }
130 let mut count = of_count;
131 let mut results = vec![];
132 while count != 0 {
133 if let Some(result) = self.runtime.stream.clone().next().await {
134 results.push(result);
135 count -= 1;
136 }
137 }
138 results
139 }
140}
141
142impl<ValueType: Send> SpawnGroup<ValueType> {
143 pub fn is_empty(&self) -> bool {
152 if *self.count == 0 || self.runtime.stream.clone().task_count() == 0 {
153 return true;
154 }
155 false
156 }
157}
158
159impl<ValueType: Send> Clone for SpawnGroup<ValueType> {
160 fn clone(&self) -> Self {
161 Self {
162 runtime: self.runtime.clone(),
163 is_cancelled: self.is_cancelled,
164 count: self.count.clone(),
165 wait_at_drop: self.wait_at_drop,
166 }
167 }
168}
169
170impl<ValueType: Send + 'static> Deref for SpawnGroup<ValueType> {
171 type Target = AsyncIterator<ValueType>;
172 fn deref(&self) -> &Self::Target {
173 self
174 }
175}
176
177impl<ValueType: Send + 'static> DerefMut for SpawnGroup<ValueType> {
178 fn deref_mut(&mut self) -> &mut Self::Target {
179 self
180 }
181}
182
183impl<ValueType: Send> Drop for SpawnGroup<ValueType> {
184 fn drop(&mut self) {
185 futures_lite::future::block_on(async move {
186 if self.wait_at_drop {
187 self.wait_for_all().await;
188 }
189 });
190 }
191}
192
193impl<ValueType: Send> Initializible for SpawnGroup<ValueType> {
194 fn init() -> Self {
195 SpawnGroup {
196 runtime: RuntimeEngine::init(),
197 is_cancelled: false,
198 count: Box::new(0),
199 wait_at_drop: true,
200 }
201 }
202}
203
204impl<ValueType: Send + 'static> Shared for SpawnGroup<ValueType> {
205 type Result = ValueType;
206
207 fn add_task<F>(&mut self, priority: Priority, closure: F)
208 where
209 F: Future<Output = Self::Result> + Send + 'static,
210 {
211 *self.count += 1;
212 self.runtime.write_task(priority, closure);
213 }
214
215 fn cancel_all_tasks(&mut self) {
216 self.runtime.cancel();
217 self.is_cancelled = true;
218 *self.count = 0;
219 }
220
221 fn add_task_unlessed_cancelled<F>(&mut self, priority: Priority, closure: F)
222 where
223 F: Future<Output = Self::Result> + Send + 'static,
224 {
225 if !self.is_cancelled {
226 self.add_task(priority, closure)
227 }
228 }
229}
230
231impl<ValueType: Send> Stream for SpawnGroup<ValueType> {
232 type Item = ValueType;
233
234 fn poll_next(
235 mut self: std::pin::Pin<&mut Self>,
236 cx: &mut std::task::Context<'_>,
237 ) -> std::task::Poll<Option<Self::Item>> {
238 let pinned_stream = Pin::new(&mut self.runtime.stream);
239 <AsyncStream<Self::Item> as Stream>::poll_next(pinned_stream, cx)
240 }
241}
242
243#[async_trait]
244impl<ValueType: Send + 'static> Waitable for SpawnGroup<ValueType> {
245 async fn wait(&mut self) {
246 self.runtime.wait_for_all_tasks().await;
247 *self.count = 0;
248 }
249}