spawn_groups/
lib.rs

1//! A structured concurrency construct which provides a way to spawn and run an arbitrary number of child tasks,
2//! possibly await the results of each child task or even cancel all running child tasks.
3//! This was heavily influenced by the Swift language's [`TaskGroup`](https://developer.apple.com/documentation/swift/taskgroup).
4//!
5//! # Installation
//! Add the crate to your project's dependencies:
7//!
8//! ```sh
9//! cargo add spawn_groups
10//! ```
11//!
12//! # Example
13//!
14//! ```ignore
15//! use async_std::stream::StreamExt;
16//! use spawn_groups::{with_err_spawn_group, GetType, Priority};
17//! use std::time::Instant;
18//! use surf::{Error, Client, http::Mime, StatusCode};
19//!
20//! async fn get_mimetype<AsStr: AsRef<str>>(url: AsStr, client: Client) -> Option<Mime> {
21//!     let Ok(resp) = client.get(url).send().await else {
22//!         return None;
23//!     };
24//!     resp.content_type()
25//! }
26//!
27//! #[async_std::main]
28//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
29//!     let client = surf::Client::new();
30//!     let urls = [
31//!         "https://www.google.com",
32//!         "https://www.bing.com",
33//!         "https://www.yandex.com",
34//!         "https://www.duckduckgo.com",
35//!         "https://www.wikipedia.org",
36//!         "https://www.whatsapp.com",
37//!         "https://www.yahoo.com",
38//!         "https://www.amazon.com",
39//!         "https://www.baidu.com",
40//!         "https://www.youtube.com",
41//!         "https://facebook.com",
42//!         "https://www.instagram.com",
43//!         "https://tiktok.com",
44//!     ];
45//!     with_err_spawn_group(move |mut group| async move {
46//!         println!("About to start");
47//!         let now = Instant::now();
48//!         for url in urls {
49//!             let client = client.clone();
50//!             group.spawn_task(Priority::default(), async move {
51//!                 let Some(mimetype) = get_mimetype(url, client).await else {
52//!                     return Err(Error::from_str(StatusCode::ExpectationFailed, format!("No content type found for {}", url)));
//!                 };
54//!                 Ok(format!("{url}: {}", mimetype))
55//!             })
56//!         }
57//!
58//!         while let Some(result) = group.next().await {
59//!             if let Err(error) = result {
60//!                 eprintln!("{}", error);
61//!             } else {
62//!                 println!("{}", result.unwrap());
63//!             }
64//!         }
//!         println!("Ended");
66//!         println!("It took {} nanoseconds", now.elapsed().as_nanos());
67//!     })
68//!     .await;
69//!     Ok(())
70//! }
71//! ```
72//!
73//! # Usage
74//!
75//! To properly use this crate
76//! * ``with_spawn_group`` for the creation of a dynamic number of asynchronous tasks that return a value. See [`with_spawn_group`](self::with_spawn_group)
77//! for more information
78//!
79//! * ``with_type_spawn_group`` for the creation of a dynamic number of asynchronous tasks that return a value by specifying the type explicitly. See [`with_type_spawn_group`](self::with_type_spawn_group)
80//! for more information
81//!
82//! * ``with_err_spawn_group`` for the creation of a dynamic number of asynchronous tasks that return a value or an error.
83//! See [`with_err_spawn_group`](self::with_err_spawn_group)
84//! for more information
85//!
//! * ``with_err_type_spawn_group`` for the creation of a dynamic number of asynchronous tasks that return a value or an error by specifying the return type and the error type explicitly.
87//! See [`with_err_type_spawn_group`](self::with_err_type_spawn_group)
88//! for more information
89//!
90//! * ``with_discarding_spawn_group`` for the creation of a dynamic number of asynchronous tasks that returns nothing.
91//! See [`with_discarding_spawn_group`](self::with_discarding_spawn_group)
92//! for more information
93//!
94//! * ``block_on`` polls future to finish. See [`block_on`](self::block_on)
95//! for more information
96//!
97//! # Spawning Child Tasks
98//!
99//! Child tasks are spawned by calling either `spawn_task` or `spawn_task_unless_cancelled` methods on any of the spawn groups' instance.
100//!
101//! To avoid spawning new child tasks to an already cancelled spawn group, use ``spawn_task_unless_cancelled``
102//! rather than the plain ``spawn_task`` which spawns new child tasks unconditionally.
103//!
104//! # Child Task Execution Order
105//! Child tasks are scheduled in any order and spawned child tasks execute concurrently.
106//!  
107//! # Cancellation
108//!
//! By explicitly calling the ``cancel_all`` method on any of the spawn groups' instances, all running child tasks
//! are immediately cancelled.
111//!
112//! # Waiting
113//!
//! By explicitly calling the ``wait_for_all`` method on any of the spawn groups' instances, all child tasks
//! are awaited until they finish.
116//!
117//! # Stream
118//!
//! Both [`SpawnGroup`](self::spawn_group::SpawnGroup) and [`ErrSpawnGroup`](self::err_spawn_group::ErrSpawnGroup) structs implement the ``futures_lite::Stream`` trait,
//! which means that you can await the result of each child task asynchronously and, with the help of the ``StreamExt`` trait, call methods such as ``next``,
//! ``map``, ``filter_map``, ``fold`` and so much more.
122//!
123//! ```rust
124//! use spawn_groups::with_spawn_group;
125//! use futures_lite::StreamExt;
126//! use spawn_groups::Priority;
127//! use spawn_groups::GetType;
128//!
129//! # spawn_groups::block_on(async move {
130//! with_spawn_group(|mut group| async move {
131//!      for i in 0..=10 {
132//!         group.spawn_task(Priority::default(), async move {
133//!           // simulate asynchronous operation
134//!              i
135//!          });
136//!      }
137//!
138//!      // Loop over all the results of the child tasks spawned asynchronously
139//!      let mut counter = 0;
140//!      while let Some(x) = group.next().await {
141//!         counter += x;
142//!      }
143//!
144//!     assert_eq!(counter, 55);
145//!
146//! }).await;
147//! # });
148//!
149//! ```
150//!
151//! # Comparisons against existing alternatives
//! * [`JoinSet`](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html): Like this alternative, both await the completion of some or all of the child tasks,
//! spawn child tasks in an unordered manner, return the results of their child tasks in the
//! order they complete, and can cancel or abort all child tasks. Unlike the `JoinSet`,
//! you can explicitly await all the child tasks to finish their execution.
//! The spawn group also provides a scope for the child tasks to execute in.
157//!
//! * [`FuturesUnordered`](https://docs.rs/futures/latest/futures/stream/struct.FuturesUnordered.html): Like this alternative, both spawn child tasks in an unordered manner,
//! but `FuturesUnordered` doesn't start running the spawned child tasks until it is polled.
//! It also doesn't provide a way to cancel all child tasks.
161//!
162//! # Note
163//! * Import ``StreamExt`` trait from ``futures_lite::StreamExt`` or ``futures::stream::StreamExt`` or ``async_std::stream::StreamExt`` to provide a variety of convenient combinator functions on the various spawn groups.
164//! * To await all running child tasks to finish their execution, call ``wait_for_all`` or ``wait_non_async`` methods on the various group instances
165//!
166//! # Warning
167//! * This crate relies on atomics, condition variables
168//! * Avoid calling long, blocking, non asynchronous functions while using any of the spawn groups because it was built with asynchrony in mind.
169
170mod discarding_spawn_group;
171mod err_spawn_group;
172mod spawn_group;
173
174mod async_stream;
175mod executors;
176mod meta_types;
177mod shared;
178mod threadpool_impl;
179
180pub use discarding_spawn_group::DiscardingSpawnGroup;
181pub use err_spawn_group::ErrSpawnGroup;
182pub use executors::block_on;
183pub use meta_types::GetType;
184pub use shared::priority::Priority;
185pub use spawn_group::SpawnGroup;
186
187use std::future::Future;
188use std::marker::PhantomData;
189
190/// Starts a scoped closure that takes a mutable ``SpawnGroup`` instance as an argument which can execute any number of child tasks which its result values are of the generic ``ResultType`` type.
191///
192/// This closure ensures that before the function call ends, all spawned child tasks are implicitly waited for, or the programmer can explicitly wait by calling  its ``wait_for_all()`` method
193/// of the ``SpawnGroup`` struct.
194///
195/// This function use a threadpool of the same number of threads as the number of active processor count that is default amount of parallelism a program can use on the system for polling the spawned tasks
196///
197/// See [`SpawnGroup`](spawn_group::SpawnGroup)
198/// for more.
199///
200/// # Parameters
201///
202/// * `of_type`: The type which the child task can return
203/// * `body`: an async closure that takes a mutable instance of ``SpawnGroup`` as an argument
204///
205/// # Returns
206///
207/// Anything the ``body`` parameter returns
208///
209/// # Example
210///
211/// ```rust
212/// use spawn_groups::GetType;
213/// use spawn_groups::with_type_spawn_group;
214/// use futures_lite::StreamExt;
215/// use spawn_groups::Priority;
216///
217/// # spawn_groups::block_on(async move {
218/// let final_result = with_type_spawn_group(i64::TYPE, |mut group| async move {
219///      for i in 0..=10 {
220///         group.spawn_task(Priority::default(), async move {
221///            // simulate asynchronous operation
222///            i
223///         });
224///      }
225///
226///      group.fold(0, |acc, x| {
227///          acc + x
228///      }).await
229///  }).await;
230///
231///  assert_eq!(final_result, 55);
232/// # });
233/// ```
234pub async fn with_type_spawn_group<Closure, Fut, ResultType, ReturnType>(
235    of_type: PhantomData<ResultType>,
236    body: Closure,
237) -> ReturnType
238where
239    Closure: FnOnce(SpawnGroup<ResultType>) -> Fut,
240    Fut: Future<Output = ReturnType> + 'static,
241{
242    _ = of_type;
243    let task_group = SpawnGroup::<ResultType>::default();
244    body(task_group).await
245}
246
247/// Starts a scoped closure that takes a mutable ``SpawnGroup`` instance as an argument which can execute any number of child tasks which its result values are of the generic ``ResultType`` type.
248///
249/// This closure ensures that before the function call ends, all spawned child tasks are implicitly waited for, or the programmer can explicitly wait by calling  its ``wait_for_all()`` method
250/// of the ``SpawnGroup`` struct.
251///
252/// This function use a threadpool of the same number of threads as the number of active processor count that is default amount of parallelism a program can use on the system for polling the spawned tasks
253///
254/// See [`SpawnGroup`](spawn_group::SpawnGroup)
255/// for more.
256///
257/// # Parameters
258///
259/// * `body`: an async closure that takes a mutable instance of ``SpawnGroup`` as an argument
260///
261/// # Returns
262///
263/// Anything the ``body`` parameter returns
264///
265/// # Example
266///
267/// ```rust
268/// use spawn_groups::GetType;
269/// use spawn_groups::with_spawn_group;
270/// use futures_lite::StreamExt;
271/// use spawn_groups::Priority;
272///
273/// # spawn_groups::block_on(async move {
274/// let final_result = with_spawn_group(|mut group| async move {
275///      for i in 0..=10 {
276///         group.spawn_task(Priority::default(), async move {
277///            // simulate asynchronous operation
278///            i
279///         });
280///      }
281///
282///      group.fold(0, |acc, x| {
283///          acc + x
284///      }).await
285///  }).await;
286///
287///  assert_eq!(final_result, 55);
288/// # });
289/// ```
290pub async fn with_spawn_group<Closure, Fut, ResultType, ReturnType>(body: Closure) -> ReturnType
291where
292    Closure: FnOnce(SpawnGroup<ResultType>) -> Fut,
293    Fut: Future<Output = ReturnType> + 'static,
294{
295    let task_group = SpawnGroup::<ResultType>::default();
296    body(task_group).await
297}
298
299/// Starts a scoped closure that takes a mutable ``ErrSpawnGroup`` instance as an argument which can execute any number of child tasks which its result values are of the type ``Result<ResultType, ErrorType>``
300/// where ``ResultType`` can be of type and ``ErrorType`` which is any type that implements the standard ``Error`` type.
301///
302/// This closure ensures that before the function call ends, all spawned child tasks are implicitly waited for, or the programmer can explicitly wait by calling its ``wait_for_all()`` method
303/// of the ``ErrSpawnGroup`` struct
304///
305/// This function use a threadpool of the same number of threads as the number of active processor count that is default amount of parallelism a program can use on the system for polling the spawned tasks
306///
307/// See [`ErrSpawnGroup`](err_spawn_group::ErrSpawnGroup)
308/// for more.
309///
310/// # Parameters
311///
312/// * `of_type`: The type which the child task can return
313/// * `error_type`: The error type which the child task can return
314/// * `body`: an async closure that takes a mutable instance of ``ErrSpawnGroup`` as an argument
315///
316/// # Returns
317///
318/// Anything the ``body`` parameter returns
319///
320/// # Example
321///
322/// ```rust
323/// use std::error::Error;
324/// use std::fmt::Display;
325/// use spawn_groups::GetType;
326/// use spawn_groups::with_err_type_spawn_group;
327/// use futures_lite::StreamExt;
328/// use spawn_groups::Priority;
329///
330/// #[derive(Debug)]
331/// enum DivisibleByError {
332///     THREE,
333///     FIVE
334/// }
335///
336/// impl Display for DivisibleByError {
337///     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
338///         match self {
339///             DivisibleByError::THREE => f.write_str("Divisible by three"),
340///             DivisibleByError::FIVE => f.write_str("Divisible by five")
341///         }
342///     }
343/// }
344///
345/// impl Error for DivisibleByError {}
346///
347/// # spawn_groups::block_on(async move {
348/// let final_results = with_err_type_spawn_group(u8::TYPE, DivisibleByError::TYPE, |mut group| async move {
349///     for i in 1..=10 {
350///         group.spawn_task(Priority::default(), async move {
351///          // simulate asynchronous processing that might fail and
352///          // return a value of ErrorType specified above
353///             if i % 3 == 0 {
354///                return Err(DivisibleByError::THREE)
355///             } else if i % 5 == 0 {
356///                return Err(DivisibleByError::FIVE)
357///             }
358///             Ok(i)
359///           });
360///        }
361///             
362///   // Explicitly wait for the all spawned child tasks to finish
363///     group.wait_for_all().await;
364///
365///     let mut sum_result = 0;
366///     let mut divisible_by_five = 0;
367///     let mut divisible_by_three = 0;
368///     while let Some(group_result) = group.next().await {
369///        if let Ok(result) = group_result {
370///          sum_result += result;
371///        } else if let Err(err_result) = group_result {
372///            match err_result {
373///              DivisibleByError::THREE => divisible_by_three += 1,
374///              DivisibleByError::FIVE => divisible_by_five += 1
375///            }
376///        }
377///     }
378///
379///     (sum_result, divisible_by_three, divisible_by_five)
380///
381/// }).await;
382///
383/// assert_eq!(final_results.0, 22);
384/// assert_eq!(final_results.1, 3);
385/// assert_eq!(final_results.2, 2);
386/// # });
387/// ```
388pub async fn with_err_type_spawn_group<Closure, Fut, ResultType, ErrorType, ReturnType>(
389    of_type: PhantomData<ResultType>,
390    error_type: PhantomData<ErrorType>,
391    body: Closure,
392) -> ReturnType
393where
394    Fut: Future<Output = ReturnType>,
395    Closure: FnOnce(ErrSpawnGroup<ResultType, ErrorType>) -> Fut,
396{
397    _ = (of_type, error_type);
398    let task_group = ErrSpawnGroup::<ResultType, ErrorType>::default();
399    body(task_group).await
400}
401
402/// Starts a scoped closure that takes a mutable ``ErrSpawnGroup`` instance as an argument which can execute any number of child tasks which its result values are of the type ``Result<ResultType, ErrorType>``
403/// where ``ResultType`` can be of type and ``ErrorType`` which is any type that implements the standard ``Error`` type.
404///
405/// This closure ensures that before the function call ends, all spawned child tasks are implicitly waited for, or the programmer can explicitly wait by calling its ``wait_for_all()`` method
406/// of the ``ErrSpawnGroup`` struct
407///
408/// This function use a threadpoolof the same number of threads as the number of active processor count that is default amount of parallelism a program can use on the system  for polling the spawned tasks
409///
410/// See [`ErrSpawnGroup`](err_spawn_group::ErrSpawnGroup)
411/// for more.
412///
413/// # Parameters
414///
415/// * `body`: an async closure that takes a mutable instance of ``ErrSpawnGroup`` as an argument
416///
417/// # Returns
418///
419/// Anything the ``body`` parameter returns
420///
421/// # Example
422///
423/// ```rust
424/// use std::error::Error;
425/// use std::fmt::Display;
426/// use spawn_groups::GetType;
427/// use spawn_groups::with_err_spawn_group;
428/// use futures_lite::StreamExt;
429/// use spawn_groups::Priority;
430///
431/// #[derive(Debug)]
432/// enum DivisibleByError {
433///     THREE,
434///     FIVE
435/// }
436///
437/// impl Display for DivisibleByError {
438///     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439///         match self {
440///             DivisibleByError::THREE => f.write_str("Divisible by three"),
441///             DivisibleByError::FIVE => f.write_str("Divisible by five")
442///         }
443///     }
444/// }
445///
446/// impl Error for DivisibleByError {}
447///
448/// # spawn_groups::block_on(async move {
449/// let final_results = with_err_spawn_group(|mut group| async move {
450///     for i in 1..=10 {
451///         group.spawn_task(Priority::default(), async move {
452///          // simulate asynchronous processing that might fail and
453///          // return a value of ErrorType specified above
454///             if i % 3 == 0 {
455///                return Err(DivisibleByError::THREE)
456///             } else if i % 5 == 0 {
457///                return Err(DivisibleByError::FIVE)
458///             }
459///             Ok(i)
460///           });
461///        }
462///             
463///   // Explicitly wait for the all spawned child tasks to finish
464///     group.wait_for_all().await;
465///
466///     let mut sum_result = 0;
467///     let mut divisible_by_five = 0;
468///     let mut divisible_by_three = 0;
469///     while let Some(group_result) = group.next().await {
470///        if let Ok(result) = group_result {
471///          sum_result += result;
472///        } else if let Err(err_result) = group_result {
473///            match err_result {
474///              DivisibleByError::THREE => divisible_by_three += 1,
475///              DivisibleByError::FIVE => divisible_by_five += 1
476///            }
477///        }
478///     }
479///
480///     (sum_result, divisible_by_three, divisible_by_five)
481///
482/// }).await;
483///
484/// assert_eq!(final_results.0, 22);
485/// assert_eq!(final_results.1, 3);
486/// assert_eq!(final_results.2, 2);
487/// # });
488/// ```
489pub async fn with_err_spawn_group<Closure, Fut, ResultType, ErrorType, ReturnType>(
490    body: Closure,
491) -> ReturnType
492where
493    Fut: Future<Output = ReturnType>,
494    Closure: FnOnce(ErrSpawnGroup<ResultType, ErrorType>) -> Fut,
495{
496    let task_group = ErrSpawnGroup::<ResultType, ErrorType>::default();
497    body(task_group).await
498}
499
500/// Starts a scoped closure that takes a mutable ``DiscardingSpawnGroup`` instance as an argument which can execute any number of child tasks which return nothing.
501///
502/// Ensures that before the function call ends, all spawned tasks are implicitly waited for
503///
504/// This function use a threadpool of the same number of threads as the number of active processor count that is default amount of parallelism a program can use on the system for polling the spawned tasks
505///
506/// See [`DiscardingSpawnGroup`](discarding_spawn_group::DiscardingSpawnGroup)
507/// for more.
508///
509/// # Parameters
510///
511/// * `body`: an async closure that takes an instance of ``DiscardingSpawnGroup`` as an argument
512///
513/// # Returns
514///
515/// Anything the ``body`` parameter returns
516///   
517/// # Example
518///
519/// ```rust
520/// use spawn_groups::GetType;
521/// use spawn_groups::with_discarding_spawn_group;
522/// use futures_lite::StreamExt;
523/// use spawn_groups::Priority;
524///
525/// # spawn_groups::block_on(async move {
526/// with_discarding_spawn_group(|mut group| async move {
527///     for i in 0..11 {
528///        group.spawn_task(Priority::default(), async move {
529///         // asynchronous processing
530///         // or some async network calls
531///        });
532///     }
533///
534/// }).await;
535/// # });
536/// ```
537pub async fn with_discarding_spawn_group<Closure, Fut, ReturnType>(body: Closure) -> ReturnType
538where
539    Fut: Future<Output = ReturnType>,
540    Closure: FnOnce(DiscardingSpawnGroup) -> Fut,
541{
542    let discarding_tg = DiscardingSpawnGroup::default();
543    body(discarding_tg).await
544}