use crate::error::AwaError;
use sqlx::postgres::PgConnection;
use sqlx::PgPool;
use tracing::info;
/// Schema version this build of the code expects.
///
/// `run_inner` treats the database as up to date when the version reported by
/// `current_version_conn` equals this value. It currently matches the highest
/// version listed in `MIGRATIONS`; keep the two in sync when adding a migration.
pub const CURRENT_VERSION: i32 = 4;
/// Ordered table of known migrations.
///
/// Each entry is `(version, description, steps)` where `steps` is a list of SQL
/// scripts executed one after another. `run_inner` walks this slice in order and
/// applies every entry whose version is greater than the database's current one;
/// `current_version_conn` also uses the versions and descriptions to rewrite
/// `awa.schema_version` rows when normalizing legacy numbering.
const MIGRATIONS: &[(i32, &str, &[&str])] = &[
(1, "Canonical schema with UI indexes", &[V1_UP]),
(2, "Runtime observability snapshots", &[V2_UP]),
(3, "Maintenance loop health in runtime snapshots", &[V3_UP]),
(4, "Admin metadata cache tables", &[V4_UP]),
];
// SQL text for each migration, embedded into the binary at compile time.
// `include_str!` resolves these paths relative to this source file, so the
// `.sql` files must exist under `../migrations/` when the crate is built.
const V1_UP: &str = include_str!("../migrations/v001_canonical_schema.sql");
const V2_UP: &str = include_str!("../migrations/v002_runtime_instances.sql");
const V3_UP: &str = include_str!("../migrations/v003_maintenance_health.sql");
const V4_UP: &str = include_str!("../migrations/v004_admin_metadata.sql");
/// Translates a version number from the legacy numbering scheme into the
/// current one.
///
/// The mapping is a shift down by two, bounded to the range `0..=4`:
/// legacy `3 -> 1`, `4 -> 2`, `5 -> 3`, anything `>= 6 -> 4`, and anything
/// below `3` collapses to `0`.
fn normalize_legacy_version(old_version: i32) -> i32 {
    // Saturating subtraction keeps extreme negative inputs from overflowing
    // before the clamp pins them to 0.
    old_version.saturating_sub(2).clamp(0, 4)
}
/// Brings the database schema up to date while holding a Postgres advisory lock.
///
/// A single pooled connection is used for the whole sequence: take the lock,
/// run the migration logic via `run_inner`, then release the lock.
///
/// # Errors
///
/// Returns an error if a connection cannot be acquired, if taking the advisory
/// lock fails, or if `run_inner` fails. A failure while releasing the lock is
/// intentionally ignored so that the migration outcome is what the caller sees.
pub async fn run(pool: &PgPool) -> Result<(), AwaError> {
    // Advisory lock key; the bytes spell "AWA_MIGR" in ASCII.
    const MIGRATION_LOCK_KEY: i64 = 0x4157_415f_4d49_4752;

    let mut conn = pool.acquire().await?;

    let take_lock = sqlx::query("SELECT pg_advisory_lock($1)").bind(MIGRATION_LOCK_KEY);
    take_lock.execute(&mut *conn).await?;

    // Hold on to the outcome instead of returning early so the unlock below
    // is always attempted.
    let outcome = run_inner(&mut conn).await;

    let release_lock = sqlx::query("SELECT pg_advisory_unlock($1)").bind(MIGRATION_LOCK_KEY);
    // Best effort: the unlock result is deliberately discarded.
    let _ = release_lock.execute(&mut *conn).await;

    outcome
}
/// Applies every migration newer than the database's current version.
///
/// The current version comes from `current_version_conn`, which already
/// reports `0` when the `awa` schema (or its `schema_version` table) does not
/// exist, so no separate schema-existence probe is needed here.
///
/// Migrations from `MIGRATIONS` are executed in slice order; each entry's SQL
/// steps run sequentially on `conn`. This function does not itself write to
/// `awa.schema_version`.
/// NOTE(review): presumably the migration SQL records its own version row —
/// confirm against the `.sql` files.
///
/// # Errors
///
/// Returns the first database error encountered, either while reading the
/// current version or while executing a migration step. Steps that already
/// ran before the failure are not rolled back by this function.
async fn run_inner(conn: &mut PgConnection) -> Result<(), AwaError> {
    let current = current_version_conn(&mut *conn).await?;

    if current == CURRENT_VERSION {
        info!(version = current, "Schema is up to date");
        return Ok(());
    }

    for &(version, description, steps) in MIGRATIONS {
        // Already applied (or older than what the database reports).
        if version <= current {
            continue;
        }

        info!(version, description, "Applying migration");
        for step in steps {
            sqlx::raw_sql(step).execute(&mut *conn).await?;
        }
        info!(version, "Migration applied");
    }

    Ok(())
}
/// Reports the schema version recorded in the database, using a connection
/// borrowed from `pool`.
///
/// Returns `0` when the `awa` schema or its `schema_version` table is absent.
/// Note that the underlying `current_version_conn` may rewrite rows in
/// `awa.schema_version` when it detects legacy version numbering, so this call
/// is not strictly read-only.
///
/// # Errors
///
/// Returns an error if a connection cannot be acquired or a query fails.
pub async fn current_version(pool: &PgPool) -> Result<i32, AwaError> {
    let mut pooled = pool.acquire().await?;
    let version = current_version_conn(&mut pooled).await?;
    Ok(version)
}
/// Runs a `SELECT EXISTS(...)`-style query and returns its single boolean column.
///
/// Database errors are propagated to the caller; they are never interpreted as
/// "does not exist".
async fn query_exists(conn: &mut PgConnection, sql: &str) -> Result<bool, AwaError> {
    let found: bool = sqlx::query_scalar(sql).fetch_one(&mut *conn).await?;
    Ok(found)
}

