Runledger
Runledger is a standalone Rust workspace for durable job execution and workflow orchestration on PostgreSQL.
This repository was extracted from a larger application and scoped down to the Runledger-specific crates, migrations, and test utilities needed to build and evolve the job system independently.
Workspace
The workspace contains four crates:
| Crate | Purpose |
|---|---|
| runledger-core | Storage-agnostic contracts: job handler traits, runtime types, statuses, identifiers, and workflow enqueue/build validation. |
| runledger-postgres | SQLx-backed PostgreSQL persistence for the queue, job lifecycle, schedules, workflow DAG state machine, runtime configs, logs, and admin reads/mutations. |
| runledger-runtime | Async worker, scheduler, and reaper loops plus runtime configuration and handler registry. |
| runledger-test-support | Published test utilities for ephemeral PostgreSQL databases and scoped environment-variable overrides. |
The root workspace manifest is Cargo.toml.
What This Repo Includes
- Rust crates for the Runledger contracts, runtime, and PostgreSQL persistence layer
- A Runledger-only SQL migration history in migrations
- Vendored copies of those migrations in runledger-postgres/migrations and runledger-test-support/migrations so packaged crates can apply schemas without relying on repo-relative paths
- Local test support for DB-backed tests using testcontainers
- SQLx offline metadata in .sqlx/ so the macro-based queries compile without a live database during normal builds
What This Repo Does Not Include
- Application-specific handlers
- API servers, CLIs, or binaries
- Non-Runledger product schema from the original application
- Domain models owned by a larger app
You are expected to embed these crates inside your own service and supply:
- concrete job handlers
- process bootstrapping
- database provisioning
- application-level auth/admin surfaces
Choosing The API
Use the highest-level Runledger API that matches the shape of the work. This is especially important for agents and generated integrations: a workflow DAG is a first-class Runledger feature, not something consumers should recreate by polling jobs or chaining handlers manually.
For a shorter prompt-facing version, see llms.txt. For a slightly longer guide, see docs/downstream-agent-guide.md.
Common integration imports:
use *;
use *;
use *;
| Need | Prefer |
|---|---|
| One independent retried unit of work | runledger_postgres::jobs::enqueue_job |
| Multi-step work with dependencies | WorkflowDagBuilder (simple DAGs) or WorkflowRunEnqueueBuilder / WorkflowStepEnqueueBuilder (advanced), then enqueue_workflow_run |
| Fan-out, fan-in, or ordered stages | WorkflowDagBuilder::after_success / after_terminal, or lower-level depends_on_success / depends_on_terminal |
| Human/API approval or another external gate | External workflow steps and complete_external_workflow_step |
| Delayed or recurring entrypoint | JobScheduleUpsert and upsert_job_schedule |
| Worker process lifecycle | runledger_runtime::Supervisor::run_until_shutdown |
| Admin/status views | runledger_postgres::jobs read/list APIs |
Avoid manual workflow orchestration unless you are intentionally building a
custom orchestrator outside Runledger. For ordinary dependent work, do not poll
get_job_by_id in a loop, enqueue dependent jobs from parent handlers, encode
dependency state in job payload JSON, or add app-owned tables to track workflow
edges. Model the run as a workflow DAG instead.
Crate Responsibilities
runledger-core
Use runledger-core for the public contracts shared across the rest of the workspace:
- JobHandler and JobHandlerRegistry
- JobContext, JobProgress, and JobFailure
- job status and event enums
- workflow enqueue builders and DAG validation
This crate intentionally has no persistence or async loop logic.
runledger-postgres
Use runledger-postgres when you need durable state in PostgreSQL.
Key capabilities:
- enqueue, claim, heartbeat, retry, succeed, cancel, dead-letter, and requeue jobs
- create, materialize, and update cron schedules
- persist job logs and runtime configs
- create, read, mutate, and advance workflow runs and steps
- query operator/admin views over queue and workflow state
The crate assumes the matching Runledger schema has already been migrated into the target database.
For consumer setup there are two supported modes:
- call runledger_postgres::migrate_after_idempotency_cutover(&pool) to apply the bundled schema during startup and reject keyed legacy rows without enqueue snapshots
- call runledger_postgres::ensure_schema_compatible_after_idempotency_cutover(&pool) to perform a read-only validation that an existing _sqlx_migrations history matches the bundled migrations, with explicit errors for missing history, incompatible history, legacy idempotency rows, or PostgreSQL query/connectivity failures
Operational API notes:
- QueryError::Display and Debug are safe for public surfaces and omit internal database context; use QueryError::internal_message() for server-side diagnostics.
- Worker lifecycle updates reject expired leases with the stable job.lease_owner_mismatch code, even when the lease was lost by time rather than by another worker; once lease_expires_at has passed there is no owner grace period.
- complete_job_success persists JobStage::Completed; passing any other success stage is rejected as a caller error.
- Workflow-backed job completion waits for an in-flight workflow cancellation to commit or roll back instead of returning a transient workflow.release_conflict; append and external-step release paths may still return workflow.release_conflict while cancellation owns the exclusive release lock.
- Retry conflicts such as workflow.append_conflicting_retry are reported as conflict-category query errors; clients should prefer stable error codes over broad categories for exact branching.
- Release-sensitive workflow operations, workflow append mutations, and keyed enqueue retries require PostgreSQL READ COMMITTED semantics. PostgreSQL's READ UNCOMMITTED mode is accepted because PostgreSQL implements it as read committed.
- Keyed rows created before enqueue snapshots existed cannot be safely reconstructed. The idempotency cutover rejects keyed job and workflow rows with enqueue_request IS NULL during startup/schema validation, and keyed retries against such rows return dedicated conflict errors instead of falling back to mutable state comparisons.
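The lease rule and the advice to branch on stable codes can be illustrated with a small, self-contained model. This is a hypothetical sketch, not Runledger's implementation: ApiError, check_lease, NextAction, next_action, the category strings, and the retry policy are all assumptions; only the error code strings come from this README.

```rust
use std::time::SystemTime;

/// Hypothetical error shape: a stable machine-readable code plus a broad category.
#[derive(Debug, Clone, PartialEq)]
pub struct ApiError {
    pub code: &'static str,
    pub category: &'static str,
}

/// Hypothetical lease check: the caller must own the lease AND the lease must not
/// have expired. With no grace period, an expired lease reports the same code as
/// an owner mismatch. Treating `now == lease_expires_at` as expired is an assumed boundary.
pub fn check_lease(
    lease_owner: &str,
    caller: &str,
    lease_expires_at: SystemTime,
    now: SystemTime,
) -> Result<(), ApiError> {
    if lease_owner != caller || now >= lease_expires_at {
        return Err(ApiError { code: "job.lease_owner_mismatch", category: "conflict" });
    }
    Ok(())
}

#[derive(Debug, PartialEq)]
pub enum NextAction {
    StopWorkingOnJob,
    RetryLater,
    SurfaceToCaller,
}

/// Branch on the stable code, not the category: the two workflow codes below can
/// share a "conflict" category yet call for different handling (assumed policy).
pub fn next_action(err: &ApiError) -> NextAction {
    match err.code {
        // This worker no longer owns the job, whether lost by time or to another worker.
        "job.lease_owner_mismatch" => NextAction::StopWorkingOnJob,
        // Transient while an in-flight cancellation holds the release lock.
        "workflow.release_conflict" => NextAction::RetryLater,
        // The retry conflicts with its original request: surface it instead of looping.
        "workflow.append_conflicting_retry" => NextAction::SurfaceToCaller,
        _ => NextAction::SurfaceToCaller,
    }
}
```

Because both workflow codes can carry the same category, only the code separates a transient release conflict from a retry that conflicts with its original request.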
runledger-runtime
Use runledger-runtime to run the operational loops around the storage layer:
- Supervisor
- catalog::JobCatalog for single-source handler registration, definition sync, and validated enqueue helpers
- registry::JobRegistry for advanced setups that manage handlers separately from definitions
- config::JobsConfig
The runtime is generic. It does not embed application-specific job lists; applications build their own JobCatalog at startup. Supervisor is the preferred facade for worker processes; worker::run_worker_loop, scheduler::run_scheduler_loop, and reaper::run_reaper_loop remain available as low-level building blocks for custom orchestration. Those low-level loops return RuntimeLoopExit; custom orchestrators that type their join handles explicitly should use JoinHandle<RuntimeLoopExit>.
runledger-test-support
This crate provides shared testing utilities for Runledger crates and downstream integration tests.
It provides:
- setup_ephemeral_pool
- teardown_ephemeral_pool
- ScopedEnv
It starts a disposable PostgreSQL container, creates per-test databases, and runs its vendored Runledger migrations against them. It is published so package tests in runledger-postgres can depend on the same harness that workspace tests use.
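The scoped override idea behind ScopedEnv is commonly implemented as an RAII guard. The sketch below is a hypothetical stand-in named EnvOverride, not ScopedEnv's actual API: it sets a variable and restores the previous value (or removes the variable) when the guard is dropped.

```rust
use std::env;
use std::ffi::OsString;

/// Hypothetical RAII guard: overrides one environment variable for its lifetime.
pub struct EnvOverride {
    key: String,
    /// Value before the override; `None` means the variable was unset.
    previous: Option<OsString>,
}

impl EnvOverride {
    pub fn set(key: &str, value: &str) -> Self {
        let previous = env::var_os(key);
        // set_var/remove_var are `unsafe` in Rust 2024 because other threads may
        // read the environment concurrently; the block is harmless in older editions.
        unsafe { env::set_var(key, value) };
        EnvOverride { key: key.to_string(), previous }
    }
}

impl Drop for EnvOverride {
    fn drop(&mut self) {
        // Restore exactly what was there before, including "unset".
        match &self.previous {
            Some(value) => unsafe { env::set_var(&self.key, value) },
            None => unsafe { env::remove_var(&self.key) },
        }
    }
}
```

Process environment is global, so tests using a guard like this should not run concurrently with other code that reads or writes the same variables.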
Database Model
The standalone schema is intentionally limited to Runledger-owned objects.
Major schema areas:
- queue and lifecycle tables: job_definitions, job_queue, job_attempts, job_events, job_dead_letters, job_schedules
- workflow orchestration tables: workflow_runs, workflow_steps, workflow_step_dependencies, workflow_run_mutations
- operational support tables: job_logs, job_runtime_configs
- derived operational view: job_metrics_rollup
Notable schema features:
- idempotent queueing via idempotency_key
- cron-backed schedule materialization
- workflow DAG execution with dependency counters
- external workflow gates via WAITING_FOR_EXTERNAL
- append-only workflow mutation tracking
- panic-aware job metrics rollups
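Workflow DAG execution with dependency counters can be pictured in memory: each step keeps a count of unfinished prerequisites, steps at zero are roots, and a prerequisite's success decrements its dependents and releases any that reach zero. This is a hypothetical sketch of that general technique; DagRelease is an illustrative name, the sketch covers only success-based release (not after_terminal edges, failures, cancellation, or external gates), and it says nothing about how workflow_steps stores its counters.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory model of counter-based dependency release.
pub struct DagRelease {
    /// Step -> number of prerequisites that have not yet succeeded.
    remaining: HashMap<String, usize>,
    /// Prerequisite -> steps that wait on it.
    dependents: HashMap<String, Vec<String>>,
}

impl DagRelease {
    /// `edges` are (prerequisite, dependent) pairs; `steps` lists every step.
    pub fn new(steps: &[&str], edges: &[(&str, &str)]) -> Self {
        let mut remaining: HashMap<String, usize> =
            steps.iter().map(|s| (s.to_string(), 0)).collect();
        let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
        for (pre, dep) in edges {
            *remaining.get_mut(*dep).expect("unknown step") += 1;
            dependents.entry(pre.to_string()).or_default().push(dep.to_string());
        }
        DagRelease { remaining, dependents }
    }

    /// Steps with no prerequisites are ready as soon as the run is created.
    pub fn roots(&self) -> Vec<String> {
        let mut roots: Vec<String> = self
            .remaining
            .iter()
            .filter(|(_, n)| **n == 0)
            .map(|(s, _)| s.clone())
            .collect();
        roots.sort();
        roots
    }

    /// Record that `step` succeeded; return dependents whose counter hit zero.
    pub fn on_success(&mut self, step: &str) -> Vec<String> {
        let mut released = Vec::new();
        if let Some(deps) = self.dependents.get(step) {
            for dep in deps {
                let n = self.remaining.get_mut(dep).expect("unknown step");
                *n -= 1;
                if *n == 0 {
                    released.push(dep.clone());
                }
            }
        }
        released.sort();
        released
    }
}
```

With a fan-out/fan-in shape, the fan-in step is released only by the success of its last outstanding prerequisite.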
Schema Scope Difference From The Original App
This repository no longer ships the original product schema.
A few columns remain for integration flexibility, but their original foreign keys were intentionally removed in the standalone migration set:
- organization_id
- created_by_user_id
- updated_by_user_id
These values are now treated as opaque UUIDs from the perspective of Runledger. If your host application wants referential integrity, it should add that in its own schema layer or wrap these migrations with app-owned extensions.
Migrations
The migration set lives in migrations.
This repo uses a flattened baseline plus forward migrations:
- 202603280001_runledger_baseline creates the standalone Runledger schema baseline, including helper functions, queue tables, workflow DAG tables, logs, runtime configs, workflow mutations, external workflow gates, panic-aware attempt outcomes, and the final metrics rollup view
- 202604100001_runledger_migration_history creates runledger_migration_history and records the standalone baseline and history-table migration versions
- 202605180001_add_enqueue_request_snapshots adds enqueue_request snapshots to job_queue and workflow_runs so keyed enqueue retries can compare the original request instead of mutable runtime state
- 202605220001_enforce_enqueue_request_snapshots blocks new keyed queue/workflow rows without snapshots while startup validation rejects pre-cutover legacy rows; the application migration API validates the cutover constraints after legacy-row validation passes
The historical standalone migration chain was intentionally collapsed because this repository now targets fresh standalone deployments rather than preserving every intermediate extraction-era cutover step.
If you already created databases from the older multi-file standalone migration history, treat the flattened baseline as a new-from-scratch schema definition, not as an in-place upgrade path. Apply later forward migrations normally.
The workspace-root migration directory remains the canonical schema source for repo development and review.
For consumers using the published crate:
- runledger_postgres::MIGRATOR embeds the vendored runledger-postgres/migrations/ copy
- runledger-test-support embeds its own runledger-test-support/migrations/ copy for packaged test harnesses
- runledger_postgres::migrate_after_idempotency_cutover(&pool) applies those migrations and rejects keyed legacy rows without snapshots
- runledger_postgres::ensure_schema_compatible_after_idempotency_cutover(&pool) validates that an existing _sqlx_migrations history matches them without running DDL and returns Runledger-specific errors for missing history, incompatible history, legacy idempotency rows, or PostgreSQL query/connectivity failures; externally managed DDL can validate the NOT VALID cutover constraints after this check passes
- runledger-postgres/build.rs fails local builds if the vendored crate copy drifts from the canonical workspace-root migrations/ directory
Apply these migrations, or call runledger_postgres::migrate_after_idempotency_cutover(&pool), before using runledger-postgres or running DB-backed tests.
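The drift guard described for runledger-postgres/build.rs boils down to a file-by-file comparison of two migration directories. Below is a hypothetical stand-alone version of such a check (migration_drift is an illustrative name, not the contents of build.rs); it reports files that are missing on either side or differ byte-for-byte.

```rust
use std::collections::BTreeMap;
use std::fs;
use std::io;
use std::path::Path;

/// Read every regular file directly inside `dir` into a name -> bytes map.
fn read_flat_dir(dir: &Path) -> io::Result<BTreeMap<String, Vec<u8>>> {
    let mut files = BTreeMap::new();
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        if entry.file_type()?.is_file() {
            let name = entry.file_name().to_string_lossy().into_owned();
            files.insert(name, fs::read(entry.path())?);
        }
    }
    Ok(files)
}

/// Names of files that exist on only one side or whose contents differ.
pub fn migration_drift(canonical: &Path, vendored: &Path) -> io::Result<Vec<String>> {
    let a = read_flat_dir(canonical)?;
    let b = read_flat_dir(vendored)?;
    let mut drifted: Vec<String> = a
        .keys()
        .chain(b.keys())
        .filter(|name| a.get(*name) != b.get(*name))
        .cloned()
        .collect();
    drifted.sort();
    drifted.dedup();
    Ok(drifted)
}
```

A build script could treat a non-empty result as a hard failure so that a stale vendored copy never reaches a packaged crate.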
For the enqueue-request snapshot cutover, apply the bundled migrations first,
then run either startup API. If it returns
SchemaCompatibilityError::LegacyIdempotencySnapshotsMissing, inspect the
legacy rows with the idx_job_queue_missing_enqueue_request_snapshot and
idx_workflow_runs_missing_enqueue_request_snapshot partial indexes, remediate
or drain those keyed rows, and retry startup. Prefer natural drain or clearing
the stale idempotency_key where retry identity no longer matters; only backfill
enqueue_request when you have the original canonical enqueue request, not from
mutable live queue/workflow state. migrate_after_idempotency_cutover validates
the cutover constraints once no legacy rows remain; that first validation scans
job_queue and workflow_runs and may briefly delay startup on large tables
without blocking ordinary DML. The cutover migration also builds helper indexes
for locating legacy rows; on large tables, apply it during a maintenance window
appropriate for your write volume.
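To see why a legacy row cannot be compared against live state, consider a hypothetical model of keyed-enqueue retries. KeyedStore and KeyedEnqueue are illustrative names, and plain string equality stands in for comparing canonical enqueue requests; the README only states that keyed retries compare the stored original request and that rows without a snapshot produce dedicated conflict errors.

```rust
use std::collections::HashMap;

/// Outcomes of a keyed enqueue in this model.
#[derive(Debug, PartialEq)]
pub enum KeyedEnqueue {
    /// No row with this key yet: store the request snapshot and create the row.
    Created,
    /// Same key and an identical original request: return the existing row.
    Existing,
    /// Same key but a different request: reject rather than reuse the row.
    ConflictingRequest,
    /// Same key on a row without a snapshot: there is nothing safe to compare.
    LegacyMissingSnapshot,
}

/// Hypothetical store mapping idempotency key -> stored request snapshot.
/// `None` models a legacy row whose enqueue_request IS NULL.
#[derive(Default)]
pub struct KeyedStore {
    rows: HashMap<String, Option<String>>,
}

impl KeyedStore {
    pub fn insert_legacy(&mut self, key: &str) {
        self.rows.insert(key.to_string(), None);
    }

    /// `request` is assumed to be a canonical serialization of the enqueue request.
    pub fn enqueue(&mut self, key: &str, request: &str) -> KeyedEnqueue {
        if !self.rows.contains_key(key) {
            self.rows.insert(key.to_string(), Some(request.to_string()));
            return KeyedEnqueue::Created;
        }
        // Compare only against the immutable snapshot, never against runtime state.
        match &self.rows[key] {
            None => KeyedEnqueue::LegacyMissingSnapshot,
            Some(snapshot) if snapshot == request => KeyedEnqueue::Existing,
            Some(_) => KeyedEnqueue::ConflictingRequest,
        }
    }
}
```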
Runtime Configuration
runledger-runtime exposes JobsConfig::from_env() in runledger-runtime/src/config.rs.
Supported environment variables:
- JOBS_WORKER_ID
- JOBS_POLL_INTERVAL_MS
- JOBS_CLAIM_BATCH_SIZE
- JOBS_LEASE_TTL_SECONDS
- JOBS_MAX_GLOBAL_CONCURRENCY
- JOBS_REAPER_INTERVAL_SECONDS
- JOBS_SCHEDULE_POLL_INTERVAL_SECONDS
- JOBS_REAPER_RETRY_DELAY_MS
Default behavior:
- blank JOBS_WORKER_ID falls back to worker-<uuidv7>
- interval and concurrency values are clamped to safe minimums
- lease TTL is clamped to at least 10 seconds
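The defaults above can be sketched as a lookup-driven parser. This is a hypothetical sketch, not JobsConfig::from_env(): WorkerSettings, settings_from, the 1 ms poll minimum, and the 1000 ms / 30 s defaults are assumptions; only the variable names, the worker-<id> fallback, and the 10-second lease floor come from this README. Passing a lookup closure instead of reading std::env directly keeps the logic testable.

```rust
use std::time::Duration;

/// Hypothetical subset of worker settings; field names are illustrative.
#[derive(Debug)]
pub struct WorkerSettings {
    pub worker_id: String,
    pub poll_interval: Duration,
    pub lease_ttl: Duration,
}

// Assumed poll minimum; the 10-second lease floor is the documented one.
const MIN_POLL_INTERVAL_MS: u64 = 1;
const MIN_LEASE_TTL_SECONDS: u64 = 10;

/// Parse an unsigned value, falling back to `default` when unset or unparsable.
fn parse_u64(lookup: &dyn Fn(&str) -> Option<String>, key: &str, default: u64) -> u64 {
    lookup(key).and_then(|v| v.trim().parse().ok()).unwrap_or(default)
}

/// `lookup` abstracts std::env::var; `fallback_id` stands in for UUIDv7 generation.
pub fn settings_from(
    lookup: &dyn Fn(&str) -> Option<String>,
    fallback_id: &dyn Fn() -> String,
) -> WorkerSettings {
    // Blank (empty or whitespace-only) worker IDs fall back to a generated one.
    let worker_id = match lookup("JOBS_WORKER_ID") {
        Some(id) if !id.trim().is_empty() => id.trim().to_string(),
        _ => format!("worker-{}", fallback_id()),
    };
    // Clamp to minimums instead of rejecting too-small values.
    let poll_ms = parse_u64(lookup, "JOBS_POLL_INTERVAL_MS", 1000).max(MIN_POLL_INTERVAL_MS);
    let lease_s = parse_u64(lookup, "JOBS_LEASE_TTL_SECONDS", 30).max(MIN_LEASE_TTL_SECONDS);
    WorkerSettings {
        worker_id,
        poll_interval: Duration::from_millis(poll_ms),
        lease_ttl: Duration::from_secs(lease_s),
    }
}
```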
Building
Common commands:
The standalone workspace has been validated with:
SQLx Offline Mode
This repo uses sqlx::query! and related macros extensively.
To keep normal builds self-contained:
- .cargo/config.toml sets SQLX_OFFLINE=true
- the workspace-root .sqlx/ directory is the source cache generated by cargo sqlx prepare --workspace
- each publishable crate that uses SQLx checked macros also carries its own .sqlx/ directory so cargo publish can verify the packaged tarball in isolation
If you change SQL queries or the schema, refresh the cache before committing.
Typical workflow:
- bring up a PostgreSQL database with the current Runledger migrations applied
- point DATABASE_URL at that database
- run ./scripts/refresh-sqlx-cache.sh
What the script does:
- regenerates the workspace-root .sqlx/ cache
- syncs that cache into runledger-postgres/.sqlx/ and runledger-runtime/.sqlx/
- syncs the workspace-root migrations/ directory into runledger-postgres/migrations/
- runs cargo check --workspace
- confirms the publishable crate tarballs include their per-crate SQLx cache
Do not update only the workspace root .sqlx/ directory. cargo publish verifies each crate from its packaged tarball, so publishable crates must include their own SQLx cache.
If the cache and schema drift apart, cargo check will fail during macro expansion.
Publishing
Prepare a release with the repository script:
The preparation script:
- requires a clean working tree
- bumps publishable crate versions and root workspace dependency versions
- refreshes SQLx offline metadata
- runs workspace tests and the packaged external-consumer smoke test
- runs a publish dry-run for runledger-core and packages the dependent crates locally
Before publishing this release line, call out observable contract changes in release notes:
- published crates require Rust 1.88+
- runledger-runtime adds Supervisor
- low-level runtime loops now return RuntimeLoopExit
- runledger-postgres adds JobScheduleUpsert, upsert_job_schedule, set_job_schedule_active, and set_job_schedule_next_fire_at; conflict updates refresh the schedule definition while preserving is_active and organization_id, refresh next_fire_at when the cron expression changes, and validate cron syntax plus name/jitter bounds
- low-level runledger_postgres::jobs::mark_schedule_fired_tx now returns Result<bool> so runtime internals can distinguish a successful cursor advance from a missing schedule row
- JobScheduleRecord exposes is_active so setup code can observe preserved pause/resume state after schedule upserts
- schedules are UTC-only; schedule upserts store timezone = 'UTC', and accepted cron expressions use the same parser as runledger-runtime
- QueryError::Display now returns client-safe messages
- expired leases have no owner grace period for heartbeat/progress/success/failure writes
- the job.lease_owner_mismatch message now covers time-based loss of ownership
- success completion rejects non-Completed stages
- workflow-backed job completion waits on in-flight cancellation instead of returning workflow.release_conflict
- append/external release can still return workflow.release_conflict
- workflow append mutations require read-committed transaction isolation
- idempotent enqueue adds new conflict/isolation error codes
- workflow.append_conflicting_retry is now a conflict-category error
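The schedule-upsert conflict rules in this list can be modeled as a pure function. This is a hypothetical sketch, not runledger-postgres code: ScheduleRow, ScheduleUpsert, upsert, the u128 stand-in for a UUID, the assumption that new rows start active, and the next_fire callback (standing in for cron evaluation) are all illustrative, and cron/name/jitter validation is omitted.

```rust
/// Hypothetical stored schedule row.
#[derive(Debug, Clone, PartialEq)]
pub struct ScheduleRow {
    pub name: String,
    pub cron: String,
    pub payload: String,
    pub is_active: bool,
    pub organization_id: Option<u128>,
    pub next_fire_at: u64,
}

/// Hypothetical definition supplied by setup code.
pub struct ScheduleUpsert {
    pub name: String,
    pub cron: String,
    pub payload: String,
    pub organization_id: Option<u128>,
}

/// Insert a new row, or apply conflict-update rules to an existing one.
pub fn upsert(
    existing: Option<&ScheduleRow>,
    new: &ScheduleUpsert,
    next_fire: &dyn Fn(&str) -> u64,
) -> ScheduleRow {
    match existing {
        // Fresh insert: assumed to start active, with the caller's organization.
        None => ScheduleRow {
            name: new.name.clone(),
            cron: new.cron.clone(),
            payload: new.payload.clone(),
            is_active: true,
            organization_id: new.organization_id,
            next_fire_at: next_fire(&new.cron),
        },
        Some(row) => ScheduleRow {
            name: row.name.clone(),
            // The definition itself is refreshed from the upsert...
            cron: new.cron.clone(),
            payload: new.payload.clone(),
            // ...while operator pause state and ownership are preserved.
            is_active: row.is_active,
            organization_id: row.organization_id,
            // The fire cursor is recomputed only when the cron expression changes.
            next_fire_at: if row.cron != new.cron { next_fire(&new.cron) } else { row.next_fire_at },
        },
    }
}
```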
If publishing manually, run ./scripts/refresh-sqlx-cache.sh before publishing runledger-postgres or runledger-runtime and commit any resulting .sqlx/ changes.
After reviewing and committing the prepared diff, publish with:
The publish script publishes crates in dependency order, dry-runs each crate once its workspace dependencies are indexed, creates a v0.3.0 tag, and pushes the current branch and tag. Set PUBLISH_REMOTE to override the git remote used for the final push.
Testing
There are two main categories of tests:
- pure Rust unit tests, which do not require PostgreSQL
- DB-backed tests, which use runledger-test-support and testcontainers
The DB-backed tests:
- start a shared PostgreSQL container
- create isolated ephemeral databases per test
- apply the local Runledger migrations
The packaged external-consumer smoke test:
- packages runledger-core, runledger-postgres, and runledger-runtime
- extracts those .crate archives locally
- builds a standalone host crate against the packaged manifests via [patch.crates-io]
- runs migrations, starts the runtime supervisor, enqueues jobs, and asserts terminal states
Run it with:
The default test image is postgres:18.
Override it with:
The test harness expects the database image to support uuidv7().
PostgreSQL Assumptions
Runledger expects PostgreSQL semantics and features consistent with the migration set and SQLx queries in this repo.
In particular:
- uuidv7() must be available
- transactional DDL behavior must support the baseline migration as written
- the target DB must be migrated before runtime code uses it
Typical Integration Shape
A host application will generally:
- create a shared sqlx::PgPool
- either call runledger_postgres::migrate_after_idempotency_cutover(&pool), or apply the Runledger migrations with its own deployment tooling and then call runledger_postgres::ensure_schema_compatible_after_idempotency_cutover(&pool)
- register concrete handlers in a runledger_runtime::catalog::JobCatalog (or directly in runledger_runtime::registry::JobRegistry for advanced setups)
- start runledger_runtime::Supervisor in a worker process
- call runledger_postgres::jobs::* APIs from its own admin/API surfaces
At a high level:
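```rust
use std::time::Duration;

use runledger_runtime::Supervisor;
use runledger_runtime::catalog::JobCatalog;
use runledger_runtime::config::JobsConfig;

let pool = /* sqlx PgPool */;
runledger_postgres::migrate_after_idempotency_cutover(&pool).await?;

let catalog = JobCatalog::new(/* ... */)
    .job(/* ... */);
catalog.sync_definitions(/* ... */).await?;

let config = JobsConfig::from_env();
let supervisor = Supervisor::builder(/* ... */)?
    .with_catalog(/* ... */)
    .build()?;

supervisor
    .run_until_shutdown(/* ... */)
    .await?;
```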
Production worker binaries should still close their pool after supervisor shutdown; the worker example below keeps cleanup independent from shutdown errors.
See runledger-runtime/examples/worker_binary.rs for a compile-checked worker binary skeleton.
This workspace deliberately stops at the library boundary; it does not prescribe your process model or handler packaging.
Workflow DAG Recipe
When work has dependencies, model those dependencies directly in the workflow enqueue request. The workflow engine persists the run, validates the DAG, enqueues root steps, releases dependents when prerequisites finish, and keeps the run status coherent with cancellation and external gates.
use WorkflowDagBuilder;
let metadata = json!;
let crawl_payload = json!;
let classify_payload = json!;
let score_payload = json!;
let persist_payload = json!;
let run = new
.idempotency_key
.job?
.job?
.after_success?
.job?
.after_success?
.job?
.after_success?
.build?;
let workflow_run = enqueue_workflow_run.await?;
WorkflowDagBuilder accepts raw string identifiers for readable call sites. It
validates the workflow shape before enqueueing, but it does not prove at compile
time that a job type has a registered job definition or runtime handler. Use
WorkflowRunEnqueueBuilder and WorkflowStepEnqueueBuilder when you need
per-step priority, attempts, timeout, stage, external steps, hand-authored
dependency specs, or call sites that pass explicit StepKey and JobType
values.
Validation timing:
| Call | Fails immediately | Deferred until .build() / .try_build() |
|---|---|---|
| WorkflowDagBuilder::new(...) | never | blank workflow type |
| WorkflowDagBuilder::try_new(...) | blank workflow type | empty step list and dependency graph errors |
| .job(step, job_type, payload) | blank step key, blank job type, duplicate step key | job type registration is not checked by this builder |
| .after_success(step, prerequisites) / .after_terminal(...) | blank target step key, blank prerequisite step key, unknown target step | missing prerequisite step, self-dependency, duplicate dependency, cycle |
| .idempotency_key(...) | never | blank idempotency key |
The target of .after_success(...) or .after_terminal(...) must already have
been added with .job(...). Prerequisite steps may be added later in the chain,
as long as every referenced step exists before .build() succeeds.
See runledger-postgres/examples/workflow_dag.rs for a compile-checked example that shows a fan-out/fan-in DAG.
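The deferred graph checks in the validation-timing table (missing prerequisite, self-dependency, duplicate dependency, cycle) are standard graph validation. The sketch below is a hypothetical, self-contained version built on Kahn's topological sort; validate_dag and DagError are illustrative names, and the sketch does not claim to match WorkflowDagBuilder's check order or error types.

```rust
use std::collections::{BTreeMap, BTreeSet, VecDeque};

/// Hypothetical error type covering the listed shape checks.
#[derive(Debug, PartialEq)]
pub enum DagError {
    EmptySteps,
    DuplicateStep,
    UnknownStep(String),
    MissingPrerequisite { step: String, prerequisite: String },
    SelfDependency(String),
    DuplicateDependency { step: String, prerequisite: String },
    Cycle,
}

/// `edges` are (step, prerequisite) pairs. On success, returns one valid
/// execution order (a topological sort of the steps).
pub fn validate_dag(steps: &[&str], edges: &[(&str, &str)]) -> Result<Vec<String>, DagError> {
    if steps.is_empty() {
        return Err(DagError::EmptySteps);
    }
    let known: BTreeSet<&str> = steps.iter().copied().collect();
    if known.len() != steps.len() {
        return Err(DagError::DuplicateStep);
    }
    let mut seen = BTreeSet::new();
    let mut indegree: BTreeMap<&str, usize> = steps.iter().map(|s| (*s, 0)).collect();
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for &(step, pre) in edges {
        if !known.contains(step) {
            return Err(DagError::UnknownStep(step.to_string()));
        }
        if !known.contains(pre) {
            return Err(DagError::MissingPrerequisite { step: step.to_string(), prerequisite: pre.to_string() });
        }
        if step == pre {
            return Err(DagError::SelfDependency(step.to_string()));
        }
        if !seen.insert((step, pre)) {
            return Err(DagError::DuplicateDependency { step: step.to_string(), prerequisite: pre.to_string() });
        }
        *indegree.get_mut(step).expect("checked above") += 1;
        dependents.entry(pre).or_default().push(step);
    }
    // Kahn's algorithm: repeatedly emit steps whose prerequisites were all emitted.
    let mut ready: VecDeque<&str> =
        indegree.iter().filter(|(_, n)| **n == 0).map(|(s, _)| *s).collect();
    let mut order = Vec::with_capacity(steps.len());
    while let Some(step) = ready.pop_front() {
        order.push(step.to_string());
        for &next in dependents.get(step).map(Vec::as_slice).unwrap_or(&[]) {
            let n = indegree.get_mut(next).expect("known step");
            *n -= 1;
            if *n == 0 {
                ready.push_back(next);
            }
        }
    }
    // Any step never emitted lies on a cycle or depends on one.
    if order.len() == steps.len() { Ok(order) } else { Err(DagError::Cycle) }
}
```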
Worker Binary
Downstream services commonly run a web/API process and a separate worker process against the same PostgreSQL database. The web process enqueues jobs and workflows. The worker process registers handlers and runs the supervisor:
use Duration;
use ;
use async_trait;
use Supervisor;
use JobCatalog;
use JobsConfig;
use JobHandler;
use Value;
use PgPoolOptions;
;
async
Treat a run_until_shutdown(), shutdown(), or shutdown_with_timeout() error
as fatal for the worker process: it means a supervised loop panicked, exited
cleanly before shutdown was requested, or did not observe shutdown within the
process deadline. run_until_shutdown() is the preferred method for worker
binaries because it observes internal task failures while still applying a
shutdown deadline; when it times out, remaining supervised tasks are aborted
and in-flight handler futures are dropped. Size the shutdown timeout to cover
handler drain time, worker pool concurrency, and database capacity. A useful
starting point is your per-handler high-percentile latency under
JobsConfig::max_global_concurrency. The worker example stores the shutdown
result before closing the pool so cleanup still runs when shutdown reports an error.
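The pattern of storing the shutdown result before cleanup can be sketched synchronously. FakeSupervisor and FakePool are hypothetical stand-ins for Supervisor and sqlx::PgPool (the real calls are async); the point is only the control flow: applying ? directly to the shutdown call would return early and skip the pool close, whereas storing the result first makes cleanup unconditional.

```rust
/// Hypothetical stand-in for the supervisor; `fail` simulates a fatal loop exit.
pub struct FakeSupervisor {
    pub fail: bool,
}

/// Hypothetical stand-in for the database pool.
pub struct FakePool {
    pub closed: bool,
}

impl FakeSupervisor {
    fn run_until_shutdown(self) -> Result<(), String> {
        if self.fail {
            Err("a supervised loop exited before shutdown was requested".into())
        } else {
            Ok(())
        }
    }
}

impl FakePool {
    fn close(&mut self) {
        self.closed = true;
    }
}

/// Store the shutdown result, always run cleanup, then propagate the result.
pub fn run_worker(supervisor: FakeSupervisor, pool: &mut FakePool) -> Result<(), String> {
    let shutdown_result = supervisor.run_until_shutdown();
    // Cleanup runs whether or not shutdown reported an error.
    pool.close();
    shutdown_result
}
```

Returning the stored error from main afterwards still makes the worker process exit with a non-zero status.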
For schedules and workflows, build persistence inputs from the same catalog so
job types stay aligned with synced job_definitions and registered handlers:
use ;
let payload = json!;
let schedule = catalog.job_schedule?;
upsert_job_schedule.await?;
Catalog sync owns the definition fields it writes: version, retry limits,
timeout, and priority are restored to catalog defaults on each startup sync.
Enabled catalog sync preserves an existing disabled row so operator pauses
survive worker restarts; a catalog with enabled(false) explicitly disables its
registered definitions. sync_definitions is additive: removed catalog entries
are not deleted or disabled. Use sync_definitions_exact with a
JobCatalogSyncScope when deployment startup should also disable enabled
job_definitions rows that are absent from the catalog but inside an explicit
owned job-type set. Exact sync returns the disabled job types and refuses to
disable definitions while active schedules still reference them. Unlike additive
sync, exact sync restores catalog entries' enabled state from catalog defaults.
Catalog helper builders validate catalog membership and catalog defaults only;
operator-disabled database rows are enforced later by persistence APIs such as
job enqueue, schedule materialization, and workflow enqueue.
Lower-level JobEnqueue, JobScheduleUpsert, WorkflowDagBuilder, and
WorkflowStepEnqueueBuilder APIs remain available when you do not use a catalog.
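The difference between additive and exact catalog sync can be modeled with an in-memory map. This is a hypothetical sketch of the rules stated above, not JobCatalog's implementation: Definition stands in for a job_definitions row (version and max_attempts represent all catalog-owned fields), and treating a schedule conflict as an error that aborts the whole exact sync is an assumption.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-in for a job_definitions row.
#[derive(Debug, Clone, PartialEq)]
pub struct Definition {
    pub version: u32,
    pub max_attempts: u32,
    pub enabled: bool,
}

/// Additive sync: upsert catalog entries; rows missing from the catalog are untouched.
pub fn sync_additive(db: &mut BTreeMap<String, Definition>, catalog: &[(&str, Definition)]) {
    for (job_type, wanted) in catalog {
        let enabled = match db.get(*job_type) {
            // Catalog enables the type: keep the row's current flag so operator pauses survive.
            Some(existing) if wanted.enabled => existing.enabled,
            // Catalog disables the type, or the row is new: use the catalog value.
            _ => wanted.enabled,
        };
        db.insert(job_type.to_string(), Definition { enabled, ..wanted.clone() });
    }
}

/// Exact sync within an owned scope; returns the job types it disabled.
pub fn sync_exact(
    db: &mut BTreeMap<String, Definition>,
    catalog: &[(&str, Definition)],
    owned_scope: &BTreeSet<&str>,
    scheduled_types: &BTreeSet<&str>,
) -> Result<Vec<String>, String> {
    let in_catalog: BTreeSet<&str> = catalog.iter().map(|(t, _)| *t).collect();
    // Enabled rows inside the owned scope that the catalog no longer lists.
    let to_disable: Vec<String> = db
        .iter()
        .filter(|(t, d)| d.enabled && owned_scope.contains(t.as_str()) && !in_catalog.contains(t.as_str()))
        .map(|(t, _)| t.clone())
        .collect();
    // Refuse (before mutating anything) if an active schedule still needs one of them.
    if let Some(blocked) = to_disable.iter().find(|t| scheduled_types.contains(t.as_str())) {
        return Err(format!("active schedule still references {blocked}"));
    }
    for (job_type, wanted) in catalog {
        // Unlike additive sync, the catalog's enabled flag is applied as-is.
        db.insert(job_type.to_string(), wanted.clone());
    }
    for job_type in &to_disable {
        if let Some(row) = db.get_mut(job_type) {
            row.enabled = false;
        }
    }
    Ok(to_disable)
}
```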
Additional compile-checked integration examples:
Repository Layout
.
├── Cargo.toml
├── README.md
├── migrations/
├── runledger-core/
├── runledger-postgres/
├── runledger-runtime/
└── runledger-test-support/
Development Notes
- Prefer keeping contracts in runledger-core, runtime orchestration in runledger-runtime, and SQL/state-machine logic in runledger-postgres.
- Treat the migration set as the canonical persisted contract for queue and workflow behavior.
- When schema semantics change, update Rust types, SQL, tests, and .sqlx metadata together.
- The repo may compile offline, but DB-backed behavior still needs migration-compatible PostgreSQL for execution.
License
No license file is included in this extraction. Add one at the repository root if this workspace is intended for redistribution or open-source use.