Expand description
PostgreSQL persistence layer for Runledger durable execution.
This crate owns the SQLx-backed storage and query helpers used by the
runtime and application crates. The main entrypoint is the jobs module,
which exposes APIs for:
- queueing, claiming, heartbeating, and completing jobs
- listing admin/job log data and runtime configuration
- enqueueing and querying workflow runs and steps
- applying or validating the bundled Runledger schema migrations
Typical consumers share a DbPool with runledger-runtime, then call the
exported jobs functions from application setup, admin APIs, or tests.
§Security Boundary
This crate is a persistence layer, not an authentication or authorization
layer. Public job and workflow APIs that accept organization_id,
idempotency keys, workflow IDs, job IDs, or metadata expect those values to
come from a trusted service boundary. HTTP or RPC handlers should derive
organization scope from authenticated claims or server-side policy, not from
untrusted request parameters alone.
QueryError::client_message and QueryError::code are the stable values
intended for public error responses. Detailed internal context remains
available through QueryError::internal_message for server-side diagnostics.
Public formatting keeps raw SQLx details sanitized. The standard error source
chain and QueryError::source_arc are available for trusted server-side
diagnostics.
Runtime lifecycle, workflow mutation, and idempotent enqueue APIs are designed
for PostgreSQL’s default READ COMMITTED transaction isolation so they can
observe rows committed after lock waits or uniqueness conflicts.
Release-sensitive, workflow-append, and keyed-enqueue paths validate this
before running because their correctness depends on second reads after waits
or conflicts.
For simple embedding, call migrate_after_idempotency_cutover during
startup:
let pool = sqlx::PgPool::connect("postgres://localhost/runledger").await?;
runledger_postgres::migrate_after_idempotency_cutover(&pool).await?;For deployments that manage DDL elsewhere, call
ensure_schema_compatible_after_idempotency_cutover instead to fail fast
if the schema is missing, drifted, or still has keyed legacy rows without
idempotency request snapshots. That check is read-only, but it expects the
database to retain SQLx migration history in _sqlx_migrations and, when
available, Runledger-owned migration state in runledger_migration_history.
§Copy-Paste Examples
- Enqueue one job
- Enqueue a workflow DAG
- Use an external workflow gate
- Create a scheduled job entrypoint
Import runledger_runtime::prelude::* and use
the worker binary example
when adding a worker process for the jobs and workflows enqueued through this
crate.
§Prelude
use runledger_core::prelude::*;
use runledger_postgres::prelude::*;The PostgreSQL prelude exports persistence functions and record/input types.
It intentionally does not re-export core contract types, so import
runledger_core::prelude::* beside it when building job or workflow inputs.
§Enqueue One Job
Use direct job enqueue for one independent retried unit of work.
use runledger_core::prelude::*;
use runledger_postgres::prelude::*;
let payload = serde_json::json!({"email_id": "email_123"});
let job = JobEnqueue {
job_type: JobType::new("jobs.email.send"),
organization_id: None,
payload: &payload,
priority: None,
max_attempts: None,
timeout_seconds: None,
next_run_at: None,
idempotency_key: Some("email:email_123:send"),
stage: None,
};
let _job_id = enqueue_job(&pool, &job).await?;§Create A Scheduled Job Entrypoint
Use jobs::JobScheduleUpsert to create or update the cron row consumed by
the runtime scheduler. Schedules are UTC-only. Updating an existing schedule
refreshes its definition while preserving is_active and organization_id;
next_fire_at is refreshed when cron_expr changes. Cron expressions are
validated with the same parser used by runledger-runtime. Use
jobs::set_job_schedule_active to pause or resume a schedule, and
jobs::set_job_schedule_next_fire_at to manually retime its cursor.
use chrono::Utc;
use runledger_core::prelude::*;
use runledger_postgres::prelude::*;
let payload_template = serde_json::json!({"source": "api"});
let schedule = JobScheduleUpsert {
name: "profile-refresh-hourly",
job_type: JobType::new("profiles.refresh"),
organization_id: None,
payload_template: &payload_template,
cron_expr: "0 0 * * * *",
is_active: true,
next_fire_at: Utc::now(),
max_jitter_seconds: 0,
};
let _schedule = upsert_job_schedule(&pool, &schedule).await?;§Enqueue A Workflow DAG
Use workflows when the work has step dependencies, fan-out/fan-in, external gates, cancellation as one logical run, or workflow-level idempotency.
use runledger_core::prelude::*;
use runledger_postgres::prelude::*;
let crawl_payload = serde_json::json!({"profile_id": "p_123"});
let classify_payload = serde_json::json!({"profile_id": "p_123"});
let metadata = serde_json::json!({"source": "api"});
let run = WorkflowDagBuilder::new("profiles.research", &metadata)
.idempotency_key("profile:p_123:research")
.job("crawl", "profiles.crawl", &crawl_payload)?
.job("classify", "profiles.classify", &classify_payload)?
.after_success("classify", ["crawl"])?
.build()?;
let _workflow_run = enqueue_workflow_run(&pool, &run).await?;§Use An External Workflow Gate
Create the gate with WorkflowStepEnqueueBuilder::new_external, then
complete it from a trusted service boundary when the external condition is
known.
use runledger_core::prelude::*;
use runledger_postgres::prelude::*;
let input = CompleteExternalWorkflowStepInput {
workflow_run_id,
organization_id: None,
step_key: StepKey::new("approval"),
terminal_status: WorkflowStepStatus::Succeeded,
status_reason: Some("approved"),
last_error_code: None,
last_error_message: None,
};
let _step = complete_external_workflow_step(&pool, &input).await?;§Inspect Workflow State
use runledger_postgres::prelude::*;
let _run = get_workflow_run_by_id(&pool, None, workflow_run_id).await?;
let _steps = list_workflow_steps(&pool, None, workflow_run_id).await?;
let _dependencies = list_workflow_step_dependencies(&pool, None, workflow_run_id).await?;§Handle Errors Safely
QueryError::client_message and QueryError::code are safe for public
responses. QueryError::internal_message is for trusted diagnostics.
match runledger_postgres::jobs::enqueue_job(&pool, &job).await {
Ok(_job_id) => {}
Err(runledger_postgres::Error::QueryError(query_error)) => {
let _public_code = query_error.code();
let _public_message = query_error.client_message();
let _private_diagnostic = query_error.internal_message();
}
Err(error) => return Err(error),
}Modules§
- jobs
- Durable job, schedule, workflow, and admin persistence APIs.
- prelude
- Common
runledger-postgresimports for integration crates.
Structs§
Enums§
Statics§
Functions§
- classify_
framework_ constraint - classify_
query_ error - classify_
query_ error_ with_ constraint_ classifier - ensure_
schema_ compatible Deprecated - Validate that the target database’s SQLx migration history matches the bundled Runledger migrations.
- ensure_
schema_ compatible_ after_ idempotency_ cutover - Validate that the target database’s SQLx migration history matches the bundled Runledger migrations.
- has_
framework_ constraint_ classifier - migrate
Deprecated - Apply the bundled Runledger schema migrations to a PostgreSQL pool.
- migrate_
after_ idempotency_ cutover - Apply the bundled Runledger schema migrations to a PostgreSQL pool, then enforce the idempotency snapshot cutover.