use crate::error::AwaError;
use sqlx::postgres::PgConnection;
use sqlx::PgPool;
use tracing::info;
/// Schema version this build migrates a database to.
///
/// Matches the version of the last entry in `MIGRATIONS`; `run_inner` treats a
/// database whose recorded version equals this value as up to date.
pub const CURRENT_VERSION: i32 = 6;
/// All known migrations as `(version, description, SQL steps)` tuples.
///
/// Entries are listed in ascending version order. `run_inner` walks this slice
/// front to back, skips every entry whose version is not greater than the
/// version recorded in the database, and executes the remaining entries' steps
/// in order.
const MIGRATIONS: &[(i32, &str, &[&str])] = &[
(1, "Canonical schema with UI indexes", &[V1_UP]),
(2, "Runtime observability snapshots", &[V2_UP]),
(3, "Maintenance loop health in runtime snapshots", &[V3_UP]),
(4, "Admin metadata cache tables", &[V4_UP]),
(5, "Statement-level admin metadata triggers", &[V5_UP]),
(
6,
"Dirty-key statement triggers for deadlock-free admin metadata",
&[V6_UP],
),
];
// SQL text of each migration, embedded into the binary at compile time from
// the `../migrations` directory (path relative to this source file).
const V1_UP: &str = include_str!("../migrations/v001_canonical_schema.sql");
const V2_UP: &str = include_str!("../migrations/v002_runtime_instances.sql");
const V3_UP: &str = include_str!("../migrations/v003_maintenance_health.sql");
const V4_UP: &str = include_str!("../migrations/v004_admin_metadata.sql");
const V5_UP: &str = include_str!("../migrations/v005_admin_metadata_stmt_triggers.sql");
const V6_UP: &str = include_str!("../migrations/v006_remove_hot_table_triggers.sql");
/// Translates a version number from the legacy numbering into the current one.
///
/// Legacy versions 3, 4 and 5 become 1, 2 and 3 respectively, legacy version 6
/// and anything above it becomes 4, and everything below 3 becomes 0.
fn normalize_legacy_version(old_version: i32) -> i32 {
    // The arms cover the whole `i32` range, so no wildcard arm is needed.
    match old_version {
        i32::MIN..=2 => 0,
        3 => 1,
        4 => 2,
        5 => 3,
        6..=i32::MAX => 4,
    }
}
pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
let lock_key: i64 = 0x4157_415f_4d49_4752; let mut conn = pool.acquire().await?;
sqlx::query("SELECT pg_advisory_lock($1)")
.bind(lock_key)
.execute(&mut *conn)
.await?;
let result = run_inner(&mut conn).await;
let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
.bind(lock_key)
.execute(&mut *conn)
.await;
result
}
/// Applies every pending migration on `conn`, then refreshes admin metadata.
///
/// The recorded version is read through `current_version_conn` when the `awa`
/// schema exists and is taken to be 0 otherwise. Every entry of `MIGRATIONS`
/// with a greater version has its SQL steps executed in order. Afterwards, if
/// a function named `refresh_admin_metadata` exists in the `awa` schema, it is
/// invoked together with a `SET LOCAL statement_timeout = '5s'`; the outcome of
/// that call is discarded.
///
/// # Errors
///
/// Returns an error when any existence check, the version lookup, or any
/// migration step fails.
async fn run_inner(conn: &mut PgConnection) -> Result<(), AwaError> {
    const REFRESH_FN_EXISTS_SQL: &str = "SELECT EXISTS(SELECT 1 FROM pg_proc WHERE proname = 'refresh_admin_metadata' AND pronamespace = (SELECT oid FROM pg_namespace WHERE nspname = 'awa'))";
    const REFRESH_SQL: &str =
        "SET LOCAL statement_timeout = '5s'; SELECT awa.refresh_admin_metadata()";

    let schema_present: bool =
        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
            .fetch_one(&mut *conn)
            .await?;

    // Without the schema there is no version table to consult.
    let current = if schema_present {
        current_version_conn(conn).await?
    } else {
        0
    };

    if schema_present && current == CURRENT_VERSION {
        info!(version = current, "Schema is up to date");
    } else {
        let pending = MIGRATIONS
            .iter()
            .filter(|&&(version, _, _)| version > current);
        for &(version, description, steps) in pending {
            info!(version, description, "Applying migration");
            for step in steps {
                sqlx::raw_sql(step).execute(&mut *conn).await?;
            }
            info!(version, "Migration applied");
        }
    }

    let refresh_available: bool = sqlx::query_scalar(REFRESH_FN_EXISTS_SQL)
        .fetch_one(&mut *conn)
        .await?;
    if !refresh_available {
        return Ok(());
    }

    // Best-effort: a failed or timed-out refresh must not fail the migration.
    let _ = sqlx::raw_sql(REFRESH_SQL).execute(&mut *conn).await;
    Ok(())
}
/// Reports the schema version recorded in the database behind `pool`.
///
/// Acquires one pooled connection and delegates to `current_version_conn`.
///
/// # Errors
///
/// Returns an error when no connection can be acquired or when the version
/// lookup itself fails.
pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
    let mut connection = pool.acquire().await?;
    let version = current_version_conn(&mut connection).await?;
    Ok(version)
}
/// Reads the schema version recorded in `awa.schema_version` over `conn`,
/// rewriting the table first when it appears to use the legacy numbering.
///
/// Returns `0` when the `awa` schema or the `awa.schema_version` table does not
/// exist, or when the table holds no rows. When legacy numbering is detected,
/// all rows with `version >= 3` are deleted, a row is inserted for every entry
/// of `MIGRATIONS` up to the normalized version, and the normalized version is
/// returned. Otherwise the stored maximum is returned unchanged.
///
/// # Errors
///
/// Propagates failures of the schema and table existence checks, of the
/// `MAX(version)` query, and of the DELETE/INSERT statements. The table probes
/// used for legacy detection do not propagate errors: a failed probe is treated
/// as "not present" via `unwrap_or(false)`.
async fn current_version_conn(conn: &mut PgConnection) -> Result<i32, AwaError> {
let has_schema: bool =
sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')")
.fetch_one(&mut *conn)
.await?;
if !has_schema {
return Ok(0);
}
let has_table: bool = sqlx::query_scalar(
"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
)
.fetch_one(&mut *conn)
.await?;
if !has_table {
return Ok(0);
}
// MAX over an empty table yields NULL, hence the Option.
let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
.fetch_one(&mut *conn)
.await?;
let raw_version = version.unwrap_or(0);
// A stored value inside 1..=CURRENT_VERSION is accepted as-is only when the
// tables expected at that version are present.
// NOTE(review): presumably `queue_state_counts` is created by migration 4 and
// `runtime_instances` by migration 2 — confirm against the SQL files.
if (1..=CURRENT_VERSION).contains(&raw_version) {
let has_admin_tables: bool = sqlx::query_scalar(
"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'queue_state_counts')",
)
.fetch_one(&mut *conn)
.await
.unwrap_or(false);
// Version 4 or later with the admin table present: trust the stored value.
if raw_version >= 4 && has_admin_tables {
return Ok(raw_version);
}
if raw_version <= 3 {
let has_runtime: bool = sqlx::query_scalar(
"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'runtime_instances')",
)
.fetch_one(&mut *conn)
.await
.unwrap_or(false);
// Version 1 is accepted unconditionally; versions 2 and 3 only when
// `runtime_instances` exists.
if (raw_version >= 2 && has_runtime) || raw_version == 1 {
return Ok(raw_version);
}
}
}
// Reached when the stored value is 0, is above CURRENT_VERSION, is 4..=6
// without `queue_state_counts`, or is 2..=3 without `runtime_instances`.
let has_legacy_high: bool =
sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM awa.schema_version WHERE version >= 6)")
.fetch_one(&mut *conn)
.await
.unwrap_or(false);
// NOTE(review): this repeats the `queue_state_counts` probe made above when
// the stored value was inside 1..=CURRENT_VERSION.
let has_admin_metadata: bool = sqlx::query_scalar(
"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'queue_state_counts')",
)
.fetch_one(&mut *conn)
.await
.unwrap_or(false);
let is_legacy_v5_only = raw_version == 5 && !has_legacy_high && !has_admin_metadata;
let is_legacy_v4_only = raw_version == 4 && !has_legacy_high && !has_admin_metadata;
// `&&` short-circuits, so `runtime_instances` is probed only when the stored
// value is 3 and no row with version >= 6 exists.
let is_legacy_v3_only = raw_version == 3
&& !has_legacy_high
&& {
let has_runtime: bool = sqlx::query_scalar(
"SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'runtime_instances')",
)
.fetch_one(&mut *conn)
.await
.unwrap_or(false);
!has_runtime
};
// NOTE(review): a stored value of 2 without `runtime_instances` matches none
// of these flags and is returned unchanged at the end — confirm intended.
if has_legacy_high || is_legacy_v5_only || is_legacy_v4_only || is_legacy_v3_only {
let normalized = normalize_legacy_version(raw_version);
info!(
old_version = raw_version,
new_version = normalized,
"Normalizing legacy version numbering"
);
// NOTE(review): the DELETE and the INSERTs below are separate statements and
// no transaction is opened in this function — confirm whether atomicity matters.
sqlx::query("DELETE FROM awa.schema_version WHERE version >= 3")
.execute(&mut *conn)
.await?;
// Rows with version 1 or 2 survive the DELETE; ON CONFLICT leaves them untouched.
for &(v, desc, _) in MIGRATIONS {
if v <= normalized {
sqlx::query(
"INSERT INTO awa.schema_version (version, description) VALUES ($1, $2) ON CONFLICT (version) DO NOTHING",
)
.bind(v)
.bind(desc)
.execute(&mut *conn)
.await?;
}
}
return Ok(normalized);
}
Ok(raw_version)
}
pub fn migration_sql() -> Vec<(i32, &'static str, String)> {
MIGRATIONS
.iter()
.map(|&(v, d, steps)| (v, d, steps.join("\n")))
.collect()
}
/// Returns the migrations whose version lies in the half-open range `(from, to]`.
///
/// Each entry is `(version, description, sql)` with the migration's steps
/// joined by a newline. The result is empty when no version satisfies
/// `from < version <= to`, which includes the cases `from >= to`.
pub fn migration_sql_range(from: i32, to: i32) -> Vec<(i32, &'static str, String)> {
    MIGRATIONS
        .iter()
        .filter_map(|&(version, description, steps)| {
            let in_range = from < version && version <= to;
            in_range.then(|| (version, description, steps.join("\n")))
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The full range yields every migration, from version 1 to `CURRENT_VERSION`.
    #[test]
    fn migration_sql_range_all() {
        let versions: Vec<i32> = migration_sql_range(0, CURRENT_VERSION)
            .into_iter()
            .map(|(version, _, _)| version)
            .collect();
        assert_eq!(versions.len(), MIGRATIONS.len());
        assert_eq!(versions.first(), Some(&1));
        assert_eq!(versions.last(), Some(&CURRENT_VERSION));
    }

    /// The lower bound is exclusive: nothing at or below it is returned.
    #[test]
    fn migration_sql_range_subset() {
        let subset = migration_sql_range(2, CURRENT_VERSION);
        for (version, _, _) in &subset {
            assert!(*version > 2);
        }
        let expected_len = (CURRENT_VERSION - 2) as usize;
        assert_eq!(subset.len(), expected_len);
    }

    /// A range spanning one version yields exactly that migration, with SQL.
    #[test]
    fn migration_sql_range_single() {
        match migration_sql_range(2, 3).as_slice() {
            [(version, _, sql)] => {
                assert_eq!(*version, 3);
                assert!(!sql.is_empty());
            }
            other => panic!("expected exactly one migration, got {}", other.len()),
        }
    }

    /// Equal bounds describe an empty range.
    #[test]
    fn migration_sql_range_empty_when_equal() {
        assert_eq!(
            migration_sql_range(CURRENT_VERSION, CURRENT_VERSION).len(),
            0
        );
    }

    /// An inverted range is empty.
    #[test]
    fn migration_sql_range_empty_when_inverted() {
        assert_eq!(migration_sql_range(3, 1).len(), 0);
    }

    /// The full range agrees entry-for-entry with `migration_sql`.
    #[test]
    fn migration_sql_range_matches_full() {
        assert_eq!(migration_sql(), migration_sql_range(0, CURRENT_VERSION));
    }
}