use crate::colors::*;
use anyhow::{Result, anyhow, bail};
use qail_core::prelude::Qail;
use qail_pg::PgDriver;
use std::time::Instant;
use tokio::time::{Duration, sleep};
const LOCK_CLASS_ID: i32 = 20_801; const LOCK_OBJECT_SEED: i32 = 19_783; const LOCK_WAIT_POLL_MS: u64 = 500;
pub async fn acquire_migration_lock(
driver: &mut PgDriver,
operation: &str,
wait_for_lock: bool,
lock_timeout_secs: Option<u64>,
lock_scope: Option<&str>,
) -> Result<()> {
let should_wait = wait_for_lock || lock_timeout_secs.is_some();
let deadline = lock_timeout_secs
.map(Duration::from_secs)
.and_then(|timeout| Instant::now().checked_add(timeout));
let lock_object_id = scoped_lock_object_id(lock_scope);
let scope_label = normalize_scope(lock_scope).unwrap_or_else(|| "global".to_string());
if should_wait {
println!(
" {} Waiting for migration lock (scope: {})...",
"⏳".yellow().dimmed(),
scope_label.cyan()
);
loop {
if try_acquire_lock(driver, lock_object_id).await? {
println!(
" {} Acquired migration lock (scope: {})",
"✓".green(),
scope_label.cyan()
);
return Ok(());
}
if let Some(deadline) = deadline
&& Instant::now() >= deadline
{
bail!(
"Timed out waiting for migration lock for '{}' (scope: {}) after {} second(s).",
operation,
scope_label,
lock_timeout_secs.unwrap_or_default()
);
}
sleep(Duration::from_millis(LOCK_WAIT_POLL_MS)).await;
}
}
if try_acquire_lock(driver, lock_object_id).await? {
println!(
" {} Acquired migration lock (scope: {})",
"✓".green(),
scope_label.cyan()
);
return Ok(());
}
bail!(
"Another migration operation is already running. \
Could not acquire migration lock for '{}' (scope: {}). \
Re-run with --wait-for-lock or retry after it completes.",
operation,
scope_label
);
}
fn normalize_scope(lock_scope: Option<&str>) -> Option<String> {
lock_scope
.map(str::trim)
.filter(|s| !s.is_empty())
.map(|s| s.to_ascii_lowercase())
}
fn scoped_lock_object_id(lock_scope: Option<&str>) -> i32 {
let Some(scope) = normalize_scope(lock_scope) else {
return LOCK_OBJECT_SEED;
};
let mut hash: u32 = 0x811c9dc5;
for byte in scope.as_bytes() {
hash ^= u32::from(*byte);
hash = hash.wrapping_mul(0x0100_0193);
}
let mixed = hash ^ u32::try_from(LOCK_OBJECT_SEED).unwrap_or_default();
i32::try_from(mixed & 0x7fff_ffff).unwrap_or(LOCK_OBJECT_SEED)
}
async fn try_acquire_lock(driver: &mut PgDriver, lock_object_id: i32) -> Result<bool> {
let lock_source = format!(
"pg_try_advisory_lock({}, {})",
LOCK_CLASS_ID, lock_object_id
);
let lock_cmd = Qail::get(lock_source.as_str())
.column("pg_try_advisory_lock")
.limit(1);
let rows = driver
.fetch_all(&lock_cmd)
.await
.map_err(|e| anyhow!("Failed to acquire migration advisory lock: {}", e))?;
Ok(rows.first().and_then(|r| r.get_bool(0)).unwrap_or(false))
}
#[cfg(test)]
mod tests {
use super::{LOCK_CLASS_ID, LOCK_OBJECT_SEED, acquire_migration_lock, scoped_lock_object_id};
use qail_pg::PgDriver;
use std::time::Instant;
use tokio::time::Duration;
#[test]
fn advisory_lock_class_and_seed_are_stable() {
assert_eq!(LOCK_CLASS_ID, 20_801);
assert_eq!(LOCK_OBJECT_SEED, 19_783);
}
#[test]
fn scoped_lock_id_is_stable_and_distinct() {
let users_db = scoped_lock_object_id(Some("users_db"));
let inventory_db = scoped_lock_object_id(Some("inventory_db"));
assert_eq!(users_db, scoped_lock_object_id(Some("users_db")));
assert_ne!(users_db, inventory_db);
assert_eq!(LOCK_OBJECT_SEED, scoped_lock_object_id(Some("")));
assert_eq!(LOCK_OBJECT_SEED, scoped_lock_object_id(None));
}
fn lock_test_db_url() -> Option<String> {
std::env::var("QAIL_TEST_DB_URL").ok()
}
async fn connect_test_driver(url: &str) -> PgDriver {
PgDriver::connect_url(url)
.await
.expect("Failed to connect test driver using QAIL_TEST_DB_URL")
}
#[tokio::test]
async fn advisory_lock_real_db_contention_and_timeout() {
let Some(url) = lock_test_db_url() else {
eprintln!("Skipping advisory lock DB test (set QAIL_TEST_DB_URL)");
return;
};
let mut holder = connect_test_driver(&url).await;
let mut contender = connect_test_driver(&url).await;
acquire_migration_lock(&mut holder, "test holder", true, Some(5), Some(&url))
.await
.expect("holder should acquire lock");
let err = acquire_migration_lock(&mut contender, "test contender", false, None, Some(&url))
.await
.expect_err("contender should fail fast while lock is held");
assert!(
err.to_string()
.contains("Another migration operation is already running"),
"unexpected fast-fail error: {err}"
);
drop(contender);
drop(holder);
let mut holder = connect_test_driver(&url).await;
let mut waiter = connect_test_driver(&url).await;
acquire_migration_lock(
&mut holder,
"test timeout holder",
true,
Some(5),
Some(&url),
)
.await
.expect("timeout holder should acquire lock");
let started = Instant::now();
let timeout_err = acquire_migration_lock(
&mut waiter,
"test timeout waiter",
true,
Some(1),
Some(&url),
)
.await
.expect_err("waiter should time out while holder retains lock");
assert!(
started.elapsed() >= Duration::from_millis(900),
"wait timeout returned too quickly: {:?}",
started.elapsed()
);
assert!(
timeout_err
.to_string()
.contains("Timed out waiting for migration lock"),
"unexpected timeout error: {timeout_err}"
);
drop(waiter);
drop(holder);
let mut holder = connect_test_driver(&url).await;
let mut waiter = connect_test_driver(&url).await;
acquire_migration_lock(
&mut holder,
"test eventual holder",
true,
Some(5),
Some(&url),
)
.await
.expect("eventual holder should acquire lock");
let waiter_fut = acquire_migration_lock(
&mut waiter,
"test eventual waiter",
true,
Some(5),
Some(&url),
);
let release_fut = async move {
tokio::time::sleep(Duration::from_millis(700)).await;
drop(holder);
};
let ((), wait_result) = tokio::join!(release_fut, waiter_fut);
wait_result.expect("waiter should acquire lock after holder is released");
}
}