dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
Documentation
//! Example: retries with backoff, plus dead-lettering and skip-cascade.
//!
//! Run with: `cargo run --example fault_tolerance`

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use dag_executor::advanced::RetryPolicy;
use dag_executor::context::Context;
use dag_executor::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut dag = Dag::new();

    // A task that fails its first two attempts, then succeeds.
    let attempts = Arc::new(AtomicUsize::new(0));
    let a = attempts.clone();
    dag.add_task(Arc::new(BasicTask::new(
        "flaky",
        move |_ctx: Arc<Context>| {
            let a = a.clone();
            async move {
                let n = a.fetch_add(1, Ordering::SeqCst);
                if n < 2 {
                    println!("flaky attempt {n}: failing");
                    Err(TaskError::execution("transient error"))
                } else {
                    println!("flaky attempt {n}: success");
                    Ok(serde_json::json!("ok"))
                }
            }
        },
    )))?;

    // A task that always fails, dead-letters, and skips its dependent.
    dag.add_task(Arc::new(BasicTask::new(
        "always_fails",
        |_ctx: Arc<Context>| async { Err(TaskError::execution("permanent error")) },
    )))?;
    dag.add_task(Arc::new(
        BasicTask::new("downstream", |_ctx: Arc<Context>| async {
            Ok(serde_json::Value::Null)
        })
        .with_deps(["always_fails"]),
    ))?;

    let executor = DagExecutor::builder()
        .persist(false)
        .retry(RetryPolicy::exponential(4, Duration::from_millis(20)))
        .build();
    let report = executor.run(dag).await?;

    println!("\n--- report ---");
    for (id, rec) in &report.records {
        println!("{id:>14}: {} (attempts: {})", rec.state, rec.attempts);
    }
    println!("retries: {}", report.metrics.retries);

    println!("\ndead-letter queue:");
    for entry in executor.dead_letter().list().await? {
        println!("  {} -> {}", entry.task_id, entry.error);
    }
    Ok(())
}