use graphile_worker_migrations::{
migrate,
sql::{
m000001::M000001_MIGRATION, m000002::M000002_MIGRATION, m000003::M000003_MIGRATION,
m000004::M000004_MIGRATION, m000005::M000005_MIGRATION, m000006::M000006_MIGRATION,
m000007::M000007_MIGRATION, m000008::M000008_MIGRATION, m000009::M000009_MIGRATION,
m000010::M000010_MIGRATION,
},
MigrateError,
};
use helpers::with_test_db;
use serde_json::json;
use sqlx::query;
mod helpers;
/// Installs the worker schema from scratch and checks that repeating the
/// migration does not disturb a job that was queued in between.
///
/// Steps: drop the `graphile_worker` schema, migrate, expect 19 recorded
/// migrations (the first one having id 1), add a single job, migrate three
/// more times, then verify that the same single job is still present.
#[tokio::test]
async fn migration_install_schema_and_second_migration_does_not_harm() {
    const SCHEMA: &str = "graphile_worker";
    const JOB_NAME: &str = "assert_job_works";

    with_test_db(|test_db| async move {
        let pool = &test_db.test_pool;

        // Make sure no worker schema exists before the first migration runs.
        query("drop schema if exists graphile_worker cascade")
            .execute(pool)
            .await
            .unwrap();

        migrate(pool, SCHEMA).await.expect("Failed to migrate");

        let applied = test_db.get_migrations().await;
        assert_eq!(applied.len(), 19);
        assert_eq!(applied[0].id, 1);

        // Queue one job so there is data the repeated migrations could damage.
        test_db
            .add_job(JOB_NAME, json!({}), Default::default())
            .await;
        let queued = test_db.get_jobs().await;
        assert_eq!(queued.len(), 1);
        assert_eq!(queued[0].task_identifier, JOB_NAME);

        // Running the migration again (three times) must succeed every time.
        for _ in 0..3 {
            migrate(pool, SCHEMA).await.expect("Failed to migrate");
        }

        let remaining = test_db.get_jobs().await;
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].task_identifier, JOB_NAME);
    })
    .await;
}
/// Checks that `migrate` can continue from a database where the migrations
/// table and the first migration were put in place by hand.
///
/// Inside one transaction the test recreates the schema, creates the
/// `migrations` table, records id 1 and executes `M000001_MIGRATION` directly.
/// After committing, `migrate` is expected to succeed and leave at least 19
/// rows, with migration 2 flagged non-breaking and migration 11 flagged
/// breaking. A job added afterwards must survive three further `migrate` calls.
#[tokio::test]
async fn migration_can_take_over_from_pre_existing_migrations_table() {
    const SCHEMA: &str = "graphile_worker";

    with_test_db(|test_db| async move {
        let pool = &test_db.test_pool;

        // Hand-built starting state: a fresh schema, the bookkeeping table,
        // and a row claiming that migration 1 has been applied.
        let setup_statements = [
            "DROP SCHEMA IF EXISTS graphile_worker CASCADE",
            "CREATE SCHEMA IF NOT EXISTS graphile_worker",
            "CREATE TABLE IF NOT EXISTS graphile_worker.migrations(id INT PRIMARY KEY, ts TIMESTAMPTZ DEFAULT NOW() NOT NULL, breaking BOOLEAN DEFAULT FALSE NOT NULL)",
            "INSERT INTO graphile_worker.migrations (id) VALUES (1)",
        ];

        let mut tx = pool.begin().await.unwrap();
        for &statement in setup_statements.iter() {
            query(statement).execute(tx.as_mut()).await.unwrap();
        }
        // Apply the first migration's SQL in the same transaction as its row.
        M000001_MIGRATION
            .execute(&mut tx, SCHEMA)
            .await
            .expect("Failed to execute migration");
        tx.commit().await.unwrap();

        // The regular entry point now has to pick up from there.
        migrate(pool, SCHEMA).await.expect("Failed to migrate");

        let recorded = test_db.get_migrations().await;
        assert!(
            recorded.len() >= 19,
            "There should be at least 19 migrations"
        );

        let second = recorded.iter().find(|row| row.id == 2).unwrap();
        assert!(!second.breaking, "Migration 2 should not be breaking");

        let eleventh = recorded.iter().find(|row| row.id == 11).unwrap();
        assert!(eleventh.breaking, "Migration 11 should be breaking");

        test_db
            .add_job("assert_jobs_work", json!({}), Default::default())
            .await;

        let jobs_before = test_db.get_jobs().await;
        assert_eq!(jobs_before.len(), 1, "There should be one job");
        assert_eq!(
            jobs_before[0].task_identifier, "assert_jobs_work",
            "The job should match 'assert_jobs_work'"
        );

        // Repeated migrations must succeed and leave the queued job alone.
        for _ in 0..3 {
            migrate(pool, SCHEMA).await.expect("Failed to re-migrate");
        }

        let jobs_after = test_db.get_jobs().await;
        assert_eq!(
            jobs_after.len(),
            1,
            "There should still be one job after re-migrating"
        );
        assert_eq!(
            jobs_after[0].task_identifier, "assert_jobs_work",
            "The job should still match 'assert_jobs_work' after re-migrating"
        );
    })
    .await;
}
/// Checks that `migrate` refuses to run when the migrations table contains a
/// breaking migration with a higher id than the ones this test expects.
///
/// After a normal migration, a row with id 999999 and `breaking = true` is
/// inserted. The next `migrate` call must return
/// `MigrateError::IncompatbleRevision` reporting 999999 as both the latest and
/// the latest breaking migration, and 19 as the highest migration.
#[tokio::test]
async fn aborts_if_database_is_more_up_to_date_than_current_worker() {
    const SCHEMA: &str = "graphile_worker";

    with_test_db(|test_db| async move {
        let pool = &test_db.test_pool;

        query("DROP SCHEMA IF EXISTS graphile_worker CASCADE;")
            .execute(pool)
            .await
            .unwrap();

        migrate(pool, SCHEMA)
            .await
            .expect("Failed to perform initial migration");

        // Record a breaking migration with an id far above the 19 applied above.
        query("INSERT INTO graphile_worker.migrations (id, ts, breaking) VALUES (999999, '2023-10-19T10:31:00Z', true);")
            .execute(pool)
            .await
            .unwrap();

        let migration_result = migrate(pool, SCHEMA).await;

        // Only the incompatible-revision error with exactly these numbers counts.
        let aborted_as_expected = match migration_result {
            Err(MigrateError::IncompatbleRevision {
                latest_migration,
                latest_breaking_migration,
                highest_migration,
            }) => {
                latest_migration == 999999
                    && latest_breaking_migration == 999999
                    && highest_migration == 19
            }
            _ => false,
        };

        assert!(
            aborted_as_expected,
            "Expected migration to abort due to database being more up to date than current worker"
        );
    })
    .await;
}
/// Checks that `migrate` fails with `MigrateError::LockedJobInMigration11`
/// when a locked job exists in a database that is only migrated up to 10.
///
/// The test creates the schema and `migrations` table by hand, executes
/// migrations 1 through 10 in one transaction (recording each of them), adds a
/// job, marks every row in `jobs` as locked, and then calls `migrate`.
#[tokio::test]
async fn throws_helpful_error_message_in_migration_11() {
    const SCHEMA: &str = "graphile_worker";

    with_test_db(|test_db| async move {
        let pool = &test_db.test_pool;

        query("CREATE SCHEMA IF NOT EXISTS graphile_worker;")
            .execute(pool)
            .await
            .unwrap();
        query("CREATE TABLE IF NOT EXISTS graphile_worker.migrations(id INT PRIMARY KEY, ts TIMESTAMPTZ DEFAULT NOW() NOT NULL, breaking BOOLEAN DEFAULT FALSE NOT NULL);")
            .execute(pool)
            .await
            .unwrap();

        // Everything that precedes migration 11, applied in order.
        let early_migrations = [
            M000001_MIGRATION,
            M000002_MIGRATION,
            M000003_MIGRATION,
            M000004_MIGRATION,
            M000005_MIGRATION,
            M000006_MIGRATION,
            M000007_MIGRATION,
            M000008_MIGRATION,
            M000009_MIGRATION,
            M000010_MIGRATION,
        ];
        let record_migration_sql =
            "insert into graphile_worker.migrations (id, breaking) values ($1, $2)";

        let mut tx = pool.begin().await.unwrap();
        for migration in early_migrations.iter() {
            // Run the migration itself, then write its bookkeeping row.
            migration.execute(&mut tx, SCHEMA).await.unwrap();
            query(record_migration_sql)
                .bind(migration.migration_number() as i64)
                .bind(migration.is_breaking())
                .execute(tx.as_mut())
                .await
                .unwrap();
        }
        tx.commit().await.unwrap();

        // Queue a job and mark it as locked by a worker.
        query("SELECT graphile_worker.add_job('lock_me', '{}');")
            .execute(pool)
            .await
            .unwrap();
        query("UPDATE graphile_worker.jobs SET locked_at = NOW(), locked_by = 'test_runner';")
            .execute(pool)
            .await
            .unwrap();

        // NOTE(review): the reason for this one-second pause is not visible in
        // this file; kept as-is until it is confirmed to be unnecessary.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let migration_result = migrate(pool, SCHEMA).await;
        let failed_on_locked_job =
            matches!(migration_result, Err(MigrateError::LockedJobInMigration11));
        assert!(
            failed_on_locked_job,
            "Expected migration to fail due to locked jobs"
        );
    })
    .await;
}