use std::panic::{AssertUnwindSafe, catch_unwind};
use diesel::{RunQueryDsl, sql_query};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
use crate::{Error, PgPool, queries::with_conn};
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("migrations");
const ACQUIRE_MIGRATION_LOCK: &str =
"SELECT pg_advisory_lock(hashtext('apalis_diesel_postgres'), hashtext('migrations'))";
const RELEASE_MIGRATION_LOCK: &str =
"SELECT pg_advisory_unlock(hashtext('apalis_diesel_postgres'), hashtext('migrations'))";
pub(crate) async fn setup(pool: PgPool) -> Result<(), Error> {
with_conn(pool, |conn| {
sql_query(ACQUIRE_MIGRATION_LOCK)
.execute(conn)
.map_err(Error::database("acquiring the migration advisory lock"))?;
let migrated = catch_unwind(AssertUnwindSafe(|| {
conn.run_pending_migrations(MIGRATIONS).map(|_| ())
}));
let released = sql_query(RELEASE_MIGRATION_LOCK).execute(conn);
match migrated {
Err(panic) => std::panic::resume_unwind(panic),
Ok(Err(error)) => Err(Error::Migration(error)),
Ok(Ok(())) => released
.map(|_| ())
.map_err(Error::database("releasing the migration advisory lock")),
}
})
.await
}
pub(crate) async fn verify_schema(pool: PgPool) -> Result<(), Error> {
with_conn(pool, |conn| {
let pending = conn
.has_pending_migration(MIGRATIONS)
.map_err(Error::Migration)?;
if pending {
Err(Error::Migration(
"embedded migrations have not been applied — call `apalis_diesel_postgres::setup` first".into(),
))
} else {
Ok(())
}
})
.await
}