# durable-rust
Lightweight durable job execution engine backed by Postgres.
No external services, no replay log, no orchestrator. Just checkpointed steps in plain Rust.
## Quick example
```rust
use durable::{Ctx, DurableError};
#[durable::workflow]
async fn ingest(ctx: Ctx) -> Result<(), DurableError> {
// Step 1 — runs once, result saved to Postgres
let shards: Vec<u32> = ctx.step("resolve_shards", || async {
Ok(vec![0, 1, 2, 3])
}).await?;
// Step 2..N — skips already-completed shards on resume
for shard in shards {
ctx.step(&format!("shard_{shard}"), || async move {
process_shard(shard).await
}).await?;
}
// Child workflow with its own steps
let child = ctx.child("post_process", None).await?;
child.step("notify", || async { send_slack("done").await }).await?;
child.complete(&"ok").await?;
ctx.complete(&"finished").await?;
Ok(())
}
async fn run(db: &DatabaseConnection) -> Result<(), DurableError> {
// start_ingest is auto-generated by #[durable::workflow].
// Idempotent: if a RUNNING task with this name exists, attaches to it.
// Stores handler="ingest" in the DB for automatic crash recovery.
let ctx = start_ingest(db, "ingest-CC-MAIN-2026-08", None).await?;
ingest(ctx).await
}
```
If the process crashes at shard 2 and restarts, `init()` recovers the stale task and auto-dispatches it via the stored `handler` column. Calling `start_ingest` again with the same name attaches to the recovered task. Shards 0-1 replay instantly from saved output, and execution resumes from shard 2.
## Schema
Three tables in a `durable` schema:
- **`task`** -- unified row for workflows, steps, and child workflows. Self-referential via `parent_id`. Steps are uniquely identified by `UNIQUE(parent_id, sequence)`.
- **`task_queue`** -- optional concurrency and rate-limit controls.
- **`executor_heartbeat`** -- tracks live workers for automatic recovery of crashed executors.
See [doc/schema.md](doc/schema.md) for the full DDL, indexes, and status lifecycle.
## Getting started
1. Start Postgres:
```sh
just db-up
```
2. Add the dependency:
```toml
[dependencies]
durable = { package = "durable-rust", version = "0.4" }
```
3. Initialize in your app -- connects, runs migrations, starts heartbeat & recovery:
```rust
let (db, recovered) = durable::init("postgres://durable:durable@localhost:5432/durable").await?;
```
4. Write workflows using `Ctx`:
- `#[durable::workflow]` -- decorator that registers a `fn(Ctx)` workflow for auto-recovery and generates a `start_<fn>()` helper
- `start_<fn>(&db, name, input)` -- macro-generated starter: creates a new workflow or attaches to an existing RUNNING one (idempotent). Stores the handler name for crash recovery
- `Ctx::start(&db, name, input)` -- manual start without handler registration
- `Ctx::start_with_handler(&db, name, input, handler)` -- manual start with explicit handler name
- `Ctx::from_id(&db, task_id)` -- reattach to an existing workflow for resume/replay
- `ctx.step(name, closure)` -- run-once step with saved output
- `ctx.child(name, input)` -- spawn a nested child workflow
- `ctx.complete(output)` -- mark workflow done
- `Ctx::pause(&db, task_id)` -- pause a running workflow (cascades to children)
- `Ctx::resume(&db, task_id)` -- resume a paused workflow
- `Ctx::cancel(&db, task_id)` -- cancel a workflow permanently (cascades to children)
- `Ctx::list(&db, query)` -- list tasks with filters, sorting, and pagination
- `Ctx::count(&db, query)` -- count tasks matching a filter
## Pause, resume, and cancel
Workflows can be paused, resumed, or cancelled by task ID -- useful for admin tooling or HTTP handlers:
```rust
// Pause a running workflow — all pending/running children are paused too
Ctx::pause(&db, task_id).await?;
// Resume — workflow goes back to RUNNING, children reset to PENDING
Ctx::resume(&db, task_id).await?;
// Cancel permanently — sets CANCELLED on the workflow and all non-terminal children
Ctx::cancel(&db, task_id).await?;
```
When a workflow is paused, any in-progress step will complete, but the next `step()`, `child()`, or `transaction()` call returns `DurableError::Paused`. After `resume()`, execution continues from where it left off.
Cancellation is terminal — a cancelled workflow cannot be resumed.
## Listing and querying tasks
Use `TaskQuery` to filter, sort, and paginate:
```rust
use durable::{Ctx, TaskQuery, TaskSort, TaskStatus};
use sea_orm::Order;
// All running root workflows, newest first
let running = Ctx::list(&db, TaskQuery::default()
.status(TaskStatus::Running)
.root_only(true)
.sort(TaskSort::CreatedAt(Order::Desc))
.limit(20)
).await?;
// Count paused tasks
let paused_count = Ctx::count(&db, TaskQuery::default().status(TaskStatus::Paused)).await?;
// Children of a specific workflow
let children = Ctx::list(&db, TaskQuery::default().parent_id(task_id)).await?;
```
Task status is a proper enum (`TaskStatus`) with variants: `Pending`, `Running`, `Completed`, `Failed`, `Paused`, `Cancelled`. The database column uses a Postgres enum type for type safety.
Filters: `status`, `kind`, `name`, `parent_id`, `root_only`, `queue_name`. Sort by: `CreatedAt`, `StartedAt`, `CompletedAt`, `Name`, `Status`.
## Crate structure
| `durable-rust` | `crates/durable` | SDK -- `Ctx`, `DurableError`, `Executor`, proc-macro re-exports. The only crate users add. |
| `durable-db` | `crates/durable-db` | SeaORM migrations for the `durable` schema (pulled in transitively) |
| `durable-macros` | `crates/durable-macros` | `#[durable::workflow]` and `#[durable::step]` proc macros (pulled in transitively) |
## Run the example
```sh
just db-up
cargo run -p nested-etl
```
The `nested-etl` example runs a parent ETL workflow that spawns child workflows per data source, then demonstrates crash recovery by resuming mid-run.
## Run tests
```sh
just test # starts DB + runs all tests
just test-integration # integration tests only
```
## Multi-node setup
Multiple workers can share one Postgres database. Each worker sets a unique `executor_id`:
```
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Worker A │ │ Worker B │ │ Worker C │
│ executor_id │ │ executor_id │ │ executor_id │
│ = "a1b2" │ │ = "c3d4" │ │ = "e5f6" │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└────────┬────────┴────────┬────────┘
│ │
▼ ▼
┌─────────────────────────────────┐
│ Postgres │
│ durable.task (all state) │
│ durable.task_queue (limits) │
└─────────────────────────────────┘
```
**How it works:**
- Workers dequeue tasks with `FOR UPDATE SKIP LOCKED` -- no double-processing
- Each task's `executor_id` column tracks which worker owns it
- `init()` automatically starts a heartbeat loop (every 60s) and a recovery loop (every 180s)
- On startup, stale tasks from crashed workers are recovered and auto-dispatched
- The `handler` column in `durable.task` maps recovered tasks to their `#[durable::workflow]` function
- Cross-worker recovery: if worker B's heartbeat goes stale, worker A resets B's tasks
**Queue concurrency control:**
```sql
-- task_queue limits how many tasks run concurrently per queue
INSERT INTO durable.task_queue (name, max_concurrency) VALUES ('ingest', 4);
-- Workers respect the limit across all nodes
-- If 4 ingest tasks are RUNNING, the 5th worker waits
```
## Design docs
- [doc/api.md](doc/api.md) -- API design with proc macros (`#[durable::workflow]`, `#[durable::step]`)
- [doc/dataflow.md](doc/dataflow.md) -- dataflow diagrams for direct, queued, scheduled, and nested execution
- [doc/schema.md](doc/schema.md) -- full schema DDL and status lifecycle
## Design principles
1. **Postgres is the source of truth** -- no WAL, no event log, no separate replay mechanism
2. **Idempotent start** -- `start()` with the same name while RUNNING returns the existing task. After COMPLETED/FAILED/CANCELLED, creates a new one
3. **Steps are idempotent by design** -- if a step completed, its saved result is returned; the closure is never re-executed
4. **Automatic crash recovery** -- `#[durable::workflow]` registers handlers. On restart, `init()` recovers stale tasks and auto-dispatches them via the stored `handler` column (DBOS-style)
5. **No orchestrator** -- the job runner is just your application code calling `ctx.step()`
6. **No serialization framework** -- uses `serde_json` for input/output
7. **Crash safe** -- reattach with `from_id()` to resume; completed steps replay from saved output
8. **Observable** -- query the `durable.task` table directly for monitoring and debugging