/// Determines the schema version recorded in the database, normalizing legacy
/// version numbering if it is detected.
///
/// Returns `0` when the `awa` schema or the `awa.schema_version` table does not
/// exist, or when the table is empty. Otherwise the highest recorded version is
/// read and checked against three legacy indicators:
///
/// * any row with version 5 or 6 exists;
/// * the highest version is 4 but no `awa.queue_state_counts` table exists;
/// * the highest version is 3 but no `awa.runtime_instances` table exists.
///
/// If any indicator holds, all `schema_version` rows with `version >= 3` are
/// deleted, rows for every entry in `MIGRATIONS` up to the normalized version
/// are (re)inserted, and the normalized version is returned.
///
/// NOTE(review): the delete and re-insert are not wrapped in a transaction by
/// this function; confirm whether callers rely on the advisory lock in `run`
/// for safety, since `current_version` reaches this path without it.
///
/// # Errors
///
/// Returns any database error from the probe, delete, or insert queries. Probe
/// failures are propagated rather than treated as `false`, because a failed
/// probe would otherwise make a current database look legacy and trigger the
/// destructive rewrite above.
async fn current_version_conn(conn: &mut PgConnection) -> Result<i32, AwaError> {
    let has_schema = query_exists(
        &mut *conn,
        "SELECT EXISTS(SELECT 1 FROM pg_namespace WHERE nspname = 'awa')",
    )
    .await?;
    if !has_schema {
        return Ok(0);
    }

    let has_table = query_exists(
        &mut *conn,
        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'schema_version')",
    )
    .await?;
    if !has_table {
        return Ok(0);
    }

    // MAX() over an empty table yields NULL, hence the Option.
    let version: Option<i32> = sqlx::query_scalar("SELECT MAX(version) FROM awa.schema_version")
        .fetch_one(&mut *conn)
        .await?;
    let raw_version = version.unwrap_or(0);

    let has_legacy_high = query_exists(
        &mut *conn,
        "SELECT EXISTS(SELECT 1 FROM awa.schema_version WHERE version IN (5, 6))",
    )
    .await?;
    let has_admin_metadata = query_exists(
        &mut *conn,
        "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'queue_state_counts')",
    )
    .await?;

    let is_legacy_v4_only = raw_version == 4 && !has_legacy_high && !has_admin_metadata;

    // Short-circuiting keeps the runtime_instances probe from running unless
    // the first two conditions already hold.
    let is_legacy_v3_only = raw_version == 3
        && !has_legacy_high
        && !query_exists(
            &mut *conn,
            "SELECT EXISTS(SELECT 1 FROM information_schema.tables WHERE table_schema = 'awa' AND table_name = 'runtime_instances')",
        )
        .await?;

    if !(has_legacy_high || is_legacy_v4_only || is_legacy_v3_only) {
        return Ok(raw_version);
    }

    let normalized = normalize_legacy_version(raw_version);
    info!(
        old_version = raw_version,
        new_version = normalized,
        "Normalizing legacy version numbering"
    );

    // Drop the rows whose numbers are ambiguous between the two schemes, then
    // re-create the ones that apply under the current numbering.
    sqlx::query("DELETE FROM awa.schema_version WHERE version >= 3")
        .execute(&mut *conn)
        .await?;

    for &(v, desc, _) in MIGRATIONS {
        if v > normalized {
            continue;
        }
        sqlx::query(
            "INSERT INTO awa.schema_version (version, description) VALUES ($1, $2) ON CONFLICT (version) DO NOTHING",
        )
        .bind(v)
        .bind(desc)
        .execute(&mut *conn)
        .await?;
    }

    Ok(normalized)
}
pub fn migration_sql() -> Vec<(i32, &'static str, String)> {
MIGRATIONS
.iter()
.map(|&(v, d, steps)| (v, d, steps.join("\n")))
.collect()
}