use rayon::prelude::*;
use vespertide_core::{MigrationAction, MigrationPlan, TableConstraint, TableDef};
use vespertide_planner::apply_action;
use super::{PlanQueries, action_target_table};
use crate::DatabaseBackend;
use crate::error::QueryError;
use crate::parallel_config::PLAN_QUERY_PAR_ACTION_MIN_LEN;
use crate::sql::build_action_queries_with_pending;
/// Everything needed to build the queries for one migration action without
/// looking at any other action, so prepared actions can be processed
/// independently of one another.
struct PreparedAction {
/// Owned copy of the plan action the queries are generated for.
action: MigrationAction,
/// Schema snapshot taken before this action was applied, i.e. the state
/// left behind by all earlier actions of the plan.
schema: Vec<TableDef>,
/// Index/unique constraints that later `AddConstraint` actions of the plan
/// add to the table this action targets.
pending_constraints: Vec<TableConstraint>,
}
/// Builds the per-backend queries for every action of `plan` using rayon.
///
/// The work happens in two phases:
///
/// 1. `prepare_actions` walks the plan sequentially and captures, for each
///    action, the schema snapshot and pending constraints it depends on.
/// 2. The prepared actions are then turned into queries in parallel, since
///    each one is self-contained at that point.
///
/// `current_schema` is the schema before the first action of the plan runs.
///
/// # Errors
///
/// Returns a [`QueryError`] if query generation fails for any action.
pub(super) fn build_plan_queries_in_parallel(
    plan: &MigrationPlan,
    current_schema: &[TableDef],
) -> Result<Vec<PlanQueries>, QueryError> {
    // Phase 1: sequential, because each snapshot depends on earlier actions.
    let snapshots = prepare_actions(plan, current_schema);

    // Phase 2: parallel; `with_min_len` bounds how finely rayon may split
    // the work across threads.
    snapshots
        .par_iter()
        .with_min_len(PLAN_QUERY_PAR_ACTION_MIN_LEN)
        .map(build_prepared_action_queries)
        .collect::<Result<Vec<_>, _>>()
}
/// Walks `plan` in order and records, for each action, the inputs its query
/// generation needs: a clone of the action, a snapshot of the schema as it
/// stands *before* the action, and the pending constraints computed by
/// `pending_constraints_for_action`.
///
/// The returned vector has one entry per plan action, in plan order.
fn prepare_actions(plan: &MigrationPlan, current_schema: &[TableDef]) -> Vec<PreparedAction> {
    let mut schema_so_far = current_schema.to_vec();

    plan.actions
        .iter()
        .enumerate()
        .map(|(position, action)| {
            // Capture the snapshot first: the action must see the schema it
            // will actually run against, not the one it produces.
            let snapshot = PreparedAction {
                action: action.clone(),
                schema: schema_so_far.clone(),
                pending_constraints: pending_constraints_for_action(plan, position, action),
            };

            // Advance the running schema for the next action. The result is
            // intentionally discarded, so a failed application does not stop
            // preparation of the remaining actions.
            let _ = apply_action(&mut schema_so_far, action);

            snapshot
        })
        .collect()
}
/// Collects the index/unique constraints that actions *after* `action_index`
/// add to the table targeted by `action`.
///
/// * `plan` - the full migration plan being processed.
/// * `action_index` - position of `action` within `plan.actions`; the action
///   at this position is never part of its own pending set.
/// * `action` - the action whose target table is looked up via
///   `action_target_table`.
///
/// Returns an empty vector when the action has no target table, when no later
/// `AddConstraint` matches, or when `action_index` lies at or beyond the end
/// of the plan (there is nothing after it, so nothing can be pending).
fn pending_constraints_for_action(
    plan: &MigrationPlan,
    action_index: usize,
    action: &MigrationAction,
) -> Vec<TableConstraint> {
    let Some(table) = action_target_table(action) else {
        return Vec::new();
    };

    plan.actions
        .iter()
        // `skip` instead of slicing: an out-of-range index yields an empty
        // iterator rather than a panic, and `saturating_add` rules out
        // overflow on the `+ 1`.
        .skip(action_index.saturating_add(1))
        .filter_map(|later| match later {
            // Only constraints on the same table count, and only the
            // index-like kinds; foreign keys etc. are not collected.
            MigrationAction::AddConstraint {
                table: later_table,
                constraint,
            } if later_table == table
                && matches!(
                    constraint,
                    TableConstraint::Index { .. } | TableConstraint::Unique { .. }
                ) =>
            {
                Some(constraint.clone())
            }
            _ => None,
        })
        .collect()
}
fn build_prepared_action_queries(prepared: &PreparedAction) -> Result<PlanQueries, QueryError> {
let postgres_queries = build_action_queries_with_pending(
DatabaseBackend::Postgres,
&prepared.action,
&prepared.schema,
&prepared.pending_constraints,
)?;
let mysql_queries = build_action_queries_with_pending(
DatabaseBackend::MySql,
&prepared.action,
&prepared.schema,
&prepared.pending_constraints,
)?;
let sqlite_queries = build_action_queries_with_pending(
DatabaseBackend::Sqlite,
&prepared.action,
&prepared.schema,
&prepared.pending_constraints,
)?;
Ok(PlanQueries {
action: prepared.action.clone(),
postgres: postgres_queries,
mysql: mysql_queries,
sqlite: sqlite_queries,
})
}
#[cfg(test)]
#[expect(
    unsafe_code,
    reason = "tests that drive the parallel builder path must set/unset VESPERTIDE_PLAN_QUERY_PAR_THRESHOLD via std::env::{set_var, remove_var}; serialized via #[serial] to avoid cross-test races"
)]
mod tests {
    use super::*;
    use crate::builder::build_plan_queries;
    use crate::sql::BuiltQuery;
    use crate::sql::types::DatabaseBackend as _Backend;
    use serial_test::serial;
    use vespertide_core::{
        ColumnDef, ColumnType, ForeignKeyOrphanStrategy, ReferenceAction, SimpleColumnType,
        TableDef,
    };

