# dag-executor
A production-ready **DAG (directed acyclic graph) executor** in Rust with
stateful task execution, file-based persistence and crash recovery, advanced
workflow patterns, and built-in fault tolerance and observability.
> **Status:** the core engine, persistence, fault-tolerance primitives, task
> types, tests, examples and benchmarks are implemented and green
> (`cargo test` passes). See [Scope & deviations](#scope--deviations) for how
> this differs from the original spec.
## Features
- **Concurrent execution** — bounded-parallelism worker pool over Tokio;
comfortably runs thousands of tasks per run.
- **Dependency resolution** — `petgraph`-backed graph with cycle / missing-dep /
duplicate / self-dependency validation and topological ordering.
- **Priority scheduling** — higher-priority ready tasks run first; FIFO within a
priority.
- **Stateful tasks & recovery** — every task's `TaskRecord` is persisted; a
crashed run is recovered and orphaned `Running` tasks are repaired.
- **Workflow patterns** — `BasicTask`, `StatefulTask`, `ConditionalTask`,
`LoopTask`, `EventDrivenTask`, plus `fan_out_in` / `pipeline` helpers.
- **Fault tolerance** — retry policies (fixed/linear/exponential + jitter),
a circuit breaker, a dead-letter queue, and automatic skip-cascade of
dependents when a task fails.
- **Storage** — checksummed file storage (`FileStorage`) with three selectable
durability modes — `Fast` (in-place, default, ~0.85 ms/1 KB), `Atomic`
(temp+rename so a crash never destroys the prior record, ~2 ms), and `Durable`
(atomic + `fsync`, survives power loss, ~8 ms) — plus an in-memory backend
(`MemoryStorage`) and a write-through LRU `Cache`. No external services
required.
- **Observability** — lock-free metrics collector and an optional Prometheus
exporter (`metrics` feature), plus structured `tracing` logs.
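
To make the retry policies listed above concrete, here is a minimal, std-only sketch of how fixed, linear, and exponential delays plus jitter could be computed. The `Backoff` enum and `with_jitter` function are hypothetical names used for illustration and are not the crate's actual API; the jitter sample is passed in by the caller so the sketch needs no random-number dependency.

```rust
use std::time::Duration;

/// Hypothetical backoff policy (illustrative only; the crate's retry types may differ).
#[derive(Debug, Clone, Copy)]
pub enum Backoff {
    /// The same delay before every retry.
    Fixed(Duration),
    /// The delay grows by one `step` per attempt: step, 2*step, 3*step, ...
    Linear(Duration),
    /// The delay doubles per attempt, starting at `base` and capped at `max`.
    Exponential { base: Duration, max: Duration },
}

impl Backoff {
    /// Delay to wait before retry number `attempt` (1-based; 0 is treated as 1).
    pub fn delay(&self, attempt: u32) -> Duration {
        let attempt = attempt.max(1);
        match *self {
            Backoff::Fixed(delay) => delay,
            Backoff::Linear(step) => step.saturating_mul(attempt),
            Backoff::Exponential { base, max } => {
                // 2^(attempt - 1), saturating so large attempt counts cannot overflow.
                let factor = 2u32.checked_pow(attempt - 1).unwrap_or(u32::MAX);
                base.saturating_mul(factor).min(max)
            }
        }
    }
}

/// Scales `delay` into the range [50%, 100%] using `sample`, a caller-supplied
/// random value in [0.0, 1.0]. Non-finite samples fall back to no reduction.
pub fn with_jitter(delay: Duration, sample: f64) -> Duration {
    let sample = if sample.is_finite() { sample.clamp(0.0, 1.0) } else { 1.0 };
    delay.mul_f64(0.5 + 0.5 * sample)
}
```

Capping the exponential delay keeps a long retry chain from sleeping for minutes, and jitter spreads out retries from tasks that failed at the same moment.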
## Quick start
```rust
use std::sync::Arc;

use dag_executor::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut dag = Dag::new();

    // "a" has no dependencies, so it is ready as soon as the run starts.
    dag.add_task(Arc::new(BasicTask::new("a", |_| async { Ok(serde_json::json!(1)) })))?;

    // "b" depends on "a" and becomes ready only after "a" completes.
    dag.add_task(Arc::new(
        BasicTask::new("b", |_| async { Ok(serde_json::json!(2)) }).with_deps(["a"]),
    ))?;

    // Turn off state persistence for this short example.
    let executor = DagExecutor::builder().persist(false).build();
    let report = executor.run(dag).await?;
    assert!(report.is_success());
    Ok(())
}
```
## Architecture
```mermaid
flowchart TD
Dag[Dag<br/>petgraph dependency graph] -->|submitted to| Exec
subgraph Exec[DagExecutor — validates, recovers, drives the run]
direction LR
Sched[Scheduler<br/>priority queue + records]
Pool[WorkerPool<br/>semaphore-bounded<br/>retry / timeout / circuit breaker]
Store[Storage<br/>File / Memory + LRU Cache]
DLQ[DeadLetterQueue<br/>durable]
Metrics[Metrics<br/>counters + Prometheus]
end
Exec -->|owns| Sched
Exec -->|owns| Pool
Exec -->|owns| Store
Exec -->|owns| DLQ
Exec -->|owns| Metrics
Sched -->|next ready task| Pool
Pool -->|task result| Sched
Pool -->|terminal failure| DLQ
Sched -->|persist records| Store
Pool -->|emit counters| Metrics
```
The executor runs a single event loop:
1. **Validate** the graph (no cycles, missing dependencies, duplicates, or self-dependencies).
2. **Recover** persisted records and repair any left in `Running`.
3. Maintain a per-task **remaining-dependency counter**; when it hits zero the
task is enqueued in the `Scheduler` at its priority.
4. **Spawn** ready tasks on the `WorkerPool`, which gates concurrency with a
semaphore and wraps each task in the retry loop, optional timeout, and
circuit-breaker check.
5. On completion, **persist** the record, publish output to the shared
`Context` blackboard, and decrement dependents' counters. On terminal
failure, **dead-letter** the task and **skip-cascade** its dependents.
State lives in [`src/state`](src/state); the engine in [`src/dag`](src/dag);
task types in [`src/tasks`](src/tasks); fault-tolerance in
[`src/advanced`](src/advanced).
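
The remaining-dependency counter from step 3 is the core of Kahn's topological ordering. The sketch below shows the idea in isolation, using only the standard library; `ready_order` is a hypothetical function written for this README, not the engine's code, and it ignores priorities, concurrency, and failures.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Returns task ids in an order that respects dependencies, or `None` when a
/// cycle or an unknown dependency leaves some task permanently blocked.
/// `deps` maps each task id to the ids it depends on.
pub fn ready_order(deps: &BTreeMap<&str, Vec<&str>>) -> Option<Vec<String>> {
    // remaining[t] = number of dependencies of t that have not finished yet.
    let mut remaining: BTreeMap<&str, usize> = BTreeMap::new();
    // dependents[d] = tasks that are waiting on d.
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for (task, task_deps) in deps {
        remaining.insert(*task, task_deps.len());
        for dep in task_deps {
            dependents.entry(*dep).or_default().push(*task);
        }
    }

    // Tasks whose counter starts at zero are ready immediately.
    let mut ready: VecDeque<&str> = remaining
        .iter()
        .filter(|entry| *entry.1 == 0)
        .map(|entry| *entry.0)
        .collect();

    let mut order = Vec::new();
    while let Some(task) = ready.pop_front() {
        order.push(task.to_string());
        // "Completing" a task decrements the counter of each dependent.
        if let Some(children) = dependents.get(task) {
            for child in children {
                let count = remaining.get_mut(*child)?;
                *count -= 1;
                if *count == 0 {
                    ready.push_back(*child);
                }
            }
        }
    }

    // Any task that never reached zero was blocked by a cycle or a missing id.
    (order.len() == deps.len()).then_some(order)
}
```

In the real executor the `ready` queue would be the priority-aware `Scheduler` and completion would arrive asynchronously from the `WorkerPool`, but the counter bookkeeping follows the same pattern.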
### Task lifecycle
```mermaid
stateDiagram-v2
[*] --> Pending
Pending --> Ready: dependencies satisfied
Pending --> Skipped: upstream failed
Ready --> Running: scheduled
Running --> Completed: success
Running --> Retrying: failure (retries left)
Retrying --> Running: backoff elapsed
Running --> Failed: retries exhausted
Failed --> DeadLettered: routed to DLQ
Ready --> Cancelled: shutdown
Running --> Cancelled: shutdown
Completed --> [*]
DeadLettered --> [*]
Skipped --> [*]
Cancelled --> [*]
```
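
The diagram above can be read as a transition table. As a rough illustration, the sketch below encodes exactly the edges drawn in the diagram; `TaskState`, `can_transition_to`, and `is_terminal` are hypothetical names chosen for this example, and the crate's own state type may be shaped differently.

```rust
/// Hypothetical mirror of the lifecycle diagram (illustrative only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskState {
    Pending,
    Ready,
    Running,
    Retrying,
    Completed,
    Failed,
    DeadLettered,
    Skipped,
    Cancelled,
}

impl TaskState {
    /// True when the diagram contains an edge from `self` to `next`.
    pub fn can_transition_to(self, next: TaskState) -> bool {
        use TaskState::*;
        matches!(
            (self, next),
            (Pending, Ready)
                | (Pending, Skipped)
                | (Ready, Running)
                | (Ready, Cancelled)
                | (Running, Completed)
                | (Running, Retrying)
                | (Running, Failed)
                | (Running, Cancelled)
                | (Retrying, Running)
                | (Failed, DeadLettered)
        )
    }

    /// True for states that lead to the diagram's end marker.
    pub fn is_terminal(self) -> bool {
        matches!(
            self,
            Self::Completed | Self::DeadLettered | Self::Skipped | Self::Cancelled
        )
    }
}
```

A check like this is useful in tests or debug assertions to catch an illegal jump, for example `Completed` back to `Running`.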
## Usage
Run the demo binary:
```bash
cargo run -- --concurrency 64 --workers 8
```
Run the examples:
```bash
cargo run --example basic_usage
cargo run --example stateful_tasks
cargo run --example advanced_patterns
cargo run --example fault_tolerance
cargo run --release --example performance_demo
```
### Configuration
`Config` (see [`src/utils/config.rs`](src/utils/config.rs)) is loadable from
JSON (`Config::from_file`) and overridable via env vars: `DAG_MAX_CONCURRENCY`,
`DAG_STORAGE_DIR`, `DAG_MAX_ATTEMPTS`, `DAG_LOG_LEVEL`, `DAG_PERSIST_STATE`.
Set `DAG_LOG_JSON=1` for JSON logs, or `RUST_LOG` to control verbosity.
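
As an illustration of the override behaviour described above, the following std-only sketch layers environment values over existing settings. The `Settings` struct, its field names, and the accepted boolean spellings are assumptions made for this example; the real `Config` in `src/utils/config.rs` may differ. Only the environment variable names come from this README.

```rust
/// Hypothetical subset of the configuration (field names are assumptions).
#[derive(Debug, Clone, PartialEq)]
pub struct Settings {
    pub max_concurrency: usize,
    pub storage_dir: String,
    pub max_attempts: u32,
    pub persist_state: bool,
}

/// Assumed boolean spellings; the crate may accept a different set.
fn parse_bool(value: &str) -> Option<bool> {
    match value.trim().to_ascii_lowercase().as_str() {
        "1" | "true" => Some(true),
        "0" | "false" => Some(false),
        _ => None,
    }
}

impl Settings {
    /// Overrides fields from `lookup`; unset or unparsable values are ignored
    /// so a typo in one variable cannot wipe out a valid setting.
    pub fn apply_overrides<F>(mut self, lookup: F) -> Self
    where
        F: Fn(&str) -> Option<String>,
    {
        if let Some(n) = lookup("DAG_MAX_CONCURRENCY").and_then(|v| v.parse::<usize>().ok()) {
            self.max_concurrency = n;
        }
        if let Some(dir) = lookup("DAG_STORAGE_DIR") {
            self.storage_dir = dir;
        }
        if let Some(n) = lookup("DAG_MAX_ATTEMPTS").and_then(|v| v.parse::<u32>().ok()) {
            self.max_attempts = n;
        }
        if let Some(flag) = lookup("DAG_PERSIST_STATE").and_then(|v| parse_bool(&v)) {
            self.persist_state = flag;
        }
        self
    }

    /// Reads the overrides from the process environment.
    pub fn with_env_overrides(self) -> Self {
        self.apply_overrides(|key| std::env::var(key).ok())
    }
}
```

Taking the lookup as a closure keeps the override logic testable without mutating the process environment.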
## Testing & benchmarks
```bash
cargo test # unit + integration + doctests
cargo test --test unit
cargo test --test integration workflow_tests
cargo bench # criterion benchmarks
```
Benchmarks live in [`benches/`](benches): `task_scheduling`, `throughput`,
`state_persistence`, `memory_usage`.
## Docker
```bash
docker build -f docker/Dockerfile -t dag-executor .
docker compose -f docker/docker-compose.yml up
```
The multi-stage build produces a slim runtime image; a named volume backs the
state directory.
## Troubleshooting
| Symptom | Likely cause and fix |
| --- | --- |
| `ValidationError::Cycle` | A dependency loop; inspect the named task. |
| `ValidationError::MissingDependency` | A `with_deps` id has no matching task. |
| Run hangs | An `EventDrivenTask` is waiting for an event that never fires; add `with_timeout`. |
| `ChecksumMismatch` on load | The state file was corrupted or edited; delete it to let the task re-run. |
| Tasks unexpectedly `Skipped` | An upstream dependency failed; check the dead-letter queue. |
## Scope & deviations
This implementation intentionally diverges from the original specification where
the spec would not compile or would bloat the build:
- **No HTTP server stack** (`warp`/`hyper`/`tower`): the Prometheus exporter
renders text exposition that you can serve from any handler, keeping the
dependency tree lean. Prometheus itself is behind the `metrics` feature.
- **`tracing-subscriber` is non-optional**: a Cargo feature cannot share a name
with a non-optional dependency, so only `prometheus` is feature-gated.
- **Dev-only crates** (`tempfile`, `tokio-test`, `criterion`) are in
`[dev-dependencies]`, where they belong.
- **`lru` 0.12** (not 0.7) and a `BinaryHeap`-based priority queue (instead of
`priority-queue`) — fewer, current dependencies.
- **`src/lib.rs`** is the crate root (a bare `src/mod.rs` is not a valid Rust
crate root).
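
For readers curious how a `BinaryHeap` can give "highest priority first, FIFO within a priority" without the `priority-queue` crate, here is a minimal sketch. `ReadyQueue` is a hypothetical type written for this README and is not the crate's scheduler; it relies only on tuple ordering and `std::cmp::Reverse`.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical ready queue (illustrative only).
/// Entries are `(priority, Reverse(sequence), task_id)`. `BinaryHeap` is a
/// max-heap, so the highest priority pops first, and among equal priorities
/// the smallest sequence number (the earliest push) wins because of `Reverse`.
#[derive(Debug, Default)]
pub struct ReadyQueue {
    heap: BinaryHeap<(u8, Reverse<u64>, String)>,
    next_seq: u64,
}

impl ReadyQueue {
    /// Enqueues a task id at the given priority (larger runs sooner).
    pub fn push(&mut self, id: impl Into<String>, priority: u8) {
        self.heap.push((priority, Reverse(self.next_seq), id.into()));
        // Sequence numbers are unique, so the id never has to break a tie.
        self.next_seq += 1;
    }

    /// Removes and returns the next task id to run, if any.
    pub fn pop(&mut self) -> Option<String> {
        self.heap.pop().map(|(_, _, id)| id)
    }

    pub fn len(&self) -> usize {
        self.heap.len()
    }

    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }
}
```

Pushing `"low"` at priority 1 and then `"first"` and `"second"` at priority 5 pops them as `first`, `second`, `low`. The monotonically increasing sequence number is what turns an otherwise unordered tie into FIFO.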
## License
Licensed under either of
- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or
<http://www.apache.org/licenses/LICENSE-2.0>)
- MIT license ([LICENSE-MIT](LICENSE-MIT) or
<http://opensource.org/licenses/MIT>)
at your option.
### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.