durable-rust 0.4.1

Lightweight durable job execution engine backed by Postgres
Documentation

durable-rust

Lightweight durable job execution engine backed by Postgres.

No external services, no replay log, no orchestrator. Just checkpointed steps in plain Rust.

Quick example

use durable::{Ctx, DurableError};

async fn ingest(db: &DatabaseConnection) -> Result<(), DurableError> {
    // Every start() creates a new workflow with a unique ID
    let ctx = Ctx::start(db, "ingest", None).await?;

    // Step 1 — runs once, result saved to Postgres
    let shards: Vec<u32> = ctx.step("resolve_shards", || async {
        Ok(vec![0, 1, 2, 3])
    }).await?;

    // Step 2..N — skips already-completed shards on resume
    for shard in shards {
        ctx.step(&format!("shard_{shard}"), || async move {
            process_shard(shard).await
        }).await?;
    }

    // Child workflow with its own steps
    let child = ctx.child("post_process", None).await?;
    child.step("notify", || async { send_slack("done").await }).await?;
    child.complete(&"ok").await?;

    ctx.complete(&"finished").await?;
    Ok(())
}

// To resume after a crash, reattach by task ID — steps replay from saved output
async fn resume(db: &DatabaseConnection, task_id: Uuid) -> Result<(), DurableError> {
    let ctx = Ctx::from_id(db, task_id).await?;
    // Steps that already completed replay their saved results automatically.
    // New steps execute normally.
    // ...
    Ok(())
}

If the process crashes at shard 2, reattach with Ctx::from_id(db, task_id) — shards 0-1 replay instantly from saved output, and execution resumes from shard 2.

Schema

Three tables in a durable schema:

  • task -- unified row for workflows, steps, and child workflows. Self-referential via parent_id. Steps are uniquely identified by UNIQUE(parent_id, sequence).
  • task_queue -- optional concurrency and rate-limit controls.
  • executor_heartbeat -- tracks live workers for automatic recovery of crashed executors.

See doc/schema.md for the full DDL, indexes, and status lifecycle.

Getting started

  1. Start Postgres:

    just db-up
    # or: docker compose up -d
    
  2. Add the dependency:

    [dependencies]
    durable = { package = "durable-rust", version = "0.4" }
    
  3. Initialize in your app -- connects, runs migrations, starts heartbeat & recovery:

    let db = durable::init("postgres://durable:durable@localhost:5432/durable").await?;
    
  4. Write workflows using Ctx:

    • Ctx::start(&db, name, input) -- create a new workflow (always a fresh ID)
    • Ctx::from_id(&db, task_id) -- reattach to an existing workflow for resume/replay
    • ctx.step(name, closure) -- run-once step with saved output
    • ctx.child(name, input) -- spawn a nested child workflow
    • ctx.complete(output) -- mark workflow done
    • Ctx::pause(&db, task_id) -- pause a running workflow (cascades to children)
    • Ctx::resume(&db, task_id) -- resume a paused workflow
    • Ctx::cancel(&db, task_id) -- cancel a workflow permanently (cascades to children)
    • Ctx::list(&db, query) -- list tasks with filters, sorting, and pagination
    • Ctx::count(&db, query) -- count tasks matching a filter

Pause, resume, and cancel

Workflows can be paused, resumed, or cancelled by task ID -- useful for admin tooling or HTTP handlers:

// Pause a running workflow — all pending/running children are paused too
Ctx::pause(&db, task_id).await?;

// Resume — workflow goes back to RUNNING, children reset to PENDING
Ctx::resume(&db, task_id).await?;

// Cancel permanently — sets CANCELLED on the workflow and all non-terminal children
Ctx::cancel(&db, task_id).await?;

When a workflow is paused, any in-progress step will complete, but the next step(), child(), or transaction() call returns DurableError::Paused. After resume(), execution continues from where it left off.

Cancellation is terminal — a cancelled workflow cannot be resumed.

