Skip to main content

runledger_postgres/
migrations.rs

1use std::collections::HashMap;
2use std::fmt;
3
4use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, Migrator};
5
6use crate::DbPool;
7
8pub static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
9
10type PgPoolConnection = sqlx::pool::PoolConnection<sqlx::Postgres>;
11type RunledgerMigrationMap = HashMap<i64, &'static sqlx::migrate::Migration>;
12
13#[derive(Debug)]
14#[non_exhaustive]
15pub enum SchemaCompatibilityError {
16    Query(sqlx::Error),
17    MissingMigrationHistory {
18        required_first_migration_version: i64,
19    },
20    LegacyIdempotencySnapshotsMissing {
21        job_count: i64,
22        workflow_count: i64,
23    },
24    Incompatible(MigrateError),
25    MigrationUnlock(MigrateError),
26}
27
28impl fmt::Display for SchemaCompatibilityError {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        match self {
31            Self::Query(error) => write!(
32                f,
33                "Runledger schema compatibility check could not query PostgreSQL state: {error}"
34            ),
35            Self::MissingMigrationHistory {
36                required_first_migration_version,
37            } => write!(
38                f,
39                "Runledger schema compatibility check requires the _sqlx_migrations table; apply or record Runledger migrations first (expected migration history starting at version {required_first_migration_version})"
40            ),
41            Self::LegacyIdempotencySnapshotsMissing {
42                job_count,
43                workflow_count,
44            } => write!(
45                f,
46                "Runledger idempotency cutover requires enqueue_request snapshots for all keyed rows; found {job_count} legacy job rows and {workflow_count} legacy workflow rows"
47            ),
48            Self::Incompatible(error) => write!(f, "{error}"),
49            Self::MigrationUnlock(error) => {
50                write!(
51                    f,
52                    "Runledger schema migration lock could not be released: {error}"
53                )
54            }
55        }
56    }
57}
58
59impl std::error::Error for SchemaCompatibilityError {
60    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
61        match self {
62            Self::Query(error) => Some(error),
63            Self::MissingMigrationHistory { .. } => None,
64            Self::LegacyIdempotencySnapshotsMissing { .. } => None,
65            Self::Incompatible(error) | Self::MigrationUnlock(error) => Some(error),
66        }
67    }
68}
69
70impl From<MigrateError> for SchemaCompatibilityError {
71    fn from(error: MigrateError) -> Self {
72        Self::Incompatible(error)
73    }
74}
75
76impl From<sqlx::Error> for SchemaCompatibilityError {
77    fn from(error: sqlx::Error) -> Self {
78        Self::Query(error)
79    }
80}
81
82/// Apply the bundled Runledger schema migrations to a PostgreSQL pool, then
83/// enforce the idempotency snapshot cutover.
84///
85/// This is intentionally named as a hard-cutover API. Downstream applications
86/// upgrading from older Runledger versions must update their startup code and
87/// verify no keyed legacy rows remain without `enqueue_request` snapshots.
88pub async fn migrate_after_idempotency_cutover(
89    pool: &DbPool,
90) -> Result<(), SchemaCompatibilityError> {
91    let mut conn = pool.acquire().await?;
92
93    if MIGRATOR.locking {
94        // PostgreSQL advisory migration locks are session-scoped; never return
95        // a possibly locked session to the pool if this future is cancelled.
96        conn.close_on_drop();
97        (*conn)
98            .lock()
99            .await
100            .map_err(SchemaCompatibilityError::Incompatible)?;
101    }
102
103    let result = run_migrations_with_filtered_history(&mut conn).await;
104    let unlock_result = if MIGRATOR.locking {
105        (*conn).unlock().await
106    } else {
107        Ok(())
108    };
109
110    match (result, unlock_result) {
111        (Err(migration_error), Err(unlock_error)) => {
112            tracing::error!(
113                error = %unlock_error,
114                "failed to unlock migration lock after migration failure"
115            );
116            Err(SchemaCompatibilityError::Incompatible(migration_error))
117        }
118        (Err(error), Ok(())) => Err(SchemaCompatibilityError::Incompatible(error)),
119        (Ok(()), Err(error)) => Err(SchemaCompatibilityError::MigrationUnlock(error)),
120        (Ok(()), Ok(())) => {
121            // The DDL migration lock is no longer needed here: the NOT VALID
122            // cutover constraints already block new violating rows, and
123            // validation is idempotent if another startup validates first.
124            reject_legacy_idempotency_rows(&mut conn).await?;
125            validate_idempotency_cutover_constraints(&mut conn).await
126        }
127    }
128}
129
130/// Apply the bundled Runledger schema migrations to a PostgreSQL pool.
131///
132/// Deprecated compatibility alias for [`migrate_after_idempotency_cutover`].
133/// The current migration set enforces the enqueue request snapshot cutover, so
134/// this function has the same strict behavior as the new explicit API.
135#[deprecated(
136    since = "0.1.2",
137    note = "use migrate_after_idempotency_cutover to make the enqueue request snapshot cutover explicit"
138)]
139pub async fn migrate(pool: &DbPool) -> Result<(), SchemaCompatibilityError> {
140    migrate_after_idempotency_cutover(pool).await
141}
142
143/// Validate that the target database's SQLx migration history matches the
144/// bundled Runledger migrations.
145///
146/// Unlike [`migrate_after_idempotency_cutover`], this does not apply pending
147/// migrations. It is intended
148/// for deployments that manage DDL outside the application process but still
149/// want a startup guardrail. This check is read-only, but it relies on the
150/// `_sqlx_migrations` history table being present and up to date. When present,
151/// it also uses Runledger's own `runledger_migration_history` table to detect
152/// migrations applied by newer Runledger releases.
153///
154/// This read-only path does not validate `NOT VALID` cutover constraints after
155/// legacy rows are remediated. Deployments that apply DDL externally can run
156/// PostgreSQL `VALIDATE CONSTRAINT` for the idempotency cutover constraints
157/// after this check passes, or use [`migrate_after_idempotency_cutover`] to let
158/// Runledger do that promotion.
159pub async fn ensure_schema_compatible_after_idempotency_cutover(
160    pool: &DbPool,
161) -> Result<(), SchemaCompatibilityError> {
162    let mut conn = pool.acquire().await?;
163
164    if !has_migrations_table(&mut conn).await? {
165        return Err(SchemaCompatibilityError::MissingMigrationHistory {
166            required_first_migration_version: first_up_migration_version(),
167        });
168    }
169
170    let expected_migrations = expected_runledger_migrations();
171    let history = list_migration_history(&mut conn).await?;
172
173    if let Some(version) = first_conflicting_runledger_version(&history, &expected_migrations) {
174        return Err(SchemaCompatibilityError::Incompatible(
175            MigrateError::VersionMismatch(version),
176        ));
177    }
178
179    if let Some(version) = first_dirty_runledger_version(&history, &expected_migrations) {
180        return Err(SchemaCompatibilityError::Incompatible(MigrateError::Dirty(
181            version,
182        )));
183    }
184
185    if has_runledger_migration_history_table(&mut conn).await? {
186        let recorded_versions = list_recorded_runledger_migrations(&mut conn).await?;
187        if let Some(version) =
188            first_missing_runledger_version(&recorded_versions, &expected_migrations)
189        {
190            return Err(SchemaCompatibilityError::Incompatible(
191                MigrateError::VersionMissing(version),
192            ));
193        }
194    }
195
196    let applied = applied_runledger_migrations(&history, &expected_migrations);
197    let applied_by_version: HashMap<_, _> = applied
198        .iter()
199        .map(|applied_migration| (applied_migration.version, applied_migration))
200        .collect();
201    let latest_applied_version = applied.iter().map(|migration| migration.version).max();
202
203    for migration in MIGRATOR
204        .iter()
205        .filter(|migration| migration.migration_type.is_up_migration())
206    {
207        match applied_by_version.get(&migration.version) {
208            Some(applied_migration) => {
209                validate_checksum(migration.version, applied_migration, migration)
210                    .map_err(SchemaCompatibilityError::from)?
211            }
212            None => {
213                return Err(SchemaCompatibilityError::Incompatible(
214                    MigrateError::VersionTooNew(
215                        migration.version,
216                        latest_applied_version.unwrap_or_default(),
217                    ),
218                ));
219            }
220        }
221    }
222
223    reject_legacy_idempotency_rows(&mut conn).await
224}
225
226/// Validate that the target database's SQLx migration history matches the
227/// bundled Runledger migrations.
228///
229/// Deprecated compatibility alias for
230/// [`ensure_schema_compatible_after_idempotency_cutover`]. The current schema
231/// compatibility check rejects keyed legacy rows without enqueue request
232/// snapshots, matching the stricter cutover API.
233#[deprecated(
234    since = "0.1.2",
235    note = "use ensure_schema_compatible_after_idempotency_cutover to make the enqueue request snapshot cutover explicit"
236)]
237pub async fn ensure_schema_compatible(pool: &DbPool) -> Result<(), SchemaCompatibilityError> {
238    ensure_schema_compatible_after_idempotency_cutover(pool).await
239}
240
241async fn has_migrations_table(conn: &mut PgPoolConnection) -> Result<bool, sqlx::Error> {
242    sqlx::query_scalar::<_, bool>("SELECT to_regclass('_sqlx_migrations') IS NOT NULL")
243        .fetch_one(&mut **conn)
244        .await
245}
246
247async fn has_runledger_migration_history_table(
248    conn: &mut PgPoolConnection,
249) -> Result<bool, sqlx::Error> {
250    sqlx::query_scalar::<_, bool>("SELECT to_regclass('runledger_migration_history') IS NOT NULL")
251        .fetch_one(&mut **conn)
252        .await
253}
254
255async fn list_migration_history(
256    conn: &mut PgPoolConnection,
257) -> Result<Vec<MigrationHistoryRow>, sqlx::Error> {
258    sqlx::query_as::<_, MigrationHistoryRow>(
259        "SELECT version, checksum, success
260         FROM _sqlx_migrations
261         ORDER BY version",
262    )
263    .fetch_all(&mut **conn)
264    .await
265}
266
267async fn list_recorded_runledger_migrations(
268    conn: &mut PgPoolConnection,
269) -> Result<Vec<i64>, sqlx::Error> {
270    sqlx::query_scalar::<_, i64>(
271        "SELECT version
272         FROM runledger_migration_history
273         ORDER BY version",
274    )
275    .fetch_all(&mut **conn)
276    .await
277}
278
279async fn reject_legacy_idempotency_rows(
280    conn: &mut PgPoolConnection,
281) -> Result<(), SchemaCompatibilityError> {
282    if idempotency_cutover_constraints_valid(conn).await? {
283        return Ok(());
284    }
285
286    let row = sqlx::query!(
287        r#"SELECT
288            (
289                SELECT COUNT(*)::bigint
290                FROM job_queue
291                WHERE idempotency_key IS NOT NULL
292                  AND enqueue_request IS NULL
293            ) AS "job_count!",
294            (
295                SELECT COUNT(*)::bigint
296                FROM workflow_runs
297                WHERE idempotency_key IS NOT NULL
298                  AND enqueue_request IS NULL
299            ) AS "workflow_count!""#,
300    )
301    .fetch_one(&mut **conn)
302    .await?;
303
304    if row.job_count == 0 && row.workflow_count == 0 {
305        return Ok(());
306    }
307
308    Err(
309        SchemaCompatibilityError::LegacyIdempotencySnapshotsMissing {
310            job_count: row.job_count,
311            workflow_count: row.workflow_count,
312        },
313    )
314}
315
316async fn validate_idempotency_cutover_constraints(
317    conn: &mut PgPoolConnection,
318) -> Result<(), SchemaCompatibilityError> {
319    if idempotency_cutover_constraints_valid(conn).await? {
320        return Ok(());
321    }
322
323    // PostgreSQL validates each table constraint independently. If one
324    // validation succeeds and the other fails, the next startup skips the valid
325    // constraint and retries the remaining one.
326    sqlx::query(
327        "ALTER TABLE job_queue
328         VALIDATE CONSTRAINT ck_job_queue_idempotency_enqueue_request",
329    )
330    .execute(&mut **conn)
331    .await
332    .map_err(|error| {
333        tracing::warn!(
334            error = %error,
335            "failed to validate job_queue idempotency cutover constraint"
336        );
337        SchemaCompatibilityError::Query(error)
338    })?;
339
340    sqlx::query(
341        "ALTER TABLE workflow_runs
342         VALIDATE CONSTRAINT ck_workflow_runs_idempotency_enqueue_request",
343    )
344    .execute(&mut **conn)
345    .await
346    .map_err(|error| {
347        tracing::warn!(
348            error = %error,
349            "failed to validate workflow_runs idempotency cutover constraint"
350        );
351        SchemaCompatibilityError::Query(error)
352    })?;
353
354    Ok(())
355}
356
357async fn idempotency_cutover_constraints_valid(
358    conn: &mut PgPoolConnection,
359) -> Result<bool, sqlx::Error> {
360    // A validated cutover constraint is the durable proof that legacy keyed rows
361    // without enqueue_request snapshots cannot exist for that table. If future
362    // migrations replace these constraints, they must preserve that invariant
363    // before this short-circuit remains valid.
364    sqlx::query_scalar::<_, bool>(
365        "SELECT COUNT(*) FILTER (WHERE c.convalidated) = 2
366         FROM pg_constraint c
367         JOIN pg_class t ON t.oid = c.conrelid
368         WHERE (t.relname, c.conname) IN (
369             ('job_queue', 'ck_job_queue_idempotency_enqueue_request'),
370             ('workflow_runs', 'ck_workflow_runs_idempotency_enqueue_request')
371         )",
372    )
373    .fetch_one(&mut **conn)
374    .await
375}
376
377fn first_up_migration_version() -> i64 {
378    MIGRATOR
379        .iter()
380        .find(|migration| migration.migration_type.is_up_migration())
381        .map(|migration| migration.version)
382        .unwrap_or_default()
383}
384
385fn expected_runledger_migrations() -> RunledgerMigrationMap {
386    MIGRATOR
387        .iter()
388        .filter(|migration| migration.migration_type.is_up_migration())
389        .map(|migration| (migration.version, migration))
390        .collect()
391}
392
393fn first_conflicting_runledger_version(
394    history: &[MigrationHistoryRow],
395    expected_migrations: &RunledgerMigrationMap,
396) -> Option<i64> {
397    history.iter().find_map(|row| {
398        expected_migrations
399            .get(&row.version)
400            .filter(|migration| row.checksum.as_slice() != migration.checksum.as_ref())
401            .map(|_| row.version)
402    })
403}
404
405fn first_dirty_runledger_version(
406    history: &[MigrationHistoryRow],
407    expected_migrations: &RunledgerMigrationMap,
408) -> Option<i64> {
409    history.iter().filter(|row| !row.success).find_map(|row| {
410        expected_migrations
411            .get(&row.version)
412            .filter(|migration| row.checksum.as_slice() == migration.checksum.as_ref())
413            .map(|_| row.version)
414    })
415}
416
417fn first_missing_runledger_version(
418    recorded_versions: &[i64],
419    expected_migrations: &RunledgerMigrationMap,
420) -> Option<i64> {
421    recorded_versions
422        .iter()
423        .copied()
424        .find(|version| !expected_migrations.contains_key(version))
425}
426
427fn applied_runledger_migrations(
428    history: &[MigrationHistoryRow],
429    expected_migrations: &RunledgerMigrationMap,
430) -> Vec<AppliedMigration> {
431    history
432        .iter()
433        .filter(|row| row.success)
434        .filter(|row| {
435            expected_migrations
436                .get(&row.version)
437                .is_some_and(|migration| row.checksum.as_slice() == migration.checksum.as_ref())
438        })
439        .map(|row| AppliedMigration {
440            version: row.version,
441            checksum: row.checksum.clone().into(),
442        })
443        .collect()
444}
445
446async fn run_migrations_with_filtered_history(
447    conn: &mut PgPoolConnection,
448) -> Result<(), MigrateError> {
449    (**conn).ensure_migrations_table().await?;
450
451    let expected_migrations = expected_runledger_migrations();
452    let history = list_migration_history(conn).await?;
453
454    if let Some(version) = first_conflicting_runledger_version(&history, &expected_migrations) {
455        return Err(MigrateError::VersionMismatch(version));
456    }
457
458    if let Some(version) = first_dirty_runledger_version(&history, &expected_migrations) {
459        return Err(MigrateError::Dirty(version));
460    }
461
462    if has_runledger_migration_history_table(conn).await? {
463        let recorded_versions = list_recorded_runledger_migrations(conn).await?;
464        if let Some(version) =
465            first_missing_runledger_version(&recorded_versions, &expected_migrations)
466        {
467            return Err(MigrateError::VersionMissing(version));
468        }
469    }
470
471    let applied = applied_runledger_migrations(&history, &expected_migrations);
472    let applied_by_version: HashMap<_, _> = applied
473        .into_iter()
474        .map(|migration| (migration.version, migration))
475        .collect();
476
477    for migration in MIGRATOR
478        .iter()
479        .filter(|migration| migration.migration_type.is_up_migration())
480    {
481        match applied_by_version.get(&migration.version) {
482            Some(applied_migration) => {
483                validate_checksum(migration.version, applied_migration, migration)?
484            }
485            None => {
486                (**conn).apply(migration).await?;
487            }
488        }
489    }
490
491    Ok(())
492}
493
494#[derive(sqlx::FromRow)]
495struct MigrationHistoryRow {
496    version: i64,
497    checksum: Vec<u8>,
498    success: bool,
499}
500
501fn validate_checksum(
502    version: i64,
503    applied_migration: &AppliedMigration,
504    expected_migration: &sqlx::migrate::Migration,
505) -> Result<(), MigrateError> {
506    if applied_migration.checksum != expected_migration.checksum {
507        return Err(MigrateError::VersionMismatch(version));
508    }
509
510    Ok(())
511}