runledger_postgres/lib.rs
1//! PostgreSQL persistence layer for Runledger durable execution.
2//!
3//! This crate owns the SQLx-backed storage and query helpers used by the
4//! runtime and application crates. The main entrypoint is the [`jobs`] module,
5//! which exposes APIs for:
6//! - queueing, claiming, heartbeating, and completing jobs
7//! - listing admin/job log data and runtime configuration
8//! - enqueueing and querying workflow runs and steps
9//! - applying or validating the bundled Runledger schema migrations
10//!
11//! Typical consumers share a [`DbPool`] with `runledger-runtime`, then call the
12//! exported [`jobs`] functions from application setup, admin APIs, or tests.
13//!
14//! # Security Boundary
15//!
16//! This crate is a persistence layer, not an authentication or authorization
17//! layer. Public job and workflow APIs that accept `organization_id`,
18//! idempotency keys, workflow IDs, job IDs, or metadata expect those values to
19//! come from a trusted service boundary. HTTP or RPC handlers should derive
20//! organization scope from authenticated claims or server-side policy, not from
21//! untrusted request parameters alone.
22//!
23//! [`QueryError::client_message`] and [`QueryError::code`] are the stable values
24//! intended for public error responses. Detailed internal context remains
25//! available through [`QueryError::internal_message`] for server-side diagnostics.
26//! Public formatting keeps raw SQLx details sanitized. The standard error source
27//! chain and [`QueryError::source_arc`] are available for trusted server-side
//! diagnostics.
//!
//! Runtime lifecycle, workflow mutation, and idempotent enqueue APIs are designed
30//! for PostgreSQL's default `READ COMMITTED` transaction isolation so they can
31//! observe rows committed after lock waits or uniqueness conflicts.
32//! Release-sensitive, workflow-append, and keyed-enqueue paths validate this
33//! before running because their correctness depends on second reads after waits
34//! or conflicts.
35//!
36//! For simple embedding, call [`migrate_after_idempotency_cutover`] during
37//! startup:
38//!
39//! ```rust,no_run
40//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
41//! let pool = sqlx::PgPool::connect("postgres://localhost/runledger").await?;
42//! runledger_postgres::migrate_after_idempotency_cutover(&pool).await?;
43//! # Ok(())
44//! # }
45//! ```
46//!
47//! For deployments that manage DDL elsewhere, call
48//! [`ensure_schema_compatible_after_idempotency_cutover`] instead to fail fast
49//! if the schema is missing, drifted, or still has keyed legacy rows without
50//! idempotency request snapshots. That check is read-only, but it expects the
51//! database to retain SQLx migration history in `_sqlx_migrations` and, when
52//! available, Runledger-owned migration state in `runledger_migration_history`.
53//!
54//! # Copy-Paste Examples
55//!
56//! - [Enqueue one job](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/enqueue_job.rs)
57//! - [Enqueue a workflow DAG](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/workflow_dag.rs)
58//! - [Use an external workflow gate](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/external_gate.rs)
59//! - [Create a scheduled job entrypoint](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/schedule_job.rs)
60//!
61//! Import `runledger_runtime::prelude::*` and use
62//! [the worker binary example](https://github.com/featherenvy/runledger/blob/master/runledger-runtime/examples/worker_binary.rs)
63//! when adding a worker process for the jobs and workflows enqueued through this
64//! crate.
65//!
66//! # Prelude
67//!
68//! ```rust
69//! use runledger_core::prelude::*;
70//! use runledger_postgres::prelude::*;
71//! ```
72//!
73//! The PostgreSQL prelude exports persistence functions and record/input types.
74//! It intentionally does not re-export core contract types, so import
75//! `runledger_core::prelude::*` beside it when building job or workflow inputs.
76//!
77//! # Enqueue One Job
78//!
79//! Use direct job enqueue for one independent retried unit of work.
80//!
81//! ```rust,no_run
82//! # async fn demo(pool: runledger_postgres::DbPool) -> Result<(), Box<dyn std::error::Error>> {
83//! use runledger_core::prelude::*;
84//! use runledger_postgres::prelude::*;
85//!
86//! let payload = serde_json::json!({"email_id": "email_123"});
//! let job = JobEnqueue {
//!     job_type: JobType::new("jobs.email.send"),
//!     organization_id: None,
//!     payload: &payload,
//!     priority: None,
//!     max_attempts: None,
//!     timeout_seconds: None,
//!     next_run_at: None,
//!     idempotency_key: Some("email:email_123:send"),
//!     stage: None,
//! };
98//!
99//! let _job_id = enqueue_job(&pool, &job).await?;
100//! # Ok(())
101//! # }
102//! ```
103//!
104//! # Create A Scheduled Job Entrypoint
105//!
106//! Use [`jobs::JobScheduleUpsert`] to create or update the cron row consumed by
107//! the runtime scheduler. Schedules are UTC-only. Updating an existing schedule
108//! refreshes its definition while preserving `is_active` and `organization_id`;
109//! `next_fire_at` is refreshed when `cron_expr` changes. Cron expressions are
110//! validated with the same parser used by `runledger-runtime`. Use
111//! [`jobs::set_job_schedule_active`] to pause or resume a schedule, and
112//! [`jobs::set_job_schedule_next_fire_at`] to manually retime its cursor.
113//!
114//! ```rust,no_run
115//! # async fn demo(pool: runledger_postgres::DbPool) -> Result<(), Box<dyn std::error::Error>> {
116//! use chrono::Utc;
117//! use runledger_core::prelude::*;
118//! use runledger_postgres::prelude::*;
119//!
120//! let payload_template = serde_json::json!({"source": "api"});
//! let schedule = JobScheduleUpsert {
//!     name: "profile-refresh-hourly",
//!     job_type: JobType::new("profiles.refresh"),
//!     organization_id: None,
//!     payload_template: &payload_template,
//!     cron_expr: "0 0 * * * *",
//!     is_active: true,
//!     next_fire_at: Utc::now(),
//!     max_jitter_seconds: 0,
//! };
131//!
132//! let _schedule = upsert_job_schedule(&pool, &schedule).await?;
133//! # Ok(())
134//! # }
135//! ```
136//!
137//! # Enqueue A Workflow DAG
138//!
139//! Use workflows when the work has step dependencies, fan-out/fan-in, external
140//! gates, cancellation as one logical run, or workflow-level idempotency.
141//!
142//! ```rust,no_run
143//! # async fn demo(pool: runledger_postgres::DbPool) -> Result<(), Box<dyn std::error::Error>> {
144//! use runledger_core::prelude::*;
145//! use runledger_postgres::prelude::*;
146//!
147//! let crawl_payload = serde_json::json!({"profile_id": "p_123"});
148//! let classify_payload = serde_json::json!({"profile_id": "p_123"});
149//! let metadata = serde_json::json!({"source": "api"});
150//!
//! let run = WorkflowDagBuilder::new("profiles.research", &metadata)
//!     .idempotency_key("profile:p_123:research")
//!     .job("crawl", "profiles.crawl", &crawl_payload)?
//!     .job("classify", "profiles.classify", &classify_payload)?
//!     .after_success("classify", ["crawl"])?
//!     .build()?;
157//!
158//! let _workflow_run = enqueue_workflow_run(&pool, &run).await?;
159//! # Ok(())
160//! # }
161//! ```
162//!
163//! # Use An External Workflow Gate
164//!
165//! Create the gate with `WorkflowStepEnqueueBuilder::new_external`, then
166//! complete it from a trusted service boundary when the external condition is
167//! known.
168//!
169//! ```rust,no_run
170//! # async fn demo(
171//! # pool: runledger_postgres::DbPool,
172//! # workflow_run_id: sqlx::types::Uuid,
173//! # ) -> Result<(), Box<dyn std::error::Error>> {
174//! use runledger_core::prelude::*;
175//! use runledger_postgres::prelude::*;
176//!
//! let input = CompleteExternalWorkflowStepInput {
//!     workflow_run_id,
//!     organization_id: None,
//!     step_key: StepKey::new("approval"),
//!     terminal_status: WorkflowStepStatus::Succeeded,
//!     status_reason: Some("approved"),
//!     last_error_code: None,
//!     last_error_message: None,
//! };
186//!
187//! let _step = complete_external_workflow_step(&pool, &input).await?;
188//! # Ok(())
189//! # }
190//! ```
191//!
192//! # Inspect Workflow State
193//!
194//! ```rust,no_run
195//! # async fn demo(
196//! # pool: runledger_postgres::DbPool,
197//! # workflow_run_id: sqlx::types::Uuid,
198//! # ) -> Result<(), Box<dyn std::error::Error>> {
199//! use runledger_postgres::prelude::*;
200//!
201//! let _run = get_workflow_run_by_id(&pool, None, workflow_run_id).await?;
202//! let _steps = list_workflow_steps(&pool, None, workflow_run_id).await?;
203//! let _dependencies = list_workflow_step_dependencies(&pool, None, workflow_run_id).await?;
204//! # Ok(())
205//! # }
206//! ```
207//!
208//! # Handle Errors Safely
209//!
210//! `QueryError::client_message` and `QueryError::code` are safe for public
211//! responses. `QueryError::internal_message` is for trusted diagnostics.
212//!
213//! ```rust,no_run
214//! # async fn demo(
215//! # pool: runledger_postgres::DbPool,
216//! # job: runledger_postgres::jobs::JobEnqueue<'_>,
217//! # ) -> Result<(), runledger_postgres::Error> {
//! match runledger_postgres::jobs::enqueue_job(&pool, &job).await {
//!     Ok(_job_id) => {}
//!     Err(runledger_postgres::Error::QueryError(query_error)) => {
//!         let _public_code = query_error.code();
//!         let _public_message = query_error.client_message();
//!         let _private_diagnostic = query_error.internal_message();
//!     }
//!     Err(error) => return Err(error),
//! }
227//! # Ok(())
228//! # }
229//! ```
230
231use std::fmt;
232
233mod error;
234pub mod jobs;
235mod migrations;
236
237pub use error::{
238 FrameworkConstraintSpec, QueryError, QueryErrorCategory, classify_framework_constraint,
239 classify_query_error, classify_query_error_with_constraint_classifier,
240 has_framework_constraint_classifier,
241};
242pub use migrations::{
243 MIGRATOR, SchemaCompatibilityError, ensure_schema_compatible_after_idempotency_cutover,
244 migrate_after_idempotency_cutover,
245};
246#[allow(deprecated)]
247pub use migrations::{ensure_schema_compatible, migrate};
248
249/// Common `runledger-postgres` imports for integration crates.
250///
251/// This prelude contains persistence APIs, DB types, and database record/input
252/// structs. It avoids generic `Result` or `Error` aliases and does not re-export
253/// `runledger-core` contracts, so it can be glob-imported alongside
254/// `runledger_core::prelude::*` and `runledger_runtime::prelude::*`.
255pub mod prelude {
256 pub use crate::jobs::{
257 AppendWorkflowStepsInput, AppendWorkflowStepsOutcome, AppendWorkflowStepsResult,
258 CompleteExternalWorkflowStepInput, JobDefinitionListFilter, JobDefinitionRecord,
259 JobDefinitionUpdate, JobDefinitionUpsert, JobEnqueue, JobEventRecord, JobFailureUpdate,
260 JobListFilter, JobLogRecord, JobLogRecordInput, JobMetricsRecord, JobProgressUpdate,
261 JobQueueRecord, JobRuntimeConfigListFilter, JobRuntimeConfigRecord, JobRuntimeConfigUpsert,
262 JobScheduleRecord, JobScheduleUpsert, ReapExpiredLeasesResult, ReapedTerminalLeaseRecord,
263 WorkflowRunDbRecord, WorkflowRunListFilter, WorkflowStepDbRecord,
264 WorkflowStepDependencyDbRecord, append_workflow_steps, append_workflow_steps_tx,
265 cancel_job, cancel_workflow_run_tx, complete_external_workflow_step,
266 complete_external_workflow_step_tx, complete_job_failure, complete_job_success,
267 enqueue_job, enqueue_job_tx, enqueue_workflow_run, enqueue_workflow_run_tx, get_job_by_id,
268 get_job_definition_by_type, get_job_metrics, get_job_payload_by_idempotency_key,
269 get_job_runtime_config_by_type, get_latest_job_payload_for_run,
270 get_latest_workflow_run_by_type, get_required_job_runtime_config_by_type,
271 get_workflow_run_by_id, get_workflow_run_by_type_and_idempotency_key,
272 get_workflow_run_id_for_job, insert_job_definition_if_missing_tx, insert_job_log,
273 insert_job_runtime_config_if_missing, list_job_definitions, list_job_events, list_job_logs,
274 list_job_runtime_configs, list_jobs, list_workflow_runs, list_workflow_step_dependencies,
275 list_workflow_steps, requeue_job, set_job_schedule_active, set_job_schedule_active_tx,
276 set_job_schedule_next_fire_at, set_job_schedule_next_fire_at_tx, update_job_definition,
277 update_job_payload_uuid_array_field, update_workflow_step_and_pending_job_payload_tx,
278 upsert_job_definition_tx, upsert_job_runtime_config, upsert_job_runtime_config_tx,
279 upsert_job_schedule, upsert_job_schedule_tx,
280 };
281 pub use crate::{
282 DbPool, DbTx, FrameworkConstraintSpec, MIGRATOR, QueryError, QueryErrorCategory,
283 SchemaCompatibilityError, ensure_schema_compatible_after_idempotency_cutover,
284 migrate_after_idempotency_cutover,
285 };
286}
287
/// Connection pool type used throughout this crate; an alias for [`sqlx::PgPool`].
pub type DbPool = sqlx::PgPool;
/// Transaction handle type; an alias for [`sqlx::Transaction`] over [`sqlx::Postgres`].
pub type DbTx<'a> = sqlx::Transaction<'a, sqlx::Postgres>;
/// Crate-wide result alias whose error type is [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
291
/// Top-level error type of this crate.
///
/// `Display` prints the carried message for the string variants and the wrapped
/// [`QueryError`]'s display text for `QueryError`. Only the `QueryError` variant
/// exposes a value through [`std::error::Error::source`].
#[derive(Debug)]
pub enum Error {
    /// Configuration-related failure; the string is the message shown by `Display`.
    ConfigError(String),
    /// Connection-related failure; the string is the message shown by `Display`.
    ConnectionError(String),
    /// Migration-related failure; the string is the message shown by `Display`.
    MigrationError(String),
    /// Query failure carrying the classified [`QueryError`].
    QueryError(QueryError),
}
299
300impl fmt::Display for Error {
301 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
302 match self {
303 Self::ConfigError(message) => write!(f, "{message}"),
304 Self::ConnectionError(message) => write!(f, "{message}"),
305 Self::MigrationError(message) => write!(f, "{message}"),
306 Self::QueryError(query_error) => write!(f, "{query_error}"),
307 }
308 }
309}
310
311impl std::error::Error for Error {
312 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
313 match self {
314 Self::QueryError(query_error) => Some(query_error),
315 Self::ConfigError(_) | Self::ConnectionError(_) | Self::MigrationError(_) => None,
316 }
317 }
318}
319
320impl Error {
321 #[must_use]
322 pub fn from_query_sqlx(error: sqlx::Error) -> Self {
323 Self::QueryError(QueryError::from_sqlx(error, None))
324 }
325
326 #[must_use]
327 pub fn from_query_sqlx_with_context(context: &str, error: sqlx::Error) -> Self {
328 Self::QueryError(QueryError::from_sqlx(error, Some(context)))
329 }
330}