use crate::schema::SchemaConfig;
pub struct Migration {
pub version: u32,
pub description: &'static str,
pub up: fn(&SchemaConfig) -> Vec<String>,
}
pub const VERSIONS_TABLE: &str = "_schema_versions";
fn oracle_versions_table(schema: &SchemaConfig) -> String {
match &schema.owner {
Some(owner) => format!("{owner}.\"{VERSIONS_TABLE}\""),
None => format!("\"{VERSIONS_TABLE}\""),
}
}
pub fn oracle_create_versions_table(schema: &SchemaConfig) -> String {
let table = oracle_versions_table(schema);
format!(
"CREATE TABLE {table} (\
version NUMBER(10) NOT NULL PRIMARY KEY, \
description VARCHAR2(512), \
applied_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL)"
)
}
pub fn postgres_create_versions_table() -> String {
format!(
"CREATE TABLE IF NOT EXISTS {VERSIONS_TABLE} (\
version INTEGER NOT NULL PRIMARY KEY, \
description VARCHAR(512), \
applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW())"
)
}
fn pending_migrations_error(
backend: &str,
current: u32,
pending: &[&Migration],
schema: &SchemaConfig,
) -> String {
let mut msg = format!(
"Schema migration required (current version: {current}, \
latest version: {}).\n\n\
The following migrations need to be applied:\n",
pending.last().map_or(current, |m| m.version)
);
let versions_insert = if backend == "oracle" {
format!(
"INSERT INTO {} (version, description) VALUES",
oracle_versions_table(schema),
)
} else {
format!("INSERT INTO {VERSIONS_TABLE} (version, description) VALUES")
};
for m in pending {
msg.push_str(&format!("\n v{}: {}\n", m.version, m.description));
let stmts = (m.up)(schema);
for stmt in &stmts {
if !stmt.is_empty() {
msg.push_str(&format!(" {stmt}\n"));
}
}
msg.push_str(&format!(
" {versions_insert} ({}, '{}');\n",
m.version, m.description,
));
}
msg.push_str(&format!(
"\nTo apply automatically, set `auto_migrate: true` in your {backend} schema config.\n\
To skip, set `version: {}` to mark your schema as already up to date.",
pending.last().map_or(current, |m| m.version)
));
msg
}
pub fn run_oracle_migrations(
conn: &oracle::Connection,
schema: &SchemaConfig,
migrations: &[Migration],
config_version: Option<u32>,
auto_migrate: bool,
) -> Result<Vec<u32>, String> {
ensure_oracle_versions_table(conn, schema)?;
let current = oracle_current_version(conn, schema)?;
let skip_up_to = config_version.unwrap_or(0);
let effective_start = current.max(skip_up_to);
let pending: Vec<&Migration> = migrations
.iter()
.filter(|m| m.version > effective_start)
.collect();
if pending.is_empty() {
tracing::debug!(current_version = effective_start, "schema is up to date");
return Ok(Vec::new());
}
if !auto_migrate {
return Err(pending_migrations_error(
"oracle",
effective_start,
&pending,
schema,
));
}
tracing::info!(
current_version = effective_start,
pending = pending.len(),
"applying schema migrations"
);
let versions_table = oracle_versions_table(schema);
let mut applied = Vec::new();
for migration in pending {
let stmts = (migration.up)(schema);
for stmt in &stmts {
if stmt.is_empty() {
continue;
}
conn.execute(stmt, &[]).map_err(|e| {
format!(
"migration v{} ({}) failed: {}",
migration.version, migration.description, e
)
})?;
}
match conn.execute(
&format!("INSERT INTO {versions_table} (version, description) VALUES (:1, :2)"),
&[&migration.version, &migration.description],
) {
Ok(_) => {
conn.commit().map_err(|e| format!("commit failed: {e}"))?;
}
Err(e) if e.db_error().is_some_and(|de| de.code() == 1) => {
tracing::info!(
version = migration.version,
"migration already applied by another instance, skipping"
);
continue;
}
Err(e) => {
return Err(format!(
"failed to record migration v{}: {}",
migration.version, e
));
}
}
tracing::info!(
version = migration.version,
description = migration.description,
"applied migration"
);
applied.push(migration.version);
}
let final_version = oracle_current_version(conn, schema)?;
tracing::info!(
schema_version = final_version,
"schema version after migrations"
);
Ok(applied)
}
fn ensure_oracle_versions_table(
conn: &oracle::Connection,
schema: &SchemaConfig,
) -> Result<(), String> {
let check_sql = match &schema.owner {
Some(owner) => format!(
"SELECT COUNT(*) FROM all_tables WHERE owner = '{}' AND table_name = '{VERSIONS_TABLE}'",
owner.to_ascii_uppercase()
),
None => {
format!("SELECT COUNT(*) FROM user_tables WHERE table_name = '{VERSIONS_TABLE}'")
}
};
let present: i64 = conn
.query_row_as(&check_sql, &[])
.map_err(|e| format!("failed to check for {VERSIONS_TABLE} table: {e}"))?;
if present == 0 {
let ddl = oracle_create_versions_table(schema);
if let Err(err) = conn.execute(&ddl, &[]) {
if err.db_error().is_some_and(|de| de.code() == 955) {
tracing::debug!("versions table created by concurrent instance, proceeding");
} else {
return Err(format!("failed to create {VERSIONS_TABLE} table: {err}"));
}
}
conn.commit().map_err(|e| format!("commit failed: {e}"))?;
}
Ok(())
}
fn oracle_current_version(conn: &oracle::Connection, schema: &SchemaConfig) -> Result<u32, String> {
let versions_table = oracle_versions_table(schema);
let row: Option<i64> = conn
.query_row_as_named(&format!("SELECT MAX(version) FROM {versions_table}"), &[])
.map_err(|e| format!("failed to read current schema version: {e}"))?;
Ok(row.unwrap_or(0) as u32)
}
pub async fn run_postgres_migrations(
client: &mut tokio_postgres::Client,
schema: &SchemaConfig,
migrations: &[Migration],
config_version: Option<u32>,
auto_migrate: bool,
) -> Result<Vec<u32>, String> {
client
.batch_execute(&postgres_create_versions_table())
.await
.map_err(|e| format!("failed to create {VERSIONS_TABLE} table: {e}"))?;
let current = postgres_current_version(client).await?;
let skip_up_to = config_version.unwrap_or(0);
let effective_start = current.max(skip_up_to);
let pending: Vec<&Migration> = migrations
.iter()
.filter(|m| m.version > effective_start)
.collect();
if pending.is_empty() {
tracing::debug!(current_version = effective_start, "schema is up to date");
return Ok(Vec::new());
}
if !auto_migrate {
return Err(pending_migrations_error(
"postgres",
effective_start,
&pending,
schema,
));
}
const MIGRATION_LOCK_ID: i64 = 0x736D675F6D696772; client
.execute("SELECT pg_advisory_lock($1)", &[&MIGRATION_LOCK_ID])
.await
.map_err(|e| format!("failed to acquire migration lock: {e}"))?;
let result =
apply_postgres_migrations(client, schema, migrations, config_version, effective_start)
.await;
let _ = client
.execute("SELECT pg_advisory_unlock($1)", &[&MIGRATION_LOCK_ID])
.await;
result
}
async fn apply_postgres_migrations(
client: &mut tokio_postgres::Client,
schema: &SchemaConfig,
migrations: &[Migration],
config_version: Option<u32>,
pre_lock_start: u32,
) -> Result<Vec<u32>, String> {
let current = postgres_current_version(client).await?;
let skip_up_to = config_version.unwrap_or(0);
let effective_start = current.max(skip_up_to).max(pre_lock_start);
let pending: Vec<&Migration> = migrations
.iter()
.filter(|m| m.version > effective_start)
.collect();
if pending.is_empty() {
tracing::debug!(current_version = effective_start, "schema is up to date");
return Ok(Vec::new());
}
tracing::info!(
current_version = effective_start,
pending = pending.len(),
"applying schema migrations"
);
let mut applied = Vec::new();
for migration in pending {
let tx = client
.transaction()
.await
.map_err(|e| format!("failed to begin transaction: {e}"))?;
let stmts = (migration.up)(schema);
for stmt in &stmts {
if stmt.is_empty() {
continue;
}
tx.batch_execute(stmt).await.map_err(|e| {
format!(
"migration v{} ({}) failed: {}",
migration.version, migration.description, e
)
})?;
}
tx.execute(
&format!("INSERT INTO {VERSIONS_TABLE} (version, description) VALUES ($1, $2)"),
&[&(migration.version as i32), &migration.description],
)
.await
.map_err(|e| format!("failed to record migration v{}: {}", migration.version, e))?;
tx.commit()
.await
.map_err(|e| format!("commit failed: {e}"))?;
tracing::info!(
version = migration.version,
description = migration.description,
"applied migration"
);
applied.push(migration.version);
}
let final_version = postgres_current_version(client).await?;
tracing::info!(
schema_version = final_version,
"schema version after migrations"
);
Ok(applied)
}
async fn postgres_current_version(client: &tokio_postgres::Client) -> Result<u32, String> {
let row = client
.query_one(
&format!("SELECT COALESCE(MAX(version), 0) FROM {VERSIONS_TABLE}"),
&[],
)
.await
.map_err(|e| format!("failed to read current schema version: {e}"))?;
let version: i32 = row.get(0);
Ok(version as u32)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::TableConfig;
#[test]
fn oracle_versions_table_name_is_always_quoted() {
let no_owner = SchemaConfig::default();
assert_eq!(oracle_versions_table(&no_owner), "\"_schema_versions\"");
let with_owner = SchemaConfig {
owner: Some("ADMIN".to_string()),
..Default::default()
};
assert_eq!(
oracle_versions_table(&with_owner),
"ADMIN.\"_schema_versions\""
);
}
#[test]
fn oracle_versions_table_ddl_without_owner() {
let schema = SchemaConfig::default();
let ddl = oracle_create_versions_table(&schema);
assert!(
ddl.contains("\"_schema_versions\""),
"must be quoted for Oracle: {ddl}"
);
assert!(ddl.contains("PRIMARY KEY"));
}
#[test]
fn oracle_versions_table_ddl_with_owner() {
let schema = SchemaConfig {
owner: Some("ADMIN".to_string()),
..Default::default()
};
let ddl = oracle_create_versions_table(&schema);
assert!(ddl.contains("ADMIN.\"_schema_versions\""), "got: {ddl}");
}
#[test]
fn postgres_versions_table_ddl() {
let ddl = postgres_create_versions_table();
assert!(ddl.contains("IF NOT EXISTS"));
assert!(ddl.contains("_schema_versions"));
assert!(ddl.contains("PRIMARY KEY"));
}
#[test]
fn migration_up_respects_schema_config() {
let schema = SchemaConfig {
owner: Some("ADMIN".to_string()),
responses: TableConfig {
table: "MY_RESPONSES".to_string(),
..Default::default()
},
..Default::default()
};
let m = Migration {
version: 1,
description: "test",
up: |s| {
let t = s.responses.qualified_table(s.owner.as_deref());
vec![format!("ALTER TABLE {t} ADD COLUMN x INT")]
},
};
let stmts = (m.up)(&schema);
assert!(
stmts[0].contains("ADMIN.\"MY_RESPONSES\""),
"got: {}",
stmts[0]
);
}
#[test]
fn pending_migrations_error_includes_sql_and_hints() {
let schema = SchemaConfig::default();
let migrations = [
Migration {
version: 1,
description: "add col_x",
up: |_| vec!["ALTER TABLE t ADD COLUMN x INT".to_string()],
},
Migration {
version: 2,
description: "drop col_y",
up: |_| vec!["ALTER TABLE t DROP COLUMN y".to_string()],
},
];
let pending: Vec<&Migration> = migrations.iter().collect();
let err = pending_migrations_error("postgres", 0, &pending, &schema);
assert!(err.contains("v1: add col_x"), "should list v1: {err}");
assert!(err.contains("v2: drop col_y"), "should list v2: {err}");
assert!(
err.contains("ALTER TABLE t ADD COLUMN x INT"),
"should include SQL: {err}"
);
assert!(
err.contains("INSERT INTO _schema_versions"),
"should include version INSERT: {err}"
);
assert!(
err.contains("auto_migrate: true"),
"should hint auto_migrate: {err}"
);
assert!(
err.contains("version: 2"),
"should hint version skip: {err}"
);
}
#[test]
fn pending_migrations_error_shows_current_version() {
let schema = SchemaConfig::default();
let migrations = [Migration {
version: 3,
description: "test",
up: |_| vec!["SELECT 1".to_string()],
}];
let pending: Vec<&Migration> = migrations.iter().collect();
let err = pending_migrations_error("oracle", 2, &pending, &schema);
assert!(
err.contains("current version: 2"),
"should show current version: {err}"
);
assert!(
err.contains("latest version: 3"),
"should show target version: {err}"
);
}
}