pub mod aggregates;
pub mod cascade;
pub mod casts;
pub mod columns;
pub mod comments;
pub mod constraints;
pub mod custom_types;
pub mod domains;
pub mod extensions;
pub mod functions;
pub mod grants;
pub mod indexes;
pub mod namespace;
pub mod operations;
pub mod operators;
pub mod policies;
pub mod schemas;
pub mod sequences;
pub mod tables;
pub mod triggers;
pub mod views;
use crate::catalog::id::{DbObjectId, DependsOn};
use crate::catalog::utils::is_system_schema;
use crate::catalog::{
Catalog, aggregate::Aggregate, cast::Cast, constraint::Constraint, custom_type::CustomType,
domain::Domain, extension::Extension, function::Function, index::Index, operator::Operator,
sequence::Sequence, table::Table, view::View,
};
use crate::diff::operations::{MigrationStep, OperationKind};
use petgraph::algo::toposort;
use petgraph::graph::DiGraph;
use std::collections::{BTreeMap, BTreeSet};
use tracing::{info, warn};
pub fn plan(old: &Catalog, new: &Catalog) -> anyhow::Result<Vec<MigrationStep>> {
let steps = diff_all(old, new);
let expanded = cascade::expand(steps, old, new);
diff_order(expanded, old, new)
}
pub fn diff_all(old: &Catalog, new: &Catalog) -> Vec<MigrationStep> {
info!("Diffing catalogs...");
let mut out = Vec::new();
out.extend(diff_list(
&old.schemas,
&new.schemas,
|s| DbObjectId::Schema {
name: s.name.clone(),
},
schemas::diff,
));
out.extend(diff_list(
&old.extensions,
&new.extensions,
Extension::id,
extensions::diff,
));
out.extend(diff_list(
&old.types,
&new.types,
CustomType::id,
custom_types::diff,
));
out.extend(diff_list(
&old.domains,
&new.domains,
Domain::id,
domains::diff,
));
out.extend(diff_list(
&old.sequences,
&new.sequences,
Sequence::id,
sequences::diff,
));
out.extend(diff_list(&old.tables, &new.tables, Table::id, tables::diff));
out.extend(diff_list(
&old.indexes,
&new.indexes,
Index::id,
indexes::diff,
));
out.extend(diff_list(
&old.constraints,
&new.constraints,
Constraint::id,
constraints::diff,
));
out.extend(diff_list(
&old.triggers,
&new.triggers,
|t| t.id(),
triggers::diff,
));
out.extend(diff_list(
&old.policies,
&new.policies,
|p| DbObjectId::Policy {
schema: p.schema.clone(),
table: p.table_name.clone(),
name: p.name.clone(),
},
policies::diff,
));
out.extend(diff_list(&old.views, &new.views, View::id, views::diff));
out.extend(diff_list(
&old.functions,
&new.functions,
Function::id,
functions::diff,
));
out.extend(diff_list(
&old.aggregates,
&new.aggregates,
Aggregate::id,
aggregates::diff,
));
out.extend(diff_list(
&old.operators,
&new.operators,
Operator::id,
operators::diff,
));
out.extend(diff_list(&old.casts, &new.casts, Cast::id, casts::diff));
out.extend(grants::diff_grants(&old.grants, &new.grants));
out.extend(comments::diff_comments(old, new));
info!("Diff complete");
out
}
pub fn diff_list<T, I: Eq + Ord + Clone, R>(
old: &[T],
new: &[T],
id_of: impl Fn(&T) -> I,
diff_fn: impl Fn(Option<&T>, Option<&T>) -> Vec<R>,
) -> Vec<R> {
let mut old_map = BTreeMap::new();
for o in old {
old_map.insert(id_of(o), o);
}
let mut new_map = BTreeMap::new();
for n in new {
new_map.insert(id_of(n), n);
}
let mut results = Vec::new();
for o in old {
let id = id_of(o);
if !new_map.contains_key(&id) {
results.extend(diff_fn(Some(o), None));
}
}
for n in new {
let id = id_of(n);
results.extend(diff_fn(old_map.get(&id).cloned(), Some(n)));
}
results
}
pub fn diff_order(
steps: Vec<MigrationStep>,
old_catalog: &Catalog,
new_catalog: &Catalog,
) -> anyhow::Result<Vec<MigrationStep>> {
info!("Ordering migration steps...");
let steps = grants::coalesce_column_grants(steps);
let mut primary_steps = Vec::new();
let mut relationship_steps = Vec::new();
let relationship_ids: BTreeSet<DbObjectId> = steps
.iter()
.filter(|step| step.is_relationship())
.map(|step| step.id())
.collect();
for step in steps {
if step.is_relationship() {
relationship_steps.push(step);
} else {
let id = step.id();
if let DbObjectId::Comment { object_id } = &id {
if relationship_ids.contains(object_id.as_ref()) {
relationship_steps.push(step);
} else {
primary_steps.push(step);
}
} else {
primary_steps.push(step);
}
}
}
let mut ordered_steps = order_steps_by_dependencies(primary_steps, old_catalog, new_catalog)?;
let ordered_relationships =
order_steps_by_dependencies(relationship_steps, old_catalog, new_catalog)?;
ordered_steps.extend(ordered_relationships);
Ok(ordered_steps)
}
fn resolve_for_ordering(dep: &DbObjectId) -> DbObjectId {
match dep {
DbObjectId::Column { schema, table, .. } => DbObjectId::Table {
schema: schema.clone(),
name: table.clone(),
},
other => other.clone(),
}
}
fn order_steps_by_dependencies(
steps: Vec<MigrationStep>,
old_catalog: &Catalog,
new_catalog: &Catalog,
) -> anyhow::Result<Vec<MigrationStep>> {
let mut graph: DiGraph<usize, ()> = DiGraph::new();
let mut id_to_indices: BTreeMap<DbObjectId, Vec<usize>> = BTreeMap::new();
let mut node_indices = Vec::new();
for (i, step) in steps.iter().enumerate() {
let idx = graph.add_node(i);
node_indices.push(idx);
id_to_indices.entry(step.id()).or_default().push(i);
}
let mut missing_deps: Vec<(DbObjectId, DbObjectId)> = Vec::new();
for (i, step) in steps.iter().enumerate() {
let is_drop = step.operation_kind() == OperationKind::Drop;
if let DbObjectId::Comment { object_id } = &step.id() {
if let Some(indices) = id_to_indices.get(object_id.as_ref()) {
for &dep_i in indices {
let from = node_indices[dep_i];
let to = node_indices[i];
graph.add_edge(from, to, ());
}
}
if let DbObjectId::Constraint { schema, table, .. } = object_id.as_ref() {
let table_id = DbObjectId::Table {
schema: schema.clone(),
name: table.clone(),
};
if let Some(indices) = id_to_indices.get(&table_id) {
for &dep_i in indices {
let from = node_indices[dep_i];
let to = node_indices[i];
graph.add_edge(from, to, ());
}
}
}
continue;
}
let catalog_deps = if is_drop {
old_catalog.forward_deps.get(&step.id())
} else {
new_catalog.forward_deps.get(&step.id())
};
if let Some(deps) = catalog_deps {
for dep in deps {
let resolved_dep = resolve_for_ordering(dep);
if let Some(indices) = id_to_indices.get(&resolved_dep) {
for &dep_i in indices {
let from = node_indices[if is_drop { i } else { dep_i }];
let to = node_indices[if is_drop { dep_i } else { i }];
graph.add_edge(from, to, ());
}
} else {
let catalog = if is_drop { old_catalog } else { new_catalog };
if !catalog.contains_id(&resolved_dep) {
missing_deps.push((step.id(), dep.clone()));
}
}
}
} else {
let step_deps = step.dependencies();
for dep in &step_deps {
let resolved_dep = resolve_for_ordering(dep);
if let Some(indices) = id_to_indices.get(&resolved_dep) {
for &dep_i in indices {
let from = node_indices[dep_i];
let to = node_indices[i];
graph.add_edge(from, to, ());
}
} else {
if !new_catalog.contains_id(&resolved_dep) {
missing_deps.push((step.id(), dep.clone()));
}
}
}
}
}
for (object_id, missing_dep) in &missing_deps {
if let Some(schema) = missing_dep.schema()
&& is_system_schema(schema)
{
continue;
}
warn!(
"{:?} depends on {:?} which is not in the catalog (may be filtered by config)",
object_id, missing_dep
);
}
let mut drop_indices = BTreeMap::new();
let mut create_indices = BTreeMap::new();
let mut other_indices = BTreeMap::new();
for (i, step) in steps.iter().enumerate() {
let id = step.id();
match step.operation_kind() {
OperationKind::Drop => {
drop_indices.entry(id).or_insert_with(Vec::new).push(i);
}
OperationKind::Create => {
create_indices.entry(id).or_insert_with(Vec::new).push(i);
}
OperationKind::Alter => {
other_indices.entry(id).or_insert_with(Vec::new).push(i);
}
}
}
for (id, drops) in drop_indices {
if let Some(creates) = create_indices.get(&id) {
for &drop_i in &drops {
for &create_i in creates {
let from = node_indices[drop_i];
let to = node_indices[create_i];
graph.add_edge(from, to, ());
}
}
}
}
{
let mut slot_drops: BTreeMap<namespace::NamespaceSlot, Vec<usize>> = BTreeMap::new();
let mut slot_creates: BTreeMap<namespace::NamespaceSlot, Vec<usize>> = BTreeMap::new();
for (i, step) in steps.iter().enumerate() {
let slots = namespace::namespace_slots(&step.id());
match step.operation_kind() {
OperationKind::Drop => {
for slot in slots {
slot_drops.entry(slot).or_default().push(i);
}
}
OperationKind::Create => {
for slot in slots {
slot_creates.entry(slot).or_default().push(i);
}
}
OperationKind::Alter => {}
}
}
for (slot, drops) in &slot_drops {
if let Some(creates) = slot_creates.get(slot) {
for &drop_i in drops {
for &create_i in creates {
if drop_i != create_i {
graph.add_edge(node_indices[drop_i], node_indices[create_i], ());
}
}
}
}
}
}
for (id, creates) in create_indices {
if let Some(others) = other_indices.get(&id) {
for &create_i in &creates {
for &other_i in others {
let from = node_indices[create_i];
let to = node_indices[other_i];
graph.add_edge(from, to, ());
}
}
}
}
{
let mut func_drops: BTreeMap<(String, String), Vec<usize>> = BTreeMap::new();
let mut func_creates: BTreeMap<(String, String), Vec<usize>> = BTreeMap::new();
for (i, step) in steps.iter().enumerate() {
let routine_name = match &step.id() {
DbObjectId::Function { schema, name, .. }
| DbObjectId::Procedure { schema, name, .. } => {
Some((schema.clone(), name.clone()))
}
_ => None,
};
if let Some(key) = routine_name {
match step.operation_kind() {
OperationKind::Drop => func_drops.entry(key).or_default().push(i),
OperationKind::Create => func_creates.entry(key).or_default().push(i),
_ => {}
}
}
}
for (key, drops) in &func_drops {
if let Some(creates) = func_creates.get(key) {
for &drop_i in drops {
for &create_i in creates {
let from = node_indices[drop_i];
let to = node_indices[create_i];
graph.add_edge(from, to, ());
}
}
}
}
}
let extension_create_indices: Vec<usize> = steps
.iter()
.enumerate()
.filter_map(|(i, step)| {
if matches!(step, MigrationStep::Extension(_))
&& step.operation_kind() == OperationKind::Create
{
Some(i)
} else {
None
}
})
.collect();
let non_extension_create_indices: Vec<usize> = steps
.iter()
.enumerate()
.filter_map(|(i, step)| {
if !matches!(step, MigrationStep::Extension(_) | MigrationStep::Schema(_))
&& step.operation_kind() == OperationKind::Create
{
Some(i)
} else {
None
}
})
.collect();
for &ext_i in &extension_create_indices {
for &obj_i in &non_extension_create_indices {
let from = node_indices[ext_i];
let to = node_indices[obj_i];
graph.add_edge(from, to, ());
}
}
let index_to_step_idx: BTreeMap<_, _> = node_indices
.iter()
.enumerate()
.map(|(i, &node)| (node, i))
.collect();
let sorted = toposort(&graph, None)
.map_err(|cycle| {
let node = cycle.node_id();
if let Some(&step_idx) = index_to_step_idx.get(&node) {
let step = &steps[step_idx];
let step_type = match step {
MigrationStep::Schema(_) => "Schema",
MigrationStep::Table(_) => "Table",
MigrationStep::View(_) => "View",
MigrationStep::Type(_) => "Type",
MigrationStep::Domain(_) => "Domain",
MigrationStep::Sequence(_) => "Sequence",
MigrationStep::Function(_) => "Function",
MigrationStep::Aggregate(_) => "Aggregate",
MigrationStep::Operator(_) => "Operator",
MigrationStep::Cast(_) => "Cast",
MigrationStep::Index(_) => "Index",
MigrationStep::Constraint(_) => "Constraint",
MigrationStep::Trigger(_) => "Trigger",
MigrationStep::Policy(_) => "Policy",
MigrationStep::Extension(_) => "Extension",
MigrationStep::Grant(_) => "Grant",
MigrationStep::Comment(_) => "Comment",
};
anyhow::anyhow!(
"Dependency cycle detected involving {} operation on {:?}. This usually indicates circular dependencies between database objects. Check for circular references in your schema.",
step_type,
step.id()
)
} else {
anyhow::anyhow!("Dependency cycle detected in migration ordering. This usually indicates circular dependencies between database objects.")
}
})?;
let ordered = sorted
.into_iter()
.filter_map(|node| index_to_step_idx.get(&node).map(|&i| steps[i].clone()))
.collect();
Ok(ordered)
}