dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns

dag-executor

License: MIT OR Apache-2.0

A production-ready DAG (directed acyclic graph) executor in Rust with stateful task execution, file-based persistence and crash recovery, advanced workflow patterns, and built-in fault tolerance and observability.

Status: the core engine, persistence, fault-tolerance primitives, task types, tests, examples and benchmarks are implemented and green (cargo test passes). See Scope & deviations for how this differs from the original spec.

Features

  • Concurrent execution — bounded-parallelism worker pool over Tokio; comfortably runs thousands of tasks per run.
  • Dependency resolution — petgraph-backed graph with cycle, missing-dependency, duplicate-id, and self-dependency validation, plus topological ordering.
  • Priority scheduling — higher-priority ready tasks run first; FIFO within a priority.
  • Stateful tasks & recovery — every task's TaskRecord is persisted; a crashed run is recovered and orphaned Running tasks are repaired.
  • Workflow patterns — BasicTask, StatefulTask, ConditionalTask, LoopTask, EventDrivenTask, plus fan_out_in / pipeline helpers.
  • Fault tolerance — retry policies (fixed/linear/exponential + jitter), a circuit breaker, a dead-letter queue, and automatic skip-cascade of dependents when a task fails.
  • Storage — checksummed file storage (FileStorage) with three selectable durability modes: Fast (in-place writes, the default; ~0.85 ms per 1 KB record), Atomic (write to a temp file, then rename, so a crash never destroys the prior record; ~2 ms), and Durable (Atomic plus fsync, so records survive power loss; ~8 ms). Also included are an in-memory backend (MemoryStorage) and a write-through LRU Cache. No external services are required.
  • Observability — lock-free metrics collector and an optional Prometheus exporter (metrics feature), plus structured tracing logs.
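The Atomic and Durable modes described above follow the common write-to-temp-then-rename pattern. The following is a minimal std-only sketch, not FileStorage itself: the on-disk layout (an 8-byte checksum header followed by the payload), the `Mode`, `save`, and `load` names, and the use of FNV-1a as the checksum are all assumptions for illustration.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// FNV-1a (64-bit), a stand-in for whatever checksum the crate really uses.
fn fnv1a(data: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325;
    for &b in data {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x100000001b3);
    }
    hash
}

/// Hypothetical durability modes mirroring Fast / Atomic / Durable.
#[derive(Clone, Copy)]
enum Mode {
    Fast,
    Atomic,
    Durable,
}

/// Write `payload` prefixed with its checksum (8 bytes, little-endian).
fn save(path: &Path, payload: &[u8], mode: Mode) -> io::Result<()> {
    let mut bytes = fnv1a(payload).to_le_bytes().to_vec();
    bytes.extend_from_slice(payload);
    match mode {
        // In-place overwrite: fastest, but a crash mid-write can leave a torn record.
        Mode::Fast => fs::write(path, &bytes),
        Mode::Atomic | Mode::Durable => {
            let tmp = path.with_extension("tmp");
            let mut file = File::create(&tmp)?;
            file.write_all(&bytes)?;
            if let Mode::Durable = mode {
                // Force the data to disk before it becomes visible under `path`.
                // (Full power-loss safety usually also needs an fsync of the parent directory.)
                file.sync_all()?;
            }
            drop(file);
            // On the same filesystem, rename swaps the old record for the new one in one step.
            fs::rename(&tmp, path)
        }
    }
}

/// Read a record back, rejecting it if the stored checksum does not match.
fn load(path: &Path) -> io::Result<Vec<u8>> {
    let bytes = fs::read(path)?;
    if bytes.len() < 8 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "truncated record"));
    }
    let (header, payload) = bytes.split_at(8);
    let mut stored = [0u8; 8];
    stored.copy_from_slice(header);
    if u64::from_le_bytes(stored) != fnv1a(payload) {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "checksum mismatch"));
    }
    Ok(payload.to_vec())
}
```

The checksum is what turns silent corruption into an explicit load error; the temp-file rename is what keeps the previous record intact if the process dies mid-write.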

Quick start

use std::sync::Arc;
use dag_executor::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
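    let mut dag = Dag::new();
    // Task "a" has no dependencies and returns 1.
    dag.add_task(Arc::new(BasicTask::new("a", |_| async { Ok(serde_json::json!(1)) })))?;
    // Task "b" depends on "a", so it is scheduled only after "a" completes.
    dag.add_task(Arc::new(
        BasicTask::new("b", |_| async { Ok(serde_json::json!(2)) }).with_deps(["a"]),
    ))?;

    // Disable state persistence for this throwaway run.
    let executor = DagExecutor::builder().persist(false).build();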
    let report = executor.run(dag).await?;
    assert!(report.is_success());
    Ok(())
}

Architecture

flowchart TD
    Dag[Dag<br/>petgraph dependency graph] -->|submitted to| Exec

    subgraph Exec[DagExecutor — validates, recovers, drives the run]
        direction LR
        Sched[Scheduler<br/>priority queue + records]
        Pool[WorkerPool<br/>semaphore-bounded<br/>retry / timeout / circuit breaker]
        Store[Storage<br/>File / Memory + LRU Cache]
        DLQ[DeadLetterQueue<br/>durable]
        Metrics[Metrics<br/>counters + Prometheus]
    end

    Exec -->|owns| Sched
    Exec -->|owns| Pool
    Exec -->|owns| Store
    Exec -->|owns| DLQ
    Exec -->|owns| Metrics

    Sched -->|next ready task| Pool
    Pool -->|task result| Sched
    Pool -->|terminal failure| DLQ
    Sched -->|persist records| Store
    Pool -->|emit counters| Metrics
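The WorkerPool's retry policies (fixed, linear, and exponential backoff with jitter, per the feature list) reduce to a function from the number of failed attempts to a delay. A minimal sketch; `RetryPolicy`, `Backoff`, the cap field, and the convention that the caller supplies a random sample in [0, 1) for jitter are assumptions, not the crate's API.

```rust
use std::time::Duration;

/// Hypothetical backoff shapes matching the fixed / linear / exponential policies.
enum Backoff {
    Fixed(Duration),       // base, every time
    Linear(Duration),      // base * n
    Exponential(Duration), // base * 2^(n - 1)
}

struct RetryPolicy {
    max_attempts: u32, // total attempts, including the first
    backoff: Backoff,
    max_delay: Duration,
    jitter: f64, // fraction in [0, 1] by which a delay may be shortened
}

impl RetryPolicy {
    /// Delay before the next attempt after `n` failed attempts (n >= 1),
    /// or None when no attempts remain. `rand01` is a random sample in [0, 1).
    fn delay(&self, n: u32, rand01: f64) -> Option<Duration> {
        if n == 0 || n >= self.max_attempts {
            return None;
        }
        let raw = match self.backoff {
            Backoff::Fixed(base) => Some(base),
            Backoff::Linear(base) => base.checked_mul(n),
            Backoff::Exponential(base) => 2u32
                .checked_pow(n - 1)
                .and_then(|factor| base.checked_mul(factor)),
        };
        // Overflow or an oversized delay both clamp to max_delay.
        let capped = raw.map_or(self.max_delay, |d| d.min(self.max_delay));
        // Jitter: shorten the delay by a random fraction of up to `jitter`.
        let nanos = capped.as_nanos() as f64 * (1.0 - self.jitter * rand01);
        Some(Duration::from_nanos(nanos as u64))
    }
}
```

Jitter spreads out retries from many tasks that failed at the same moment, so they do not all hit a recovering dependency in lockstep.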

The executor runs a single event loop:

  1. Validate the graph (acyclicity, dependencies).
  2. Recover persisted records and repair any left in Running.
  3. Maintain a per-task remaining-dependency counter; when it hits zero the task is enqueued in the Scheduler at its priority.
  4. Spawn ready tasks on the WorkerPool, which gates concurrency with a semaphore and wraps each task in the retry loop, optional timeout, and circuit-breaker check.
  5. On completion, persist the record, publish output to the shared Context blackboard, and decrement dependents' counters. On terminal failure, dead-letter the task and skip-cascade its dependents.
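Steps 3 to 5 amount to Kahn's topological ordering driven by a priority queue. The single-threaded sketch below is illustrative only (the real executor dispatches ready tasks concurrently to the WorkerPool); `Spec`, `Outcome`, `schedule`, and the `failing` set used to simulate task failures are hypothetical names.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap, HashSet};

/// Hypothetical task description: id, priority, and dependency ids.
struct Spec {
    id: &'static str,
    priority: i32,
    deps: Vec<&'static str>,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Completed,
    Failed,
    Skipped,
}

/// Run tasks in dependency order, higher priority first, FIFO within a priority.
/// Tasks listed in `failing` fail, and all of their transitive dependents are skipped.
fn schedule(
    specs: &[Spec],
    failing: &HashSet<&str>,
) -> (Vec<&'static str>, HashMap<&'static str, Outcome>) {
    let prio: HashMap<&str, i32> = specs.iter().map(|s| (s.id, s.priority)).collect();
    let mut remaining: HashMap<&str, usize> = HashMap::new();
    let mut dependents: HashMap<&str, Vec<&'static str>> = HashMap::new();
    for s in specs {
        remaining.insert(s.id, s.deps.len());
        for &d in &s.deps {
            dependents.entry(d).or_default().push(s.id);
        }
    }
    // Heap key (priority, Reverse(sequence)): max-heap on priority, FIFO on ties.
    let mut ready = BinaryHeap::new();
    let mut seq = 0u64;
    for s in specs.iter().filter(|s| s.deps.is_empty()) {
        ready.push((s.priority, Reverse(seq), s.id));
        seq += 1;
    }
    let mut order = Vec::new();
    let mut outcome = HashMap::new();
    while let Some((_, _, id)) = ready.pop() {
        order.push(id);
        if failing.contains(id) {
            outcome.insert(id, Outcome::Failed);
            // Skip-cascade: walk every transitive dependent once.
            let mut stack = dependents.get(id).cloned().unwrap_or_default();
            while let Some(dep) = stack.pop() {
                if outcome.insert(dep, Outcome::Skipped).is_none() {
                    stack.extend(dependents.get(dep).cloned().unwrap_or_default());
                }
            }
            continue;
        }
        outcome.insert(id, Outcome::Completed);
        // Decrement each dependent's counter; enqueue it when it reaches zero.
        for &child in dependents.get(id).map(|v| v.as_slice()).unwrap_or(&[]) {
            let left = remaining.get_mut(child).unwrap();
            *left -= 1;
            if *left == 0 && !outcome.contains_key(child) {
                ready.push((prio[child], Reverse(seq), child));
                seq += 1;
            }
        }
    }
    (order, outcome)
}
```

Because BinaryHeap is a max-heap, wrapping the enqueue sequence number in Reverse makes the earliest-enqueued task compare greatest among equal priorities, which is what gives FIFO order within a priority.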

State lives in src/state; the engine in src/dag; task types in src/tasks; fault-tolerance in src/advanced.
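The circuit-breaker check in step 4 is conventionally a three-state machine (closed, open, half-open). A minimal single-threaded sketch; `CircuitBreaker`, its `threshold` and `cooldown` parameters, and passing the current Instant in explicitly (which keeps the logic testable) are assumptions rather than the implementation in src/advanced.

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum Breaker {
    Closed { failures: u32 },
    Open { since: Instant },
    HalfOpen,
}

struct CircuitBreaker {
    state: Breaker,
    threshold: u32,     // consecutive failures that trip the breaker
    cooldown: Duration, // how long to stay open before a trial call
}

impl CircuitBreaker {
    fn new(threshold: u32, cooldown: Duration) -> Self {
        Self { state: Breaker::Closed { failures: 0 }, threshold, cooldown }
    }

    /// Whether a call may proceed at time `now`.
    fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            Breaker::Closed { .. } | Breaker::HalfOpen => true,
            Breaker::Open { since } => {
                if now.duration_since(since) >= self.cooldown {
                    // Cooldown elapsed: let a trial call through.
                    self.state = Breaker::HalfOpen;
                    true
                } else {
                    false
                }
            }
        }
    }

    /// Any success closes the breaker and resets the failure count.
    fn on_success(&mut self) {
        self.state = Breaker::Closed { failures: 0 };
    }

    fn on_failure(&mut self, now: Instant) {
        self.state = match self.state {
            Breaker::Closed { failures } if failures + 1 < self.threshold => {
                Breaker::Closed { failures: failures + 1 }
            }
            // Threshold reached, or the half-open trial failed: (re)open.
            _ => Breaker::Open { since: now },
        };
    }
}
```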

Task lifecycle

stateDiagram-v2
    [*] --> Pending
    Pending --> Ready: dependencies satisfied
    Pending --> Skipped: upstream failed
    Ready --> Running: scheduled
    Running --> Completed: success
    Running --> Retrying: failure (retries left)
    Retrying --> Running: backoff elapsed
    Running --> Failed: retries exhausted
    Failed --> DeadLettered: routed to DLQ
    Ready --> Cancelled: shutdown
    Running --> Cancelled: shutdown
    Completed --> [*]
    DeadLettered --> [*]
    Skipped --> [*]
    Cancelled --> [*]
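The diagram translates directly into a state enum plus an edge table. A sketch assuming one variant per diagram node; `TaskState`, `can_transition`, and `is_terminal` are illustrative names, not necessarily the crate's.

```rust
/// One variant per node in the lifecycle diagram (illustrative).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TaskState {
    Pending,
    Ready,
    Running,
    Retrying,
    Completed,
    Failed,
    DeadLettered,
    Skipped,
    Cancelled,
}

impl TaskState {
    /// True if the diagram has an edge from `self` to `next`.
    fn can_transition(self, next: TaskState) -> bool {
        use TaskState::*;
        matches!(
            (self, next),
            (Pending, Ready)
                | (Pending, Skipped)
                | (Ready, Running)
                | (Ready, Cancelled)
                | (Running, Completed)
                | (Running, Retrying)
                | (Running, Failed)
                | (Running, Cancelled)
                | (Retrying, Running)
                | (Failed, DeadLettered)
        )
    }

    /// States with an edge to [*] in the diagram.
    fn is_terminal(self) -> bool {
        use TaskState::*;
        matches!(self, Completed | DeadLettered | Skipped | Cancelled)
    }
}
```

Keeping every edge in one match makes it easy to check the code against the diagram; note that Failed is not terminal, since it still moves to DeadLettered.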

Usage

Run the demo binary:

cargo run -- --concurrency 64 --workers 8

Run the examples:

cargo run --example basic_usage
cargo run --example stateful_tasks
cargo run --example advanced_patterns
cargo run --example fault_tolerance
cargo run --release --example performance_demo

Configuration

Config (defined in src/utils/config.rs) can be loaded from a JSON file with Config::from_file, and individual settings can be overridden with environment variables: DAG_MAX_CONCURRENCY, DAG_STORAGE_DIR, DAG_MAX_ATTEMPTS, DAG_LOG_LEVEL, and DAG_PERSIST_STATE. Set DAG_LOG_JSON=1 for JSON-formatted logs, or use RUST_LOG to control log verbosity.
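The override step amounts to: start from the file (or default) values and replace each field whose variable is set and parses. A sketch assuming a hypothetical subset of Config fields named after the documented variables (the real struct may differ), and assuming that "1"/"true" and "0"/"false" are the accepted boolean spellings. The lookup is passed in as a closure so it can be fed `std::env::var` in production or a fixed map in tests.

```rust
use std::path::PathBuf;

/// Hypothetical subset of Config mirroring the documented env vars.
#[derive(Debug, Clone, PartialEq)]
struct Config {
    max_concurrency: usize,
    storage_dir: PathBuf,
    max_attempts: u32,
    log_level: String,
    persist_state: bool,
}

impl Config {
    /// Override fields from `lookup`; unset or unparsable values keep the current setting.
    fn apply_env<F: Fn(&str) -> Option<String>>(&mut self, lookup: F) {
        if let Some(v) = lookup("DAG_MAX_CONCURRENCY").and_then(|s| s.parse::<usize>().ok()) {
            self.max_concurrency = v;
        }
        if let Some(v) = lookup("DAG_STORAGE_DIR") {
            self.storage_dir = PathBuf::from(v);
        }
        if let Some(v) = lookup("DAG_MAX_ATTEMPTS").and_then(|s| s.parse::<u32>().ok()) {
            self.max_attempts = v;
        }
        if let Some(v) = lookup("DAG_LOG_LEVEL") {
            self.log_level = v;
        }
        if let Some(v) = lookup("DAG_PERSIST_STATE") {
            match v.as_str() {
                "1" | "true" => self.persist_state = true,
                "0" | "false" => self.persist_state = false,
                _ => {} // unrecognized value: keep the current setting
            }
        }
    }
}
```

In a real binary the call would be `cfg.apply_env(|k| std::env::var(k).ok())`, applied after loading the JSON file so that environment variables take precedence.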

Testing & benchmarks

cargo test                 # unit + integration + doctests
cargo test --test unit
cargo test --test integration workflow_tests
cargo bench                # criterion benchmarks

Benchmarks live in benches/: task_scheduling, throughput, state_persistence, memory_usage.

Docker

docker build -f docker/Dockerfile -t dag-executor .
docker compose -f docker/docker-compose.yml up

The multi-stage build produces a slim runtime image; a named volume backs the state directory.

Troubleshooting

Symptom | Likely cause / fix
--- | ---
ValidationError::Cycle | The graph contains a dependency loop; inspect the task named in the error.
ValidationError::MissingDependency | An id passed to with_deps has no matching task.
A run hangs | An EventDrivenTask is waiting for an event that never fires; add with_timeout.
ChecksumMismatch on load | The state file was corrupted or edited by hand; delete it to let the task re-run.
Tasks unexpectedly Skipped | An upstream dependency failed; check the dead-letter queue.
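The two validation errors above are raised before the run starts. A std-only sketch of equivalent checks, assuming a hypothetical `ValidationError` enum that only mirrors the variant names used in this README (the crate itself builds on petgraph). Cycle detection here uses Kahn's algorithm: any task that never reaches zero unmet dependencies sits on, or downstream of, a cycle, and one such task is named in the error.

```rust
use std::collections::{HashMap, HashSet, VecDeque};

/// Hypothetical error type mirroring the variant names in this README.
#[derive(Debug, PartialEq)]
enum ValidationError {
    Duplicate(String),
    SelfDependency(String),
    MissingDependency { task: String, dep: String },
    Cycle(String),
}

/// Validate `(id, deps)` pairs and return a topological order.
fn validate(tasks: &[(&str, Vec<&str>)]) -> Result<Vec<String>, ValidationError> {
    let mut ids = HashSet::new();
    for (id, _) in tasks {
        if !ids.insert(*id) {
            return Err(ValidationError::Duplicate(id.to_string()));
        }
    }
    let mut indegree: HashMap<&str, usize> = HashMap::new();
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for (id, deps) in tasks {
        for dep in deps {
            if dep == id {
                return Err(ValidationError::SelfDependency(id.to_string()));
            }
            if !ids.contains(dep) {
                return Err(ValidationError::MissingDependency {
                    task: id.to_string(),
                    dep: dep.to_string(),
                });
            }
            dependents.entry(*dep).or_default().push(*id);
        }
        indegree.insert(*id, deps.len());
    }
    // Kahn's algorithm: repeatedly take tasks with no unmet dependencies.
    let mut queue: VecDeque<&str> = tasks
        .iter()
        .map(|(id, _)| *id)
        .filter(|id| indegree[id] == 0)
        .collect();
    let mut order = Vec::new();
    while let Some(id) = queue.pop_front() {
        order.push(id.to_string());
        for &child in dependents.get(id).map(|v| v.as_slice()).unwrap_or(&[]) {
            let d = indegree.get_mut(child).unwrap();
            *d -= 1;
            if *d == 0 {
                queue.push_back(child);
            }
        }
    }
    if order.len() < tasks.len() {
        // A task with unmet dependencies left over is on, or behind, a cycle.
        let stuck = tasks.iter().find(|(id, _)| indegree[id] > 0).unwrap().0;
        return Err(ValidationError::Cycle(stuck.to_string()));
    }
    Ok(order)
}
```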

Scope & deviations

This implementation intentionally diverges from the original specification where the spec would not compile or would bloat the build:

  • No HTTP server stack (warp/hyper/tower): the Prometheus exporter renders text exposition that you can serve from any handler, keeping the dependency tree lean. Prometheus itself is behind the metrics feature.
  • tracing-subscriber is non-optional: a Cargo feature cannot share a name with a non-optional dependency, so only prometheus is feature-gated.
  • Dev-only crates (tempfile, tokio-test, criterion) are in [dev-dependencies], where they belong.
  • lru 0.12 (not 0.7) and a BinaryHeap-based priority queue (instead of the priority-queue crate), for fewer and more current dependencies.
  • src/lib.rs is the crate root (a bare src/mod.rs is not a valid Rust crate root).
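Serving metrics without an HTTP stack works because the Prometheus text exposition format is plain text that any handler can return. A sketch of lock-free counters rendered in that format; the `Metrics` struct and metric names are hypothetical, and this hand-rolled renderer only illustrates the output shape, not the crate's exporter, which sits behind the metrics feature.

```rust
use std::fmt::Write;
use std::sync::atomic::{AtomicU64, Ordering};

/// Lock-free counters (hypothetical subset). Relaxed ordering is enough because
/// each counter is independent and no other data is published through it.
#[derive(Default)]
struct Metrics {
    tasks_completed: AtomicU64,
    tasks_failed: AtomicU64,
    tasks_retried: AtomicU64,
}

impl Metrics {
    /// Render every counter as HELP, TYPE, and sample lines.
    fn render(&self) -> String {
        let counters = [
            ("dag_tasks_completed_total", "Tasks that completed successfully.", &self.tasks_completed),
            ("dag_tasks_failed_total", "Tasks that failed after exhausting retries.", &self.tasks_failed),
            ("dag_tasks_retried_total", "Retry attempts scheduled.", &self.tasks_retried),
        ];
        let mut out = String::new();
        for &(name, help, counter) in counters.iter() {
            // Writing into a String cannot fail, so unwrap never panics here.
            writeln!(out, "# HELP {} {}", name, help).unwrap();
            writeln!(out, "# TYPE {} counter", name).unwrap();
            writeln!(out, "{} {}", name, counter.load(Ordering::Relaxed)).unwrap();
        }
        out
    }
}
```

Worker threads only ever call `fetch_add` on the atomics, so recording a metric never blocks task execution.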

License

Licensed under either of

  • Apache License, Version 2.0
  • MIT license

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.