    /// Environment variable the tests set so that `build_plan_queries` is
    /// driven down the parallel builder path.
    const PAR_THRESHOLD_ENV: &str = "VESPERTIDE_PLAN_QUERY_PAR_THRESHOLD";

    /// RAII guard for the threshold override.
    ///
    /// The variable is removed when the guard is dropped, which also happens
    /// while unwinding from a failed assertion, so a failing test cannot leak
    /// the override into tests that run afterwards.
    struct ParThresholdOverride;

    impl ParThresholdOverride {
        /// Sets the override to `value` and returns the guard that undoes it.
        fn set(value: &str) -> Self {
            // SAFETY: only `#[serial]` tests create this guard, so no other
            // serialized test mutates the process environment concurrently.
            unsafe {
                std::env::set_var(PAR_THRESHOLD_ENV, value);
            }
            Self
        }
    }

    impl Drop for ParThresholdOverride {
        fn drop(&mut self) {
            // SAFETY: same serialization argument as in `set`; the guard is
            // dropped before the owning `#[serial]` test finishes.
            unsafe {
                std::env::remove_var(PAR_THRESHOLD_ENV);
            }
        }
    }

    /// Non-nullable column of the given simple type.
    fn nn_col(name: &str, ty: SimpleColumnType) -> ColumnDef {
        ColumnDef::new(name, ColumnType::Simple(ty), false)
    }

    /// Single-column index constraint, optionally named.
    fn index(name: Option<&str>, column: &str) -> TableConstraint {
        TableConstraint::Index {
            name: name.map(Into::into),
            columns: vec![column.into()],
        }
    }

    /// Foreign key from `pk` to `other.id`, used as a non-index constraint.
    fn foreign_key() -> TableConstraint {
        TableConstraint::ForeignKey {
            name: Some("fk_u__pk".into()),
            columns: vec!["pk".into()],
            ref_table: "other".into(),
            ref_columns: vec!["id".into()],
            on_delete: Some(ReferenceAction::Cascade),
            on_update: None,
            orphan_strategy: ForeignKeyOrphanStrategy::default(),
        }
    }

    /// Table with a single integer `pk` column and the given constraints.
    fn table(name: &str, constraints: Vec<TableConstraint>) -> TableDef {
        TableDef {
            name: name.into(),
            description: None,
            columns: vec![nn_col("pk", SimpleColumnType::Integer)],
            constraints,
        }
    }

    /// Schema containing only table `u`.
    fn schema_u_with_constraints(constraints: Vec<TableConstraint>) -> Vec<TableDef> {
        vec![table("u", constraints)]
    }

    /// Schema containing `u` (with the given constraints) and an
    /// unconstrained `v`.
    fn schema_u_and_v_with_u_constraints(constraints: Vec<TableConstraint>) -> Vec<TableDef> {
        vec![table("u", constraints), table("v", vec![])]
    }

    /// `AddColumn` for a non-nullable integer column without a fill value.
    fn add_required_column(table: &str, column: &str) -> MigrationAction {
        MigrationAction::AddColumn {
            table: table.into(),
            column: Box::new(nn_col(column, SimpleColumnType::Integer)),
            fill_with: None,
        }
    }

    /// Minimal plan wrapping `actions`; all other fields are fixed to the
    /// values shared by every test in this module.
    fn plan_of(actions: Vec<MigrationAction>) -> MigrationPlan {
        MigrationPlan {
            id: String::new(),
            comment: None,
            created_at: None,
            version: 1,
            actions,
        }
    }

