altair-concurrent 0.1.6

Type-safe parallel execution of named async tasks with cancellation and per-task tracing
Documentation

altair-concurrent

Type-safe parallel execution of named async tasks with cancellation, timeout, and per-task tracing.

Part of the altair-rs workspace.

Add to your project

cargo add altair-concurrent

tokio and tokio-util types you need are re-exported — you do not have to add them yourself.

Quick start — parallel fetch with timeout

use altair_concurrent::prelude::*;
use std::time::Duration;

#[tokio::main]
async fn main() -> altair_concurrent::Result<()> {
    let tasks: TaskMap<String> = TaskMap::new()
        .insert("fetch_user", |_| async { Ok::<_, std::io::Error>("alice".into()) })
        .insert("fetch_orders", |_| async { Ok::<_, std::io::Error>("3 open".into()) });

    let results = execute_concurrently(tasks)
        .with_timeout(Duration::from_secs(5))
        .await?;

    println!("user: {}", results["fetch_user"]);
    println!("orders: {}", results["fetch_orders"]);
    Ok(())
}

results is a HashMap<&'static str, T> — look tasks up by the name you gave them at insert.

Fail-fast (default) — first error cancels the rest

use altair_concurrent::prelude::*;

# async fn run() -> altair_concurrent::Result<()> {
let tasks: TaskMap<u32> = TaskMap::new()
    .insert("ok", |_| async { Ok::<_, std::io::Error>(1) })
    .insert("bad", |_| async {
        Err::<u32, _>(std::io::Error::other("nope"))
    });

match execute_concurrently(tasks).await {
    Ok(map) => println!("all succeeded: {map:?}"),
    Err(Error::TaskFailed { name, source }) => {
        eprintln!("task '{name}' failed: {source}");
        // remaining tasks already cancelled
    }
    Err(e) => eprintln!("infrastructure error: {e}"),
}
# Ok(()) }

Partial results — run everything, inspect each task

When you want every task to run regardless of failures and see each one's outcome:

use altair_concurrent::prelude::*;

# async fn run() -> altair_concurrent::Result<()> {
let tasks: TaskMap<u32> = TaskMap::new()
    .insert("a", |_| async { Ok::<_, std::io::Error>(1) })
    .insert("b", |_| async { Err::<u32, _>(std::io::Error::other("transient")) })
    .insert("c", |_| async { Ok::<_, std::io::Error>(3) });

// PartialResults<u32> = HashMap<&'static str, Result<u32, BoxedError>>
let results: PartialResults<u32> = execute_concurrently(tasks)
    .with_partial_results()
    .await?;

for (name, outcome) in &results {
    match outcome {
        Ok(v) => println!("{name} -> {v}"),
        Err(e) => eprintln!("{name} -> failed: {e}"),
    }
}
# Ok(()) }

Only infrastructure errors (Timeout / Cancelled / Join) bubble up through the outer Result; task failures live in the inner map values.

External cancellation

use altair_concurrent::prelude::*;
use std::time::Duration;

# async fn run() -> altair_concurrent::Result<()> {
let cancel = CancellationToken::new();
let tasks: TaskMap<()> = TaskMap::new().insert("worker", |ct| async move {
    // Tasks receive the same token — make them cooperative.
    tokio::select! {
        () = ct.cancelled() => Err::<(), _>(std::io::Error::other("cancelled")),
        () = tokio::time::sleep(Duration::from_secs(30)) => Ok(()),
    }
});

let handle = tokio::spawn({
    let cancel = cancel.clone();
    async move { execute_concurrently(tasks).with_cancellation(cancel).await }
});

// Some condition triggers cancel from elsewhere (signal handler, timeout, etc.)
tokio::time::sleep(Duration::from_millis(50)).await;
cancel.cancel();

let _ = handle.await;
# Ok(()) }

Features

  • Named tasksHashMap<&'static str, T> results, not positional tuples
  • Tracing — each task runs inside tracing::info_span!("concurrent.task", task.name = ...); aggregate batch span shows task_count
  • Cancellation — pass a CancellationToken via .with_cancellation(...); tasks receive their own clone so they can cooperate
  • Timeout.with_timeout(Duration); expiry cancels remaining tasks and returns Error::Timeout
  • Fail-fast or partialExecutor (default) returns the first error; PartialExecutor (via .with_partial_results()) returns every task's outcome

Constraints

  • All tasks in a single batch must return the same Result<T, E>. For heterogeneous batches, use tokio::join! directly.
  • Built on tokio::task::JoinSet; tokio is the only supported runtime.

License

MIT