Crate runledger_postgres

PostgreSQL persistence layer for Runledger durable execution.

This crate owns the SQLx-backed storage and query helpers used by the runtime and application crates. The main entrypoint is the jobs module, which exposes APIs for:

  • queueing, claiming, heartbeating, and completing jobs
  • listing admin/job log data and runtime configuration
  • enqueueing and querying workflow runs and steps
  • applying or validating the bundled Runledger schema migrations

Typical consumers share a DbPool with runledger-runtime, then call the exported jobs functions from application setup, admin APIs, or tests.

§Security Boundary

This crate is a persistence layer, not an authentication or authorization layer. Public job and workflow APIs that accept organization_id, idempotency keys, workflow IDs, job IDs, or metadata expect those values to come from a trusted service boundary. HTTP or RPC handlers should derive organization scope from authenticated claims or server-side policy, not from untrusted request parameters alone.
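One way a handler might derive organization scope before calling into this crate is sketched below. This is a minimal illustration, not part of Runledger: `AuthClaims`, `ScopeError`, and `resolve_organization_scope` are hypothetical names, and the platform-admin rule is an assumed policy that your service may not share.

```rust
/// Hypothetical claims taken from an already verified token (not a Runledger type).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthClaims {
    pub subject: String,
    pub organization_id: Option<String>,
    pub is_platform_admin: bool,
}

#[derive(Debug, PartialEq, Eq)]
pub enum ScopeError {
    /// The request named an organization the claims do not cover.
    OrganizationMismatch,
    /// The caller has no organization claim and is not a platform admin.
    MissingOrganization,
}

/// Decide which organization scope to hand to the persistence layer.
/// The request parameter is only ever checked against the claims; it never
/// widens what a non-admin caller can reach.
pub fn resolve_organization_scope(
    claims: &AuthClaims,
    requested: Option<&str>,
) -> Result<Option<String>, ScopeError> {
    // Assumed policy: platform admins may target any organization or none.
    if claims.is_platform_admin {
        return Ok(requested.map(str::to_owned));
    }
    let claimed = claims
        .organization_id
        .as_deref()
        .ok_or(ScopeError::MissingOrganization)?;
    match requested {
        Some(req) if req != claimed => Err(ScopeError::OrganizationMismatch),
        _ => Ok(Some(claimed.to_owned())),
    }
}
```

The resolved value is what a handler would then pass wherever an API in this crate accepts organization_id.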

QueryError::client_message and QueryError::code are the stable values intended for public error responses. Detailed internal context remains available through QueryError::internal_message for server-side diagnostics. Public formatting keeps raw SQLx details sanitized. The standard error source chain and QueryError::source_arc are available for trusted server-side diagnostics.

Runtime lifecycle, workflow mutation, and idempotent enqueue APIs are designed for PostgreSQL’s default READ COMMITTED transaction isolation so they can observe rows committed after lock waits or uniqueness conflicts. Release-sensitive, workflow-append, and keyed-enqueue paths validate this before running because their correctness depends on second reads after waits or conflicts.
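To make the isolation requirement concrete, the sketch below classifies the text PostgreSQL reports for `SHOW transaction_isolation` (for example `read committed`). It is an illustration only and is not how this crate performs its own validation; `IsolationLevel`, `parse_isolation_level`, and `require_read_committed` are hypothetical names, and accepting only READ COMMITTED is an assumption made for the example.

```rust
/// The four isolation levels PostgreSQL accepts by name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
}

/// Parse the value reported by `SHOW transaction_isolation`.
/// Surrounding whitespace and letter case are ignored.
pub fn parse_isolation_level(raw: &str) -> Option<IsolationLevel> {
    match raw.trim().to_ascii_lowercase().as_str() {
        "read uncommitted" => Some(IsolationLevel::ReadUncommitted),
        "read committed" => Some(IsolationLevel::ReadCommitted),
        "repeatable read" => Some(IsolationLevel::RepeatableRead),
        "serializable" => Some(IsolationLevel::Serializable),
        _ => None,
    }
}

/// Reject any session that is not running at READ COMMITTED.
/// Snapshot-based levels would hide rows committed while the caller waited
/// on a lock, which is exactly what a second read needs to see.
pub fn require_read_committed(raw: &str) -> Result<(), String> {
    match parse_isolation_level(raw) {
        Some(IsolationLevel::ReadCommitted) => Ok(()),
        Some(other) => Err(format!("expected READ COMMITTED isolation, found {other:?}")),
        None => Err(format!("unrecognized isolation level: {raw:?}")),
    }
}
```

A service that overrides default_transaction_isolation could run a check like this at startup to surface the mismatch before the first enqueue.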

For simple embedding, call migrate_after_idempotency_cutover during startup:

let pool = sqlx::PgPool::connect("postgres://localhost/runledger").await?;
runledger_postgres::migrate_after_idempotency_cutover(&pool).await?;

For deployments that manage DDL elsewhere, call ensure_schema_compatible_after_idempotency_cutover instead to fail fast if the schema is missing, drifted, or still has keyed legacy rows without idempotency request snapshots. That check is read-only, but it expects the database to retain SQLx migration history in _sqlx_migrations and, when available, Runledger-owned migration state in runledger_migration_history.

§Copy-Paste Examples

Import runledger_runtime::prelude::* and use the worker binary example when adding a worker process for the jobs and workflows enqueued through this crate.

§Prelude

use runledger_core::prelude::*;
use runledger_postgres::prelude::*;

The PostgreSQL prelude exports persistence functions and record/input types. It intentionally does not re-export core contract types, so import runledger_core::prelude::* beside it when building job or workflow inputs.

§Enqueue One Job

Use direct job enqueue for a single unit of work that is retried independently of any other job.

use runledger_core::prelude::*;
use runledger_postgres::prelude::*;

let payload = serde_json::json!({"email_id": "email_123"});
let job = JobEnqueue {
    job_type: JobType::new("jobs.email.send"),
    organization_id: None,
    payload: &payload,
    priority: None,
    max_attempts: None,
    timeout_seconds: None,
    next_run_at: None,
    idempotency_key: Some("email:email_123:send"),
    stage: None,
};

let _job_id = enqueue_job(&pool, &job).await?;
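The idempotency key in the example above follows a `domain:entity:action` shape. A small helper can keep such keys consistent across call sites. The sketch below is a hypothetical convention, not something this crate requires; `idempotency_key` and its rule against `:` inside a part are assumptions.

```rust
/// Build a deterministic key such as "email:email_123:send".
/// Empty parts and parts containing ':' are rejected so that two different
/// (domain, entity, action) triples can never collapse into the same key.
pub fn idempotency_key(domain: &str, entity_id: &str, action: &str) -> Result<String, String> {
    for part in [domain, entity_id, action] {
        if part.is_empty() {
            return Err("idempotency key parts must not be empty".to_string());
        }
        if part.contains(':') {
            return Err(format!("idempotency key part {part:?} must not contain ':'"));
        }
    }
    Ok(format!("{domain}:{entity_id}:{action}"))
}
```

Because the key is derived only from stable identifiers, a retried request produces the same key and can be deduplicated by a keyed enqueue.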

§Create A Scheduled Job Entrypoint

Use jobs::JobScheduleUpsert to create or update the cron row consumed by the runtime scheduler. Schedules are UTC-only. Updating an existing schedule refreshes its definition while preserving is_active and organization_id; next_fire_at is refreshed when cron_expr changes. Cron expressions are validated with the same parser used by runledger-runtime. Use jobs::set_job_schedule_active to pause or resume a schedule, and jobs::set_job_schedule_next_fire_at to manually retime its cursor.

use chrono::Utc;
use runledger_core::prelude::*;
use runledger_postgres::prelude::*;

let payload_template = serde_json::json!({"source": "api"});
let schedule = JobScheduleUpsert {
    name: "profile-refresh-hourly",
    job_type: JobType::new("profiles.refresh"),
    organization_id: None,
    payload_template: &payload_template,
    cron_expr: "0 0 * * * *",
    is_active: true,
    next_fire_at: Utc::now(),
    max_jitter_seconds: 0,
};

let _schedule = upsert_job_schedule(&pool, &schedule).await?;
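The update rules described above (definition refreshed, is_active and organization_id preserved, next_fire_at refreshed only when cron_expr changes) can be modeled in memory as follows. This is a sketch of the documented behavior, not the crate's SQL. `ScheduleRow` and `merge_upsert` are hypothetical, the timestamp is simplified to Unix seconds, and taking the refreshed next_fire_at from the incoming value is an assumption.

```rust
/// Simplified in-memory stand-in for a stored schedule row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduleRow {
    pub name: String,
    pub organization_id: Option<String>,
    pub cron_expr: String,
    pub is_active: bool,
    /// Unix seconds, UTC.
    pub next_fire_at: i64,
    pub max_jitter_seconds: u32,
}

/// Combine an incoming upsert with the row already stored, if any.
pub fn merge_upsert(existing: Option<&ScheduleRow>, incoming: &ScheduleRow) -> ScheduleRow {
    // No stored row: the incoming values are inserted unchanged.
    let Some(current) = existing else {
        return incoming.clone();
    };
    let cron_changed = current.cron_expr != incoming.cron_expr;
    ScheduleRow {
        // Preserved across updates.
        organization_id: current.organization_id.clone(),
        is_active: current.is_active,
        // Assumption: the cursor moves to the incoming value only on a cron change.
        next_fire_at: if cron_changed {
            incoming.next_fire_at
        } else {
            current.next_fire_at
        },
        // Every other field is refreshed from the incoming definition.
        ..incoming.clone()
    }
}
```

Under this model, re-running an upsert with is_active: true does not resume a paused schedule, which is why pausing and resuming go through jobs::set_job_schedule_active.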

§Enqueue A Workflow DAG

Use workflows when the work has step dependencies, fan-out/fan-in, external gates, cancellation as one logical run, or workflow-level idempotency.

use runledger_core::prelude::*;
use runledger_postgres::prelude::*;

let crawl_payload = serde_json::json!({"profile_id": "p_123"});
let classify_payload = serde_json::json!({"profile_id": "p_123"});
let metadata = serde_json::json!({"source": "api"});

let run = WorkflowDagBuilder::new("profiles.research", &metadata)
    .idempotency_key("profile:p_123:research")
    .job("crawl", "profiles.crawl", &crawl_payload)?
    .job("classify", "profiles.classify", &classify_payload)?
    .after_success("classify", ["crawl"])?
    .build()?;

let _workflow_run = enqueue_workflow_run(&pool, &run).await?;
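To illustrate what step dependencies imply, the sketch below orders steps so that every prerequisite comes before its dependents, using Kahn's algorithm on plain strings. It is independent of this crate and makes no claim about how the runtime schedules steps; `execution_order` is a hypothetical helper.

```rust
use std::collections::{BTreeMap, BTreeSet, VecDeque};

/// Order `steps` so each prerequisite precedes the steps that wait on it.
/// `deps` holds (step, prerequisite) pairs. Returns None when a pair names
/// an unknown step or when the dependencies contain a cycle.
pub fn execution_order(steps: &[&str], deps: &[(&str, &str)]) -> Option<Vec<String>> {
    let known: BTreeSet<&str> = steps.iter().copied().collect();
    // Number of unfinished prerequisites per step.
    let mut pending: BTreeMap<&str, usize> = known.iter().map(|s| (*s, 0)).collect();
    // Reverse edges: prerequisite -> steps waiting on it.
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for &(step, prerequisite) in deps {
        if !known.contains(step) || !known.contains(prerequisite) {
            return None;
        }
        *pending.get_mut(step)? += 1;
        dependents.entry(prerequisite).or_default().push(step);
    }
    // Start with every step that has no prerequisites.
    let mut ready: VecDeque<&str> = pending
        .iter()
        .filter(|(_, count)| **count == 0)
        .map(|(step, _)| *step)
        .collect();
    let mut order = Vec::with_capacity(known.len());
    while let Some(step) = ready.pop_front() {
        order.push(step.to_string());
        for dependent in dependents.get(step).into_iter().flatten() {
            let remaining = pending.get_mut(dependent)?;
            *remaining -= 1;
            if *remaining == 0 {
                ready.push_back(*dependent);
            }
        }
    }
    // Steps left unordered can only be part of a cycle.
    (order.len() == known.len()).then_some(order)
}
```

For the two-step run above, the pair ("classify", "crawl") yields crawl first and classify second. A fan-in step simply appears in several pairs and becomes ready once all of its prerequisites have been emitted.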

§Use An External Workflow Gate

Create the gate with WorkflowStepEnqueueBuilder::new_external, then complete it from a trusted service boundary when the external condition is known.

use runledger_core::prelude::*;
use runledger_postgres::prelude::*;

let input = CompleteExternalWorkflowStepInput {
    workflow_run_id,
    organization_id: None,
    step_key: StepKey::new("approval"),
    terminal_status: WorkflowStepStatus::Succeeded,
    status_reason: Some("approved"),
    last_error_code: None,
    last_error_message: None,
};

let _step = complete_external_workflow_step(&pool, &input).await?;

§Inspect Workflow State

use runledger_postgres::prelude::*;

let _run = get_workflow_run_by_id(&pool, None, workflow_run_id).await?;
let _steps = list_workflow_steps(&pool, None, workflow_run_id).await?;
let _dependencies = list_workflow_step_dependencies(&pool, None, workflow_run_id).await?;

§Handle Errors Safely

QueryError::client_message and QueryError::code are safe for public responses. QueryError::internal_message is for trusted diagnostics.

match runledger_postgres::jobs::enqueue_job(&pool, &job).await {
    Ok(_job_id) => {}
    Err(runledger_postgres::Error::QueryError(query_error)) => {
        let _public_code = query_error.code();
        let _public_message = query_error.client_message();
        let _private_diagnostic = query_error.internal_message();
    }
    Err(error) => return Err(error),
}
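The split between public and private error fields might be applied in a handler roughly as follows. The sketch uses stand-in types so it compiles on its own: `QueryErrorView` only mirrors the three accessors named above and is not the real QueryError, while `PublicErrorBody` and `to_public_response` are hypothetical.

```rust
/// Stand-in carrying the values of code(), client_message(), and
/// internal_message(); NOT the real runledger_postgres::QueryError.
pub struct QueryErrorView<'a> {
    pub code: &'a str,
    pub client_message: &'a str,
    pub internal_message: &'a str,
}

/// Hypothetical body returned to API callers.
#[derive(Debug, PartialEq, Eq)]
pub struct PublicErrorBody {
    pub code: String,
    pub message: String,
}

/// Build the public body from the stable fields and route the internal
/// detail to a server-side sink (a Vec here, a logger in a real service).
pub fn to_public_response(
    error: &QueryErrorView<'_>,
    server_log: &mut Vec<String>,
) -> PublicErrorBody {
    server_log.push(format!(
        "query failed: code={} detail={}",
        error.code, error.internal_message
    ));
    PublicErrorBody {
        code: error.code.to_owned(),
        message: error.client_message.to_owned(),
    }
}
```

Keeping the internal message out of the response type entirely makes it harder to leak it by accident when the body is later serialized.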

Modules§

jobs
Durable job, schedule, workflow, and admin persistence APIs.
prelude
Common runledger-postgres imports for integration crates.

Structs§

FrameworkConstraintSpec
QueryError

Enums§

Error
QueryErrorCategory
SchemaCompatibilityError

Statics§

MIGRATOR

Functions§

classify_framework_constraint
classify_query_error
classify_query_error_with_constraint_classifier
ensure_schema_compatible (Deprecated)
Validate that the target database’s SQLx migration history matches the bundled Runledger migrations.
ensure_schema_compatible_after_idempotency_cutover
Validate that the target database’s SQLx migration history matches the bundled Runledger migrations.
has_framework_constraint_classifier
migrate (Deprecated)
Apply the bundled Runledger schema migrations to a PostgreSQL pool.
migrate_after_idempotency_cutover
Apply the bundled Runledger schema migrations to a PostgreSQL pool, then enforce the idempotency snapshot cutover.

Type Aliases§

DbPool
DbTx
Result