    /// Renders the queries for SQLite, one statement per line.
    fn sqlite_sql(queries: &[BuiltQuery]) -> String {
        queries
            .iter()
            .map(|q| q.build(_Backend::Sqlite))
            .collect::<Vec<_>>()
            .join("\n")
    }

    #[test]
    #[serial]
    fn build_plan_queries_uses_parallel_path_under_threshold_override() {
        let _threshold = ParThresholdOverride::set("1");
        let plan = plan_of(vec![
            MigrationAction::CreateTable {
                table: "a".into(),
                columns: vec![nn_col("id", SimpleColumnType::Integer)],
                constraints: vec![],
            },
            MigrationAction::CreateTable {
                table: "b".into(),
                columns: vec![nn_col("id", SimpleColumnType::Integer)],
                constraints: vec![],
            },
            MigrationAction::AddConstraint {
                table: "a".into(),
                constraint: index(None, "id"),
            },
        ]);

        let result = build_plan_queries(&plan, &[]).unwrap();

        assert_eq!(result.len(), 3);
    }

    #[test]
    fn pending_constraints_start_after_current_action_parallel() {
        let current_index = index(Some("ix_u__pk"), "pk");
        let plan = plan_of(vec![MigrationAction::AddConstraint {
            table: "u".into(),
            constraint: current_index,
        }]);

        let pending = pending_constraints_for_action(&plan, 0, &plan.actions[0]);

        assert!(
            pending.is_empty(),
            "current action must not be included in its own pending set (parallel): {pending:?}"
        );
    }

    #[test]
    fn sqlite_rebuild_skips_later_same_table_pending_index_parallel() {
        let pending_index = index(None, "pk");
        let plan = plan_of(vec![
            add_required_column("u", "extra"),
            MigrationAction::AddConstraint {
                table: "u".into(),
                constraint: pending_index.clone(),
            },
        ]);

        let result =
            build_plan_queries_in_parallel(&plan, &schema_u_with_constraints(vec![pending_index]))
                .unwrap();

        assert_eq!(result.len(), 2);
        let sqlite_sql_0 = sqlite_sql(&result[0].sqlite);
        assert!(
            !sqlite_sql_0.contains("CREATE INDEX \"ix_u__pk\""),
            "same-table pending index must be deferred to action 1 (parallel), got:\n{sqlite_sql_0}"
        );
        let sqlite_sql_1 = sqlite_sql(&result[1].sqlite);
        assert!(
            sqlite_sql_1.contains("CREATE INDEX \"ix_u__pk\""),
            "later AddConstraint must create the deferred index (parallel), got:\n{sqlite_sql_1}"
        );
    }

    #[test]
    fn sqlite_rebuild_recreates_existing_index_when_matching_later_index_is_other_table_parallel() {
        let existing_u_index = index(None, "pk");
        let plan = plan_of(vec![
            add_required_column("u", "extra"),
            MigrationAction::AddConstraint {
                table: "v".into(),
                constraint: existing_u_index.clone(),
            },
        ]);

        let result = build_plan_queries_in_parallel(
            &plan,
            &schema_u_and_v_with_u_constraints(vec![existing_u_index]),
        )
        .unwrap();

        assert_eq!(result.len(), 2);
        let sqlite_sql_0 = sqlite_sql(&result[0].sqlite);
        assert!(
            sqlite_sql_0.contains("CREATE INDEX \"ix_u__pk\" ON \"u\" (\"pk\")"),
            "wrong-table pending index must not suppress recreating u.pk (parallel), got:\n{sqlite_sql_0}"
        );
    }

    #[test]
    fn pending_constraints_exclude_same_table_non_index_constraint_parallel() {
        let plan = plan_of(vec![
            add_required_column("u", "extra"),
            MigrationAction::AddConstraint {
                table: "u".into(),
                constraint: foreign_key(),
            },
        ]);

        let pending = pending_constraints_for_action(&plan, 0, &plan.actions[0]);

        assert!(
            pending.is_empty(),
            "foreign keys must not be collected as pending indexes (parallel): {pending:?}"
        );
    }

    #[test]
    #[serial]
    fn build_plan_queries_parallel_path_skips_no_target_actions() {
        let _threshold = ParThresholdOverride::set("1");
        let plan = plan_of(vec![
            MigrationAction::RawSql {
                sql: "SELECT 1;".into(),
            },
            MigrationAction::RenameTable {
                from: "a".into(),
                to: "b".into(),
            },
        ]);

        let result = build_plan_queries(&plan, &[]).unwrap();

        assert_eq!(result.len(), 2);
    }
}