dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Example: a stateful task that checkpoints a running counter to storage.
//!
//! Run with: `cargo run --example stateful_tasks`

use std::sync::Arc;

use dag_executor::context::Context;
use dag_executor::prelude::*;
use dag_executor::storage::{FileStorage, Storage};
use dag_executor::tasks::{StatefulTask, StepResult};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let dir = tempfile::tempdir()?;
    let storage: Arc<dyn Storage> = Arc::new(FileStorage::open(dir.path())?);

    let mut dag = Dag::new();
    dag.add_task(Arc::new(StatefulTask::new(
        "accumulate",
        storage.clone(),
        |_ctx: Arc<Context>, prev| async move {
            // `prev` is the last persisted checkpoint (null on first run).
            let count = prev.get("count").and_then(|v| v.as_u64()).unwrap_or(0) + 1;
            let state = serde_json::json!({ "count": count });
            println!("checkpoint -> count = {count}");
            Ok(StepResult::new(state.clone(), state))
        },
    )))?;

    // A new run each time picks up where the last left off, because the
    // checkpoint is persisted under `state:accumulate`.
    let executor = DagExecutor::builder()
        .storage(storage.clone())
        .persist(true)
        .build();

    for _ in 0..3 {
        let mut dag2 = Dag::new();
        dag2.add_task(Arc::new(StatefulTask::new(
            "accumulate",
            storage.clone(),
            |_ctx: Arc<Context>, prev| async move {
                let count = prev.get("count").and_then(|v| v.as_u64()).unwrap_or(0) + 1;
                let state = serde_json::json!({ "count": count });
                Ok(StepResult::new(state.clone(), state))
            },
        )))?;
        executor.run(dag2).await?;
    }
    let _ = dag;

    let final_state = storage.load("state:accumulate").await?;
    println!("final persisted state: {final_state:?}");
    Ok(())
}