use sea_query::{Alias, ForeignKey, Index, Query, Table};
use vespertide_core::{TableConstraint, TableDef};
use super::helpers::{
build_sqlite_temp_table_create, recreate_indexes_after_rebuild, to_sea_fk_action,
};
fn merge_constraint(
existing: &[TableConstraint],
constraint: &TableConstraint,
) -> Vec<TableConstraint> {
let mut out: Vec<TableConstraint> = Vec::with_capacity(existing.len() + 1);
let mut replaced = false;
for c in existing {
if constraints_overlap(c, constraint) {
if !replaced {
out.push(constraint.clone());
replaced = true;
}
} else {
out.push(c.clone());
}
}
if !replaced {
out.push(constraint.clone());
}
out
}
fn constraints_overlap(a: &TableConstraint, b: &TableConstraint) -> bool {
match (a, b) {
(
TableConstraint::ForeignKey {
columns: a_cols, ..
},
TableConstraint::ForeignKey {
columns: b_cols, ..
},
) => a_cols == b_cols,
(
TableConstraint::PrimaryKey {
columns: a_cols, ..
},
TableConstraint::PrimaryKey {
columns: b_cols, ..
},
) => a_cols == b_cols,
(
TableConstraint::Check {
name: a_name,
expr: a_expr,
},
TableConstraint::Check {
name: b_name,
expr: b_expr,
},
) => a_name == b_name && a_expr == b_expr,
_ => false,
}
}
use super::rename_table::build_rename_table;
use super::types::{BuiltQuery, DatabaseBackend, RawSql};
use crate::error::QueryError;
pub fn build_add_constraint(
backend: &DatabaseBackend,
table: &str,
constraint: &TableConstraint,
current_schema: &[TableDef],
pending_constraints: &[TableConstraint],
) -> Result<Vec<BuiltQuery>, QueryError> {
match constraint {
TableConstraint::PrimaryKey { columns, .. } => {
if *backend == DatabaseBackend::Sqlite {
let table_def = current_schema.iter().find(|t| t.name == table).ok_or_else(|| QueryError::Other(format!("Table '{}' not found in current schema. SQLite requires current schema information to add constraints.", table)))?;
let new_constraints = merge_constraint(&table_def.constraints, constraint);
let temp_table = format!("{}_temp", table);
let create_query = build_sqlite_temp_table_create(
backend,
&temp_table,
table,
&table_def.columns,
&new_constraints,
);
let column_aliases: Vec<Alias> = table_def
.columns
.iter()
.map(|c| Alias::new(&c.name))
.collect();
let mut select_query = Query::select();
for col_alias in &column_aliases {
select_query = select_query.column(col_alias.clone()).to_owned();
}
select_query = select_query.from(Alias::new(table)).to_owned();
let insert_stmt = Query::insert()
.into_table(Alias::new(&temp_table))
.columns(column_aliases.clone())
.select_from(select_query)
.unwrap()
.to_owned();
let insert_query = BuiltQuery::Insert(Box::new(insert_stmt));
let drop_table = Table::drop().table(Alias::new(table)).to_owned();
let drop_query = BuiltQuery::DropTable(Box::new(drop_table));
let rename_query = build_rename_table(&temp_table, table);
let index_queries = recreate_indexes_after_rebuild(
table,
&table_def.constraints,
pending_constraints,
);
let mut queries = vec![create_query, insert_query, drop_query, rename_query];
queries.extend(index_queries);
Ok(queries)
} else {
let pg_cols = columns
.iter()
.map(|c| format!("\"{}\"", c))
.collect::<Vec<_>>()
.join(", ");
let mysql_cols = columns
.iter()
.map(|c| format!("`{}`", c))
.collect::<Vec<_>>()
.join(", ");
let pg_sql = format!("ALTER TABLE \"{}\" ADD PRIMARY KEY ({})", table, pg_cols);
let mysql_sql = format!("ALTER TABLE `{}` ADD PRIMARY KEY ({})", table, mysql_cols);
Ok(vec![BuiltQuery::Raw(RawSql::per_backend(
pg_sql.clone(),
mysql_sql,
pg_sql,
))])
}
}
TableConstraint::Unique { name, columns } => {
let index_name =
super::helpers::build_unique_constraint_name(table, columns, name.as_deref());
let mut idx = Index::create()
.table(Alias::new(table))
.name(&index_name)
.unique()
.to_owned();
for col in columns {
idx = idx.col(Alias::new(col)).to_owned();
}
Ok(vec![BuiltQuery::CreateIndex(Box::new(idx))])
}
TableConstraint::ForeignKey {
name,
columns,
ref_table,
ref_columns,
on_delete,
on_update,
} => {
if *backend == DatabaseBackend::Sqlite {
let table_def = current_schema.iter().find(|t| t.name == table).ok_or_else(|| QueryError::Other(format!("Table '{}' not found in current schema. SQLite requires current schema information to add constraints.", table)))?;
let new_constraints = merge_constraint(&table_def.constraints, constraint);
let temp_table = format!("{}_temp", table);
let create_query = build_sqlite_temp_table_create(
backend,
&temp_table,
table,
&table_def.columns,
&new_constraints,
);
let column_aliases: Vec<Alias> = table_def
.columns
.iter()
.map(|c| Alias::new(&c.name))
.collect();
let mut select_query = Query::select();
for col_alias in &column_aliases {
select_query = select_query.column(col_alias.clone()).to_owned();
}
select_query = select_query.from(Alias::new(table)).to_owned();
let insert_stmt = Query::insert()
.into_table(Alias::new(&temp_table))
.columns(column_aliases.clone())
.select_from(select_query)
.unwrap()
.to_owned();
let insert_query = BuiltQuery::Insert(Box::new(insert_stmt));
let drop_table = Table::drop().table(Alias::new(table)).to_owned();
let drop_query = BuiltQuery::DropTable(Box::new(drop_table));
let rename_query = build_rename_table(&temp_table, table);
let index_queries = recreate_indexes_after_rebuild(
table,
&table_def.constraints,
pending_constraints,
);
let mut queries = vec![create_query, insert_query, drop_query, rename_query];
queries.extend(index_queries);
Ok(queries)
} else {
let fk_name =
vespertide_naming::build_foreign_key_name(table, columns, name.as_deref());
let mut fk = ForeignKey::create();
fk = fk.name(&fk_name).to_owned();
fk = fk.from_tbl(Alias::new(table)).to_owned();
for col in columns {
fk = fk.from_col(Alias::new(col)).to_owned();
}
fk = fk.to_tbl(Alias::new(ref_table)).to_owned();
for col in ref_columns {
fk = fk.to_col(Alias::new(col)).to_owned();
}
if let Some(action) = on_delete {
fk = fk.on_delete(to_sea_fk_action(action)).to_owned();
}
if let Some(action) = on_update {
fk = fk.on_update(to_sea_fk_action(action)).to_owned();
}
Ok(vec![BuiltQuery::CreateForeignKey(Box::new(fk))])
}
}
TableConstraint::Index { name, columns } => {
let index_name = vespertide_naming::build_index_name(table, columns, name.as_deref());
let mut idx = Index::create()
.table(Alias::new(table))
.name(&index_name)
.to_owned();
for col in columns {
idx = idx.col(Alias::new(col)).to_owned();
}
Ok(vec![BuiltQuery::CreateIndex(Box::new(idx))])
}
TableConstraint::Check { name, expr } => {
if *backend == DatabaseBackend::Sqlite {
let table_def = current_schema.iter().find(|t| t.name == table).ok_or_else(|| QueryError::Other(format!("Table '{}' not found in current schema. SQLite requires current schema information to add constraints.", table)))?;
let new_constraints = merge_constraint(&table_def.constraints, constraint);
let temp_table = format!("{}_temp", table);
let create_query = build_sqlite_temp_table_create(
backend,
&temp_table,
table,
&table_def.columns,
&new_constraints,
);
let column_aliases: Vec<Alias> = table_def
.columns
.iter()
.map(|c| Alias::new(&c.name))
.collect();
let mut select_query = Query::select();
for col_alias in &column_aliases {
select_query = select_query.column(col_alias.clone()).to_owned();
}
select_query = select_query.from(Alias::new(table)).to_owned();
let insert_stmt = Query::insert()
.into_table(Alias::new(&temp_table))
.columns(column_aliases.clone())
.select_from(select_query)
.unwrap()
.to_owned();
let insert_query = BuiltQuery::Insert(Box::new(insert_stmt));
let drop_table = Table::drop().table(Alias::new(table)).to_owned();
let drop_query = BuiltQuery::DropTable(Box::new(drop_table));
let rename_query = build_rename_table(&temp_table, table);
let index_queries = recreate_indexes_after_rebuild(
table,
&table_def.constraints,
pending_constraints,
);
let mut queries = vec![create_query, insert_query, drop_query, rename_query];
queries.extend(index_queries);
Ok(queries)
} else {
let pg_sql = format!(
"ALTER TABLE \"{}\" ADD CONSTRAINT \"{}\" CHECK ({})",
table, name, expr
);
let mysql_sql = format!(
"ALTER TABLE `{}` ADD CONSTRAINT `{}` CHECK ({})",
table, name, expr
);
Ok(vec![BuiltQuery::Raw(RawSql::per_backend(
pg_sql.clone(),
mysql_sql,
pg_sql,
))])
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sql::types::DatabaseBackend;
use insta::{assert_snapshot, with_settings};
use rstest::rstest;
use vespertide_core::{
ColumnDef, ColumnType, ReferenceAction, SimpleColumnType, TableConstraint, TableDef,
};
#[rstest]
#[case::add_constraint_primary_key_postgres(
"add_constraint_primary_key_postgres",
DatabaseBackend::Postgres,
&["ALTER TABLE \"users\" ADD PRIMARY KEY (\"id\")"]
)]
#[case::add_constraint_primary_key_mysql(
"add_constraint_primary_key_mysql",
DatabaseBackend::MySql,
&["ALTER TABLE `users` ADD PRIMARY KEY (`id`)"]
)]
#[case::add_constraint_primary_key_sqlite(
"add_constraint_primary_key_sqlite",
DatabaseBackend::Sqlite,
&["CREATE TABLE \"users_temp\""]
)]
#[case::add_constraint_unique_named_postgres(
"add_constraint_unique_named_postgres",
DatabaseBackend::Postgres,
&["CREATE UNIQUE INDEX \"uq_users__uq_email\" ON \"users\" (\"email\")"]
)]
#[case::add_constraint_unique_named_mysql(
"add_constraint_unique_named_mysql",
DatabaseBackend::MySql,
&["CREATE UNIQUE INDEX `uq_users__uq_email` ON `users` (`email`)"]
)]
#[case::add_constraint_unique_named_sqlite(
"add_constraint_unique_named_sqlite",
DatabaseBackend::Sqlite,
&["CREATE UNIQUE INDEX \"uq_users__uq_email\" ON \"users\" (\"email\")"]
)]
#[case::add_constraint_foreign_key_postgres(
"add_constraint_foreign_key_postgres",
DatabaseBackend::Postgres,
&["FOREIGN KEY (\"user_id\")", "REFERENCES \"users\" (\"id\")", "ON DELETE CASCADE", "ON UPDATE RESTRICT"]
)]
#[case::add_constraint_foreign_key_mysql(
"add_constraint_foreign_key_mysql",
DatabaseBackend::MySql,
&["FOREIGN KEY (`user_id`)", "REFERENCES `users` (`id`)", "ON DELETE CASCADE", "ON UPDATE RESTRICT"]
)]
#[case::add_constraint_foreign_key_sqlite(
"add_constraint_foreign_key_sqlite",
DatabaseBackend::Sqlite,
&["CREATE TABLE \"users_temp\""]
)]
#[case::add_constraint_check_named_postgres(
"add_constraint_check_named_postgres",
DatabaseBackend::Postgres,
&["ADD CONSTRAINT \"chk_age\" CHECK (age > 0)"]
)]
#[case::add_constraint_check_named_mysql(
"add_constraint_check_named_mysql",
DatabaseBackend::MySql,
&["ADD CONSTRAINT `chk_age` CHECK (age > 0)"]
)]
#[case::add_constraint_check_named_sqlite(
"add_constraint_check_named_sqlite",
DatabaseBackend::Sqlite,
&["CREATE TABLE \"users_temp\""]
)]
fn test_add_constraint(
#[case] title: &str,
#[case] backend: DatabaseBackend,
#[case] expected: &[&str],
) {
let constraint = if title.contains("primary_key") {
TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
}
} else if title.contains("unique") {
TableConstraint::Unique {
name: Some("uq_email".into()),
columns: vec!["email".into()],
}
} else if title.contains("foreign_key") {
TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: Some(ReferenceAction::Cascade),
on_update: Some(ReferenceAction::Restrict),
}
} else {
TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
}
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: if title.contains("foreign_key") {
vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
]
} else {
vec![
ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: if title.contains("check") {
"age".into()
} else {
"email".into()
},
r#type: ColumnType::Simple(SimpleColumnType::Text),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
]
},
constraints: vec![],
}];
let result =
build_add_constraint(&backend, "users", &constraint, ¤t_schema, &[]).unwrap();
let sql = result[0].build(backend);
for exp in expected {
assert!(
sql.contains(exp),
"Expected SQL to contain '{}', got: {}",
exp,
sql
);
}
with_settings!({ snapshot_suffix => format!("add_constraint_{}", title) }, {
assert_snapshot!(result.iter().map(|q| q.build(backend)).collect::<Vec<String>>().join("\n"));
});
}
#[test]
fn test_add_constraint_primary_key_sqlite_table_not_found() {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let current_schema = vec![]; let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"users",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Table 'users' not found in current schema"));
}
#[test]
fn test_add_constraint_primary_key_sqlite_with_check_constraints() {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![TableConstraint::Check {
name: "chk_id".into(),
expr: "id > 0".into(),
}],
}];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"users",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CONSTRAINT \"chk_id\" CHECK"));
}
#[test]
fn test_add_constraint_primary_key_sqlite_with_indexes() {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![TableConstraint::Index {
name: Some("idx_id".into()),
columns: vec!["id".into()],
}],
}];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"users",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CREATE INDEX"));
assert!(sql.contains("idx_id"));
}
#[test]
fn test_add_constraint_primary_key_sqlite_with_unique_constraint() {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![TableConstraint::Unique {
name: Some("uq_email".into()),
columns: vec!["email".into()],
}],
}];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"users",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CREATE TABLE"));
}
#[test]
fn test_add_constraint_foreign_key_sqlite_table_not_found() {
let constraint = TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let current_schema = vec![]; let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"posts",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Table 'posts' not found in current schema"));
}
#[test]
fn test_add_constraint_foreign_key_sqlite_with_check_constraints() {
let constraint = TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let current_schema = vec![TableDef {
name: "posts".into(),
description: None,
columns: vec![ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![TableConstraint::Check {
name: "chk_user_id".into(),
expr: "user_id > 0".into(),
}],
}];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"posts",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CONSTRAINT \"chk_user_id\" CHECK"));
}
#[test]
fn test_add_constraint_foreign_key_sqlite_with_indexes() {
let constraint = TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let current_schema = vec![TableDef {
name: "posts".into(),
description: None,
columns: vec![ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![TableConstraint::Index {
name: Some("idx_user_id".into()),
columns: vec!["user_id".into()],
}],
}];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"posts",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CREATE INDEX"));
assert!(sql.contains("idx_user_id"));
}
#[test]
fn test_add_constraint_foreign_key_sqlite_with_unique_constraint() {
let constraint = TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let current_schema = vec![TableDef {
name: "posts".into(),
description: None,
columns: vec![ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![TableConstraint::Unique {
name: Some("uq_user_id".into()),
columns: vec!["user_id".into()],
}],
}];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"posts",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CREATE TABLE"));
}
#[test]
fn test_add_constraint_check_sqlite_table_not_found() {
let constraint = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let current_schema = vec![]; let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"users",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(err_msg.contains("Table 'users' not found in current schema"));
}
#[test]
fn test_add_constraint_check_sqlite_without_existing_check() {
let constraint = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![ColumnDef {
name: "age".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![], }];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"users",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CREATE TABLE"));
assert!(sql.contains("CONSTRAINT \"chk_age\" CHECK"));
}
#[test]
fn test_add_constraint_primary_key_sqlite_without_existing_check() {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![], }];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"users",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CREATE TABLE"));
assert!(sql.contains("PRIMARY KEY"));
}
#[test]
fn test_add_constraint_foreign_key_sqlite_without_existing_check() {
let constraint = TableConstraint::ForeignKey {
name: Some("fk_user".into()),
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let current_schema = vec![TableDef {
name: "posts".into(),
description: None,
columns: vec![ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![], }];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"posts",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CREATE TABLE"));
assert!(sql.contains("FOREIGN KEY"));
}
#[test]
fn test_add_constraint_check_sqlite_with_indexes() {
let constraint = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![ColumnDef {
name: "age".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![TableConstraint::Index {
name: Some("idx_age".into()),
columns: vec!["age".into()],
}],
}];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"users",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CREATE INDEX"));
assert!(sql.contains("idx_age"));
}
#[test]
fn test_add_constraint_check_sqlite_with_unique_constraint() {
let constraint = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let current_schema = vec![TableDef {
name: "users".into(),
description: None,
columns: vec![ColumnDef {
name: "age".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: true,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
constraints: vec![TableConstraint::Unique {
name: Some("uq_age".into()),
columns: vec!["age".into()],
}],
}];
let result = build_add_constraint(
&DatabaseBackend::Sqlite,
"users",
&constraint,
¤t_schema,
&[],
);
assert!(result.is_ok());
let queries = result.unwrap();
let sql = queries
.iter()
.map(|q| q.build(DatabaseBackend::Sqlite))
.collect::<Vec<String>>()
.join("\n");
assert!(sql.contains("CREATE TABLE"));
}
#[test]
fn test_add_constraint_composite_primary_key_postgres() {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["user_id".into(), "role_id".into()],
auto_increment: false,
};
let current_schema = vec![TableDef {
name: "user_roles".into(),
description: None,
columns: vec![
ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "role_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![],
}];
let result = build_add_constraint(
&DatabaseBackend::Postgres,
"user_roles",
&constraint,
¤t_schema,
&[],
)
.unwrap();
let sql = result[0].build(DatabaseBackend::Postgres);
assert!(sql.contains("ADD PRIMARY KEY"));
assert!(sql.contains("\"user_id\""));
assert!(sql.contains("\"role_id\""));
}
#[test]
fn test_add_constraint_composite_primary_key_mysql() {
let constraint = TableConstraint::PrimaryKey {
columns: vec!["user_id".into(), "role_id".into()],
auto_increment: false,
};
let current_schema = vec![TableDef {
name: "user_roles".into(),
description: None,
columns: vec![
ColumnDef {
name: "user_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
ColumnDef {
name: "role_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
},
],
constraints: vec![],
}];
let result = build_add_constraint(
&DatabaseBackend::MySql,
"user_roles",
&constraint,
¤t_schema,
&[],
)
.unwrap();
let sql = result[0].build(DatabaseBackend::MySql);
assert!(sql.contains("ADD PRIMARY KEY"));
assert!(sql.contains("`user_id`"));
assert!(sql.contains("`role_id`"));
}
#[test]
fn test_constraints_overlap_primary_key_same_columns() {
let a = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let b = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: true,
};
assert!(constraints_overlap(&a, &b));
}
#[test]
fn test_constraints_overlap_primary_key_different_columns() {
let a = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let b = TableConstraint::PrimaryKey {
columns: vec!["uid".into()],
auto_increment: false,
};
assert!(!constraints_overlap(&a, &b));
}
#[test]
fn test_constraints_overlap_check_same() {
let a = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let b = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
assert!(constraints_overlap(&a, &b));
}
#[test]
fn test_constraints_overlap_check_different_name() {
let a = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let b = TableConstraint::Check {
name: "chk_age2".into(),
expr: "age > 0".into(),
};
assert!(!constraints_overlap(&a, &b));
}
#[test]
fn test_constraints_overlap_check_different_expr() {
let a = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 0".into(),
};
let b = TableConstraint::Check {
name: "chk_age".into(),
expr: "age > 10".into(),
};
assert!(!constraints_overlap(&a, &b));
}
#[test]
fn test_constraints_overlap_different_variants() {
let a = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let b = TableConstraint::Check {
name: "chk".into(),
expr: "id > 0".into(),
};
assert!(!constraints_overlap(&a, &b));
}
#[test]
fn test_constraints_overlap_fk_same_columns() {
let a = TableConstraint::ForeignKey {
name: None,
columns: vec!["user_id".into()],
ref_table: "users".into(),
ref_columns: vec!["id".into()],
on_delete: None,
on_update: None,
};
let b = TableConstraint::ForeignKey {
name: Some("fk".into()),
columns: vec!["user_id".into()],
ref_table: "other".into(),
ref_columns: vec!["oid".into()],
on_delete: Some(ReferenceAction::Cascade),
on_update: None,
};
assert!(constraints_overlap(&a, &b));
}
#[test]
fn test_merge_constraint_replaces_overlapping() {
let existing = vec![
TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
},
TableConstraint::Index {
name: None,
columns: vec!["email".into()],
},
];
let new_pk = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: true,
};
let result = merge_constraint(&existing, &new_pk);
assert_eq!(result.len(), 2); }
#[test]
fn test_merge_constraint_appends_non_overlapping() {
let existing = vec![TableConstraint::Index {
name: None,
columns: vec!["email".into()],
}];
let new_pk = TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
};
let result = merge_constraint(&existing, &new_pk);
assert_eq!(result.len(), 2); }
#[test]
fn test_extract_check_clauses_with_mixed_constraints() {
let constraints = vec![
TableConstraint::Check {
name: "chk1".into(),
expr: "a > 0".into(),
},
TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
},
TableConstraint::Check {
name: "chk2".into(),
expr: "b < 100".into(),
},
TableConstraint::Unique {
name: Some("uq".into()),
columns: vec!["email".into()],
},
];
let clauses = crate::sql::helpers::extract_check_clauses(&constraints);
assert_eq!(clauses.len(), 2);
assert!(clauses[0].contains("chk1"));
assert!(clauses[1].contains("chk2"));
}
#[test]
fn test_extract_check_clauses_with_no_check_constraints() {
let constraints = vec![
TableConstraint::PrimaryKey {
columns: vec!["id".into()],
auto_increment: false,
},
TableConstraint::Unique {
name: None,
columns: vec!["email".into()],
},
];
let clauses = crate::sql::helpers::extract_check_clauses(&constraints);
assert!(clauses.is_empty());
}
#[test]
fn test_build_create_with_checks_empty_clauses() {
use crate::sql::create_table::build_create_table_for_backend;
let create_stmt = build_create_table_for_backend(
&DatabaseBackend::Sqlite,
"test_table",
&[ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
&[],
);
let result = crate::sql::helpers::build_create_with_checks(
&DatabaseBackend::Sqlite,
&create_stmt,
&[],
);
let sql = result.build(DatabaseBackend::Sqlite);
assert!(sql.contains("CREATE TABLE"));
}
#[test]
fn test_build_create_with_checks_with_clauses() {
use crate::sql::create_table::build_create_table_for_backend;
let create_stmt = build_create_table_for_backend(
&DatabaseBackend::Sqlite,
"test_table",
&[ColumnDef {
name: "id".into(),
r#type: ColumnType::Simple(SimpleColumnType::Integer),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: None,
}],
&[],
);
let check_clauses = vec!["CONSTRAINT \"chk1\" CHECK (id > 0)".to_string()];
let result = crate::sql::helpers::build_create_with_checks(
&DatabaseBackend::Sqlite,
&create_stmt,
&check_clauses,
);
let sql = result.build(DatabaseBackend::Sqlite);
assert!(sql.contains("CREATE TABLE"));
assert!(sql.contains("CONSTRAINT \"chk1\" CHECK (id > 0)"));
}
}