Listing and querying tasks

Use TaskQuery to filter, sort, and paginate:

use durable::{Ctx, TaskQuery, TaskSort, TaskStatus};
use sea_orm::Order;

// All running root workflows, newest first
let running = Ctx::list(&db, TaskQuery::default()
    .status(TaskStatus::Running)
    .root_only(true)
    .sort(TaskSort::CreatedAt(Order::Desc))
    .limit(20)
).await?;

// Count paused tasks
let paused_count = Ctx::count(&db, TaskQuery::default().status(TaskStatus::Paused)).await?;

// Children of a specific workflow
let children = Ctx::list(&db, TaskQuery::default().parent_id(task_id)).await?;

Task status is a proper enum (TaskStatus) with variants: Pending, Running, Completed, Failed, Paused, Cancelled. The database column uses a Postgres enum type for type safety.

Filters: status, kind, name, parent_id, root_only, queue_name. Sort by: CreatedAt, StartedAt, CompletedAt, Name, Status.

Crate structure

Crate Path Purpose
durable-rust crates/durable SDK -- Ctx, DurableError, Executor, proc-macro re-exports. The only crate users add.
durable-db crates/durable-db SeaORM migrations for the durable schema (pulled in transitively)
durable-macros crates/durable-macros #[durable::workflow] and #[durable::step] proc macros (pulled in transitively)

Run the example

just db-up
cargo run -p nested-etl

The nested-etl example runs a parent ETL workflow that spawns child workflows per data source, then demonstrates crash recovery by resuming mid-run.

Run tests

just test              # starts DB + runs all tests
just test-integration  # integration tests only

Multi-node setup

Multiple workers can share one Postgres database. Each worker sets a unique executor_id:

┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│  Worker A    │  │  Worker B    │  │  Worker C    │
│  executor_id │  │  executor_id │  │  executor_id │
│  = "a1b2"   │  │  = "c3d4"   │  │  = "e5f6"   │
└──────┬───────┘  └──────┬───────┘  └──────┬───────┘
       │                 │                 │
       └────────┬────────┴────────┬────────┘
                │                 │
                ▼                 ▼
       ┌─────────────────────────────────┐
       │           Postgres              │
       │  durable.task       (all state) │
       │  durable.task_queue (limits)    │
       └─────────────────────────────────┘

How it works:

  • Workers dequeue tasks with FOR UPDATE SKIP LOCKED -- no double-processing
  • Each task's executor_id column tracks which worker owns it
  • init() automatically starts a heartbeat loop (every 60s) and a recovery loop (every 180s)
  • On startup, stale tasks from crashed workers are recovered to PENDING
  • Cross-worker recovery: if worker B's heartbeat goes stale, worker A resets B's tasks

Queue concurrency control:

-- task_queue limits how many tasks run concurrently per queue
INSERT INTO durable.task_queue (name, max_concurrency) VALUES ('ingest', 4);

-- Workers respect the limit across all nodes
-- If 4 ingest tasks are RUNNING, the 5th worker waits

Design docs

  • doc/api.md -- API design with proc macros (#[durable::workflow], #[durable::step])
  • doc/dataflow.md -- dataflow diagrams for direct, queued, scheduled, and nested execution
  • doc/schema.md -- full schema DDL and status lifecycle

Design principles

  1. Postgres is the source of truth -- no WAL, no event log, no separate replay mechanism
  2. Every run is isolated -- start() always creates a new task with a unique ID. Steps are scoped by (parent_id, sequence), so runs with the same name never share state
  3. Steps are idempotent by design -- if a step completed, its saved result is returned; the closure is never re-executed
  4. No orchestrator -- the job runner is just your application code calling ctx.step()
  5. No serialization framework -- uses serde_json for input/output
  6. Crash safe -- reattach with from_id() to resume; completed steps replay from saved output
  7. Observable -- query the durable.task table directly for monitoring and debugging