DagRunner

Struct DagRunner 

Source
pub struct DagRunner { /* private fields */ }
Expand description

Build and execute a typed DAG of tasks.

A DagRunner is the main orchestrator for building and executing a directed acyclic graph of async tasks with compile-time type-safe dependencies.

§Workflow

  1. Create a new DAG with DagRunner::new
  2. Add tasks with DagRunner::add_task to get TaskBuilder builders
  3. Wire dependencies with TaskBuilder::depends_on
  4. Execute all tasks with DagRunner::run
  5. Optionally retrieve outputs with DagRunner::get

§Examples

// Task with state constructed via ::new()
struct LoadValue { value: i32 }

impl LoadValue {
    fn new(value: i32) -> Self { Self { value } }
}

#[task]
impl LoadValue {
    async fn run(&mut self) -> i32 { self.value }
}

// Unit struct - no fields needed
struct Add;

#[task]
impl Add {
    async fn run(&mut self, a: &i32, b: &i32) -> i32 { a + b }
}

let dag = DagRunner::new();

// Construct instances using ::new() pattern
let x = dag.add_task(LoadValue::new(2));
let y = dag.add_task(LoadValue::new(3));
let sum = dag.add_task(Add).depends_on((&x, &y));

dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();

assert_eq!(dag.get(sum).unwrap(), 5);

Uses Mutex for interior mutability to enable the builder pattern (&self not &mut self). This allows fluent chaining of add_task() calls.

Nodes use Option to allow taking ownership during execution. Outputs are Arc-wrapped and stored separately for retrieval via get(). Arc enables efficient sharing during fanout without cloning data.

Implementations§

Source§

impl DagRunner

Source

pub fn new() -> Self

Create a new empty DAG.

§Examples
use dagx::DagRunner;

let dag = DagRunner::new();
Source

pub fn add_task<Tk>(&self, task: Tk) -> TaskBuilder<'_, Tk, Pending>
where Tk: Task + 'static, Tk::Input: 'static + Clone + ExtractInput, Tk::Output: 'static + Clone,

Add a task instance to the DAG, returning a node builder for wiring dependencies.

The returned TaskBuilder<Tk, Pending> can be used to:

§Examples
// Task with state - shows you construct with specific value
struct LoadValue {
    initial: i32,
}

impl LoadValue {
    fn new(initial: i32) -> Self {
        Self { initial }
    }
}

#[task]
impl LoadValue {
    async fn run(&mut self) -> i32 { self.initial }
}

// Task with configuration - shows you can parameterize behavior
struct AddOffset {
    offset: i32,
}

impl AddOffset {
    fn new(offset: i32) -> Self {
        Self { offset }
    }
}

#[task]
impl AddOffset {
    async fn run(&mut self, x: &i32) -> i32 { x + self.offset }
}

let dag = DagRunner::new();

// Construct task with initial value of 10
let base = dag.add_task(LoadValue::new(10));

// Construct task with offset of 1
let inc = dag.add_task(AddOffset::new(1)).depends_on(&base);

dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();
assert_eq!(dag.get(&inc).unwrap(), 11);
Source

pub async fn run<S>(&self, spawner: S) -> DagResult<()>
where S: Fn(BoxFuture<'static, ()>),

Run the entire DAG to completion using the provided spawner.

This method:

  • Executes tasks in topological order (respecting dependencies)
  • Runs ready tasks with maximum parallelism (executor-limited)
  • Executes each task at most once
  • Waits for all sinks (tasks with no dependents) to complete
  • Is runtime-agnostic via the spawner function
§Parameters
  • spawner: A function that spawns futures on the async runtime. Examples:
    • Tokio: |fut| { tokio::spawn(fut); }
    • Smol: |fut| { smol::spawn(fut).detach(); }
    • Async-std: |fut| { async_std::task::spawn(fut); }
§Errors

Returns DagError::CycleDetected if the DAG contains a cycle.

§Examples
// Tuple struct
struct Value(i32);

#[task]
impl Value {
    async fn run(&mut self) -> i32 { self.0 }
}

// Unit struct
struct Add;

#[task]
impl Add {
    async fn run(&mut self, a: &i32, b: &i32) -> i32 { a + b }
}

let dag = DagRunner::new();

let a = dag.add_task(Value(1));
let b = dag.add_task(Value(2));
let sum = dag.add_task(Add).depends_on((&a, &b));

dag.run(|fut| { tokio::spawn(fut); }).await.unwrap(); // Executes all tasks
§Implementation Note

Tasks communicate via oneshot channels created fresh during run(). This eliminates Mutex contention on outputs and enables true streaming execution. Type erasure occurs only at the ExecutableNode trait boundary - channels are created with full type information and type-erased just before passing to execute_with_channels.

Source

pub fn get<T: 'static + Clone + Send + Sync, H>( &self, handle: H, ) -> DagResult<T>
where H: Into<TaskHandle<T>>,

Retrieve a task’s output after DagRunner::run.

§Behavior

All task outputs are stored after execution and can be retrieved via get().

§Errors

Returns DagError::ResultNotFound if:

  • The task hasn’t been executed yet
  • The handle is invalid
§Examples
struct Configuration {
    setting: i32,
}

impl Configuration {
    fn new(setting: i32) -> Self {
        Self { setting }
    }
}

#[task]
impl Configuration {
    async fn run(&mut self) -> i32 { self.setting }
}

let dag = DagRunner::new();

// Construct task with specific setting value
let task = dag.add_task(Configuration::new(42));

dag.run(|fut| { tokio::spawn(fut); }).await.unwrap();

assert_eq!(dag.get(task).unwrap(), 42);

Trait Implementations§

Source§

impl Default for DagRunner

Source§

fn default() -> Self

Returns the “default value” for a type. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.