pub struct DagRunner { /* private fields */ }Expand description
Build and execute a typed DAG of tasks.
A DagRunner is the main orchestrator for building and executing a directed acyclic graph
of async tasks with compile-time type-safe dependencies.
§Workflow
- Create a new DAG with
DagRunner::new - Add tasks with
DagRunner::add_taskto getTaskBuilderbuilders - Wire dependencies with
TaskBuilder::depends_on - Execute all tasks with
DagRunner::run - Optionally retrieve outputs with
DagRunner::get
§Examples
// Task with state constructed via ::new()
struct LoadValue { value: i32 }
impl LoadValue {
fn new(value: i32) -> Self { Self { value } }
}
#[task]
impl LoadValue {
async fn run(&mut self) -> i32 { self.value }
}
// Unit struct - no fields needed
struct Add;
#[task]
impl Add {
async fn run(&mut self, a: &i32, b: &i32) -> i32 { a + b }
}
let dag = DagRunner::new();
// Construct instances using ::new() pattern
let x = dag.add_task(LoadValue::new(2));
let y = dag.add_task(LoadValue::new(3));
let sum = dag.add_task(Add).depends_on((&x, &y));
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();
assert_eq!(dag.get(sum).unwrap(), 5);Uses Mutex for interior mutability to enable the builder pattern (&self not &mut self).
This allows fluent chaining of add_task() calls.
Nodes use Option to allow taking ownership during execution. Outputs are Arc-wrapped and stored separately for retrieval via get(). Arc enables efficient sharing during fanout without cloning data.
Implementations§
Source§impl DagRunner
impl DagRunner
Sourcepub fn add_task<Tk>(&self, task: Tk) -> TaskBuilder<'_, Tk, Pending>
pub fn add_task<Tk>(&self, task: Tk) -> TaskBuilder<'_, Tk, Pending>
Add a task instance to the DAG, returning a node builder for wiring dependencies.
The returned TaskBuilder<Tk, Pending> can be used to:
- Specify dependencies via
TaskBuilder::depends_on - Used directly as a
TaskHandleto the task’s output
§Examples
// Task with state - shows you construct with specific value
struct LoadValue {
initial: i32,
}
impl LoadValue {
fn new(initial: i32) -> Self {
Self { initial }
}
}
#[task]
impl LoadValue {
async fn run(&mut self) -> i32 { self.initial }
}
// Task with configuration - shows you can parameterize behavior
struct AddOffset {
offset: i32,
}
impl AddOffset {
fn new(offset: i32) -> Self {
Self { offset }
}
}
#[task]
impl AddOffset {
async fn run(&mut self, x: &i32) -> i32 { x + self.offset }
}
let dag = DagRunner::new();
// Construct task with initial value of 10
let base = dag.add_task(LoadValue::new(10));
// Construct task with offset of 1
let inc = dag.add_task(AddOffset::new(1)).depends_on(&base);
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();
assert_eq!(dag.get(&inc).unwrap(), 11);Sourcepub async fn run<S>(&self, spawner: S) -> DagResult<()>
pub async fn run<S>(&self, spawner: S) -> DagResult<()>
Run the entire DAG to completion using the provided spawner.
This method:
- Executes tasks in topological order (respecting dependencies)
- Runs ready tasks with maximum parallelism (executor-limited)
- Executes each task at most once
- Waits for all sinks (tasks with no dependents) to complete
- Is runtime-agnostic via the spawner function
§Parameters
spawner: A function that spawns futures on the async runtime. Examples:- Tokio:
|fut| { tokio::spawn(fut); } - Smol:
|fut| { smol::spawn(fut).detach(); } - Async-std:
|fut| { async_std::task::spawn(fut); }
- Tokio:
§Errors
Returns DagError::CycleDetected if the DAG contains a cycle.
§Examples
// Tuple struct
struct Value(i32);
#[task]
impl Value {
async fn run(&mut self) -> i32 { self.0 }
}
// Unit struct
struct Add;
#[task]
impl Add {
async fn run(&mut self, a: &i32, b: &i32) -> i32 { a + b }
}
let dag = DagRunner::new();
let a = dag.add_task(Value(1));
let b = dag.add_task(Value(2));
let sum = dag.add_task(Add).depends_on((&a, &b));
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap(); // Executes all tasks§Implementation Note
Tasks communicate via oneshot channels created fresh during run(). This eliminates Mutex contention on outputs and enables true streaming execution. Type erasure occurs only at the ExecutableNode trait boundary - channels are created with full type information and type-erased just before passing to execute_with_channels.
Sourcepub fn get<T: 'static + Clone + Send + Sync, H>(
&self,
handle: H,
) -> DagResult<T>where
H: Into<TaskHandle<T>>,
pub fn get<T: 'static + Clone + Send + Sync, H>(
&self,
handle: H,
) -> DagResult<T>where
H: Into<TaskHandle<T>>,
Retrieve a task’s output after DagRunner::run.
§Behavior
All task outputs are stored after execution and can be retrieved via get().
§Errors
Returns DagError::ResultNotFound if:
- The task hasn’t been executed yet
- The handle is invalid
§Examples
struct Configuration {
setting: i32,
}
impl Configuration {
fn new(setting: i32) -> Self {
Self { setting }
}
}
#[task]
impl Configuration {
async fn run(&mut self) -> i32 { self.setting }
}
let dag = DagRunner::new();
// Construct task with specific setting value
let task = dag.add_task(Configuration::new(42));
dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();
assert_eq!(dag.get(task).unwrap(), 42);