1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
//! PostgreSQL persistence layer for Runledger durable execution.
//!
//! This crate owns the SQLx-backed storage and query helpers used by the
//! runtime and application crates. The main entrypoint is the [`jobs`] module,
//! which exposes APIs for:
//! - queueing, claiming, heartbeating, and completing jobs
//! - listing admin/job log data and runtime configuration
//! - enqueueing and querying workflow runs and steps
//! - applying or validating the bundled Runledger schema migrations
//!
//! Typical consumers share a [`DbPool`] with `runledger-runtime`, then call the
//! exported [`jobs`] functions from application setup, admin APIs, or tests.
//!
//! # Security Boundary
//!
//! This crate is a persistence layer, not an authentication or authorization
//! layer. Public job and workflow APIs that accept `organization_id`,
//! idempotency keys, workflow IDs, job IDs, or metadata expect those values to
//! come from a trusted service boundary. HTTP or RPC handlers should derive
//! organization scope from authenticated claims or server-side policy, not from
//! untrusted request parameters alone.
//!
//! [`QueryError::client_message`] and [`QueryError::code`] are the stable values
//! intended for public error responses. Detailed internal context remains
//! available through [`QueryError::internal_message`] for server-side diagnostics.
//! Public formatting keeps raw SQLx details sanitized. The standard error source
//! chain and [`QueryError::source_arc`] are available for trusted server-side
//! diagnostics.
//! Runtime lifecycle, workflow mutation, and idempotent enqueue APIs are designed
//! for PostgreSQL's default `READ COMMITTED` transaction isolation so they can
//! observe rows committed after lock waits or uniqueness conflicts.
//! Release-sensitive, workflow-append, and keyed-enqueue paths validate this
//! before running because their correctness depends on second reads after waits
//! or conflicts.
//!
//! For simple embedding, call [`migrate_after_idempotency_cutover`] during
//! startup:
//!
//! ```rust,no_run
//! # async fn demo() -> Result<(), Box<dyn std::error::Error>> {
//! let pool = sqlx::PgPool::connect("postgres://localhost/runledger").await?;
//! runledger_postgres::migrate_after_idempotency_cutover(&pool).await?;
//! # Ok(())
//! # }
//! ```
//!
//! For deployments that manage DDL elsewhere, call
//! [`ensure_schema_compatible_after_idempotency_cutover`] instead to fail fast
//! if the schema is missing, drifted, or still has keyed legacy rows without
//! idempotency request snapshots. That check is read-only, but it expects the
//! database to retain SQLx migration history in `_sqlx_migrations` and, when
//! available, Runledger-owned migration state in `runledger_migration_history`.
//!
//! # Copy-Paste Examples
//!
//! - [Enqueue one job](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/enqueue_job.rs)
//! - [Enqueue a workflow DAG](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/workflow_dag.rs)
//! - [Use an external workflow gate](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/external_gate.rs)
//! - [Create a scheduled job entrypoint](https://github.com/featherenvy/runledger/blob/master/runledger-postgres/examples/schedule_job.rs)
//!
//! Import `runledger_runtime::prelude::*` and use
//! [the worker binary example](https://github.com/featherenvy/runledger/blob/master/runledger-runtime/examples/worker_binary.rs)
//! when adding a worker process for the jobs and workflows enqueued through this
//! crate.
//!
//! # Prelude
//!
//! ```rust
//! use runledger_core::prelude::*;
//! use runledger_postgres::prelude::*;
//! ```
//!
//! The PostgreSQL prelude exports persistence functions and record/input types.
//! It intentionally does not re-export core contract types, so import
//! `runledger_core::prelude::*` beside it when building job or workflow inputs.
//!
//! # Enqueue One Job
//!
//! Use direct job enqueue for one independent retried unit of work.
//!
//! ```rust,no_run
//! # async fn demo(pool: runledger_postgres::DbPool) -> Result<(), Box<dyn std::error::Error>> {
//! use runledger_core::prelude::*;
//! use runledger_postgres::prelude::*;
//!
//! let payload = serde_json::json!({"email_id": "email_123"});
//! let job = JobEnqueue {
//! job_type: JobType::new("jobs.email.send"),
//! organization_id: None,
//! payload: &payload,
//! priority: None,
//! max_attempts: None,
//! timeout_seconds: None,
//! next_run_at: None,
//! idempotency_key: Some("email:email_123:send"),
//! stage: None,
//! };
//!
//! let _job_id = enqueue_job(&pool, &job).await?;
//! # Ok(())
//! # }
//! ```
//!
//! # Create A Scheduled Job Entrypoint
//!
//! Use [`jobs::JobScheduleUpsert`] to create or update the cron row consumed by
//! the runtime scheduler. Schedules are UTC-only. Updating an existing schedule
//! refreshes its definition while preserving `is_active` and `organization_id`;
//! `next_fire_at` is refreshed when `cron_expr` changes. Cron expressions are
//! validated with the same parser used by `runledger-runtime`. Use
//! [`jobs::set_job_schedule_active`] to pause or resume a schedule, and
//! [`jobs::set_job_schedule_next_fire_at`] to manually retime its cursor.
//!
//! ```rust,no_run
//! # async fn demo(pool: runledger_postgres::DbPool) -> Result<(), Box<dyn std::error::Error>> {
//! use chrono::Utc;
//! use runledger_core::prelude::*;
//! use runledger_postgres::prelude::*;
//!
//! let payload_template = serde_json::json!({"source": "api"});
//! let schedule = JobScheduleUpsert {
//! name: "profile-refresh-hourly",
//! job_type: JobType::new("profiles.refresh"),
//! organization_id: None,
//! payload_template: &payload_template,
//! cron_expr: "0 0 * * * *",
//! is_active: true,
//! next_fire_at: Utc::now(),
//! max_jitter_seconds: 0,
//! };
//!
//! let _schedule = upsert_job_schedule(&pool, &schedule).await?;
//! # Ok(())
//! # }
//! ```
//!
//! # Enqueue A Workflow DAG
//!
//! Use workflows when the work has step dependencies, fan-out/fan-in, external
//! gates, cancellation as one logical run, or workflow-level idempotency.
//!
//! ```rust,no_run
//! # async fn demo(pool: runledger_postgres::DbPool) -> Result<(), Box<dyn std::error::Error>> {
//! use runledger_core::prelude::*;
//! use runledger_postgres::prelude::*;
//!
//! let crawl_payload = serde_json::json!({"profile_id": "p_123"});
//! let classify_payload = serde_json::json!({"profile_id": "p_123"});
//! let metadata = serde_json::json!({"source": "api"});
//!
//! let run = WorkflowDagBuilder::new("profiles.research", &metadata)
//! .idempotency_key("profile:p_123:research")
//! .job("crawl", "profiles.crawl", &crawl_payload)?
//! .job("classify", "profiles.classify", &classify_payload)?
//! .after_success("classify", ["crawl"])?
//! .build()?;
//!
//! let _workflow_run = enqueue_workflow_run(&pool, &run).await?;
//! # Ok(())
//! # }
//! ```
//!
//! # Use An External Workflow Gate
//!
//! Create the gate with `WorkflowStepEnqueueBuilder::new_external`, then
//! complete it from a trusted service boundary when the external condition is
//! known.
//!
//! ```rust,no_run
//! # async fn demo(
//! # pool: runledger_postgres::DbPool,
//! # workflow_run_id: sqlx::types::Uuid,
//! # ) -> Result<(), Box<dyn std::error::Error>> {
//! use runledger_core::prelude::*;
//! use runledger_postgres::prelude::*;
//!
//! let input = CompleteExternalWorkflowStepInput {
//! workflow_run_id,
//! organization_id: None,
//! step_key: StepKey::new("approval"),
//! terminal_status: WorkflowStepStatus::Succeeded,
//! status_reason: Some("approved"),
//! last_error_code: None,
//! last_error_message: None,
//! };
//!
//! let _step = complete_external_workflow_step(&pool, &input).await?;
//! # Ok(())
//! # }
//! ```
//!
//! # Inspect Workflow State
//!
//! ```rust,no_run
//! # async fn demo(
//! # pool: runledger_postgres::DbPool,
//! # workflow_run_id: sqlx::types::Uuid,
//! # ) -> Result<(), Box<dyn std::error::Error>> {
//! use runledger_postgres::prelude::*;
//!
//! let _run = get_workflow_run_by_id(&pool, None, workflow_run_id).await?;
//! let _steps = list_workflow_steps(&pool, None, workflow_run_id).await?;
//! let _dependencies = list_workflow_step_dependencies(&pool, None, workflow_run_id).await?;
//! # Ok(())
//! # }
//! ```
//!
//! # Handle Errors Safely
//!
//! `QueryError::client_message` and `QueryError::code` are safe for public
//! responses. `QueryError::internal_message` is for trusted diagnostics.
//!
//! ```rust,no_run
//! # async fn demo(
//! # pool: runledger_postgres::DbPool,
//! # job: runledger_postgres::jobs::JobEnqueue<'_>,
//! # ) -> Result<(), runledger_postgres::Error> {
//! match runledger_postgres::jobs::enqueue_job(&pool, &job).await {
//! Ok(_job_id) => {}
//! Err(runledger_postgres::Error::QueryError(query_error)) => {
//! let _public_code = query_error.code();
//! let _public_message = query_error.client_message();
//! let _private_diagnostic = query_error.internal_message();
//! }
//! Err(error) => return Err(error),
//! }
//! # Ok(())
//! # }
//! ```
use fmt;
pub use ;
pub use ;
pub use ;
/// Common `runledger-postgres` imports for integration crates.
///
/// This prelude contains persistence APIs, DB types, and database record/input
/// structs. It avoids generic `Result` or `Error` aliases and does not re-export
/// `runledger-core` contracts, so it can be glob-imported alongside
/// `runledger_core::prelude::*` and `runledger_runtime::prelude::*`.
pub type DbPool = PgPool;
pub type DbTx<'a> = Transaction;
pub type Result<T> = Result;