dag-executor
A production-ready DAG (directed acyclic graph) executor in Rust with stateful task execution, file-based persistence and crash recovery, advanced workflow patterns, and built-in fault tolerance and observability.
Status: the core engine, persistence, fault-tolerance primitives, task types, tests, examples and benchmarks are implemented and green (cargo test passes). See Scope & deviations for how this differs from the original spec.
Features
- Concurrent execution: bounded-parallelism worker pool over Tokio; comfortably runs thousands of tasks per run.
- Dependency resolution: petgraph-backed graph with validation for cycles, missing dependencies, duplicates and self-dependencies, plus topological ordering.
- Priority scheduling: higher-priority ready tasks run first; FIFO within a priority.
- Stateful tasks & recovery: every task's TaskRecord is persisted; a crashed run is recovered and orphaned Running tasks are repaired.
- Workflow patterns: BasicTask, StatefulTask, ConditionalTask, LoopTask, EventDrivenTask, plus fan_out_in / pipeline helpers.
- Fault tolerance: retry policies (fixed/linear/exponential + jitter), a circuit breaker, a dead-letter queue, and automatic skip-cascade of dependents when a task fails.
- Storage: checksummed file storage (FileStorage) with three selectable durability modes: Fast (in-place, default, ~0.85 ms/1 KB), Atomic (temp + rename, so a crash never destroys the prior record, ~2 ms) and Durable (atomic + fsync, survives power loss, ~8 ms). An in-memory backend (MemoryStorage) and a write-through LRUCache are also included. No external services are required.
- Observability: lock-free metrics collector and an optional Prometheus exporter (metrics feature), plus structured tracing logs.
Quick start
use Arc;
use *;
async
Architecture
```mermaid
flowchart TD
    Dag[Dag<br/>petgraph dependency graph] -->|submitted to| Exec
    subgraph Exec[DagExecutor<br/>validates, recovers, drives the run]
        direction LR
        Sched[Scheduler<br/>priority queue + records]
        Pool[WorkerPool<br/>semaphore-bounded<br/>retry / timeout / circuit breaker]
        Store[Storage<br/>File / Memory + LRU Cache]
        DLQ[DeadLetterQueue<br/>durable]
        Metrics[Metrics<br/>counters + Prometheus]
    end
    Exec -->|owns| Sched
    Exec -->|owns| Pool
    Exec -->|owns| Store
    Exec -->|owns| DLQ
    Exec -->|owns| Metrics
    Sched -->|next ready task| Pool
    Pool -->|task result| Sched
    Pool -->|terminal failure| DLQ
    Sched -->|persist records| Store
    Pool -->|emit counters| Metrics
```
The executor runs a single event loop:
- Validate the graph (acyclicity, dependencies).
- Recover persisted records and repair any left in Running.
- Maintain a per-task remaining-dependency counter; when it reaches zero, the task is enqueued in the Scheduler at its priority.
- Spawn ready tasks on the WorkerPool, which gates concurrency with a semaphore and wraps each task in the retry loop, optional timeout and circuit-breaker check.
- On completion, persist the record, publish the output to the shared Context blackboard, and decrement the dependents' counters. On terminal failure, dead-letter the task and skip-cascade its dependents.
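The counter-and-priority-queue loop described above can be sketched in a single thread using only std. Spec, Outcome and execute are hypothetical names. The run closure stands in for the WorkerPool, so there is no semaphore, retry or timeout, and graph validation is assumed to have already passed.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Outcome {
    Completed,
    Failed,
    Skipped,
}

/// Hypothetical task description: id, priority (higher runs first), dependency ids.
struct Spec {
    id: &'static str,
    priority: i32,
    deps: Vec<&'static str>,
}

/// `run` stands in for the worker pool and returns true on success.
/// Returns the order tasks ran in and each task's final outcome.
fn execute(
    specs: &[Spec],
    mut run: impl FnMut(&str) -> bool,
) -> (Vec<&'static str>, HashMap<&'static str, Outcome>) {
    let mut remaining: HashMap<&'static str, usize> = HashMap::new();
    let mut dependents: HashMap<&'static str, Vec<&'static str>> = HashMap::new();
    let mut priority: HashMap<&'static str, i32> = HashMap::new();
    for s in specs {
        remaining.insert(s.id, s.deps.len());
        priority.insert(s.id, s.priority);
        for &d in &s.deps {
            dependents.entry(d).or_default().push(s.id);
        }
    }

    // Max-heap on (priority, Reverse(seq)): highest priority first, FIFO within a priority.
    let mut heap = BinaryHeap::new();
    let mut seq: u64 = 0;
    for s in specs.iter().filter(|s| s.deps.is_empty()) {
        heap.push((s.priority, Reverse(seq), s.id));
        seq += 1;
    }

    let mut order = Vec::new();
    let mut outcome = HashMap::new();
    while let Some((_, _, id)) = heap.pop() {
        order.push(id);
        if run(id) {
            outcome.insert(id, Outcome::Completed);
            // Decrement each dependent's counter; enqueue it once it reaches zero.
            for &child in dependents.get(id).into_iter().flatten() {
                let left = remaining.get_mut(child).expect("every dependent is a known task");
                *left -= 1;
                if *left == 0 && !outcome.contains_key(child) {
                    heap.push((priority[child], Reverse(seq), child));
                    seq += 1;
                }
            }
        } else {
            outcome.insert(id, Outcome::Failed);
            // Skip-cascade: mark every transitive dependent as Skipped.
            let mut stack = dependents.get(id).cloned().unwrap_or_default();
            while let Some(child) = stack.pop() {
                if outcome.insert(child, Outcome::Skipped).is_none() {
                    stack.extend(dependents.get(child).cloned().unwrap_or_default());
                }
            }
        }
    }
    (order, outcome)
}

fn main() {
    let specs = vec![
        Spec { id: "fetch", priority: 0, deps: vec![] },
        Spec { id: "audit", priority: 5, deps: vec![] },
        Spec { id: "parse", priority: 0, deps: vec!["fetch"] },
        Spec { id: "report", priority: 0, deps: vec!["parse", "audit"] },
    ];
    // Simulate "fetch" failing: its dependents should be skipped.
    let (order, outcome) = execute(&specs, |id| id != "fetch");
    println!("ran: {order:?}");
    println!("outcomes: {outcome:?}");
}
```

The real executor runs ready tasks concurrently. This sketch pops one at a time, but the ordering rule (priority first, then insertion order) is the same.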
State lives in src/state, the engine in src/dag, task types in src/tasks, and fault tolerance in src/advanced.
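Of the fault-tolerance primitives, the circuit breaker is the check the WorkerPool applies before running a task. A minimal three-state breaker (Closed, Open, HalfOpen) looks roughly like this. The threshold and cooldown semantics, the single trial call in HalfOpen, and every name below are assumptions, not the contents of src/advanced.

```rust
use std::time::{Duration, Instant};

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    Closed,
    Open,
    HalfOpen,
}

/// Hypothetical breaker: opens after `threshold` consecutive failures, rejects calls
/// while open, and admits one trial call once `cooldown` has elapsed.
struct CircuitBreaker {
    state: State,
    failures: u32,
    threshold: u32,
    cooldown: Duration,
    opened_at: Option<Instant>,
}

impl CircuitBreaker {
    fn new(threshold: u32, cooldown: Duration) -> Self {
        Self { state: State::Closed, failures: 0, threshold, cooldown, opened_at: None }
    }

    /// Decide whether a call may proceed at `now` (passed in to keep the logic testable).
    fn allow(&mut self, now: Instant) -> bool {
        match self.state {
            State::Closed => true,
            // A trial call is already in flight; reject others until it reports back.
            State::HalfOpen => false,
            State::Open => {
                let opened = self.opened_at.expect("Open state always records opened_at");
                if now.saturating_duration_since(opened) >= self.cooldown {
                    self.state = State::HalfOpen;
                    true
                } else {
                    false
                }
            }
        }
    }

    fn on_success(&mut self) {
        self.state = State::Closed;
        self.failures = 0;
        self.opened_at = None;
    }

    fn on_failure(&mut self, now: Instant) {
        self.failures += 1;
        // A failed trial re-opens immediately; otherwise open once the threshold is hit.
        if self.state == State::HalfOpen || self.failures >= self.threshold {
            self.state = State::Open;
            self.opened_at = Some(now);
        }
    }
}

fn main() {
    let start = Instant::now();
    let mut breaker = CircuitBreaker::new(2, Duration::from_millis(500));
    for step in 0..4u64 {
        let now = start + Duration::from_millis(300 * step);
        if breaker.allow(now) {
            breaker.on_failure(now); // pretend every call fails
        }
        println!("t={}ms state={:?}", 300 * step, breaker.state);
    }
}
```

Passing the current time in explicitly, rather than reading a clock inside the breaker, makes the transitions easy to test deterministically.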
Task lifecycle
```mermaid
stateDiagram-v2
    [*] --> Pending
    Pending --> Ready: dependencies satisfied
    Pending --> Skipped: upstream failed
    Ready --> Running: scheduled
    Running --> Completed: success
    Running --> Retrying: failure (retries left)
    Retrying --> Running: backoff elapsed
    Running --> Failed: retries exhausted
    Failed --> DeadLettered: routed to DLQ
    Ready --> Cancelled: shutdown
    Running --> Cancelled: shutdown
    Completed --> [*]
    DeadLettered --> [*]
    Skipped --> [*]
    Cancelled --> [*]
```
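One way to make the diagram enforceable, for example when validating recovered records, is to encode its edges as a transition check. This sketch assumes a hypothetical TaskState enum with the diagram's nine states. The crate's own status type and method names may differ.

```rust
/// Hypothetical mirror of the lifecycle diagram; the crate's real status enum may differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TaskState {
    Pending,
    Ready,
    Running,
    Retrying,
    Completed,
    Failed,
    DeadLettered,
    Skipped,
    Cancelled,
}

impl TaskState {
    /// True exactly for the edges drawn in the state diagram.
    fn can_transition_to(self, next: TaskState) -> bool {
        use TaskState::*;
        matches!(
            (self, next),
            (Pending, Ready)
                | (Pending, Skipped)
                | (Ready, Running)
                | (Ready, Cancelled)
                | (Running, Completed)
                | (Running, Retrying)
                | (Running, Failed)
                | (Running, Cancelled)
                | (Retrying, Running)
                | (Failed, DeadLettered)
        )
    }

    /// States with an edge to the diagram's final [*] node.
    fn is_terminal(self) -> bool {
        matches!(
            self,
            TaskState::Completed | TaskState::DeadLettered | TaskState::Skipped | TaskState::Cancelled
        )
    }
}

/// A history is valid if it starts at Pending, follows only diagram edges,
/// and ends in a terminal state.
fn is_valid_history(history: &[TaskState]) -> bool {
    history.first() == Some(&TaskState::Pending)
        && history.windows(2).all(|w| w[0].can_transition_to(w[1]))
        && history.last().map_or(false, |s| s.is_terminal())
}

fn main() {
    use TaskState::*;
    let history = [Pending, Ready, Running, Retrying, Running, Completed];
    println!("{history:?} valid: {}", is_valid_history(&history));
}
```

Failed is deliberately not terminal: according to the diagram, a failed task always moves on to DeadLettered.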
Usage
Run the demo binary:
Run the examples:
Configuration
Config (see src/utils/config.rs) can be loaded from JSON (Config::from_file) and overridden through environment variables: DAG_MAX_CONCURRENCY, DAG_STORAGE_DIR, DAG_MAX_ATTEMPTS, DAG_LOG_LEVEL and DAG_PERSIST_STATE.
Set DAG_LOG_JSON=1 for JSON logs, or use RUST_LOG to control verbosity.
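Here is a sketch of how the DAG_* overrides might be layered over a loaded Config. The field names, placeholder defaults, accepted boolean spellings and apply_overrides itself are all hypothetical. The lookup closure stands in for the process environment, so the logic can be exercised without setting real variables.

```rust
use std::path::PathBuf;

/// Hypothetical subset of Config; field names are assumptions matched to the DAG_* variables.
#[derive(Debug, Clone, PartialEq)]
struct Config {
    max_concurrency: usize,
    storage_dir: PathBuf,
    max_attempts: u32,
    log_level: String,
    persist_state: bool,
}

impl Default for Config {
    // Placeholder defaults for the sketch, not the crate's actual values.
    fn default() -> Self {
        Self {
            max_concurrency: 8,
            storage_dir: PathBuf::from("./state"),
            max_attempts: 3,
            log_level: "info".to_string(),
            persist_state: true,
        }
    }
}

impl Config {
    /// Apply DAG_* overrides on top of values loaded from JSON. `lookup` abstracts the
    /// environment; pass `|k| std::env::var(k).ok()` in real use. Bad values are errors.
    fn apply_overrides(&mut self, lookup: impl Fn(&str) -> Option<String>) -> Result<(), String> {
        if let Some(v) = lookup("DAG_MAX_CONCURRENCY") {
            self.max_concurrency = v
                .parse::<usize>()
                .map_err(|e| format!("DAG_MAX_CONCURRENCY={v:?}: {e}"))?;
        }
        if let Some(v) = lookup("DAG_STORAGE_DIR") {
            self.storage_dir = PathBuf::from(v);
        }
        if let Some(v) = lookup("DAG_MAX_ATTEMPTS") {
            self.max_attempts = v
                .parse::<u32>()
                .map_err(|e| format!("DAG_MAX_ATTEMPTS={v:?}: {e}"))?;
        }
        if let Some(v) = lookup("DAG_LOG_LEVEL") {
            self.log_level = v;
        }
        if let Some(v) = lookup("DAG_PERSIST_STATE") {
            // The accepted spellings are an assumption of this sketch.
            self.persist_state = match v.to_ascii_lowercase().as_str() {
                "1" | "true" | "yes" => true,
                "0" | "false" | "no" => false,
                other => return Err(format!("DAG_PERSIST_STATE={other:?} is not a boolean")),
            };
        }
        Ok(())
    }
}

fn main() {
    let mut cfg = Config::default();
    match cfg.apply_overrides(|k| std::env::var(k).ok()) {
        Ok(()) => println!("{cfg:#?}"),
        Err(e) => eprintln!("invalid override: {e}"),
    }
}
```

Rejecting unparseable values avoids silently running with a default when a variable is set incorrectly.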
Testing & benchmarks
Benchmarks live in benches/: task_scheduling, throughput, state_persistence and memory_usage.
Docker
The multi-stage build produces a slim runtime image; a named volume backs the state directory.
Troubleshooting
| Symptom | Likely cause / fix |
|---|---|
| ValidationError::Cycle | A dependency loop; inspect the named task. |
| ValidationError::MissingDependency | A with_deps id has no matching task. |
| Run hangs | An EventDrivenTask is waiting for an event that never fires; add with_timeout. |
| ChecksumMismatch on load | The state file was corrupted or edited; delete it to let the task re-run. |
| Tasks unexpectedly Skipped | An upstream dependency failed; check the dead-letter queue. |
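
ChecksumMismatch is the error a checksummed record raises when it is loaded. The sketch below shows the general idea together with the temp + rename write used by the Atomic mode. The on-disk layout (a hex checksum line followed by the payload), the FNV-1a checksum and the function names are assumptions for illustration, because the crate's FileStorage format is not described here.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// 64-bit FNV-1a: a dependency-free checksum chosen for this sketch only.
fn fnv1a64(data: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &byte in data {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

#[allow(dead_code)] // fields are only inspected through Debug in this sketch
#[derive(Debug)]
enum LoadError {
    Io(io::Error),
    Malformed,
    ChecksumMismatch { expected: u64, actual: u64 },
}

/// Writes "<16 hex digit checksum>\n<payload>" to a temp file, then renames it over `path`,
/// so a crash mid-write leaves the previous record intact. `durable` fsyncs the temp file
/// first; full power-loss safety would also require syncing the parent directory.
fn write_record(path: &Path, payload: &[u8], durable: bool) -> io::Result<()> {
    let tmp = path.with_extension("tmp");
    let mut file = File::create(&tmp)?;
    writeln!(file, "{:016x}", fnv1a64(payload))?;
    file.write_all(payload)?;
    if durable {
        file.sync_all()?;
    }
    drop(file); // close before renaming
    fs::rename(&tmp, path)
}

/// Reads a record back and verifies its checksum.
fn load_record(path: &Path) -> Result<Vec<u8>, LoadError> {
    let bytes = fs::read(path).map_err(LoadError::Io)?;
    let newline = bytes.iter().position(|&b| b == b'\n').ok_or(LoadError::Malformed)?;
    let header = std::str::from_utf8(&bytes[..newline]).map_err(|_| LoadError::Malformed)?;
    let expected = u64::from_str_radix(header, 16).map_err(|_| LoadError::Malformed)?;
    let payload = bytes[newline + 1..].to_vec();
    let actual = fnv1a64(&payload);
    if actual != expected {
        return Err(LoadError::ChecksumMismatch { expected, actual });
    }
    Ok(payload)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("dag_executor_sketch.rec");
    write_record(&path, br#"{"task":"parse","status":"Completed"}"#, true)?;
    match load_record(&path) {
        Ok(payload) => println!("loaded {} bytes", payload.len()),
        Err(e) => println!("load failed: {e:?}"),
    }
    fs::remove_file(&path)
}
```

Any edit to the payload, including a single flipped bit, changes the checksum. This is why deleting the file (so the task re-runs) is the suggested fix, not repairing the file by hand.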
Scope & deviations
This implementation intentionally diverges from the original specification where the spec would not compile or would bloat the build:
- No HTTP server stack (warp/hyper/tower): the Prometheus exporter renders the text exposition format, which you can serve from any handler, keeping the dependency tree lean. Prometheus itself is behind the metrics feature.
- tracing-subscriber is non-optional: a Cargo feature cannot share a name with a non-optional dependency, so only prometheus is feature-gated.
- Dev-only crates (tempfile, tokio-test, criterion) are in [dev-dependencies], where they belong.
- lru 0.12 (not 0.7) and a BinaryHeap-based priority queue (instead of priority-queue): fewer, current dependencies.
- src/lib.rs is the crate root (a bare src/mod.rs is not a valid Rust crate root).
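Rendering the Prometheus text exposition format requires no HTTP stack, which is what makes the lean approach above workable. Here is a minimal sketch over lock-free AtomicU64 counters. The Counters struct and the dag_tasks_* metric names are hypothetical, not the crate's exported metrics.

```rust
use std::fmt::Write as _;
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical lock-free counters; metric names are illustrative, not the crate's.
#[derive(Default)]
struct Counters {
    tasks_completed: AtomicU64,
    tasks_failed: AtomicU64,
    tasks_retried: AtomicU64,
}

impl Counters {
    /// Renders the Prometheus text exposition format: a HELP line, a TYPE line and one
    /// sample per counter. Serve it from any handler as `text/plain; version=0.0.4`.
    fn render(&self) -> String {
        let mut out = String::new();
        let metrics = [
            ("dag_tasks_completed_total", "Tasks that finished successfully.", &self.tasks_completed),
            ("dag_tasks_failed_total", "Tasks that exhausted their retries.", &self.tasks_failed),
            ("dag_tasks_retried_total", "Retry attempts scheduled.", &self.tasks_retried),
        ];
        for (name, help, counter) in metrics {
            // Writing to a String cannot fail, so the fmt::Result is ignored.
            let _ = writeln!(out, "# HELP {name} {help}");
            let _ = writeln!(out, "# TYPE {name} counter");
            let _ = writeln!(out, "{name} {}", counter.load(Ordering::Relaxed));
        }
        out
    }
}

fn main() {
    let counters = Counters::default();
    // Several threads bump the same counter without a lock.
    std::thread::scope(|s| {
        for _ in 0..4 {
            s.spawn(|| {
                for _ in 0..250 {
                    counters.tasks_completed.fetch_add(1, Ordering::Relaxed);
                }
            });
        }
    });
    print!("{}", counters.render());
}
```

Relaxed ordering is enough here because each counter is independent and is only read for reporting.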
License
Licensed under either of
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.