Crate spawn_groups

A structured concurrency construct which provides a way to spawn and run an arbitrary number of child tasks, possibly await the results of each child task or even cancel all running child tasks. This was heavily influenced by the Swift language’s TaskGroup.

§Installation

Add the crate to your project:

cargo add spawn_groups

§Example

use async_std::stream::StreamExt;
use spawn_groups::{with_err_spawn_group, GetType, Priority};
use std::time::Instant;
use surf::{Error, Client, http::Mime, StatusCode};

async fn get_mimetype<AsStr: AsRef<str>>(url: AsStr, client: Client) -> Option<Mime> {
    let Ok(resp) = client.get(url).send().await else {
        return None;
    };
    resp.content_type()
}

#[async_std::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = surf::Client::new();
    let urls = [
        "https://www.google.com",
        "https://www.bing.com",
        "https://www.yandex.com",
        "https://www.duckduckgo.com",
        "https://www.wikipedia.org",
        "https://www.whatsapp.com",
        "https://www.yahoo.com",
        "https://www.amazon.com",
        "https://www.baidu.com",
        "https://www.youtube.com",
        "https://facebook.com",
        "https://www.instagram.com",
        "https://tiktok.com",
    ];
    with_err_spawn_group(move |mut group| async move {
        println!("About to start");
        let now = Instant::now();
        for url in urls {
            let client = client.clone();
            group.spawn_task(Priority::default(), async move {
                let Some(mimetype) = get_mimetype(url, client).await else {
                    return Err(Error::from_str(StatusCode::ExpectationFailed, format!("No content type found for {}", url)));
                };
                Ok(format!("{url}: {}", mimetype))
            });
        }

        while let Some(result) = group.next().await {
            match result {
                Ok(line) => println!("{}", line),
                Err(error) => eprintln!("{}", error),
            }
        }
        println!("Ended");
        println!("It took {} nanoseconds", now.elapsed().as_nanos());
    })
    .await;
    Ok(())
}

§Usage

To use this crate properly, pick the function that matches your use case:

  • with_spawn_group for the creation of a dynamic number of asynchronous tasks that return a value. See with_spawn_group for more information

  • with_type_spawn_group for the creation of a dynamic number of asynchronous tasks that return a value by specifying the type explicitly. See with_type_spawn_group for more information

  • with_err_spawn_group for the creation of a dynamic number of asynchronous tasks that return a value or an error. See with_err_spawn_group for more information

  • with_err_type_spawn_group for the creation of a dynamic number of asynchronous tasks that return a value or an error by specifying the return type and the error type explicitly. See with_err_type_spawn_group for more information

  • with_discarding_spawn_group for the creation of a dynamic number of asynchronous tasks that return nothing. See with_discarding_spawn_group for more information

  • block_on blocks the current thread until the given future has been polled to completion. See block_on for more information
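To make the last item concrete, here is a minimal sketch of how a block_on-style function can work, using only the standard library. The names toy_block_on and ThreadWaker are hypothetical and chosen for illustration; this is not the crate's implementation, only the general technique of polling a future and parking the thread until a waker fires.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hypothetical waker that unparks the thread blocked in `toy_block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical stand-in for a `block_on`-style function (not the crate's code).
fn toy_block_on<F: Future>(future: F) -> F::Output {
    // Pin the future on the stack so it can be polled.
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            // Sleep until the waker unparks this thread, then poll again.
            Poll::Pending => thread::park(),
        }
    }
}
```

Calling toy_block_on(async { 1 + 2 }) from synchronous code drives the async block to completion on the current thread and returns its value.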

§Spawning Child Tasks

Child tasks are spawned by calling either the spawn_task or the spawn_task_unless_cancelled method on any spawn group instance.

To avoid spawning new child tasks into an already cancelled spawn group, use spawn_task_unless_cancelled rather than plain spawn_task, which spawns new child tasks unconditionally.
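The difference between the two spawning methods can be pictured with a small std-only toy. ToyGroup below is hypothetical: it borrows the method names from the prose above but runs closures on OS threads, returns a bool from spawn_task_unless_cancelled as its own design choice, and does not interrupt threads that are already running. It is a sketch of the conditional-spawn idea, not the crate's internals.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};

/// Hypothetical toy group; names mirror the prose, not the crate's internals.
struct ToyGroup {
    cancelled: AtomicBool,
    handles: Vec<JoinHandle<u32>>,
}

impl ToyGroup {
    fn new() -> Self {
        ToyGroup { cancelled: AtomicBool::new(false), handles: Vec::new() }
    }

    /// Always spawns, even after the group was cancelled.
    fn spawn_task(&mut self, work: impl FnOnce() -> u32 + Send + 'static) {
        self.handles.push(thread::spawn(work));
    }

    /// Spawns only if the group has not been cancelled; reports whether it did.
    fn spawn_task_unless_cancelled(
        &mut self,
        work: impl FnOnce() -> u32 + Send + 'static,
    ) -> bool {
        if self.cancelled.load(Ordering::Acquire) {
            return false;
        }
        self.spawn_task(work);
        true
    }

    /// Marks the group as cancelled (this toy does not stop running threads).
    fn cancel_all(&self) {
        self.cancelled.store(true, Ordering::Release);
    }

    /// Joins every spawned thread and returns the results in spawn order.
    fn join_all(self) -> Vec<u32> {
        self.handles
            .into_iter()
            .map(|handle| handle.join().expect("child thread panicked"))
            .collect()
    }
}
```

In this toy, a closure passed to spawn_task_unless_cancelled after cancel_all is dropped without running, while spawn_task still runs it.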

§Child Task Execution Order

Child tasks may be scheduled in any order, and spawned child tasks execute concurrently.
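As a rough analogy using only the standard library, the sketch below runs children on scoped threads and collects their results through a channel in the order they finish. The function name run_children is hypothetical, and threads stand in for the crate's asynchronous tasks. The point is that callers should rely on the set of results, never on their order.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs `n` children on scoped threads and returns their results
/// in whatever order the children happened to finish.
fn run_children(n: u64) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    // The scope joins every child before it returns.
    thread::scope(|scope| {
        for i in 0..n {
            let tx = tx.clone();
            scope.spawn(move || {
                tx.send(i * i).expect("receiver is still alive");
            });
        }
    });
    // Drop the original sender so the receiving iterator terminates.
    drop(tx);
    rx.into_iter().collect()
}
```

Sorting the returned vector, or folding it with an order-independent operation such as a sum, gives a deterministic answer regardless of scheduling.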

§Cancellation

Explicitly calling the cancel_all method on any spawn group instance immediately cancels all running child tasks.

§Waiting

Explicitly calling the wait_for_all_tasks method on any spawn group instance waits for all child tasks to finish.
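Waiting for a whole group is commonly built from a completion counter guarded by a mutex and a condition variable. The std-only sketch below shows that general pattern with threads. The function name spawn_and_wait is hypothetical, and nothing here is taken from the crate's source.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Spawns `n` detached threads and blocks until every one has reported
/// completion; returns the final completion count.
fn spawn_and_wait(n: usize) -> usize {
    let state = Arc::new((Mutex::new(0usize), Condvar::new()));
    for _ in 0..n {
        let state = Arc::clone(&state);
        thread::spawn(move || {
            let (finished, all_done) = &*state;
            // Record completion, then wake the waiting thread.
            *finished.lock().unwrap() += 1;
            all_done.notify_one();
        });
    }
    let (finished, all_done) = &*state;
    // Sleep while fewer than `n` children have finished.
    let guard = all_done
        .wait_while(finished.lock().unwrap(), |count| *count < n)
        .unwrap();
    *guard
}
```

Because wait_while rechecks the counter under the lock, a notification that arrives before the wait begins is not lost.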

§Stream

Both the SpawnGroup and ErrSpawnGroup structs implement the futures_lite::Stream trait, which means that you can await the result of each child task asynchronously. With the help of the StreamExt trait, you can call methods such as next, map, filter_map, fold and many more.

use futures_lite::StreamExt;
use spawn_groups::{with_spawn_group, GetType, Priority};

with_spawn_group(|mut group| async move {
    for i in 0..=10 {
        group.spawn_task(Priority::default(), async move {
            // simulate asynchronous operation
            i
        });
    }

    // Loop over all the results of the child tasks spawned asynchronously
    let mut counter = 0;
    while let Some(x) = group.next().await {
        counter += x;
    }

    assert_eq!(counter, 55);
})
.await;

§Comparisons against existing alternatives

  • JoinSet: Like JoinSet, a spawn group can await the completion of some or all of its child tasks, spawns child tasks in an unordered manner, returns their results in the order they complete, and can cancel or abort all child tasks. Unlike JoinSet, a spawn group lets you explicitly wait for all child tasks to finish their execution, and it provides a scope in which the child tasks execute.

  • FuturesUnordered: Like FuturesUnordered, a spawn group spawns child tasks in an unordered manner, but FuturesUnordered doesn’t start running the spawned child tasks until it is polled. It also doesn’t provide a way to cancel all child tasks.
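The polling remark reflects a general property of Rust futures: a future does no work until something polls it. The std-only sketch below demonstrates this for a plain async block. It does not use FuturesUnordered itself, and the names NoopWaker and observe_laziness are hypothetical.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};

/// Hypothetical waker that does nothing; enough to poll a future once.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Returns whether the future's body had run (before the first poll, after it).
fn observe_laziness() -> (bool, bool) {
    let ran = AtomicBool::new(false);
    // Creating the future does not execute its body.
    let future = async {
        ran.store(true, Ordering::SeqCst);
    };
    let before = ran.load(Ordering::SeqCst);

    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    // The body runs only now, when the future is polled.
    let poll = future.as_mut().poll(&mut cx);
    assert!(poll.is_ready());

    (before, ran.load(Ordering::SeqCst))
}
```

The function returns (false, true): the flag is still unset after the future is created and is set only once the future has been polled.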

§Note

  • Import StreamExt trait from futures_lite::StreamExt or futures::stream::StreamExt or async_std::stream::StreamExt to provide a variety of convenient combinator functions on the various spawn groups.
  • To wait for all running child tasks to finish their execution, call the wait_for_all or wait_non_async method on the group instance.

§Warning

  • This crate relies on atomics and condition variables.
  • Avoid calling long-running, blocking, non-asynchronous functions while using any of the spawn groups, because they were built with asynchrony in mind.

Structs§

DiscardingSpawnGroup
Discarding Spawn Group
ErrSpawnGroup
Err Spawn Group
SpawnGroup
Spawn Group

Enums§

Priority
Task Priority

Traits§

GetType
The GetType trait implements an associated constant for every type, including types that are ?Sized. This associated constant provides a metatype value, that is, a value representing the type itself.

Functions§

block_on
Blocks the current thread until the future is polled to finish.
with_discarding_spawn_group
Starts a scoped closure that takes a mutable DiscardingSpawnGroup instance as an argument which can execute any number of child tasks which return nothing.
with_err_spawn_group
Starts a scoped closure that takes a mutable ErrSpawnGroup instance as an argument, which can execute any number of child tasks whose result values are of type Result<ResultType, ErrorType>, where ResultType can be any type and ErrorType is any type that implements the standard Error trait.
with_err_type_spawn_group
Starts a scoped closure that takes a mutable ErrSpawnGroup instance as an argument, which can execute any number of child tasks whose result values are of type Result<ResultType, ErrorType>, where ResultType can be any type and ErrorType is any type that implements the standard Error trait.
with_spawn_group
Starts a scoped closure that takes a mutable SpawnGroup instance as an argument, which can execute any number of child tasks whose result values are of the generic ResultType type.
with_type_spawn_group
Starts a scoped closure that takes a mutable SpawnGroup instance as an argument, which can execute any number of child tasks whose result values are of the generic ResultType type.