Skip to main content

architect_sdk/
migration.rs

1//! Apply config to the database: DDL for schemas, enums, tables, indexes, and foreign keys.
2//! Order follows PostgreSQL dependencies (see docs/postgres-config-schema.md § 3.5).
3
4use crate::config::types::*;
5use crate::config::{validate, FullConfig};
6use crate::db::parse_canonical;
7use crate::db::pool::Pool;
8use crate::db::Dialect;
9use crate::error::AppError;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12
13fn quote(s: &str) -> String {
14    format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
15}
16
17/// Name of the column added to app tables when RLS is enabled. Used by migration and CRUD.
18pub const RLS_TENANT_COLUMN: &str = "tenant_id";
19
20/// Apply full config to the database: CREATE SCHEMA, CREATE TYPE, CREATE TABLE, CREATE INDEX, ADD FK.
21/// Validates config first. Idempotent for schemas and types (IF NOT EXISTS); tables are CREATE TABLE only (fails if exists).
22/// When `schema_override` is `Some(s)`, app tables/indexes/FKs are created in schema `s` instead of config schema names (e.g. for schema-strategy tenants).
23/// When `rls_tenant_column` is `Some(col)`, each table gets that column (if missing), RLS enabled, and policies using `current_setting('app.tenant_id', true)`.
24pub async fn apply_migrations(
25    pool: &Pool,
26    config: &FullConfig,
27    schema_override: Option<&str>,
28    rls_tenant_column: Option<&str>,
29    dialect: &dyn Dialect,
30    cross_package_configs: &HashMap<String, FullConfig>,
31) -> Result<(), AppError> {
32    validate(config)?;
33    let default_sid = config
34        .schemas
35        .first()
36        .map(|s| s.id.as_str())
37        .ok_or_else(|| {
38            AppError::Config(crate::error::ConfigError::Validation(
39                "at least one schema required".into(),
40            ))
41        })?;
42
43    if dialect.supports_schemas() {
44        if let Some(s) = schema_override {
45            let name = quote(s);
46            sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", name))
47                .execute(pool)
48                .await?;
49        }
50    }
51
52    let schemas_by_id: HashMap<_, _> = config.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
53    let tables_by_id: HashMap<_, _> = config.tables.iter().map(|t| (t.id.as_str(), t)).collect();
54    let columns_by_table: HashMap<_, Vec<&ColumnConfig>> =
55        config.columns.iter().fold(HashMap::new(), |mut m, c| {
56            m.entry(c.table_id.as_str()).or_default().push(c);
57            m
58        });
59
60    // When schema_override is set, we only create the override schema; otherwise create config schemas.
61    if schema_override.is_none() && dialect.supports_schemas() {
62        for s in &config.schemas {
63            let name = quote(&s.name);
64            let comment = s
65                .comment
66                .as_ref()
67                .map(|c| format!("COMMENT ON SCHEMA {} IS '{}'", name, c.replace('\'', "''")));
68            sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", name))
69                .execute(pool)
70                .await?;
71            if let Some(sql) = comment {
72                let _ = sqlx::query(&sql).execute(pool).await;
73            }
74        }
75    }
76
77    for e in &config.enums {
78        let sid = e.schema_id.as_deref().unwrap_or(default_sid);
79        let schema = schemas_by_id.get(sid).ok_or_else(|| {
80            AppError::Config(crate::error::ConfigError::MissingReference {
81                kind: "schema",
82                id: sid.to_string(),
83            })
84        })?;
85        let schema_name = quote(schema_override.unwrap_or(&schema.name));
86        let type_name = quote(&e.name);
87        if dialect.supports_named_enum_types() {
88            let values: Vec<String> = e
89                .values
90                .iter()
91                .map(|v| format!("'{}'", v.replace('\'', "''")))
92                .collect();
93            let sql = format!(
94                "CREATE TYPE {}.{} AS ENUM ({})",
95                schema_name,
96                type_name,
97                values.join(", ")
98            );
99            let _ = sqlx::query(&sql).execute(pool).await;
100        }
101    }
102
103    for t in &config.tables {
104        let sid = t.schema_id.as_deref().unwrap_or(default_sid);
105        let schema = schemas_by_id.get(sid).ok_or_else(|| {
106            AppError::Config(crate::error::ConfigError::MissingReference {
107                kind: "schema",
108                id: sid.to_string(),
109            })
110        })?;
111        let schema_name = quote(schema_override.unwrap_or(&schema.name));
112        let table_name = quote(&t.name);
113        let full_name = format!("{}.{}", schema_name, table_name);
114
115        let cols = columns_by_table
116            .get(t.id.as_str())
117            .map(|v| v.as_slice())
118            .unwrap_or(&[]);
119        let mut col_defs: Vec<String> = Vec::new();
120        for c in cols {
121            let typ = dialect.ddl_type(&parse_canonical(&c.type_));
122            let mut def = format!("{} {}", quote(&c.name), typ);
123            if !c.nullable {
124                def.push_str(" NOT NULL");
125            }
126            if let Some(ref d) = c.default {
127                def.push_str(" DEFAULT ");
128                match d {
129                    ColumnDefaultConfig::Literal(s) => def.push_str(s),
130                    ColumnDefaultConfig::Expression { expression } => def.push_str(expression),
131                }
132            }
133            col_defs.push(def);
134        }
135
136        let config_col_names: HashSet<&str> = cols.iter().map(|c| c.name.as_str()).collect();
137        let ts_default = format!(
138            "{} NOT NULL DEFAULT {}",
139            dialect.sys_timestamp_type(),
140            dialect.now_fn()
141        );
142        let ts_nullable = dialect.sys_timestamp_type().to_string();
143        for (name, def_suffix) in [
144            ("created_at", ts_default.as_str()),
145            ("updated_at", ts_default.as_str()),
146            ("archived_at", ts_nullable.as_str()),
147            ("created_by", "TEXT"),
148            ("updated_by", "TEXT"),
149        ] {
150            if !config_col_names.contains(name) {
151                col_defs.push(format!("{} {}", quote(name), def_suffix));
152            }
153        }
154
155        let pk_cols = match &t.primary_key {
156            PrimaryKeyConfig::Single(s) => vec![quote(s)],
157            PrimaryKeyConfig::Composite(v) => v.iter().map(|s| quote(s)).collect::<Vec<_>>(),
158        };
159        let pk_def = format!("PRIMARY KEY ({})", pk_cols.join(", "));
160        col_defs.push(pk_def);
161
162        for u in &t.unique {
163            let cols: Vec<String> = u.iter().map(|s| quote(s)).collect();
164            col_defs.push(format!("UNIQUE ({})", cols.join(", ")));
165        }
166        for ch in &t.check {
167            col_defs.push(format!(
168                "CONSTRAINT {} CHECK ({})",
169                quote(&ch.name),
170                ch.expression
171            ));
172        }
173
174        let sql = format!(
175            "CREATE TABLE IF NOT EXISTS {} (\n  {}\n)",
176            full_name,
177            col_defs.join(",\n  ")
178        );
179        sqlx::query(&sql).execute(pool).await?;
180
181        if t.audit_log {
182            let schema_raw = schema_override.unwrap_or(&schema.name);
183            let audit_sql = audit_table_ddl(schema_raw, &t.name, cols, dialect);
184            sqlx::query(&audit_sql).execute(pool).await?;
185            let pk_col = match &t.primary_key {
186                PrimaryKeyConfig::Single(s) => s.clone(),
187                PrimaryKeyConfig::Composite(v) => v[0].clone(),
188            };
189            let audit_full = format!(
190                "{}.{}",
191                quote(schema_raw),
192                quote(&format!("{}_audit", t.name))
193            );
194            let idx_sql = format!(
195                "CREATE INDEX IF NOT EXISTS {} ON {} ({}, {})",
196                quote(&format!("{}_audit_record_idx", t.name)),
197                audit_full,
198                quote(&pk_col),
199                quote("audit_at")
200            );
201            let _ = sqlx::query(&idx_sql).execute(pool).await;
202        }
203
204        if t.versioning.as_ref().is_some_and(|v| v.enabled) {
205            let schema_raw = schema_override.unwrap_or(&schema.name);
206            let pk_col = match &t.primary_key {
207                PrimaryKeyConfig::Single(s) => s.clone(),
208                PrimaryKeyConfig::Composite(v) => v[0].clone(),
209            };
210            let history_ddl = history_table_ddl(schema_raw, &t.name, &pk_col, cols, dialect);
211            // history_table_ddl embeds a comment line for the index; execute CREATE TABLE only
212            let create_only = history_ddl
213                .lines()
214                .take_while(|l| !l.trim_start().starts_with("-- index:"))
215                .collect::<Vec<_>>()
216                .join("\n");
217            sqlx::query(create_only.trim()).execute(pool).await?;
218            let idx_sql = history_index_ddl(schema_raw, &t.name, &pk_col);
219            let _ = sqlx::query(&idx_sql).execute(pool).await;
220        }
221
222        if let Some(col) = rls_tenant_column {
223            if dialect.supports_rls() {
224                if !config_col_names.contains(col) {
225                    let q_col = quote(col);
226                    let add_col = format!(
227                        "ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} TEXT",
228                        full_name, q_col
229                    );
230                    sqlx::query(&add_col).execute(pool).await?;
231                }
232                let enable_rls = format!("ALTER TABLE {} ENABLE ROW LEVEL SECURITY", full_name);
233                sqlx::query(&enable_rls).execute(pool).await?;
234                let q_col = quote(col);
235                let setting = "current_setting('app.tenant_id', true)";
236                let cond = format!("{} = {}", q_col, setting);
237                let policy_prefix = format!("rls_tenant_{}", t.name);
238                let policies: &[(&str, &str, Option<&str>, Option<&str>)] = &[
239                    ("select", "SELECT", Some(cond.as_str()), None),
240                    ("insert", "INSERT", None, Some(cond.as_str())),
241                    ("update", "UPDATE", Some(cond.as_str()), Some(cond.as_str())),
242                    ("delete", "DELETE", Some(cond.as_str()), None),
243                ];
244                for (suffix, cmd, using_cond, with_check) in policies.iter() {
245                    let policy_name = format!("{}_{}", policy_prefix, suffix);
246                    let drop_sql = format!(
247                        "DROP POLICY IF EXISTS {} ON {}",
248                        quote(&policy_name),
249                        full_name
250                    );
251                    let _ = sqlx::query(&drop_sql).execute(pool).await;
252                    let create_sql = match (using_cond, with_check) {
253                        (Some(u), Some(w)) => format!(
254                            "CREATE POLICY {} ON {} FOR {} USING ( {} ) WITH CHECK ( {} )",
255                            quote(&policy_name),
256                            full_name,
257                            cmd,
258                            u,
259                            w
260                        ),
261                        (Some(u), None) => format!(
262                            "CREATE POLICY {} ON {} FOR {} USING ( {} )",
263                            quote(&policy_name),
264                            full_name,
265                            cmd,
266                            u
267                        ),
268                        (None, Some(w)) => format!(
269                            "CREATE POLICY {} ON {} FOR {} WITH CHECK ( {} )",
270                            quote(&policy_name),
271                            full_name,
272                            cmd,
273                            w
274                        ),
275                        (None, None) => continue,
276                    };
277                    sqlx::query(&create_sql).execute(pool).await?;
278                }
279            } else {
280                tracing::warn!(table = %full_name, dialect = %dialect.name(), "RLS requested but not supported by this dialect; skipping");
281            }
282        }
283    }
284
285    for idx in &config.indexes {
286        let sid = idx.schema_id.as_deref().unwrap_or(default_sid);
287        let schema = schemas_by_id.get(sid).ok_or_else(|| {
288            AppError::Config(crate::error::ConfigError::MissingReference {
289                kind: "schema",
290                id: sid.to_string(),
291            })
292        })?;
293        let table = tables_by_id.get(idx.table_id.as_str()).ok_or_else(|| {
294            AppError::Config(crate::error::ConfigError::MissingReference {
295                kind: "table",
296                id: idx.table_id.clone(),
297            })
298        })?;
299        let schema_name = quote(schema_override.unwrap_or(&schema.name));
300        let table_name = quote(&table.name);
301        let full_table = format!("{}.{}", schema_name, table_name);
302        let index_name = quote(&idx.name);
303
304        let mut col_parts: Vec<String> = Vec::new();
305        for col in &idx.columns {
306            match col {
307                IndexColumnEntry::Name(n) => col_parts.push(quote(n)),
308                IndexColumnEntry::Spec {
309                    name, direction, ..
310                } => {
311                    let dir = direction
312                        .as_deref()
313                        .map(|d| format!(" {}", d.to_uppercase()))
314                        .unwrap_or_default();
315                    col_parts.push(format!("{}{}", quote(name), dir));
316                }
317                IndexColumnEntry::Expression { expression } => col_parts.push(expression.clone()),
318            }
319        }
320        let method = idx.method.as_deref().unwrap_or("btree");
321        let unique = if idx.unique { "UNIQUE " } else { "" };
322        let include: String = if idx.include.is_empty() {
323            String::new()
324        } else {
325            let inc: Vec<String> = idx.include.iter().map(|s| quote(s)).collect();
326            format!(" INCLUDE ({})", inc.join(", "))
327        };
328        let where_clause: String = idx
329            .where_
330            .as_ref()
331            .map(|w| format!(" WHERE {}", w))
332            .unwrap_or_default();
333
334        let sql = format!(
335            "CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({}){}{}",
336            unique,
337            index_name,
338            full_table,
339            method,
340            col_parts.join(", "),
341            include,
342            where_clause
343        );
344        let _ = sqlx::query(&sql).execute(pool).await;
345    }
346
347    for rel in &config.relationships {
348        let from_sid = rel.from_schema_id.as_deref().unwrap_or(default_sid);
349        let from_schema = schemas_by_id.get(from_sid).ok_or_else(|| {
350            AppError::Config(crate::error::ConfigError::MissingReference {
351                kind: "schema",
352                id: from_sid.to_string(),
353            })
354        })?;
355        let from_table = tables_by_id
356            .get(rel.from_table_id.as_str())
357            .ok_or_else(|| {
358                AppError::Config(crate::error::ConfigError::MissingReference {
359                    kind: "table",
360                    id: rel.from_table_id.clone(),
361                })
362            })?;
363
364        // Resolve the target schema and table — either from a cross-package config or this config.
365        let (to_schema_name_owned, to_table_name, to_col_name) = if let Some(pkg_id) =
366            rel.to_package_id.as_deref()
367        {
368            let foreign = cross_package_configs.get(pkg_id).ok_or_else(|| {
369                AppError::Config(crate::error::ConfigError::MissingReference {
370                    kind: "cross_package",
371                    id: pkg_id.to_string(),
372                })
373            })?;
374            let foreign_tables: HashMap<_, _> =
375                foreign.tables.iter().map(|t| (t.id.as_str(), t)).collect();
376            let foreign_schemas: HashMap<_, _> =
377                foreign.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
378            let to_tbl = foreign_tables
379                .get(rel.to_table_id.as_str())
380                .ok_or_else(|| {
381                    AppError::Config(crate::error::ConfigError::MissingReference {
382                        kind: "table",
383                        id: rel.to_table_id.clone(),
384                    })
385                })?;
386            let foreign_default_sid = foreign.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
387            let to_sid = rel.to_schema_id.as_deref().unwrap_or(foreign_default_sid);
388            let to_schema = foreign_schemas.get(to_sid).ok_or_else(|| {
389                AppError::Config(crate::error::ConfigError::MissingReference {
390                    kind: "schema",
391                    id: to_sid.to_string(),
392                })
393            })?;
394            let col_name = foreign
395                .columns
396                .iter()
397                .find(|c| c.id == rel.to_column_id)
398                .map(|c| c.name.clone())
399                .ok_or_else(|| {
400                    AppError::Config(crate::error::ConfigError::MissingReference {
401                        kind: "column",
402                        id: rel.to_column_id.clone(),
403                    })
404                })?;
405            // Cross-package FKs always use the real schema name (no schema_override — the
406            // target package lives in its own schema, not the tenant override).
407            (to_schema.name.clone(), to_tbl.name.clone(), col_name)
408        } else {
409            let to_sid = rel.to_schema_id.as_deref().unwrap_or(default_sid);
410            let to_schema = schemas_by_id.get(to_sid).ok_or_else(|| {
411                AppError::Config(crate::error::ConfigError::MissingReference {
412                    kind: "schema",
413                    id: to_sid.to_string(),
414                })
415            })?;
416            let to_table = tables_by_id.get(rel.to_table_id.as_str()).ok_or_else(|| {
417                AppError::Config(crate::error::ConfigError::MissingReference {
418                    kind: "table",
419                    id: rel.to_table_id.clone(),
420                })
421            })?;
422            let col_name = config
423                .columns
424                .iter()
425                .find(|c| c.id == rel.to_column_id)
426                .map(|c| c.name.clone())
427                .ok_or_else(|| {
428                    AppError::Config(crate::error::ConfigError::MissingReference {
429                        kind: "column",
430                        id: rel.to_column_id.clone(),
431                    })
432                })?;
433            (
434                schema_override.unwrap_or(&to_schema.name).to_string(),
435                to_table.name.clone(),
436                col_name,
437            )
438        };
439
440        let from_schema_name = schema_override.unwrap_or(&from_schema.name);
441        let from_col = config
442            .columns
443            .iter()
444            .find(|c| c.id == rel.from_column_id)
445            .map(|c| c.name.as_str())
446            .ok_or_else(|| {
447                AppError::Config(crate::error::ConfigError::MissingReference {
448                    kind: "column",
449                    id: rel.from_column_id.clone(),
450                })
451            })?;
452
453        let from_full = format!("{}.{}", quote(from_schema_name), quote(&from_table.name));
454        let to_full = format!("{}.{}", quote(&to_schema_name_owned), quote(&to_table_name));
455        let constraint_name = rel.name.as_deref().unwrap_or(&rel.id);
456        let on_update = rel.on_update.as_deref().unwrap_or("NO ACTION");
457        let on_delete = rel.on_delete.as_deref().unwrap_or("NO ACTION");
458
459        let sql = format!(
460            "ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {} ({}) ON UPDATE {} ON DELETE {}",
461            from_full,
462            quote(constraint_name),
463            quote(from_col),
464            to_full,
465            quote(&to_col_name),
466            on_update,
467            on_delete
468        );
469        let _ = sqlx::query(&sql).execute(pool).await;
470    }
471
472    Ok(())
473}
474
475/// Revert migrations for a package: drop tables, enum types, and schema (if not public) in reverse order of apply.
476/// Uses the same schema_override as apply_migrations (tables/enums live in that schema).
477pub async fn revert_migrations(
478    pool: &Pool,
479    config: &FullConfig,
480    schema_override: Option<&str>,
481) -> Result<(), AppError> {
482    let default_sid = config
483        .schemas
484        .first()
485        .map(|s| s.id.as_str())
486        .ok_or_else(|| {
487            AppError::Config(crate::error::ConfigError::Validation(
488                "at least one schema required".into(),
489            ))
490        })?;
491
492    let schemas_by_id: HashMap<_, _> = config.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
493
494    // 1. Drop tables (CASCADE drops FKs and dependent objects)
495    for t in &config.tables {
496        let sid = t.schema_id.as_deref().unwrap_or(default_sid);
497        let schema = schemas_by_id.get(sid).ok_or_else(|| {
498            AppError::Config(crate::error::ConfigError::MissingReference {
499                kind: "schema",
500                id: sid.to_string(),
501            })
502        })?;
503        let schema_raw = schema_override.unwrap_or(&schema.name);
504        let schema_name = quote(schema_raw);
505        let table_name = quote(&t.name);
506        let full_name = format!("{}.{}", schema_name, table_name);
507        if t.audit_log {
508            let audit_full = format!("{}.{}", schema_name, quote(&format!("{}_audit", t.name)));
509            let _ = sqlx::query(&format!("DROP TABLE IF EXISTS {} CASCADE", audit_full))
510                .execute(pool)
511                .await;
512        }
513        if t.versioning.as_ref().is_some_and(|v| v.enabled) {
514            let history_full = format!("{}.{}", schema_name, quote(&format!("{}_history", t.name)));
515            let _ = sqlx::query(&format!("DROP TABLE IF EXISTS {} CASCADE", history_full))
516                .execute(pool)
517                .await;
518        }
519        let drop_sql = format!("DROP TABLE IF EXISTS {} CASCADE", full_name);
520        let _ = sqlx::query(&drop_sql).execute(pool).await;
521    }
522
523    // 2. Drop enum types
524    for e in &config.enums {
525        let sid = e.schema_id.as_deref().unwrap_or(default_sid);
526        let schema = schemas_by_id.get(sid).ok_or_else(|| {
527            AppError::Config(crate::error::ConfigError::MissingReference {
528                kind: "schema",
529                id: sid.to_string(),
530            })
531        })?;
532        let schema_name = quote(schema_override.unwrap_or(&schema.name));
533        let type_name = quote(&e.name);
534        let drop_sql = format!("DROP TYPE IF EXISTS {}.{} CASCADE", schema_name, type_name);
535        let _ = sqlx::query(&drop_sql).execute(pool).await;
536    }
537
538    // 3. Drop schema only if not public (shared schema)
539    if schema_override.is_none() {
540        for s in &config.schemas {
541            if s.name.eq_ignore_ascii_case("public") {
542                continue;
543            }
544            let schema_name = quote(&s.name);
545            let drop_sql = format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name);
546            let _ = sqlx::query(&drop_sql).execute(pool).await;
547        }
548    }
549
550    Ok(())
551}
552
553// ─── Migration plan types ────────────────────────────────────────────────────
554
555#[derive(Debug, Clone, Serialize, Deserialize)]
556#[serde(rename_all = "snake_case")]
557pub enum MigrationOperation {
558    CreateSchema,
559    CreateEnum,
560    DropEnum,
561    AddEnumValue,
562    RemoveEnumValue,
563    CreateTable,
564    DropTable,
565    AddColumn,
566    DropColumn,
567    RenameColumn,
568    AlterColumnType,
569    BackfillNulls,
570    SetNotNull,
571    DropNotNull,
572    SetDefault,
573    DropDefault,
574    CreateIndex,
575    DropIndex,
576    AddForeignKey,
577    DropForeignKey,
578}
579
580impl std::fmt::Display for MigrationOperation {
581    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
582        let s = serde_json::to_value(self)
583            .ok()
584            .and_then(|v| v.as_str().map(String::from))
585            .unwrap_or_else(|| format!("{:?}", self));
586        write!(f, "{}", s)
587    }
588}
589
590/// How safely a migration step can be executed.
591#[derive(Debug, Clone, Serialize, Deserialize)]
592#[serde(rename_all = "snake_case")]
593pub enum MigrationSafety {
594    /// Guaranteed to succeed, no data impact.
595    Safe,
596    /// Attempted; execution failure is captured as a warning instead of aborting.
597    BestEffort,
598    /// No DDL generated — config change noted as a warning only (e.g. removed tables/columns).
599    WarnOnly,
600}
601
602/// Risk category associated with a migration step.
603#[derive(Debug, Clone, Serialize, Deserialize)]
604#[serde(rename_all = "snake_case")]
605pub enum MigrationRisk {
606    None,
607    /// Cast may fail for incompatible values (e.g. TEXT → INTEGER).
608    MayFail,
609    /// SET NOT NULL will fail if any existing row has NULL in this column.
610    ExistingNullsMustBeAbsent,
611    /// Existing NULL rows will be overwritten with the column default.
612    DataWillBeModified,
613    /// Cannot be automated — requires a manual database action.
614    ManualActionRequired,
615}
616
617/// One step in a migration plan: a DDL statement with metadata.
618#[derive(Debug, Clone, Serialize, Deserialize)]
619pub struct MigrationStep {
620    pub step: usize,
621    pub operation: MigrationOperation,
622    pub schema: String,
623    pub table: Option<String>,
624    /// Column name, index name, FK constraint name, enum name, etc.
625    pub object: String,
626    /// "column" | "table" | "index" | "foreign_key" | "enum" | "enum_value" | "schema"
627    pub object_type: String,
628    pub description: String,
629    /// The SQL to execute. None for WarnOnly steps.
630    pub ddl: Option<String>,
631    pub safety: MigrationSafety,
632    pub risk: MigrationRisk,
633    pub risk_detail: Option<String>,
634}
635
636/// Computed diff between two package versions expressed as ordered migration steps.
637#[derive(Debug, Clone, Serialize, Deserialize)]
638pub struct MigrationPlan {
639    pub steps: Vec<MigrationStep>,
640}
641
642#[derive(Debug, Clone, Serialize)]
643pub struct MigrationSummary {
644    pub total: usize,
645    pub safe: usize,
646    pub best_effort: usize,
647    pub warn_only: usize,
648}
649
650impl MigrationPlan {
651    pub fn summary(&self) -> MigrationSummary {
652        let (mut safe, mut best_effort, mut warn_only) = (0, 0, 0);
653        for s in &self.steps {
654            match s.safety {
655                MigrationSafety::Safe => safe += 1,
656                MigrationSafety::BestEffort => best_effort += 1,
657                MigrationSafety::WarnOnly => warn_only += 1,
658            }
659        }
660        MigrationSummary {
661            total: self.steps.len(),
662            safe,
663            best_effort,
664            warn_only,
665        }
666    }
667}
668
669/// Result returned by `execute_migration_plan`.
670pub struct MigrationExecutionResult {
671    pub applied: usize,
672    pub warned: usize,
673    pub warnings: Vec<String>,
674}
675
676fn default_str(d: &ColumnDefaultConfig) -> String {
677    match d {
678        ColumnDefaultConfig::Literal(s) => s.clone(),
679        ColumnDefaultConfig::Expression { expression } => expression.clone(),
680    }
681}
682
683/// A column whose type is (an array of) a given enum — i.e. one that must be recast when the
684/// enum type is rebuilt.
685struct EnumColumnRef {
686    schema: String,
687    table: String,
688    column: String,
689    default: Option<String>,
690    is_array: bool,
691}
692
693/// If `t` is a custom enum reference (`schema.name`, bare `name`, or an array of one), return
694/// `(enum_name, is_array)` where `enum_name` is the unqualified type name. None for built-in types.
695fn enum_type_name(t: &crate::db::CanonicalType) -> Option<(String, bool)> {
696    use crate::db::CanonicalType;
697    let unqualified = |s: &str| s.rsplit('.').next().unwrap_or(s).to_string();
698    match t {
699        CanonicalType::Custom(s) => Some((unqualified(s), false)),
700        CanonicalType::Array(inner) => match inner.as_ref() {
701            CanonicalType::Custom(s) => Some((unqualified(s), true)),
702            _ => None,
703        },
704        _ => None,
705    }
706}
707
708/// Find every column in `new` whose type is `new_enum` (or an array of it), resolved to its live
709/// table/schema name. Used to drive the recast steps of an enum rebuild.
710fn enum_dependent_columns(
711    new_enum: &EnumConfig,
712    new: &FullConfig,
713    new_tables: &HashMap<&str, &TableConfig>,
714    new_schemas: &HashMap<&str, &SchemaConfig>,
715    schema_override: Option<&str>,
716) -> Vec<EnumColumnRef> {
717    let default_sid = new.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
718    let mut out = Vec::new();
719    for c in &new.columns {
720        let Some((tyname, is_array)) = enum_type_name(&parse_canonical(&c.type_)) else {
721            continue;
722        };
723        if tyname != new_enum.name {
724            continue;
725        }
726        let Some(table) = new_tables.get(c.table_id.as_str()) else {
727            continue;
728        };
729        let tsid = table.schema_id.as_deref().unwrap_or(default_sid);
730        let schema = schema_override.map(String::from).unwrap_or_else(|| {
731            new_schemas
732                .get(tsid)
733                .map(|s| s.name.clone())
734                .unwrap_or_else(|| tsid.to_string())
735        });
736        out.push(EnumColumnRef {
737            schema,
738            table: table.name.clone(),
739            column: c.name.clone(),
740            default: c.default.as_ref().map(default_str),
741            is_array,
742        });
743    }
744    out
745}
746
747/// PostgreSQL cannot drop a value from an enum in place, so when one is removed the type must be
748/// rebuilt: rename the live type aside, create it afresh with the reduced value set, recast every
749/// dependent column through a text cast, then drop the old type. Steps are emitted as `BestEffort`
750/// — a recast that fails because a row still holds a removed value is surfaced as a warning rather
751/// than aborting the whole upgrade. Each statement is a separate step because `execute_migration_plan`
752/// runs them individually (the extended protocol forbids multiple statements per query).
753fn recreate_enum_steps(
754    steps: &mut Vec<MigrationStep>,
755    schema: &str,
756    new_enum: &EnumConfig,
757    removed: &[&str],
758    dependents: &[EnumColumnRef],
759) {
760    let type_q = format!("{}.{}", quote(schema), quote(&new_enum.name));
761    let tmp_name = format!("{}__arch_old", new_enum.name);
762    let values: Vec<String> = new_enum
763        .values
764        .iter()
765        .map(|v| format!("'{}'", v.replace('\'', "''")))
766        .collect();
767
768    // 0. Informational summary of the destructive rebuild (no DDL).
769    steps.push(MigrationStep {
770        step: 0,
771        operation: MigrationOperation::RemoveEnumValue,
772        schema: schema.to_string(),
773        table: None,
774        object: format!("{}:{}", new_enum.name, removed.join(",")),
775        object_type: "enum".into(),
776        description: format!(
777            "Rebuild enum \"{}\".\"{}\" to remove value(s): {}",
778            schema,
779            new_enum.name,
780            removed.join(", ")
781        ),
782        ddl: None,
783        safety: MigrationSafety::WarnOnly,
784        risk: MigrationRisk::ManualActionRequired,
785        risk_detail: Some(format!(
786            "PostgreSQL cannot drop enum values in place. The type is rebuilt and {} dependent \
787             column(s) are recast via a text cast. Any existing row holding a removed value ({}) \
788             will make its recast fail — reassign those rows first.",
789            dependents.len(),
790            removed.join(", ")
791        )),
792    });
793
794    // 1. Rename the live type aside.
795    steps.push(MigrationStep {
796        step: 0,
797        operation: MigrationOperation::DropEnum,
798        schema: schema.to_string(),
799        table: None,
800        object: new_enum.name.clone(),
801        object_type: "enum".into(),
802        description: format!(
803            "Rename enum \"{}\".\"{}\" to \"{}\" before rebuild",
804            schema, new_enum.name, tmp_name
805        ),
806        ddl: Some(format!(
807            "ALTER TYPE {} RENAME TO {}",
808            type_q,
809            quote(&tmp_name)
810        )),
811        safety: MigrationSafety::BestEffort,
812        risk: MigrationRisk::None,
813        risk_detail: None,
814    });
815
816    // 2. Create the type afresh with the reduced value set (folds in any added values too).
817    steps.push(MigrationStep {
818        step: 0,
819        operation: MigrationOperation::CreateEnum,
820        schema: schema.to_string(),
821        table: None,
822        object: new_enum.name.clone(),
823        object_type: "enum".into(),
824        description: format!(
825            "Recreate enum \"{}\".\"{}\" with {} value(s)",
826            schema,
827            new_enum.name,
828            new_enum.values.len()
829        ),
830        ddl: Some(format!(
831            "CREATE TYPE {} AS ENUM ({})",
832            type_q,
833            values.join(", ")
834        )),
835        safety: MigrationSafety::BestEffort,
836        risk: MigrationRisk::None,
837        risk_detail: None,
838    });
839
840    // 3. Recast every dependent column from the renamed type onto the rebuilt one. A column with a
841    //    default must drop it first (the default still references the renamed type) and restore it after.
842    for dep in dependents {
843        let table_q = format!("{}.{}", quote(&dep.schema), quote(&dep.table));
844        let col_q = quote(&dep.column);
845
846        if dep.default.is_some() {
847            steps.push(MigrationStep {
848                step: 0,
849                operation: MigrationOperation::DropDefault,
850                schema: dep.schema.clone(),
851                table: Some(dep.table.clone()),
852                object: dep.column.clone(),
853                object_type: "column".into(),
854                description: format!(
855                    "Drop default on {}.{} before enum recast",
856                    dep.table, dep.column
857                ),
858                ddl: Some(format!(
859                    "ALTER TABLE {} ALTER COLUMN {} DROP DEFAULT",
860                    table_q, col_q
861                )),
862                safety: MigrationSafety::BestEffort,
863                risk: MigrationRisk::None,
864                risk_detail: None,
865            });
866        }
867
868        let (col_type, using) = if dep.is_array {
869            (
870                format!("{}[]", type_q),
871                format!("{}::text[]::{}[]", col_q, type_q),
872            )
873        } else {
874            (type_q.clone(), format!("{}::text::{}", col_q, type_q))
875        };
876        steps.push(MigrationStep {
877            step: 0,
878            operation: MigrationOperation::AlterColumnType,
879            schema: dep.schema.clone(),
880            table: Some(dep.table.clone()),
881            object: dep.column.clone(),
882            object_type: "column".into(),
883            description: format!(
884                "Recast {}.{} onto rebuilt enum \"{}\"",
885                dep.table, dep.column, new_enum.name
886            ),
887            ddl: Some(format!(
888                "ALTER TABLE {} ALTER COLUMN {} TYPE {} USING {}",
889                table_q, col_q, col_type, using
890            )),
891            safety: MigrationSafety::BestEffort,
892            risk: MigrationRisk::MayFail,
893            risk_detail: Some(format!(
894                "Cast fails if any row holds a removed value ({}).",
895                removed.join(", ")
896            )),
897        });
898
899        if let Some(def) = &dep.default {
900            steps.push(MigrationStep {
901                step: 0,
902                operation: MigrationOperation::SetDefault,
903                schema: dep.schema.clone(),
904                table: Some(dep.table.clone()),
905                object: dep.column.clone(),
906                object_type: "column".into(),
907                description: format!(
908                    "Restore default on {}.{} after enum recast",
909                    dep.table, dep.column
910                ),
911                ddl: Some(format!(
912                    "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT {}",
913                    table_q, col_q, def
914                )),
915                safety: MigrationSafety::BestEffort,
916                risk: MigrationRisk::None,
917                risk_detail: None,
918            });
919        }
920    }
921
922    // 4. Drop the renamed original type.
923    steps.push(MigrationStep {
924        step: 0,
925        operation: MigrationOperation::DropEnum,
926        schema: schema.to_string(),
927        table: None,
928        object: tmp_name.clone(),
929        object_type: "enum".into(),
930        description: format!("Drop superseded enum \"{}\".\"{}\"", schema, tmp_name),
931        ddl: Some(format!(
932            "DROP TYPE IF EXISTS {}.{}",
933            quote(schema),
934            quote(&tmp_name)
935        )),
936        safety: MigrationSafety::BestEffort,
937        risk: MigrationRisk::None,
938        risk_detail: None,
939    });
940}
941
942// ─── compute_migration_plan ──────────────────────────────────────────────────
943
944/// Diff two package configs and produce an ordered list of migration steps.
945/// This is a pure function — it does not touch the database.
946/// Pass the result to `execute_migration_plan` after user confirmation.
947pub fn compute_migration_plan(
948    old: &FullConfig,
949    new: &FullConfig,
950    schema_override: Option<&str>,
951    _rls_tenant_column: Option<&str>,
952    dialect: &dyn Dialect,
953    cross_package_configs: &HashMap<String, FullConfig>,
954) -> Result<MigrationPlan, AppError> {
955    validate(new)?;
956
957    let default_old_sid = old.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
958    let default_new_sid = new.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
959
960    let old_schemas: HashMap<&str, &SchemaConfig> =
961        old.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
962    let new_schemas: HashMap<&str, &SchemaConfig> =
963        new.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
964    let old_tables: HashMap<&str, &TableConfig> =
965        old.tables.iter().map(|t| (t.id.as_str(), t)).collect();
966    let new_tables: HashMap<&str, &TableConfig> =
967        new.tables.iter().map(|t| (t.id.as_str(), t)).collect();
968    let old_columns: HashMap<&str, &ColumnConfig> =
969        old.columns.iter().map(|c| (c.id.as_str(), c)).collect();
970    let old_enums: HashMap<&str, &EnumConfig> =
971        old.enums.iter().map(|e| (e.id.as_str(), e)).collect();
972    let new_enums: HashMap<&str, &EnumConfig> =
973        new.enums.iter().map(|e| (e.id.as_str(), e)).collect();
974    let old_indexes: HashMap<&str, &IndexConfig> =
975        old.indexes.iter().map(|i| (i.id.as_str(), i)).collect();
976    let new_indexes: HashMap<&str, &IndexConfig> =
977        new.indexes.iter().map(|i| (i.id.as_str(), i)).collect();
978    let old_rels: HashMap<&str, &RelationshipConfig> = old
979        .relationships
980        .iter()
981        .map(|r| (r.id.as_str(), r))
982        .collect();
983    let new_rels: HashMap<&str, &RelationshipConfig> = new
984        .relationships
985        .iter()
986        .map(|r| (r.id.as_str(), r))
987        .collect();
988
989    let mut steps: Vec<MigrationStep> = Vec::new();
990
991    let schema_name_for = |sid: &str, schemas: &HashMap<&str, &SchemaConfig>| -> String {
992        schema_override.map(String::from).unwrap_or_else(|| {
993            schemas
994                .get(sid)
995                .map(|s| s.name.clone())
996                .unwrap_or_else(|| sid.to_string())
997        })
998    };
999
1000    // ── 1. New schemas ───────────────────────────────────────────────────────
1001    if schema_override.is_none() {
1002        for s in &new.schemas {
1003            if !old_schemas.contains_key(s.id.as_str()) {
1004                steps.push(MigrationStep {
1005                    step: 0,
1006                    operation: MigrationOperation::CreateSchema,
1007                    schema: s.name.clone(),
1008                    table: None,
1009                    object: s.name.clone(),
1010                    object_type: "schema".into(),
1011                    description: format!("Create schema \"{}\"", s.name),
1012                    ddl: Some(format!("CREATE SCHEMA IF NOT EXISTS {}", quote(&s.name))),
1013                    safety: MigrationSafety::Safe,
1014                    risk: MigrationRisk::None,
1015                    risk_detail: None,
1016                });
1017            }
1018        }
1019    }
1020
1021    // ── 2. Enums ─────────────────────────────────────────────────────────────
1022    for new_enum in &new.enums {
1023        let sid = new_enum.schema_id.as_deref().unwrap_or(default_new_sid);
1024        let schema = schema_name_for(sid, &new_schemas);
1025
1026        if let Some(old_enum) = old_enums.get(new_enum.id.as_str()) {
1027            let old_vals: HashSet<&str> = old_enum.values.iter().map(String::as_str).collect();
1028            let new_vals: HashSet<&str> = new_enum.values.iter().map(String::as_str).collect();
1029            let removed: Vec<&str> = old_enum
1030                .values
1031                .iter()
1032                .map(String::as_str)
1033                .filter(|v| !new_vals.contains(v))
1034                .collect();
1035
1036            if removed.is_empty() {
1037                // Purely additive — append each new value in place (cheap, non-destructive).
1038                for val in new_enum
1039                    .values
1040                    .iter()
1041                    .map(String::as_str)
1042                    .filter(|v| !old_vals.contains(v))
1043                {
1044                    steps.push(MigrationStep {
1045                        step: 0,
1046                        operation: MigrationOperation::AddEnumValue,
1047                        schema: schema.clone(),
1048                        table: None,
1049                        object: format!("{}:{}", new_enum.name, val),
1050                        object_type: "enum_value".into(),
1051                        description: format!(
1052                            "Add value '{}' to enum \"{}\".\"{}\"",
1053                            val, schema, new_enum.name
1054                        ),
1055                        ddl: Some(format!(
1056                            "ALTER TYPE {}.{} ADD VALUE IF NOT EXISTS '{}'",
1057                            quote(&schema),
1058                            quote(&new_enum.name),
1059                            val.replace('\'', "''")
1060                        )),
1061                        safety: MigrationSafety::Safe,
1062                        risk: MigrationRisk::None,
1063                        risk_detail: None,
1064                    });
1065                }
1066            } else {
1067                // One or more values removed. PostgreSQL has no DROP VALUE, so rebuild the type and
1068                // recast every dependent column. Added values (if any) are folded into the rebuilt
1069                // value list, so no separate ADD VALUE step is needed.
1070                let dependents = enum_dependent_columns(
1071                    new_enum,
1072                    new,
1073                    &new_tables,
1074                    &new_schemas,
1075                    schema_override,
1076                );
1077                recreate_enum_steps(&mut steps, &schema, new_enum, &removed, &dependents);
1078            }
1079        } else {
1080            let values: Vec<String> = new_enum
1081                .values
1082                .iter()
1083                .map(|v| format!("'{}'", v.replace('\'', "''")))
1084                .collect();
1085            steps.push(MigrationStep {
1086                step: 0,
1087                operation: MigrationOperation::CreateEnum,
1088                schema: schema.clone(),
1089                table: None,
1090                object: new_enum.name.clone(),
1091                object_type: "enum".into(),
1092                description: format!("Create enum type \"{}\".\"{}\"", schema, new_enum.name),
1093                ddl: Some(format!("CREATE TYPE {}.{} AS ENUM ({})", quote(&schema), quote(&new_enum.name), values.join(", "))),
1094                safety: MigrationSafety::BestEffort,
1095                risk: MigrationRisk::None,
1096                risk_detail: Some("PostgreSQL has no CREATE TYPE IF NOT EXISTS; ignored if the type already exists.".into()),
1097            });
1098        }
1099    }
1100    for old_enum in &old.enums {
1101        if !new_enums.contains_key(old_enum.id.as_str()) {
1102            let sid = old_enum.schema_id.as_deref().unwrap_or(default_old_sid);
1103            let schema = schema_name_for(sid, &old_schemas);
1104            steps.push(MigrationStep {
1105                step: 0,
1106                operation: MigrationOperation::DropEnum,
1107                schema: schema.clone(),
1108                table: None,
1109                object: old_enum.name.clone(),
1110                object_type: "enum".into(),
1111                description: format!("Enum \"{}\".\"{}\" removed from config", schema, old_enum.name),
1112                ddl: None,
1113                safety: MigrationSafety::WarnOnly,
1114                risk: MigrationRisk::ManualActionRequired,
1115                risk_detail: Some("Enum type NOT dropped from database (data safety). Run DROP TYPE manually if intended.".into()),
1116            });
1117        }
1118    }
1119
1120    // ── 3. New and removed tables ────────────────────────────────────────────
1121    let added_table_ids: HashSet<&str> = new
1122        .tables
1123        .iter()
1124        .filter(|t| !old_tables.contains_key(t.id.as_str()))
1125        .map(|t| t.id.as_str())
1126        .collect();
1127
1128    let cols_by_table: HashMap<&str, Vec<&ColumnConfig>> =
1129        new.columns.iter().fold(HashMap::new(), |mut m, c| {
1130            m.entry(c.table_id.as_str()).or_default().push(c);
1131            m
1132        });
1133
1134    for new_table in &new.tables {
1135        if !added_table_ids.contains(new_table.id.as_str()) {
1136            continue;
1137        }
1138        let sid = new_table.schema_id.as_deref().unwrap_or(default_new_sid);
1139        let schema = schema_name_for(sid, &new_schemas);
1140        let full = format!("{}.{}", quote(&schema), quote(&new_table.name));
1141
1142        let cols = cols_by_table
1143            .get(new_table.id.as_str())
1144            .map(|v| v.as_slice())
1145            .unwrap_or(&[]);
1146        let mut col_defs: Vec<String> = Vec::new();
1147        for c in cols {
1148            let typ = dialect.ddl_type(&parse_canonical(&c.type_));
1149            let mut def = format!("{} {}", quote(&c.name), typ);
1150            if !c.nullable {
1151                def.push_str(" NOT NULL");
1152            }
1153            if let Some(ref d) = c.default {
1154                def.push_str(" DEFAULT ");
1155                match d {
1156                    ColumnDefaultConfig::Literal(s) => def.push_str(s),
1157                    ColumnDefaultConfig::Expression { expression } => def.push_str(expression),
1158                }
1159            }
1160            col_defs.push(def);
1161        }
1162        let cfg_col_names: HashSet<&str> = cols.iter().map(|c| c.name.as_str()).collect();
1163        // Note: compute_migration_plan is a pure DDL-generation function; timestamp strings are
1164        // embedded in the DDL output for display/execution. We use postgres-compatible strings
1165        // here since the plan is always applied to a real DB via execute_migration_plan which
1166        // uses the dialect there. If dialect-awareness is needed here in future, pass dialect in.
1167        for (name, suf) in [
1168            ("created_at", "TIMESTAMPTZ NOT NULL DEFAULT NOW()"),
1169            ("updated_at", "TIMESTAMPTZ NOT NULL DEFAULT NOW()"),
1170            ("archived_at", "TIMESTAMPTZ"),
1171            ("created_by", "TEXT"),
1172            ("updated_by", "TEXT"),
1173        ] {
1174            if !cfg_col_names.contains(name) {
1175                col_defs.push(format!("{} {}", quote(name), suf));
1176            }
1177        }
1178        let pk_cols = match &new_table.primary_key {
1179            PrimaryKeyConfig::Single(s) => vec![quote(s)],
1180            PrimaryKeyConfig::Composite(v) => v.iter().map(|s| quote(s)).collect(),
1181        };
1182        col_defs.push(format!("PRIMARY KEY ({})", pk_cols.join(", ")));
1183        for u in &new_table.unique {
1184            col_defs.push(format!(
1185                "UNIQUE ({})",
1186                u.iter().map(|s| quote(s)).collect::<Vec<_>>().join(", ")
1187            ));
1188        }
1189        for ch in &new_table.check {
1190            col_defs.push(format!(
1191                "CONSTRAINT {} CHECK ({})",
1192                quote(&ch.name),
1193                ch.expression
1194            ));
1195        }
1196
1197        steps.push(MigrationStep {
1198            step: 0,
1199            operation: MigrationOperation::CreateTable,
1200            schema: schema.clone(),
1201            table: Some(new_table.name.clone()),
1202            object: new_table.name.clone(),
1203            object_type: "table".into(),
1204            description: format!("Create table \"{}\".\"{}\"", schema, new_table.name),
1205            ddl: Some(format!(
1206                "CREATE TABLE IF NOT EXISTS {} (\n  {}\n)",
1207                full,
1208                col_defs.join(",\n  ")
1209            )),
1210            safety: MigrationSafety::Safe,
1211            risk: MigrationRisk::None,
1212            risk_detail: None,
1213        });
1214        if new_table.audit_log {
1215            let audit_ddl = audit_table_ddl(&schema, &new_table.name, cols, dialect);
1216            steps.push(MigrationStep {
1217                step: 0,
1218                operation: MigrationOperation::CreateTable,
1219                schema: schema.clone(),
1220                table: Some(format!("{}_audit", new_table.name)),
1221                object: format!("{}_audit", new_table.name),
1222                object_type: "table".into(),
1223                description: format!(
1224                    "Create audit table \"{}\".\"{}_audit\"",
1225                    schema, new_table.name
1226                ),
1227                ddl: Some(audit_ddl),
1228                safety: MigrationSafety::Safe,
1229                risk: MigrationRisk::None,
1230                risk_detail: None,
1231            });
1232        }
1233        if new_table.versioning.as_ref().is_some_and(|v| v.enabled) {
1234            let pk_col = match &new_table.primary_key {
1235                PrimaryKeyConfig::Single(s) => s.clone(),
1236                PrimaryKeyConfig::Composite(v) => v[0].clone(),
1237            };
1238            // Split history DDL into CREATE TABLE and index
1239            let history_create = format!(
1240                "CREATE TABLE IF NOT EXISTS {}.{} (\n  {}\n)",
1241                quote(&schema),
1242                quote(&format!("{}_history", new_table.name)),
1243                {
1244                    let full_ddl =
1245                        history_table_ddl(&schema, &new_table.name, &pk_col, cols, dialect);
1246                    full_ddl
1247                        .lines()
1248                        .skip(1) // skip CREATE TABLE line
1249                        .take_while(|l| !l.trim_start().starts_with("-- index:"))
1250                        .collect::<Vec<_>>()
1251                        .join("\n")
1252                        .trim_end_matches(['\n', ',', ')'])
1253                        .to_string()
1254                        + "\n)"
1255                }
1256            );
1257            steps.push(MigrationStep {
1258                step: 0,
1259                operation: MigrationOperation::CreateTable,
1260                schema: schema.clone(),
1261                table: Some(format!("{}_history", new_table.name)),
1262                object: format!("{}_history", new_table.name),
1263                object_type: "table".into(),
1264                description: format!(
1265                    "Create history table \"{}\".\"{}_history\" (versioning)",
1266                    schema, new_table.name
1267                ),
1268                ddl: Some(history_create),
1269                safety: MigrationSafety::Safe,
1270                risk: MigrationRisk::None,
1271                risk_detail: None,
1272            });
1273            steps.push(MigrationStep {
1274                step: 0,
1275                operation: MigrationOperation::CreateIndex,
1276                schema: schema.clone(),
1277                table: Some(format!("{}_history", new_table.name)),
1278                object: format!("{}_history_{}_idx", new_table.name, pk_col),
1279                object_type: "index".into(),
1280                description: format!(
1281                    "Create index on history table \"{}\".\"{}\" ({pk_col}, _version DESC)",
1282                    schema, new_table.name
1283                ),
1284                ddl: Some(history_index_ddl(&schema, &new_table.name, &pk_col)),
1285                safety: MigrationSafety::Safe,
1286                risk: MigrationRisk::None,
1287                risk_detail: None,
1288            });
1289        }
1290    }
1291
1292    // Existing tables that gained audit_log or versioning
1293    for new_table in &new.tables {
1294        if added_table_ids.contains(new_table.id.as_str()) {
1295            continue;
1296        }
1297        if let Some(old_table) = old_tables.get(new_table.id.as_str()) {
1298            let sid = new_table.schema_id.as_deref().unwrap_or(default_new_sid);
1299            let schema = schema_name_for(sid, &new_schemas);
1300            let cols = cols_by_table
1301                .get(new_table.id.as_str())
1302                .map(|v| v.as_slice())
1303                .unwrap_or(&[]);
1304
1305            if !old_table.audit_log && new_table.audit_log {
1306                let audit_ddl = audit_table_ddl(&schema, &new_table.name, cols, dialect);
1307                steps.push(MigrationStep {
1308                    step: 0,
1309                    operation: MigrationOperation::CreateTable,
1310                    schema: schema.clone(),
1311                    table: Some(format!("{}_audit", new_table.name)),
1312                    object: format!("{}_audit", new_table.name),
1313                    object_type: "table".into(),
1314                    description: format!(
1315                        "Enable audit log: create \"{}\".\"{}_audit\"",
1316                        schema, new_table.name
1317                    ),
1318                    ddl: Some(audit_ddl),
1319                    safety: MigrationSafety::Safe,
1320                    risk: MigrationRisk::None,
1321                    risk_detail: None,
1322                });
1323            }
1324
1325            let old_versioning_enabled = old_table.versioning.as_ref().is_some_and(|v| v.enabled);
1326            let new_versioning_enabled = new_table.versioning.as_ref().is_some_and(|v| v.enabled);
1327            if !old_versioning_enabled && new_versioning_enabled {
1328                let pk_col = match &new_table.primary_key {
1329                    PrimaryKeyConfig::Single(s) => s.clone(),
1330                    PrimaryKeyConfig::Composite(v) => v[0].clone(),
1331                };
1332                let history_ddl =
1333                    history_table_ddl(&schema, &new_table.name, &pk_col, cols, dialect);
1334                let create_only = history_ddl
1335                    .lines()
1336                    .take_while(|l| !l.trim_start().starts_with("-- index:"))
1337                    .collect::<Vec<_>>()
1338                    .join("\n");
1339                steps.push(MigrationStep {
1340                    step: 0,
1341                    operation: MigrationOperation::CreateTable,
1342                    schema: schema.clone(),
1343                    table: Some(format!("{}_history", new_table.name)),
1344                    object: format!("{}_history", new_table.name),
1345                    object_type: "table".into(),
1346                    description: format!(
1347                        "Enable versioning: create \"{}\".\"{}_history\"",
1348                        schema, new_table.name
1349                    ),
1350                    ddl: Some(create_only.trim().to_string()),
1351                    safety: MigrationSafety::Safe,
1352                    risk: MigrationRisk::None,
1353                    risk_detail: None,
1354                });
1355                steps.push(MigrationStep {
1356                    step: 0,
1357                    operation: MigrationOperation::CreateIndex,
1358                    schema: schema.clone(),
1359                    table: Some(format!("{}_history", new_table.name)),
1360                    object: format!("{}_history_{}_idx", new_table.name, pk_col),
1361                    object_type: "index".into(),
1362                    description: format!(
1363                        "Create history index on \"{}\".\"{}\"",
1364                        schema, new_table.name
1365                    ),
1366                    ddl: Some(history_index_ddl(&schema, &new_table.name, &pk_col)),
1367                    safety: MigrationSafety::Safe,
1368                    risk: MigrationRisk::None,
1369                    risk_detail: None,
1370                });
1371            }
1372        }
1373    }
1374
1375    for old_table in &old.tables {
1376        if !new_tables.contains_key(old_table.id.as_str()) {
1377            let sid = old_table.schema_id.as_deref().unwrap_or(default_old_sid);
1378            let schema = schema_name_for(sid, &old_schemas);
1379            steps.push(MigrationStep {
1380                step: 0,
1381                operation: MigrationOperation::DropTable,
1382                schema: schema.clone(),
1383                table: Some(old_table.name.clone()),
1384                object: old_table.name.clone(),
1385                object_type: "table".into(),
1386                description: format!("Table \"{}\".\"{}\" removed from config", schema, old_table.name),
1387                ddl: None,
1388                safety: MigrationSafety::WarnOnly,
1389                risk: MigrationRisk::ManualActionRequired,
1390                risk_detail: Some("Table NOT dropped from database (data safety). Run DROP TABLE manually if intended.".into()),
1391            });
1392        }
1393    }
1394
1395    // ── 4. Column changes for existing tables ────────────────────────────────
1396    for new_col in &new.columns {
1397        if added_table_ids.contains(new_col.table_id.as_str()) {
1398            continue;
1399        }
1400        let table = match new_tables.get(new_col.table_id.as_str()) {
1401            Some(t) => t,
1402            None => continue,
1403        };
1404        let sid = table.schema_id.as_deref().unwrap_or(default_new_sid);
1405        let schema = schema_name_for(sid, &new_schemas);
1406        let full = format!("{}.{}", quote(&schema), quote(&table.name));
1407
1408        if let Some(old_col) = old_columns.get(new_col.id.as_str()) {
1409            if old_col.table_id != new_col.table_id {
1410                steps.push(MigrationStep {
1411                    step: 0,
1412                    operation: MigrationOperation::AddColumn,
1413                    schema: schema.clone(),
1414                    table: Some(table.name.clone()),
1415                    object: new_col.name.clone(),
1416                    object_type: "column".into(),
1417                    description: format!("Column \"{}\" (id: {}) appears to have moved tables — manual migration required", new_col.name, new_col.id),
1418                    ddl: None,
1419                    safety: MigrationSafety::WarnOnly,
1420                    risk: MigrationRisk::ManualActionRequired,
1421                    risk_detail: Some(format!("Cannot automate column move from table {} to {}.", old_col.table_id, new_col.table_id)),
1422                });
1423                continue;
1424            }
1425
1426            // Rename
1427            if old_col.name != new_col.name {
1428                steps.push(MigrationStep {
1429                    step: 0,
1430                    operation: MigrationOperation::RenameColumn,
1431                    schema: schema.clone(),
1432                    table: Some(table.name.clone()),
1433                    object: new_col.name.clone(),
1434                    object_type: "column".into(),
1435                    description: format!(
1436                        "Rename column \"{}\" → \"{}\" on \"{}\".\"{}\"",
1437                        old_col.name, new_col.name, schema, table.name
1438                    ),
1439                    ddl: Some(format!(
1440                        "ALTER TABLE {} RENAME COLUMN {} TO {}",
1441                        full,
1442                        quote(&old_col.name),
1443                        quote(&new_col.name)
1444                    )),
1445                    safety: MigrationSafety::Safe,
1446                    risk: MigrationRisk::None,
1447                    risk_detail: None,
1448                });
1449                steps.extend(companion_column_steps(
1450                    &schema,
1451                    table,
1452                    &CompanionColumnOp::Rename {
1453                        old: &old_col.name,
1454                        new: &new_col.name,
1455                    },
1456                ));
1457            }
1458
1459            // Type change
1460            let old_type = dialect.ddl_type(&parse_canonical(&old_col.type_));
1461            let new_type = dialect.ddl_type(&parse_canonical(&new_col.type_));
1462            if old_type.to_uppercase() != new_type.to_uppercase() {
1463                let col_name = &new_col.name;
1464                steps.push(MigrationStep {
1465                    step: 0,
1466                    operation: MigrationOperation::AlterColumnType,
1467                    schema: schema.clone(),
1468                    table: Some(table.name.clone()),
1469                    object: col_name.clone(),
1470                    object_type: "column".into(),
1471                    description: format!("Change type of \"{}\".\"{}\".\"{}\": {} → {}", schema, table.name, col_name, old_type, new_type),
1472                    ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} TYPE {} USING {}::{}", full, quote(col_name), new_type, quote(col_name), new_type)),
1473                    safety: MigrationSafety::BestEffort,
1474                    risk: MigrationRisk::MayFail,
1475                    risk_detail: Some(format!("USING {}::{} cast may fail for incompatible values. Provide a custom USING expression if needed.", col_name, new_type)),
1476                });
1477                steps.extend(companion_column_steps(
1478                    &schema,
1479                    table,
1480                    &CompanionColumnOp::AlterType {
1481                        name: col_name.as_str(),
1482                        ty: &new_type,
1483                    },
1484                ));
1485            }
1486
1487            // Nullability: nullable → NOT NULL
1488            if old_col.nullable && !new_col.nullable {
1489                if let Some(ref d) = new_col.default {
1490                    let default_val = default_str(d);
1491                    // Backfill NULLs first using the configured default
1492                    steps.push(MigrationStep {
1493                        step: 0,
1494                        operation: MigrationOperation::BackfillNulls,
1495                        schema: schema.clone(),
1496                        table: Some(table.name.clone()),
1497                        object: new_col.name.clone(),
1498                        object_type: "column".into(),
1499                        description: format!("Backfill NULLs in \"{}\".\"{}\".\"{}\": SET {} = {} WHERE {} IS NULL", schema, table.name, new_col.name, new_col.name, default_val, new_col.name),
1500                        ddl: Some(format!("UPDATE {} SET {} = {} WHERE {} IS NULL", full, quote(&new_col.name), default_val, quote(&new_col.name))),
1501                        safety: MigrationSafety::Safe,
1502                        risk: MigrationRisk::DataWillBeModified,
1503                        risk_detail: Some(format!("Existing NULLs in column \"{}\" will be set to {} before NOT NULL is enforced.", new_col.name, default_val)),
1504                    });
1505                    // Then set NOT NULL — safe because NULLs are gone
1506                    steps.push(MigrationStep {
1507                        step: 0,
1508                        operation: MigrationOperation::SetNotNull,
1509                        schema: schema.clone(),
1510                        table: Some(table.name.clone()),
1511                        object: new_col.name.clone(),
1512                        object_type: "column".into(),
1513                        description: format!("Set NOT NULL on \"{}\".\"{}\".\"{}\": NULLs pre-filled with default ({})", schema, table.name, new_col.name, default_val),
1514                        ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} SET NOT NULL", full, quote(&new_col.name))),
1515                        safety: MigrationSafety::Safe,
1516                        risk: MigrationRisk::None,
1517                        risk_detail: None,
1518                    });
1519                } else {
1520                    // No default — best effort; will fail if NULLs exist
1521                    steps.push(MigrationStep {
1522                        step: 0,
1523                        operation: MigrationOperation::SetNotNull,
1524                        schema: schema.clone(),
1525                        table: Some(table.name.clone()),
1526                        object: new_col.name.clone(),
1527                        object_type: "column".into(),
1528                        description: format!("Set NOT NULL on \"{}\".\"{}\".\"{}\": no default configured — will fail if NULLs exist", schema, table.name, new_col.name),
1529                        ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} SET NOT NULL", full, quote(&new_col.name))),
1530                        safety: MigrationSafety::BestEffort,
1531                        risk: MigrationRisk::ExistingNullsMustBeAbsent,
1532                        risk_detail: Some(format!(
1533                            "No default value configured for column \"{}\". Add a default to the config to enable automatic NULL backfill before enforcing NOT NULL.",
1534                            new_col.name
1535                        )),
1536                    });
1537                }
1538            }
1539
1540            // Nullability: NOT NULL → nullable
1541            if !old_col.nullable && new_col.nullable {
1542                steps.push(MigrationStep {
1543                    step: 0,
1544                    operation: MigrationOperation::DropNotNull,
1545                    schema: schema.clone(),
1546                    table: Some(table.name.clone()),
1547                    object: new_col.name.clone(),
1548                    object_type: "column".into(),
1549                    description: format!(
1550                        "Drop NOT NULL on \"{}\".\"{}\".\"{}\": column becomes nullable",
1551                        schema, table.name, new_col.name
1552                    ),
1553                    ddl: Some(format!(
1554                        "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
1555                        full,
1556                        quote(&new_col.name)
1557                    )),
1558                    safety: MigrationSafety::Safe,
1559                    risk: MigrationRisk::None,
1560                    risk_detail: None,
1561                });
1562            }
1563
1564            // Default change
1565            let old_def = old_col.default.as_ref().map(default_str);
1566            let new_def = new_col.default.as_ref().map(default_str);
1567            if old_def != new_def {
1568                match &new_col.default {
1569                    Some(d) => {
1570                        let val = default_str(d);
1571                        steps.push(MigrationStep {
1572                            step: 0,
1573                            operation: MigrationOperation::SetDefault,
1574                            schema: schema.clone(),
1575                            table: Some(table.name.clone()),
1576                            object: new_col.name.clone(),
1577                            object_type: "column".into(),
1578                            description: format!(
1579                                "Set DEFAULT {} on \"{}\".\"{}\".\"{}\": was {}",
1580                                val,
1581                                schema,
1582                                table.name,
1583                                new_col.name,
1584                                old_def.as_deref().unwrap_or("none")
1585                            ),
1586                            ddl: Some(format!(
1587                                "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT {}",
1588                                full,
1589                                quote(&new_col.name),
1590                                val
1591                            )),
1592                            safety: MigrationSafety::Safe,
1593                            risk: MigrationRisk::None,
1594                            risk_detail: None,
1595                        });
1596                    }
1597                    None => {
1598                        steps.push(MigrationStep {
1599                            step: 0,
1600                            operation: MigrationOperation::DropDefault,
1601                            schema: schema.clone(),
1602                            table: Some(table.name.clone()),
1603                            object: new_col.name.clone(),
1604                            object_type: "column".into(),
1605                            description: format!(
1606                                "Drop DEFAULT on \"{}\".\"{}\".\"{}\": was {}",
1607                                schema,
1608                                table.name,
1609                                new_col.name,
1610                                old_def.as_deref().unwrap_or("none")
1611                            ),
1612                            ddl: Some(format!(
1613                                "ALTER TABLE {} ALTER COLUMN {} DROP DEFAULT",
1614                                full,
1615                                quote(&new_col.name)
1616                            )),
1617                            safety: MigrationSafety::Safe,
1618                            risk: MigrationRisk::None,
1619                            risk_detail: None,
1620                        });
1621                    }
1622                }
1623            }
1624        } else {
1625            // New column: ADD COLUMN
1626            let new_type = dialect.ddl_type(&parse_canonical(&new_col.type_));
1627            let mut col_def = format!("{} {}", quote(&new_col.name), new_type);
1628            if !new_col.nullable {
1629                col_def.push_str(" NOT NULL");
1630            }
1631            if let Some(ref d) = new_col.default {
1632                col_def.push_str(" DEFAULT ");
1633                match d {
1634                    ColumnDefaultConfig::Literal(s) => col_def.push_str(s),
1635                    ColumnDefaultConfig::Expression { expression } => col_def.push_str(expression),
1636                }
1637            }
1638            steps.push(MigrationStep {
1639                step: 0,
1640                operation: MigrationOperation::AddColumn,
1641                schema: schema.clone(),
1642                table: Some(table.name.clone()),
1643                object: new_col.name.clone(),
1644                object_type: "column".into(),
1645                description: format!(
1646                    "Add column \"{}\" {} to \"{}\".\"{}\"",
1647                    new_col.name, new_type, schema, table.name
1648                ),
1649                ddl: Some(format!("ALTER TABLE {} ADD COLUMN {}", full, col_def)),
1650                safety: MigrationSafety::Safe,
1651                risk: MigrationRisk::None,
1652                risk_detail: None,
1653            });
1654            steps.extend(companion_column_steps(
1655                &schema,
1656                table,
1657                &CompanionColumnOp::Add {
1658                    name: &new_col.name,
1659                    ty: &new_type,
1660                },
1661            ));
1662        }
1663    }
1664
1665    // Removed columns (warn only)
1666    for old_col in &old.columns {
1667        if new.columns.iter().any(|c| c.id == old_col.id) {
1668            continue;
1669        }
1670        if !new_tables.contains_key(old_col.table_id.as_str()) {
1671            continue;
1672        }
1673        let table_name = old_tables
1674            .get(old_col.table_id.as_str())
1675            .map(|t| t.name.as_str())
1676            .unwrap_or(&old_col.table_id);
1677        let sid = old_tables
1678            .get(old_col.table_id.as_str())
1679            .and_then(|t| t.schema_id.as_deref())
1680            .unwrap_or(default_old_sid);
1681        let schema = schema_name_for(sid, &old_schemas);
1682        steps.push(MigrationStep {
1683            step: 0,
1684            operation: MigrationOperation::DropColumn,
1685            schema: schema.clone(),
1686            table: Some(table_name.to_string()),
1687            object: old_col.name.clone(),
1688            object_type: "column".into(),
1689            description: format!("Column \"{}\" removed from config on \"{}\".\"{}\"", old_col.name, schema, table_name),
1690            ddl: None,
1691            safety: MigrationSafety::WarnOnly,
1692            risk: MigrationRisk::ManualActionRequired,
1693            risk_detail: Some("Column NOT dropped from database (data safety). Run ALTER TABLE DROP COLUMN manually if intended.".into()),
1694        });
1695    }
1696
1697    // ── 5. Indexes ───────────────────────────────────────────────────────────
1698    for old_idx in &old.indexes {
1699        if !new_indexes.contains_key(old_idx.id.as_str()) {
1700            let sid = old_idx.schema_id.as_deref().unwrap_or(default_old_sid);
1701            let schema = schema_name_for(sid, &old_schemas);
1702            steps.push(MigrationStep {
1703                step: 0,
1704                operation: MigrationOperation::DropIndex,
1705                schema: schema.clone(),
1706                table: old_tables
1707                    .get(old_idx.table_id.as_str())
1708                    .map(|t| t.name.clone()),
1709                object: old_idx.name.clone(),
1710                object_type: "index".into(),
1711                description: format!("Drop index \"{}\" in schema \"{}\"", old_idx.name, schema),
1712                ddl: Some(format!(
1713                    "DROP INDEX IF EXISTS {}.{}",
1714                    quote(&schema),
1715                    quote(&old_idx.name)
1716                )),
1717                safety: MigrationSafety::Safe,
1718                risk: MigrationRisk::None,
1719                risk_detail: None,
1720            });
1721        }
1722    }
1723    for new_idx in &new.indexes {
1724        if old_indexes.contains_key(new_idx.id.as_str())
1725            || added_table_ids.contains(new_idx.table_id.as_str())
1726        {
1727            continue;
1728        }
1729        let sid = new_idx.schema_id.as_deref().unwrap_or(default_new_sid);
1730        let schema = match new_schemas.get(sid) {
1731            Some(s) => schema_override.unwrap_or(&s.name).to_string(),
1732            None => continue,
1733        };
1734        let table = match new_tables.get(new_idx.table_id.as_str()) {
1735            Some(t) => t,
1736            None => continue,
1737        };
1738        let full_table = format!("{}.{}", quote(&schema), quote(&table.name));
1739        let mut col_parts: Vec<String> = Vec::new();
1740        for col in &new_idx.columns {
1741            match col {
1742                IndexColumnEntry::Name(n) => col_parts.push(quote(n)),
1743                IndexColumnEntry::Spec {
1744                    name, direction, ..
1745                } => {
1746                    let dir = direction
1747                        .as_deref()
1748                        .map(|d| format!(" {}", d.to_uppercase()))
1749                        .unwrap_or_default();
1750                    col_parts.push(format!("{}{}", quote(name), dir));
1751                }
1752                IndexColumnEntry::Expression { expression } => col_parts.push(expression.clone()),
1753            }
1754        }
1755        let method = new_idx.method.as_deref().unwrap_or("btree");
1756        let unique_kw = if new_idx.unique { "UNIQUE " } else { "" };
1757        let include = if new_idx.include.is_empty() {
1758            String::new()
1759        } else {
1760            format!(
1761                " INCLUDE ({})",
1762                new_idx
1763                    .include
1764                    .iter()
1765                    .map(|s| quote(s))
1766                    .collect::<Vec<_>>()
1767                    .join(", ")
1768            )
1769        };
1770        let where_clause = new_idx
1771            .where_
1772            .as_ref()
1773            .map(|w| format!(" WHERE {}", w))
1774            .unwrap_or_default();
1775        steps.push(MigrationStep {
1776            step: 0,
1777            operation: MigrationOperation::CreateIndex,
1778            schema: schema.clone(),
1779            table: Some(table.name.clone()),
1780            object: new_idx.name.clone(),
1781            object_type: "index".into(),
1782            description: format!(
1783                "Create {}index \"{}\" on \"{}\".\"{}\"",
1784                if new_idx.unique { "unique " } else { "" },
1785                new_idx.name,
1786                schema,
1787                table.name
1788            ),
1789            ddl: Some(format!(
1790                "CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({}){}{}",
1791                unique_kw,
1792                quote(&new_idx.name),
1793                full_table,
1794                method,
1795                col_parts.join(", "),
1796                include,
1797                where_clause
1798            )),
1799            safety: MigrationSafety::Safe,
1800            risk: MigrationRisk::None,
1801            risk_detail: None,
1802        });
1803    }
1804
1805    // ── 6. Foreign keys ──────────────────────────────────────────────────────
1806    for old_rel in &old.relationships {
1807        if !new_rels.contains_key(old_rel.id.as_str()) {
1808            let from_sid_fallback = old_rel.from_schema_id.as_deref().unwrap_or(default_old_sid);
1809            let from_schema = old_schemas
1810                .get(from_sid_fallback)
1811                .map(|s| s.name.as_str())
1812                .unwrap_or(from_sid_fallback);
1813            let from_table = old_tables
1814                .get(old_rel.from_table_id.as_str())
1815                .map(|t| t.name.as_str())
1816                .unwrap_or(&old_rel.from_table_id);
1817            let constraint = old_rel.name.as_deref().unwrap_or(&old_rel.id);
1818            let schema_q = quote(schema_override.unwrap_or(from_schema));
1819            steps.push(MigrationStep {
1820                step: 0,
1821                operation: MigrationOperation::DropForeignKey,
1822                schema: schema_override.unwrap_or(from_schema).to_string(),
1823                table: Some(from_table.to_string()),
1824                object: constraint.to_string(),
1825                object_type: "foreign_key".into(),
1826                description: format!(
1827                    "Drop FK \"{}\" from \"{}\".\"{}\"",
1828                    constraint,
1829                    schema_override.unwrap_or(from_schema),
1830                    from_table
1831                ),
1832                ddl: Some(format!(
1833                    "ALTER TABLE {}.{} DROP CONSTRAINT IF EXISTS {}",
1834                    schema_q,
1835                    quote(from_table),
1836                    quote(constraint)
1837                )),
1838                safety: MigrationSafety::Safe,
1839                risk: MigrationRisk::None,
1840                risk_detail: None,
1841            });
1842        }
1843    }
1844    for new_rel in &new.relationships {
1845        if old_rels.contains_key(new_rel.id.as_str())
1846            || added_table_ids.contains(new_rel.from_table_id.as_str())
1847            || added_table_ids.contains(new_rel.to_table_id.as_str())
1848        {
1849            continue;
1850        }
1851        let from_sid = new_rel.from_schema_id.as_deref().unwrap_or(default_new_sid);
1852        let from_schema = match new_schemas.get(from_sid) {
1853            Some(s) => s,
1854            None => continue,
1855        };
1856        let from_table = match new_tables.get(new_rel.from_table_id.as_str()) {
1857            Some(t) => t,
1858            None => continue,
1859        };
1860        let from_col = new
1861            .columns
1862            .iter()
1863            .find(|c| c.id == new_rel.from_column_id)
1864            .map(|c| c.name.clone())
1865            .unwrap_or_else(|| new_rel.from_column_id.clone());
1866
1867        // Resolve the target side — cross-package or same-package.
1868        let (to_schema_name, to_table_name, to_col) =
1869            if let Some(pkg_id) = new_rel.to_package_id.as_deref() {
1870                match cross_package_configs.get(pkg_id) {
1871                    Some(foreign) => {
1872                        let foreign_tables: HashMap<_, _> =
1873                            foreign.tables.iter().map(|t| (t.id.as_str(), t)).collect();
1874                        let foreign_schemas: HashMap<_, _> =
1875                            foreign.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
1876                        let foreign_default_sid =
1877                            foreign.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
1878                        let to_sid = new_rel
1879                            .to_schema_id
1880                            .as_deref()
1881                            .unwrap_or(foreign_default_sid);
1882                        let tbl = match foreign_tables.get(new_rel.to_table_id.as_str()) {
1883                            Some(t) => t,
1884                            None => continue,
1885                        };
1886                        let schema = match foreign_schemas.get(to_sid) {
1887                            Some(s) => s,
1888                            None => continue,
1889                        };
1890                        let col = foreign
1891                            .columns
1892                            .iter()
1893                            .find(|c| c.id == new_rel.to_column_id)
1894                            .map(|c| c.name.clone())
1895                            .unwrap_or_else(|| new_rel.to_column_id.clone());
1896                        (schema.name.clone(), tbl.name.clone(), col)
1897                    }
1898                    None => continue,
1899                }
1900            } else {
1901                let to_sid = new_rel.to_schema_id.as_deref().unwrap_or(default_new_sid);
1902                let to_schema = match new_schemas.get(to_sid) {
1903                    Some(s) => s,
1904                    None => continue,
1905                };
1906                let to_table = match new_tables.get(new_rel.to_table_id.as_str()) {
1907                    Some(t) => t,
1908                    None => continue,
1909                };
1910                let col = new
1911                    .columns
1912                    .iter()
1913                    .find(|c| c.id == new_rel.to_column_id)
1914                    .map(|c| c.name.clone())
1915                    .unwrap_or_else(|| new_rel.to_column_id.clone());
1916                (
1917                    schema_override.unwrap_or(&to_schema.name).to_string(),
1918                    to_table.name.clone(),
1919                    col,
1920                )
1921            };
1922
1923        let from_schema_str = schema_override.unwrap_or(&from_schema.name);
1924        let from_q = format!("{}.{}", quote(from_schema_str), quote(&from_table.name));
1925        let to_q = format!("{}.{}", quote(&to_schema_name), quote(&to_table_name));
1926        let constraint = new_rel.name.as_deref().unwrap_or(&new_rel.id);
1927        let on_update = new_rel.on_update.as_deref().unwrap_or("NO ACTION");
1928        let on_delete = new_rel.on_delete.as_deref().unwrap_or("NO ACTION");
1929        steps.push(MigrationStep {
1930            step: 0,
1931            operation: MigrationOperation::AddForeignKey,
1932            schema: from_schema_str.to_string(),
1933            table: Some(from_table.name.clone()),
1934            object: constraint.to_string(),
1935            object_type: "foreign_key".into(),
1936            description: format!(
1937                "Add FK \"{}\" on \"{}\".\"{}\" → \"{}\".\"{}\"",
1938                constraint, from_schema_str, from_table.name, to_schema_name, to_table_name
1939            ),
1940            ddl: Some(format!(
1941                "ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {} ({}) ON UPDATE {} ON DELETE {}",
1942                from_q, quote(constraint), quote(&from_col), to_q, quote(&to_col), on_update, on_delete
1943            )),
1944            safety: MigrationSafety::BestEffort,
1945            risk: MigrationRisk::None,
1946            risk_detail: Some("PostgreSQL has no ADD CONSTRAINT IF NOT EXISTS; ignored if constraint already exists.".into()),
1947        });
1948    }
1949
1950    // Assign sequential step numbers
1951    for (i, s) in steps.iter_mut().enumerate() {
1952        s.step = i + 1;
1953    }
1954
1955    Ok(MigrationPlan { steps })
1956}
1957
1958// ─── execute_migration_plan ──────────────────────────────────────────────────
1959
1960/// Execute a pre-computed `MigrationPlan` against the tenant database.
1961/// Writes per-step audit records to the config (architect) database.
1962/// Returns counts and any warning messages collected from best-effort failures.
1963#[allow(clippy::too_many_arguments)]
1964pub async fn execute_migration_plan(
1965    migration_pool: &Pool,
1966    config_pool: &Pool,
1967    plan: &MigrationPlan,
1968    migration_plan_id: &str,
1969    package_id: &str,
1970    tenant_id: &str,
1971    from_version: Option<&str>,
1972    to_version: &str,
1973) -> Result<MigrationExecutionResult, AppError> {
1974    let mut applied = 0usize;
1975    let mut warned = 0usize;
1976    let mut warnings: Vec<String> = Vec::new();
1977
1978    for step in &plan.steps {
1979        let op = step.operation.to_string();
1980        let safety_str = format!("{:?}", step.safety);
1981        let risk_str = format!("{:?}", step.risk);
1982
1983        match step.safety {
1984            MigrationSafety::WarnOnly => {
1985                let msg = step
1986                    .risk_detail
1987                    .clone()
1988                    .unwrap_or_else(|| step.description.clone());
1989                tracing::warn!(step = step.step, %op, "migration plan warning (no DDL)");
1990                warnings.push(format!("[Step {}] {}", step.step, msg));
1991                let _ = crate::store::insert_migration_audit(
1992                    config_pool,
1993                    migration_plan_id,
1994                    package_id,
1995                    tenant_id,
1996                    from_version,
1997                    to_version,
1998                    step.step as i32,
1999                    &op,
2000                    &step.schema,
2001                    step.table.as_deref(),
2002                    &step.object,
2003                    &step.object_type,
2004                    &step.description,
2005                    step.ddl.as_deref(),
2006                    &safety_str,
2007                    &risk_str,
2008                    "skipped",
2009                    None,
2010                )
2011                .await;
2012                warned += 1;
2013            }
2014            MigrationSafety::Safe | MigrationSafety::BestEffort => {
2015                if let Some(ref sql) = step.ddl {
2016                    tracing::info!(step = step.step, %op, %sql, "executing migration step");
2017                    match sqlx::query(sql).execute(migration_pool).await {
2018                        Ok(_) => {
2019                            let _ = crate::store::insert_migration_audit(
2020                                config_pool,
2021                                migration_plan_id,
2022                                package_id,
2023                                tenant_id,
2024                                from_version,
2025                                to_version,
2026                                step.step as i32,
2027                                &op,
2028                                &step.schema,
2029                                step.table.as_deref(),
2030                                &step.object,
2031                                &step.object_type,
2032                                &step.description,
2033                                step.ddl.as_deref(),
2034                                &safety_str,
2035                                &risk_str,
2036                                "applied",
2037                                None,
2038                            )
2039                            .await;
2040                            applied += 1;
2041                        }
2042                        Err(e) => {
2043                            let err_str = e.to_string();
2044                            if matches!(step.safety, MigrationSafety::BestEffort) {
2045                                tracing::warn!(step = step.step, %op, error = %e, "migration step failed (best-effort, continuing)");
2046                                let msg = format!(
2047                                    "[Step {}] {} — Error: {}",
2048                                    step.step, step.description, err_str
2049                                );
2050                                warnings.push(msg);
2051                                let _ = crate::store::insert_migration_audit(
2052                                    config_pool,
2053                                    migration_plan_id,
2054                                    package_id,
2055                                    tenant_id,
2056                                    from_version,
2057                                    to_version,
2058                                    step.step as i32,
2059                                    &op,
2060                                    &step.schema,
2061                                    step.table.as_deref(),
2062                                    &step.object,
2063                                    &step.object_type,
2064                                    &step.description,
2065                                    step.ddl.as_deref(),
2066                                    &safety_str,
2067                                    &risk_str,
2068                                    "warned",
2069                                    Some(&err_str),
2070                                )
2071                                .await;
2072                                warned += 1;
2073                            } else {
2074                                let _ = crate::store::insert_migration_audit(
2075                                    config_pool,
2076                                    migration_plan_id,
2077                                    package_id,
2078                                    tenant_id,
2079                                    from_version,
2080                                    to_version,
2081                                    step.step as i32,
2082                                    &op,
2083                                    &step.schema,
2084                                    step.table.as_deref(),
2085                                    &step.object,
2086                                    &step.object_type,
2087                                    &step.description,
2088                                    step.ddl.as_deref(),
2089                                    &safety_str,
2090                                    &risk_str,
2091                                    "failed",
2092                                    Some(&err_str),
2093                                )
2094                                .await;
2095                                return Err(AppError::Db(e));
2096                            }
2097                        }
2098                    }
2099                }
2100            }
2101        }
2102    }
2103
2104    Ok(MigrationExecutionResult {
2105        applied,
2106        warned,
2107        warnings,
2108    })
2109}
2110
2111/// Build CREATE TABLE DDL for the `{table}_history` companion table used by row versioning.
2112/// All source columns are replicated with their types but as nullable and without any constraints
2113/// (no NOT NULL, no UNIQUE, no FK, no CHECK). Five versioning metadata columns are prepended.
2114pub fn history_table_ddl(
2115    schema_name: &str,
2116    table_name: &str,
2117    pk_col: &str,
2118    source_cols: &[&ColumnConfig],
2119    dialect: &dyn Dialect,
2120) -> String {
2121    let history_name = format!("{}_history", table_name);
2122    let history_full = format!("{}.{}", quote(schema_name), quote(&history_name));
2123
2124    let mut col_defs: Vec<String> = Vec::new();
2125    col_defs.push(format!(
2126        "{} {} NOT NULL DEFAULT {}",
2127        quote("_history_id"),
2128        "UUID",
2129        dialect.uuid_default_expr()
2130    ));
2131    col_defs.push(format!("{} BIGINT NOT NULL", quote("_version")));
2132    col_defs.push(format!("{} TEXT NOT NULL", quote("_operation")));
2133    col_defs.push(format!(
2134        "{} {} NOT NULL DEFAULT {}",
2135        quote("_recorded_at"),
2136        dialect.audit_timestamp_type(),
2137        dialect.now_fn()
2138    ));
2139    col_defs.push(format!(
2140        "{} {}",
2141        quote("_valid_from"),
2142        dialect.audit_timestamp_type()
2143    ));
2144    col_defs.push(format!(
2145        "{} {}",
2146        quote("_valid_to"),
2147        dialect.audit_timestamp_type()
2148    ));
2149
2150    let config_col_names: HashSet<&str> = source_cols.iter().map(|c| c.name.as_str()).collect();
2151    for c in source_cols {
2152        let typ = dialect.ddl_type(&parse_canonical(&c.type_));
2153        col_defs.push(format!("{} {}", quote(&c.name), typ));
2154    }
2155    let audit_ts = dialect.audit_timestamp_type();
2156    for (name, typ) in [
2157        ("created_at", audit_ts),
2158        ("updated_at", audit_ts),
2159        ("archived_at", audit_ts),
2160        ("created_by", "TEXT"),
2161        ("updated_by", "TEXT"),
2162    ] {
2163        if !config_col_names.contains(name) {
2164            col_defs.push(format!("{} {}", quote(name), typ));
2165        }
2166    }
2167    col_defs.push(format!("PRIMARY KEY ({})", quote("_history_id")));
2168
2169    let history_full_quoted = format!("{}.{}", quote(schema_name), quote(&history_name));
2170    let idx_sql = format!(
2171        "-- index: CREATE INDEX IF NOT EXISTS {} ON {} ({}, {})",
2172        quote(&format!("{}_history_{}_idx", table_name, pk_col)),
2173        history_full_quoted,
2174        quote(pk_col),
2175        quote("_version")
2176    );
2177
2178    format!(
2179        "CREATE TABLE IF NOT EXISTS {} (\n  {}\n)\n{}",
2180        history_full,
2181        col_defs.join(",\n  "),
2182        idx_sql
2183    )
2184}
2185
2186/// Build just the index DDL for the `{table}_history` table (separate from CREATE TABLE).
2187fn history_index_ddl(schema_name: &str, table_name: &str, pk_col: &str) -> String {
2188    format!(
2189        "CREATE INDEX IF NOT EXISTS {} ON {}.{} ({}, {} DESC)",
2190        quote(&format!("{}_history_{}_idx", table_name, pk_col)),
2191        quote(schema_name),
2192        quote(&format!("{}_history", table_name)),
2193        quote(pk_col),
2194        quote("_version")
2195    )
2196}
2197
2198/// A structural column change that must be mirrored onto companion (`_audit`/`_history`) tables.
2199enum CompanionColumnOp<'a> {
2200    /// A new column was added to the source table.
2201    Add { name: &'a str, ty: &'a str },
2202    /// A source column was renamed.
2203    Rename { old: &'a str, new: &'a str },
2204    /// A source column's type changed.
2205    AlterType { name: &'a str, ty: &'a str },
2206}
2207
2208/// Companion-table suffixes that are enabled for a table (`audit`, `history`).
2209fn enabled_companion_suffixes(table: &TableConfig) -> Vec<&'static str> {
2210    let mut suffixes = Vec::new();
2211    if table.audit_log {
2212        suffixes.push("audit");
2213    }
2214    if table.versioning.as_ref().is_some_and(|v| v.enabled) {
2215        suffixes.push("history");
2216    }
2217    suffixes
2218}
2219
2220/// Generate ALTER steps that keep the `{table}_audit` / `{table}_history` companion tables in
2221/// schema-sync with their source table when a column is added, renamed, or retyped.
2222///
2223/// Companion tables replicate source columns as **nullable with no constraints**, so only
2224/// structural changes propagate here. Nullability and default changes on the source column are
2225/// intentionally NOT mirrored — companion rows are historical snapshots that must stay nullable.
2226fn companion_column_steps(
2227    schema: &str,
2228    table: &TableConfig,
2229    op: &CompanionColumnOp<'_>,
2230) -> Vec<MigrationStep> {
2231    let mut steps = Vec::new();
2232    for suffix in enabled_companion_suffixes(table) {
2233        let companion = format!("{}_{}", table.name, suffix);
2234        let full = format!("{}.{}", quote(schema), quote(&companion));
2235        let (operation, object, ddl, description, safety, risk, risk_detail) = match op {
2236            CompanionColumnOp::Add { name, ty } => (
2237                MigrationOperation::AddColumn,
2238                name.to_string(),
2239                // IF NOT EXISTS guards against collisions with synthetic columns the companion
2240                // table may already carry (e.g. created_at/updated_by).
2241                format!(
2242                    "ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}",
2243                    full,
2244                    quote(name),
2245                    ty
2246                ),
2247                format!(
2248                    "Sync {} table: add column \"{}\" to \"{}\".\"{}\"",
2249                    suffix, name, schema, companion
2250                ),
2251                MigrationSafety::Safe,
2252                MigrationRisk::None,
2253                None,
2254            ),
2255            CompanionColumnOp::Rename { old, new } => (
2256                MigrationOperation::RenameColumn,
2257                new.to_string(),
2258                format!(
2259                    "ALTER TABLE {} RENAME COLUMN {} TO {}",
2260                    full,
2261                    quote(old),
2262                    quote(new)
2263                ),
2264                format!(
2265                    "Sync {} table: rename column \"{}\" → \"{}\" on \"{}\".\"{}\"",
2266                    suffix, old, new, schema, companion
2267                ),
2268                MigrationSafety::Safe,
2269                MigrationRisk::None,
2270                None,
2271            ),
2272            CompanionColumnOp::AlterType { name, ty } => (
2273                MigrationOperation::AlterColumnType,
2274                name.to_string(),
2275                format!(
2276                    "ALTER TABLE {} ALTER COLUMN {} TYPE {} USING {}::{}",
2277                    full,
2278                    quote(name),
2279                    ty,
2280                    quote(name),
2281                    ty
2282                ),
2283                format!(
2284                    "Sync {} table: change type of \"{}\".\"{}\".\"{}\" → {}",
2285                    suffix, schema, companion, name, ty
2286                ),
2287                MigrationSafety::BestEffort,
2288                MigrationRisk::MayFail,
2289                Some(format!(
2290                    "USING {}::{} cast may fail for incompatible values in the {} table.",
2291                    name, ty, suffix
2292                )),
2293            ),
2294        };
2295        steps.push(MigrationStep {
2296            step: 0,
2297            operation,
2298            schema: schema.to_string(),
2299            table: Some(companion.clone()),
2300            object,
2301            object_type: "column".into(),
2302            description,
2303            ddl: Some(ddl),
2304            safety,
2305            risk,
2306            risk_detail,
2307        });
2308    }
2309    steps
2310}
2311
2312/// Build CREATE TABLE DDL for the `{table}_audit` companion table.
2313/// All source columns are replicated as nullable with no constraints, plus five audit metadata
2314/// columns prepended: audit_id (PK), audit_action, audit_at, audit_by, changed_fields.
2315fn audit_table_ddl(
2316    schema_name: &str,
2317    table_name: &str,
2318    source_cols: &[&ColumnConfig],
2319    dialect: &dyn Dialect,
2320) -> String {
2321    let audit_name = format!("{}_audit", table_name);
2322    let audit_full = format!("{}.{}", quote(schema_name), quote(&audit_name));
2323
2324    let mut col_defs: Vec<String> = Vec::new();
2325    col_defs.push(format!(
2326        "{} {} NOT NULL DEFAULT {}",
2327        quote("audit_id"),
2328        "UUID",
2329        dialect.uuid_default_expr()
2330    ));
2331    col_defs.push(format!("{} TEXT NOT NULL", quote("audit_action")));
2332    col_defs.push(format!(
2333        "{} {} NOT NULL DEFAULT {}",
2334        quote("audit_at"),
2335        dialect.audit_timestamp_type(),
2336        dialect.now_fn()
2337    ));
2338    col_defs.push(format!("{} TEXT", quote("audit_by")));
2339    col_defs.push(format!(
2340        "{} {}",
2341        quote("changed_fields"),
2342        dialect.sys_json_type()
2343    ));
2344
2345    let config_col_names: HashSet<&str> = source_cols.iter().map(|c| c.name.as_str()).collect();
2346    for c in source_cols {
2347        let typ = dialect.ddl_type(&parse_canonical(&c.type_));
2348        col_defs.push(format!("{} {}", quote(&c.name), typ));
2349    }
2350    let audit_ts = dialect.audit_timestamp_type();
2351    for (name, typ) in [
2352        ("created_at", audit_ts),
2353        ("updated_at", audit_ts),
2354        ("archived_at", audit_ts),
2355        ("created_by", "TEXT"),
2356        ("updated_by", "TEXT"),
2357    ] {
2358        if !config_col_names.contains(name) {
2359            col_defs.push(format!("{} {}", quote(name), typ));
2360        }
2361    }
2362    col_defs.push(format!("PRIMARY KEY ({})", quote("audit_id")));
2363
2364    format!(
2365        "CREATE TABLE IF NOT EXISTS {} (\n  {}\n)",
2366        audit_full,
2367        col_defs.join(",\n  ")
2368    )
2369}
2370
2371#[cfg(test)]
2372mod enum_recreate_tests {
2373    use super::*;
2374
2375    fn schema(id: &str, name: &str) -> SchemaConfig {
2376        SchemaConfig {
2377            id: id.into(),
2378            name: name.into(),
2379            comment: None,
2380        }
2381    }
2382
2383    fn table(id: &str, name: &str, schema_id: &str) -> TableConfig {
2384        TableConfig {
2385            id: id.into(),
2386            schema_id: Some(schema_id.into()),
2387            name: name.into(),
2388            comment: None,
2389            primary_key: PrimaryKeyConfig::Single("id".into()),
2390            unique: vec![],
2391            check: vec![],
2392            audit_log: false,
2393            versioning: None,
2394        }
2395    }
2396
2397    fn col(id: &str, table_id: &str, name: &str, ty: &str, default: Option<&str>) -> ColumnConfig {
2398        ColumnConfig {
2399            id: id.into(),
2400            table_id: table_id.into(),
2401            name: name.into(),
2402            type_: ColumnTypeConfig::Simple(ty.into()),
2403            nullable: true,
2404            default: default.map(|d| ColumnDefaultConfig::Literal(d.into())),
2405            comment: None,
2406            asset: None,
2407            extensible: false,
2408        }
2409    }
2410
2411    fn enum_cfg(id: &str, name: &str, schema_id: &str, values: &[&str]) -> EnumConfig {
2412        EnumConfig {
2413            id: id.into(),
2414            schema_id: Some(schema_id.into()),
2415            name: name.into(),
2416            values: values.iter().map(|s| s.to_string()).collect(),
2417            comment: None,
2418        }
2419    }
2420
2421    fn ddls(steps: &[MigrationStep]) -> Vec<String> {
2422        steps.iter().filter_map(|s| s.ddl.clone()).collect()
2423    }
2424
2425    #[test]
2426    fn finds_scalar_and_array_dependent_columns() {
2427        let mut cfg = FullConfig::default();
2428        cfg.schemas = vec![schema("s1", "app")];
2429        cfg.tables = vec![table("t_orders", "orders", "s1")];
2430        cfg.columns = vec![
2431            col(
2432                "c1",
2433                "t_orders",
2434                "status",
2435                "order_status",
2436                Some("'pending'"),
2437            ),
2438            col("c2", "t_orders", "tags", "order_status[]", None),
2439            col("c3", "t_orders", "name", "text", None), // not an enum
2440        ];
2441        let e = enum_cfg("e1", "order_status", "s1", &["pending", "shipped"]);
2442        let new_tables: HashMap<&str, &TableConfig> =
2443            cfg.tables.iter().map(|t| (t.id.as_str(), t)).collect();
2444        let new_schemas: HashMap<&str, &SchemaConfig> =
2445            cfg.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
2446
2447        let deps = enum_dependent_columns(&e, &cfg, &new_tables, &new_schemas, None);
2448        assert_eq!(deps.len(), 2);
2449        let scalar = deps.iter().find(|d| d.column == "status").unwrap();
2450        assert_eq!(scalar.schema, "app");
2451        assert_eq!(scalar.table, "orders");
2452        assert!(!scalar.is_array);
2453        assert_eq!(scalar.default.as_deref(), Some("'pending'"));
2454        let arr = deps.iter().find(|d| d.column == "tags").unwrap();
2455        assert!(arr.is_array);
2456        assert!(arr.default.is_none());
2457    }
2458
2459    #[test]
2460    fn recreate_sequence_emits_rename_create_recast_drop() {
2461        let e = enum_cfg("e1", "order_status", "s1", &["pending", "shipped"]);
2462        let deps = vec![
2463            EnumColumnRef {
2464                schema: "app".into(),
2465                table: "orders".into(),
2466                column: "status".into(),
2467                default: Some("'pending'".into()),
2468                is_array: false,
2469            },
2470            EnumColumnRef {
2471                schema: "app".into(),
2472                table: "orders".into(),
2473                column: "tags".into(),
2474                default: None,
2475                is_array: true,
2476            },
2477        ];
2478        let mut steps = Vec::new();
2479        recreate_enum_steps(&mut steps, "app", &e, &["cancelled"], &deps);
2480        let sql = ddls(&steps);
2481
2482        // Leading informational step carries no DDL.
2483        assert!(matches!(steps[0].safety, MigrationSafety::WarnOnly));
2484        assert!(steps[0].ddl.is_none());
2485
2486        // Rename → create → (drop default, recast, set default) → recast array → drop old.
2487        assert_eq!(
2488            sql[0],
2489            r#"ALTER TYPE "app"."order_status" RENAME TO "order_status__arch_old""#
2490        );
2491        assert_eq!(
2492            sql[1],
2493            r#"CREATE TYPE "app"."order_status" AS ENUM ('pending', 'shipped')"#
2494        );
2495        assert_eq!(
2496            sql[2],
2497            r#"ALTER TABLE "app"."orders" ALTER COLUMN "status" DROP DEFAULT"#
2498        );
2499        assert_eq!(
2500            sql[3],
2501            r#"ALTER TABLE "app"."orders" ALTER COLUMN "status" TYPE "app"."order_status" USING "status"::text::"app"."order_status""#
2502        );
2503        assert_eq!(
2504            sql[4],
2505            r#"ALTER TABLE "app"."orders" ALTER COLUMN "status" SET DEFAULT 'pending'"#
2506        );
2507        // Array column: no default, single recast with array casts.
2508        assert_eq!(
2509            sql[5],
2510            r#"ALTER TABLE "app"."orders" ALTER COLUMN "tags" TYPE "app"."order_status"[] USING "tags"::text[]::"app"."order_status"[]"#
2511        );
2512        assert_eq!(
2513            *sql.last().unwrap(),
2514            r#"DROP TYPE IF EXISTS "app"."order_status__arch_old""#
2515        );
2516    }
2517}
2518
2519#[cfg(all(test, feature = "sqlite"))]
2520mod companion_sync_tests {
2521    use super::*;
2522    use crate::db::sqlite::SqliteDialect;
2523
2524    fn schema(id: &str, name: &str) -> SchemaConfig {
2525        SchemaConfig {
2526            id: id.into(),
2527            name: name.into(),
2528            comment: None,
2529        }
2530    }
2531
2532    fn table(id: &str, name: &str, audit: bool, versioning: bool) -> TableConfig {
2533        TableConfig {
2534            id: id.into(),
2535            schema_id: Some("s1".into()),
2536            name: name.into(),
2537            comment: None,
2538            primary_key: PrimaryKeyConfig::Single("id".into()),
2539            unique: vec![],
2540            check: vec![],
2541            audit_log: audit,
2542            versioning: versioning.then(|| VersioningConfig {
2543                enabled: true,
2544                keep_versions: None,
2545            }),
2546        }
2547    }
2548
2549    fn col(id: &str, name: &str, ty: &str) -> ColumnConfig {
2550        ColumnConfig {
2551            id: id.into(),
2552            table_id: "t1".into(),
2553            name: name.into(),
2554            type_: ColumnTypeConfig::Simple(ty.into()),
2555            nullable: true,
2556            default: None,
2557            comment: None,
2558            asset: None,
2559            extensible: false,
2560        }
2561    }
2562
2563    fn base(audit: bool, versioning: bool) -> FullConfig {
2564        let mut cfg = FullConfig::default();
2565        cfg.schemas = vec![schema("s1", "app")];
2566        cfg.tables = vec![table("t1", "orders", audit, versioning)];
2567        cfg.columns = vec![col("c0", "id", "uuid"), col("c1", "status", "text")];
2568        cfg
2569    }
2570
2571    fn plan(old: &FullConfig, new: &FullConfig) -> Vec<String> {
2572        let dialect = SqliteDialect;
2573        compute_migration_plan(old, new, None, None, &dialect, &HashMap::new())
2574            .unwrap()
2575            .steps
2576            .into_iter()
2577            .filter_map(|s| s.ddl)
2578            .collect()
2579    }
2580
2581    #[test]
2582    fn add_column_syncs_audit_and_history() {
2583        let old = base(true, true);
2584        let mut new = base(true, true);
2585        new.columns.push(col("c2", "note", "text"));
2586
2587        let sql = plan(&old, &new);
2588        assert!(sql
2589            .iter()
2590            .any(|s| s == r#"ALTER TABLE "app"."orders" ADD COLUMN "note" TEXT"#));
2591        assert!(sql.iter().any(
2592            |s| s == r#"ALTER TABLE "app"."orders_audit" ADD COLUMN IF NOT EXISTS "note" TEXT"#
2593        ));
2594        assert!(sql
2595            .iter()
2596            .any(|s| s
2597                == r#"ALTER TABLE "app"."orders_history" ADD COLUMN IF NOT EXISTS "note" TEXT"#));
2598    }
2599
2600    #[test]
2601    fn rename_column_syncs_companions() {
2602        let old = base(true, true);
2603        let mut new = base(true, true);
2604        new.columns[1].name = "state".into();
2605
2606        let sql = plan(&old, &new);
2607        assert!(sql
2608            .iter()
2609            .any(|s| s == r#"ALTER TABLE "app"."orders_audit" RENAME COLUMN "status" TO "state""#));
2610        assert!(sql.iter().any(
2611            |s| s == r#"ALTER TABLE "app"."orders_history" RENAME COLUMN "status" TO "state""#
2612        ));
2613    }
2614
2615    #[test]
2616    fn alter_type_syncs_companions() {
2617        let old = base(true, false);
2618        let mut new = base(true, false);
2619        new.columns[1].type_ = ColumnTypeConfig::Simple("integer".into());
2620
2621        let sql = plan(&old, &new);
2622        assert!(sql.iter().any(|s| s
2623            == r#"ALTER TABLE "app"."orders_audit" ALTER COLUMN "status" TYPE INTEGER USING "status"::INTEGER"#));
2624        // versioning disabled → no history sync
2625        assert!(!sql.iter().any(|s| s.contains("orders_history")));
2626    }
2627
2628    #[test]
2629    fn no_companion_steps_when_features_disabled() {
2630        let old = base(false, false);
2631        let mut new = base(false, false);
2632        new.columns.push(col("c2", "note", "text"));
2633
2634        let sql = plan(&old, &new);
2635        assert!(sql
2636            .iter()
2637            .any(|s| s == r#"ALTER TABLE "app"."orders" ADD COLUMN "note" TEXT"#));
2638        assert!(!sql.iter().any(|s| s.contains("orders_audit")));
2639        assert!(!sql.iter().any(|s| s.contains("orders_history")));
2640    }
2641
2642    #[test]
2643    fn nullability_change_does_not_touch_companions() {
2644        let old = base(true, true);
2645        let mut new = base(true, true);
2646        new.columns[1].nullable = false;
2647
2648        let sql = plan(&old, &new);
2649        // Main table gets SET NOT NULL, companions are untouched (snapshots stay nullable).
2650        assert!(sql.iter().any(|s| s.contains("SET NOT NULL")));
2651        assert!(!sql.iter().any(|s| s.contains("orders_audit")));
2652        assert!(!sql.iter().any(|s| s.contains("orders_history")));
2653    }
2654}