Skip to main content

architect_sdk/
migration.rs

//! Apply config to the database: DDL for schemas, enums, tables, indexes, and foreign keys.
//! Order follows PostgreSQL dependencies (see docs/postgres-config-schema.md § 3.5).
3
4use crate::config::types::*;
5use crate::config::{validate, FullConfig};
6use crate::db::parse_canonical;
7use crate::db::pool::Pool;
8use crate::db::Dialect;
9use crate::error::AppError;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12
/// Quote `s` as a SQL delimited identifier.
///
/// The name is wrapped in double quotes and every embedded double quote is doubled
/// (`"` becomes `""`), which is how standard SQL / PostgreSQL escape a quote inside a
/// delimited identifier. Backslashes are left untouched because they have no special
/// meaning inside a quoted identifier; escaping them would change the name.
fn quote(s: &str) -> String {
    format!("\"{}\"", s.replace('"', "\"\""))
}
16
/// Name of the column added to app tables when RLS is enabled. Used by migration and CRUD.
///
/// NOTE(review): `apply_migrations` takes the column name through its `rls_tenant_column`
/// argument rather than reading this constant itself; callers presumably pass this
/// value — confirm at the call sites.
pub const RLS_TENANT_COLUMN: &str = "tenant_id";
19
20/// Apply full config to the database: CREATE SCHEMA, CREATE TYPE, CREATE TABLE, CREATE INDEX, ADD FK.
21/// Validates config first. Idempotent for schemas and types (IF NOT EXISTS); tables are CREATE TABLE only (fails if exists).
22/// When `schema_override` is `Some(s)`, app tables/indexes/FKs are created in schema `s` instead of config schema names (e.g. for schema-strategy tenants).
23/// When `rls_tenant_column` is `Some(col)`, each table gets that column (if missing), RLS enabled, and policies using `current_setting('app.tenant_id', true)`.
24pub async fn apply_migrations(
25    pool: &Pool,
26    config: &FullConfig,
27    schema_override: Option<&str>,
28    rls_tenant_column: Option<&str>,
29    dialect: &dyn Dialect,
30) -> Result<(), AppError> {
31    validate(config)?;
32    let default_sid = config
33        .schemas
34        .first()
35        .map(|s| s.id.as_str())
36        .ok_or_else(|| {
37            AppError::Config(crate::error::ConfigError::Validation(
38                "at least one schema required".into(),
39            ))
40        })?;
41
42    if let Some(s) = schema_override {
43        let name = quote(s);
44        sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", name))
45            .execute(pool)
46            .await?;
47    }
48
49    let schemas_by_id: HashMap<_, _> = config.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
50    let tables_by_id: HashMap<_, _> = config.tables.iter().map(|t| (t.id.as_str(), t)).collect();
51    let columns_by_table: HashMap<_, Vec<&ColumnConfig>> =
52        config.columns.iter().fold(HashMap::new(), |mut m, c| {
53            m.entry(c.table_id.as_str()).or_default().push(c);
54            m
55        });
56
57    // When schema_override is set, we only create the override schema; otherwise create config schemas.
58    if schema_override.is_none() {
59        for s in &config.schemas {
60            let name = quote(&s.name);
61            let comment = s
62                .comment
63                .as_ref()
64                .map(|c| format!("COMMENT ON SCHEMA {} IS '{}'", name, c.replace('\'', "''")));
65            sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", name))
66                .execute(pool)
67                .await?;
68            if let Some(sql) = comment {
69                let _ = sqlx::query(&sql).execute(pool).await;
70            }
71        }
72    }
73
74    for e in &config.enums {
75        let sid = e.schema_id.as_deref().unwrap_or(default_sid);
76        let schema = schemas_by_id.get(sid).ok_or_else(|| {
77            AppError::Config(crate::error::ConfigError::MissingReference {
78                kind: "schema",
79                id: sid.to_string(),
80            })
81        })?;
82        let schema_name = quote(schema_override.unwrap_or(&schema.name));
83        let type_name = quote(&e.name);
84        if dialect.supports_named_enum_types() {
85            let values: Vec<String> = e
86                .values
87                .iter()
88                .map(|v| format!("'{}'", v.replace('\'', "''")))
89                .collect();
90            let sql = format!(
91                "CREATE TYPE {}.{} AS ENUM ({})",
92                schema_name,
93                type_name,
94                values.join(", ")
95            );
96            let _ = sqlx::query(&sql).execute(pool).await;
97        }
98    }
99
100    for t in &config.tables {
101        let sid = t.schema_id.as_deref().unwrap_or(default_sid);
102        let schema = schemas_by_id.get(sid).ok_or_else(|| {
103            AppError::Config(crate::error::ConfigError::MissingReference {
104                kind: "schema",
105                id: sid.to_string(),
106            })
107        })?;
108        let schema_name = quote(schema_override.unwrap_or(&schema.name));
109        let table_name = quote(&t.name);
110        let full_name = format!("{}.{}", schema_name, table_name);
111
112        let cols = columns_by_table
113            .get(t.id.as_str())
114            .map(|v| v.as_slice())
115            .unwrap_or(&[]);
116        let mut col_defs: Vec<String> = Vec::new();
117        for c in cols {
118            let typ = dialect.ddl_type(&parse_canonical(&c.type_));
119            let mut def = format!("{} {}", quote(&c.name), typ);
120            if !c.nullable {
121                def.push_str(" NOT NULL");
122            }
123            if let Some(ref d) = c.default {
124                def.push_str(" DEFAULT ");
125                match d {
126                    ColumnDefaultConfig::Literal(s) => def.push_str(s),
127                    ColumnDefaultConfig::Expression { expression } => def.push_str(expression),
128                }
129            }
130            col_defs.push(def);
131        }
132
133        let config_col_names: HashSet<&str> = cols.iter().map(|c| c.name.as_str()).collect();
134        let ts_default = format!(
135            "{} NOT NULL DEFAULT {}",
136            dialect.sys_timestamp_type(),
137            dialect.now_fn()
138        );
139        let ts_nullable = dialect.sys_timestamp_type().to_string();
140        for (name, def_suffix) in [
141            ("created_at", ts_default.as_str()),
142            ("updated_at", ts_default.as_str()),
143            ("archived_at", ts_nullable.as_str()),
144            ("created_by", "TEXT"),
145            ("updated_by", "TEXT"),
146        ] {
147            if !config_col_names.contains(name) {
148                col_defs.push(format!("{} {}", quote(name), def_suffix));
149            }
150        }
151
152        let pk_cols = match &t.primary_key {
153            PrimaryKeyConfig::Single(s) => vec![quote(s)],
154            PrimaryKeyConfig::Composite(v) => v.iter().map(|s| quote(s)).collect::<Vec<_>>(),
155        };
156        let pk_def = format!("PRIMARY KEY ({})", pk_cols.join(", "));
157        col_defs.push(pk_def);
158
159        for u in &t.unique {
160            let cols: Vec<String> = u.iter().map(|s| quote(s)).collect();
161            col_defs.push(format!("UNIQUE ({})", cols.join(", ")));
162        }
163        for ch in &t.check {
164            col_defs.push(format!(
165                "CONSTRAINT {} CHECK ({})",
166                quote(&ch.name),
167                ch.expression
168            ));
169        }
170
171        let sql = format!(
172            "CREATE TABLE IF NOT EXISTS {} (\n  {}\n)",
173            full_name,
174            col_defs.join(",\n  ")
175        );
176        sqlx::query(&sql).execute(pool).await?;
177
178        if t.audit_log {
179            let schema_raw = schema_override.unwrap_or(&schema.name);
180            let audit_sql = audit_table_ddl(schema_raw, &t.name, cols, dialect);
181            sqlx::query(&audit_sql).execute(pool).await?;
182            let pk_col = match &t.primary_key {
183                PrimaryKeyConfig::Single(s) => s.clone(),
184                PrimaryKeyConfig::Composite(v) => v[0].clone(),
185            };
186            let audit_full = format!(
187                "{}.{}",
188                quote(schema_raw),
189                quote(&format!("{}_audit", t.name))
190            );
191            let idx_sql = format!(
192                "CREATE INDEX IF NOT EXISTS {} ON {} ({}, {})",
193                quote(&format!("{}_audit_record_idx", t.name)),
194                audit_full,
195                quote(&pk_col),
196                quote("audit_at")
197            );
198            let _ = sqlx::query(&idx_sql).execute(pool).await;
199        }
200
201        if let Some(col) = rls_tenant_column {
202            if dialect.supports_rls() {
203                if !config_col_names.contains(col) {
204                    let q_col = quote(col);
205                    let add_col = format!(
206                        "ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} TEXT",
207                        full_name, q_col
208                    );
209                    sqlx::query(&add_col).execute(pool).await?;
210                }
211                let enable_rls = format!("ALTER TABLE {} ENABLE ROW LEVEL SECURITY", full_name);
212                sqlx::query(&enable_rls).execute(pool).await?;
213                let q_col = quote(col);
214                let setting = "current_setting('app.tenant_id', true)";
215                let cond = format!("{} = {}", q_col, setting);
216                let policy_prefix = format!("rls_tenant_{}", t.name);
217                let policies: &[(&str, &str, Option<&str>, Option<&str>)] = &[
218                    ("select", "SELECT", Some(cond.as_str()), None),
219                    ("insert", "INSERT", None, Some(cond.as_str())),
220                    ("update", "UPDATE", Some(cond.as_str()), Some(cond.as_str())),
221                    ("delete", "DELETE", Some(cond.as_str()), None),
222                ];
223                for (suffix, cmd, using_cond, with_check) in policies.iter() {
224                    let policy_name = format!("{}_{}", policy_prefix, suffix);
225                    let drop_sql = format!(
226                        "DROP POLICY IF EXISTS {} ON {}",
227                        quote(&policy_name),
228                        full_name
229                    );
230                    let _ = sqlx::query(&drop_sql).execute(pool).await;
231                    let create_sql = match (using_cond, with_check) {
232                        (Some(u), Some(w)) => format!(
233                            "CREATE POLICY {} ON {} FOR {} USING ( {} ) WITH CHECK ( {} )",
234                            quote(&policy_name),
235                            full_name,
236                            cmd,
237                            u,
238                            w
239                        ),
240                        (Some(u), None) => format!(
241                            "CREATE POLICY {} ON {} FOR {} USING ( {} )",
242                            quote(&policy_name),
243                            full_name,
244                            cmd,
245                            u
246                        ),
247                        (None, Some(w)) => format!(
248                            "CREATE POLICY {} ON {} FOR {} WITH CHECK ( {} )",
249                            quote(&policy_name),
250                            full_name,
251                            cmd,
252                            w
253                        ),
254                        (None, None) => continue,
255                    };
256                    sqlx::query(&create_sql).execute(pool).await?;
257                }
258            } else {
259                tracing::warn!(table = %full_name, dialect = %dialect.name(), "RLS requested but not supported by this dialect; skipping");
260            }
261        }
262    }
263
264    for idx in &config.indexes {
265        let sid = idx.schema_id.as_deref().unwrap_or(default_sid);
266        let schema = schemas_by_id.get(sid).ok_or_else(|| {
267            AppError::Config(crate::error::ConfigError::MissingReference {
268                kind: "schema",
269                id: sid.to_string(),
270            })
271        })?;
272        let table = tables_by_id.get(idx.table_id.as_str()).ok_or_else(|| {
273            AppError::Config(crate::error::ConfigError::MissingReference {
274                kind: "table",
275                id: idx.table_id.clone(),
276            })
277        })?;
278        let schema_name = quote(schema_override.unwrap_or(&schema.name));
279        let table_name = quote(&table.name);
280        let full_table = format!("{}.{}", schema_name, table_name);
281        let index_name = quote(&idx.name);
282
283        let mut col_parts: Vec<String> = Vec::new();
284        for col in &idx.columns {
285            match col {
286                IndexColumnEntry::Name(n) => col_parts.push(quote(n)),
287                IndexColumnEntry::Spec {
288                    name, direction, ..
289                } => {
290                    let dir = direction
291                        .as_deref()
292                        .map(|d| format!(" {}", d.to_uppercase()))
293                        .unwrap_or_default();
294                    col_parts.push(format!("{}{}", quote(name), dir));
295                }
296                IndexColumnEntry::Expression { expression } => col_parts.push(expression.clone()),
297            }
298        }
299        let method = idx.method.as_deref().unwrap_or("btree");
300        let unique = if idx.unique { "UNIQUE " } else { "" };
301        let include: String = if idx.include.is_empty() {
302            String::new()
303        } else {
304            let inc: Vec<String> = idx.include.iter().map(|s| quote(s)).collect();
305            format!(" INCLUDE ({})", inc.join(", "))
306        };
307        let where_clause: String = idx
308            .where_
309            .as_ref()
310            .map(|w| format!(" WHERE {}", w))
311            .unwrap_or_default();
312
313        let sql = format!(
314            "CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({}){}{}",
315            unique,
316            index_name,
317            full_table,
318            method,
319            col_parts.join(", "),
320            include,
321            where_clause
322        );
323        let _ = sqlx::query(&sql).execute(pool).await;
324    }
325
326    for rel in &config.relationships {
327        let from_sid = rel.from_schema_id.as_str();
328        let to_sid = rel.to_schema_id.as_str();
329        let from_schema = schemas_by_id.get(from_sid).ok_or_else(|| {
330            AppError::Config(crate::error::ConfigError::MissingReference {
331                kind: "schema",
332                id: from_sid.to_string(),
333            })
334        })?;
335        let from_table = tables_by_id
336            .get(rel.from_table_id.as_str())
337            .ok_or_else(|| {
338                AppError::Config(crate::error::ConfigError::MissingReference {
339                    kind: "table",
340                    id: rel.from_table_id.clone(),
341                })
342            })?;
343        let to_schema = schemas_by_id.get(to_sid).ok_or_else(|| {
344            AppError::Config(crate::error::ConfigError::MissingReference {
345                kind: "schema",
346                id: to_sid.to_string(),
347            })
348        })?;
349        let to_table = tables_by_id.get(rel.to_table_id.as_str()).ok_or_else(|| {
350            AppError::Config(crate::error::ConfigError::MissingReference {
351                kind: "table",
352                id: rel.to_table_id.clone(),
353            })
354        })?;
355
356        let from_schema_name = schema_override.unwrap_or(&from_schema.name);
357        let to_schema_name = schema_override.unwrap_or(&to_schema.name);
358
359        let from_col = config
360            .columns
361            .iter()
362            .find(|c| c.id == rel.from_column_id)
363            .map(|c| c.name.as_str())
364            .ok_or_else(|| {
365                AppError::Config(crate::error::ConfigError::MissingReference {
366                    kind: "column",
367                    id: rel.from_column_id.clone(),
368                })
369            })?;
370        let to_col = config
371            .columns
372            .iter()
373            .find(|c| c.id == rel.to_column_id)
374            .map(|c| c.name.as_str())
375            .ok_or_else(|| {
376                AppError::Config(crate::error::ConfigError::MissingReference {
377                    kind: "column",
378                    id: rel.to_column_id.clone(),
379                })
380            })?;
381
382        let from_full = format!("{}.{}", quote(from_schema_name), quote(&from_table.name));
383        let to_full = format!("{}.{}", quote(to_schema_name), quote(&to_table.name));
384        let constraint_name = rel.name.as_deref().unwrap_or(&rel.id);
385        let on_update = rel.on_update.as_deref().unwrap_or("NO ACTION");
386        let on_delete = rel.on_delete.as_deref().unwrap_or("NO ACTION");
387
388        let sql = format!(
389            "ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {} ({}) ON UPDATE {} ON DELETE {}",
390            from_full,
391            quote(constraint_name),
392            quote(from_col),
393            to_full,
394            quote(to_col),
395            on_update,
396            on_delete
397        );
398        let _ = sqlx::query(&sql).execute(pool).await;
399    }
400
401    Ok(())
402}
403
404/// Revert migrations for a package: drop tables, enum types, and schema (if not public) in reverse order of apply.
405/// Uses the same schema_override as apply_migrations (tables/enums live in that schema).
406pub async fn revert_migrations(
407    pool: &Pool,
408    config: &FullConfig,
409    schema_override: Option<&str>,
410) -> Result<(), AppError> {
411    let default_sid = config
412        .schemas
413        .first()
414        .map(|s| s.id.as_str())
415        .ok_or_else(|| {
416            AppError::Config(crate::error::ConfigError::Validation(
417                "at least one schema required".into(),
418            ))
419        })?;
420
421    let schemas_by_id: HashMap<_, _> = config.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
422
423    // 1. Drop tables (CASCADE drops FKs and dependent objects)
424    for t in &config.tables {
425        let sid = t.schema_id.as_deref().unwrap_or(default_sid);
426        let schema = schemas_by_id.get(sid).ok_or_else(|| {
427            AppError::Config(crate::error::ConfigError::MissingReference {
428                kind: "schema",
429                id: sid.to_string(),
430            })
431        })?;
432        let schema_raw = schema_override.unwrap_or(&schema.name);
433        let schema_name = quote(schema_raw);
434        let table_name = quote(&t.name);
435        let full_name = format!("{}.{}", schema_name, table_name);
436        if t.audit_log {
437            let audit_full = format!("{}.{}", schema_name, quote(&format!("{}_audit", t.name)));
438            let _ = sqlx::query(&format!("DROP TABLE IF EXISTS {} CASCADE", audit_full))
439                .execute(pool)
440                .await;
441        }
442        let drop_sql = format!("DROP TABLE IF EXISTS {} CASCADE", full_name);
443        let _ = sqlx::query(&drop_sql).execute(pool).await;
444    }
445
446    // 2. Drop enum types
447    for e in &config.enums {
448        let sid = e.schema_id.as_deref().unwrap_or(default_sid);
449        let schema = schemas_by_id.get(sid).ok_or_else(|| {
450            AppError::Config(crate::error::ConfigError::MissingReference {
451                kind: "schema",
452                id: sid.to_string(),
453            })
454        })?;
455        let schema_name = quote(schema_override.unwrap_or(&schema.name));
456        let type_name = quote(&e.name);
457        let drop_sql = format!("DROP TYPE IF EXISTS {}.{} CASCADE", schema_name, type_name);
458        let _ = sqlx::query(&drop_sql).execute(pool).await;
459    }
460
461    // 3. Drop schema only if not public (shared schema)
462    if schema_override.is_none() {
463        for s in &config.schemas {
464            if s.name.eq_ignore_ascii_case("public") {
465                continue;
466            }
467            let schema_name = quote(&s.name);
468            let drop_sql = format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name);
469            let _ = sqlx::query(&drop_sql).execute(pool).await;
470        }
471    }
472
473    Ok(())
474}
475
// ─── Migration plan types ────────────────────────────────────────────────────
477
478#[derive(Debug, Clone, Serialize, Deserialize)]
479#[serde(rename_all = "snake_case")]
480pub enum MigrationOperation {
481    CreateSchema,
482    CreateEnum,
483    DropEnum,
484    AddEnumValue,
485    RemoveEnumValue,
486    CreateTable,
487    DropTable,
488    AddColumn,
489    DropColumn,
490    RenameColumn,
491    AlterColumnType,
492    BackfillNulls,
493    SetNotNull,
494    DropNotNull,
495    SetDefault,
496    DropDefault,
497    CreateIndex,
498    DropIndex,
499    AddForeignKey,
500    DropForeignKey,
501}
502
503impl std::fmt::Display for MigrationOperation {
504    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505        let s = serde_json::to_value(self)
506            .ok()
507            .and_then(|v| v.as_str().map(String::from))
508            .unwrap_or_else(|| format!("{:?}", self));
509        write!(f, "{}", s)
510    }
511}
512
513/// How safely a migration step can be executed.
514#[derive(Debug, Clone, Serialize, Deserialize)]
515#[serde(rename_all = "snake_case")]
516pub enum MigrationSafety {
517    /// Guaranteed to succeed, no data impact.
518    Safe,
519    /// Attempted; execution failure is captured as a warning instead of aborting.
520    BestEffort,
521    /// No DDL generated — config change noted as a warning only (e.g. removed tables/columns).
522    WarnOnly,
523}
524
525/// Risk category associated with a migration step.
526#[derive(Debug, Clone, Serialize, Deserialize)]
527#[serde(rename_all = "snake_case")]
528pub enum MigrationRisk {
529    None,
530    /// Cast may fail for incompatible values (e.g. TEXT → INTEGER).
531    MayFail,
532    /// SET NOT NULL will fail if any existing row has NULL in this column.
533    ExistingNullsMustBeAbsent,
534    /// Existing NULL rows will be overwritten with the column default.
535    DataWillBeModified,
536    /// Cannot be automated — requires a manual database action.
537    ManualActionRequired,
538}
539
/// One step in a migration plan: a DDL statement with metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationStep {
    /// Position of the step within the plan.
    /// NOTE(review): `compute_migration_plan` pushes steps with `step: 0`; presumably they
    /// are renumbered before the plan is returned — confirm.
    pub step: usize,
    /// What kind of change this step represents.
    pub operation: MigrationOperation,
    /// Name of the schema the affected object lives in.
    pub schema: String,
    /// Table the object belongs to; `None` for schema- and enum-level steps.
    pub table: Option<String>,
    /// Column name, index name, FK constraint name, enum name, etc.
    pub object: String,
    /// "column" | "table" | "index" | "foreign_key" | "enum" | "enum_value" | "schema"
    pub object_type: String,
    /// Human-readable summary of the change.
    pub description: String,
    /// The SQL to execute. None for WarnOnly steps.
    pub ddl: Option<String>,
    /// How safely the step can be executed.
    pub safety: MigrationSafety,
    /// Risk category of the step.
    pub risk: MigrationRisk,
    /// Optional free-text explanation of the risk (e.g. why manual action is required).
    pub risk_detail: Option<String>,
}
558
/// Computed diff between two package versions expressed as ordered migration steps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationPlan {
    /// The steps of the plan, in the order they were generated.
    pub steps: Vec<MigrationStep>,
}
564
/// Step counts of a [`MigrationPlan`], broken down by [`MigrationSafety`].
#[derive(Debug, Clone, Serialize)]
pub struct MigrationSummary {
    /// Total number of steps in the plan.
    pub total: usize,
    /// Number of steps marked `MigrationSafety::Safe`.
    pub safe: usize,
    /// Number of steps marked `MigrationSafety::BestEffort`.
    pub best_effort: usize,
    /// Number of steps marked `MigrationSafety::WarnOnly`.
    pub warn_only: usize,
}
572
573impl MigrationPlan {
574    pub fn summary(&self) -> MigrationSummary {
575        let (mut safe, mut best_effort, mut warn_only) = (0, 0, 0);
576        for s in &self.steps {
577            match s.safety {
578                MigrationSafety::Safe => safe += 1,
579                MigrationSafety::BestEffort => best_effort += 1,
580                MigrationSafety::WarnOnly => warn_only += 1,
581            }
582        }
583        MigrationSummary {
584            total: self.steps.len(),
585            safe,
586            best_effort,
587            warn_only,
588        }
589    }
590}
591
/// Result returned by `execute_migration_plan`.
///
/// Derives `Debug`, `Clone`, `Default`, `PartialEq` and `Eq` so results can be logged,
/// compared in tests, and built up from an empty value.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MigrationExecutionResult {
    /// Number of steps applied.
    pub applied: usize,
    /// Number of steps that produced a warning.
    /// NOTE(review): presumably equals `warnings.len()` — confirm in `execute_migration_plan`.
    pub warned: usize,
    /// Warning messages collected during execution.
    pub warnings: Vec<String>,
}
598
599fn default_str(d: &ColumnDefaultConfig) -> String {
600    match d {
601        ColumnDefaultConfig::Literal(s) => s.clone(),
602        ColumnDefaultConfig::Expression { expression } => expression.clone(),
603    }
604}
605
// ─── compute_migration_plan ──────────────────────────────────────────────────
607
608/// Diff two package configs and produce an ordered list of migration steps.
609/// This is a pure function — it does not touch the database.
610/// Pass the result to `execute_migration_plan` after user confirmation.
611pub fn compute_migration_plan(
612    old: &FullConfig,
613    new: &FullConfig,
614    schema_override: Option<&str>,
615    _rls_tenant_column: Option<&str>,
616    dialect: &dyn Dialect,
617) -> Result<MigrationPlan, AppError> {
618    validate(new)?;
619
620    let default_old_sid = old.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
621    let default_new_sid = new.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
622
623    let old_schemas: HashMap<&str, &SchemaConfig> =
624        old.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
625    let new_schemas: HashMap<&str, &SchemaConfig> =
626        new.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
627    let old_tables: HashMap<&str, &TableConfig> =
628        old.tables.iter().map(|t| (t.id.as_str(), t)).collect();
629    let new_tables: HashMap<&str, &TableConfig> =
630        new.tables.iter().map(|t| (t.id.as_str(), t)).collect();
631    let old_columns: HashMap<&str, &ColumnConfig> =
632        old.columns.iter().map(|c| (c.id.as_str(), c)).collect();
633    let old_enums: HashMap<&str, &EnumConfig> =
634        old.enums.iter().map(|e| (e.id.as_str(), e)).collect();
635    let new_enums: HashMap<&str, &EnumConfig> =
636        new.enums.iter().map(|e| (e.id.as_str(), e)).collect();
637    let old_indexes: HashMap<&str, &IndexConfig> =
638        old.indexes.iter().map(|i| (i.id.as_str(), i)).collect();
639    let new_indexes: HashMap<&str, &IndexConfig> =
640        new.indexes.iter().map(|i| (i.id.as_str(), i)).collect();
641    let old_rels: HashMap<&str, &RelationshipConfig> = old
642        .relationships
643        .iter()
644        .map(|r| (r.id.as_str(), r))
645        .collect();
646    let new_rels: HashMap<&str, &RelationshipConfig> = new
647        .relationships
648        .iter()
649        .map(|r| (r.id.as_str(), r))
650        .collect();
651
652    let mut steps: Vec<MigrationStep> = Vec::new();
653
654    let schema_name_for = |sid: &str, schemas: &HashMap<&str, &SchemaConfig>| -> String {
655        schema_override.map(String::from).unwrap_or_else(|| {
656            schemas
657                .get(sid)
658                .map(|s| s.name.clone())
659                .unwrap_or_else(|| sid.to_string())
660        })
661    };
662
663    // ── 1. New schemas ───────────────────────────────────────────────────────
664    if schema_override.is_none() {
665        for s in &new.schemas {
666            if !old_schemas.contains_key(s.id.as_str()) {
667                steps.push(MigrationStep {
668                    step: 0,
669                    operation: MigrationOperation::CreateSchema,
670                    schema: s.name.clone(),
671                    table: None,
672                    object: s.name.clone(),
673                    object_type: "schema".into(),
674                    description: format!("Create schema \"{}\"", s.name),
675                    ddl: Some(format!("CREATE SCHEMA IF NOT EXISTS {}", quote(&s.name))),
676                    safety: MigrationSafety::Safe,
677                    risk: MigrationRisk::None,
678                    risk_detail: None,
679                });
680            }
681        }
682    }
683
684    // ── 2. Enums ─────────────────────────────────────────────────────────────
685    for new_enum in &new.enums {
686        let sid = new_enum.schema_id.as_deref().unwrap_or(default_new_sid);
687        let schema = schema_name_for(sid, &new_schemas);
688
689        if let Some(old_enum) = old_enums.get(new_enum.id.as_str()) {
690            let old_vals: HashSet<&str> = old_enum.values.iter().map(String::as_str).collect();
691            let new_vals: HashSet<&str> = new_enum.values.iter().map(String::as_str).collect();
692            for val in new_enum
693                .values
694                .iter()
695                .map(String::as_str)
696                .filter(|v| !old_vals.contains(v))
697            {
698                steps.push(MigrationStep {
699                    step: 0,
700                    operation: MigrationOperation::AddEnumValue,
701                    schema: schema.clone(),
702                    table: None,
703                    object: format!("{}:{}", new_enum.name, val),
704                    object_type: "enum_value".into(),
705                    description: format!(
706                        "Add value '{}' to enum \"{}\".\"{}\"",
707                        val, schema, new_enum.name
708                    ),
709                    ddl: Some(format!(
710                        "ALTER TYPE {}.{} ADD VALUE IF NOT EXISTS '{}'",
711                        quote(&schema),
712                        quote(&new_enum.name),
713                        val.replace('\'', "''")
714                    )),
715                    safety: MigrationSafety::Safe,
716                    risk: MigrationRisk::None,
717                    risk_detail: None,
718                });
719            }
720            for val in old_enum
721                .values
722                .iter()
723                .map(String::as_str)
724                .filter(|v| !new_vals.contains(v))
725            {
726                steps.push(MigrationStep {
727                    step: 0,
728                    operation: MigrationOperation::RemoveEnumValue,
729                    schema: schema.clone(),
730                    table: None,
731                    object: format!("{}:{}", new_enum.name, val),
732                    object_type: "enum_value".into(),
733                    description: format!("Enum value '{}' removed from config on \"{}\".\"{}\"", val, schema, new_enum.name),
734                    ddl: None,
735                    safety: MigrationSafety::WarnOnly,
736                    risk: MigrationRisk::ManualActionRequired,
737                    risk_detail: Some(format!(
738                        "PostgreSQL does not support removing enum values. '{}' was removed from config but NOT from the database type. Recreate the type manually if needed.",
739                        val
740                    )),
741                });
742            }
743        } else {
744            let values: Vec<String> = new_enum
745                .values
746                .iter()
747                .map(|v| format!("'{}'", v.replace('\'', "''")))
748                .collect();
749            steps.push(MigrationStep {
750                step: 0,
751                operation: MigrationOperation::CreateEnum,
752                schema: schema.clone(),
753                table: None,
754                object: new_enum.name.clone(),
755                object_type: "enum".into(),
756                description: format!("Create enum type \"{}\".\"{}\"", schema, new_enum.name),
757                ddl: Some(format!("CREATE TYPE {}.{} AS ENUM ({})", quote(&schema), quote(&new_enum.name), values.join(", "))),
758                safety: MigrationSafety::BestEffort,
759                risk: MigrationRisk::None,
760                risk_detail: Some("PostgreSQL has no CREATE TYPE IF NOT EXISTS; ignored if the type already exists.".into()),
761            });
762        }
763    }
764    for old_enum in &old.enums {
765        if !new_enums.contains_key(old_enum.id.as_str()) {
766            let sid = old_enum.schema_id.as_deref().unwrap_or(default_old_sid);
767            let schema = schema_name_for(sid, &old_schemas);
768            steps.push(MigrationStep {
769                step: 0,
770                operation: MigrationOperation::DropEnum,
771                schema: schema.clone(),
772                table: None,
773                object: old_enum.name.clone(),
774                object_type: "enum".into(),
775                description: format!("Enum \"{}\".\"{}\" removed from config", schema, old_enum.name),
776                ddl: None,
777                safety: MigrationSafety::WarnOnly,
778                risk: MigrationRisk::ManualActionRequired,
779                risk_detail: Some("Enum type NOT dropped from database (data safety). Run DROP TYPE manually if intended.".into()),
780            });
781        }
782    }
783
784    // ── 3. New and removed tables ────────────────────────────────────────────
785    let added_table_ids: HashSet<&str> = new
786        .tables
787        .iter()
788        .filter(|t| !old_tables.contains_key(t.id.as_str()))
789        .map(|t| t.id.as_str())
790        .collect();
791
792    let cols_by_table: HashMap<&str, Vec<&ColumnConfig>> =
793        new.columns.iter().fold(HashMap::new(), |mut m, c| {
794            m.entry(c.table_id.as_str()).or_default().push(c);
795            m
796        });
797
798    for new_table in &new.tables {
799        if !added_table_ids.contains(new_table.id.as_str()) {
800            continue;
801        }
802        let sid = new_table.schema_id.as_deref().unwrap_or(default_new_sid);
803        let schema = schema_name_for(sid, &new_schemas);
804        let full = format!("{}.{}", quote(&schema), quote(&new_table.name));
805
806        let cols = cols_by_table
807            .get(new_table.id.as_str())
808            .map(|v| v.as_slice())
809            .unwrap_or(&[]);
810        let mut col_defs: Vec<String> = Vec::new();
811        for c in cols {
812            let typ = dialect.ddl_type(&parse_canonical(&c.type_));
813            let mut def = format!("{} {}", quote(&c.name), typ);
814            if !c.nullable {
815                def.push_str(" NOT NULL");
816            }
817            if let Some(ref d) = c.default {
818                def.push_str(" DEFAULT ");
819                match d {
820                    ColumnDefaultConfig::Literal(s) => def.push_str(s),
821                    ColumnDefaultConfig::Expression { expression } => def.push_str(expression),
822                }
823            }
824            col_defs.push(def);
825        }
826        let cfg_col_names: HashSet<&str> = cols.iter().map(|c| c.name.as_str()).collect();
        // Note: compute_migration_plan only generates DDL; the column definitions below are
        // embedded verbatim in the plan for display/execution. Unlike the configured columns
        // above (whose types come from `dialect.ddl_type`), these bookkeeping columns are
        // hard-coded with PostgreSQL syntax (TIMESTAMPTZ, NOW()), on the assumption that the
        // plan is later applied to a PostgreSQL database via execute_migration_plan. To support
        // other dialects, derive these definitions from `dialect` as well.
831        for (name, suf) in [
832            ("created_at", "TIMESTAMPTZ NOT NULL DEFAULT NOW()"),
833            ("updated_at", "TIMESTAMPTZ NOT NULL DEFAULT NOW()"),
834            ("archived_at", "TIMESTAMPTZ"),
835            ("created_by", "TEXT"),
836            ("updated_by", "TEXT"),
837        ] {
838            if !cfg_col_names.contains(name) {
839                col_defs.push(format!("{} {}", quote(name), suf));
840            }
841        }
842        let pk_cols = match &new_table.primary_key {
843            PrimaryKeyConfig::Single(s) => vec![quote(s)],
844            PrimaryKeyConfig::Composite(v) => v.iter().map(|s| quote(s)).collect(),
845        };
846        col_defs.push(format!("PRIMARY KEY ({})", pk_cols.join(", ")));
847        for u in &new_table.unique {
848            col_defs.push(format!(
849                "UNIQUE ({})",
850                u.iter().map(|s| quote(s)).collect::<Vec<_>>().join(", ")
851            ));
852        }
853        for ch in &new_table.check {
854            col_defs.push(format!(
855                "CONSTRAINT {} CHECK ({})",
856                quote(&ch.name),
857                ch.expression
858            ));
859        }
860
861        steps.push(MigrationStep {
862            step: 0,
863            operation: MigrationOperation::CreateTable,
864            schema: schema.clone(),
865            table: Some(new_table.name.clone()),
866            object: new_table.name.clone(),
867            object_type: "table".into(),
868            description: format!("Create table \"{}\".\"{}\"", schema, new_table.name),
869            ddl: Some(format!(
870                "CREATE TABLE IF NOT EXISTS {} (\n  {}\n)",
871                full,
872                col_defs.join(",\n  ")
873            )),
874            safety: MigrationSafety::Safe,
875            risk: MigrationRisk::None,
876            risk_detail: None,
877        });
878        if new_table.audit_log {
879            let audit_ddl = audit_table_ddl(&schema, &new_table.name, cols, dialect);
880            steps.push(MigrationStep {
881                step: 0,
882                operation: MigrationOperation::CreateTable,
883                schema: schema.clone(),
884                table: Some(format!("{}_audit", new_table.name)),
885                object: format!("{}_audit", new_table.name),
886                object_type: "table".into(),
887                description: format!(
888                    "Create audit table \"{}\".\"{}_audit\"",
889                    schema, new_table.name
890                ),
891                ddl: Some(audit_ddl),
892                safety: MigrationSafety::Safe,
893                risk: MigrationRisk::None,
894                risk_detail: None,
895            });
896        }
897    }
898
899    // Existing tables that gained audit_log
900    for new_table in &new.tables {
901        if added_table_ids.contains(new_table.id.as_str()) {
902            continue;
903        }
904        if let Some(old_table) = old_tables.get(new_table.id.as_str()) {
905            if !old_table.audit_log && new_table.audit_log {
906                let sid = new_table.schema_id.as_deref().unwrap_or(default_new_sid);
907                let schema = schema_name_for(sid, &new_schemas);
908                let cols = cols_by_table
909                    .get(new_table.id.as_str())
910                    .map(|v| v.as_slice())
911                    .unwrap_or(&[]);
912                let audit_ddl = audit_table_ddl(&schema, &new_table.name, cols, dialect);
913                steps.push(MigrationStep {
914                    step: 0,
915                    operation: MigrationOperation::CreateTable,
916                    schema: schema.clone(),
917                    table: Some(format!("{}_audit", new_table.name)),
918                    object: format!("{}_audit", new_table.name),
919                    object_type: "table".into(),
920                    description: format!(
921                        "Enable audit log: create \"{}\".\"{}_audit\"",
922                        schema, new_table.name
923                    ),
924                    ddl: Some(audit_ddl),
925                    safety: MigrationSafety::Safe,
926                    risk: MigrationRisk::None,
927                    risk_detail: None,
928                });
929            }
930        }
931    }
932
933    for old_table in &old.tables {
934        if !new_tables.contains_key(old_table.id.as_str()) {
935            let sid = old_table.schema_id.as_deref().unwrap_or(default_old_sid);
936            let schema = schema_name_for(sid, &old_schemas);
937            steps.push(MigrationStep {
938                step: 0,
939                operation: MigrationOperation::DropTable,
940                schema: schema.clone(),
941                table: Some(old_table.name.clone()),
942                object: old_table.name.clone(),
943                object_type: "table".into(),
944                description: format!("Table \"{}\".\"{}\" removed from config", schema, old_table.name),
945                ddl: None,
946                safety: MigrationSafety::WarnOnly,
947                risk: MigrationRisk::ManualActionRequired,
948                risk_detail: Some("Table NOT dropped from database (data safety). Run DROP TABLE manually if intended.".into()),
949            });
950        }
951    }
952
953    // ── 4. Column changes for existing tables ────────────────────────────────
954    for new_col in &new.columns {
955        if added_table_ids.contains(new_col.table_id.as_str()) {
956            continue;
957        }
958        let table = match new_tables.get(new_col.table_id.as_str()) {
959            Some(t) => t,
960            None => continue,
961        };
962        let sid = table.schema_id.as_deref().unwrap_or(default_new_sid);
963        let schema = schema_name_for(sid, &new_schemas);
964        let full = format!("{}.{}", quote(&schema), quote(&table.name));
965
966        if let Some(old_col) = old_columns.get(new_col.id.as_str()) {
967            if old_col.table_id != new_col.table_id {
968                steps.push(MigrationStep {
969                    step: 0,
970                    operation: MigrationOperation::AddColumn,
971                    schema: schema.clone(),
972                    table: Some(table.name.clone()),
973                    object: new_col.name.clone(),
974                    object_type: "column".into(),
975                    description: format!("Column \"{}\" (id: {}) appears to have moved tables — manual migration required", new_col.name, new_col.id),
976                    ddl: None,
977                    safety: MigrationSafety::WarnOnly,
978                    risk: MigrationRisk::ManualActionRequired,
979                    risk_detail: Some(format!("Cannot automate column move from table {} to {}.", old_col.table_id, new_col.table_id)),
980                });
981                continue;
982            }
983
984            // Rename
985            if old_col.name != new_col.name {
986                steps.push(MigrationStep {
987                    step: 0,
988                    operation: MigrationOperation::RenameColumn,
989                    schema: schema.clone(),
990                    table: Some(table.name.clone()),
991                    object: new_col.name.clone(),
992                    object_type: "column".into(),
993                    description: format!(
994                        "Rename column \"{}\" → \"{}\" on \"{}\".\"{}\"",
995                        old_col.name, new_col.name, schema, table.name
996                    ),
997                    ddl: Some(format!(
998                        "ALTER TABLE {} RENAME COLUMN {} TO {}",
999                        full,
1000                        quote(&old_col.name),
1001                        quote(&new_col.name)
1002                    )),
1003                    safety: MigrationSafety::Safe,
1004                    risk: MigrationRisk::None,
1005                    risk_detail: None,
1006                });
1007            }
1008
1009            // Type change
1010            let old_type = dialect.ddl_type(&parse_canonical(&old_col.type_));
1011            let new_type = dialect.ddl_type(&parse_canonical(&new_col.type_));
1012            if old_type.to_uppercase() != new_type.to_uppercase() {
1013                let col_name = &new_col.name;
1014                steps.push(MigrationStep {
1015                    step: 0,
1016                    operation: MigrationOperation::AlterColumnType,
1017                    schema: schema.clone(),
1018                    table: Some(table.name.clone()),
1019                    object: col_name.clone(),
1020                    object_type: "column".into(),
1021                    description: format!("Change type of \"{}\".\"{}\".\"{}\": {} → {}", schema, table.name, col_name, old_type, new_type),
1022                    ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} TYPE {} USING {}::{}", full, quote(col_name), new_type, quote(col_name), new_type)),
1023                    safety: MigrationSafety::BestEffort,
1024                    risk: MigrationRisk::MayFail,
1025                    risk_detail: Some(format!("USING {}::{} cast may fail for incompatible values. Provide a custom USING expression if needed.", col_name, new_type)),
1026                });
1027            }
1028
1029            // Nullability: nullable → NOT NULL
1030            if old_col.nullable && !new_col.nullable {
1031                if let Some(ref d) = new_col.default {
1032                    let default_val = default_str(d);
1033                    // Backfill NULLs first using the configured default
1034                    steps.push(MigrationStep {
1035                        step: 0,
1036                        operation: MigrationOperation::BackfillNulls,
1037                        schema: schema.clone(),
1038                        table: Some(table.name.clone()),
1039                        object: new_col.name.clone(),
1040                        object_type: "column".into(),
1041                        description: format!("Backfill NULLs in \"{}\".\"{}\".\"{}\": SET {} = {} WHERE {} IS NULL", schema, table.name, new_col.name, new_col.name, default_val, new_col.name),
1042                        ddl: Some(format!("UPDATE {} SET {} = {} WHERE {} IS NULL", full, quote(&new_col.name), default_val, quote(&new_col.name))),
1043                        safety: MigrationSafety::Safe,
1044                        risk: MigrationRisk::DataWillBeModified,
1045                        risk_detail: Some(format!("Existing NULLs in column \"{}\" will be set to {} before NOT NULL is enforced.", new_col.name, default_val)),
1046                    });
1047                    // Then set NOT NULL — safe because NULLs are gone
1048                    steps.push(MigrationStep {
1049                        step: 0,
1050                        operation: MigrationOperation::SetNotNull,
1051                        schema: schema.clone(),
1052                        table: Some(table.name.clone()),
1053                        object: new_col.name.clone(),
1054                        object_type: "column".into(),
1055                        description: format!("Set NOT NULL on \"{}\".\"{}\".\"{}\": NULLs pre-filled with default ({})", schema, table.name, new_col.name, default_val),
1056                        ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} SET NOT NULL", full, quote(&new_col.name))),
1057                        safety: MigrationSafety::Safe,
1058                        risk: MigrationRisk::None,
1059                        risk_detail: None,
1060                    });
1061                } else {
1062                    // No default — best effort; will fail if NULLs exist
1063                    steps.push(MigrationStep {
1064                        step: 0,
1065                        operation: MigrationOperation::SetNotNull,
1066                        schema: schema.clone(),
1067                        table: Some(table.name.clone()),
1068                        object: new_col.name.clone(),
1069                        object_type: "column".into(),
1070                        description: format!("Set NOT NULL on \"{}\".\"{}\".\"{}\": no default configured — will fail if NULLs exist", schema, table.name, new_col.name),
1071                        ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} SET NOT NULL", full, quote(&new_col.name))),
1072                        safety: MigrationSafety::BestEffort,
1073                        risk: MigrationRisk::ExistingNullsMustBeAbsent,
1074                        risk_detail: Some(format!(
1075                            "No default value configured for column \"{}\". Add a default to the config to enable automatic NULL backfill before enforcing NOT NULL.",
1076                            new_col.name
1077                        )),
1078                    });
1079                }
1080            }
1081
1082            // Nullability: NOT NULL → nullable
1083            if !old_col.nullable && new_col.nullable {
1084                steps.push(MigrationStep {
1085                    step: 0,
1086                    operation: MigrationOperation::DropNotNull,
1087                    schema: schema.clone(),
1088                    table: Some(table.name.clone()),
1089                    object: new_col.name.clone(),
1090                    object_type: "column".into(),
1091                    description: format!(
1092                        "Drop NOT NULL on \"{}\".\"{}\".\"{}\": column becomes nullable",
1093                        schema, table.name, new_col.name
1094                    ),
1095                    ddl: Some(format!(
1096                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
1097                        full,
1098                        quote(&new_col.name)
1099                    )),
1100                    safety: MigrationSafety::Safe,
1101                    risk: MigrationRisk::None,
1102                    risk_detail: None,
1103                });
1104            }
1105
1106            // Default change
1107            let old_def = old_col.default.as_ref().map(default_str);
1108            let new_def = new_col.default.as_ref().map(default_str);
1109            if old_def != new_def {
1110                match &new_col.default {
1111                    Some(d) => {
1112                        let val = default_str(d);
1113                        steps.push(MigrationStep {
1114                            step: 0,
1115                            operation: MigrationOperation::SetDefault,
1116                            schema: schema.clone(),
1117                            table: Some(table.name.clone()),
1118                            object: new_col.name.clone(),
1119                            object_type: "column".into(),
1120                            description: format!(
1121                                "Set DEFAULT {} on \"{}\".\"{}\".\"{}\": was {}",
1122                                val,
1123                                schema,
1124                                table.name,
1125                                new_col.name,
1126                                old_def.as_deref().unwrap_or("none")
1127                            ),
1128                            ddl: Some(format!(
1129                                "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT {}",
1130                                full,
1131                                quote(&new_col.name),
1132                                val
1133                            )),
1134                            safety: MigrationSafety::Safe,
1135                            risk: MigrationRisk::None,
1136                            risk_detail: None,
1137                        });
1138                    }
1139                    None => {
1140                        steps.push(MigrationStep {
1141                            step: 0,
1142                            operation: MigrationOperation::DropDefault,
1143                            schema: schema.clone(),
1144                            table: Some(table.name.clone()),
1145                            object: new_col.name.clone(),
1146                            object_type: "column".into(),
1147                            description: format!(
1148                                "Drop DEFAULT on \"{}\".\"{}\".\"{}\": was {}",
1149                                schema,
1150                                table.name,
1151                                new_col.name,
1152                                old_def.as_deref().unwrap_or("none")
1153                            ),
1154                            ddl: Some(format!(
1155                                "ALTER TABLE {} ALTER COLUMN {} DROP DEFAULT",
1156                                full,
1157                                quote(&new_col.name)
1158                            )),
1159                            safety: MigrationSafety::Safe,
1160                            risk: MigrationRisk::None,
1161                            risk_detail: None,
1162                        });
1163                    }
1164                }
1165            }
1166        } else {
1167            // New column: ADD COLUMN
1168            let new_type = dialect.ddl_type(&parse_canonical(&new_col.type_));
1169            let mut col_def = format!("{} {}", quote(&new_col.name), new_type);
1170            if !new_col.nullable {
1171                col_def.push_str(" NOT NULL");
1172            }
1173            if let Some(ref d) = new_col.default {
1174                col_def.push_str(" DEFAULT ");
1175                match d {
1176                    ColumnDefaultConfig::Literal(s) => col_def.push_str(s),
1177                    ColumnDefaultConfig::Expression { expression } => col_def.push_str(expression),
1178                }
1179            }
1180            steps.push(MigrationStep {
1181                step: 0,
1182                operation: MigrationOperation::AddColumn,
1183                schema: schema.clone(),
1184                table: Some(table.name.clone()),
1185                object: new_col.name.clone(),
1186                object_type: "column".into(),
1187                description: format!(
1188                    "Add column \"{}\" {} to \"{}\".\"{}\"",
1189                    new_col.name, new_type, schema, table.name
1190                ),
1191                ddl: Some(format!("ALTER TABLE {} ADD COLUMN {}", full, col_def)),
1192                safety: MigrationSafety::Safe,
1193                risk: MigrationRisk::None,
1194                risk_detail: None,
1195            });
1196        }
1197    }
1198
1199    // Removed columns (warn only)
1200    for old_col in &old.columns {
1201        if new.columns.iter().any(|c| c.id == old_col.id) {
1202            continue;
1203        }
1204        if !new_tables.contains_key(old_col.table_id.as_str()) {
1205            continue;
1206        }
1207        let table_name = old_tables
1208            .get(old_col.table_id.as_str())
1209            .map(|t| t.name.as_str())
1210            .unwrap_or(&old_col.table_id);
1211        let sid = old_tables
1212            .get(old_col.table_id.as_str())
1213            .and_then(|t| t.schema_id.as_deref())
1214            .unwrap_or(default_old_sid);
1215        let schema = schema_name_for(sid, &old_schemas);
1216        steps.push(MigrationStep {
1217            step: 0,
1218            operation: MigrationOperation::DropColumn,
1219            schema: schema.clone(),
1220            table: Some(table_name.to_string()),
1221            object: old_col.name.clone(),
1222            object_type: "column".into(),
1223            description: format!("Column \"{}\" removed from config on \"{}\".\"{}\"", old_col.name, schema, table_name),
1224            ddl: None,
1225            safety: MigrationSafety::WarnOnly,
1226            risk: MigrationRisk::ManualActionRequired,
1227            risk_detail: Some("Column NOT dropped from database (data safety). Run ALTER TABLE DROP COLUMN manually if intended.".into()),
1228        });
1229    }
1230
1231    // ── 5. Indexes ───────────────────────────────────────────────────────────
1232    for old_idx in &old.indexes {
1233        if !new_indexes.contains_key(old_idx.id.as_str()) {
1234            let sid = old_idx.schema_id.as_deref().unwrap_or(default_old_sid);
1235            let schema = schema_name_for(sid, &old_schemas);
1236            steps.push(MigrationStep {
1237                step: 0,
1238                operation: MigrationOperation::DropIndex,
1239                schema: schema.clone(),
1240                table: old_tables
1241                    .get(old_idx.table_id.as_str())
1242                    .map(|t| t.name.clone()),
1243                object: old_idx.name.clone(),
1244                object_type: "index".into(),
1245                description: format!("Drop index \"{}\" in schema \"{}\"", old_idx.name, schema),
1246                ddl: Some(format!(
1247                    "DROP INDEX IF EXISTS {}.{}",
1248                    quote(&schema),
1249                    quote(&old_idx.name)
1250                )),
1251                safety: MigrationSafety::Safe,
1252                risk: MigrationRisk::None,
1253                risk_detail: None,
1254            });
1255        }
1256    }
1257    for new_idx in &new.indexes {
1258        if old_indexes.contains_key(new_idx.id.as_str())
1259            || added_table_ids.contains(new_idx.table_id.as_str())
1260        {
1261            continue;
1262        }
1263        let sid = new_idx.schema_id.as_deref().unwrap_or(default_new_sid);
1264        let schema = match new_schemas.get(sid) {
1265            Some(s) => schema_override.unwrap_or(&s.name).to_string(),
1266            None => continue,
1267        };
1268        let table = match new_tables.get(new_idx.table_id.as_str()) {
1269            Some(t) => t,
1270            None => continue,
1271        };
1272        let full_table = format!("{}.{}", quote(&schema), quote(&table.name));
1273        let mut col_parts: Vec<String> = Vec::new();
1274        for col in &new_idx.columns {
1275            match col {
1276                IndexColumnEntry::Name(n) => col_parts.push(quote(n)),
1277                IndexColumnEntry::Spec {
1278                    name, direction, ..
1279                } => {
1280                    let dir = direction
1281                        .as_deref()
1282                        .map(|d| format!(" {}", d.to_uppercase()))
1283                        .unwrap_or_default();
1284                    col_parts.push(format!("{}{}", quote(name), dir));
1285                }
1286                IndexColumnEntry::Expression { expression } => col_parts.push(expression.clone()),
1287            }
1288        }
1289        let method = new_idx.method.as_deref().unwrap_or("btree");
1290        let unique_kw = if new_idx.unique { "UNIQUE " } else { "" };
1291        let include = if new_idx.include.is_empty() {
1292            String::new()
1293        } else {
1294            format!(
1295                " INCLUDE ({})",
1296                new_idx
1297                    .include
1298                    .iter()
1299                    .map(|s| quote(s))
1300                    .collect::<Vec<_>>()
1301                    .join(", ")
1302            )
1303        };
1304        let where_clause = new_idx
1305            .where_
1306            .as_ref()
1307            .map(|w| format!(" WHERE {}", w))
1308            .unwrap_or_default();
1309        steps.push(MigrationStep {
1310            step: 0,
1311            operation: MigrationOperation::CreateIndex,
1312            schema: schema.clone(),
1313            table: Some(table.name.clone()),
1314            object: new_idx.name.clone(),
1315            object_type: "index".into(),
1316            description: format!(
1317                "Create {}index \"{}\" on \"{}\".\"{}\"",
1318                if new_idx.unique { "unique " } else { "" },
1319                new_idx.name,
1320                schema,
1321                table.name
1322            ),
1323            ddl: Some(format!(
1324                "CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({}){}{}",
1325                unique_kw,
1326                quote(&new_idx.name),
1327                full_table,
1328                method,
1329                col_parts.join(", "),
1330                include,
1331                where_clause
1332            )),
1333            safety: MigrationSafety::Safe,
1334            risk: MigrationRisk::None,
1335            risk_detail: None,
1336        });
1337    }
1338
1339    // ── 6. Foreign keys ──────────────────────────────────────────────────────
1340    for old_rel in &old.relationships {
1341        if !new_rels.contains_key(old_rel.id.as_str()) {
1342            let from_schema = old_schemas
1343                .get(old_rel.from_schema_id.as_str())
1344                .map(|s| s.name.as_str())
1345                .unwrap_or(&old_rel.from_schema_id);
1346            let from_table = old_tables
1347                .get(old_rel.from_table_id.as_str())
1348                .map(|t| t.name.as_str())
1349                .unwrap_or(&old_rel.from_table_id);
1350            let constraint = old_rel.name.as_deref().unwrap_or(&old_rel.id);
1351            let schema_q = quote(schema_override.unwrap_or(from_schema));
1352            steps.push(MigrationStep {
1353                step: 0,
1354                operation: MigrationOperation::DropForeignKey,
1355                schema: schema_override.unwrap_or(from_schema).to_string(),
1356                table: Some(from_table.to_string()),
1357                object: constraint.to_string(),
1358                object_type: "foreign_key".into(),
1359                description: format!(
1360                    "Drop FK \"{}\" from \"{}\".\"{}\"",
1361                    constraint,
1362                    schema_override.unwrap_or(from_schema),
1363                    from_table
1364                ),
1365                ddl: Some(format!(
1366                    "ALTER TABLE {}.{} DROP CONSTRAINT IF EXISTS {}",
1367                    schema_q,
1368                    quote(from_table),
1369                    quote(constraint)
1370                )),
1371                safety: MigrationSafety::Safe,
1372                risk: MigrationRisk::None,
1373                risk_detail: None,
1374            });
1375        }
1376    }
1377    for new_rel in &new.relationships {
1378        if old_rels.contains_key(new_rel.id.as_str())
1379            || added_table_ids.contains(new_rel.from_table_id.as_str())
1380            || added_table_ids.contains(new_rel.to_table_id.as_str())
1381        {
1382            continue;
1383        }
1384        let from_schema = match new_schemas.get(new_rel.from_schema_id.as_str()) {
1385            Some(s) => s,
1386            None => continue,
1387        };
1388        let from_table = match new_tables.get(new_rel.from_table_id.as_str()) {
1389            Some(t) => t,
1390            None => continue,
1391        };
1392        let to_schema = match new_schemas.get(new_rel.to_schema_id.as_str()) {
1393            Some(s) => s,
1394            None => continue,
1395        };
1396        let to_table = match new_tables.get(new_rel.to_table_id.as_str()) {
1397            Some(t) => t,
1398            None => continue,
1399        };
1400        let from_col = new
1401            .columns
1402            .iter()
1403            .find(|c| c.id == new_rel.from_column_id)
1404            .map(|c| c.name.clone())
1405            .unwrap_or_else(|| new_rel.from_column_id.clone());
1406        let to_col = new
1407            .columns
1408            .iter()
1409            .find(|c| c.id == new_rel.to_column_id)
1410            .map(|c| c.name.clone())
1411            .unwrap_or_else(|| new_rel.to_column_id.clone());
1412        let from_q = format!(
1413            "{}.{}",
1414            quote(schema_override.unwrap_or(&from_schema.name)),
1415            quote(&from_table.name)
1416        );
1417        let to_q = format!(
1418            "{}.{}",
1419            quote(schema_override.unwrap_or(&to_schema.name)),
1420            quote(&to_table.name)
1421        );
1422        let constraint = new_rel.name.as_deref().unwrap_or(&new_rel.id);
1423        let on_update = new_rel.on_update.as_deref().unwrap_or("NO ACTION");
1424        let on_delete = new_rel.on_delete.as_deref().unwrap_or("NO ACTION");
1425        steps.push(MigrationStep {
1426            step: 0,
1427            operation: MigrationOperation::AddForeignKey,
1428            schema: schema_override.unwrap_or(&from_schema.name).to_string(),
1429            table: Some(from_table.name.clone()),
1430            object: constraint.to_string(),
1431            object_type: "foreign_key".into(),
1432            description: format!("Add FK \"{}\" on \"{}\".\"{}\" → \"{}\".\"{}\"", constraint, schema_override.unwrap_or(&from_schema.name), from_table.name, schema_override.unwrap_or(&to_schema.name), to_table.name),
1433            ddl: Some(format!("ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {} ({}) ON UPDATE {} ON DELETE {}", from_q, quote(constraint), quote(&from_col), to_q, quote(&to_col), on_update, on_delete)),
1434            safety: MigrationSafety::BestEffort,
1435            risk: MigrationRisk::None,
1436            risk_detail: Some("PostgreSQL has no ADD CONSTRAINT IF NOT EXISTS; ignored if constraint already exists.".into()),
1437        });
1438    }
1439
1440    // Assign sequential step numbers
1441    for (i, s) in steps.iter_mut().enumerate() {
1442        s.step = i + 1;
1443    }
1444
1445    Ok(MigrationPlan { steps })
1446}
1447
1448// ─── execute_migration_plan ──────────────────────────────────────────────────
1449
1450/// Execute a pre-computed `MigrationPlan` against the tenant database.
1451/// Writes per-step audit records to the config (architect) database.
1452/// Returns counts and any warning messages collected from best-effort failures.
1453#[allow(clippy::too_many_arguments)]
1454pub async fn execute_migration_plan(
1455    migration_pool: &Pool,
1456    config_pool: &Pool,
1457    plan: &MigrationPlan,
1458    migration_plan_id: &str,
1459    package_id: &str,
1460    tenant_id: &str,
1461    from_version: Option<&str>,
1462    to_version: &str,
1463) -> Result<MigrationExecutionResult, AppError> {
1464    let mut applied = 0usize;
1465    let mut warned = 0usize;
1466    let mut warnings: Vec<String> = Vec::new();
1467
1468    for step in &plan.steps {
1469        let op = step.operation.to_string();
1470        let safety_str = format!("{:?}", step.safety);
1471        let risk_str = format!("{:?}", step.risk);
1472
1473        match step.safety {
1474            MigrationSafety::WarnOnly => {
1475                let msg = step
1476                    .risk_detail
1477                    .clone()
1478                    .unwrap_or_else(|| step.description.clone());
1479                tracing::warn!(step = step.step, %op, "migration plan warning (no DDL)");
1480                warnings.push(format!("[Step {}] {}", step.step, msg));
1481                let _ = crate::store::insert_migration_audit(
1482                    config_pool,
1483                    migration_plan_id,
1484                    package_id,
1485                    tenant_id,
1486                    from_version,
1487                    to_version,
1488                    step.step as i32,
1489                    &op,
1490                    &step.schema,
1491                    step.table.as_deref(),
1492                    &step.object,
1493                    &step.object_type,
1494                    &step.description,
1495                    step.ddl.as_deref(),
1496                    &safety_str,
1497                    &risk_str,
1498                    "skipped",
1499                    None,
1500                )
1501                .await;
1502                warned += 1;
1503            }
1504            MigrationSafety::Safe | MigrationSafety::BestEffort => {
1505                if let Some(ref sql) = step.ddl {
1506                    tracing::info!(step = step.step, %op, %sql, "executing migration step");
1507                    match sqlx::query(sql).execute(migration_pool).await {
1508                        Ok(_) => {
1509                            let _ = crate::store::insert_migration_audit(
1510                                config_pool,
1511                                migration_plan_id,
1512                                package_id,
1513                                tenant_id,
1514                                from_version,
1515                                to_version,
1516                                step.step as i32,
1517                                &op,
1518                                &step.schema,
1519                                step.table.as_deref(),
1520                                &step.object,
1521                                &step.object_type,
1522                                &step.description,
1523                                step.ddl.as_deref(),
1524                                &safety_str,
1525                                &risk_str,
1526                                "applied",
1527                                None,
1528                            )
1529                            .await;
1530                            applied += 1;
1531                        }
1532                        Err(e) => {
1533                            let err_str = e.to_string();
1534                            if matches!(step.safety, MigrationSafety::BestEffort) {
1535                                tracing::warn!(step = step.step, %op, error = %e, "migration step failed (best-effort, continuing)");
1536                                let msg = format!(
1537                                    "[Step {}] {} — Error: {}",
1538                                    step.step, step.description, err_str
1539                                );
1540                                warnings.push(msg);
1541                                let _ = crate::store::insert_migration_audit(
1542                                    config_pool,
1543                                    migration_plan_id,
1544                                    package_id,
1545                                    tenant_id,
1546                                    from_version,
1547                                    to_version,
1548                                    step.step as i32,
1549                                    &op,
1550                                    &step.schema,
1551                                    step.table.as_deref(),
1552                                    &step.object,
1553                                    &step.object_type,
1554                                    &step.description,
1555                                    step.ddl.as_deref(),
1556                                    &safety_str,
1557                                    &risk_str,
1558                                    "warned",
1559                                    Some(&err_str),
1560                                )
1561                                .await;
1562                                warned += 1;
1563                            } else {
1564                                let _ = crate::store::insert_migration_audit(
1565                                    config_pool,
1566                                    migration_plan_id,
1567                                    package_id,
1568                                    tenant_id,
1569                                    from_version,
1570                                    to_version,
1571                                    step.step as i32,
1572                                    &op,
1573                                    &step.schema,
1574                                    step.table.as_deref(),
1575                                    &step.object,
1576                                    &step.object_type,
1577                                    &step.description,
1578                                    step.ddl.as_deref(),
1579                                    &safety_str,
1580                                    &risk_str,
1581                                    "failed",
1582                                    Some(&err_str),
1583                                )
1584                                .await;
1585                                return Err(AppError::Db(e));
1586                            }
1587                        }
1588                    }
1589                }
1590            }
1591        }
1592    }
1593
1594    Ok(MigrationExecutionResult {
1595        applied,
1596        warned,
1597        warnings,
1598    })
1599}
1600
1601/// Build CREATE TABLE DDL for the `{table}_audit` companion table.
1602/// All source columns are replicated as nullable with no constraints, plus five audit metadata
1603/// columns prepended: audit_id (PK), audit_action, audit_at, audit_by, changed_fields.
1604fn audit_table_ddl(
1605    schema_name: &str,
1606    table_name: &str,
1607    source_cols: &[&ColumnConfig],
1608    dialect: &dyn Dialect,
1609) -> String {
1610    let audit_name = format!("{}_audit", table_name);
1611    let audit_full = format!("{}.{}", quote(schema_name), quote(&audit_name));
1612
1613    let mut col_defs: Vec<String> = Vec::new();
1614    col_defs.push(format!(
1615        "{} {} NOT NULL DEFAULT {}",
1616        quote("audit_id"),
1617        "UUID",
1618        dialect.uuid_default_expr()
1619    ));
1620    col_defs.push(format!("{} TEXT NOT NULL", quote("audit_action")));
1621    col_defs.push(format!(
1622        "{} {} NOT NULL DEFAULT {}",
1623        quote("audit_at"),
1624        dialect.audit_timestamp_type(),
1625        dialect.now_fn()
1626    ));
1627    col_defs.push(format!("{} TEXT", quote("audit_by")));
1628    col_defs.push(format!(
1629        "{} {}",
1630        quote("changed_fields"),
1631        dialect.sys_json_type()
1632    ));
1633
1634    let config_col_names: HashSet<&str> = source_cols.iter().map(|c| c.name.as_str()).collect();
1635    for c in source_cols {
1636        let typ = dialect.ddl_type(&parse_canonical(&c.type_));
1637        col_defs.push(format!("{} {}", quote(&c.name), typ));
1638    }
1639    let audit_ts = dialect.audit_timestamp_type();
1640    for (name, typ) in [
1641        ("created_at", audit_ts),
1642        ("updated_at", audit_ts),
1643        ("archived_at", audit_ts),
1644        ("created_by", "TEXT"),
1645        ("updated_by", "TEXT"),
1646    ] {
1647        if !config_col_names.contains(name) {
1648            col_defs.push(format!("{} {}", quote(name), typ));
1649        }
1650    }
1651    col_defs.push(format!("PRIMARY KEY ({})", quote("audit_id")));
1652
1653    format!(
1654        "CREATE TABLE IF NOT EXISTS {} (\n  {}\n)",
1655        audit_full,
1656        col_defs.join(",\n  ")
1657    )
1658}