spawn_groups/lib.rs
1//! A structured concurrency construct which provides a way to spawn and run an arbitrary number of child tasks,
2//! possibly await the results of each child task or even cancel all running child tasks.
3//! This was heavily influenced by the Swift language's [`TaskGroup`](https://developer.apple.com/documentation/swift/taskgroup).
4//!
5//! # Installation
//! Add the crate to your project's dependencies:
7//!
8//! ```sh
9//! cargo add spawn_groups
10//! ```
11//!
12//! # Example
13//!
14//! ```ignore
15//! use async_std::stream::StreamExt;
16//! use spawn_groups::{with_err_spawn_group, GetType, Priority};
17//! use std::time::Instant;
18//! use surf::{Error, Client, http::Mime, StatusCode};
19//!
20//! async fn get_mimetype<AsStr: AsRef<str>>(url: AsStr, client: Client) -> Option<Mime> {
21//! let Ok(resp) = client.get(url).send().await else {
22//! return None;
23//! };
24//! resp.content_type()
25//! }
26//!
27//! #[async_std::main]
28//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
29//! let client = surf::Client::new();
30//! let urls = [
31//! "https://www.google.com",
32//! "https://www.bing.com",
33//! "https://www.yandex.com",
34//! "https://www.duckduckgo.com",
35//! "https://www.wikipedia.org",
36//! "https://www.whatsapp.com",
37//! "https://www.yahoo.com",
38//! "https://www.amazon.com",
39//! "https://www.baidu.com",
40//! "https://www.youtube.com",
41//! "https://facebook.com",
42//! "https://www.instagram.com",
43//! "https://tiktok.com",
44//! ];
45//! with_err_spawn_group(move |mut group| async move {
46//! println!("About to start");
47//! let now = Instant::now();
48//! for url in urls {
49//! let client = client.clone();
50//! group.spawn_task(Priority::default(), async move {
51//! let Some(mimetype) = get_mimetype(url, client).await else {
52//! return Err(Error::from_str(StatusCode::ExpectationFailed, format!("No content type found for {}", url)));
//! };
54//! Ok(format!("{url}: {}", mimetype))
55//! })
56//! }
57//!
58//! while let Some(result) = group.next().await {
59//! if let Err(error) = result {
60//! eprintln!("{}", error);
61//! } else {
62//! println!("{}", result.unwrap());
63//! }
64//! }
//! println!("Ended");
66//! println!("It took {} nanoseconds", now.elapsed().as_nanos());
67//! })
68//! .await;
69//! Ok(())
70//! }
71//! ```
72//!
73//! # Usage
74//!
75//! To properly use this crate
76//! * ``with_spawn_group`` for the creation of a dynamic number of asynchronous tasks that return a value. See [`with_spawn_group`](self::with_spawn_group)
77//! for more information
78//!
79//! * ``with_type_spawn_group`` for the creation of a dynamic number of asynchronous tasks that return a value by specifying the type explicitly. See [`with_type_spawn_group`](self::with_type_spawn_group)
80//! for more information
81//!
82//! * ``with_err_spawn_group`` for the creation of a dynamic number of asynchronous tasks that return a value or an error.
83//! See [`with_err_spawn_group`](self::with_err_spawn_group)
84//! for more information
85//!
//! * ``with_err_type_spawn_group`` for the creation of a dynamic number of asynchronous tasks that return a value or an error by specifying the return type and the error type explicitly.
87//! See [`with_err_type_spawn_group`](self::with_err_type_spawn_group)
88//! for more information
89//!
90//! * ``with_discarding_spawn_group`` for the creation of a dynamic number of asynchronous tasks that returns nothing.
91//! See [`with_discarding_spawn_group`](self::with_discarding_spawn_group)
92//! for more information
93//!
//! * ``block_on`` polls a future until it finishes. See [`block_on`](self::block_on)
//! for more information
96//!
97//! # Spawning Child Tasks
98//!
99//! Child tasks are spawned by calling either `spawn_task` or `spawn_task_unless_cancelled` methods on any of the spawn groups' instance.
100//!
101//! To avoid spawning new child tasks to an already cancelled spawn group, use ``spawn_task_unless_cancelled``
102//! rather than the plain ``spawn_task`` which spawns new child tasks unconditionally.
103//!
104//! # Child Task Execution Order
105//! Child tasks are scheduled in any order and spawned child tasks execute concurrently.
106//!
107//! # Cancellation
108//!
//! By explicitly calling the ``cancel_all`` method on any of the spawn groups' instances, all running child tasks
//! are immediately cancelled.
111//!
112//! # Waiting
113//!
//! By explicitly calling the ``wait_for_all`` method on any of the spawn groups' instances, all child tasks
//! are awaited until they finish.
116//!
117//! # Stream
118//!
//! Both the [`SpawnGroup`](self::spawn_group::SpawnGroup) and [`ErrSpawnGroup`](self::err_spawn_group::ErrSpawnGroup) structs implement the ``futures_lite::Stream`` trait,
//! which means that you can await the result of each child task asynchronously and, with the help of the ``StreamExt`` trait, call methods such as ``next``,
//! ``map``, ``filter_map``, ``fold`` and many more.
122//!
123//! ```rust
124//! use spawn_groups::with_spawn_group;
125//! use futures_lite::StreamExt;
126//! use spawn_groups::Priority;
127//! use spawn_groups::GetType;
128//!
129//! # spawn_groups::block_on(async move {
130//! with_spawn_group(|mut group| async move {
131//! for i in 0..=10 {
132//! group.spawn_task(Priority::default(), async move {
133//! // simulate asynchronous operation
134//! i
135//! });
136//! }
137//!
138//! // Loop over all the results of the child tasks spawned asynchronously
139//! let mut counter = 0;
140//! while let Some(x) = group.next().await {
141//! counter += x;
142//! }
143//!
144//! assert_eq!(counter, 55);
145//!
146//! }).await;
147//! # });
148//!
149//! ```
150//!
151//! # Comparisons against existing alternatives
//! * [`JoinSet`](https://docs.rs/tokio/latest/tokio/task/struct.JoinSet.html): Like this alternative, a spawn group can await the completion of some or all of its child tasks,
//! spawns child tasks in an unordered manner, returns the results of its child tasks in the
//! order they complete, and can cancel or abort all child tasks. Unlike the `JoinSet`,
//! you can explicitly await for all the child tasks to finish their execution.
//! The spawn group option also provides a scope for the child tasks to execute in.
157//!
//! * [`FuturesUnordered`](https://docs.rs/futures/latest/futures/stream/struct.FuturesUnordered.html): Like this alternative, a spawn group spawns child tasks in an unordered manner,
//! but `FuturesUnordered` doesn't start running the spawned child tasks until it is polled.
//! It also doesn't provide a way to cancel all child tasks.
161//!
162//! # Note
163//! * Import ``StreamExt`` trait from ``futures_lite::StreamExt`` or ``futures::stream::StreamExt`` or ``async_std::stream::StreamExt`` to provide a variety of convenient combinator functions on the various spawn groups.
164//! * To await all running child tasks to finish their execution, call ``wait_for_all`` or ``wait_non_async`` methods on the various group instances
165//!
166//! # Warning
//! * This crate relies on atomics and condition variables.
//! * Avoid calling long, blocking, non-asynchronous functions while using any of the spawn groups because they were built with asynchrony in mind.
169
170mod discarding_spawn_group;
171mod err_spawn_group;
172mod spawn_group;
173
174mod async_stream;
175mod executors;
176mod meta_types;
177mod shared;
178mod threadpool_impl;
179
180pub use discarding_spawn_group::DiscardingSpawnGroup;
181pub use err_spawn_group::ErrSpawnGroup;
182pub use executors::block_on;
183pub use meta_types::GetType;
184pub use shared::priority::Priority;
185pub use spawn_group::SpawnGroup;
186
187use std::future::Future;
188use std::marker::PhantomData;
189
190/// Starts a scoped closure that takes a mutable ``SpawnGroup`` instance as an argument which can execute any number of child tasks which its result values are of the generic ``ResultType`` type.
191///
192/// This closure ensures that before the function call ends, all spawned child tasks are implicitly waited for, or the programmer can explicitly wait by calling its ``wait_for_all()`` method
193/// of the ``SpawnGroup`` struct.
194///
195/// This function use a threadpool of the same number of threads as the number of active processor count that is default amount of parallelism a program can use on the system for polling the spawned tasks
196///
197/// See [`SpawnGroup`](spawn_group::SpawnGroup)
198/// for more.
199///
200/// # Parameters
201///
202/// * `of_type`: The type which the child task can return
203/// * `body`: an async closure that takes a mutable instance of ``SpawnGroup`` as an argument
204///
205/// # Returns
206///
207/// Anything the ``body`` parameter returns
208///
209/// # Example
210///
211/// ```rust
212/// use spawn_groups::GetType;
213/// use spawn_groups::with_type_spawn_group;
214/// use futures_lite::StreamExt;
215/// use spawn_groups::Priority;
216///
217/// # spawn_groups::block_on(async move {
218/// let final_result = with_type_spawn_group(i64::TYPE, |mut group| async move {
219/// for i in 0..=10 {
220/// group.spawn_task(Priority::default(), async move {
221/// // simulate asynchronous operation
222/// i
223/// });
224/// }
225///
226/// group.fold(0, |acc, x| {
227/// acc + x
228/// }).await
229/// }).await;
230///
231/// assert_eq!(final_result, 55);
232/// # });
233/// ```
234pub async fn with_type_spawn_group<Closure, Fut, ResultType, ReturnType>(
235 of_type: PhantomData<ResultType>,
236 body: Closure,
237) -> ReturnType
238where
239 Closure: FnOnce(SpawnGroup<ResultType>) -> Fut,
240 Fut: Future<Output = ReturnType> + 'static,
241{
242 _ = of_type;
243 let task_group = SpawnGroup::<ResultType>::default();
244 body(task_group).await
245}
246
247/// Starts a scoped closure that takes a mutable ``SpawnGroup`` instance as an argument which can execute any number of child tasks which its result values are of the generic ``ResultType`` type.
248///
249/// This closure ensures that before the function call ends, all spawned child tasks are implicitly waited for, or the programmer can explicitly wait by calling its ``wait_for_all()`` method
250/// of the ``SpawnGroup`` struct.
251///
252/// This function use a threadpool of the same number of threads as the number of active processor count that is default amount of parallelism a program can use on the system for polling the spawned tasks
253///
254/// See [`SpawnGroup`](spawn_group::SpawnGroup)
255/// for more.
256///
257/// # Parameters
258///
259/// * `body`: an async closure that takes a mutable instance of ``SpawnGroup`` as an argument
260///
261/// # Returns
262///
263/// Anything the ``body`` parameter returns
264///
265/// # Example
266///
267/// ```rust
268/// use spawn_groups::GetType;
269/// use spawn_groups::with_spawn_group;
270/// use futures_lite::StreamExt;
271/// use spawn_groups::Priority;
272///
273/// # spawn_groups::block_on(async move {
274/// let final_result = with_spawn_group(|mut group| async move {
275/// for i in 0..=10 {
276/// group.spawn_task(Priority::default(), async move {
277/// // simulate asynchronous operation
278/// i
279/// });
280/// }
281///
282/// group.fold(0, |acc, x| {
283/// acc + x
284/// }).await
285/// }).await;
286///
287/// assert_eq!(final_result, 55);
288/// # });
289/// ```
290pub async fn with_spawn_group<Closure, Fut, ResultType, ReturnType>(body: Closure) -> ReturnType
291where
292 Closure: FnOnce(SpawnGroup<ResultType>) -> Fut,
293 Fut: Future<Output = ReturnType> + 'static,
294{
295 let task_group = SpawnGroup::<ResultType>::default();
296 body(task_group).await
297}
298
299/// Starts a scoped closure that takes a mutable ``ErrSpawnGroup`` instance as an argument which can execute any number of child tasks which its result values are of the type ``Result<ResultType, ErrorType>``
300/// where ``ResultType`` can be of type and ``ErrorType`` which is any type that implements the standard ``Error`` type.
301///
302/// This closure ensures that before the function call ends, all spawned child tasks are implicitly waited for, or the programmer can explicitly wait by calling its ``wait_for_all()`` method
303/// of the ``ErrSpawnGroup`` struct
304///
305/// This function use a threadpool of the same number of threads as the number of active processor count that is default amount of parallelism a program can use on the system for polling the spawned tasks
306///
307/// See [`ErrSpawnGroup`](err_spawn_group::ErrSpawnGroup)
308/// for more.
309///
310/// # Parameters
311///
312/// * `of_type`: The type which the child task can return
313/// * `error_type`: The error type which the child task can return
314/// * `body`: an async closure that takes a mutable instance of ``ErrSpawnGroup`` as an argument
315///
316/// # Returns
317///
318/// Anything the ``body`` parameter returns
319///
320/// # Example
321///
322/// ```rust
323/// use std::error::Error;
324/// use std::fmt::Display;
325/// use spawn_groups::GetType;
326/// use spawn_groups::with_err_type_spawn_group;
327/// use futures_lite::StreamExt;
328/// use spawn_groups::Priority;
329///
330/// #[derive(Debug)]
331/// enum DivisibleByError {
332/// THREE,
333/// FIVE
334/// }
335///
336/// impl Display for DivisibleByError {
337/// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
338/// match self {
339/// DivisibleByError::THREE => f.write_str("Divisible by three"),
340/// DivisibleByError::FIVE => f.write_str("Divisible by five")
341/// }
342/// }
343/// }
344///
345/// impl Error for DivisibleByError {}
346///
347/// # spawn_groups::block_on(async move {
348/// let final_results = with_err_type_spawn_group(u8::TYPE, DivisibleByError::TYPE, |mut group| async move {
349/// for i in 1..=10 {
350/// group.spawn_task(Priority::default(), async move {
351/// // simulate asynchronous processing that might fail and
352/// // return a value of ErrorType specified above
353/// if i % 3 == 0 {
354/// return Err(DivisibleByError::THREE)
355/// } else if i % 5 == 0 {
356/// return Err(DivisibleByError::FIVE)
357/// }
358/// Ok(i)
359/// });
360/// }
361///
362/// // Explicitly wait for the all spawned child tasks to finish
363/// group.wait_for_all().await;
364///
365/// let mut sum_result = 0;
366/// let mut divisible_by_five = 0;
367/// let mut divisible_by_three = 0;
368/// while let Some(group_result) = group.next().await {
369/// if let Ok(result) = group_result {
370/// sum_result += result;
371/// } else if let Err(err_result) = group_result {
372/// match err_result {
373/// DivisibleByError::THREE => divisible_by_three += 1,
374/// DivisibleByError::FIVE => divisible_by_five += 1
375/// }
376/// }
377/// }
378///
379/// (sum_result, divisible_by_three, divisible_by_five)
380///
381/// }).await;
382///
383/// assert_eq!(final_results.0, 22);
384/// assert_eq!(final_results.1, 3);
385/// assert_eq!(final_results.2, 2);
386/// # });
387/// ```
388pub async fn with_err_type_spawn_group<Closure, Fut, ResultType, ErrorType, ReturnType>(
389 of_type: PhantomData<ResultType>,
390 error_type: PhantomData<ErrorType>,
391 body: Closure,
392) -> ReturnType
393where
394 Fut: Future<Output = ReturnType>,
395 Closure: FnOnce(ErrSpawnGroup<ResultType, ErrorType>) -> Fut,
396{
397 _ = (of_type, error_type);
398 let task_group = ErrSpawnGroup::<ResultType, ErrorType>::default();
399 body(task_group).await
400}
401
402/// Starts a scoped closure that takes a mutable ``ErrSpawnGroup`` instance as an argument which can execute any number of child tasks which its result values are of the type ``Result<ResultType, ErrorType>``
403/// where ``ResultType`` can be of type and ``ErrorType`` which is any type that implements the standard ``Error`` type.
404///
405/// This closure ensures that before the function call ends, all spawned child tasks are implicitly waited for, or the programmer can explicitly wait by calling its ``wait_for_all()`` method
406/// of the ``ErrSpawnGroup`` struct
407///
408/// This function use a threadpoolof the same number of threads as the number of active processor count that is default amount of parallelism a program can use on the system for polling the spawned tasks
409///
410/// See [`ErrSpawnGroup`](err_spawn_group::ErrSpawnGroup)
411/// for more.
412///
413/// # Parameters
414///
415/// * `body`: an async closure that takes a mutable instance of ``ErrSpawnGroup`` as an argument
416///
417/// # Returns
418///
419/// Anything the ``body`` parameter returns
420///
421/// # Example
422///
423/// ```rust
424/// use std::error::Error;
425/// use std::fmt::Display;
426/// use spawn_groups::GetType;
427/// use spawn_groups::with_err_spawn_group;
428/// use futures_lite::StreamExt;
429/// use spawn_groups::Priority;
430///
431/// #[derive(Debug)]
432/// enum DivisibleByError {
433/// THREE,
434/// FIVE
435/// }
436///
437/// impl Display for DivisibleByError {
438/// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
439/// match self {
440/// DivisibleByError::THREE => f.write_str("Divisible by three"),
441/// DivisibleByError::FIVE => f.write_str("Divisible by five")
442/// }
443/// }
444/// }
445///
446/// impl Error for DivisibleByError {}
447///
448/// # spawn_groups::block_on(async move {
449/// let final_results = with_err_spawn_group(|mut group| async move {
450/// for i in 1..=10 {
451/// group.spawn_task(Priority::default(), async move {
452/// // simulate asynchronous processing that might fail and
453/// // return a value of ErrorType specified above
454/// if i % 3 == 0 {
455/// return Err(DivisibleByError::THREE)
456/// } else if i % 5 == 0 {
457/// return Err(DivisibleByError::FIVE)
458/// }
459/// Ok(i)
460/// });
461/// }
462///
463/// // Explicitly wait for the all spawned child tasks to finish
464/// group.wait_for_all().await;
465///
466/// let mut sum_result = 0;
467/// let mut divisible_by_five = 0;
468/// let mut divisible_by_three = 0;
469/// while let Some(group_result) = group.next().await {
470/// if let Ok(result) = group_result {
471/// sum_result += result;
472/// } else if let Err(err_result) = group_result {
473/// match err_result {
474/// DivisibleByError::THREE => divisible_by_three += 1,
475/// DivisibleByError::FIVE => divisible_by_five += 1
476/// }
477/// }
478/// }
479///
480/// (sum_result, divisible_by_three, divisible_by_five)
481///
482/// }).await;
483///
484/// assert_eq!(final_results.0, 22);
485/// assert_eq!(final_results.1, 3);
486/// assert_eq!(final_results.2, 2);
487/// # });
488/// ```
489pub async fn with_err_spawn_group<Closure, Fut, ResultType, ErrorType, ReturnType>(
490 body: Closure,
491) -> ReturnType
492where
493 Fut: Future<Output = ReturnType>,
494 Closure: FnOnce(ErrSpawnGroup<ResultType, ErrorType>) -> Fut,
495{
496 let task_group = ErrSpawnGroup::<ResultType, ErrorType>::default();
497 body(task_group).await
498}
499
500/// Starts a scoped closure that takes a mutable ``DiscardingSpawnGroup`` instance as an argument which can execute any number of child tasks which return nothing.
501///
502/// Ensures that before the function call ends, all spawned tasks are implicitly waited for
503///
504/// This function use a threadpool of the same number of threads as the number of active processor count that is default amount of parallelism a program can use on the system for polling the spawned tasks
505///
506/// See [`DiscardingSpawnGroup`](discarding_spawn_group::DiscardingSpawnGroup)
507/// for more.
508///
509/// # Parameters
510///
511/// * `body`: an async closure that takes an instance of ``DiscardingSpawnGroup`` as an argument
512///
513/// # Returns
514///
515/// Anything the ``body`` parameter returns
516///
517/// # Example
518///
519/// ```rust
520/// use spawn_groups::GetType;
521/// use spawn_groups::with_discarding_spawn_group;
522/// use futures_lite::StreamExt;
523/// use spawn_groups::Priority;
524///
525/// # spawn_groups::block_on(async move {
526/// with_discarding_spawn_group(|mut group| async move {
527/// for i in 0..11 {
528/// group.spawn_task(Priority::default(), async move {
529/// // asynchronous processing
530/// // or some async network calls
531/// });
532/// }
533///
534/// }).await;
535/// # });
536/// ```
537pub async fn with_discarding_spawn_group<Closure, Fut, ReturnType>(body: Closure) -> ReturnType
538where
539 Fut: Future<Output = ReturnType>,
540 Closure: FnOnce(DiscardingSpawnGroup) -> Fut,
541{
542 let discarding_tg = DiscardingSpawnGroup::default();
543 body(discarding_tg).await
544}