Skip to main content

runledger_postgres/
lib.rs

1//! PostgreSQL persistence layer for Runledger durable execution.
2//!
3//! This crate owns the SQLx-backed storage and query helpers used by the
4//! runtime and application crates. The main entrypoint is the [`jobs`] module,
5//! which exposes APIs for:
6//! - queueing, claiming, heartbeating, and completing jobs
7//! - listing admin/job log data and runtime configuration
8//! - enqueueing and querying workflow runs and steps
9//! - applying or validating the bundled Runledger schema migrations
10//!
11//! Typical consumers share a [`DbPool`] with `runledger-runtime`, then call the
12//! exported [`jobs`] functions from application setup, admin APIs, or tests.
13//!
14//! # Security Boundary
15//!
16//! This crate is a persistence layer, not an authentication or authorization
17//! layer. Public job and workflow APIs that accept `organization_id`,
18//! idempotency keys, workflow IDs, job IDs, or metadata expect those values to
19//! come from a trusted service boundary. HTTP or RPC handlers should derive
20//! organization scope from authenticated claims or server-side policy, not from
21//! untrusted request parameters alone.
22//!
23//! [`QueryError::client_message`] and [`QueryError::code`] are the stable values
24//! intended for public error responses. Detailed internal context remains
25//! available through [`QueryError::internal_message`] for server-side diagnostics.
26//! Public formatting keeps raw SQLx details sanitized. The standard error source
27//! chain and [`QueryError::source_arc`] are available for trusted server-side
28//! diagnostics.
//!
//! Runtime lifecycle, workflow mutation, and idempotent enqueue APIs are designed
30//! for PostgreSQL's default `READ COMMITTED` transaction isolation so they can
31//! observe rows committed after lock waits or uniqueness conflicts.
32//! Release-sensitive, workflow-append, and keyed-enqueue paths validate this
33//! before running because their correctness depends on second reads after waits
34//! or conflicts.
35//!
36//! For simple embedding, call [`migrate_after_idempotency_cutover`] during
37//! startup:
38//!
39//! ```rust,no_run
40//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
41//! let pool = sqlx::PgPool::connect("postgres://localhost/runledger").await?;
42//! runledger_postgres::migrate_after_idempotency_cutover(&pool).await?;
43//! # Ok(())
44//! # }
45//! ```
46//!
47//! For deployments that manage DDL elsewhere, call
48//! [`ensure_schema_compatible_after_idempotency_cutover`] instead to fail fast
49//! if the schema is missing, drifted, or still has keyed legacy rows without
50//! idempotency request snapshots. That check is read-only, but it expects the
51//! database to retain SQLx migration history in `_sqlx_migrations` and, when
52//! available, Runledger-owned migration state in `runledger_migration_history`.
53//!
54//! # Copy-Paste Examples
55//!
56//! - [Enqueue one job](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/enqueue_job.rs)
57//! - [Enqueue a workflow DAG](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/workflow_dag.rs)
58//! - [Use an external workflow gate](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/external_gate.rs)
59//! - [Create a scheduled job entrypoint](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/schedule_job.rs)
60//!
61//! Import `runledger_runtime::prelude::*` and use
62//! [the worker binary example](https://github.com/featherenvy/runledger/blob/master/runledger-runtime/examples/worker_binary.rs)
63//! when adding a worker process for the jobs and workflows enqueued through this
64//! crate.
65//!
66//! # Prelude
67//!
68//! ```rust
69//! use runledger_core::prelude::*;
70//! use runledger_postgres::prelude::*;
71//! ```
72//!
73//! The PostgreSQL prelude exports persistence functions and record/input types.
74//! It intentionally does not re-export core contract types, so import
75//! `runledger_core::prelude::*` beside it when building job or workflow inputs.
76//!
77//! # Enqueue One Job
78//!
79//! Use direct job enqueue for one independent retried unit of work.
80//!
81//! ```rust,no_run
82//! # async fn demo(pool: runledger_postgres::DbPool) -> Result<(), Box<dyn std::error::Error>> {
83//! use runledger_core::prelude::*;
84//! use runledger_postgres::prelude::*;
85//!
86//! let payload = serde_json::json!({"email_id": "email_123"});
87//! let job = JobEnqueue {
88//!     job_type: JobType::new("jobs.email.send"),
89//!     organization_id: None,
90//!     payload: &payload,
91//!     priority: None,
92//!     max_attempts: None,
93//!     timeout_seconds: None,
94//!     next_run_at: None,
95//!     idempotency_key: Some("email:email_123:send"),
96//!     stage: None,
97//! };
98//!
99//! let _job_id = enqueue_job(&pool, &job).await?;
100//! # Ok(())
101//! # }
102//! ```
103//!
104//! # Create A Scheduled Job Entrypoint
105//!
106//! Use [`jobs::JobScheduleUpsert`] to create or update the cron row consumed by
107//! the runtime scheduler. Schedules are UTC-only. Updating an existing schedule
108//! refreshes its definition while preserving `is_active` and `organization_id`;
109//! `next_fire_at` is refreshed when `cron_expr` changes. Cron expressions are
110//! validated with the same parser used by `runledger-runtime`. Use
111//! [`jobs::set_job_schedule_active`] to pause or resume a schedule, and
112//! [`jobs::set_job_schedule_next_fire_at`] to manually retime its cursor.
113//!
114//! ```rust,no_run
115//! # async fn demo(pool: runledger_postgres::DbPool) -> Result<(), Box<dyn std::error::Error>> {
116//! use chrono::Utc;
117//! use runledger_core::prelude::*;
118//! use runledger_postgres::prelude::*;
119//!
120//! let payload_template = serde_json::json!({"source": "api"});
121//! let schedule = JobScheduleUpsert {
122//!     name: "profile-refresh-hourly",
123//!     job_type: JobType::new("profiles.refresh"),
124//!     organization_id: None,
125//!     payload_template: &payload_template,
126//!     cron_expr: "0 0 * * * *",
127//!     is_active: true,
128//!     next_fire_at: Utc::now(),
129//!     max_jitter_seconds: 0,
130//! };
131//!
132//! let _schedule = upsert_job_schedule(&pool, &schedule).await?;
133//! # Ok(())
134//! # }
135//! ```
136//!
137//! # Enqueue A Workflow DAG
138//!
139//! Use workflows when the work has step dependencies, fan-out/fan-in, external
140//! gates, cancellation as one logical run, or workflow-level idempotency.
141//!
142//! ```rust,no_run
143//! # async fn demo(pool: runledger_postgres::DbPool) -> Result<(), Box<dyn std::error::Error>> {
144//! use runledger_core::prelude::*;
145//! use runledger_postgres::prelude::*;
146//!
147//! let crawl_payload = serde_json::json!({"profile_id": "p_123"});
148//! let classify_payload = serde_json::json!({"profile_id": "p_123"});
149//! let metadata = serde_json::json!({"source": "api"});
150//!
151//! let run = WorkflowDagBuilder::new("profiles.research", &metadata)
152//!     .idempotency_key("profile:p_123:research")
153//!     .job("crawl", "profiles.crawl", &crawl_payload)?
154//!     .job("classify", "profiles.classify", &classify_payload)?
155//!     .after_success("classify", ["crawl"])?
156//!     .build()?;
157//!
158//! let _workflow_run = enqueue_workflow_run(&pool, &run).await?;
159//! # Ok(())
160//! # }
161//! ```
162//!
163//! # Use An External Workflow Gate
164//!
165//! Create the gate with `WorkflowStepEnqueueBuilder::new_external`, then
166//! complete it from a trusted service boundary when the external condition is
167//! known.
168//!
169//! ```rust,no_run
170//! # async fn demo(
171//! #     pool: runledger_postgres::DbPool,
172//! #     workflow_run_id: sqlx::types::Uuid,
173//! # ) -> Result<(), Box<dyn std::error::Error>> {
174//! use runledger_core::prelude::*;
175//! use runledger_postgres::prelude::*;
176//!
177//! let input = CompleteExternalWorkflowStepInput {
178//!     workflow_run_id,
179//!     organization_id: None,
180//!     step_key: StepKey::new("approval"),
181//!     terminal_status: WorkflowStepStatus::Succeeded,
182//!     status_reason: Some("approved"),
183//!     last_error_code: None,
184//!     last_error_message: None,
185//! };
186//!
187//! let _step = complete_external_workflow_step(&pool, &input).await?;
188//! # Ok(())
189//! # }
190//! ```
191//!
192//! # Inspect Workflow State
193//!
194//! ```rust,no_run
195//! # async fn demo(
196//! #     pool: runledger_postgres::DbPool,
197//! #     workflow_run_id: sqlx::types::Uuid,
198//! # ) -> Result<(), Box<dyn std::error::Error>> {
199//! use runledger_postgres::prelude::*;
200//!
201//! let _run = get_workflow_run_by_id(&pool, None, workflow_run_id).await?;
202//! let _steps = list_workflow_steps(&pool, None, workflow_run_id).await?;
203//! let _dependencies = list_workflow_step_dependencies(&pool, None, workflow_run_id).await?;
204//! # Ok(())
205//! # }
206//! ```
207//!
208//! # Handle Errors Safely
209//!
210//! `QueryError::client_message` and `QueryError::code` are safe for public
211//! responses. `QueryError::internal_message` is for trusted diagnostics.
212//!
213//! ```rust,no_run
214//! # async fn demo(
215//! #     pool: runledger_postgres::DbPool,
216//! #     job: runledger_postgres::jobs::JobEnqueue<'_>,
217//! # ) -> Result<(), runledger_postgres::Error> {
218//! match runledger_postgres::jobs::enqueue_job(&pool, &job).await {
219//!     Ok(_job_id) => {}
220//!     Err(runledger_postgres::Error::QueryError(query_error)) => {
221//!         let _public_code = query_error.code();
222//!         let _public_message = query_error.client_message();
223//!         let _private_diagnostic = query_error.internal_message();
224//!     }
225//!     Err(error) => return Err(error),
226//! }
227//! # Ok(())
228//! # }
229//! ```
230
231use std::fmt;
232
233mod error;
234pub mod jobs;
235mod migrations;
236
237pub use error::{
238    FrameworkConstraintSpec, QueryError, QueryErrorCategory, classify_framework_constraint,
239    classify_query_error, classify_query_error_with_constraint_classifier,
240    has_framework_constraint_classifier,
241};
242pub use migrations::{
243    MIGRATOR, SchemaCompatibilityError, ensure_schema_compatible_after_idempotency_cutover,
244    migrate_after_idempotency_cutover,
245};
246#[allow(deprecated)]
247pub use migrations::{ensure_schema_compatible, migrate};
248
249/// Common `runledger-postgres` imports for integration crates.
250///
251/// This prelude contains persistence APIs, DB types, and database record/input
252/// structs. It avoids generic `Result` or `Error` aliases and does not re-export
253/// `runledger-core` contracts, so it can be glob-imported alongside
254/// `runledger_core::prelude::*` and `runledger_runtime::prelude::*`.
255pub mod prelude {
256    pub use crate::jobs::{
257        AppendWorkflowStepsInput, AppendWorkflowStepsOutcome, AppendWorkflowStepsResult,
258        CompleteExternalWorkflowStepInput, JobDefinitionListFilter, JobDefinitionRecord,
259        JobDefinitionUpdate, JobDefinitionUpsert, JobEnqueue, JobEventRecord, JobFailureUpdate,
260        JobListFilter, JobLogRecord, JobLogRecordInput, JobMetricsRecord, JobProgressUpdate,
261        JobQueueRecord, JobRuntimeConfigListFilter, JobRuntimeConfigRecord, JobRuntimeConfigUpsert,
262        JobScheduleRecord, JobScheduleUpsert, ReapExpiredLeasesResult, ReapedTerminalLeaseRecord,
263        WorkflowRunDbRecord, WorkflowRunListFilter, WorkflowStepDbRecord,
264        WorkflowStepDependencyDbRecord, append_workflow_steps, append_workflow_steps_tx,
265        cancel_job, cancel_workflow_run_tx, complete_external_workflow_step,
266        complete_external_workflow_step_tx, complete_job_failure, complete_job_success,
267        enqueue_job, enqueue_job_tx, enqueue_workflow_run, enqueue_workflow_run_tx, get_job_by_id,
268        get_job_definition_by_type, get_job_metrics, get_job_payload_by_idempotency_key,
269        get_job_runtime_config_by_type, get_latest_job_payload_for_run,
270        get_latest_workflow_run_by_type, get_required_job_runtime_config_by_type,
271        get_workflow_run_by_id, get_workflow_run_by_type_and_idempotency_key,
272        get_workflow_run_id_for_job, insert_job_definition_if_missing_tx, insert_job_log,
273        insert_job_runtime_config_if_missing, list_job_definitions, list_job_events, list_job_logs,
274        list_job_runtime_configs, list_jobs, list_workflow_runs, list_workflow_step_dependencies,
275        list_workflow_steps, requeue_job, set_job_schedule_active, set_job_schedule_active_tx,
276        set_job_schedule_next_fire_at, set_job_schedule_next_fire_at_tx, update_job_definition,
277        update_job_payload_uuid_array_field, update_workflow_step_and_pending_job_payload_tx,
278        upsert_job_definition_tx, upsert_job_runtime_config, upsert_job_runtime_config_tx,
279        upsert_job_schedule, upsert_job_schedule_tx,
280    };
281    pub use crate::{
282        DbPool, DbTx, FrameworkConstraintSpec, MIGRATOR, QueryError, QueryErrorCategory,
283        SchemaCompatibilityError, ensure_schema_compatible_after_idempotency_cutover,
284        migrate_after_idempotency_cutover,
285    };
286}
287
288pub type DbPool = sqlx::PgPool;
289pub type DbTx<'a> = sqlx::Transaction<'a, sqlx::Postgres>;
290pub type Result<T> = std::result::Result<T, Error>;
291
292#[derive(Debug)]
293pub enum Error {
294    ConfigError(String),
295    ConnectionError(String),
296    MigrationError(String),
297    QueryError(QueryError),
298}
299
300impl fmt::Display for Error {
301    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
302        match self {
303            Self::ConfigError(message) => write!(f, "{message}"),
304            Self::ConnectionError(message) => write!(f, "{message}"),
305            Self::MigrationError(message) => write!(f, "{message}"),
306            Self::QueryError(query_error) => write!(f, "{query_error}"),
307        }
308    }
309}
310
311impl std::error::Error for Error {
312    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
313        match self {
314            Self::QueryError(query_error) => Some(query_error),
315            Self::ConfigError(_) | Self::ConnectionError(_) | Self::MigrationError(_) => None,
316        }
317    }
318}
319
320impl Error {
321    #[must_use]
322    pub fn from_query_sqlx(error: sqlx::Error) -> Self {
323        Self::QueryError(QueryError::from_sqlx(error, None))
324    }
325
326    #[must_use]
327    pub fn from_query_sqlx_with_context(context: &str, error: sqlx::Error) -> Self {
328        Self::QueryError(QueryError::from_sqlx(error, Some(context)))
329    }
330}