Crate spawn_groups
A structured concurrency construct which provides a way to spawn and run an arbitrary number of child tasks,
possibly await the results of each child task or even cancel all running child tasks.
This was heavily influenced by the Swift language’s TaskGroup.
Installation
Add the crate to your project's dependencies:
cargo add spawn_groups
Example
use async_std::io::{self};
use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*;
use spawn_groups::{with_err_spawn_group, GetType, Priority};

async fn process(stream: TcpStream) -> io::Result<()> {
    println!("Accepted from local: {}", stream.local_addr()?);
    println!("Accepted from: {}", stream.peer_addr()?);
    let mut reader = stream.clone();
    let mut writer = stream;
    io::copy(&mut reader, &mut writer).await?;
    Ok(())
}

type Void = ();

#[async_std::main]
async fn main() -> io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Listening on {}", listener.local_addr()?);
    with_err_spawn_group(Void::TYPE, io::Error::TYPE, |mut group| async move {
        let mut incoming = listener.incoming();
        while let Some(stream) = incoming.next().await {
            let Ok(stream) = stream else {
                return Err(stream.expect_err("Expected an error"));
            };
            group.spawn_task(Priority::default(), async move { process(stream).await });
        }
        Ok(())
    })
    .await?;
    Ok(())
}
Usage
To properly use this crate, pick the function that matches what the child tasks return:
- with_spawn_group for the creation of a dynamic number of asynchronous tasks that return a value. See with_spawn_group for more information.
- with_err_spawn_group for the creation of a dynamic number of asynchronous tasks that return a value or an error. See with_err_spawn_group for more information.
- with_discarding_spawn_group for the creation of a dynamic number of asynchronous tasks that return nothing. See with_discarding_spawn_group for more information.
Spawning Child Tasks
Child tasks are spawned by calling either the spawn_task or the spawn_task_unless_cancelled method on any spawn group instance.
To avoid spawning new child tasks into an already cancelled spawn group, use spawn_task_unless_cancelled rather than the plain spawn_task, which spawns new child tasks unconditionally.
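The difference between unconditional and cancellation-aware spawning can be sketched with the standard library alone. The `MiniGroup` type below is a hypothetical, thread-based stand-in written for illustration; it is not the crate's implementation, and its method signatures (including the `bool` return value) are assumptions rather than the crate's API.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a spawn group (not the crate's type).
pub struct MiniGroup<T> {
    cancelled: bool,
    handles: Vec<JoinHandle<T>>,
}

impl<T: Send + 'static> MiniGroup<T> {
    pub fn new() -> Self {
        Self { cancelled: false, handles: Vec::new() }
    }

    /// Spawns a child unconditionally, even if the group was cancelled.
    pub fn spawn_task<F>(&mut self, work: F)
    where
        F: FnOnce() -> T + Send + 'static,
    {
        self.handles.push(thread::spawn(work));
    }

    /// Spawns a child only if the group has not been cancelled.
    /// Returns `true` when the child was actually spawned.
    pub fn spawn_task_unless_cancelled<F>(&mut self, work: F) -> bool
    where
        F: FnOnce() -> T + Send + 'static,
    {
        if self.cancelled {
            return false;
        }
        self.spawn_task(work);
        true
    }

    /// Marks the group as cancelled. In this sketch, children that are
    /// already running are not interrupted.
    pub fn cancel_all(&mut self) {
        self.cancelled = true;
    }

    /// Joins every spawned child and returns the results in spawn order.
    pub fn wait_for_all(&mut self) -> Vec<T> {
        self.handles
            .drain(..)
            .map(|handle| handle.join().expect("child task panicked"))
            .collect()
    }
}
```

In this sketch, a call to `spawn_task_unless_cancelled` made after `cancel_all` is skipped and reports `false`, while `spawn_task` still starts the child.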
Child Task Execution Order
Child tasks are scheduled in any order and spawned child tasks execute concurrently.
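Because completion order is not guaranteed, code that consumes results should not rely on it. The following is a minimal std-only sketch of that idea, using threads and a channel as an analogy for child tasks; the function names are hypothetical and nothing here reflects how the crate schedules work internally.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs `n` child threads concurrently and returns their results in the
/// order they happened to finish, which may differ from run to run.
pub fn results_in_completion_order(n: u64) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for i in 0..n {
        let tx = tx.clone();
        thread::spawn(move || {
            // Each child sends its result as soon as it is done.
            tx.send(i * i).expect("receiver dropped");
        });
    }
    // Drop the original sender so the receiver stops once every child is done.
    drop(tx);
    rx.iter().collect()
}

/// Sorts the collected results when a deterministic order is required.
pub fn results_sorted(n: u64) -> Vec<u64> {
    let mut results = results_in_completion_order(n);
    results.sort_unstable();
    results
}
```

If a stable order matters, either sort the collected results as above or carry an index alongside each result.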
Cancellation
Explicitly calling the cancel_all method on any spawn group instance immediately cancels all running child tasks.
Waiting
Explicitly calling the wait_for_all_tasks method on any spawn group instance waits for all child tasks to finish.
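The guarantee that a group does not finish before its children do is the core of structured concurrency. As an analogy only, the standard library's `std::thread::scope` gives the same guarantee for threads; the sketch below uses it with a hypothetical `sum_of_squares` helper and does not use or describe the crate's own waiting mechanism.

```rust
use std::thread;

/// Spawns one scoped child thread per input and waits for all of them
/// before returning the combined result.
pub fn sum_of_squares(inputs: &[u64]) -> u64 {
    thread::scope(|scope| {
        // Spawn every child first so that they can run concurrently.
        let handles: Vec<_> = inputs
            .iter()
            .map(|&n| scope.spawn(move || n * n))
            .collect();

        // Joining each handle waits for that child to finish.
        handles
            .into_iter()
            .map(|handle| handle.join().expect("child thread panicked"))
            .sum::<u64>()
    })
    // `thread::scope` returns only after all scoped children have finished.
}
```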
Stream
Both the SpawnGroup and ErrSpawnGroup structs implement the futures_lite::Stream trait, which means that you can await the result of each child task asynchronously. With the help of the StreamExt trait, you can call methods such as next, map, filter_map, fold and many more.
If you want a specific number of results from the spawned child tasks, consider calling the get_chunks method instead of iterating over the spawn group instance, which waits for all child tasks to finish their execution.
use spawn_groups::with_spawn_group;
use futures_lite::StreamExt;
use spawn_groups::Priority;
use spawn_groups::GetType;

with_spawn_group(i64::TYPE, |mut group| async move {
    for i in 0..=10 {
        group.spawn_task(Priority::default(), async move {
            // simulate asynchronous operation
            i
        });
    }
    // Loop over all the results of the child tasks spawned already
    while let Some(x) = group.next().await {
        println!("{}", x);
    }
}).await;
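The trade-off between taking a fixed number of results and draining everything can be illustrated without the crate. The sketch below is a std-only analogy built on threads and a channel; `first_results` is a hypothetical helper, and it makes no claim about the signature or behavior of the crate's get_chunks method.

```rust
use std::sync::mpsc;
use std::thread;

/// Starts `total` child threads and returns only the first `wanted`
/// results to arrive, without waiting for the remaining children.
pub fn first_results(total: u32, wanted: usize) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    for i in 0..total {
        let tx = tx.clone();
        thread::spawn(move || {
            // Ignore the error if the receiver has already stopped listening.
            let _ = tx.send(i + 100);
        });
    }
    // Drop the original sender so that iteration ends if fewer than
    // `wanted` results are ever produced.
    drop(tx);
    rx.iter().take(wanted).collect()
}
```

Which results are returned depends on which children finish first, so only the count is predictable.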
Note
- Import the StreamExt trait from futures_lite::StreamExt, futures::stream::StreamExt or async_std::stream::StreamExt to provide a variety of convenient combinator functions on the various spawn groups.
- To wait for all running child tasks to finish their execution, call the wait_for_all method on the spawn group instance, unless using the with_discarding_spawn_group function.
Warning
- This crate relies on atomics
- Avoid using a spawn group outside the functions listed above that this crate provides.
- Avoid calling long-running, blocking, non-asynchronous functions while using any of the spawn groups, because they were built with asynchrony in mind.
- Avoid spawning off asynchronous functions with the spawn methods of crates such as tokio, async_std, smol, etc.
Re-exports
pub use meta_types::GetType;
Modules
Enums
- Priority: Task Priority
Functions
- with_discarding_spawn_group: Starts a scoped closure that takes a mutable DiscardingSpawnGroup instance as an argument, which can execute any number of child tasks that return nothing.
- with_err_spawn_group: Starts a scoped closure that takes a mutable ErrSpawnGroup instance as an argument, which can execute any number of child tasks whose result values are of the type Result<ResultType, ErrorType>, where ResultType can be of any type and ErrorType is any type that implements the standard Error trait.
- with_spawn_group: Starts a scoped closure that takes a mutable SpawnGroup instance as an argument, which can execute any number of child tasks whose result values are of the generic ResultType type.