1use std::collections::HashMap;
2use std::fmt;
3
4use sqlx::migrate::{AppliedMigration, Migrate, MigrateError, Migrator};
5
6use crate::DbPool;
7
/// Runledger's migration set, resolved by `sqlx::migrate!` from the `./migrations`
/// directory. `migrate_after_idempotency_cutover` applies it and
/// `ensure_schema_compatible_after_idempotency_cutover` checks the database against it.
pub static MIGRATOR: Migrator = sqlx::migrate!("./migrations");

/// Pooled PostgreSQL connection used by the query helpers in this module.
type PgPoolConnection = sqlx::pool::PoolConnection<sqlx::Postgres>;
/// Embedded Runledger up-migrations keyed by their version number.
type RunledgerMigrationMap = HashMap<i64, &'static sqlx::migrate::Migration>;
12
/// Error returned by the Runledger migration and schema-compatibility entry points.
#[derive(Debug)]
#[non_exhaustive]
pub enum SchemaCompatibilityError {
    /// A PostgreSQL query failed. This covers acquiring a pooled connection, inspecting
    /// migration/constraint state, counting legacy rows, or validating the idempotency
    /// cutover constraints.
    Query(sqlx::Error),
    /// The `_sqlx_migrations` table does not exist, so no migration history can be checked.
    MissingMigrationHistory {
        /// Version of the first embedded up-migration (0 if there is none).
        required_first_migration_version: i64,
    },
    /// Rows with an `idempotency_key` but no `enqueue_request` snapshot still exist.
    LegacyIdempotencySnapshotsMissing {
        /// Number of such rows in `job_queue`.
        job_count: i64,
        /// Number of such rows in `workflow_runs`.
        workflow_count: i64,
    },
    /// The recorded migration history does not match the embedded migrations. It also
    /// covers failures to take the migration lock or to apply a migration.
    Incompatible(MigrateError),
    /// Migrations succeeded but releasing the migration lock failed.
    MigrationUnlock(MigrateError),
}
27
28impl fmt::Display for SchemaCompatibilityError {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 match self {
31 Self::Query(error) => write!(
32 f,
33 "Runledger schema compatibility check could not query PostgreSQL state: {error}"
34 ),
35 Self::MissingMigrationHistory {
36 required_first_migration_version,
37 } => write!(
38 f,
39 "Runledger schema compatibility check requires the _sqlx_migrations table; apply or record Runledger migrations first (expected migration history starting at version {required_first_migration_version})"
40 ),
41 Self::LegacyIdempotencySnapshotsMissing {
42 job_count,
43 workflow_count,
44 } => write!(
45 f,
46 "Runledger idempotency cutover requires enqueue_request snapshots for all keyed rows; found {job_count} legacy job rows and {workflow_count} legacy workflow rows"
47 ),
48 Self::Incompatible(error) => write!(f, "{error}"),
49 Self::MigrationUnlock(error) => {
50 write!(
51 f,
52 "Runledger schema migration lock could not be released: {error}"
53 )
54 }
55 }
56 }
57}
58
59impl std::error::Error for SchemaCompatibilityError {
60 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
61 match self {
62 Self::Query(error) => Some(error),
63 Self::MissingMigrationHistory { .. } => None,
64 Self::LegacyIdempotencySnapshotsMissing { .. } => None,
65 Self::Incompatible(error) | Self::MigrationUnlock(error) => Some(error),
66 }
67 }
68}
69
70impl From<MigrateError> for SchemaCompatibilityError {
71 fn from(error: MigrateError) -> Self {
72 Self::Incompatible(error)
73 }
74}
75
76impl From<sqlx::Error> for SchemaCompatibilityError {
77 fn from(error: sqlx::Error) -> Self {
78 Self::Query(error)
79 }
80}
81
82pub async fn migrate_after_idempotency_cutover(
89 pool: &DbPool,
90) -> Result<(), SchemaCompatibilityError> {
91 let mut conn = pool.acquire().await?;
92
93 if MIGRATOR.locking {
94 conn.close_on_drop();
97 (*conn)
98 .lock()
99 .await
100 .map_err(SchemaCompatibilityError::Incompatible)?;
101 }
102
103 let result = run_migrations_with_filtered_history(&mut conn).await;
104 let unlock_result = if MIGRATOR.locking {
105 (*conn).unlock().await
106 } else {
107 Ok(())
108 };
109
110 match (result, unlock_result) {
111 (Err(migration_error), Err(unlock_error)) => {
112 tracing::error!(
113 error = %unlock_error,
114 "failed to unlock migration lock after migration failure"
115 );
116 Err(SchemaCompatibilityError::Incompatible(migration_error))
117 }
118 (Err(error), Ok(())) => Err(SchemaCompatibilityError::Incompatible(error)),
119 (Ok(()), Err(error)) => Err(SchemaCompatibilityError::MigrationUnlock(error)),
120 (Ok(()), Ok(())) => {
121 reject_legacy_idempotency_rows(&mut conn).await?;
125 validate_idempotency_cutover_constraints(&mut conn).await
126 }
127 }
128}
129
130#[deprecated(
136 since = "0.1.2",
137 note = "use migrate_after_idempotency_cutover to make the enqueue request snapshot cutover explicit"
138)]
139pub async fn migrate(pool: &DbPool) -> Result<(), SchemaCompatibilityError> {
140 migrate_after_idempotency_cutover(pool).await
141}
142
143pub async fn ensure_schema_compatible_after_idempotency_cutover(
160 pool: &DbPool,
161) -> Result<(), SchemaCompatibilityError> {
162 let mut conn = pool.acquire().await?;
163
164 if !has_migrations_table(&mut conn).await? {
165 return Err(SchemaCompatibilityError::MissingMigrationHistory {
166 required_first_migration_version: first_up_migration_version(),
167 });
168 }
169
170 let expected_migrations = expected_runledger_migrations();
171 let history = list_migration_history(&mut conn).await?;
172
173 if let Some(version) = first_conflicting_runledger_version(&history, &expected_migrations) {
174 return Err(SchemaCompatibilityError::Incompatible(
175 MigrateError::VersionMismatch(version),
176 ));
177 }
178
179 if let Some(version) = first_dirty_runledger_version(&history, &expected_migrations) {
180 return Err(SchemaCompatibilityError::Incompatible(MigrateError::Dirty(
181 version,
182 )));
183 }
184
185 if has_runledger_migration_history_table(&mut conn).await? {
186 let recorded_versions = list_recorded_runledger_migrations(&mut conn).await?;
187 if let Some(version) =
188 first_missing_runledger_version(&recorded_versions, &expected_migrations)
189 {
190 return Err(SchemaCompatibilityError::Incompatible(
191 MigrateError::VersionMissing(version),
192 ));
193 }
194 }
195
196 let applied = applied_runledger_migrations(&history, &expected_migrations);
197 let applied_by_version: HashMap<_, _> = applied
198 .iter()
199 .map(|applied_migration| (applied_migration.version, applied_migration))
200 .collect();
201 let latest_applied_version = applied.iter().map(|migration| migration.version).max();
202
203 for migration in MIGRATOR
204 .iter()
205 .filter(|migration| migration.migration_type.is_up_migration())
206 {
207 match applied_by_version.get(&migration.version) {
208 Some(applied_migration) => {
209 validate_checksum(migration.version, applied_migration, migration)
210 .map_err(SchemaCompatibilityError::from)?
211 }
212 None => {
213 return Err(SchemaCompatibilityError::Incompatible(
214 MigrateError::VersionTooNew(
215 migration.version,
216 latest_applied_version.unwrap_or_default(),
217 ),
218 ));
219 }
220 }
221 }
222
223 reject_legacy_idempotency_rows(&mut conn).await
224}
225
226#[deprecated(
234 since = "0.1.2",
235 note = "use ensure_schema_compatible_after_idempotency_cutover to make the enqueue request snapshot cutover explicit"
236)]
237pub async fn ensure_schema_compatible(pool: &DbPool) -> Result<(), SchemaCompatibilityError> {
238 ensure_schema_compatible_after_idempotency_cutover(pool).await
239}
240
241async fn has_migrations_table(conn: &mut PgPoolConnection) -> Result<bool, sqlx::Error> {
242 sqlx::query_scalar::<_, bool>("SELECT to_regclass('_sqlx_migrations') IS NOT NULL")
243 .fetch_one(&mut **conn)
244 .await
245}
246
247async fn has_runledger_migration_history_table(
248 conn: &mut PgPoolConnection,
249) -> Result<bool, sqlx::Error> {
250 sqlx::query_scalar::<_, bool>("SELECT to_regclass('runledger_migration_history') IS NOT NULL")
251 .fetch_one(&mut **conn)
252 .await
253}
254
255async fn list_migration_history(
256 conn: &mut PgPoolConnection,
257) -> Result<Vec<MigrationHistoryRow>, sqlx::Error> {
258 sqlx::query_as::<_, MigrationHistoryRow>(
259 "SELECT version, checksum, success
260 FROM _sqlx_migrations
261 ORDER BY version",
262 )
263 .fetch_all(&mut **conn)
264 .await
265}
266
267async fn list_recorded_runledger_migrations(
268 conn: &mut PgPoolConnection,
269) -> Result<Vec<i64>, sqlx::Error> {
270 sqlx::query_scalar::<_, i64>(
271 "SELECT version
272 FROM runledger_migration_history
273 ORDER BY version",
274 )
275 .fetch_all(&mut **conn)
276 .await
277}
278
279async fn reject_legacy_idempotency_rows(
280 conn: &mut PgPoolConnection,
281) -> Result<(), SchemaCompatibilityError> {
282 if idempotency_cutover_constraints_valid(conn).await? {
283 return Ok(());
284 }
285
286 let row = sqlx::query!(
287 r#"SELECT
288 (
289 SELECT COUNT(*)::bigint
290 FROM job_queue
291 WHERE idempotency_key IS NOT NULL
292 AND enqueue_request IS NULL
293 ) AS "job_count!",
294 (
295 SELECT COUNT(*)::bigint
296 FROM workflow_runs
297 WHERE idempotency_key IS NOT NULL
298 AND enqueue_request IS NULL
299 ) AS "workflow_count!""#,
300 )
301 .fetch_one(&mut **conn)
302 .await?;
303
304 if row.job_count == 0 && row.workflow_count == 0 {
305 return Ok(());
306 }
307
308 Err(
309 SchemaCompatibilityError::LegacyIdempotencySnapshotsMissing {
310 job_count: row.job_count,
311 workflow_count: row.workflow_count,
312 },
313 )
314}
315
316async fn validate_idempotency_cutover_constraints(
317 conn: &mut PgPoolConnection,
318) -> Result<(), SchemaCompatibilityError> {
319 if idempotency_cutover_constraints_valid(conn).await? {
320 return Ok(());
321 }
322
323 sqlx::query(
327 "ALTER TABLE job_queue
328 VALIDATE CONSTRAINT ck_job_queue_idempotency_enqueue_request",
329 )
330 .execute(&mut **conn)
331 .await
332 .map_err(|error| {
333 tracing::warn!(
334 error = %error,
335 "failed to validate job_queue idempotency cutover constraint"
336 );
337 SchemaCompatibilityError::Query(error)
338 })?;
339
340 sqlx::query(
341 "ALTER TABLE workflow_runs
342 VALIDATE CONSTRAINT ck_workflow_runs_idempotency_enqueue_request",
343 )
344 .execute(&mut **conn)
345 .await
346 .map_err(|error| {
347 tracing::warn!(
348 error = %error,
349 "failed to validate workflow_runs idempotency cutover constraint"
350 );
351 SchemaCompatibilityError::Query(error)
352 })?;
353
354 Ok(())
355}
356
357async fn idempotency_cutover_constraints_valid(
358 conn: &mut PgPoolConnection,
359) -> Result<bool, sqlx::Error> {
360 sqlx::query_scalar::<_, bool>(
365 "SELECT COUNT(*) FILTER (WHERE c.convalidated) = 2
366 FROM pg_constraint c
367 JOIN pg_class t ON t.oid = c.conrelid
368 WHERE (t.relname, c.conname) IN (
369 ('job_queue', 'ck_job_queue_idempotency_enqueue_request'),
370 ('workflow_runs', 'ck_workflow_runs_idempotency_enqueue_request')
371 )",
372 )
373 .fetch_one(&mut **conn)
374 .await
375}
376
377fn first_up_migration_version() -> i64 {
378 MIGRATOR
379 .iter()
380 .find(|migration| migration.migration_type.is_up_migration())
381 .map(|migration| migration.version)
382 .unwrap_or_default()
383}
384
385fn expected_runledger_migrations() -> RunledgerMigrationMap {
386 MIGRATOR
387 .iter()
388 .filter(|migration| migration.migration_type.is_up_migration())
389 .map(|migration| (migration.version, migration))
390 .collect()
391}
392
393fn first_conflicting_runledger_version(
394 history: &[MigrationHistoryRow],
395 expected_migrations: &RunledgerMigrationMap,
396) -> Option<i64> {
397 history.iter().find_map(|row| {
398 expected_migrations
399 .get(&row.version)
400 .filter(|migration| row.checksum.as_slice() != migration.checksum.as_ref())
401 .map(|_| row.version)
402 })
403}
404
405fn first_dirty_runledger_version(
406 history: &[MigrationHistoryRow],
407 expected_migrations: &RunledgerMigrationMap,
408) -> Option<i64> {
409 history.iter().filter(|row| !row.success).find_map(|row| {
410 expected_migrations
411 .get(&row.version)
412 .filter(|migration| row.checksum.as_slice() == migration.checksum.as_ref())
413 .map(|_| row.version)
414 })
415}
416
417fn first_missing_runledger_version(
418 recorded_versions: &[i64],
419 expected_migrations: &RunledgerMigrationMap,
420) -> Option<i64> {
421 recorded_versions
422 .iter()
423 .copied()
424 .find(|version| !expected_migrations.contains_key(version))
425}
426
427fn applied_runledger_migrations(
428 history: &[MigrationHistoryRow],
429 expected_migrations: &RunledgerMigrationMap,
430) -> Vec<AppliedMigration> {
431 history
432 .iter()
433 .filter(|row| row.success)
434 .filter(|row| {
435 expected_migrations
436 .get(&row.version)
437 .is_some_and(|migration| row.checksum.as_slice() == migration.checksum.as_ref())
438 })
439 .map(|row| AppliedMigration {
440 version: row.version,
441 checksum: row.checksum.clone().into(),
442 })
443 .collect()
444}
445
446async fn run_migrations_with_filtered_history(
447 conn: &mut PgPoolConnection,
448) -> Result<(), MigrateError> {
449 (**conn).ensure_migrations_table().await?;
450
451 let expected_migrations = expected_runledger_migrations();
452 let history = list_migration_history(conn).await?;
453
454 if let Some(version) = first_conflicting_runledger_version(&history, &expected_migrations) {
455 return Err(MigrateError::VersionMismatch(version));
456 }
457
458 if let Some(version) = first_dirty_runledger_version(&history, &expected_migrations) {
459 return Err(MigrateError::Dirty(version));
460 }
461
462 if has_runledger_migration_history_table(conn).await? {
463 let recorded_versions = list_recorded_runledger_migrations(conn).await?;
464 if let Some(version) =
465 first_missing_runledger_version(&recorded_versions, &expected_migrations)
466 {
467 return Err(MigrateError::VersionMissing(version));
468 }
469 }
470
471 let applied = applied_runledger_migrations(&history, &expected_migrations);
472 let applied_by_version: HashMap<_, _> = applied
473 .into_iter()
474 .map(|migration| (migration.version, migration))
475 .collect();
476
477 for migration in MIGRATOR
478 .iter()
479 .filter(|migration| migration.migration_type.is_up_migration())
480 {
481 match applied_by_version.get(&migration.version) {
482 Some(applied_migration) => {
483 validate_checksum(migration.version, applied_migration, migration)?
484 }
485 None => {
486 (**conn).apply(migration).await?;
487 }
488 }
489 }
490
491 Ok(())
492}
493
/// One `_sqlx_migrations` row, as selected by `list_migration_history`.
#[derive(sqlx::FromRow)]
struct MigrationHistoryRow {
    /// Migration version number.
    version: i64,
    /// Checksum stored in `_sqlx_migrations`, compared against the embedded migration's
    /// checksum.
    checksum: Vec<u8>,
    /// `false` marks the migration as dirty (see `first_dirty_runledger_version`).
    success: bool,
}
500
501fn validate_checksum(
502 version: i64,
503 applied_migration: &AppliedMigration,
504 expected_migration: &sqlx::migrate::Migration,
505) -> Result<(), MigrateError> {
506 if applied_migration.checksum != expected_migration.checksum {
507 return Err(MigrateError::VersionMismatch(version));
508 }
509
510 Ok(())
511}