use sea_query::{Alias, ForeignKey, Query, Table};
use vespertide_core::{TableConstraint, TableDef};
use super::helpers::{build_sqlite_temp_table_create, recreate_indexes_after_rebuild};
use super::rename_table::build_rename_table;
use super::types::{BuiltQuery, DatabaseBackend};
use crate::error::QueryError;
use crate::sql::RawSql;
pub fn build_remove_constraint(
backend: &DatabaseBackend,
table: &str,
constraint: &TableConstraint,
current_schema: &[TableDef],
pending_constraints: &[TableConstraint],
) -> Result<Vec<BuiltQuery>, QueryError> {
match constraint {
TableConstraint::PrimaryKey { .. } => {
if *backend == DatabaseBackend::Sqlite {
let table_def = current_schema.iter().find(|t| t.name == table).ok_or_else(|| QueryError::Other(format!("Table '{}' not found in current schema. SQLite requires current schema information to remove constraints.", table)))?;
let mut new_constraints = table_def.constraints.clone();
new_constraints.retain(|c| !matches!(c, TableConstraint::PrimaryKey { .. }));
let temp_table = format!("{}_temp", table);
let create_query = build_sqlite_temp_table_create(
backend,
&temp_table,
table,
&table_def.columns,
&new_constraints,
);
let column_aliases: Vec<Alias> = table_def
.columns
.iter()
.map(|c| Alias::new(&c.name))
.collect();
let mut select_query = Query::select();
for col_alias in &column_aliases {
select_query = select_query.column(col_alias.clone()).to_owned();
}
select_query = select_query.from(Alias::new(table)).to_owned();
let insert_stmt = Query::insert()
.into_table(Alias::new(&temp_table))
.columns(column_aliases.clone())
.select_from(select_query)
.unwrap()
.to_owned();
let insert_query = BuiltQuery::Insert(Box::new(insert_stmt));
let drop_table = Table::drop().table(Alias::new(table)).to_owned();
let drop_query = BuiltQuery::DropTable(Box::new(drop_table));
let rename_query = build_rename_table(&temp_table, table);
let index_queries = recreate_indexes_after_rebuild(
table,
&table_def.constraints,
pending_constraints,
);
let mut queries = vec![create_query, insert_query, drop_query, rename_query];
queries.extend(index_queries);
Ok(queries)
} else {
let pg_sql = format!(
"ALTER TABLE \"{}\" DROP CONSTRAINT \"{}_pkey\"",
table, table
);
let mysql_sql = format!("ALTER TABLE `{}` DROP PRIMARY KEY", table);
Ok(vec![BuiltQuery::Raw(RawSql::per_backend(
pg_sql.clone(),
mysql_sql,
pg_sql,
))])
}
}
TableConstraint::Unique { name, columns } => {
if *backend == DatabaseBackend::Sqlite {
let table_def = current_schema.iter().find(|t| t.name == table).ok_or_else(|| QueryError::Other(format!("Table '{}' not found in current schema. SQLite requires current schema information to remove constraints.", table)))?;
let mut new_constraints = table_def.constraints.clone();
new_constraints.retain(|c| {
match (c, constraint) {
(
TableConstraint::Unique {
name: c_name,
columns: c_cols,
},
TableConstraint::Unique {
name: r_name,
columns: r_cols,
},
) => {
if let (Some(cn), Some(rn)) = (c_name, r_name) {
cn != rn
} else {
c_cols != r_cols
}
}
_ => true,
}
});
let temp_table = format!("{}_temp", table);
let create_query = build_sqlite_temp_table_create(
backend,
&temp_table,
table,
&table_def.columns,
&new_constraints,
);
let column_aliases: Vec<Alias> = table_def
.columns
.iter()
.map(|c| Alias::new(&c.name))
.collect();
let mut select_query = Query::select();
for col_alias in &column_aliases {
select_query = select_query.column(col_alias.clone()).to_owned();
}
select_query = select_query.from(Alias::new(table)).to_owned();
let insert_stmt = Query::insert()
.into_table(Alias::new(&temp_table))
.columns(column_aliases.clone())
.select_from(select_query)
.unwrap()
.to_owned();
let insert_query = BuiltQuery::Insert(Box::new(insert_stmt));
let drop_table = Table::drop().table(Alias::new(table)).to_owned();
let drop_query = BuiltQuery::DropTable(Box::new(drop_table));
let rename_query = build_rename_table(&temp_table, table);
let index_queries =
recreate_indexes_after_rebuild(table, &new_constraints, pending_constraints);
let mut queries = vec![create_query, insert_query, drop_query, rename_query];
queries.extend(index_queries);
Ok(queries)
} else {
let constraint_name = vespertide_naming::build_unique_constraint_name(
table,
columns,
name.as_deref(),
);
let pg_sql = format!("DROP INDEX \"{}\"", constraint_name);
let mysql_sql = format!("ALTER TABLE `{}` DROP INDEX `{}`", table, constraint_name);
let sqlite_sql = format!("DROP INDEX \"{}\"", constraint_name);
Ok(vec![BuiltQuery::Raw(RawSql::per_backend(
pg_sql, mysql_sql, sqlite_sql,
))])
}
}
TableConstraint::ForeignKey { name, columns, .. } => {
if *backend == DatabaseBackend::Sqlite {
let table_def = current_schema.iter().find(|t| t.name == table).ok_or_else(|| QueryError::Other(format!("Table '{}' not found in current schema. SQLite requires current schema information to remove constraints.", table)))?;
let mut new_constraints = table_def.constraints.clone();
new_constraints.retain(|c| {
match (c, constraint) {
(
TableConstraint::ForeignKey {
name: c_name,
columns: c_cols,
..
},
TableConstraint::ForeignKey {
name: r_name,
columns: r_cols,
..
},
) => {
if let (Some(cn), Some(rn)) = (c_name, r_name) {
cn != rn
} else {
c_cols != r_cols
}
}
_ => true,
}
});
let temp_table = format!("{}_temp", table);
let create_query = build_sqlite_temp_table_create(
backend,
&temp_table,
table,
&table_def.columns,
&new_constraints,
);
let column_aliases: Vec<Alias> = table_def
.columns
.iter()
.map(|c| Alias::new(&c.name))
.collect();
let mut select_query = Query::select();
for col_alias in &column_aliases {
select_query = select_query.column(col_alias.clone()).to_owned();
}
select_query = select_query.from(Alias::new(table)).to_owned();
let insert_stmt = Query::insert()
.into_table(Alias::new(&temp_table))
.columns(column_aliases.clone())
.select_from(select_query)
.unwrap()
.to_owned();
let insert_query = BuiltQuery::Insert(Box::new(insert_stmt));
let drop_table = Table::drop().table(Alias::new(table)).to_owned();
let drop_query = BuiltQuery::DropTable(Box::new(drop_table));
let rename_query = build_rename_table(&temp_table, table);
let index_queries = recreate_indexes_after_rebuild(
table,
&table_def.constraints,
pending_constraints,
);
let mut queries = vec![create_query, insert_query, drop_query, rename_query];
queries.extend(index_queries);
Ok(queries)
} else {
let constraint_name =
vespertide_naming::build_foreign_key_name(table, columns, name.as_deref());
let fk_drop = ForeignKey::drop()
.name(&constraint_name)
.table(Alias::new(table))
.to_owned();
Ok(vec![BuiltQuery::DropForeignKey(Box::new(fk_drop))])
}
}
TableConstraint::Index { name, columns } => {
let index_name = if let Some(n) = name {
vespertide_naming::build_index_name(table, columns, Some(n))
} else {
vespertide_naming::build_index_name(table, columns, None)
};
let idx_drop = sea_query::Index::drop()
.table(Alias::new(table))
.name(&index_name)
.to_owned();
Ok(vec![BuiltQuery::DropIndex(Box::new(idx_drop))])
}
TableConstraint::Check { name, .. } => {
if *backend == DatabaseBackend::Sqlite {
let table_def = current_schema.iter().find(|t| t.name == table).ok_or_else(|| QueryError::Other(format!("Table '{}' not found in current schema. SQLite requires current schema information to remove constraints.", table)))?;
let mut new_constraints = table_def.constraints.clone();
new_constraints.retain(|c| match (c, constraint) {
(
TableConstraint::Check { name: c_name, .. },
TableConstraint::Check { name: r_name, .. },
) => c_name != r_name,
_ => true,
});
let temp_table = format!("{}_temp", table);
let create_query = build_sqlite_temp_table_create(
backend,
&temp_table,
table,
&table_def.columns,
&new_constraints,
);
let column_aliases: Vec<Alias> = table_def
.columns
.iter()
.map(|c| Alias::new(&c.name))
.collect();
let mut select_query = Query::select();
for col_alias in &column_aliases {
select_query = select_query.column(col_alias.clone()).to_owned();
}
select_query = select_query.from(Alias::new(table)).to_owned();
let insert_stmt = Query::insert()
.into_table(Alias::new(&temp_table))
.columns(column_aliases.clone())
.select_from(select_query)
.unwrap()
.to_owned();
let insert_query = BuiltQuery::Insert(Box::new(insert_stmt));
let drop_table = Table::drop().table(Alias::new(table)).to_owned();
let drop_query = BuiltQuery::DropTable(Box::new(drop_table));
let rename_query = build_rename_table(&temp_table, table);
let index_queries = recreate_indexes_after_rebuild(
table,
&table_def.constraints,
pending_constraints,
);
let mut queries = vec![create_query, insert_query, drop_query, rename_query];
queries.extend(index_queries);
Ok(queries)
} else {
let pg_sql = format!("ALTER TABLE \"{}\" DROP CONSTRAINT \"{}\"", table, name);
let mysql_sql = format!("ALTER TABLE `{}` DROP CHECK `{}`", table, name);
Ok(vec![BuiltQuery::Raw(RawSql::per_backend(
pg_sql.clone(),
mysql_sql,
pg_sql,
))])
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sql::types::DatabaseBackend;
use insta::{assert_snapshot, with_settings};
use rstest::rstest;
use vespertide_core::{ColumnDef, ColumnType, SimpleColumnType, TableConstraint, TableDef};
#[rstest]
#[case::remove_constraint_primary_key_postgres(
"remove_constraint_primary_key_postgres",
DatabaseBackend::Postgres,
&["DROP CONSTRAINT \"users_pkey\""]
)]
#[case::remove_constraint_primary_key_mysql(
"remove_constraint_primary_key_mysql",
DatabaseBackend::MySql,
&["DROP PRIMARY KEY"]
)]
#[case::remove_constraint_primary_key_sqlite(
"remove_constraint_primary_key_sqlite",
DatabaseBackend::Sqlite,
&["CREATE TABLE \"users_temp\""]
)]
#[case::remove_constraint_unique_named_postgres(
"remove_constraint_unique_named_postgres",
DatabaseBackend::Postgres,
&["DROP INDEX \"uq_users__uq_email\""]
)]
#[case::remove_constraint_unique_named_mysql(
"remove_constraint_unique_named_mysql",
DatabaseBackend::MySql,
&["DROP INDEX `uq_users__uq_email`"]
)]
#[case::remove_constraint_unique_named_sqlite(
"remove_constraint_unique_named_sqlite",
DatabaseBackend::Sqlite,
&["CREATE TABLE \"users_temp\""]
)]
#[case::remove_constraint_foreign_key_named_postgres(
"remove_constraint_foreign_key_named_postgres",
DatabaseBackend::Postgres,
&["DROP CONSTRAINT \"fk_users__fk_user\""]
)]
#[case::remove_constraint_foreign_key_named_mysql(
"remove_constraint_foreign_key_named_mysql",
DatabaseBackend::MySql,
&["DROP FOREIGN KEY `fk_users__fk_user`"]
)]
#[case::remove_constraint_foreign_key_named_sqlite(
"remove_constraint_foreign_key_named_sqlite",
DatabaseBackend::Sqlite,
&["CREATE TABLE \"users_temp\""]
)]
#[case::remove_constraint_check_named_postgres(
"remove_constraint_check_named_postgres",
DatabaseBackend::Postgres,
&["DROP CONSTRAINT \"chk_age\""]
)]
#[case::remove_constraint_check_named_mysql(
"remove_constraint_check_named_mysql",
DatabaseBackend::MySql,
&["DROP CHECK `chk_age`"]
)]
#[case::remove_constraint_check_named_sqlite(
"remove_constraint_check_named_sqlite",
DatabaseBackend::Sqlite,
&["CREATE TABLE \"users_temp\""]
)]
fn test_remove_constraint(
#[case] title: &str,
#[case] backend: DatabaseBackend,
#[case] expected: &[&str],
) {
let constraint = if title.contains("primary_key") {
TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
}
} else if title.contains("unique") {
TableConstraint::Unique {
name: Some("uq_email".into()),
columns: vec!["email".into()],
}
} else if title.contains("foreign_key") {
TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
}
} else {
TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
}
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: if title.contains("check") {
vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "age".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
]
} else if title.contains("foreign_key") {
vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
]
} else {
vec![ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}]
},
constraints: vec![constraint.clone()],
}];
let result =
build_remove_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result[0].build(backend);
for exp in expected {
assert!(
sql.contains(exp),
"Expected SQL to contain '{}', got: {}",
exp,
sql
);
}
with_settings!({ snapshot_suffix => format!("remove_constraint_{}", title) }, {
assert_snapshot!(result.iter().map(|q| q.build(backend)).collect::<Vec<String>>().join("\n"));
});
}
#[test]
fn test_remove_constraint_primary_key_sqlite_table_not_found() {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let result = build_remove_constraint(
&DatabaseBackend::Sqlite,
"nonexistent_table",
&constraint,
&[], &[],
);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Table 'nonexistent_table' not found in current schema"));
}
#[rstest]
#[case::remove_primary_key_with_index_postgres(DatabaseBackend::Postgres)]
#[case::remove_primary_key_with_index_mysql(DatabaseBackend::MySql)]
#[case::remove_primary_key_with_index_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_primary_key_with_index(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![
constraint.clone(),
TableConstraint::Index {
name: Some("idx_id".into()),
columns: vec!["id".into()],
},
],
}];
let result =
build_remove_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
if matches!(backend, DatabaseBackend::Sqlite) {
assert!(sql.contains("CREATE INDEX"));
assert!(sql.contains("ix_users__idx_id"));
}
with_settings!({ snapshot_suffix => format!("remove_primary_key_with_index_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[rstest]
#[case::remove_primary_key_with_unique_constraint_postgres(DatabaseBackend::Postgres)]
#[case::remove_primary_key_with_unique_constraint_mysql(DatabaseBackend::MySql)]
#[case::remove_primary_key_with_unique_constraint_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_primary_key_with_unique_constraint(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![
constraint.clone(),
TableConstraint::Unique {
name: Some("uq_email".into()),
columns: vec!["email".into()],
},
],
}];
let result =
build_remove_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
if matches!(backend, DatabaseBackend::Sqlite) {
assert!(sql.contains("CREATE TABLE"));
}
with_settings!({ snapshot_suffix => format!("remove_primary_key_with_unique_constraint_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[test]
fn test_remove_constraint_unique_sqlite_table_not_found() {
let constraint = TableConstraint::Unique {
name: Some("uq_email".into()),
columns: vec!["email".into()],
};
let result = build_remove_constraint(
&DatabaseBackend::Sqlite,
"nonexistent_table",
&constraint,
&[], &[],
);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Table 'nonexistent_table' not found in current schema"));
}
#[rstest]
#[case::remove_unique_without_name_postgres(DatabaseBackend::Postgres)]
#[case::remove_unique_without_name_mysql(DatabaseBackend::MySql)]
#[case::remove_unique_without_name_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_unique_without_name(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::Unique {
name: None,
columns: vec!["email".into()],
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "email".into(),
r#type: ColumnType::Simple(SimpleColumnType::Text),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![constraint.clone()],
}];
let result =
build_remove_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
if !matches!(backend, DatabaseBackend::Sqlite) {
assert!(sql.contains("users_email_key") || sql.contains("email"));
}
with_settings!({ snapshot_suffix => format!("remove_unique_without_name_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[rstest]
#[case::remove_unique_with_index_postgres(DatabaseBackend::Postgres)]
#[case::remove_unique_with_index_mysql(DatabaseBackend::MySql)]
#[case::remove_unique_with_index_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_unique_with_index(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::Unique {
name: Some("uq_email".into()),
columns: vec!["email".into()],
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "email".into(),
r#type: ColumnType::Simple(SimpleColumnType::Text),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![
constraint.clone(),
TableConstraint::Index {
name: Some("idx_id".into()),
columns: vec!["id".into()],
},
],
}];
let result =
build_remove_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
if matches!(backend, DatabaseBackend::Sqlite) {
assert!(sql.contains("CREATE INDEX"));
assert!(sql.contains("ix_users__idx_id"));
}
with_settings!({ snapshot_suffix => format!("remove_unique_with_index_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[rstest]
#[case::remove_unique_with_other_unique_constraint_postgres(DatabaseBackend::Postgres)]
#[case::remove_unique_with_other_unique_constraint_mysql(DatabaseBackend::MySql)]
#[case::remove_unique_with_other_unique_constraint_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_unique_with_other_unique_constraint(
#[case] backend: DatabaseBackend,
) {
let constraint = TableConstraint::Unique {
name: Some("uq_email".into()),
columns: vec!["email".into()],
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "email".into(),
r#type: ColumnType::Simple(SimpleColumnType::Text),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![
constraint.clone(),
TableConstraint::Unique {
name: Some("uq_name".into()),
columns: vec!["name".into()],
},
],
}];
let result =
build_remove_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
if matches!(backend, DatabaseBackend::Sqlite) {
assert!(sql.contains("CREATE TABLE"));
}
with_settings!({ snapshot_suffix => format!("remove_unique_with_other_unique_constraint_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[test]
fn test_remove_constraint_foreign_key_sqlite_table_not_found() {
let constraint = TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let result = build_remove_constraint(
&DatabaseBackend::Sqlite,
"nonexistent_table",
&constraint,
&[], &[],
);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Table 'nonexistent_table' not found in current schema"));
}
#[rstest]
#[case::remove_foreign_key_without_name_postgres(DatabaseBackend::Postgres)]
#[case::remove_foreign_key_without_name_mysql(DatabaseBackend::MySql)]
#[case::remove_foreign_key_without_name_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_foreign_key_without_name(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::ForeignKey {
name: None,
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let current_schema = vec![TableDef {
name: "posts".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![constraint.clone()],
}];
let result =
build_remove_constraint(&backend, "posts", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
if !matches!(backend, DatabaseBackend::Sqlite) {
assert!(sql.contains("posts_user_id_fkey") || sql.contains("user_id"));
}
with_settings!({ snapshot_suffix => format!("remove_foreign_key_without_name_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[rstest]
#[case::remove_foreign_key_with_index_postgres(DatabaseBackend::Postgres)]
#[case::remove_foreign_key_with_index_mysql(DatabaseBackend::MySql)]
#[case::remove_foreign_key_with_index_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_foreign_key_with_index(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let current_schema = vec![TableDef {
name: "posts".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![
constraint.clone(),
TableConstraint::Index {
name: Some("idx_user_id".into()),
columns: vec!["user_id".into()],
},
],
}];
let result =
build_remove_constraint(&backend, "posts", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
if matches!(backend, DatabaseBackend::Sqlite) {
assert!(sql.contains("CREATE INDEX"));
assert!(sql.contains("idx_user_id"));
}
with_settings!({ snapshot_suffix => format!("remove_foreign_key_with_index_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[rstest]
#[case::remove_foreign_key_with_unique_constraint_postgres(DatabaseBackend::Postgres)]
#[case::remove_foreign_key_with_unique_constraint_mysql(DatabaseBackend::MySql)]
#[case::remove_foreign_key_with_unique_constraint_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_foreign_key_with_unique_constraint(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let current_schema = vec![TableDef {
name: "posts".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![
constraint.clone(),
TableConstraint::Unique {
name: Some("uq_user_id".into()),
columns: vec!["user_id".into()],
},
],
}];
let result =
build_remove_constraint(&backend, "posts", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
if matches!(backend, DatabaseBackend::Sqlite) {
assert!(sql.contains("CREATE TABLE"));
}
with_settings!({ snapshot_suffix => format!("remove_foreign_key_with_unique_constraint_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[test]
fn test_remove_constraint_check_sqlite_table_not_found() {
let constraint = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let result = build_remove_constraint(
&DatabaseBackend::Sqlite,
"nonexistent_table",
&constraint,
&[], &[],
);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Table 'nonexistent_table' not found in current schema"));
}
#[rstest]
#[case::remove_check_with_index_postgres(DatabaseBackend::Postgres)]
#[case::remove_check_with_index_mysql(DatabaseBackend::MySql)]
#[case::remove_check_with_index_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_check_with_index(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "age".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![
constraint.clone(),
TableConstraint::Index {
name: Some("idx_age".into()),
columns: vec!["age".into()],
},
],
}];
let result =
build_remove_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
if matches!(backend, DatabaseBackend::Sqlite) {
assert!(sql.contains("CREATE INDEX"));
assert!(sql.contains("idx_age"));
}
with_settings!({ snapshot_suffix => format!("remove_check_with_index_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[rstest]
#[case::remove_check_with_unique_constraint_postgres(DatabaseBackend::Postgres)]
#[case::remove_check_with_unique_constraint_mysql(DatabaseBackend::MySql)]
#[case::remove_check_with_unique_constraint_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_check_with_unique_constraint(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "age".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![
constraint.clone(),
TableConstraint::Unique {
name: Some("uq_age".into()),
columns: vec!["age".into()],
},
],
}];
let result =
build_remove_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
if matches!(backend, DatabaseBackend::Sqlite) {
assert!(sql.contains("CREATE TABLE"));
}
with_settings!({ snapshot_suffix => format!("remove_check_with_unique_constraint_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[rstest]
#[case::remove_unique_with_other_constraints_postgres(DatabaseBackend::Postgres)]
#[case::remove_unique_with_other_constraints_mysql(DatabaseBackend::MySql)]
#[case::remove_unique_with_other_constraints_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_unique_with_other_constraints(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::Unique {
name: Some("uq_email".into()),
columns: vec!["email".into()],
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "email".into(),
r#type: ColumnType::Simple(SimpleColumnType::Text),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![
TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
},
constraint.clone(),
TableConstraint::Check {
name: "chk_email".into(),
expr: "email IS NOT NULL".into(),
},
],
}];
let result =
build_remove_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("DROP") || sql.contains("CREATE TABLE"));
with_settings!({ snapshot_suffix => format!("remove_unique_with_other_constraints_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[rstest]
#[case::remove_foreign_key_with_other_constraints_postgres(DatabaseBackend::Postgres)]
#[case::remove_foreign_key_with_other_constraints_mysql(DatabaseBackend::MySql)]
#[case::remove_foreign_key_with_other_constraints_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_foreign_key_with_other_constraints(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let current_schema = vec![TableDef {
name: "posts".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![
TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
},
constraint.clone(),
TableConstraint::Unique {
name: Some("uq_user_id".into()),
columns: vec!["user_id".into()],
},
TableConstraint::Check {
name: "chk_user_id".into(),
expr: "user_id > 0".into(),
},
],
}];
let result =
build_remove_constraint(&backend, "posts", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("DROP") || sql.contains("CREATE TABLE"));
with_settings!({ snapshot_suffix => format!("remove_foreign_key_with_other_constraints_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[rstest]
#[case::remove_check_with_other_constraints_postgres(DatabaseBackend::Postgres)]
#[case::remove_check_with_other_constraints_mysql(DatabaseBackend::MySql)]
#[case::remove_check_with_other_constraints_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_check_with_other_constraints(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "age".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![
TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
},
TableConstraint::Unique {
name: Some("uq_age".into()),
columns: vec!["age".into()],
},
constraint.clone(),
],
}];
let result =
build_remove_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("DROP") || sql.contains("CREATE TABLE"));
with_settings!({ snapshot_suffix => format!("remove_check_with_other_constraints_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
#[test]
fn test_remove_constraint_primary_key_postgres_direct() {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let schema = vec![TableDef {
name: "orders".into(),
description: None,
columns: vec![ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![constraint.clone()],
}];
let result = build_remove_constraint(
&DatabaseBackend::Postgres,
"orders",
&constraint,
&schema,
&[],
)
.unwrap();
assert_eq!(result.len(), 1);
let sql = result[0].build(DatabaseBackend::Postgres);
assert!(sql.contains("ALTER TABLE \"orders\" DROP CONSTRAINT \"orders_pkey\""));
}
#[test]
fn test_remove_constraint_primary_key_mysql_direct() {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let schema = vec![TableDef {
name: "orders".into(),
description: None,
columns: vec![ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![constraint.clone()],
}];
let result =
build_remove_constraint(&DatabaseBackend::MySql, "orders", &constraint, &schema, &[])
.unwrap();
assert_eq!(result.len(), 1);
let sql = result[0].build(DatabaseBackend::MySql);
assert!(sql.contains("ALTER TABLE `orders` DROP PRIMARY KEY"));
}
#[rstest]
#[case::remove_index_with_custom_inline_name_postgres(DatabaseBackend::Postgres)]
#[case::remove_index_with_custom_inline_name_mysql(DatabaseBackend::MySql)]
#[case::remove_index_with_custom_inline_name_sqlite(DatabaseBackend::Sqlite)]
fn test_remove_constraint_index_with_custom_inline_name(#[case] backend: DatabaseBackend) {
let constraint = TableConstraint::Index {
name: Some("custom_idx_email".into()),
columns: vec!["email".into()],
};
let schema = vec![TableDef {
name: "users".to_string(),
description: None,
columns: vec![ColumnDef {
name: "email".to_string(),
r#type: ColumnType::Simple(SimpleColumnType::Text),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: Some(vespertide_core::StrOrBoolOrArray::Str(
"custom_idx_email".into(),
)),
foreign_key: None,
}],
constraints: vec![],
}];
let result = build_remove_constraint(&backend, "users", &constraint, &schema, &[]);
assert!(result.is_ok());
let sql = result
.unwrap()
.iter()
.map(|q| q.build(backend))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("custom_idx_email"));
with_settings!({ snapshot_suffix => format!("remove_index_custom_name_{:?}", backend) }, {
assert_snapshot!(sql);
});
}
}