use crate::catalog::attached::Attached;
use crate::catalog::constraint::ConstraintType;
use crate::catalog::target::AttrTarget;
use crate::catalog::{Catalog, id::DbObjectId};
use crate::diff::comments::desired_comment_steps;
use crate::diff::grants::{desired_acl_steps, grant_target_object};
use crate::diff::operations::{
ColumnAction, MigrationStep, OperationKind, PolicyOperation, SequenceOperation, TableOperation,
};
use std::collections::{BTreeSet, HashMap, HashSet};
pub fn expand(
steps: Vec<MigrationStep>,
old_catalog: &Catalog,
new_catalog: &Catalog,
) -> Vec<MigrationStep> {
let mut extra_steps: Vec<MigrationStep> = Vec::new();
let mut cascaded_ids: HashSet<DbObjectId> = HashSet::new();
let mut drop_counts: HashMap<DbObjectId, usize> = HashMap::new();
for step in &steps {
let id = step.id();
if step.operation_kind() == OperationKind::Drop {
*drop_counts.entry(id).or_insert(0) += 1;
}
}
let mut visited: HashSet<DbObjectId> = HashSet::new();
for id in drop_counts.keys() {
collect_dependents(id, old_catalog, &mut visited);
}
for id in visited {
if drop_counts.get(&id).copied().unwrap_or(0) > 0 {
continue;
}
if let Some(steps) = old_catalog.synthesize_drop_create(&id, new_catalog) {
extra_steps.extend(steps);
cascaded_ids.insert(id);
}
}
let tables_with_type_changes = tables_with_column_type_changes(&steps);
for table_id in &tables_with_type_changes {
if let Some(deps) = old_catalog.reverse_deps.get(table_id) {
for dep in deps {
let should_cascade = match dep {
DbObjectId::Function { .. } | DbObjectId::Trigger { .. } => {
drop_counts.get(dep).copied().unwrap_or(0) == 0
}
_ => false,
};
if should_cascade
&& !cascaded_ids.contains(dep)
&& let Some(steps) = old_catalog.synthesize_drop_create(dep, new_catalog)
{
extra_steps.extend(steps);
cascaded_ids.insert(dep.clone());
}
}
}
}
let fk_constraints_to_cascade = fk_constraints_affected_by_type_changes(&steps, old_catalog);
for constraint_id in &fk_constraints_to_cascade {
if !cascaded_ids.contains(constraint_id)
&& let Some(steps) = old_catalog.synthesize_drop_create(constraint_id, new_catalog)
{
extra_steps.extend(steps);
cascaded_ids.insert(constraint_id.clone());
}
}
let dropped_columns = columns_being_dropped(&steps);
let type_changed_columns = columns_with_type_changes_ids(&steps);
let affected_columns: HashSet<_> = dropped_columns
.union(&type_changed_columns)
.cloned()
.collect();
for column_id in &affected_columns {
if let Some(deps) = old_catalog.reverse_deps.get(column_id) {
for dep in deps {
if !cascaded_ids.contains(dep)
&& drop_counts.get(dep).copied().unwrap_or(0) == 0
&& new_catalog.contains_id(dep)
&& let Some(steps) = old_catalog.synthesize_drop_create(dep, new_catalog)
{
extra_steps.extend(steps);
cascaded_ids.insert(dep.clone());
}
}
}
}
let mut all = steps;
all.extend(extra_steps);
let all = filter_cascaded_alters(all, &cascaded_ids);
let filtered = filter_owned_sequence_drops(all, old_catalog);
let filtered = filter_policy_drops(filtered, old_catalog);
reapply_attached_state_for_recreated_objects(filtered, new_catalog)
}
fn reapply_attached_state_for_recreated_objects(
steps: Vec<MigrationStep>,
new_catalog: &Catalog,
) -> Vec<MigrationStep> {
let recreated: BTreeSet<DbObjectId> = steps
.iter()
.filter(|step| {
!step.is_grant()
&& step.operation_kind() == OperationKind::Drop
&& new_catalog.contains_id(&step.id())
})
.map(|step| step.id())
.collect();
if recreated.is_empty() {
return steps;
}
let recreated_attached: Vec<&dyn Attached> = new_catalog
.attached_objects()
.into_iter()
.filter(|o| recreated.contains(&o.object_id()))
.collect();
let recreated_comment_targets: BTreeSet<AttrTarget> = recreated_attached
.iter()
.flat_map(|o| o.comment_targets().into_iter().map(|(t, _)| t))
.collect();
let mut result: Vec<MigrationStep> = steps
.into_iter()
.filter(|step| match step {
MigrationStep::Grant(op) => !recreated.contains(&grant_target_object(op)),
MigrationStep::Comment(op) => !recreated_comment_targets.contains(op.target()),
_ => true,
})
.collect();
for id in &recreated {
result.extend(desired_acl_steps(id, new_catalog));
}
for obj in &recreated_attached {
result.extend(desired_comment_steps(*obj));
}
result
}
fn collect_dependents(id: &DbObjectId, catalog: &Catalog, out: &mut HashSet<DbObjectId>) {
if out.insert(id.clone())
&& let Some(deps) = catalog.reverse_deps.get(id)
{
for dep in deps {
collect_dependents(dep, catalog, out);
}
}
}
fn filter_owned_sequence_drops(
steps: Vec<MigrationStep>,
old_catalog: &Catalog,
) -> Vec<MigrationStep> {
let tables_being_dropped: HashSet<(String, String)> = steps
.iter()
.filter_map(|step| {
if let MigrationStep::Table(TableOperation::Drop { schema, name }) = step {
Some((schema.clone(), name.clone()))
} else {
None
}
})
.collect();
if tables_being_dropped.is_empty() {
return steps;
}
let mut sequence_owners: HashMap<(String, String), (String, String)> = HashMap::new();
for seq in &old_catalog.sequences {
if let Some(owned_by) = &seq.owned_by {
let parts: Vec<&str> = owned_by.splitn(3, '.').collect();
if parts.len() >= 2 {
let owner_schema = parts[0].to_string();
let owner_table = parts[1].to_string();
sequence_owners.insert(
(seq.schema.clone(), seq.name.clone()),
(owner_schema, owner_table),
);
}
}
}
steps
.into_iter()
.filter(|step| {
if let MigrationStep::Sequence(SequenceOperation::Drop { schema, name }) = step
&& let Some((owner_schema, owner_table)) =
sequence_owners.get(&(schema.clone(), name.clone()))
&& tables_being_dropped.contains(&(owner_schema.clone(), owner_table.clone()))
{
return false;
}
true
})
.collect()
}
fn filter_policy_drops(steps: Vec<MigrationStep>, _old_catalog: &Catalog) -> Vec<MigrationStep> {
let tables_being_dropped: HashSet<(String, String)> = steps
.iter()
.filter_map(|step| {
if let MigrationStep::Table(TableOperation::Drop { schema, name }) = step {
Some((schema.clone(), name.clone()))
} else {
None
}
})
.collect();
if tables_being_dropped.is_empty() {
return steps;
}
steps
.into_iter()
.filter(|step| {
if let MigrationStep::Policy(PolicyOperation::Drop { identifier }) = step
&& tables_being_dropped
.contains(&(identifier.schema.clone(), identifier.table.clone()))
{
return false;
}
true
})
.collect()
}
fn filter_cascaded_alters(
steps: Vec<MigrationStep>,
cascaded_ids: &HashSet<DbObjectId>,
) -> Vec<MigrationStep> {
if cascaded_ids.is_empty() {
return steps;
}
steps
.into_iter()
.filter(|step| {
if step.operation_kind() == OperationKind::Alter {
let step_id = step.id();
if cascaded_ids.contains(&step_id) {
return false;
}
}
true
})
.collect()
}
fn tables_with_column_type_changes(steps: &[MigrationStep]) -> HashSet<DbObjectId> {
steps
.iter()
.filter_map(|step| {
if let MigrationStep::Table(TableOperation::Alter {
schema,
name,
actions,
}) = step
{
let has_type_change = actions
.iter()
.any(|a| matches!(a, ColumnAction::AlterType { .. }));
if has_type_change {
return Some(DbObjectId::Table {
schema: schema.clone(),
name: name.clone(),
});
}
}
None
})
.collect()
}
fn columns_with_type_changes(
steps: &[MigrationStep],
) -> HashMap<(String, String), HashSet<String>> {
let mut result: HashMap<(String, String), HashSet<String>> = HashMap::new();
for step in steps {
if let MigrationStep::Table(TableOperation::Alter {
schema,
name,
actions,
}) = step
{
for action in actions {
if let ColumnAction::AlterType { name: col_name, .. } = action {
result
.entry((schema.clone(), name.clone()))
.or_default()
.insert(col_name.clone());
}
}
}
}
result
}
fn fk_constraints_affected_by_type_changes(
steps: &[MigrationStep],
old_catalog: &Catalog,
) -> HashSet<DbObjectId> {
let columns_changing = columns_with_type_changes(steps);
if columns_changing.is_empty() {
return HashSet::new();
}
let mut affected = HashSet::new();
for constraint in &old_catalog.constraints {
if let ConstraintType::ForeignKey {
columns,
referenced_schema,
referenced_table,
referenced_columns,
..
} = &constraint.constraint_type
{
let table_key = (constraint.schema.clone(), constraint.table_name.clone());
if let Some(changing_cols) = columns_changing.get(&table_key)
&& columns.iter().any(|col| changing_cols.contains(col))
{
affected.insert(constraint.id());
continue;
}
let ref_table_key = (referenced_schema.clone(), referenced_table.clone());
if let Some(changing_cols) = columns_changing.get(&ref_table_key)
&& referenced_columns
.iter()
.any(|col| changing_cols.contains(col))
{
affected.insert(constraint.id());
}
}
}
affected
}
fn columns_being_dropped(steps: &[MigrationStep]) -> HashSet<DbObjectId> {
let mut result = HashSet::new();
for step in steps {
if let MigrationStep::Table(TableOperation::Alter {
schema,
name,
actions,
}) = step
{
for action in actions {
if let ColumnAction::Drop { name: col_name } = action {
result.insert(DbObjectId::Column {
schema: schema.clone(),
table: name.clone(),
column: col_name.clone(),
});
}
}
}
}
result
}
fn columns_with_type_changes_ids(steps: &[MigrationStep]) -> HashSet<DbObjectId> {
let mut result = HashSet::new();
for step in steps {
if let MigrationStep::Table(TableOperation::Alter {
schema,
name,
actions,
}) = step
{
for action in actions {
if let ColumnAction::AlterType { name: col_name, .. } = action {
result.insert(DbObjectId::Column {
schema: schema.clone(),
table: name.clone(),
column: col_name.clone(),
});
}
}
}
}
result
}