spawn_groups/
spawn_group.rs

1use crate::async_stream::stream::{AsyncIterator, AsyncStream};
2use crate::shared::{
3    initializible::Initializible, priority::Priority, runtime::RuntimeEngine, sharedfuncs::Shared,
4    wait::Waitable,
5};
6use async_trait::async_trait;
7use futures_lite::{Stream, StreamExt};
8use std::{
9    future::Future,
10    ops::{Deref, DerefMut},
11    pin::Pin,
12};
13
14/// Spawn Group
15///
16/// A kind of a spawn group that spawns asynchronous child tasks that returns a value of ValueType,
17/// that implicitly wait for the spawned tasks to return before being dropped unless by
18/// explicitly calling ``dont_wait_at_drop()``
19///
20/// Child tasks are spawned by calling either ``spawn_task()`` or ``spawn_task_unless_cancelled()`` methods.
21///
22/// Running child tasks can be cancelled by calling ``cancel_all()`` method.
23///
24/// Child tasks spawned to a spawn group execute concurrently, and may be scheduled in
25/// any order.
26///
27/// It dereferences into a ``futures`` crate ``Stream`` type where the results of each finished child task is stored and it pops out the result in First-In First-Out
28/// FIFO order whenever it is being used
29///
30pub struct SpawnGroup<ValueType: Send + 'static> {
31    /// A field that indicates if the spawn group had been cancelled
32    pub is_cancelled: bool,
33    wait_at_drop: bool,
34    count: Box<usize>,
35    runtime: RuntimeEngine<ValueType>,
36}
37
38impl<ValueType: Send> SpawnGroup<ValueType> {
39    pub(crate) fn new() -> Self {
40        Self::init()
41    }
42}
43
44impl<ValueType: Send> SpawnGroup<ValueType> {
45    /// Don't implicity wait for spawned child tasks to finish before being dropped
46    pub fn dont_wait_at_drop(&mut self) {
47        self.wait_at_drop = false;
48    }
49}
50
51impl<ValueType: Send + 'static> SpawnGroup<ValueType> {
52    /// Spawns a new task into the spawn group
53    /// # Parameters
54    ///
55    /// * `priority`: priority to use
56    /// * `closure`: an async closure that return a value of type ``ValueType``
57    pub fn spawn_task<F>(&mut self, priority: Priority, closure: F)
58    where
59        F: Future<Output = <SpawnGroup<ValueType> as Shared>::Result> + Send + 'static,
60    {
61        self.add_task(priority, closure);
62    }
63
64    /// Spawn a new task only if the group is not cancelled yet,
65    /// otherwise does nothing
66    ///
67    /// # Parameters
68    ///
69    /// * `priority`: priority to use
70    /// * `closure`: an async closure that return a value of type ``ValueType``
71    pub fn spawn_task_unlessed_cancelled<F>(&mut self, priority: Priority, closure: F)
72    where
73        F: Future<Output = <SpawnGroup<ValueType> as Shared>::Result> + Send + 'static,
74    {
75        self.add_task_unlessed_cancelled(priority, closure);
76    }
77
78    /// Cancels all running task in the spawn group
79    pub fn cancel_all(&mut self) {
80        self.cancel_all_tasks();
81    }
82}
83
84impl<ValueType: Send> SpawnGroup<ValueType> {
85    /// Returns the first element of the stream, or None if it is empty.
86    pub async fn first(&self) -> Option<ValueType> {
87        self.runtime.stream.first().await
88    }
89}
90
91impl<ValueType: Send> SpawnGroup<ValueType> {
92    /// Waits for all remaining child tasks for finish.
93    pub async fn wait_for_all(&mut self) {
94        self.wait().await;
95    }
96}
97
98impl<ValueType: Send> SpawnGroup<ValueType> {
99    /// Waits for a specific number of spawned child tasks to finish and returns their respectively result as a vector  
100    ///
101    /// # Panics
102    /// If the `of_count` parameter is larger than the number of already spawned child tasks, this method panics
103    /// 
104    /// Remember whenever you call either ``wait_for_all`` or ``cancel_all`` methods, the child tasks' count reverts back to zero
105    ///
106    /// # Parameter
107    /// * `of_count`: The number of running child tasks to wait for their results to return
108    ///
109    /// # Returns
110    /// Returns a vector of length `of_count` elements from the spawn group instance
111    pub async fn get_chunks(&self, of_count: usize) -> Vec<ValueType> {
112        if of_count == 0 {
113            return vec![];
114        }
115        let buffer_count = self.runtime.stream.buffer_count().await;
116        if buffer_count == of_count {
117            let mut count = of_count;
118            let mut results = vec![];
119            while count != 0 {
120                if let Some(result) = self.runtime.stream.clone().next().await {
121                    results.push(result);
122                    count -= 1;
123                }
124            }
125            return results;
126        }
127        if of_count > *self.count {
128            panic!("The argument supplied cannot be greater than the number of spawned child tasks")
129        }
130        let mut count = of_count;
131        let mut results = vec![];
132        while count != 0 {
133            if let Some(result) = self.runtime.stream.clone().next().await {
134                results.push(result);
135                count -= 1;
136            }
137        }
138        results
139    }
140}
141
142impl<ValueType: Send> SpawnGroup<ValueType> {
143    /// A Boolean value that indicates whether the group has any remaining tasks.
144    ///
145    /// At the start of the body of a ``with_spawn_group()`` call, , or before calling ``spawn_task`` or ``spawn_task_unless_cancelled`` methods
146    /// the spawn group is always empty.
147    ///  
148    /// # Returns
149    /// - true: if there's no child task still running
150    /// - false: if any child task is still running
151    pub fn is_empty(&self) -> bool {
152        if *self.count == 0 || self.runtime.stream.clone().task_count() == 0 {
153            return true;
154        }
155        false
156    }
157}
158
159impl<ValueType: Send> Clone for SpawnGroup<ValueType> {
160    fn clone(&self) -> Self {
161        Self {
162            runtime: self.runtime.clone(),
163            is_cancelled: self.is_cancelled,
164            count: self.count.clone(),
165            wait_at_drop: self.wait_at_drop,
166        }
167    }
168}
169
170impl<ValueType: Send + 'static> Deref for SpawnGroup<ValueType> {
171    type Target = AsyncIterator<ValueType>;
172    fn deref(&self) -> &Self::Target {
173        self
174    }
175}
176
177impl<ValueType: Send + 'static> DerefMut for SpawnGroup<ValueType> {
178    fn deref_mut(&mut self) -> &mut Self::Target {
179        self
180    }
181}
182
183impl<ValueType: Send> Drop for SpawnGroup<ValueType> {
184    fn drop(&mut self) {
185        futures_lite::future::block_on(async move {
186            if self.wait_at_drop {
187                self.wait_for_all().await;
188            }
189        });
190    }
191}
192
193impl<ValueType: Send> Initializible for SpawnGroup<ValueType> {
194    fn init() -> Self {
195        SpawnGroup {
196            runtime: RuntimeEngine::init(),
197            is_cancelled: false,
198            count: Box::new(0),
199            wait_at_drop: true,
200        }
201    }
202}
203
204impl<ValueType: Send + 'static> Shared for SpawnGroup<ValueType> {
205    type Result = ValueType;
206
207    fn add_task<F>(&mut self, priority: Priority, closure: F)
208    where
209        F: Future<Output = Self::Result> + Send + 'static,
210    {
211        *self.count += 1;
212        self.runtime.write_task(priority, closure);
213    }
214
215    fn cancel_all_tasks(&mut self) {
216        self.runtime.cancel();
217        self.is_cancelled = true;
218        *self.count = 0;
219    }
220
221    fn add_task_unlessed_cancelled<F>(&mut self, priority: Priority, closure: F)
222    where
223        F: Future<Output = Self::Result> + Send + 'static,
224    {
225        if !self.is_cancelled {
226            self.add_task(priority, closure)
227        }
228    }
229}
230
231impl<ValueType: Send> Stream for SpawnGroup<ValueType> {
232    type Item = ValueType;
233
234    fn poll_next(
235        mut self: std::pin::Pin<&mut Self>,
236        cx: &mut std::task::Context<'_>,
237    ) -> std::task::Poll<Option<Self::Item>> {
238        let pinned_stream = Pin::new(&mut self.runtime.stream);
239        <AsyncStream<Self::Item> as Stream>::poll_next(pinned_stream, cx)
240    }
241}
242
243#[async_trait]
244impl<ValueType: Send + 'static> Waitable for SpawnGroup<ValueType> {
245    async fn wait(&mut self) {
246        self.runtime.wait_for_all_tasks().await;
247        *self.count = 0;
248    }
249}