dag-executor 0.1.0

A production-ready DAG executor with state management and advanced patterns
# dag-executor

[![License: MIT OR Apache-2.0](https://img.shields.io/badge/license-MIT%20OR%20Apache--2.0-blue.svg)](#license)

A production-ready **DAG (directed acyclic graph) executor** in Rust with
stateful task execution, file-based persistence and crash recovery, advanced
workflow patterns, and built-in fault tolerance and observability.

> **Status:** the core engine, persistence, fault-tolerance primitives, task
> types, tests, examples and benchmarks are implemented and green
> (`cargo test` passes). See [Scope & deviations](#scope--deviations) for how
> this differs from the original spec.

## Features

- **Concurrent execution** — bounded-parallelism worker pool over Tokio;
  comfortably runs thousands of tasks per run.
- **Dependency resolution**`petgraph`-backed graph with cycle / missing-dep /
  duplicate / self-dependency validation and topological ordering.
- **Priority scheduling** — higher-priority ready tasks run first; FIFO within a
  priority.
- **Stateful tasks & recovery** — every task's `TaskRecord` is persisted; a
  crashed run is recovered and orphaned `Running` tasks are repaired.
- **Workflow patterns**`BasicTask`, `StatefulTask`, `ConditionalTask`,
  `LoopTask`, `EventDrivenTask`, plus `fan_out_in` / `pipeline` helpers.
- **Fault tolerance** — retry policies (fixed/linear/exponential + jitter),
  a circuit breaker, a dead-letter queue, and automatic skip-cascade of
  dependents when a task fails.
- **Storage** — checksummed file storage (`FileStorage`) with three selectable
  durability modes, plus an in-memory backend (`MemoryStorage`) and a
  write-through LRU `Cache`. No external services are required. Approximate
  write latency per 1 KB record:
  - `Fast` (default): in-place writes, ~0.85 ms.
  - `Atomic`: write to a temp file, then rename, so a crash never destroys the
    prior record; ~2 ms.
  - `Durable`: atomic plus `fsync`, so the record survives power loss; ~8 ms.
- **Observability** — lock-free metrics collector and an optional Prometheus
  exporter (`metrics` feature), plus structured `tracing` logs.
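The `Fast`, `Atomic` and `Durable` storage modes listed above differ only in how a record reaches disk. The std-only sketch below illustrates that difference together with checksum verification on load. It is not the crate's `FileStorage`: `Durability`, `write_record` and `read_record` are hypothetical names, the on-disk layout (an 8-byte checksum header followed by the payload) is an assumption, and FNV-1a is a stand-in checksum because the crate's algorithm is not documented here.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical mirror of the three durability modes; not the crate's type.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Durability {
    Fast,
    Atomic,
    Durable,
}

/// FNV-1a (64-bit), a stand-in checksum chosen only because it fits in a few lines.
fn fnv1a(data: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for &b in data {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}

/// Writes `payload` behind an 8-byte little-endian checksum header.
pub fn write_record(path: &Path, payload: &[u8], mode: Durability) -> io::Result<()> {
    let mut bytes = fnv1a(payload).to_le_bytes().to_vec();
    bytes.extend_from_slice(payload);
    if mode == Durability::Fast {
        // In place: cheapest, but a crash mid-write can leave a torn record.
        return fs::write(path, &bytes);
    }
    // Atomic / Durable: write the new version to a sibling temp file first.
    let tmp = path.with_extension("tmp");
    let mut file = File::create(&tmp)?;
    file.write_all(&bytes)?;
    if mode == Durability::Durable {
        // Force the file contents to stable storage before they become visible.
        file.sync_all()?;
    }
    drop(file);
    // The rename swaps the old record for the new one in a single step, so a
    // crash leaves one or the other, never a mix. A fully power-loss-safe
    // version would also fsync the parent directory (omitted in this sketch).
    fs::rename(&tmp, path)
}

/// Reads a record, rejecting it if the stored checksum does not match.
pub fn read_record(path: &Path) -> io::Result<Vec<u8>> {
    let bytes = fs::read(path)?;
    if bytes.len() < 8 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "record too short"));
    }
    let (header, payload) = bytes.split_at(8);
    let mut stored = [0u8; 8];
    stored.copy_from_slice(header);
    if u64::from_le_bytes(stored) != fnv1a(payload) {
        // Plays the role of the crate's `ChecksumMismatch` error.
        return Err(io::Error::new(io::ErrorKind::InvalidData, "checksum mismatch"));
    }
    Ok(payload.to_vec())
}
```

The trade-off is visible in the code: `Fast` does one write, `Atomic` adds a temp file and a rename, and `Durable` adds a blocking `sync_all`, which is typically the dominant cost.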

## Quick start

```rust
use std::sync::Arc;
use dag_executor::prelude::*;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut dag = Dag::new();
    dag.add_task(Arc::new(BasicTask::new("a", |_| async { Ok(serde_json::json!(1)) })))?;
    dag.add_task(Arc::new(
        BasicTask::new("b", |_| async { Ok(serde_json::json!(2)) }).with_deps(["a"]),
    ))?;

    let executor = DagExecutor::builder().persist(false).build();
    let report = executor.run(dag).await?;
    assert!(report.is_success());
    Ok(())
}
```

## Architecture

```mermaid
flowchart TD
    Dag[Dag<br/>petgraph dependency graph] -->|submitted to| Exec

    subgraph Exec[DagExecutor — validates, recovers, drives the run]
        direction LR
        Sched[Scheduler<br/>priority queue + records]
        Pool[WorkerPool<br/>semaphore-bounded<br/>retry / timeout / circuit breaker]
        Store[Storage<br/>File / Memory + LRU Cache]
        DLQ[DeadLetterQueue<br/>durable]
        Metrics[Metrics<br/>counters + Prometheus]
    end

    Exec -->|owns| Sched
    Exec -->|owns| Pool
    Exec -->|owns| Store
    Exec -->|owns| DLQ
    Exec -->|owns| Metrics

    Sched -->|next ready task| Pool
    Pool -->|task result| Sched
    Pool -->|terminal failure| DLQ
    Sched -->|persist records| Store
    Pool -->|emit counters| Metrics
```

The executor runs a single event loop:

1. **Validate** the graph: no cycles, no missing or duplicate tasks, and no
   self-dependencies.
2. **Recover** persisted records and repair any left in `Running`.
3. Maintain a per-task **remaining-dependency counter**; when it hits zero the
   task is enqueued in the `Scheduler` at its priority.
4. **Spawn** ready tasks on the `WorkerPool`, which gates concurrency with a
   semaphore and wraps each task in the retry loop, optional timeout, and
   circuit-breaker check.
5. On completion, **persist** the record, publish output to the shared
   `Context` blackboard, and decrement dependents' counters. On terminal
   failure, **dead-letter** the task and **skip-cascade** its dependents.
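The dependency-counter, priority and skip-cascade steps above can be illustrated with a single-threaded simulation. This is a sketch under stated assumptions, not the crate's engine: it runs tasks one at a time instead of on a `WorkerPool`, omits retries, timeouts, persistence and the dead-letter queue, assumes the graph has already been validated, and `SimTask`, `Outcome` and `simulate` are hypothetical names.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

/// Hypothetical, simplified task description used only by this sketch.
pub struct SimTask {
    pub id: &'static str,
    pub priority: u8,
    pub deps: Vec<&'static str>,
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Outcome {
    Completed,
    Failed,
    Skipped,
}

/// Runs `tasks` one at a time; `run(id)` decides whether that task succeeds.
/// Returns each task's outcome in the order it was decided.
/// Assumes the graph was already validated (no cycles, no missing deps).
pub fn simulate(tasks: &[SimTask], run: impl Fn(&str) -> bool) -> Vec<(&'static str, Outcome)> {
    let index: HashMap<&str, usize> = tasks.iter().enumerate().map(|(i, t)| (t.id, i)).collect();
    // Remaining-dependency counter per task, plus reverse (dependent) edges.
    let mut remaining: Vec<usize> = tasks.iter().map(|t| t.deps.len()).collect();
    let mut dependents: Vec<Vec<usize>> = vec![Vec::new(); tasks.len()];
    for (i, t) in tasks.iter().enumerate() {
        for dep in &t.deps {
            dependents[index[dep]].push(i);
        }
    }
    // Max-heap on priority; `Reverse(seq)` makes equal priorities pop in FIFO order.
    let mut ready = BinaryHeap::new();
    let mut seq = 0u64;
    for (i, t) in tasks.iter().enumerate() {
        if remaining[i] == 0 {
            ready.push((t.priority, Reverse(seq), i));
            seq += 1;
        }
    }
    let mut skipped = vec![false; tasks.len()];
    let mut log = Vec::new();
    while let Some((_, _, i)) = ready.pop() {
        if run(tasks[i].id) {
            log.push((tasks[i].id, Outcome::Completed));
            // Decrement dependents; enqueue any whose counter reaches zero.
            for &d in &dependents[i] {
                remaining[d] -= 1;
                if remaining[d] == 0 {
                    ready.push((tasks[d].priority, Reverse(seq), d));
                    seq += 1;
                }
            }
        } else {
            log.push((tasks[i].id, Outcome::Failed));
            // Skip-cascade: every transitive dependent becomes Skipped exactly once.
            let mut stack = dependents[i].clone();
            while let Some(d) = stack.pop() {
                if !skipped[d] {
                    skipped[d] = true;
                    log.push((tasks[d].id, Outcome::Skipped));
                    stack.extend(&dependents[d]);
                }
            }
        }
    }
    log
}
```

A dependent of a failed task can never reach a zero counter, because the failed dependency never decrements it; that is why skipped tasks never show up in the ready heap.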

State lives in [`src/state`](src/state); the engine in [`src/dag`](src/dag);
task types in [`src/tasks`](src/tasks); fault-tolerance in
[`src/advanced`](src/advanced).

### Task lifecycle

```mermaid
stateDiagram-v2
    [*] --> Pending
    Pending --> Ready: dependencies satisfied
    Pending --> Skipped: upstream failed
    Ready --> Running: scheduled
    Running --> Completed: success
    Running --> Retrying: failure (retries left)
    Retrying --> Running: backoff elapsed
    Running --> Failed: retries exhausted
    Failed --> DeadLettered: routed to DLQ
    Ready --> Cancelled: shutdown
    Running --> Cancelled: shutdown
    Completed --> [*]
    DeadLettered --> [*]
    Skipped --> [*]
    Cancelled --> [*]
```
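One way to enforce this lifecycle in code is a transition table on an enum. The sketch below encodes exactly the edges drawn in the diagram above; `TaskState`, `can_transition_to` and `is_terminal` are hypothetical names, not necessarily the crate's exported API.

```rust
/// Hypothetical mirror of the lifecycle states in the diagram.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TaskState {
    Pending,
    Ready,
    Running,
    Retrying,
    Completed,
    Failed,
    DeadLettered,
    Skipped,
    Cancelled,
}

impl TaskState {
    /// Returns true only if the diagram has an edge from `self` to `next`.
    pub fn can_transition_to(self, next: TaskState) -> bool {
        use TaskState::*;
        matches!(
            (self, next),
            (Pending, Ready)
                | (Pending, Skipped)
                | (Ready, Running)
                | (Ready, Cancelled)
                | (Running, Completed)
                | (Running, Retrying)
                | (Running, Failed)
                | (Running, Cancelled)
                | (Retrying, Running)
                | (Failed, DeadLettered)
        )
    }

    /// Terminal states are the ones with an edge to `[*]`.
    pub fn is_terminal(self) -> bool {
        matches!(
            self,
            TaskState::Completed | TaskState::DeadLettered | TaskState::Skipped | TaskState::Cancelled
        )
    }
}
```

Keeping every edge in a single `matches!` makes the table easy to compare against the diagram; any transition not listed is rejected.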

## Usage

Run the demo binary:

```bash
cargo run -- --concurrency 64 --workers 8
```

Run the examples:

```bash
cargo run --example basic_usage
cargo run --example stateful_tasks
cargo run --example advanced_patterns
cargo run --example fault_tolerance
cargo run --release --example performance_demo
```

### Configuration

`Config` (see [`src/utils/config.rs`](src/utils/config.rs)) can be loaded from
JSON with `Config::from_file` and overridden through environment variables:
`DAG_MAX_CONCURRENCY`, `DAG_STORAGE_DIR`, `DAG_MAX_ATTEMPTS`, `DAG_LOG_LEVEL`,
and `DAG_PERSIST_STATE`. Set `DAG_LOG_JSON=1` for JSON-formatted logs, and use
`RUST_LOG` to control verbosity.
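Layering environment overrides over a JSON-loaded config usually means starting from the parsed values and replacing each field whose variable is set. The sketch below shows that pattern for the variables listed above. It assumes a hypothetical `Settings` struct (the real `Config` field names, types and parsing rules may differ), and it injects the variable lookup so the logic can be tested without mutating the process environment.

```rust
use std::path::PathBuf;

/// Hypothetical stand-in for `Config`; the real field names and types may differ.
#[derive(Clone, Debug, PartialEq)]
pub struct Settings {
    pub max_concurrency: usize,
    pub storage_dir: PathBuf,
    pub max_attempts: u32,
    pub log_level: String,
    pub persist_state: bool,
}

impl Settings {
    /// Overlays environment overrides on values already loaded (e.g. from JSON).
    /// `lookup` is injected so this can be tested without touching the process
    /// environment; in a real program pass `|k| std::env::var(k).ok()`.
    /// Values that fail to parse are ignored and the loaded value is kept.
    pub fn apply_overrides(mut self, lookup: impl Fn(&str) -> Option<String>) -> Self {
        if let Some(n) = lookup("DAG_MAX_CONCURRENCY").and_then(|s| s.parse::<usize>().ok()) {
            self.max_concurrency = n;
        }
        if let Some(dir) = lookup("DAG_STORAGE_DIR") {
            self.storage_dir = PathBuf::from(dir);
        }
        if let Some(n) = lookup("DAG_MAX_ATTEMPTS").and_then(|s| s.parse::<u32>().ok()) {
            self.max_attempts = n;
        }
        if let Some(level) = lookup("DAG_LOG_LEVEL") {
            self.log_level = level;
        }
        if let Some(flag) = lookup("DAG_PERSIST_STATE") {
            // Assumption: "1" or "true" (any case) enables persistence; anything else disables it.
            self.persist_state = flag == "1" || flag.eq_ignore_ascii_case("true");
        }
        self
    }
}
```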

## Testing & benchmarks

```bash
cargo test                 # unit + integration + doctests
cargo test --test unit
cargo test --test integration workflow_tests
cargo bench                # criterion benchmarks
```

Benchmarks live in [`benches/`](benches): `task_scheduling`, `throughput`,
`state_persistence`, `memory_usage`.

## Docker

```bash
docker build -f docker/Dockerfile -t dag-executor .
docker compose -f docker/docker-compose.yml up
```

The multi-stage build produces a slim runtime image; a named volume backs the
state directory.

## Troubleshooting

| Symptom | Likely cause / fix |
|---------|--------------------|
| `ValidationError::Cycle` | A dependency loop — inspect the named task. |
| `ValidationError::MissingDependency` | A `with_deps` id has no matching task. |
| Run hangs | An `EventDrivenTask` is waiting for an event that never fires; add `with_timeout`. |
| `ChecksumMismatch` on load | The state file was corrupted/edited; delete it to let the task re-run. |
| Tasks unexpectedly `Skipped` | An upstream dependency failed; check the dead-letter queue. |

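The `Cycle` and `MissingDependency` rows come from graph validation. The std-only sketch below shows the checks listed under Features (duplicate tasks, self-dependencies, missing dependencies, cycles) on plain `(id, deps)` pairs. `ValidationIssue`, `validate` and `find_cycle` are illustrative names rather than the crate's API, and the order in which checks run is an assumption.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical mirror of the validation failures; not the crate's `ValidationError`.
#[derive(Debug, PartialEq)]
pub enum ValidationIssue {
    DuplicateTask(String),
    SelfDependency(String),
    MissingDependency { task: String, dep: String },
    Cycle(String),
}

/// Checks `(id, deps)` pairs and returns the first problem found.
pub fn validate<'a>(tasks: &'a [(&'a str, Vec<&'a str>)]) -> Result<(), ValidationIssue> {
    let mut graph: HashMap<&'a str, &'a [&'a str]> = HashMap::new();
    for (id, deps) in tasks {
        if graph.insert(*id, deps.as_slice()).is_some() {
            return Err(ValidationIssue::DuplicateTask(id.to_string()));
        }
    }
    for (id, deps) in tasks {
        for dep in deps {
            if dep == id {
                return Err(ValidationIssue::SelfDependency(id.to_string()));
            }
            if !graph.contains_key(dep) {
                return Err(ValidationIssue::MissingDependency {
                    task: id.to_string(),
                    dep: dep.to_string(),
                });
            }
        }
    }
    let (mut on_path, mut done) = (HashSet::new(), HashSet::new());
    for (id, _) in tasks {
        if let Some(node) = find_cycle(*id, &graph, &mut on_path, &mut done) {
            return Err(ValidationIssue::Cycle(node.to_string()));
        }
    }
    Ok(())
}

/// Depth-first search that returns a task reached again while still on the
/// current path, i.e. a task that lies on a dependency loop.
fn find_cycle<'g>(
    node: &'g str,
    graph: &HashMap<&'g str, &'g [&'g str]>,
    on_path: &mut HashSet<&'g str>,
    done: &mut HashSet<&'g str>,
) -> Option<&'g str> {
    if done.contains(node) {
        return None;
    }
    if !on_path.insert(node) {
        return Some(node);
    }
    for &dep in graph[node] {
        if let Some(hit) = find_cycle(dep, graph, on_path, done) {
            return Some(hit);
        }
    }
    on_path.remove(node);
    done.insert(node);
    None
}
```

Because the search is recursive, very deep graphs would call for an iterative traversal; since the crate is `petgraph`-backed, `petgraph::algo::toposort` offers the same cycle check and reports an offending node.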
## Scope & deviations

This implementation intentionally diverges from the original specification where
the spec would not compile or would bloat the build:

- **No HTTP server stack** (`warp`/`hyper`/`tower`): the Prometheus exporter
  renders text exposition that you can serve from any handler, keeping the
  dependency tree lean. Prometheus itself is behind the `metrics` feature.
- **`tracing-subscriber` is non-optional**: a Cargo feature cannot share a name
  with a non-optional dependency, so only the `prometheus` dependency is
  optional (enabled by the `metrics` feature).
- **Dev-only crates** (`tempfile`, `tokio-test`, `criterion`) are in
  `[dev-dependencies]`, where they belong.
- **`lru` 0.12** (not 0.7) and a `BinaryHeap`-based priority queue (instead of
  `priority-queue`) — fewer, current dependencies.
- **`src/lib.rs`** is the crate root (a bare `src/mod.rs` is not a valid Rust
  crate root).

## License

Licensed under either of

- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or
  <http://www.apache.org/licenses/LICENSE-2.0>)
- MIT license ([LICENSE-MIT](LICENSE-MIT) or
  <http://opensource.org/licenses/MIT>)

at your option.

### Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.