Async DAG Task Runner
A minimal, type-safe, runtime-agnostic DAG (Directed Acyclic Graph) executor with compile-time dependency validation and optimal parallel execution.
§Features
- Compile-time type safety: Dependencies are validated at compile time through the type system. The public API is fully type-safe with no runtime type errors. Internal execution uses type erasure for heterogeneous task storage, but this is never exposed to users.
- Runtime-agnostic: Works with any async runtime (Tokio, async-std, smol, Embassy, etc.)
- Type-state pattern: The API guides you with compile-time errors if you wire dependencies incorrectly
- Optimal execution: Topological scheduling with maximum safe parallelism
- Zero-cost abstractions: Leverages generics and monomorphization for minimal overhead
- Error handling: Result-based error handling with DagResult<T> for cycle detection and validation
§Quick Start
use dagx::{task, DagRunner, Task};
// Source task with read-only state (tuple struct)
struct Value(i32);
#[task]
impl Value {
    async fn run(&self) -> i32 { self.0 } // Read-only: use &self
}
// Stateless task (unit struct)
struct Add;
#[task]
impl Add {
    async fn run(a: &i32, b: &i32) -> i32 { a + b } // No self needed!
}
let dag = DagRunner::new();
// Add source tasks
let x = dag.add_task(Value(2));
let y = dag.add_task(Value(3));
// Add task with dependencies
let sum = dag.add_task(Add).depends_on((&x, &y));
// Execute and retrieve results
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();
assert_eq!(dag.get(sum).unwrap(), 5);
§Implementation Notes
§Inline Execution Fast-Path
dagx automatically optimizes execution for sequential workloads using an inline fast-path. When a layer contains only a single task (common in deep chains and linear pipelines), that task executes inline rather than being spawned. This eliminates spawning overhead, context switching, and channel creation.
Panic handling: To maintain consistent behavior between inline and spawned execution,
panics in inline tasks are caught using FutureExt::catch_unwind() and converted to
DagError::TaskPanicked. This matches the behavior of async runtimes like Tokio, async-std,
and smol, which catch panics in spawned tasks and convert them to errors.
What this means for you:
- Tasks behave identically whether executed inline or spawned
- Panics become errors regardless of execution path
- Sequential workloads see 10-100x performance improvements
- The optimization is completely transparent - no code changes needed
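The panic-to-error conversion can be illustrated with a small standard-library sketch. It is not dagx's implementation: dagx is described as using FutureExt::catch_unwind() on futures, whereas this stand-in uses std::panic::catch_unwind on a synchronous closure. The names SketchError and run_inline are hypothetical.

```rust
use std::panic::{self, AssertUnwindSafe};

// Hypothetical error type standing in for DagError::TaskPanicked.
#[derive(Debug, PartialEq)]
enum SketchError {
    TaskPanicked(String),
}

// Runs a closure "inline" and converts a panic into an error value,
// so the caller sees a Result either way.
fn run_inline<T>(task: impl FnOnce() -> T) -> Result<T, SketchError> {
    panic::catch_unwind(AssertUnwindSafe(task)).map_err(|payload| {
        // Panic payloads are usually &str or String; anything else is opaque.
        let msg = if let Some(s) = payload.downcast_ref::<&str>() {
            (*s).to_string()
        } else if let Some(s) = payload.downcast_ref::<String>() {
            s.clone()
        } else {
            "unknown panic".to_string()
        };
        SketchError::TaskPanicked(msg)
    })
}
```

With this shape, run_inline(|| 2 + 3) yields Ok(5), while a closure that panics yields Err(SketchError::TaskPanicked(..)) instead of unwinding through the caller.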
When inline execution happens:
- Single-task layers (e.g., long sequential chains)
- Linear pipelines (A→B→C→D)
- Any layer where layer.len() == 1 after topological ordering
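To make the notion of a "layer" concrete, here is a minimal sketch of grouping a dependency graph into topological layers with Kahn's algorithm. It is an illustration under assumptions, not dagx's internal scheduler: nodes are plain indices, and topo_layers and inline_candidates are hypothetical helper names.

```rust
/// Groups nodes `0..n` into layers: a node is placed in the first layer
/// after all of its dependencies. Each edge is `(dependency, dependent)`.
/// Returns `None` if the edges contain a cycle.
fn topo_layers(n: usize, edges: &[(usize, usize)]) -> Option<Vec<Vec<usize>>> {
    let mut indegree = vec![0usize; n];
    let mut dependents: Vec<Vec<usize>> = vec![Vec::new(); n];
    for &(from, to) in edges {
        indegree[to] += 1;
        dependents[from].push(to);
    }

    // The first layer holds every node with no dependencies.
    let mut current: Vec<usize> = (0..n).filter(|&i| indegree[i] == 0).collect();
    let mut layers = Vec::new();
    let mut placed = 0;

    while !current.is_empty() {
        let mut next = Vec::new();
        for &node in &current {
            for &dep in &dependents[node] {
                indegree[dep] -= 1;
                if indegree[dep] == 0 {
                    next.push(dep);
                }
            }
        }
        placed += current.len();
        layers.push(current);
        current = next;
    }

    // Nodes on a cycle never reach indegree 0, so they are never placed.
    if placed == n { Some(layers) } else { None }
}

/// Nodes that sit alone in their layer, i.e. where `layer.len() == 1`.
fn inline_candidates(layers: &[Vec<usize>]) -> Vec<usize> {
    layers.iter().filter(|l| l.len() == 1).map(|l| l[0]).collect()
}
```

For a linear pipeline 0→1→2→3 every layer has one node, so every task is an inline candidate. For a diamond (0→1, 0→2, 1→3, 2→3) only the first and last layers are.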
§Dependency Limits
dagx supports up to 8 dependencies per task. If you need more than 8 dependencies:
- Group related inputs into a struct
- Use intermediate aggregation tasks
- Consider whether needing more than 8 dependencies indicates a design issue
§Task Output Limitations
IMPORTANT: Tasks cannot return bare tuples as output types. This is a technical limitation of the current implementation. If you need to return multiple values, use one of these workarounds:
§Workaround 1: Wrap in Result (Recommended for Fallible Operations)
use dagx::{task, Task};
struct ProcessData;
#[task]
impl ProcessData {
    async fn run(input: &String) -> Result<(String, i32, bool), String> {
        // Wrap the tuple in Result - this works!
        Ok(("Alice".to_string(), 30, true))
    }
}
§Workaround 2: Use a Struct (Recommended for Most Cases)
use dagx::{task, Task};
// Define a struct to hold your multiple values
struct UserData {
    name: String,
    age: i32,
    active: bool,
}
struct ProcessDataStruct;
#[task]
impl ProcessDataStruct {
    async fn run(input: &String) -> UserData {
        // Return the struct - clean and self-documenting
        UserData {
            name: "Alice".to_string(),
            age: 30,
            active: true,
        }
    }
}
§Why Use Structs Over Result<(...)>?
While both workarounds solve the technical limitation, structs are the better choice for most cases:
- Self-documenting: user.name is clearer than user.0
- Refactorable: Easy to add/remove fields without breaking all dependencies
- Type-safe: Named fields prevent field order mistakes
- Semantic clarity: Result should indicate fallibility, not just wrap data
Use Result<(...), E> only when your operation can genuinely fail and you need to
return multiple values on success.
§Core Concepts
§Task
A Task is a unit of async work with typed inputs and outputs. Use the #[task] macro.
§Task Patterns
dagx supports three task patterns based on state requirements:
1. Stateless - No state (unit struct, no self parameter):
use dagx::{task, Task};
struct Add;
#[task]
impl Add {
    async fn run(a: &i32, b: &i32) -> i32 { a + b }
}
2. Read-only state - Immutable access (use &self):
use dagx::{task, Task};
struct Scale(i32);
#[task]
impl Scale {
    async fn run(&self, input: &i32) -> i32 {
        input * self.0 // Read-only access
    }
}
3. Mutable state - State modification (use &mut self):
use dagx::{task, Task};
struct Counter(i32);
#[task]
impl Counter {
    async fn run(&mut self, input: &i32) -> i32 {
        self.0 += input; // Modifies state
        self.0
    }
}
§DagRunner
The DagRunner orchestrates task execution. Add tasks with DagRunner::add_task,
wire dependencies with TaskBuilder::depends_on, then run everything with DagRunner::run.
§TaskHandle
A TaskHandle<T> is a typed, opaque reference to a task’s output. Use it to:
- Wire dependencies between tasks
- Retrieve results after execution with DagRunner::get
§Dependency Patterns
dagx supports three dependency patterns:
§No Dependencies (Source Tasks)
Tasks with no dependencies don’t call depends_on():
let source = dag.add_task(Value(42));
§Single Dependency
Tasks with a single dependency receive a reference to that value:
struct Double;
#[task]
impl Double {
    async fn run(input: &i32) -> i32 { input * 2 }
}
let doubled = dag.add_task(Double).depends_on(&source);
§Multiple Dependencies
Tasks with multiple dependencies receive separate reference parameters (order matters!):
struct Add;
#[task]
impl Add {
    async fn run(a: &i32, b: &i32) -> i32 { a + b }
}
let sum = dag.add_task(Add).depends_on((&x, &y));
§Examples
§Fan-out Pattern (1 → n)
One task produces a value consumed by multiple downstream tasks:
use dagx::{task, DagRunner, Task};
// Source task (tuple struct)
struct Value(i32);
#[task]
impl Value {
    async fn run(&self) -> i32 { self.0 }
}
// Stateful task (tuple struct)
struct Add(i32);
#[task]
impl Add {
    async fn run(&self, input: &i32) -> i32 { input + self.0 }
}
// Stateful task (tuple struct)
struct Scale(i32);
#[task]
impl Scale {
    async fn run(&self, input: &i32) -> i32 { input * self.0 }
}
let dag = DagRunner::new();
let base = dag.add_task(Value(10));
let plus1 = dag.add_task(Add(1)).depends_on(&base);
let times2 = dag.add_task(Scale(2)).depends_on(&base);
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();
assert_eq!(dag.get(plus1).unwrap(), 11);
assert_eq!(dag.get(times2).unwrap(), 20);
§Fan-in Pattern (m → 1)
Multiple tasks produce values consumed by a single downstream task:
use dagx::{task, DagRunner, Task};
// Source task for String (tuple struct)
struct Name(String);
#[task]
impl Name {
    async fn run(&self) -> String { self.0.clone() }
}
// Source task for i32 (tuple struct)
struct Age(i32);
#[task]
impl Age {
    async fn run(&self) -> i32 { self.0 }
}
// Source task for bool (tuple struct)
struct Active(bool);
#[task]
impl Active {
    async fn run(&self) -> bool { self.0 }
}
// Stateless formatter (unit struct)
struct FormatUser;
#[task]
impl FormatUser {
    async fn run(n: &String, a: &i32, f: &bool) -> String {
        format!("User: {n}, Age: {a}, Active: {f}")
    }
}
let dag = DagRunner::new();
let name = dag.add_task(Name("Alice".to_string()));
let age = dag.add_task(Age(30));
let active = dag.add_task(Active(true));
let result = dag.add_task(FormatUser).depends_on((&name, &age, &active));
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();
assert_eq!(dag.get(result).unwrap(), "User: Alice, Age: 30, Active: true");
§Many-to-Many Pattern (m ↔ n)
Complex DAGs with multiple layers and dependencies:
use dagx::{task, DagRunner, Task};
// Source task (tuple struct)
struct Value(i32);
#[task]
impl Value {
    async fn run(&self) -> i32 { self.0 }
}
// Stateless addition (unit struct)
struct Add;
#[task]
impl Add {
    async fn run(a: &i32, b: &i32) -> i32 { a + b }
}
// Stateless multiplication (unit struct)
struct Multiply;
#[task]
impl Multiply {
    async fn run(a: &i32, b: &i32) -> i32 { a * b }
}
let dag = DagRunner::new();
// Layer 1: Sources
let x = dag.add_task(Value(2));
let y = dag.add_task(Value(3));
let z = dag.add_task(Value(5));
// Layer 2: Intermediate computations
let sum_xy = dag.add_task(Add).depends_on((&x, &y)); // 2 + 3 = 5
let prod_yz = dag.add_task(Multiply).depends_on((&y, &z)); // 3 * 5 = 15
// Layer 3: Final result
let total = dag.add_task(Add).depends_on((&sum_xy, &prod_yz)); // 5 + 15 = 20
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();
assert_eq!(dag.get(total).unwrap(), 20);
§Runtime Agnostic
dagx works with any async runtime. The library has been tested with:
- Tokio - Most popular async runtime
- async-std - Alternative async runtime
- smol - Lightweight async runtime
Examples with different runtimes:
// With Tokio
#[tokio::main]
async fn main() {
    let dag = DagRunner::new();
    // ... build and run DAG
}
// With async-std
#[async_std::main]
async fn main() {
    let dag = DagRunner::new();
    // ... build and run DAG
}
// With smol
fn main() {
    smol::block_on(async {
        let dag = DagRunner::new();
        // ... build and run DAG
    });
}
§Error Handling
dagx uses DagResult<T> (an alias for Result<T, DagError>) for operations that
can fail:
- DagRunner::run returns DagResult<()> and can fail if a cycle is detected
- DagRunner::get returns DagResult<T> and can fail if the task hasn’t executed or the handle is invalid
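The alias pattern behind DagResult<T> can be sketched with the standard library alone. Everything named here is hypothetical (SketchError, SketchResult, get, sum_of); the variants are modeled only on the failure cases described above and are not dagx's actual DagError definition.

```rust
use std::fmt;

// Hypothetical error enum mirroring the failure cases described for `get`.
#[derive(Debug, PartialEq)]
enum SketchError {
    NotExecuted { task: usize },
    InvalidHandle { task: usize },
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::NotExecuted { task } => write!(f, "task {task} has not executed"),
            SketchError::InvalidHandle { task } => write!(f, "task {task} is not a valid handle"),
        }
    }
}

impl std::error::Error for SketchError {}

// One alias keeps every fallible signature short and uniform.
type SketchResult<T> = Result<T, SketchError>;

// Looks up a stored result: `None` in the slot means "not executed yet".
fn get(results: &[Option<i32>], task: usize) -> SketchResult<i32> {
    match results.get(task) {
        None => Err(SketchError::InvalidHandle { task }),
        Some(None) => Err(SketchError::NotExecuted { task }),
        Some(Some(value)) => Ok(*value),
    }
}

// `?` propagates the first error to the caller without extra matching.
fn sum_of(results: &[Option<i32>], a: usize, b: usize) -> SketchResult<i32> {
    Ok(get(results, a)? + get(results, b)?)
}
```

A single alias lets callers chain fallible calls with `?` and decide in one place whether to unwrap, log, or recover.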
You can handle errors explicitly or use .unwrap() for simple cases:
// Simple approach with .unwrap()
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();
let result = dag.get(node).unwrap();
// Or handle errors explicitly
match dag.run(|fut| { tokio::spawn(fut); }).await {
    Ok(_) => println!("DAG executed successfully"),
    Err(e) => eprintln!("DAG execution failed: {}", e),
}
§Documentation Accuracy
This library maintains documentation accuracy through:
- Doctests: All code examples are tested via cargo test --doc
- Examples: All files in examples/ are tested via cargo build --examples
- Type signatures: Documentation reflects actual API return types
- Error handling: All examples show proper DagResult<T> handling
- Safety claims: All safety guarantees are verified by tests
Last audited: 2025-10-06 for v0.1.0
Changes that require documentation updates:
- API signature changes (run(), get(), etc.)
- New error types or error handling changes
- Safety guarantee changes
- Example code updates
§API Design Principles
dagx follows these API design principles:
- Type Safety: Dependencies validated at compile time via type-state pattern
- Builder Pattern: Fluent interface with add_task().depends_on()
- Error Handling: All fallible operations return DagResult<T>
- Minimal Surface: Small, focused API with 4 core types and 1 trait
- Zero Cost: Leverages generics for monomorphization
- Consistency:
  - All types are PascalCase
  - All methods are snake_case
  - Mutable operations take &mut self
  - Builder methods consume self
  - Results are always DagResult<T>
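The type-state and consuming-builder principles can be illustrated with a minimal, standard-library-only sketch. It shows the general technique, not dagx's TaskBuilder: the marker types Unwired and Wired and the NodeBuilder struct are hypothetical names invented for this example.

```rust
use std::marker::PhantomData;

// Hypothetical marker types; they exist only at the type level.
struct Unwired;
struct Wired;

// Hypothetical builder, parameterized by its wiring state.
struct NodeBuilder<State> {
    name: String,
    deps: Vec<String>,
    _state: PhantomData<State>,
}

impl NodeBuilder<Unwired> {
    fn new(name: &str) -> Self {
        NodeBuilder { name: name.to_string(), deps: Vec::new(), _state: PhantomData }
    }

    // Consumes the unwired builder and returns a differently typed one,
    // so dependencies cannot be wired a second time.
    fn depends_on(self, deps: &[&str]) -> NodeBuilder<Wired> {
        NodeBuilder {
            name: self.name,
            deps: deps.iter().map(|d| d.to_string()).collect(),
            _state: PhantomData,
        }
    }
}

impl NodeBuilder<Wired> {
    // Only available once dependencies are wired.
    fn describe(&self) -> String {
        format!("{} <- [{}]", self.name, self.deps.join(", "))
    }
}
```

Because depends_on takes self by value and describe exists only on NodeBuilder<Wired>, calling depends_on twice or describe before wiring is rejected by the compiler rather than failing at run time.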
§Performance Characteristics
dagx is designed for minimal overhead and optimal parallel execution.
§Benchmarks
Run benchmarks with:
cargo bench
View the detailed HTML reports:
# macOS
open target/criterion/report/index.html
# Linux
xdg-open target/criterion/report/index.html
# Windows
start target/criterion/report/index.html
# Or manually open target/criterion/report/index.html in your browser
Typical performance on modern hardware (AMD 7840U - Zen 4 laptop CPU):
- DAG creation: ~20ns empty DAG, ~96ns per task added
- Execution overhead: ~119ns per task (framework coordination only)
- Total overhead (construction + execution): ~223ns per task
- Scaling: Linear overhead with task count - 100 tasks in ~123µs, 1000 tasks in ~1.15ms
For real-world workloads where tasks perform meaningful work (I/O, computation), the framework overhead is typically well under 1% of total execution time.
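As a rough worked example of the "well under 1%" claim, the arithmetic can be sketched as follows. The ~223ns figure is taken from the list above; the per-task work durations are hypothetical inputs, and overhead_fraction and min_work_ns are illustrative helper names.

```rust
/// Fraction of a task's total time that is framework overhead.
fn overhead_fraction(overhead_ns: f64, work_ns: f64) -> f64 {
    overhead_ns / (overhead_ns + work_ns)
}

/// Smallest amount of per-task work (in ns) that keeps the overhead
/// fraction at or below `max_fraction`.
fn min_work_ns(overhead_ns: f64, max_fraction: f64) -> f64 {
    overhead_ns * (1.0 - max_fraction) / max_fraction
}
```

With 223ns of overhead, a task doing 1ms of work spends roughly 0.02% of its time in the framework, and any task doing more than about 22µs of work stays under the 1% mark.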
§Performance Tips
§Automatic Arc Wrapping (No Manual Arc Needed!)
Task outputs are automatically wrapped in Arc<T> internally for efficient fan-out patterns.
You output T, the framework handles the Arc wrapping:
use dagx::{task, DagRunner, Task};
// ✅ CORRECT: Just output Vec<String>, framework wraps in Arc internally
struct FetchData;
#[task]
impl FetchData {
    async fn run() -> Vec<String> {
        vec!["data".to_string(); 10_000]
    }
}
// Downstream tasks receive &Vec<String> as normal
// Behind the scenes: Arc<Vec<String>> is cloned cheaply, then inner Vec extracted
struct ProcessData;
#[task]
impl ProcessData {
    async fn run(data: &Vec<String>) -> usize {
        data.len()
    }
}
let dag = DagRunner::new();
let data = dag.add_task(FetchData);
// All three tasks get efficient Arc-wrapped sharing automatically
let task1 = dag.add_task(ProcessData).depends_on(&data);
let task2 = dag.add_task(ProcessData).depends_on(&data);
let task3 = dag.add_task(ProcessData).depends_on(&data);
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();
How it works:
- Your task outputs T (e.g., Vec<String>)
- Framework wraps it in Arc<T> internally
- For fan-out (1→N), Arc is cloned N times (just atomic reference-count increments, O(1) each)
- Each downstream task receives &T after extracting from Arc
- When you call dag.get(), the Arc is cloned and inner value extracted
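The fan-out step can be sketched with std::sync::Arc alone. This is an illustration of the sharing mechanism, not dagx's internal code; fan_out is a hypothetical helper.

```rust
use std::sync::Arc;

/// Wraps one output in an `Arc` and hands out one handle per dependent.
/// Each `Arc::clone` bumps a reference count; the data itself is not copied.
fn fan_out<T>(output: T, dependents: usize) -> Vec<Arc<T>> {
    let shared = Arc::new(output);
    (0..dependents).map(|_| Arc::clone(&shared)).collect()
}
```

All handles returned by fan_out point at the same allocation (Arc::ptr_eq returns true for any pair), and each dependent can borrow the inner value as &T through deref.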
Performance characteristics:
- Heap types (Vec, String, HashMap): Arc overhead is negligible (~few ns)
- Copy types (i32, usize): Small Arc overhead but usually still negligible
- See cargo bench for actual measurements on your hardware
Advanced - Zero-copy optimization:
If you want true zero-copy sharing (no extraction), output Arc<T> explicitly:
use std::sync::Arc;
use dagx::{task, Task};
struct ZeroCopyData;
#[task]
impl ZeroCopyData {
    // Output Arc<T> directly
    async fn run() -> Arc<Vec<String>> {
        Arc::new(vec!["data".to_string(); 10_000])
    }
}
struct ZeroCopyProcess;
#[task]
impl ZeroCopyProcess {
    // Receive &Arc<T> - just clones the Arc pointer, no data extraction
    async fn run(data: &Arc<Vec<String>>) -> usize {
        data.len()
    }
}
This becomes Arc<Arc<T>> internally, but ExtractInput unwraps one layer automatically.
Use this when you have many dependents AND want to avoid the clone() of inner data.
§Other Tips
- Minimize task count: Combine small operations into larger tasks
- Use appropriate granularity: Don’t create tasks for trivial work (< 1µs)
- Parallel execution: dagx automatically maximizes parallelism
Run cargo bench to see comprehensive benchmarks including basic operations, scaling
characteristics, common patterns (fan-out, diamond), and realistic workloads.
Structs§
- DagRunner: Build and execute a typed DAG of tasks.
- Pending: Type-level marker for empty dependency list.
- TaskBuilder: Node builder that tracks dependency completion via type state.
- TaskHandle: Opaque, typed token for a node’s output.
Enums§
- DagError: Errors that can occur during DAG construction and execution
Traits§
- Task: A unit of async work with typed inputs and outputs.
Type Aliases§
- DagResult: Result type for DAG operations
Attribute Macros§
- task: Attribute macro to automatically implement the Task trait.