durable-rust 0.3.6
# durable-rust

Lightweight durable job execution engine backed by Postgres.

No external services, no replay log, no orchestrator. Just checkpointed steps in plain Rust.

## Quick example

```rust
use durable::{Ctx, DurableError};
use sea_orm::DatabaseConnection; // durable builds on SeaORM; this is its connection type

async fn ingest(db: &DatabaseConnection) -> Result<(), DurableError> {
    let ctx = Ctx::start(db, "ingest", None).await?;

    // Step 1 — runs once, result saved to Postgres
    let shards: Vec<u32> = ctx.step("resolve_shards", || async {
        Ok(vec![0, 1, 2, 3])
    }).await?;

    // Step 2..N — skips already-completed shards on resume
    for shard in shards {
        ctx.step(&format!("shard_{shard}"), || async move {
            process_shard(shard).await
        }).await?;
    }

    // Child workflow with its own steps
    let child = ctx.child("post_process", None).await?;
    child.step("notify", || async { send_slack("done").await }).await?;
    child.complete(&"ok").await?;

    ctx.complete(&"finished").await?;
    Ok(())
}
```

If the process crashes at shard 2, on restart it skips shards 0-1 (results saved) and resumes from 2.

## Schema

Two tables in a `durable` schema:

- **`task`** -- unified row for workflows, steps, and child workflows. Self-referential via `parent_id`. Idempotent creation via `UNIQUE(parent_id, name)`.
- **`task_queue`** -- optional concurrency and rate-limit controls.
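
A rough sketch of the `task` table's shape (illustrative only -- column names beyond `parent_id`, `name`, and `status` are assumptions; the authoritative DDL lives in doc/schema.md):

```sql
-- Illustrative sketch, not the real DDL
CREATE TABLE durable.task (
    id         uuid PRIMARY KEY,
    parent_id  uuid REFERENCES durable.task (id),  -- self-reference: steps and children point at their parent
    name       text NOT NULL,
    status     durable.task_status NOT NULL DEFAULT 'PENDING',
    output     jsonb,
    UNIQUE (parent_id, name)                       -- makes task creation idempotent
);
```

The `UNIQUE(parent_id, name)` constraint is what lets `ctx.step("resolve_shards", ...)` be called again after a crash and land on the same row instead of creating a duplicate.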

See [doc/schema.md](doc/schema.md) for the full DDL, indexes, and status lifecycle.

## Getting started

1. Start Postgres:

   ```sh
   docker compose -f compose.db.yml up -d
   ```

2. Add the dependency:

   ```toml
   [dependencies]
durable = { package = "durable-rust", version = "0.3" }
   ```

3. Initialize in your app -- connects and runs migrations automatically:

   ```rust
   let db = durable::init("postgres://durable:durable@localhost:5432/durable").await?;
   ```

4. Write workflows using `Ctx`:

   - `Ctx::start(&db, name, input)` -- create or resume a root workflow
   - `ctx.step(name, closure)` -- run-once step with saved output
   - `ctx.child(name, input)` -- spawn a nested child workflow
   - `ctx.complete(output)` -- mark workflow done
   - `Ctx::pause(&db, task_id)` -- pause a running workflow (cascades to children)
   - `Ctx::resume(&db, task_id)` -- resume a paused workflow
   - `Ctx::cancel(&db, task_id)` -- cancel a workflow permanently (cascades to children)
   - `Ctx::list(&db, query)` -- list tasks with filters, sorting, and pagination
   - `Ctx::count(&db, query)` -- count tasks matching a filter

## Pause, resume, and cancel

Workflows can be paused, resumed, or cancelled by task ID -- useful for admin tooling or HTTP handlers:

```rust
// Pause a running workflow — all pending/running children are paused too
Ctx::pause(&db, task_id).await?;

// Resume — workflow goes back to RUNNING, children reset to PENDING
Ctx::resume(&db, task_id).await?;

// Cancel permanently — sets CANCELLED on the workflow and all non-terminal children
Ctx::cancel(&db, task_id).await?;
```

When a workflow is paused, any in-progress step will complete, but the next `step()`, `child()`, or `transaction()` call returns `DurableError::Paused`. After `resume()`, execution continues from where it left off.

Cancellation is terminal — a cancelled workflow cannot be resumed.

## Listing and querying tasks

Use `TaskQuery` to filter, sort, and paginate:

```rust
use durable::{Ctx, TaskQuery, TaskSort, TaskStatus};
use sea_orm::Order;

// All running root workflows, newest first
let running = Ctx::list(&db, TaskQuery::default()
    .status(TaskStatus::Running)
    .root_only(true)
    .sort(TaskSort::CreatedAt(Order::Desc))
    .limit(20)
).await?;

// Count paused tasks
let paused_count = Ctx::count(&db, TaskQuery::default().status(TaskStatus::Paused)).await?;

// Children of a specific workflow
let children = Ctx::list(&db, TaskQuery::default().parent_id(task_id)).await?;
```

Task status is a proper enum (`TaskStatus`) with variants: `Pending`, `Running`, `Completed`, `Failed`, `Paused`, `Cancelled`. The database column uses a Postgres enum type for type safety.

Filters: `status`, `kind`, `name`, `parent_id`, `root_only`, `queue_name`. Sort by: `CreatedAt`, `StartedAt`, `CompletedAt`, `Name`, `Status`.

## Crate structure

| Crate | Path | Purpose |
|---|---|---|
| `durable-rust` | `crates/durable` | SDK -- `Ctx`, `DurableError`, `Executor`, proc-macro re-exports. The only crate users add. |
| `durable-db` | `crates/durable-db` | SeaORM migrations for the `durable` schema (pulled in transitively) |
| `durable-macros` | `crates/durable-macros` | `#[durable::workflow]` and `#[durable::step]` proc macros (pulled in transitively) |

## Run the example

```sh
docker compose -f compose.db.yml up -d
cargo run -p nested-etl
```

The `nested-etl` example runs a parent ETL workflow that spawns child workflows per data source, then demonstrates crash recovery by resuming mid-run.

## Run tests

```sh
cargo test --workspace
```

## Multi-node setup

Multiple workers can share one Postgres database. Each worker sets a unique `executor_id`:

```
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│  Worker A    │  │  Worker B    │  │  Worker C    │
│  executor_id │  │  executor_id │  │  executor_id │
│  = "a1b2"    │  │  = "c3d4"    │  │  = "e5f6"    │
└──────┬───────┘  └──────┬───────┘  └──────┬───────┘
       │                 │                 │
       └────────┬────────┴────────┬────────┘
                │                 │
                ▼                 ▼
       ┌─────────────────────────────────┐
       │           Postgres              │
       │  durable.task       (all state) │
       │  durable.task_queue (limits)    │
       └─────────────────────────────────┘
```

**How it works:**

- Workers dequeue tasks with `FOR UPDATE SKIP LOCKED` -- no double-processing
- Each task's `executor_id` column tracks which worker owns it
- On startup, a worker resets its own stale `RUNNING` tasks to `PENDING` (single-worker recovery)
- For cross-worker recovery (worker A detects worker B crashed), enable the executor heartbeat (see [#7](https://github.com/code-salad/durable-rust/issues/7))
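
The claim step in the first bullet can be sketched as a single query (illustrative; the engine's actual dequeue SQL may differ):

```sql
-- Claim one pending task; SKIP LOCKED makes concurrent workers pick different rows
UPDATE durable.task
SET    status = 'RUNNING', executor_id = $1
WHERE  id = (
    SELECT id FROM durable.task
    WHERE  status = 'PENDING'
    ORDER  BY created_at
    FOR UPDATE SKIP LOCKED
    LIMIT  1
)
RETURNING id, name;
```

Because `SKIP LOCKED` passes over rows another transaction already holds, two workers issuing this query at the same moment claim different tasks rather than blocking or double-processing one.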

**Queue concurrency control:**

```sql
-- task_queue limits how many tasks run concurrently per queue
INSERT INTO durable.task_queue (name, max_concurrency) VALUES ('ingest', 4);

-- Workers respect the limit across all nodes
-- If 4 ingest tasks are RUNNING, the 5th worker waits
```

## Design docs

- [doc/api.md](doc/api.md) -- API design with proc macros (`#[durable::workflow]`, `#[durable::step]`)
- [doc/dataflow.md](doc/dataflow.md) -- dataflow diagrams for direct, queued, scheduled, and nested execution
- [doc/schema.md](doc/schema.md) -- full schema DDL and status lifecycle

## Design principles

1. **Postgres is the source of truth** -- no WAL, no event log, no separate replay mechanism
2. **Steps are idempotent by design** -- if a step completed, its saved result is returned; the closure is never re-executed
3. **No orchestrator** -- the job runner is just your application code calling `ctx.step()`
4. **No serialization framework** -- uses `serde_json` for input/output
5. **Crash safe** -- incomplete steps are detected on resume; completed steps replay from saved output
6. **Observable** -- query the `durable.task` table directly for monitoring and debugging