use vespertide_core::{MigrationAction, MigrationPlan, TableDef};
use crate::DatabaseBackend;
use crate::error::QueryError;
use crate::parallel_config::plan_query_par_action_threshold;
use crate::sql::BuiltQuery;
mod parallel;
mod sequential;
mod transaction;
/// Options accepted by [`build_plan_queries_with_options`].
///
/// The derived `Default` leaves every flag `false`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PlanQueriesOptions {
/// When `true`, the built queries are passed through
/// [`PlanQueries::into_transactional`] before being returned.
pub wrap_in_transaction: bool,
}
/// A migration action paired with one query list per supported backend.
#[derive(Debug)]
pub struct PlanQueries {
/// The migration action this entry belongs to.
pub action: MigrationAction,
/// Queries for the PostgreSQL backend.
pub postgres: Vec<BuiltQuery>,
/// Queries for the MySQL backend.
pub mysql: Vec<BuiltQuery>,
/// Queries for the SQLite backend.
pub sqlite: Vec<BuiltQuery>,
}
impl PlanQueries {
    /// Consumes `queries`, applies `transaction::wrap_backend_queries` once per
    /// backend (Postgres, then MySQL, then SQLite) and hands the vector back.
    ///
    /// The wrapping itself lives in the `transaction` submodule; this method
    /// only fixes the order in which the backends are processed.
    #[must_use]
    pub fn into_transactional(mut queries: Vec<Self>) -> Vec<Self> {
        for backend in [
            DatabaseBackend::Postgres,
            DatabaseBackend::MySql,
            DatabaseBackend::Sqlite,
        ] {
            transaction::wrap_backend_queries(&mut queries, backend);
        }
        queries
    }
}
/// Returns the table an action targets, if it has one.
///
/// `RenameTable` and `RawSql` always yield `None`; every other action defers
/// to `MigrationAction::table_name`.
fn action_target_table(action: &MigrationAction) -> Option<&str> {
    let is_excluded = matches!(
        action,
        MigrationAction::RenameTable { .. } | MigrationAction::RawSql { .. }
    );
    if is_excluded {
        None
    } else {
        action.table_name()
    }
}
/// Builds the per-backend queries for every action in `plan`.
///
/// Plans with fewer actions than `plan_query_par_action_threshold()` are
/// handled by the sequential builder; all others go to the parallel builder.
///
/// # Errors
///
/// Returns whatever [`QueryError`] the selected builder reports.
pub fn build_plan_queries(
    plan: &MigrationPlan,
    current_schema: &[TableDef],
) -> Result<Vec<PlanQueries>, QueryError> {
    let reaches_parallel_threshold = plan.actions.len() >= plan_query_par_action_threshold();
    if reaches_parallel_threshold {
        parallel::build_plan_queries_in_parallel(plan, current_schema)
    } else {
        sequential::build_plan_queries_sequentially(plan, current_schema)
    }
}
/// Like [`build_plan_queries`], with post-processing selected by `options`.
///
/// When `options.wrap_in_transaction` is set, the successful result is passed
/// through [`PlanQueries::into_transactional`]; otherwise it is returned as is.
///
/// # Errors
///
/// Propagates any [`QueryError`] from [`build_plan_queries`] unchanged.
pub fn build_plan_queries_with_options(
    plan: &MigrationPlan,
    current_schema: &[TableDef],
    options: PlanQueriesOptions,
) -> Result<Vec<PlanQueries>, QueryError> {
    build_plan_queries(plan, current_schema).map(|built| {
        if options.wrap_in_transaction {
            PlanQueries::into_transactional(built)
        } else {
            built
        }
    })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sql::{BuiltQuery, DatabaseBackend};
use crate::test_support::col;
use insta::{assert_snapshot, with_settings};
use rstest::rstest;
use vespertide_core::{
ColumnDef, ColumnType, MigrationAction, MigrationPlan, ReferenceAction, SimpleColumnType,
TableConstraint, TableDef,
};
/// Renders every query for `backend` and joins the statements with `";\n"`.
fn build_sql_snapshot(result: &[BuiltQuery], backend: DatabaseBackend) -> String {
    let mut rendered = String::new();
    for (position, query) in result.iter().enumerate() {
        // Separator goes between statements only, never before the first.
        if position > 0 {
            rendered.push_str(";\n");
        }
        rendered.push_str(&query.build(backend));
    }
    rendered
}
#[rstest]
#[case::empty(
MigrationPlan {
id: String::new(),
comment: None,
created_at: None,
version: 1,
actions: vec![],
},
0
)]
#[case::single_action(
MigrationPlan {
id: String::new(),
comment: None,
created_at: None,
version: 1,
actions: vec![MigrationAction::DeleteTable {
table: "users".into(),
}],
},
1
)]
#[case::multiple_actions(
MigrationPlan {
id: String::new(),
comment: None,
created_at: None,
version: 1,
actions: vec![
MigrationAction::CreateTable {
table: "users".into(),
columns: vec![col("id", ColumnType::Simple(SimpleColumnType::Integer))],
constraints: vec![],
},
MigrationAction::DeleteTable {
table: "posts".into(),
},
],
},
2
)]
fn test_build_plan_queries(#[case] plan: MigrationPlan, #[case] expected_count: usize) {
let result = build_plan_queries(&plan, &[]).unwrap();
assert_eq!(
result.len(),
expected_count,
"Expected {} queries, got {}",
expected_count,
result.len()
);
}
/// Snapshots the SQL of a `DeleteColumn` that follows, in the same plan, the
/// `CreateTable` which declared that column with an inline `unique` flag.
#[rstest]
#[case::postgres("postgres", DatabaseBackend::Postgres)]
#[case::mysql("mysql", DatabaseBackend::MySql)]
#[case::sqlite("sqlite", DatabaseBackend::Sqlite)]
fn test_delete_column_after_create_table_with_inline_unique(
    #[case] title: &str,
    #[case] backend: DatabaseBackend,
) {
    let gift_code = {
        let mut column = col("gift_code", ColumnType::Simple(SimpleColumnType::Text));
        column.unique = Some(vespertide_core::StrOrBoolOrArray::Bool(true));
        column
    };
    let create_gift = MigrationAction::CreateTable {
        table: "gift".into(),
        columns: vec![
            col("id", ColumnType::Simple(SimpleColumnType::Integer)),
            gift_code,
        ],
        constraints: vec![],
    };
    let drop_gift_code = MigrationAction::DeleteColumn {
        table: "gift".into(),
        column: "gift_code".into(),
    };
    let plan = MigrationPlan {
        id: String::new(),
        comment: None,
        created_at: None,
        version: 1,
        actions: vec![create_gift, drop_gift_code],
    };

    let result = build_plan_queries(&plan, &[]).unwrap();
    // Index 1 is the DeleteColumn action; only its SQL is snapshotted.
    let delete_step = &result[1];
    let queries = match backend {
        DatabaseBackend::Postgres => &delete_step.postgres,
        DatabaseBackend::MySql => &delete_step.mysql,
        DatabaseBackend::Sqlite => &delete_step.sqlite,
    };
    let sql = build_sql_snapshot(queries, backend);
    let suffix = format!("inline_unique_{}", title);
    with_settings!({ snapshot_path => "../snapshots", snapshot_suffix => suffix }, {
        assert_snapshot!(sql);
    });
}
/// Snapshots the SQL of a `DeleteColumn` that follows, in the same plan, the
/// `CreateTable` which declared that column with an inline `index` flag.
#[rstest]
#[case::postgres("postgres", DatabaseBackend::Postgres)]
#[case::mysql("mysql", DatabaseBackend::MySql)]
#[case::sqlite("sqlite", DatabaseBackend::Sqlite)]
fn test_delete_column_after_create_table_with_inline_index(
    #[case] title: &str,
    #[case] backend: DatabaseBackend,
) {
    let email = {
        let mut column = col("email", ColumnType::Simple(SimpleColumnType::Text));
        column.index = Some(vespertide_core::StrOrBoolOrArray::Bool(true));
        column
    };
    let create_users = MigrationAction::CreateTable {
        table: "users".into(),
        columns: vec![
            col("id", ColumnType::Simple(SimpleColumnType::Integer)),
            email,
        ],
        constraints: vec![],
    };
    let drop_email = MigrationAction::DeleteColumn {
        table: "users".into(),
        column: "email".into(),
    };
    let plan = MigrationPlan {
        id: String::new(),
        comment: None,
        created_at: None,
        version: 1,
        actions: vec![create_users, drop_email],
    };

    let result = build_plan_queries(&plan, &[]).unwrap();
    // Index 1 is the DeleteColumn action; only its SQL is snapshotted.
    let delete_step = &result[1];
    let queries = match backend {
        DatabaseBackend::Postgres => &delete_step.postgres,
        DatabaseBackend::MySql => &delete_step.mysql,
        DatabaseBackend::Sqlite => &delete_step.sqlite,
    };
    let sql = build_sql_snapshot(queries, backend);
    let suffix = format!("inline_index_{}", title);
    with_settings!({ snapshot_path => "../snapshots", snapshot_suffix => suffix }, {
        assert_snapshot!(sql);
    });
}
/// Spot-checks the rendered Postgres and MySQL SQL of a two-action plan
/// (create `users`, drop `posts`) for the expected keywords and quoting.
#[test]
fn test_build_plan_queries_sql_content() {
    let create_users = MigrationAction::CreateTable {
        table: "users".into(),
        columns: vec![col("id", ColumnType::Simple(SimpleColumnType::Integer))],
        constraints: vec![],
    };
    let drop_posts = MigrationAction::DeleteTable {
        table: "posts".into(),
    };
    let plan = MigrationPlan {
        id: String::new(),
        comment: None,
        created_at: None,
        version: 1,
        actions: vec![create_users, drop_posts],
    };

    let result = build_plan_queries(&plan, &[]).unwrap();
    assert_eq!(result.len(), 2);

    // Render one action's statements for a backend, joined with ";\n".
    let postgres_sql =
        |step: usize| build_sql_snapshot(&result[step].postgres, DatabaseBackend::Postgres);
    let mysql_sql = |step: usize| build_sql_snapshot(&result[step].mysql, DatabaseBackend::MySql);

    let create_pg = postgres_sql(0);
    assert!(create_pg.contains("CREATE TABLE"));
    assert!(create_pg.contains("\"users\""));
    assert!(create_pg.contains("\"id\""));

    let drop_pg = postgres_sql(1);
    assert!(drop_pg.contains("DROP TABLE"));
    assert!(drop_pg.contains("\"posts\""));

    let create_mysql = mysql_sql(0);
    assert!(create_mysql.contains("`users`"));

    let drop_mysql = mysql_sql(1);
    assert!(drop_mysql.contains("`posts`"));
}
/// An entry whose three backend lists are empty must still have three empty
/// lists after `into_transactional`.
#[test]
fn transactional_wrapping_leaves_empty_backend_queries_unchanged() {
    let noop_entry = PlanQueries {
        action: MigrationAction::RawSql {
            sql: "-- noop".into(),
        },
        postgres: Vec::new(),
        mysql: Vec::new(),
        sqlite: Vec::new(),
    };

    let wrapped = PlanQueries::into_transactional(vec![noop_entry]);

    let only_entry = &wrapped[0];
    assert!(only_entry.postgres.is_empty());
    assert!(only_entry.mysql.is_empty());
    assert!(only_entry.sqlite.is_empty());
}
/// Guards against a specific panic message reappearing in `mod.rs`.
///
/// The needle below is written with an escaped quote, so the raw text of this
/// test never matches itself. Keep it escaped, and do not spell the needle
/// out unescaped in any comment in this file.
#[test]
fn transactional_wrapping_has_no_expect_on_last_non_empty_query() {
    let forbidden_fragment = "expect(\"first non-empty backend query implies";
    let module_source = include_str!("mod.rs");
    assert!(!module_source.contains(forbidden_fragment));
}
/// Fixture: unnamed foreign key from `category_id` to `category.id` with
/// `ON DELETE CASCADE`, no `ON UPDATE` action and the default orphan strategy.
fn fk_constraint() -> TableConstraint {
TableConstraint::ForeignKey {
name: None,
columns: vec!["category_id".into()],
ref_table: "category".into(),
ref_columns: vec!["id".into()],
on_delete: Some(ReferenceAction::Cascade),
on_update: None,
orphan_strategy: vespertide_core::ForeignKeyOrphanStrategy::default(),
}
}
/// Fixture: unnamed unique constraint on `category_id` using the
/// `DeleteDuplicates` strategy with `KeepPolicy::First`.
fn unique_constraint() -> TableConstraint {
TableConstraint::Unique {
name: None,
columns: vec!["category_id".into()],
strategy: vespertide_core::UniqueConstraintStrategy::DeleteDuplicates {
keep: vespertide_core::KeepPolicy::First,
},
}
}
/// Fixture: unnamed index on `category_id`.
fn index_constraint() -> TableConstraint {
TableConstraint::Index {
name: None,
columns: vec!["category_id".into()],
}
}
/// Builds a plan that adds `product.category_id` (BigInt, no fill value) and
/// then adds each constraint in `order` to `product`, preserving that order.
fn plan_add_column_with_constraints(order: &[TableConstraint]) -> MigrationPlan {
    let add_column = MigrationAction::AddColumn {
        table: "product".into(),
        column: Box::new(col(
            "category_id",
            ColumnType::Simple(SimpleColumnType::BigInt),
        )),
        fill_with: None,
    };
    let add_constraints = order
        .iter()
        .map(|constraint| MigrationAction::AddConstraint {
            table: "product".into(),
            constraint: constraint.clone(),
        });
    let actions: Vec<MigrationAction> = std::iter::once(add_column)
        .chain(add_constraints)
        .collect();

    MigrationPlan {
        id: String::new(),
        comment: None,
        created_at: None,
        version: 1,
        actions,
    }
}
/// Builds a plan that removes each constraint in `order` from `product`
/// (preserving that order) and finally deletes the `category_id` column.
fn plan_remove_constraints_then_drop(order: &[TableConstraint]) -> MigrationPlan {
    let remove_constraints = order
        .iter()
        .map(|constraint| MigrationAction::RemoveConstraint {
            table: "product".into(),
            constraint: constraint.clone(),
        });
    let drop_column = MigrationAction::DeleteColumn {
        table: "product".into(),
        column: "category_id".into(),
    };
    let actions: Vec<MigrationAction> = remove_constraints
        .chain(std::iter::once(drop_column))
        .collect();

    MigrationPlan {
        id: String::new(),
        comment: None,
        created_at: None,
        version: 1,
        actions,
    }
}
/// Fixture schema: a single `product` table with only an integer `id` column
/// and no constraints.
fn base_schema_no_constraints() -> Vec<TableDef> {
vec![TableDef {
name: "product".into(),
description: None,
columns: vec![col("id", ColumnType::Simple(SimpleColumnType::Integer))],
constraints: vec![],
}]
}
/// Fixture schema: a single `product` table with `id` and `category_id`
/// columns, carrying the foreign-key, unique and index fixtures (in that
/// order) on `category_id`.
fn base_schema_with_all_constraints() -> Vec<TableDef> {
vec![TableDef {
name: "product".into(),
description: None,
columns: vec![
col("id", ColumnType::Simple(SimpleColumnType::Integer)),
col("category_id", ColumnType::Simple(SimpleColumnType::BigInt)),
],
constraints: vec![fk_constraint(), unique_constraint(), index_constraint()],
}]
}
/// Renders the whole plan for one backend as a single string.
///
/// Each action becomes a section headed by `-- Action <index>: <Debug of the
/// action>` followed by its statements; sections are separated by a blank line.
fn collect_all_sql(result: &[PlanQueries], backend: DatabaseBackend) -> String {
    let mut sections: Vec<String> = Vec::with_capacity(result.len());
    for (index, plan_queries) in result.iter().enumerate() {
        let backend_queries = match backend {
            DatabaseBackend::Postgres => &plan_queries.postgres,
            DatabaseBackend::MySql => &plan_queries.mysql,
            DatabaseBackend::Sqlite => &plan_queries.sqlite,
        };
        let sql = build_sql_snapshot(backend_queries, backend);
        sections.push(format!(
            "-- Action {}: {:?}\n{}",
            index, plan_queries.action, sql
        ));
    }
    sections.join("\n\n")
}
/// Panics if any single action emits the same SQLite index-creation statement
/// more than once.
///
/// Only the SQLite queries are inspected; a statement counts as an index
/// creation when it contains `CREATE INDEX` or `CREATE UNIQUE INDEX`.
fn assert_no_duplicate_indexes_per_action(result: &[PlanQueries]) {
    use std::collections::HashSet;

    for (action_index, plan_queries) in result.iter().enumerate() {
        let index_stmts: Vec<String> = plan_queries
            .sqlite
            .iter()
            .map(|query| query.build(DatabaseBackend::Sqlite))
            .filter(|sql| sql.contains("CREATE INDEX") || sql.contains("CREATE UNIQUE INDEX"))
            .collect();

        let mut already_seen: HashSet<&str> = HashSet::new();
        for stmt in &index_stmts {
            let first_occurrence = already_seen.insert(stmt.as_str());
            assert!(
                first_occurrence,
                "Duplicate index within action {} ({:?}):\n {}\nAll index statements in this action:\n{}",
                action_index,
                plan_queries.action,
                stmt,
                index_stmts
                    .iter()
                    .map(|s| format!(" {s}"))
                    .collect::<Vec<_>>()
                    .join("\n")
            );
        }
    }
}
/// Panics if, across the whole plan, a SQLite index-creation statement is
/// emitted while an identical one is still considered live.
///
/// Bookkeeping per action, in this order:
/// 1. if any statement starts with `DROP TABLE`, every live index is forgotten;
/// 2. each `CREATE INDEX` / `CREATE UNIQUE INDEX` statement must be new;
/// 3. each `DROP INDEX "<name>"` forgets live statements mentioning `"<name>"`.
///
/// NOTE(review): creations are processed before drops within one action,
/// regardless of the statements' real order, and a `DROP TABLE` clears the
/// indexes of every table. Confirm this coarse model is intended.
fn assert_no_orphan_duplicate_indexes(result: &[PlanQueries]) {
    use std::collections::HashSet;

    let mut live_indexes: HashSet<String> = HashSet::new();
    for plan_queries in result {
        let statements: Vec<String> = plan_queries
            .sqlite
            .iter()
            .map(|query| query.build(DatabaseBackend::Sqlite))
            .collect();

        let drops_a_table = statements.iter().any(|sql| sql.starts_with("DROP TABLE"));
        if drops_a_table {
            live_indexes.clear();
        }

        for stmt in &statements {
            let creates_index =
                stmt.contains("CREATE INDEX") || stmt.contains("CREATE UNIQUE INDEX");
            if !creates_index {
                continue;
            }
            assert!(
                live_indexes.insert(stmt.clone()),
                "Index would already exist when action {:?} tries to create it:\n {}\nCurrently live indexes:\n{}",
                plan_queries.action,
                stmt,
                live_indexes
                    .iter()
                    .map(|s| format!(" {s}"))
                    .collect::<Vec<_>>()
                    .join("\n")
            );
        }

        for stmt in &statements {
            if !stmt.starts_with("DROP INDEX") {
                continue;
            }
            // Only the exact shape `DROP INDEX "<name>"` is understood; any
            // other form leaves the live set untouched.
            let dropped_name = stmt
                .strip_prefix("DROP INDEX \"")
                .and_then(|rest| rest.strip_suffix('"'));
            if let Some(name) = dropped_name {
                let quoted_name = format!("\"{name}\"");
                live_indexes.retain(|live| !live.contains(quoted_name.as_str()));
            }
        }
    }
}
/// Adds `category_id` plus FK, unique and index constraints in every possible
/// order, checks the SQLite output for duplicate index creation, and
/// snapshots the SQL for all three backends.
#[rstest]
#[case::fk_unique_index("fk_uq_ix", &[fk_constraint(), unique_constraint(), index_constraint()])]
#[case::fk_index_unique("fk_ix_uq", &[fk_constraint(), index_constraint(), unique_constraint()])]
#[case::unique_fk_index("uq_fk_ix", &[unique_constraint(), fk_constraint(), index_constraint()])]
#[case::unique_index_fk("uq_ix_fk", &[unique_constraint(), index_constraint(), fk_constraint()])]
#[case::index_fk_unique("ix_fk_uq", &[index_constraint(), fk_constraint(), unique_constraint()])]
#[case::index_unique_fk("ix_uq_fk", &[index_constraint(), unique_constraint(), fk_constraint()])]
fn test_add_column_with_fk_unique_index_all_orderings(
    #[case] title: &str,
    #[case] order: &[TableConstraint],
) {
    let result = build_plan_queries(
        &plan_add_column_with_constraints(order),
        &base_schema_no_constraints(),
    )
    .unwrap();

    assert_no_duplicate_indexes_per_action(&result);
    assert_no_orphan_duplicate_indexes(&result);

    let backends = [
        (DatabaseBackend::Postgres, "postgres"),
        (DatabaseBackend::MySql, "mysql"),
        (DatabaseBackend::Sqlite, "sqlite"),
    ];
    for (backend, label) in backends {
        let suffix = format!("add_col_{}_{}", title, label);
        let sql = collect_all_sql(&result, backend);
        with_settings!({ snapshot_path => "../snapshots", snapshot_suffix => suffix }, {
            assert_snapshot!(sql);
        });
    }
}
/// Removes the FK, unique and index constraints in every possible order, then
/// drops `category_id`, and snapshots the SQL for all three backends.
#[rstest]
#[case::fk_unique_index("fk_uq_ix", &[fk_constraint(), unique_constraint(), index_constraint()])]
#[case::fk_index_unique("fk_ix_uq", &[fk_constraint(), index_constraint(), unique_constraint()])]
#[case::unique_fk_index("uq_fk_ix", &[unique_constraint(), fk_constraint(), index_constraint()])]
#[case::unique_index_fk("uq_ix_fk", &[unique_constraint(), index_constraint(), fk_constraint()])]
#[case::index_fk_unique("ix_fk_uq", &[index_constraint(), fk_constraint(), unique_constraint()])]
#[case::index_unique_fk("ix_uq_fk", &[index_constraint(), unique_constraint(), fk_constraint()])]
fn test_remove_fk_unique_index_then_drop_column_all_orderings(
    #[case] title: &str,
    #[case] order: &[TableConstraint],
) {
    let result = build_plan_queries(
        &plan_remove_constraints_then_drop(order),
        &base_schema_with_all_constraints(),
    )
    .unwrap();

    let backends = [
        (DatabaseBackend::Postgres, "postgres"),
        (DatabaseBackend::MySql, "mysql"),
        (DatabaseBackend::Sqlite, "sqlite"),
    ];
    for (backend, label) in backends {
        let suffix = format!("rm_col_{}_{}", title, label);
        let sql = collect_all_sql(&result, backend);
        with_settings!({ snapshot_path => "../snapshots", snapshot_suffix => suffix }, {
            assert_snapshot!(sql);
        });
    }
}
/// Adds `category_id` with an FK and an index in both orders, checks the
/// SQLite output for duplicate index creation, and snapshots all backends.
#[rstest]
#[case::fk_then_index("fk_ix", &[fk_constraint(), index_constraint()])]
#[case::index_then_fk("ix_fk", &[index_constraint(), fk_constraint()])]
fn test_add_column_with_fk_and_index_pair(
    #[case] title: &str,
    #[case] order: &[TableConstraint],
) {
    let result = build_plan_queries(
        &plan_add_column_with_constraints(order),
        &base_schema_no_constraints(),
    )
    .unwrap();

    assert_no_duplicate_indexes_per_action(&result);
    assert_no_orphan_duplicate_indexes(&result);

    let backends = [
        (DatabaseBackend::Postgres, "postgres"),
        (DatabaseBackend::MySql, "mysql"),
        (DatabaseBackend::Sqlite, "sqlite"),
    ];
    for (backend, label) in backends {
        let suffix = format!("add_col_pair_{}_{}", title, label);
        let sql = collect_all_sql(&result, backend);
        with_settings!({ snapshot_path => "../snapshots", snapshot_suffix => suffix }, {
            assert_snapshot!(sql);
        });
    }
}
/// Adds `category_id` with an FK and a unique constraint in both orders,
/// checks the SQLite output for duplicate index creation, and snapshots all
/// backends.
#[rstest]
#[case::fk_then_unique("fk_uq", &[fk_constraint(), unique_constraint()])]
#[case::unique_then_fk("uq_fk", &[unique_constraint(), fk_constraint()])]
fn test_add_column_with_fk_and_unique_pair(
    #[case] title: &str,
    #[case] order: &[TableConstraint],
) {
    let result = build_plan_queries(
        &plan_add_column_with_constraints(order),
        &base_schema_no_constraints(),
    )
    .unwrap();

    assert_no_duplicate_indexes_per_action(&result);
    assert_no_orphan_duplicate_indexes(&result);

    let backends = [
        (DatabaseBackend::Postgres, "postgres"),
        (DatabaseBackend::MySql, "mysql"),
        (DatabaseBackend::Sqlite, "sqlite"),
    ];
    for (backend, label) in backends {
        let suffix = format!("add_col_pair_{}_{}", title, label);
        let sql = collect_all_sql(&result, backend);
        with_settings!({ snapshot_path => "../snapshots", snapshot_suffix => suffix }, {
            assert_snapshot!(sql);
        });
    }
}
// Scenario: a NOT NULL `project_id` column carrying an inline foreign key is
// added to `companion`, followed by explicit AddConstraint actions for the
// same foreign key and for an index on that column. The duplicate-index
// helpers run on the SQLite output; everything else (including how the
// foreign key is rendered) is verified only through the per-backend snapshot.
#[rstest]
#[case::postgres("postgres", DatabaseBackend::Postgres)]
#[case::mysql("mysql", DatabaseBackend::MySql)]
#[case::sqlite("sqlite", DatabaseBackend::Sqlite)]
fn test_add_column_with_fk_no_duplicate_fk_in_temp_table(
#[case] label: &str,
#[case] backend: DatabaseBackend,
) {
// Pre-existing schema handed to the builder as `current_schema`.
let schema = vec![
TableDef {
name: "project".into(),
description: None,
columns: vec![col("id", ColumnType::Simple(SimpleColumnType::Integer))],
constraints: vec![],
},
TableDef {
name: "companion".into(),
description: None,
columns: vec![
col("id", ColumnType::Simple(SimpleColumnType::Integer)),
col("user_id", ColumnType::Simple(SimpleColumnType::BigInt)),
],
// NOTE(review): the foreign key references a `user` table that is not
// part of this schema, and the unique constraint names an `invite_code`
// column that `companion` does not list. Confirm this is intentional
// before changing it, since the stored snapshots depend on this data.
constraints: vec![
TableConstraint::ForeignKey {
name: None,
columns: vec!["user_id".into()],
ref_table: "user".into(),
ref_columns: vec!["id".into()],
on_delete: Some(ReferenceAction::Cascade),
on_update: None,
orphan_strategy: vespertide_core::ForeignKeyOrphanStrategy::default(),
},
TableConstraint::Unique {
name: Some("invite_code".into()),
columns: vec!["invite_code".into()],
strategy: vespertide_core::UniqueConstraintStrategy::DeleteDuplicates {
keep: vespertide_core::KeepPolicy::First,
},
},
TableConstraint::Index {
name: None,
columns: vec!["user_id".into()],
},
],
},
];
let plan = MigrationPlan {
id: String::new(),
comment: None,
created_at: None,
version: 1,
actions: vec![
// The new column declares its foreign key inline ...
MigrationAction::AddColumn {
table: "companion".into(),
column: Box::new(ColumnDef {
name: "project_id".into(),
r#type: ColumnType::Simple(SimpleColumnType::BigInt),
nullable: false,
default: None,
comment: None,
primary_key: None,
unique: None,
index: None,
foreign_key: Some(
vespertide_core::schema::foreign_key::ForeignKeySyntax::String(
"project.id".into(),
),
),
}),
fill_with: None,
},
// ... and the same foreign key is then added again as a table constraint.
MigrationAction::AddConstraint {
table: "companion".into(),
constraint: TableConstraint::ForeignKey {
name: None,
columns: vec!["project_id".into()],
ref_table: "project".into(),
ref_columns: vec!["id".into()],
on_delete: Some(ReferenceAction::Cascade),
on_update: None,
orphan_strategy: vespertide_core::ForeignKeyOrphanStrategy::default(),
},
},
MigrationAction::AddConstraint {
table: "companion".into(),
constraint: TableConstraint::Index {
name: None,
columns: vec!["project_id".into()],
},
},
],
};
let result = build_plan_queries(&plan, &schema).unwrap();
assert_no_duplicate_indexes_per_action(&result);
assert_no_orphan_duplicate_indexes(&result);
// The snapshot text includes the Debug rendering of every action.
let sql = collect_all_sql(&result, backend);
with_settings!({ snapshot_path => "../snapshots", snapshot_suffix => format!("dup_fk_{}", label) }, {
assert_snapshot!(sql);
});
}
// Scenario: two NOT NULL boolean columns with a `false` default and an inline
// `index` flag are added to `article`, followed by explicit index constraints
// on the same two columns (listed in the opposite order). The duplicate-index
// helpers run on the SQLite output and the full SQL is snapshotted per backend.
#[rstest]
#[case::postgres("postgres", DatabaseBackend::Postgres)]
#[case::mysql("mysql", DatabaseBackend::MySql)]
#[case::sqlite("sqlite", DatabaseBackend::Sqlite)]
fn test_two_not_null_add_columns_with_inline_index_no_duplicate(
#[case] label: &str,
#[case] backend: DatabaseBackend,
) {
use vespertide_core::DefaultValue;
use vespertide_core::schema::str_or_bool::StrOrBoolOrArray;
// Pre-existing schema: `article` with `id` and `title`, no constraints.
let schema = vec![TableDef {
name: "article".into(),
description: None,
columns: vec![
col("id", ColumnType::Simple(SimpleColumnType::Integer)),
col("title", ColumnType::Simple(SimpleColumnType::Text)),
],
constraints: vec![],
}];
let plan = MigrationPlan {
id: String::new(),
comment: None,
created_at: None,
version: 1,
actions: vec![
MigrationAction::AddColumn {
table: "article".into(),
column: Box::new(ColumnDef {
name: "category_pinned".into(),
r#type: ColumnType::Simple(SimpleColumnType::Boolean),
nullable: false,
default: Some(DefaultValue::Bool(false)),
comment: None,
primary_key: None,
unique: None,
index: Some(StrOrBoolOrArray::Bool(true)),
foreign_key: None,
}),
fill_with: None,
},
MigrationAction::AddColumn {
table: "article".into(),
column: Box::new(ColumnDef {
name: "main_pinned".into(),
r#type: ColumnType::Simple(SimpleColumnType::Boolean),
nullable: false,
default: Some(DefaultValue::Bool(false)),
comment: None,
primary_key: None,
unique: None,
index: Some(StrOrBoolOrArray::Bool(true)),
foreign_key: None,
}),
fill_with: None,
},
// Explicit index constraints for the two columns added above; note that
// `main_pinned` comes first here although it was added second.
MigrationAction::AddConstraint {
table: "article".into(),
constraint: TableConstraint::Index {
name: None,
columns: vec!["main_pinned".into()],
},
},
MigrationAction::AddConstraint {
table: "article".into(),
constraint: TableConstraint::Index {
name: None,
columns: vec!["category_pinned".into()],
},
},
],
};
let result = build_plan_queries(&plan, &schema).unwrap();
assert_no_duplicate_indexes_per_action(&result);
assert_no_orphan_duplicate_indexes(&result);
// The snapshot text includes the Debug rendering of every action.
let sql = collect_all_sql(&result, backend);
with_settings!({ snapshot_path => "../snapshots", snapshot_suffix => format!("two_not_null_inline_index_{}", label) }, {
assert_snapshot!(sql);
});
}
}