use std::sync::atomic::{AtomicU64, Ordering};
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use crate::ai::{
AddRelation, FieldSpec, OnDelete, Plan, Primitive, RemoveField, RemoveRelation,
};
/// Returns the connection URL used when `RUSTIO_TEST_DATABASE_URL` is not set.
fn default_dev_url() -> String {
    String::from("postgres://postgres:dev@localhost:5432/rustio_dev")
}
/// Opens a small (two-connection) Postgres pool for the tests in this module.
///
/// The URL is read from `RUSTIO_TEST_DATABASE_URL`; when that variable cannot be read the value of
/// `default_dev_url()` is used instead.
///
/// # Panics
/// Panics with the URL and the underlying error if the connection cannot be established.
async fn connect_pg() -> PgPool {
    let url = match std::env::var("RUSTIO_TEST_DATABASE_URL") {
        Ok(from_env) => from_env,
        Err(_) => default_dev_url(),
    };
    let connected = PgPoolOptions::new().max_connections(2).connect(&url).await;
    match connected {
        Ok(pool) => pool,
        Err(e) => panic!("could not connect to {url}: {e}"),
    }
}
/// Process-wide sequence number used to make scratch table names distinct.
static TABLE_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Produces a table name of the form `pg_t_<pid>_<seq>`, where `<seq>` is taken from
/// `TABLE_COUNTER` and incremented on every call.
fn fresh_table_name() -> String {
    let sequence = TABLE_COUNTER.fetch_add(1, Ordering::SeqCst);
    format!("pg_t_{}_{}", std::process::id(), sequence)
}
/// Best-effort cleanup: issues `DROP TABLE IF EXISTS <table> CASCADE` and deliberately ignores
/// the outcome, so a failure to drop never fails the calling test.
async fn drop_table(pool: &PgPool, table: &str) {
    let statement = format!("DROP TABLE IF EXISTS {table} CASCADE");
    let _ = sqlx::query(&statement).execute(pool).await;
}
/// Executes a migration script statement by statement on a single pooled connection.
///
/// The script is split on `;`. Fragments that are empty after trimming, or that consist solely of
/// `--` comment lines, are skipped; every other fragment is sent to the server as-is.
///
/// All statements run on one connection checked out from `pool`. Executing each statement
/// directly against the pool would let the pool pick a connection per statement, so a script
/// containing `BEGIN; ...; COMMIT;` would not be guaranteed to keep its statements inside the
/// transaction it opened.
///
/// Limitation: the split is purely textual, so a `;` inside a string literal or a dollar-quoted
/// body would be treated as a statement boundary.
///
/// # Panics
/// Panics if no connection can be acquired, or if any statement fails (the message includes the
/// offending SQL and the database error).
async fn run_migration(pool: &PgPool, sql: &str) {
    let mut conn = pool
        .acquire()
        .await
        .unwrap_or_else(|e| panic!("could not acquire a connection for the migration: {e}"));
    for stmt in sql.split(';') {
        let trimmed = stmt.trim();
        // Nothing to execute: blank fragment (e.g. after the final `;`) or comments only.
        if trimmed.is_empty() || trimmed.lines().all(|l| l.trim_start().starts_with("--")) {
            continue;
        }
        sqlx::query(trimmed)
            .execute(&mut *conn)
            .await
            .unwrap_or_else(|e| panic!("statement failed:\n SQL: {trimmed}\n err: {e}"));
    }
}
/// Applies the SQL produced by `sql_for_add_field` for four field specs to a scratch table and
/// checks the resulting columns through `information_schema.columns`:
///
/// * `i32`, non-nullable  -> `integer`, `NOT NULL`, default `0`
/// * `i64`, non-nullable  -> `bigint`
/// * `bool`, non-nullable -> `boolean`, default `false`
/// * `DateTime`, nullable -> `timestamp with time zone`, nullable
///
/// The scratch table is dropped at the end of a successful run.
#[tokio::test]
#[ignore = "needs `RUSTIO_TEST_DB=1` + a running postgres (URL via RUSTIO_TEST_DATABASE_URL or default)"]
async fn pg_add_field_appends_column_with_pg_type() {
    use super::executor::sql_for_add_field;

    let pool = connect_pg().await;
    let table = fresh_table_name();
    let create_table = format!(
        "CREATE TABLE {table} (id BIGSERIAL PRIMARY KEY, title TEXT NOT NULL DEFAULT '')"
    );
    sqlx::query(&create_table).execute(&pool).await.unwrap();

    // --- i32, NOT NULL ---
    let priority_spec = FieldSpec {
        name: "priority".into(),
        ty: "i32".into(),
        nullable: false,
        editable: true,
    };
    let add_priority = sql_for_add_field(&table, &priority_spec);
    run_migration(&pool, &add_priority).await;
    let (priority_type, priority_nullable, priority_default): (String, String, String) =
        sqlx::query_as(
            "SELECT data_type, is_nullable, column_default\n\
             FROM information_schema.columns\n\
             WHERE table_name = $1 AND column_name = 'priority'",
        )
        .bind(&table)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(priority_type, "integer", "i32 should map to PG `integer`");
    assert_eq!(priority_nullable, "NO", "non-nullable add should be NOT NULL");
    assert_eq!(priority_default, "0", "default literal should be `0`");

    // --- i64, NOT NULL ---
    let score_spec = FieldSpec {
        name: "score".into(),
        ty: "i64".into(),
        nullable: false,
        editable: true,
    };
    let add_score = sql_for_add_field(&table, &score_spec);
    run_migration(&pool, &add_score).await;
    let (score_type,): (String,) = sqlx::query_as(
        "SELECT data_type FROM information_schema.columns\n\
         WHERE table_name = $1 AND column_name = 'score'",
    )
    .bind(&table)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(score_type, "bigint", "i64 should map to PG `bigint`");

    // --- bool, NOT NULL ---
    let active_spec = FieldSpec {
        name: "active".into(),
        ty: "bool".into(),
        nullable: false,
        editable: true,
    };
    let add_active = sql_for_add_field(&table, &active_spec);
    run_migration(&pool, &add_active).await;
    let (active_type, active_default): (String, String) = sqlx::query_as(
        "SELECT data_type, column_default FROM information_schema.columns\n\
         WHERE table_name = $1 AND column_name = 'active'",
    )
    .bind(&table)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(active_type, "boolean", "bool should map to PG `boolean`");
    assert_eq!(active_default, "false", "bool default should be `false`, not `0`");

    // --- DateTime, nullable ---
    let completed_at_spec = FieldSpec {
        name: "completed_at".into(),
        ty: "DateTime".into(),
        nullable: true,
        editable: true,
    };
    let add_completed_at = sql_for_add_field(&table, &completed_at_spec);
    run_migration(&pool, &add_completed_at).await;
    let (completed_type, completed_nullable): (String, String) = sqlx::query_as(
        "SELECT data_type, is_nullable FROM information_schema.columns\n\
         WHERE table_name = $1 AND column_name = 'completed_at'",
    )
    .bind(&table)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(
        completed_type, "timestamp with time zone",
        "DateTime should map to PG `timestamptz`"
    );
    assert_eq!(completed_nullable, "YES", "nullable flag should propagate");

    drop_table(&pool, &table).await;
}
/// Drops an FK-bearing, indexed column with `DROP COLUMN ... CASCADE` and verifies that the
/// column, its foreign-key constraint and its index are all gone while the unrelated `note`
/// column is still present. Both scratch tables are dropped at the end of a successful run.
#[tokio::test]
#[ignore = "needs `RUSTIO_TEST_DB=1` + a running postgres (URL via RUSTIO_TEST_DATABASE_URL or default)"]
async fn pg_remove_field_drops_column_and_dependent_constraints() {
    let pool = connect_pg().await;
    let table = fresh_table_name();
    let parent = format!("{table}_parent");
    let index_name = format!("{table}_parent_idx");

    // Fixture: parent table, child table referencing it, and an index on the FK column.
    let create_parent = format!(
        "CREATE TABLE {parent} (id BIGSERIAL PRIMARY KEY, name TEXT NOT NULL DEFAULT '')"
    );
    sqlx::query(&create_parent).execute(&pool).await.unwrap();
    let create_child = format!(
        "CREATE TABLE {table} (\
         id BIGSERIAL PRIMARY KEY, \
         parent_id BIGINT NOT NULL REFERENCES {parent}(id) ON DELETE RESTRICT, \
         note TEXT NOT NULL DEFAULT ''\
         )"
    );
    sqlx::query(&create_child).execute(&pool).await.unwrap();
    let create_index = format!("CREATE INDEX {index_name} ON {table} (parent_id)");
    sqlx::query(&create_index).execute(&pool).await.unwrap();

    // The migration under test; the leading comment line exercises comment handling in the runner.
    let migration = format!(
        "-- header\n\
         ALTER TABLE {table} DROP COLUMN parent_id CASCADE;\n"
    );
    run_migration(&pool, &migration).await;

    let (remaining_columns,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM information_schema.columns\n\
         WHERE table_name = $1 AND column_name = 'parent_id'",
    )
    .bind(&table)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(remaining_columns, 0, "parent_id should be dropped");

    let (remaining_fks,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM information_schema.table_constraints\n\
         WHERE table_name = $1 AND constraint_type = 'FOREIGN KEY'",
    )
    .bind(&table)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(remaining_fks, 0, "the dependent FK should be CASCADEd");

    let (remaining_indexes,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM pg_indexes\n\
         WHERE tablename = $1 AND indexname = $2",
    )
    .bind(&table)
    .bind(&index_name)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(remaining_indexes, 0, "the dependent index should be CASCADEd");

    // The unrelated column must still be there.
    let (surviving_column,): (String,) = sqlx::query_as(
        "SELECT column_name FROM information_schema.columns\n\
         WHERE table_name = $1 AND column_name = 'note'",
    )
    .bind(&table)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(surviving_column, "note");

    drop_table(&pool, &table).await;
    drop_table(&pool, &parent).await;
}
/// Converts an `INTEGER` column holding three rows to `TEXT` via `ALTER COLUMN ... TYPE ... USING`
/// and checks both the reported column type and that each stored value became its text form.
/// The scratch table is dropped at the end of a successful run.
#[tokio::test]
#[ignore = "needs `RUSTIO_TEST_DB=1` + a running postgres (URL via RUSTIO_TEST_DATABASE_URL or default)"]
async fn pg_change_field_type_rewrites_column_in_place() {
    let pool = connect_pg().await;
    let table = fresh_table_name();

    let create_table = format!(
        "CREATE TABLE {table} (id BIGSERIAL PRIMARY KEY, score INTEGER NOT NULL DEFAULT 0)"
    );
    sqlx::query(&create_table).execute(&pool).await.unwrap();
    let seed_rows = format!("INSERT INTO {table} (score) VALUES (42), (7), (0)");
    sqlx::query(&seed_rows).execute(&pool).await.unwrap();

    let migration = format!(
        "-- header\n\
         ALTER TABLE {table} ALTER COLUMN score TYPE TEXT USING (score::TEXT);\n"
    );
    run_migration(&pool, &migration).await;

    let (column_type,): (String,) = sqlx::query_as(
        "SELECT data_type FROM information_schema.columns\n\
         WHERE table_name = $1 AND column_name = 'score'",
    )
    .bind(&table)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(column_type, "text", "i32 → String should land as PG `text`");

    let select_scores = format!("SELECT score FROM {table} ORDER BY id");
    let mut scores: Vec<(String,)> = sqlx::query_as(&select_scores)
        .fetch_all(&pool)
        .await
        .unwrap();
    // Compared in lexicographic order, hence "42" before "7".
    scores.sort();
    let expected = vec![("0".to_string(),), ("42".to_string(),), ("7".to_string(),)];
    assert_eq!(
        scores, expected,
        "every row's i32 should round-trip as its text representation"
    );

    drop_table(&pool, &table).await;
}
/// Widens a plain `INTEGER` column to `BIGINT` on a table that also carries a foreign key, then
/// checks that the column type changed and that the table still has exactly one FK constraint.
/// Both scratch tables are dropped at the end of a successful run.
#[tokio::test]
#[ignore = "needs `RUSTIO_TEST_DB=1` + a running postgres (URL via RUSTIO_TEST_DATABASE_URL or default)"]
async fn pg_change_field_type_works_on_fk_bearing_table() {
    let pool = connect_pg().await;
    let parent = fresh_table_name();
    let child = fresh_table_name();

    let create_parent = format!(
        "CREATE TABLE {parent} (id BIGSERIAL PRIMARY KEY, label TEXT NOT NULL DEFAULT '')"
    );
    sqlx::query(&create_parent).execute(&pool).await.unwrap();
    let create_child = format!(
        "CREATE TABLE {child} (\
         id BIGSERIAL PRIMARY KEY, \
         parent_id BIGINT NOT NULL REFERENCES {parent}(id) ON DELETE RESTRICT, \
         quantity INTEGER NOT NULL DEFAULT 0\
         )"
    );
    sqlx::query(&create_child).execute(&pool).await.unwrap();

    let migration = format!(
        "ALTER TABLE {child} ALTER COLUMN quantity TYPE BIGINT USING (quantity::BIGINT);"
    );
    run_migration(&pool, &migration).await;

    let (quantity_type,): (String,) = sqlx::query_as(
        "SELECT data_type FROM information_schema.columns\n\
         WHERE table_name = $1 AND column_name = 'quantity'",
    )
    .bind(&child)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(quantity_type, "bigint");

    let (foreign_keys,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM information_schema.table_constraints\n\
         WHERE table_name = $1 AND constraint_type = 'FOREIGN KEY'",
    )
    .bind(&child)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(foreign_keys, 1, "FK constraint must survive the column type change");

    drop_table(&pool, &child).await;
    drop_table(&pool, &parent).await;
}
/// Round-trips a column's nullability: `DROP NOT NULL`, insert a `NULL`, then backfill the `NULL`
/// with `''` and `SET NOT NULL` again. After each step the `is_nullable` flag reported by
/// `information_schema.columns` is checked, and finally the number of `''` rows confirms the
/// backfill. The scratch table is dropped at the end of a successful run.
#[tokio::test]
#[ignore = "needs `RUSTIO_TEST_DB=1` + a running postgres (URL via RUSTIO_TEST_DATABASE_URL or default)"]
async fn pg_change_field_nullability_relax_then_tighten() {
    // Same lookup is issued after relaxing and after tightening the constraint.
    const NOTE_NULLABILITY: &str = "SELECT is_nullable FROM information_schema.columns\n\
        WHERE table_name = $1 AND column_name = 'note'";

    let pool = connect_pg().await;
    let table = fresh_table_name();

    let create_table = format!(
        "CREATE TABLE {table} (id BIGSERIAL PRIMARY KEY, note TEXT NOT NULL DEFAULT '')"
    );
    sqlx::query(&create_table).execute(&pool).await.unwrap();
    let seed_rows = format!("INSERT INTO {table} (note) VALUES ('hello'), ('')");
    sqlx::query(&seed_rows).execute(&pool).await.unwrap();

    // Step 1: relax.
    let relax = format!("ALTER TABLE {table} ALTER COLUMN note DROP NOT NULL;");
    run_migration(&pool, &relax).await;
    let (after_relax,): (String,) = sqlx::query_as(NOTE_NULLABILITY)
        .bind(&table)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(after_relax, "YES", "DROP NOT NULL should flip nullability");

    // A NULL row is now accepted; it is what the backfill below has to repair.
    let insert_null = format!("INSERT INTO {table} (note) VALUES (NULL)");
    sqlx::query(&insert_null).execute(&pool).await.unwrap();

    // Step 2: backfill, then tighten.
    let tighten = format!(
        "UPDATE {table} SET note = '' WHERE note IS NULL;\n\
         ALTER TABLE {table} ALTER COLUMN note SET NOT NULL;"
    );
    run_migration(&pool, &tighten).await;
    let (after_tighten,): (String,) = sqlx::query_as(NOTE_NULLABILITY)
        .bind(&table)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(after_tighten, "NO", "SET NOT NULL should re-tighten");

    // One seeded '' row plus the backfilled one.
    let count_empty = format!("SELECT COUNT(*) FROM {table} WHERE note = ''");
    let (empty_notes,): (i64,) = sqlx::query_as(&count_empty)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(empty_notes, 2, "the backfill should have replaced the NULL with ''");

    drop_table(&pool, &table).await;
}
/// Adds a nullable `parent_id BIGINT REFERENCES ... ON DELETE RESTRICT` column to an existing
/// table and verifies:
///
/// * the column is reported as `bigint` and nullable,
/// * exactly one FOREIGN KEY constraint exists on the child table,
/// * deleting a parent row that still has a child is rejected.
///
/// Both scratch tables are dropped at the end of a successful run.
#[tokio::test]
#[ignore = "needs `RUSTIO_TEST_DB=1` + a running postgres (URL via RUSTIO_TEST_DATABASE_URL or default)"]
async fn pg_add_relation_creates_fk_constraint() {
    let pool = connect_pg().await;
    let parent = fresh_table_name();
    let child = fresh_table_name();

    sqlx::query(&format!(
        "CREATE TABLE {parent} (id BIGSERIAL PRIMARY KEY, name TEXT NOT NULL DEFAULT '')"
    ))
    .execute(&pool)
    .await
    .unwrap();
    sqlx::query(&format!("CREATE TABLE {child} (id BIGSERIAL PRIMARY KEY)"))
        .execute(&pool)
        .await
        .unwrap();

    let sql = format!(
        "-- header\n\
         ALTER TABLE {child} ADD COLUMN parent_id BIGINT REFERENCES {parent}(id) ON DELETE RESTRICT;\n"
    );
    run_migration(&pool, &sql).await;

    let (ty, nullable): (String, String) = sqlx::query_as(
        "SELECT data_type, is_nullable FROM information_schema.columns
WHERE table_name = $1 AND column_name = 'parent_id'",
    )
    .bind(&child)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(ty, "bigint");
    assert_eq!(nullable, "YES");

    let (fk_count,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM information_schema.table_constraints
WHERE table_name = $1 AND constraint_type = 'FOREIGN KEY'",
    )
    .bind(&child)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(fk_count, 1, "FK constraint should be in place");

    // Take the id straight from `RETURNING` rather than discarding it and re-reading the table
    // with an unordered `SELECT ... LIMIT 1`.
    let (parent_id,): (i64,) = sqlx::query_as(&format!(
        "INSERT INTO {parent} (name) VALUES ('p1') RETURNING id"
    ))
    .fetch_one(&pool)
    .await
    .unwrap();
    sqlx::query(&format!("INSERT INTO {child} (parent_id) VALUES ($1)"))
        .bind(parent_id)
        .execute(&pool)
        .await
        .unwrap();

    // The child row references the parent, so RESTRICT must make this delete fail.
    let delete_err = sqlx::query(&format!("DELETE FROM {parent} WHERE id = $1"))
        .bind(parent_id)
        .execute(&pool)
        .await;
    assert!(
        delete_err.is_err(),
        "ON DELETE RESTRICT should reject deleting a parent with children: {delete_err:?}"
    );

    drop_table(&pool, &child).await;
    drop_table(&pool, &parent).await;
}
/// Starts from a child table with one foreign key, drops the FK column with
/// `DROP COLUMN ... CASCADE`, and checks that the FK count goes from 1 to 0 and that the column
/// no longer exists. Both scratch tables are dropped at the end of a successful run.
#[tokio::test]
#[ignore = "needs `RUSTIO_TEST_DB=1` + a running postgres (URL via RUSTIO_TEST_DATABASE_URL or default)"]
async fn pg_remove_relation_drops_fk_column_and_constraint() {
    // Issued both before and after the migration.
    const FK_COUNT: &str = "SELECT COUNT(*) FROM information_schema.table_constraints\n\
        WHERE table_name = $1 AND constraint_type = 'FOREIGN KEY'";

    let pool = connect_pg().await;
    let parent = fresh_table_name();
    let child = fresh_table_name();

    let create_parent = format!(
        "CREATE TABLE {parent} (id BIGSERIAL PRIMARY KEY, name TEXT NOT NULL DEFAULT '')"
    );
    sqlx::query(&create_parent).execute(&pool).await.unwrap();
    let create_child = format!(
        "CREATE TABLE {child} (\
         id BIGSERIAL PRIMARY KEY, \
         parent_id BIGINT REFERENCES {parent}(id) ON DELETE CASCADE\
         )"
    );
    sqlx::query(&create_child).execute(&pool).await.unwrap();

    // Sanity check on the fixture before touching it.
    let (fks_before,): (i64,) = sqlx::query_as(FK_COUNT)
        .bind(&child)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(fks_before, 1);

    let migration = format!("ALTER TABLE {child} DROP COLUMN parent_id CASCADE;");
    run_migration(&pool, &migration).await;

    let (fks_after,): (i64,) = sqlx::query_as(FK_COUNT)
        .bind(&child)
        .fetch_one(&pool)
        .await
        .unwrap();
    assert_eq!(fks_after, 0, "FK constraint should be CASCADEd out");

    let (fk_columns,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM information_schema.columns\n\
         WHERE table_name = $1 AND column_name = 'parent_id'",
    )
    .bind(&child)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(fk_columns, 0, "FK column should be dropped");

    drop_table(&pool, &child).await;
    drop_table(&pool, &parent).await;
}
/// Two-part check of retrofitting a foreign key onto an existing column.
///
/// Part 1 (live database): creates a parent/child pair where `child.parent_id` is a plain
/// `BIGINT`, seeds one parent and one child row, applies a hand-written
/// `ALTER TABLE ... ADD CONSTRAINT ... FOREIGN KEY ... ON DELETE RESTRICT` script, and verifies
/// that the constraint now exists and blocks deleting the referenced parent. The rows are then
/// deleted child-first and both scratch tables are dropped.
///
/// Part 2 (no database): feeds a fixed two-model schema to `plan_retrofit_foreign_keys` and
/// checks that the generated migration alters `childs` in place (adds
/// `childs_parent_id_fk`) without any `CREATE TABLE`.
#[tokio::test]
#[ignore = "needs `RUSTIO_TEST_DB=1` + a running postgres (URL via RUSTIO_TEST_DATABASE_URL or default)"]
async fn pg_retrofit_adds_fk_constraint_in_place() {
    use crate::ai::{plan_retrofit_foreign_keys, RetrofitReport};
    use crate::schema::{Relation, RelationKind, Schema, SchemaField, SchemaModel, SCHEMA_VERSION};

    // ---- Part 1: apply the retrofit SQL against real tables ----
    let pool = connect_pg().await;
    let parent = fresh_table_name();
    let child = fresh_table_name();

    sqlx::query(&format!(
        "CREATE TABLE {parent} (id BIGSERIAL PRIMARY KEY, name TEXT NOT NULL DEFAULT '')"
    ))
    .execute(&pool)
    .await
    .unwrap();
    sqlx::query(&format!(
        "CREATE TABLE {child} (id BIGSERIAL PRIMARY KEY, parent_id BIGINT)"
    ))
    .execute(&pool)
    .await
    .unwrap();

    // Take the id straight from `RETURNING` rather than discarding it and re-reading the table
    // with an unordered `SELECT ... LIMIT 1`.
    let (parent_id,): (i64,) = sqlx::query_as(&format!(
        "INSERT INTO {parent} (name) VALUES ('p1') RETURNING id"
    ))
    .fetch_one(&pool)
    .await
    .unwrap();
    sqlx::query(&format!("INSERT INTO {child} (parent_id) VALUES ($1)"))
        .bind(parent_id)
        .execute(&pool)
        .await
        .unwrap();

    let (fk_before,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM information_schema.table_constraints
WHERE table_name = $1 AND constraint_type = 'FOREIGN KEY'",
    )
    .bind(&child)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(fk_before, 0);

    let sql = format!(
        "BEGIN;\n\
         ALTER TABLE {child}\n \
         ADD CONSTRAINT {child}_parent_id_fk \
         FOREIGN KEY (parent_id) REFERENCES {parent}(id) ON DELETE RESTRICT;\n\
         COMMIT;\n"
    );
    run_migration(&pool, &sql).await;

    let (fk_after,): (i64,) = sqlx::query_as(
        "SELECT COUNT(*) FROM information_schema.table_constraints
WHERE table_name = $1 AND constraint_type = 'FOREIGN KEY'",
    )
    .bind(&child)
    .fetch_one(&pool)
    .await
    .unwrap();
    assert_eq!(fk_after, 1, "FK should be retrofitted onto the existing column");

    // The seeded child still references the parent, so RESTRICT must reject this delete.
    let del = sqlx::query(&format!("DELETE FROM {parent} WHERE id = $1"))
        .bind(parent_id)
        .execute(&pool)
        .await;
    assert!(del.is_err(), "ON DELETE RESTRICT should block parent delete: {del:?}");

    // Child rows first, then parents: this order must succeed under RESTRICT.
    sqlx::query(&format!("DELETE FROM {child}"))
        .execute(&pool)
        .await
        .unwrap();
    sqlx::query(&format!("DELETE FROM {parent}"))
        .execute(&pool)
        .await
        .unwrap();
    drop_table(&pool, &child).await;
    drop_table(&pool, &parent).await;

    // ---- Part 2: inspect the migration the planner generates ----
    let report: RetrofitReport = plan_retrofit_foreign_keys(&Schema {
        version: SCHEMA_VERSION,
        rustio_version: "t".into(),
        models: vec![
            SchemaModel {
                name: "Parent".into(),
                table: "parents".into(),
                admin_name: "parents".into(),
                display_name: "Parents".into(),
                singular_name: "Parent".into(),
                fields: vec![SchemaField {
                    name: "id".into(),
                    ty: "i64".into(),
                    nullable: false,
                    editable: false,
                    relation: None,
                }],
                relations: vec![],
                core: false,
            },
            SchemaModel {
                name: "Child".into(),
                table: "childs".into(),
                admin_name: "childs".into(),
                display_name: "Children".into(),
                singular_name: "Child".into(),
                fields: vec![SchemaField {
                    name: "parent_id".into(),
                    ty: "i64".into(),
                    nullable: true,
                    editable: true,
                    relation: Some(Relation {
                        model: "Parent".into(),
                        field: "id".into(),
                        kind: RelationKind::BelongsTo,
                        display_field: None,
                        required: None,
                        on_delete: None,
                    }),
                }],
                relations: vec![],
                core: false,
            },
        ],
    });
    assert_eq!(report.upgraded.len(), 1);
    let (_, mig_sql) = &report.migrations[0];
    assert!(mig_sql.contains("ALTER TABLE childs"), "mig:\n{mig_sql}");
    assert!(
        mig_sql.contains("ADD CONSTRAINT childs_parent_id_fk"),
        "mig:\n{mig_sql}"
    );
    assert!(!mig_sql.contains("CREATE TABLE"), "no recreate:\n{mig_sql}");
}
/// Builds a one-primitive [`Plan`] that adds a `BelongsTo` relation from model `from` to model
/// `to` through the field `via`, with the given `required` flag and `on_delete` behaviour.
// No caller is visible in this part of the file, hence the lint allowance.
#[allow(dead_code)]
fn add_relation(from: &str, to: &str, via: &str, required: bool, on_delete: OnDelete) -> Plan {
    let relation = AddRelation {
        from: from.into(),
        kind: crate::schema::RelationKind::BelongsTo,
        to: to.into(),
        via: via.into(),
        required,
        on_delete,
    };
    Plan::new(vec![Primitive::AddRelation(relation)])
}
/// Builds a one-primitive [`Plan`] that removes `field` from `model`.
// No caller is visible in this part of the file, hence the lint allowance.
#[allow(dead_code)]
fn remove_field(model: &str, field: &str) -> Plan {
    let removal = RemoveField {
        model: model.into(),
        field: field.into(),
    };
    Plan::new(vec![Primitive::RemoveField(removal)])
}
/// Builds a one-primitive [`Plan`] that removes the relation on model `from` carried by the
/// field `via`.
// No caller is visible in this part of the file, hence the lint allowance.
#[allow(dead_code)]
fn remove_relation(from: &str, via: &str) -> Plan {
    let removal = RemoveRelation {
        from: from.into(),
        via: via.into(),
    };
    Plan::new(vec![Primitive::RemoveRelation(removal)])
}