1use crate::config::types::*;
5use crate::config::{validate, FullConfig};
6use crate::db::parse_canonical;
7use crate::db::pool::Pool;
8use crate::db::Dialect;
9use crate::error::AppError;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12
/// Wraps `s` in double quotes so it can be used as a SQL identifier.
///
/// An embedded double quote is escaped by doubling it (`"` becomes `""`), which is the
/// escape rule for quoted identifiers in standard SQL. Backslashes are passed through
/// unchanged: they have no special meaning inside a quoted identifier, so escaping them
/// would silently change the identifier's name.
fn quote(s: &str) -> String {
    format!("\"{}\"", s.replace('"', "\"\""))
}
16
/// Column name (`"tenant_id"`) associated with tenant-scoped row-level security.
///
/// NOTE(review): this constant is not referenced in the visible part of the file;
/// presumably callers pass it as the `rls_tenant_column` argument of
/// `apply_migrations` — confirm against call sites.
pub const RLS_TENANT_COLUMN: &str = "tenant_id";
19
20pub async fn apply_migrations(
25 pool: &Pool,
26 config: &FullConfig,
27 schema_override: Option<&str>,
28 rls_tenant_column: Option<&str>,
29 dialect: &dyn Dialect,
30 cross_package_configs: &HashMap<String, FullConfig>,
31) -> Result<(), AppError> {
32 validate(config)?;
33 let default_sid = config
34 .schemas
35 .first()
36 .map(|s| s.id.as_str())
37 .ok_or_else(|| {
38 AppError::Config(crate::error::ConfigError::Validation(
39 "at least one schema required".into(),
40 ))
41 })?;
42
43 if dialect.supports_schemas() {
44 if let Some(s) = schema_override {
45 let name = quote(s);
46 sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", name))
47 .execute(pool)
48 .await?;
49 }
50 }
51
52 let schemas_by_id: HashMap<_, _> = config.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
53 let tables_by_id: HashMap<_, _> = config.tables.iter().map(|t| (t.id.as_str(), t)).collect();
54 let columns_by_table: HashMap<_, Vec<&ColumnConfig>> =
55 config.columns.iter().fold(HashMap::new(), |mut m, c| {
56 m.entry(c.table_id.as_str()).or_default().push(c);
57 m
58 });
59
60 if schema_override.is_none() && dialect.supports_schemas() {
62 for s in &config.schemas {
63 let name = quote(&s.name);
64 let comment = s
65 .comment
66 .as_ref()
67 .map(|c| format!("COMMENT ON SCHEMA {} IS '{}'", name, c.replace('\'', "''")));
68 sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", name))
69 .execute(pool)
70 .await?;
71 if let Some(sql) = comment {
72 let _ = sqlx::query(&sql).execute(pool).await;
73 }
74 }
75 }
76
77 for e in &config.enums {
78 let sid = e.schema_id.as_deref().unwrap_or(default_sid);
79 let schema = schemas_by_id.get(sid).ok_or_else(|| {
80 AppError::Config(crate::error::ConfigError::MissingReference {
81 kind: "schema",
82 id: sid.to_string(),
83 })
84 })?;
85 let schema_name = quote(schema_override.unwrap_or(&schema.name));
86 let type_name = quote(&e.name);
87 if dialect.supports_named_enum_types() {
88 let values: Vec<String> = e
89 .values
90 .iter()
91 .map(|v| format!("'{}'", v.replace('\'', "''")))
92 .collect();
93 let sql = format!(
94 "CREATE TYPE {}.{} AS ENUM ({})",
95 schema_name,
96 type_name,
97 values.join(", ")
98 );
99 let _ = sqlx::query(&sql).execute(pool).await;
100 }
101 }
102
103 for t in &config.tables {
104 let sid = t.schema_id.as_deref().unwrap_or(default_sid);
105 let schema = schemas_by_id.get(sid).ok_or_else(|| {
106 AppError::Config(crate::error::ConfigError::MissingReference {
107 kind: "schema",
108 id: sid.to_string(),
109 })
110 })?;
111 let schema_name = quote(schema_override.unwrap_or(&schema.name));
112 let table_name = quote(&t.name);
113 let full_name = format!("{}.{}", schema_name, table_name);
114
115 let cols = columns_by_table
116 .get(t.id.as_str())
117 .map(|v| v.as_slice())
118 .unwrap_or(&[]);
119 let mut col_defs: Vec<String> = Vec::new();
120 for c in cols {
121 let typ = dialect.ddl_type(&parse_canonical(&c.type_));
122 let mut def = format!("{} {}", quote(&c.name), typ);
123 if !c.nullable {
124 def.push_str(" NOT NULL");
125 }
126 if let Some(ref d) = c.default {
127 def.push_str(" DEFAULT ");
128 match d {
129 ColumnDefaultConfig::Literal(s) => def.push_str(s),
130 ColumnDefaultConfig::Expression { expression } => def.push_str(expression),
131 }
132 }
133 col_defs.push(def);
134 }
135
136 let config_col_names: HashSet<&str> = cols.iter().map(|c| c.name.as_str()).collect();
137 let ts_default = format!(
138 "{} NOT NULL DEFAULT {}",
139 dialect.sys_timestamp_type(),
140 dialect.now_fn()
141 );
142 let ts_nullable = dialect.sys_timestamp_type().to_string();
143 for (name, def_suffix) in [
144 ("created_at", ts_default.as_str()),
145 ("updated_at", ts_default.as_str()),
146 ("archived_at", ts_nullable.as_str()),
147 ("created_by", "TEXT"),
148 ("updated_by", "TEXT"),
149 ] {
150 if !config_col_names.contains(name) {
151 col_defs.push(format!("{} {}", quote(name), def_suffix));
152 }
153 }
154
155 let pk_cols = match &t.primary_key {
156 PrimaryKeyConfig::Single(s) => vec![quote(s)],
157 PrimaryKeyConfig::Composite(v) => v.iter().map(|s| quote(s)).collect::<Vec<_>>(),
158 };
159 let pk_def = format!("PRIMARY KEY ({})", pk_cols.join(", "));
160 col_defs.push(pk_def);
161
162 for u in &t.unique {
163 let cols: Vec<String> = u.iter().map(|s| quote(s)).collect();
164 col_defs.push(format!("UNIQUE ({})", cols.join(", ")));
165 }
166 for ch in &t.check {
167 col_defs.push(format!(
168 "CONSTRAINT {} CHECK ({})",
169 quote(&ch.name),
170 ch.expression
171 ));
172 }
173
174 let sql = format!(
175 "CREATE TABLE IF NOT EXISTS {} (\n {}\n)",
176 full_name,
177 col_defs.join(",\n ")
178 );
179 sqlx::query(&sql).execute(pool).await?;
180
181 if t.audit_log {
182 let schema_raw = schema_override.unwrap_or(&schema.name);
183 let audit_sql = audit_table_ddl(schema_raw, &t.name, cols, dialect);
184 sqlx::query(&audit_sql).execute(pool).await?;
185 let pk_col = match &t.primary_key {
186 PrimaryKeyConfig::Single(s) => s.clone(),
187 PrimaryKeyConfig::Composite(v) => v[0].clone(),
188 };
189 let audit_full = format!(
190 "{}.{}",
191 quote(schema_raw),
192 quote(&format!("{}_audit", t.name))
193 );
194 let idx_sql = format!(
195 "CREATE INDEX IF NOT EXISTS {} ON {} ({}, {})",
196 quote(&format!("{}_audit_record_idx", t.name)),
197 audit_full,
198 quote(&pk_col),
199 quote("audit_at")
200 );
201 let _ = sqlx::query(&idx_sql).execute(pool).await;
202 }
203
204 if t.versioning.as_ref().is_some_and(|v| v.enabled) {
205 let schema_raw = schema_override.unwrap_or(&schema.name);
206 let pk_col = match &t.primary_key {
207 PrimaryKeyConfig::Single(s) => s.clone(),
208 PrimaryKeyConfig::Composite(v) => v[0].clone(),
209 };
210 let history_ddl = history_table_ddl(schema_raw, &t.name, &pk_col, cols, dialect);
211 let create_only = history_ddl
213 .lines()
214 .take_while(|l| !l.trim_start().starts_with("-- index:"))
215 .collect::<Vec<_>>()
216 .join("\n");
217 sqlx::query(create_only.trim()).execute(pool).await?;
218 let idx_sql = history_index_ddl(schema_raw, &t.name, &pk_col);
219 let _ = sqlx::query(&idx_sql).execute(pool).await;
220 }
221
222 if let Some(col) = rls_tenant_column {
223 if dialect.supports_rls() {
224 if !config_col_names.contains(col) {
225 let q_col = quote(col);
226 let add_col = format!(
227 "ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} TEXT",
228 full_name, q_col
229 );
230 sqlx::query(&add_col).execute(pool).await?;
231 }
232 let enable_rls = format!("ALTER TABLE {} ENABLE ROW LEVEL SECURITY", full_name);
233 sqlx::query(&enable_rls).execute(pool).await?;
234 let q_col = quote(col);
235 let setting = "current_setting('app.tenant_id', true)";
236 let cond = format!("{} = {}", q_col, setting);
237 let policy_prefix = format!("rls_tenant_{}", t.name);
238 let policies: &[(&str, &str, Option<&str>, Option<&str>)] = &[
239 ("select", "SELECT", Some(cond.as_str()), None),
240 ("insert", "INSERT", None, Some(cond.as_str())),
241 ("update", "UPDATE", Some(cond.as_str()), Some(cond.as_str())),
242 ("delete", "DELETE", Some(cond.as_str()), None),
243 ];
244 for (suffix, cmd, using_cond, with_check) in policies.iter() {
245 let policy_name = format!("{}_{}", policy_prefix, suffix);
246 let drop_sql = format!(
247 "DROP POLICY IF EXISTS {} ON {}",
248 quote(&policy_name),
249 full_name
250 );
251 let _ = sqlx::query(&drop_sql).execute(pool).await;
252 let create_sql = match (using_cond, with_check) {
253 (Some(u), Some(w)) => format!(
254 "CREATE POLICY {} ON {} FOR {} USING ( {} ) WITH CHECK ( {} )",
255 quote(&policy_name),
256 full_name,
257 cmd,
258 u,
259 w
260 ),
261 (Some(u), None) => format!(
262 "CREATE POLICY {} ON {} FOR {} USING ( {} )",
263 quote(&policy_name),
264 full_name,
265 cmd,
266 u
267 ),
268 (None, Some(w)) => format!(
269 "CREATE POLICY {} ON {} FOR {} WITH CHECK ( {} )",
270 quote(&policy_name),
271 full_name,
272 cmd,
273 w
274 ),
275 (None, None) => continue,
276 };
277 sqlx::query(&create_sql).execute(pool).await?;
278 }
279 } else {
280 tracing::warn!(table = %full_name, dialect = %dialect.name(), "RLS requested but not supported by this dialect; skipping");
281 }
282 }
283 }
284
285 for idx in &config.indexes {
286 let sid = idx.schema_id.as_deref().unwrap_or(default_sid);
287 let schema = schemas_by_id.get(sid).ok_or_else(|| {
288 AppError::Config(crate::error::ConfigError::MissingReference {
289 kind: "schema",
290 id: sid.to_string(),
291 })
292 })?;
293 let table = tables_by_id.get(idx.table_id.as_str()).ok_or_else(|| {
294 AppError::Config(crate::error::ConfigError::MissingReference {
295 kind: "table",
296 id: idx.table_id.clone(),
297 })
298 })?;
299 let schema_name = quote(schema_override.unwrap_or(&schema.name));
300 let table_name = quote(&table.name);
301 let full_table = format!("{}.{}", schema_name, table_name);
302 let index_name = quote(&idx.name);
303
304 let mut col_parts: Vec<String> = Vec::new();
305 for col in &idx.columns {
306 match col {
307 IndexColumnEntry::Name(n) => col_parts.push(quote(n)),
308 IndexColumnEntry::Spec {
309 name, direction, ..
310 } => {
311 let dir = direction
312 .as_deref()
313 .map(|d| format!(" {}", d.to_uppercase()))
314 .unwrap_or_default();
315 col_parts.push(format!("{}{}", quote(name), dir));
316 }
317 IndexColumnEntry::Expression { expression } => col_parts.push(expression.clone()),
318 }
319 }
320 let method = idx.method.as_deref().unwrap_or("btree");
321 let unique = if idx.unique { "UNIQUE " } else { "" };
322 let include: String = if idx.include.is_empty() {
323 String::new()
324 } else {
325 let inc: Vec<String> = idx.include.iter().map(|s| quote(s)).collect();
326 format!(" INCLUDE ({})", inc.join(", "))
327 };
328 let where_clause: String = idx
329 .where_
330 .as_ref()
331 .map(|w| format!(" WHERE {}", w))
332 .unwrap_or_default();
333
334 let sql = format!(
335 "CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({}){}{}",
336 unique,
337 index_name,
338 full_table,
339 method,
340 col_parts.join(", "),
341 include,
342 where_clause
343 );
344 let _ = sqlx::query(&sql).execute(pool).await;
345 }
346
347 for rel in &config.relationships {
348 let from_sid = rel.from_schema_id.as_deref().unwrap_or(default_sid);
349 let from_schema = schemas_by_id.get(from_sid).ok_or_else(|| {
350 AppError::Config(crate::error::ConfigError::MissingReference {
351 kind: "schema",
352 id: from_sid.to_string(),
353 })
354 })?;
355 let from_table = tables_by_id
356 .get(rel.from_table_id.as_str())
357 .ok_or_else(|| {
358 AppError::Config(crate::error::ConfigError::MissingReference {
359 kind: "table",
360 id: rel.from_table_id.clone(),
361 })
362 })?;
363
364 let (to_schema_name_owned, to_table_name, to_col_name) = if let Some(pkg_id) =
366 rel.to_package_id.as_deref()
367 {
368 let foreign = cross_package_configs.get(pkg_id).ok_or_else(|| {
369 AppError::Config(crate::error::ConfigError::MissingReference {
370 kind: "cross_package",
371 id: pkg_id.to_string(),
372 })
373 })?;
374 let foreign_tables: HashMap<_, _> =
375 foreign.tables.iter().map(|t| (t.id.as_str(), t)).collect();
376 let foreign_schemas: HashMap<_, _> =
377 foreign.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
378 let to_tbl = foreign_tables
379 .get(rel.to_table_id.as_str())
380 .ok_or_else(|| {
381 AppError::Config(crate::error::ConfigError::MissingReference {
382 kind: "table",
383 id: rel.to_table_id.clone(),
384 })
385 })?;
386 let foreign_default_sid = foreign.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
387 let to_sid = rel.to_schema_id.as_deref().unwrap_or(foreign_default_sid);
388 let to_schema = foreign_schemas.get(to_sid).ok_or_else(|| {
389 AppError::Config(crate::error::ConfigError::MissingReference {
390 kind: "schema",
391 id: to_sid.to_string(),
392 })
393 })?;
394 let col_name = foreign
395 .columns
396 .iter()
397 .find(|c| c.id == rel.to_column_id)
398 .map(|c| c.name.clone())
399 .ok_or_else(|| {
400 AppError::Config(crate::error::ConfigError::MissingReference {
401 kind: "column",
402 id: rel.to_column_id.clone(),
403 })
404 })?;
405 (to_schema.name.clone(), to_tbl.name.clone(), col_name)
408 } else {
409 let to_sid = rel.to_schema_id.as_deref().unwrap_or(default_sid);
410 let to_schema = schemas_by_id.get(to_sid).ok_or_else(|| {
411 AppError::Config(crate::error::ConfigError::MissingReference {
412 kind: "schema",
413 id: to_sid.to_string(),
414 })
415 })?;
416 let to_table = tables_by_id.get(rel.to_table_id.as_str()).ok_or_else(|| {
417 AppError::Config(crate::error::ConfigError::MissingReference {
418 kind: "table",
419 id: rel.to_table_id.clone(),
420 })
421 })?;
422 let col_name = config
423 .columns
424 .iter()
425 .find(|c| c.id == rel.to_column_id)
426 .map(|c| c.name.clone())
427 .ok_or_else(|| {
428 AppError::Config(crate::error::ConfigError::MissingReference {
429 kind: "column",
430 id: rel.to_column_id.clone(),
431 })
432 })?;
433 (
434 schema_override.unwrap_or(&to_schema.name).to_string(),
435 to_table.name.clone(),
436 col_name,
437 )
438 };
439
440 let from_schema_name = schema_override.unwrap_or(&from_schema.name);
441 let from_col = config
442 .columns
443 .iter()
444 .find(|c| c.id == rel.from_column_id)
445 .map(|c| c.name.as_str())
446 .ok_or_else(|| {
447 AppError::Config(crate::error::ConfigError::MissingReference {
448 kind: "column",
449 id: rel.from_column_id.clone(),
450 })
451 })?;
452
453 let from_full = format!("{}.{}", quote(from_schema_name), quote(&from_table.name));
454 let to_full = format!("{}.{}", quote(&to_schema_name_owned), quote(&to_table_name));
455 let constraint_name = rel.name.as_deref().unwrap_or(&rel.id);
456 let on_update = rel.on_update.as_deref().unwrap_or("NO ACTION");
457 let on_delete = rel.on_delete.as_deref().unwrap_or("NO ACTION");
458
459 let sql = format!(
460 "ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {} ({}) ON UPDATE {} ON DELETE {}",
461 from_full,
462 quote(constraint_name),
463 quote(from_col),
464 to_full,
465 quote(&to_col_name),
466 on_update,
467 on_delete
468 );
469 let _ = sqlx::query(&sql).execute(pool).await;
470 }
471
472 Ok(())
473}
474
475pub async fn revert_migrations(
478 pool: &Pool,
479 config: &FullConfig,
480 schema_override: Option<&str>,
481) -> Result<(), AppError> {
482 let default_sid = config
483 .schemas
484 .first()
485 .map(|s| s.id.as_str())
486 .ok_or_else(|| {
487 AppError::Config(crate::error::ConfigError::Validation(
488 "at least one schema required".into(),
489 ))
490 })?;
491
492 let schemas_by_id: HashMap<_, _> = config.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
493
494 for t in &config.tables {
496 let sid = t.schema_id.as_deref().unwrap_or(default_sid);
497 let schema = schemas_by_id.get(sid).ok_or_else(|| {
498 AppError::Config(crate::error::ConfigError::MissingReference {
499 kind: "schema",
500 id: sid.to_string(),
501 })
502 })?;
503 let schema_raw = schema_override.unwrap_or(&schema.name);
504 let schema_name = quote(schema_raw);
505 let table_name = quote(&t.name);
506 let full_name = format!("{}.{}", schema_name, table_name);
507 if t.audit_log {
508 let audit_full = format!("{}.{}", schema_name, quote(&format!("{}_audit", t.name)));
509 let _ = sqlx::query(&format!("DROP TABLE IF EXISTS {} CASCADE", audit_full))
510 .execute(pool)
511 .await;
512 }
513 if t.versioning.as_ref().is_some_and(|v| v.enabled) {
514 let history_full = format!("{}.{}", schema_name, quote(&format!("{}_history", t.name)));
515 let _ = sqlx::query(&format!("DROP TABLE IF EXISTS {} CASCADE", history_full))
516 .execute(pool)
517 .await;
518 }
519 let drop_sql = format!("DROP TABLE IF EXISTS {} CASCADE", full_name);
520 let _ = sqlx::query(&drop_sql).execute(pool).await;
521 }
522
523 for e in &config.enums {
525 let sid = e.schema_id.as_deref().unwrap_or(default_sid);
526 let schema = schemas_by_id.get(sid).ok_or_else(|| {
527 AppError::Config(crate::error::ConfigError::MissingReference {
528 kind: "schema",
529 id: sid.to_string(),
530 })
531 })?;
532 let schema_name = quote(schema_override.unwrap_or(&schema.name));
533 let type_name = quote(&e.name);
534 let drop_sql = format!("DROP TYPE IF EXISTS {}.{} CASCADE", schema_name, type_name);
535 let _ = sqlx::query(&drop_sql).execute(pool).await;
536 }
537
538 if schema_override.is_none() {
540 for s in &config.schemas {
541 if s.name.eq_ignore_ascii_case("public") {
542 continue;
543 }
544 let schema_name = quote(&s.name);
545 let drop_sql = format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name);
546 let _ = sqlx::query(&drop_sql).execute(pool).await;
547 }
548 }
549
550 Ok(())
551}
552
/// Kind of change a single [`MigrationStep`] represents.
///
/// Serialized with `snake_case` names (for example `CreateSchema` becomes
/// `"create_schema"`); the `Display` impl prints the same name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MigrationOperation {
    CreateSchema,
    CreateEnum,
    // Also used by `recreate_enum_steps` for the rename-aside and final drop of the old type.
    DropEnum,
    AddEnumValue,
    // Emitted as a warn-only marker step; the actual removal is done by rebuilding the type.
    RemoveEnumValue,
    CreateTable,
    DropTable,
    AddColumn,
    DropColumn,
    RenameColumn,
    AlterColumnType,
    BackfillNulls,
    SetNotNull,
    DropNotNull,
    SetDefault,
    DropDefault,
    CreateIndex,
    DropIndex,
    AddForeignKey,
    DropForeignKey,
}
579
580impl std::fmt::Display for MigrationOperation {
581 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
582 let s = serde_json::to_value(self)
583 .ok()
584 .and_then(|v| v.as_str().map(String::from))
585 .unwrap_or_else(|| format!("{:?}", self));
586 write!(f, "{}", s)
587 }
588}
589
/// Safety classification of a [`MigrationStep`], serialized in `snake_case`.
///
/// NOTE(review): the meanings below are inferred from how the visible plan builders
/// use the variants — confirm against the executor.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MigrationSafety {
    // Used for idempotent statements such as `CREATE SCHEMA IF NOT EXISTS`.
    Safe,
    // Used for statements that carry DDL but may fail (e.g. `CREATE TYPE`, enum recasts).
    BestEffort,
    // Used for steps that carry no DDL and only report something to the operator.
    WarnOnly,
}
601
/// Risk attached to a [`MigrationStep`], serialized in `snake_case`.
///
/// NOTE(review): per-variant notes are inferred from names and from the usages visible
/// in this file — confirm for the variants not used here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MigrationRisk {
    None,
    // Used for the enum recast, which fails if a row still holds a removed value.
    MayFail,
    ExistingNullsMustBeAbsent,
    DataWillBeModified,
    // Used for warn-only steps where the operator has to act (e.g. dropping an enum type).
    ManualActionRequired,
}
616
/// One entry of a [`MigrationPlan`]: what changes, on which object, with which DDL,
/// and how risky it is.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationStep {
    // Every builder visible in this file sets 0 here; presumably the plan numbers the
    // steps afterwards — TODO confirm.
    pub step: usize,
    pub operation: MigrationOperation,
    // Name of the schema the step applies to.
    pub schema: String,
    // Table name for column-level steps; `None` for schema- and enum-level steps.
    pub table: Option<String>,
    // Name of the affected object; enum value steps use the form `enum_name:value`.
    pub object: String,
    // Free-form object kind; values seen in this file: "schema", "enum", "enum_value", "column".
    pub object_type: String,
    // Human-readable summary of the step.
    pub description: String,
    // SQL for the step; `None` on the warn-only steps built in this file.
    pub ddl: Option<String>,
    pub safety: MigrationSafety,
    pub risk: MigrationRisk,
    // Optional explanation of the risk for the operator.
    pub risk_detail: Option<String>,
}
635
/// Ordered list of [`MigrationStep`]s produced by comparing two configurations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationPlan {
    pub steps: Vec<MigrationStep>,
}
641
/// Step counts of a [`MigrationPlan`], broken down by [`MigrationSafety`].
#[derive(Debug, Clone, Serialize)]
pub struct MigrationSummary {
    // Number of steps in the plan.
    pub total: usize,
    // Steps classified `MigrationSafety::Safe`.
    pub safe: usize,
    // Steps classified `MigrationSafety::BestEffort`.
    pub best_effort: usize,
    // Steps classified `MigrationSafety::WarnOnly`.
    pub warn_only: usize,
}
649
650impl MigrationPlan {
651 pub fn summary(&self) -> MigrationSummary {
652 let (mut safe, mut best_effort, mut warn_only) = (0, 0, 0);
653 for s in &self.steps {
654 match s.safety {
655 MigrationSafety::Safe => safe += 1,
656 MigrationSafety::BestEffort => best_effort += 1,
657 MigrationSafety::WarnOnly => warn_only += 1,
658 }
659 }
660 MigrationSummary {
661 total: self.steps.len(),
662 safe,
663 best_effort,
664 warn_only,
665 }
666 }
667}
668
/// Outcome counters for running a migration plan.
///
/// NOTE(review): nothing in the visible part of the file constructs this type; the
/// field notes are inferred from the names — confirm against the executor.
pub struct MigrationExecutionResult {
    // Presumably the number of steps that were applied.
    pub applied: usize,
    // Presumably the number of steps that produced a warning.
    pub warned: usize,
    // Presumably the warning messages collected while executing.
    pub warnings: Vec<String>,
}
675
676fn default_str(d: &ColumnDefaultConfig) -> String {
677 match d {
678 ColumnDefaultConfig::Literal(s) => s.clone(),
679 ColumnDefaultConfig::Expression { expression } => expression.clone(),
680 }
681}
682
/// A column whose type is a given enum (or an array of it), as collected by
/// `enum_dependent_columns` for use in `recreate_enum_steps`.
struct EnumColumnRef {
    // Schema name of the owning table (the schema override when one is active).
    schema: String,
    // Name of the owning table.
    table: String,
    // Name of the column.
    column: String,
    // SQL text of the column default, if the column has one.
    default: Option<String>,
    // True when the column is an array of the enum rather than the enum itself.
    is_array: bool,
}
692
693fn enum_type_name(t: &crate::db::CanonicalType) -> Option<(String, bool)> {
696 use crate::db::CanonicalType;
697 let unqualified = |s: &str| s.rsplit('.').next().unwrap_or(s).to_string();
698 match t {
699 CanonicalType::Custom(s) => Some((unqualified(s), false)),
700 CanonicalType::Array(inner) => match inner.as_ref() {
701 CanonicalType::Custom(s) => Some((unqualified(s), true)),
702 _ => None,
703 },
704 _ => None,
705 }
706}
707
708fn enum_dependent_columns(
711 new_enum: &EnumConfig,
712 new: &FullConfig,
713 new_tables: &HashMap<&str, &TableConfig>,
714 new_schemas: &HashMap<&str, &SchemaConfig>,
715 schema_override: Option<&str>,
716) -> Vec<EnumColumnRef> {
717 let default_sid = new.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
718 let mut out = Vec::new();
719 for c in &new.columns {
720 let Some((tyname, is_array)) = enum_type_name(&parse_canonical(&c.type_)) else {
721 continue;
722 };
723 if tyname != new_enum.name {
724 continue;
725 }
726 let Some(table) = new_tables.get(c.table_id.as_str()) else {
727 continue;
728 };
729 let tsid = table.schema_id.as_deref().unwrap_or(default_sid);
730 let schema = schema_override.map(String::from).unwrap_or_else(|| {
731 new_schemas
732 .get(tsid)
733 .map(|s| s.name.clone())
734 .unwrap_or_else(|| tsid.to_string())
735 });
736 out.push(EnumColumnRef {
737 schema,
738 table: table.name.clone(),
739 column: c.name.clone(),
740 default: c.default.as_ref().map(default_str),
741 is_array,
742 });
743 }
744 out
745}
746
747fn recreate_enum_steps(
754 steps: &mut Vec<MigrationStep>,
755 schema: &str,
756 new_enum: &EnumConfig,
757 removed: &[&str],
758 dependents: &[EnumColumnRef],
759) {
760 let type_q = format!("{}.{}", quote(schema), quote(&new_enum.name));
761 let tmp_name = format!("{}__arch_old", new_enum.name);
762 let values: Vec<String> = new_enum
763 .values
764 .iter()
765 .map(|v| format!("'{}'", v.replace('\'', "''")))
766 .collect();
767
768 steps.push(MigrationStep {
770 step: 0,
771 operation: MigrationOperation::RemoveEnumValue,
772 schema: schema.to_string(),
773 table: None,
774 object: format!("{}:{}", new_enum.name, removed.join(",")),
775 object_type: "enum".into(),
776 description: format!(
777 "Rebuild enum \"{}\".\"{}\" to remove value(s): {}",
778 schema,
779 new_enum.name,
780 removed.join(", ")
781 ),
782 ddl: None,
783 safety: MigrationSafety::WarnOnly,
784 risk: MigrationRisk::ManualActionRequired,
785 risk_detail: Some(format!(
786 "PostgreSQL cannot drop enum values in place. The type is rebuilt and {} dependent \
787 column(s) are recast via a text cast. Any existing row holding a removed value ({}) \
788 will make its recast fail — reassign those rows first.",
789 dependents.len(),
790 removed.join(", ")
791 )),
792 });
793
794 steps.push(MigrationStep {
796 step: 0,
797 operation: MigrationOperation::DropEnum,
798 schema: schema.to_string(),
799 table: None,
800 object: new_enum.name.clone(),
801 object_type: "enum".into(),
802 description: format!(
803 "Rename enum \"{}\".\"{}\" to \"{}\" before rebuild",
804 schema, new_enum.name, tmp_name
805 ),
806 ddl: Some(format!(
807 "ALTER TYPE {} RENAME TO {}",
808 type_q,
809 quote(&tmp_name)
810 )),
811 safety: MigrationSafety::BestEffort,
812 risk: MigrationRisk::None,
813 risk_detail: None,
814 });
815
816 steps.push(MigrationStep {
818 step: 0,
819 operation: MigrationOperation::CreateEnum,
820 schema: schema.to_string(),
821 table: None,
822 object: new_enum.name.clone(),
823 object_type: "enum".into(),
824 description: format!(
825 "Recreate enum \"{}\".\"{}\" with {} value(s)",
826 schema,
827 new_enum.name,
828 new_enum.values.len()
829 ),
830 ddl: Some(format!(
831 "CREATE TYPE {} AS ENUM ({})",
832 type_q,
833 values.join(", ")
834 )),
835 safety: MigrationSafety::BestEffort,
836 risk: MigrationRisk::None,
837 risk_detail: None,
838 });
839
840 for dep in dependents {
843 let table_q = format!("{}.{}", quote(&dep.schema), quote(&dep.table));
844 let col_q = quote(&dep.column);
845
846 if dep.default.is_some() {
847 steps.push(MigrationStep {
848 step: 0,
849 operation: MigrationOperation::DropDefault,
850 schema: dep.schema.clone(),
851 table: Some(dep.table.clone()),
852 object: dep.column.clone(),
853 object_type: "column".into(),
854 description: format!(
855 "Drop default on {}.{} before enum recast",
856 dep.table, dep.column
857 ),
858 ddl: Some(format!(
859 "ALTER TABLE {} ALTER COLUMN {} DROP DEFAULT",
860 table_q, col_q
861 )),
862 safety: MigrationSafety::BestEffort,
863 risk: MigrationRisk::None,
864 risk_detail: None,
865 });
866 }
867
868 let (col_type, using) = if dep.is_array {
869 (
870 format!("{}[]", type_q),
871 format!("{}::text[]::{}[]", col_q, type_q),
872 )
873 } else {
874 (type_q.clone(), format!("{}::text::{}", col_q, type_q))
875 };
876 steps.push(MigrationStep {
877 step: 0,
878 operation: MigrationOperation::AlterColumnType,
879 schema: dep.schema.clone(),
880 table: Some(dep.table.clone()),
881 object: dep.column.clone(),
882 object_type: "column".into(),
883 description: format!(
884 "Recast {}.{} onto rebuilt enum \"{}\"",
885 dep.table, dep.column, new_enum.name
886 ),
887 ddl: Some(format!(
888 "ALTER TABLE {} ALTER COLUMN {} TYPE {} USING {}",
889 table_q, col_q, col_type, using
890 )),
891 safety: MigrationSafety::BestEffort,
892 risk: MigrationRisk::MayFail,
893 risk_detail: Some(format!(
894 "Cast fails if any row holds a removed value ({}).",
895 removed.join(", ")
896 )),
897 });
898
899 if let Some(def) = &dep.default {
900 steps.push(MigrationStep {
901 step: 0,
902 operation: MigrationOperation::SetDefault,
903 schema: dep.schema.clone(),
904 table: Some(dep.table.clone()),
905 object: dep.column.clone(),
906 object_type: "column".into(),
907 description: format!(
908 "Restore default on {}.{} after enum recast",
909 dep.table, dep.column
910 ),
911 ddl: Some(format!(
912 "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT {}",
913 table_q, col_q, def
914 )),
915 safety: MigrationSafety::BestEffort,
916 risk: MigrationRisk::None,
917 risk_detail: None,
918 });
919 }
920 }
921
922 steps.push(MigrationStep {
924 step: 0,
925 operation: MigrationOperation::DropEnum,
926 schema: schema.to_string(),
927 table: None,
928 object: tmp_name.clone(),
929 object_type: "enum".into(),
930 description: format!("Drop superseded enum \"{}\".\"{}\"", schema, tmp_name),
931 ddl: Some(format!(
932 "DROP TYPE IF EXISTS {}.{}",
933 quote(schema),
934 quote(&tmp_name)
935 )),
936 safety: MigrationSafety::BestEffort,
937 risk: MigrationRisk::None,
938 risk_detail: None,
939 });
940}
941
942pub fn compute_migration_plan(
948 old: &FullConfig,
949 new: &FullConfig,
950 schema_override: Option<&str>,
951 _rls_tenant_column: Option<&str>,
952 dialect: &dyn Dialect,
953 cross_package_configs: &HashMap<String, FullConfig>,
954) -> Result<MigrationPlan, AppError> {
955 validate(new)?;
956
957 let default_old_sid = old.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
958 let default_new_sid = new.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
959
960 let old_schemas: HashMap<&str, &SchemaConfig> =
961 old.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
962 let new_schemas: HashMap<&str, &SchemaConfig> =
963 new.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
964 let old_tables: HashMap<&str, &TableConfig> =
965 old.tables.iter().map(|t| (t.id.as_str(), t)).collect();
966 let new_tables: HashMap<&str, &TableConfig> =
967 new.tables.iter().map(|t| (t.id.as_str(), t)).collect();
968 let old_columns: HashMap<&str, &ColumnConfig> =
969 old.columns.iter().map(|c| (c.id.as_str(), c)).collect();
970 let old_enums: HashMap<&str, &EnumConfig> =
971 old.enums.iter().map(|e| (e.id.as_str(), e)).collect();
972 let new_enums: HashMap<&str, &EnumConfig> =
973 new.enums.iter().map(|e| (e.id.as_str(), e)).collect();
974 let old_indexes: HashMap<&str, &IndexConfig> =
975 old.indexes.iter().map(|i| (i.id.as_str(), i)).collect();
976 let new_indexes: HashMap<&str, &IndexConfig> =
977 new.indexes.iter().map(|i| (i.id.as_str(), i)).collect();
978 let old_rels: HashMap<&str, &RelationshipConfig> = old
979 .relationships
980 .iter()
981 .map(|r| (r.id.as_str(), r))
982 .collect();
983 let new_rels: HashMap<&str, &RelationshipConfig> = new
984 .relationships
985 .iter()
986 .map(|r| (r.id.as_str(), r))
987 .collect();
988
989 let mut steps: Vec<MigrationStep> = Vec::new();
990
991 let schema_name_for = |sid: &str, schemas: &HashMap<&str, &SchemaConfig>| -> String {
992 schema_override.map(String::from).unwrap_or_else(|| {
993 schemas
994 .get(sid)
995 .map(|s| s.name.clone())
996 .unwrap_or_else(|| sid.to_string())
997 })
998 };
999
1000 if schema_override.is_none() {
1002 for s in &new.schemas {
1003 if !old_schemas.contains_key(s.id.as_str()) {
1004 steps.push(MigrationStep {
1005 step: 0,
1006 operation: MigrationOperation::CreateSchema,
1007 schema: s.name.clone(),
1008 table: None,
1009 object: s.name.clone(),
1010 object_type: "schema".into(),
1011 description: format!("Create schema \"{}\"", s.name),
1012 ddl: Some(format!("CREATE SCHEMA IF NOT EXISTS {}", quote(&s.name))),
1013 safety: MigrationSafety::Safe,
1014 risk: MigrationRisk::None,
1015 risk_detail: None,
1016 });
1017 }
1018 }
1019 }
1020
1021 for new_enum in &new.enums {
1023 let sid = new_enum.schema_id.as_deref().unwrap_or(default_new_sid);
1024 let schema = schema_name_for(sid, &new_schemas);
1025
1026 if let Some(old_enum) = old_enums.get(new_enum.id.as_str()) {
1027 let old_vals: HashSet<&str> = old_enum.values.iter().map(String::as_str).collect();
1028 let new_vals: HashSet<&str> = new_enum.values.iter().map(String::as_str).collect();
1029 let removed: Vec<&str> = old_enum
1030 .values
1031 .iter()
1032 .map(String::as_str)
1033 .filter(|v| !new_vals.contains(v))
1034 .collect();
1035
1036 if removed.is_empty() {
1037 for val in new_enum
1039 .values
1040 .iter()
1041 .map(String::as_str)
1042 .filter(|v| !old_vals.contains(v))
1043 {
1044 steps.push(MigrationStep {
1045 step: 0,
1046 operation: MigrationOperation::AddEnumValue,
1047 schema: schema.clone(),
1048 table: None,
1049 object: format!("{}:{}", new_enum.name, val),
1050 object_type: "enum_value".into(),
1051 description: format!(
1052 "Add value '{}' to enum \"{}\".\"{}\"",
1053 val, schema, new_enum.name
1054 ),
1055 ddl: Some(format!(
1056 "ALTER TYPE {}.{} ADD VALUE IF NOT EXISTS '{}'",
1057 quote(&schema),
1058 quote(&new_enum.name),
1059 val.replace('\'', "''")
1060 )),
1061 safety: MigrationSafety::Safe,
1062 risk: MigrationRisk::None,
1063 risk_detail: None,
1064 });
1065 }
1066 } else {
1067 let dependents = enum_dependent_columns(
1071 new_enum,
1072 new,
1073 &new_tables,
1074 &new_schemas,
1075 schema_override,
1076 );
1077 recreate_enum_steps(&mut steps, &schema, new_enum, &removed, &dependents);
1078 }
1079 } else {
1080 let values: Vec<String> = new_enum
1081 .values
1082 .iter()
1083 .map(|v| format!("'{}'", v.replace('\'', "''")))
1084 .collect();
1085 steps.push(MigrationStep {
1086 step: 0,
1087 operation: MigrationOperation::CreateEnum,
1088 schema: schema.clone(),
1089 table: None,
1090 object: new_enum.name.clone(),
1091 object_type: "enum".into(),
1092 description: format!("Create enum type \"{}\".\"{}\"", schema, new_enum.name),
1093 ddl: Some(format!("CREATE TYPE {}.{} AS ENUM ({})", quote(&schema), quote(&new_enum.name), values.join(", "))),
1094 safety: MigrationSafety::BestEffort,
1095 risk: MigrationRisk::None,
1096 risk_detail: Some("PostgreSQL has no CREATE TYPE IF NOT EXISTS; ignored if the type already exists.".into()),
1097 });
1098 }
1099 }
1100 for old_enum in &old.enums {
1101 if !new_enums.contains_key(old_enum.id.as_str()) {
1102 let sid = old_enum.schema_id.as_deref().unwrap_or(default_old_sid);
1103 let schema = schema_name_for(sid, &old_schemas);
1104 steps.push(MigrationStep {
1105 step: 0,
1106 operation: MigrationOperation::DropEnum,
1107 schema: schema.clone(),
1108 table: None,
1109 object: old_enum.name.clone(),
1110 object_type: "enum".into(),
1111 description: format!("Enum \"{}\".\"{}\" removed from config", schema, old_enum.name),
1112 ddl: None,
1113 safety: MigrationSafety::WarnOnly,
1114 risk: MigrationRisk::ManualActionRequired,
1115 risk_detail: Some("Enum type NOT dropped from database (data safety). Run DROP TYPE manually if intended.".into()),
1116 });
1117 }
1118 }
1119
1120 let added_table_ids: HashSet<&str> = new
1122 .tables
1123 .iter()
1124 .filter(|t| !old_tables.contains_key(t.id.as_str()))
1125 .map(|t| t.id.as_str())
1126 .collect();
1127
1128 let cols_by_table: HashMap<&str, Vec<&ColumnConfig>> =
1129 new.columns.iter().fold(HashMap::new(), |mut m, c| {
1130 m.entry(c.table_id.as_str()).or_default().push(c);
1131 m
1132 });
1133
1134 for new_table in &new.tables {
1135 if !added_table_ids.contains(new_table.id.as_str()) {
1136 continue;
1137 }
1138 let sid = new_table.schema_id.as_deref().unwrap_or(default_new_sid);
1139 let schema = schema_name_for(sid, &new_schemas);
1140 let full = format!("{}.{}", quote(&schema), quote(&new_table.name));
1141
1142 let cols = cols_by_table
1143 .get(new_table.id.as_str())
1144 .map(|v| v.as_slice())
1145 .unwrap_or(&[]);
1146 let mut col_defs: Vec<String> = Vec::new();
1147 for c in cols {
1148 let typ = dialect.ddl_type(&parse_canonical(&c.type_));
1149 let mut def = format!("{} {}", quote(&c.name), typ);
1150 if !c.nullable {
1151 def.push_str(" NOT NULL");
1152 }
1153 if let Some(ref d) = c.default {
1154 def.push_str(" DEFAULT ");
1155 match d {
1156 ColumnDefaultConfig::Literal(s) => def.push_str(s),
1157 ColumnDefaultConfig::Expression { expression } => def.push_str(expression),
1158 }
1159 }
1160 col_defs.push(def);
1161 }
1162 let cfg_col_names: HashSet<&str> = cols.iter().map(|c| c.name.as_str()).collect();
1163 for (name, suf) in [
1168 ("created_at", "TIMESTAMPTZ NOT NULL DEFAULT NOW()"),
1169 ("updated_at", "TIMESTAMPTZ NOT NULL DEFAULT NOW()"),
1170 ("archived_at", "TIMESTAMPTZ"),
1171 ("created_by", "TEXT"),
1172 ("updated_by", "TEXT"),
1173 ] {
1174 if !cfg_col_names.contains(name) {
1175 col_defs.push(format!("{} {}", quote(name), suf));
1176 }
1177 }
1178 let pk_cols = match &new_table.primary_key {
1179 PrimaryKeyConfig::Single(s) => vec![quote(s)],
1180 PrimaryKeyConfig::Composite(v) => v.iter().map(|s| quote(s)).collect(),
1181 };
1182 col_defs.push(format!("PRIMARY KEY ({})", pk_cols.join(", ")));
1183 for u in &new_table.unique {
1184 col_defs.push(format!(
1185 "UNIQUE ({})",
1186 u.iter().map(|s| quote(s)).collect::<Vec<_>>().join(", ")
1187 ));
1188 }
1189 for ch in &new_table.check {
1190 col_defs.push(format!(
1191 "CONSTRAINT {} CHECK ({})",
1192 quote(&ch.name),
1193 ch.expression
1194 ));
1195 }
1196
1197 steps.push(MigrationStep {
1198 step: 0,
1199 operation: MigrationOperation::CreateTable,
1200 schema: schema.clone(),
1201 table: Some(new_table.name.clone()),
1202 object: new_table.name.clone(),
1203 object_type: "table".into(),
1204 description: format!("Create table \"{}\".\"{}\"", schema, new_table.name),
1205 ddl: Some(format!(
1206 "CREATE TABLE IF NOT EXISTS {} (\n {}\n)",
1207 full,
1208 col_defs.join(",\n ")
1209 )),
1210 safety: MigrationSafety::Safe,
1211 risk: MigrationRisk::None,
1212 risk_detail: None,
1213 });
1214 if new_table.audit_log {
1215 let audit_ddl = audit_table_ddl(&schema, &new_table.name, cols, dialect);
1216 steps.push(MigrationStep {
1217 step: 0,
1218 operation: MigrationOperation::CreateTable,
1219 schema: schema.clone(),
1220 table: Some(format!("{}_audit", new_table.name)),
1221 object: format!("{}_audit", new_table.name),
1222 object_type: "table".into(),
1223 description: format!(
1224 "Create audit table \"{}\".\"{}_audit\"",
1225 schema, new_table.name
1226 ),
1227 ddl: Some(audit_ddl),
1228 safety: MigrationSafety::Safe,
1229 risk: MigrationRisk::None,
1230 risk_detail: None,
1231 });
1232 }
1233 if new_table.versioning.as_ref().is_some_and(|v| v.enabled) {
1234 let pk_col = match &new_table.primary_key {
1235 PrimaryKeyConfig::Single(s) => s.clone(),
1236 PrimaryKeyConfig::Composite(v) => v[0].clone(),
1237 };
1238 let history_create = format!(
1240 "CREATE TABLE IF NOT EXISTS {}.{} (\n {}\n)",
1241 quote(&schema),
1242 quote(&format!("{}_history", new_table.name)),
1243 {
1244 let full_ddl =
1245 history_table_ddl(&schema, &new_table.name, &pk_col, cols, dialect);
1246 full_ddl
1247 .lines()
1248 .skip(1) .take_while(|l| !l.trim_start().starts_with("-- index:"))
1250 .collect::<Vec<_>>()
1251 .join("\n")
1252 .trim_end_matches(['\n', ',', ')'])
1253 .to_string()
1254 + "\n)"
1255 }
1256 );
1257 steps.push(MigrationStep {
1258 step: 0,
1259 operation: MigrationOperation::CreateTable,
1260 schema: schema.clone(),
1261 table: Some(format!("{}_history", new_table.name)),
1262 object: format!("{}_history", new_table.name),
1263 object_type: "table".into(),
1264 description: format!(
1265 "Create history table \"{}\".\"{}_history\" (versioning)",
1266 schema, new_table.name
1267 ),
1268 ddl: Some(history_create),
1269 safety: MigrationSafety::Safe,
1270 risk: MigrationRisk::None,
1271 risk_detail: None,
1272 });
1273 steps.push(MigrationStep {
1274 step: 0,
1275 operation: MigrationOperation::CreateIndex,
1276 schema: schema.clone(),
1277 table: Some(format!("{}_history", new_table.name)),
1278 object: format!("{}_history_{}_idx", new_table.name, pk_col),
1279 object_type: "index".into(),
1280 description: format!(
1281 "Create index on history table \"{}\".\"{}\" ({pk_col}, _version DESC)",
1282 schema, new_table.name
1283 ),
1284 ddl: Some(history_index_ddl(&schema, &new_table.name, &pk_col)),
1285 safety: MigrationSafety::Safe,
1286 risk: MigrationRisk::None,
1287 risk_detail: None,
1288 });
1289 }
1290 }
1291
1292 for new_table in &new.tables {
1294 if added_table_ids.contains(new_table.id.as_str()) {
1295 continue;
1296 }
1297 if let Some(old_table) = old_tables.get(new_table.id.as_str()) {
1298 let sid = new_table.schema_id.as_deref().unwrap_or(default_new_sid);
1299 let schema = schema_name_for(sid, &new_schemas);
1300 let cols = cols_by_table
1301 .get(new_table.id.as_str())
1302 .map(|v| v.as_slice())
1303 .unwrap_or(&[]);
1304
1305 if !old_table.audit_log && new_table.audit_log {
1306 let audit_ddl = audit_table_ddl(&schema, &new_table.name, cols, dialect);
1307 steps.push(MigrationStep {
1308 step: 0,
1309 operation: MigrationOperation::CreateTable,
1310 schema: schema.clone(),
1311 table: Some(format!("{}_audit", new_table.name)),
1312 object: format!("{}_audit", new_table.name),
1313 object_type: "table".into(),
1314 description: format!(
1315 "Enable audit log: create \"{}\".\"{}_audit\"",
1316 schema, new_table.name
1317 ),
1318 ddl: Some(audit_ddl),
1319 safety: MigrationSafety::Safe,
1320 risk: MigrationRisk::None,
1321 risk_detail: None,
1322 });
1323 }
1324
1325 let old_versioning_enabled = old_table.versioning.as_ref().is_some_and(|v| v.enabled);
1326 let new_versioning_enabled = new_table.versioning.as_ref().is_some_and(|v| v.enabled);
1327 if !old_versioning_enabled && new_versioning_enabled {
1328 let pk_col = match &new_table.primary_key {
1329 PrimaryKeyConfig::Single(s) => s.clone(),
1330 PrimaryKeyConfig::Composite(v) => v[0].clone(),
1331 };
1332 let history_ddl =
1333 history_table_ddl(&schema, &new_table.name, &pk_col, cols, dialect);
1334 let create_only = history_ddl
1335 .lines()
1336 .take_while(|l| !l.trim_start().starts_with("-- index:"))
1337 .collect::<Vec<_>>()
1338 .join("\n");
1339 steps.push(MigrationStep {
1340 step: 0,
1341 operation: MigrationOperation::CreateTable,
1342 schema: schema.clone(),
1343 table: Some(format!("{}_history", new_table.name)),
1344 object: format!("{}_history", new_table.name),
1345 object_type: "table".into(),
1346 description: format!(
1347 "Enable versioning: create \"{}\".\"{}_history\"",
1348 schema, new_table.name
1349 ),
1350 ddl: Some(create_only.trim().to_string()),
1351 safety: MigrationSafety::Safe,
1352 risk: MigrationRisk::None,
1353 risk_detail: None,
1354 });
1355 steps.push(MigrationStep {
1356 step: 0,
1357 operation: MigrationOperation::CreateIndex,
1358 schema: schema.clone(),
1359 table: Some(format!("{}_history", new_table.name)),
1360 object: format!("{}_history_{}_idx", new_table.name, pk_col),
1361 object_type: "index".into(),
1362 description: format!(
1363 "Create history index on \"{}\".\"{}\"",
1364 schema, new_table.name
1365 ),
1366 ddl: Some(history_index_ddl(&schema, &new_table.name, &pk_col)),
1367 safety: MigrationSafety::Safe,
1368 risk: MigrationRisk::None,
1369 risk_detail: None,
1370 });
1371 }
1372 }
1373 }
1374
1375 for old_table in &old.tables {
1376 if !new_tables.contains_key(old_table.id.as_str()) {
1377 let sid = old_table.schema_id.as_deref().unwrap_or(default_old_sid);
1378 let schema = schema_name_for(sid, &old_schemas);
1379 steps.push(MigrationStep {
1380 step: 0,
1381 operation: MigrationOperation::DropTable,
1382 schema: schema.clone(),
1383 table: Some(old_table.name.clone()),
1384 object: old_table.name.clone(),
1385 object_type: "table".into(),
1386 description: format!("Table \"{}\".\"{}\" removed from config", schema, old_table.name),
1387 ddl: None,
1388 safety: MigrationSafety::WarnOnly,
1389 risk: MigrationRisk::ManualActionRequired,
1390 risk_detail: Some("Table NOT dropped from database (data safety). Run DROP TABLE manually if intended.".into()),
1391 });
1392 }
1393 }
1394
1395 for new_col in &new.columns {
1397 if added_table_ids.contains(new_col.table_id.as_str()) {
1398 continue;
1399 }
1400 let table = match new_tables.get(new_col.table_id.as_str()) {
1401 Some(t) => t,
1402 None => continue,
1403 };
1404 let sid = table.schema_id.as_deref().unwrap_or(default_new_sid);
1405 let schema = schema_name_for(sid, &new_schemas);
1406 let full = format!("{}.{}", quote(&schema), quote(&table.name));
1407
1408 if let Some(old_col) = old_columns.get(new_col.id.as_str()) {
1409 if old_col.table_id != new_col.table_id {
1410 steps.push(MigrationStep {
1411 step: 0,
1412 operation: MigrationOperation::AddColumn,
1413 schema: schema.clone(),
1414 table: Some(table.name.clone()),
1415 object: new_col.name.clone(),
1416 object_type: "column".into(),
1417 description: format!("Column \"{}\" (id: {}) appears to have moved tables — manual migration required", new_col.name, new_col.id),
1418 ddl: None,
1419 safety: MigrationSafety::WarnOnly,
1420 risk: MigrationRisk::ManualActionRequired,
1421 risk_detail: Some(format!("Cannot automate column move from table {} to {}.", old_col.table_id, new_col.table_id)),
1422 });
1423 continue;
1424 }
1425
1426 if old_col.name != new_col.name {
1428 steps.push(MigrationStep {
1429 step: 0,
1430 operation: MigrationOperation::RenameColumn,
1431 schema: schema.clone(),
1432 table: Some(table.name.clone()),
1433 object: new_col.name.clone(),
1434 object_type: "column".into(),
1435 description: format!(
1436 "Rename column \"{}\" → \"{}\" on \"{}\".\"{}\"",
1437 old_col.name, new_col.name, schema, table.name
1438 ),
1439 ddl: Some(format!(
1440 "ALTER TABLE {} RENAME COLUMN {} TO {}",
1441 full,
1442 quote(&old_col.name),
1443 quote(&new_col.name)
1444 )),
1445 safety: MigrationSafety::Safe,
1446 risk: MigrationRisk::None,
1447 risk_detail: None,
1448 });
1449 steps.extend(companion_column_steps(
1450 &schema,
1451 table,
1452 &CompanionColumnOp::Rename {
1453 old: &old_col.name,
1454 new: &new_col.name,
1455 },
1456 ));
1457 }
1458
1459 let old_type = dialect.ddl_type(&parse_canonical(&old_col.type_));
1461 let new_type = dialect.ddl_type(&parse_canonical(&new_col.type_));
1462 if old_type.to_uppercase() != new_type.to_uppercase() {
1463 let col_name = &new_col.name;
1464 steps.push(MigrationStep {
1465 step: 0,
1466 operation: MigrationOperation::AlterColumnType,
1467 schema: schema.clone(),
1468 table: Some(table.name.clone()),
1469 object: col_name.clone(),
1470 object_type: "column".into(),
1471 description: format!("Change type of \"{}\".\"{}\".\"{}\": {} → {}", schema, table.name, col_name, old_type, new_type),
1472 ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} TYPE {} USING {}::{}", full, quote(col_name), new_type, quote(col_name), new_type)),
1473 safety: MigrationSafety::BestEffort,
1474 risk: MigrationRisk::MayFail,
1475 risk_detail: Some(format!("USING {}::{} cast may fail for incompatible values. Provide a custom USING expression if needed.", col_name, new_type)),
1476 });
1477 steps.extend(companion_column_steps(
1478 &schema,
1479 table,
1480 &CompanionColumnOp::AlterType {
1481 name: col_name.as_str(),
1482 ty: &new_type,
1483 },
1484 ));
1485 }
1486
1487 if old_col.nullable && !new_col.nullable {
1489 if let Some(ref d) = new_col.default {
1490 let default_val = default_str(d);
1491 steps.push(MigrationStep {
1493 step: 0,
1494 operation: MigrationOperation::BackfillNulls,
1495 schema: schema.clone(),
1496 table: Some(table.name.clone()),
1497 object: new_col.name.clone(),
1498 object_type: "column".into(),
1499 description: format!("Backfill NULLs in \"{}\".\"{}\".\"{}\": SET {} = {} WHERE {} IS NULL", schema, table.name, new_col.name, new_col.name, default_val, new_col.name),
1500 ddl: Some(format!("UPDATE {} SET {} = {} WHERE {} IS NULL", full, quote(&new_col.name), default_val, quote(&new_col.name))),
1501 safety: MigrationSafety::Safe,
1502 risk: MigrationRisk::DataWillBeModified,
1503 risk_detail: Some(format!("Existing NULLs in column \"{}\" will be set to {} before NOT NULL is enforced.", new_col.name, default_val)),
1504 });
1505 steps.push(MigrationStep {
1507 step: 0,
1508 operation: MigrationOperation::SetNotNull,
1509 schema: schema.clone(),
1510 table: Some(table.name.clone()),
1511 object: new_col.name.clone(),
1512 object_type: "column".into(),
1513 description: format!("Set NOT NULL on \"{}\".\"{}\".\"{}\": NULLs pre-filled with default ({})", schema, table.name, new_col.name, default_val),
1514 ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} SET NOT NULL", full, quote(&new_col.name))),
1515 safety: MigrationSafety::Safe,
1516 risk: MigrationRisk::None,
1517 risk_detail: None,
1518 });
1519 } else {
1520 steps.push(MigrationStep {
1522 step: 0,
1523 operation: MigrationOperation::SetNotNull,
1524 schema: schema.clone(),
1525 table: Some(table.name.clone()),
1526 object: new_col.name.clone(),
1527 object_type: "column".into(),
1528 description: format!("Set NOT NULL on \"{}\".\"{}\".\"{}\": no default configured — will fail if NULLs exist", schema, table.name, new_col.name),
1529 ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} SET NOT NULL", full, quote(&new_col.name))),
1530 safety: MigrationSafety::BestEffort,
1531 risk: MigrationRisk::ExistingNullsMustBeAbsent,
1532 risk_detail: Some(format!(
1533 "No default value configured for column \"{}\". Add a default to the config to enable automatic NULL backfill before enforcing NOT NULL.",
1534 new_col.name
1535 )),
1536 });
1537 }
1538 }
1539
1540 if !old_col.nullable && new_col.nullable {
1542 steps.push(MigrationStep {
1543 step: 0,
1544 operation: MigrationOperation::DropNotNull,
1545 schema: schema.clone(),
1546 table: Some(table.name.clone()),
1547 object: new_col.name.clone(),
1548 object_type: "column".into(),
1549 description: format!(
1550 "Drop NOT NULL on \"{}\".\"{}\".\"{}\": column becomes nullable",
1551 schema, table.name, new_col.name
1552 ),
1553 ddl: Some(format!(
1554 "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
1555 full,
1556 quote(&new_col.name)
1557 )),
1558 safety: MigrationSafety::Safe,
1559 risk: MigrationRisk::None,
1560 risk_detail: None,
1561 });
1562 }
1563
1564 let old_def = old_col.default.as_ref().map(default_str);
1566 let new_def = new_col.default.as_ref().map(default_str);
1567 if old_def != new_def {
1568 match &new_col.default {
1569 Some(d) => {
1570 let val = default_str(d);
1571 steps.push(MigrationStep {
1572 step: 0,
1573 operation: MigrationOperation::SetDefault,
1574 schema: schema.clone(),
1575 table: Some(table.name.clone()),
1576 object: new_col.name.clone(),
1577 object_type: "column".into(),
1578 description: format!(
1579 "Set DEFAULT {} on \"{}\".\"{}\".\"{}\": was {}",
1580 val,
1581 schema,
1582 table.name,
1583 new_col.name,
1584 old_def.as_deref().unwrap_or("none")
1585 ),
1586 ddl: Some(format!(
1587 "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT {}",
1588 full,
1589 quote(&new_col.name),
1590 val
1591 )),
1592 safety: MigrationSafety::Safe,
1593 risk: MigrationRisk::None,
1594 risk_detail: None,
1595 });
1596 }
1597 None => {
1598 steps.push(MigrationStep {
1599 step: 0,
1600 operation: MigrationOperation::DropDefault,
1601 schema: schema.clone(),
1602 table: Some(table.name.clone()),
1603 object: new_col.name.clone(),
1604 object_type: "column".into(),
1605 description: format!(
1606 "Drop DEFAULT on \"{}\".\"{}\".\"{}\": was {}",
1607 schema,
1608 table.name,
1609 new_col.name,
1610 old_def.as_deref().unwrap_or("none")
1611 ),
1612 ddl: Some(format!(
1613 "ALTER TABLE {} ALTER COLUMN {} DROP DEFAULT",
1614 full,
1615 quote(&new_col.name)
1616 )),
1617 safety: MigrationSafety::Safe,
1618 risk: MigrationRisk::None,
1619 risk_detail: None,
1620 });
1621 }
1622 }
1623 }
1624 } else {
1625 let new_type = dialect.ddl_type(&parse_canonical(&new_col.type_));
1627 let mut col_def = format!("{} {}", quote(&new_col.name), new_type);
1628 if !new_col.nullable {
1629 col_def.push_str(" NOT NULL");
1630 }
1631 if let Some(ref d) = new_col.default {
1632 col_def.push_str(" DEFAULT ");
1633 match d {
1634 ColumnDefaultConfig::Literal(s) => col_def.push_str(s),
1635 ColumnDefaultConfig::Expression { expression } => col_def.push_str(expression),
1636 }
1637 }
1638 steps.push(MigrationStep {
1639 step: 0,
1640 operation: MigrationOperation::AddColumn,
1641 schema: schema.clone(),
1642 table: Some(table.name.clone()),
1643 object: new_col.name.clone(),
1644 object_type: "column".into(),
1645 description: format!(
1646 "Add column \"{}\" {} to \"{}\".\"{}\"",
1647 new_col.name, new_type, schema, table.name
1648 ),
1649 ddl: Some(format!("ALTER TABLE {} ADD COLUMN {}", full, col_def)),
1650 safety: MigrationSafety::Safe,
1651 risk: MigrationRisk::None,
1652 risk_detail: None,
1653 });
1654 steps.extend(companion_column_steps(
1655 &schema,
1656 table,
1657 &CompanionColumnOp::Add {
1658 name: &new_col.name,
1659 ty: &new_type,
1660 },
1661 ));
1662 }
1663 }
1664
1665 for old_col in &old.columns {
1667 if new.columns.iter().any(|c| c.id == old_col.id) {
1668 continue;
1669 }
1670 if !new_tables.contains_key(old_col.table_id.as_str()) {
1671 continue;
1672 }
1673 let table_name = old_tables
1674 .get(old_col.table_id.as_str())
1675 .map(|t| t.name.as_str())
1676 .unwrap_or(&old_col.table_id);
1677 let sid = old_tables
1678 .get(old_col.table_id.as_str())
1679 .and_then(|t| t.schema_id.as_deref())
1680 .unwrap_or(default_old_sid);
1681 let schema = schema_name_for(sid, &old_schemas);
1682 steps.push(MigrationStep {
1683 step: 0,
1684 operation: MigrationOperation::DropColumn,
1685 schema: schema.clone(),
1686 table: Some(table_name.to_string()),
1687 object: old_col.name.clone(),
1688 object_type: "column".into(),
1689 description: format!("Column \"{}\" removed from config on \"{}\".\"{}\"", old_col.name, schema, table_name),
1690 ddl: None,
1691 safety: MigrationSafety::WarnOnly,
1692 risk: MigrationRisk::ManualActionRequired,
1693 risk_detail: Some("Column NOT dropped from database (data safety). Run ALTER TABLE DROP COLUMN manually if intended.".into()),
1694 });
1695 }
1696
1697 for old_idx in &old.indexes {
1699 if !new_indexes.contains_key(old_idx.id.as_str()) {
1700 let sid = old_idx.schema_id.as_deref().unwrap_or(default_old_sid);
1701 let schema = schema_name_for(sid, &old_schemas);
1702 steps.push(MigrationStep {
1703 step: 0,
1704 operation: MigrationOperation::DropIndex,
1705 schema: schema.clone(),
1706 table: old_tables
1707 .get(old_idx.table_id.as_str())
1708 .map(|t| t.name.clone()),
1709 object: old_idx.name.clone(),
1710 object_type: "index".into(),
1711 description: format!("Drop index \"{}\" in schema \"{}\"", old_idx.name, schema),
1712 ddl: Some(format!(
1713 "DROP INDEX IF EXISTS {}.{}",
1714 quote(&schema),
1715 quote(&old_idx.name)
1716 )),
1717 safety: MigrationSafety::Safe,
1718 risk: MigrationRisk::None,
1719 risk_detail: None,
1720 });
1721 }
1722 }
1723 for new_idx in &new.indexes {
1724 if old_indexes.contains_key(new_idx.id.as_str())
1725 || added_table_ids.contains(new_idx.table_id.as_str())
1726 {
1727 continue;
1728 }
1729 let sid = new_idx.schema_id.as_deref().unwrap_or(default_new_sid);
1730 let schema = match new_schemas.get(sid) {
1731 Some(s) => schema_override.unwrap_or(&s.name).to_string(),
1732 None => continue,
1733 };
1734 let table = match new_tables.get(new_idx.table_id.as_str()) {
1735 Some(t) => t,
1736 None => continue,
1737 };
1738 let full_table = format!("{}.{}", quote(&schema), quote(&table.name));
1739 let mut col_parts: Vec<String> = Vec::new();
1740 for col in &new_idx.columns {
1741 match col {
1742 IndexColumnEntry::Name(n) => col_parts.push(quote(n)),
1743 IndexColumnEntry::Spec {
1744 name, direction, ..
1745 } => {
1746 let dir = direction
1747 .as_deref()
1748 .map(|d| format!(" {}", d.to_uppercase()))
1749 .unwrap_or_default();
1750 col_parts.push(format!("{}{}", quote(name), dir));
1751 }
1752 IndexColumnEntry::Expression { expression } => col_parts.push(expression.clone()),
1753 }
1754 }
1755 let method = new_idx.method.as_deref().unwrap_or("btree");
1756 let unique_kw = if new_idx.unique { "UNIQUE " } else { "" };
1757 let include = if new_idx.include.is_empty() {
1758 String::new()
1759 } else {
1760 format!(
1761 " INCLUDE ({})",
1762 new_idx
1763 .include
1764 .iter()
1765 .map(|s| quote(s))
1766 .collect::<Vec<_>>()
1767 .join(", ")
1768 )
1769 };
1770 let where_clause = new_idx
1771 .where_
1772 .as_ref()
1773 .map(|w| format!(" WHERE {}", w))
1774 .unwrap_or_default();
1775 steps.push(MigrationStep {
1776 step: 0,
1777 operation: MigrationOperation::CreateIndex,
1778 schema: schema.clone(),
1779 table: Some(table.name.clone()),
1780 object: new_idx.name.clone(),
1781 object_type: "index".into(),
1782 description: format!(
1783 "Create {}index \"{}\" on \"{}\".\"{}\"",
1784 if new_idx.unique { "unique " } else { "" },
1785 new_idx.name,
1786 schema,
1787 table.name
1788 ),
1789 ddl: Some(format!(
1790 "CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({}){}{}",
1791 unique_kw,
1792 quote(&new_idx.name),
1793 full_table,
1794 method,
1795 col_parts.join(", "),
1796 include,
1797 where_clause
1798 )),
1799 safety: MigrationSafety::Safe,
1800 risk: MigrationRisk::None,
1801 risk_detail: None,
1802 });
1803 }
1804
1805 for old_rel in &old.relationships {
1807 if !new_rels.contains_key(old_rel.id.as_str()) {
1808 let from_sid_fallback = old_rel.from_schema_id.as_deref().unwrap_or(default_old_sid);
1809 let from_schema = old_schemas
1810 .get(from_sid_fallback)
1811 .map(|s| s.name.as_str())
1812 .unwrap_or(from_sid_fallback);
1813 let from_table = old_tables
1814 .get(old_rel.from_table_id.as_str())
1815 .map(|t| t.name.as_str())
1816 .unwrap_or(&old_rel.from_table_id);
1817 let constraint = old_rel.name.as_deref().unwrap_or(&old_rel.id);
1818 let schema_q = quote(schema_override.unwrap_or(from_schema));
1819 steps.push(MigrationStep {
1820 step: 0,
1821 operation: MigrationOperation::DropForeignKey,
1822 schema: schema_override.unwrap_or(from_schema).to_string(),
1823 table: Some(from_table.to_string()),
1824 object: constraint.to_string(),
1825 object_type: "foreign_key".into(),
1826 description: format!(
1827 "Drop FK \"{}\" from \"{}\".\"{}\"",
1828 constraint,
1829 schema_override.unwrap_or(from_schema),
1830 from_table
1831 ),
1832 ddl: Some(format!(
1833 "ALTER TABLE {}.{} DROP CONSTRAINT IF EXISTS {}",
1834 schema_q,
1835 quote(from_table),
1836 quote(constraint)
1837 )),
1838 safety: MigrationSafety::Safe,
1839 risk: MigrationRisk::None,
1840 risk_detail: None,
1841 });
1842 }
1843 }
1844 for new_rel in &new.relationships {
1845 if old_rels.contains_key(new_rel.id.as_str())
1846 || added_table_ids.contains(new_rel.from_table_id.as_str())
1847 || added_table_ids.contains(new_rel.to_table_id.as_str())
1848 {
1849 continue;
1850 }
1851 let from_sid = new_rel.from_schema_id.as_deref().unwrap_or(default_new_sid);
1852 let from_schema = match new_schemas.get(from_sid) {
1853 Some(s) => s,
1854 None => continue,
1855 };
1856 let from_table = match new_tables.get(new_rel.from_table_id.as_str()) {
1857 Some(t) => t,
1858 None => continue,
1859 };
1860 let from_col = new
1861 .columns
1862 .iter()
1863 .find(|c| c.id == new_rel.from_column_id)
1864 .map(|c| c.name.clone())
1865 .unwrap_or_else(|| new_rel.from_column_id.clone());
1866
1867 let (to_schema_name, to_table_name, to_col) =
1869 if let Some(pkg_id) = new_rel.to_package_id.as_deref() {
1870 match cross_package_configs.get(pkg_id) {
1871 Some(foreign) => {
1872 let foreign_tables: HashMap<_, _> =
1873 foreign.tables.iter().map(|t| (t.id.as_str(), t)).collect();
1874 let foreign_schemas: HashMap<_, _> =
1875 foreign.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
1876 let foreign_default_sid =
1877 foreign.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
1878 let to_sid = new_rel
1879 .to_schema_id
1880 .as_deref()
1881 .unwrap_or(foreign_default_sid);
1882 let tbl = match foreign_tables.get(new_rel.to_table_id.as_str()) {
1883 Some(t) => t,
1884 None => continue,
1885 };
1886 let schema = match foreign_schemas.get(to_sid) {
1887 Some(s) => s,
1888 None => continue,
1889 };
1890 let col = foreign
1891 .columns
1892 .iter()
1893 .find(|c| c.id == new_rel.to_column_id)
1894 .map(|c| c.name.clone())
1895 .unwrap_or_else(|| new_rel.to_column_id.clone());
1896 (schema.name.clone(), tbl.name.clone(), col)
1897 }
1898 None => continue,
1899 }
1900 } else {
1901 let to_sid = new_rel.to_schema_id.as_deref().unwrap_or(default_new_sid);
1902 let to_schema = match new_schemas.get(to_sid) {
1903 Some(s) => s,
1904 None => continue,
1905 };
1906 let to_table = match new_tables.get(new_rel.to_table_id.as_str()) {
1907 Some(t) => t,
1908 None => continue,
1909 };
1910 let col = new
1911 .columns
1912 .iter()
1913 .find(|c| c.id == new_rel.to_column_id)
1914 .map(|c| c.name.clone())
1915 .unwrap_or_else(|| new_rel.to_column_id.clone());
1916 (
1917 schema_override.unwrap_or(&to_schema.name).to_string(),
1918 to_table.name.clone(),
1919 col,
1920 )
1921 };
1922
1923 let from_schema_str = schema_override.unwrap_or(&from_schema.name);
1924 let from_q = format!("{}.{}", quote(from_schema_str), quote(&from_table.name));
1925 let to_q = format!("{}.{}", quote(&to_schema_name), quote(&to_table_name));
1926 let constraint = new_rel.name.as_deref().unwrap_or(&new_rel.id);
1927 let on_update = new_rel.on_update.as_deref().unwrap_or("NO ACTION");
1928 let on_delete = new_rel.on_delete.as_deref().unwrap_or("NO ACTION");
1929 steps.push(MigrationStep {
1930 step: 0,
1931 operation: MigrationOperation::AddForeignKey,
1932 schema: from_schema_str.to_string(),
1933 table: Some(from_table.name.clone()),
1934 object: constraint.to_string(),
1935 object_type: "foreign_key".into(),
1936 description: format!(
1937 "Add FK \"{}\" on \"{}\".\"{}\" → \"{}\".\"{}\"",
1938 constraint, from_schema_str, from_table.name, to_schema_name, to_table_name
1939 ),
1940 ddl: Some(format!(
1941 "ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {} ({}) ON UPDATE {} ON DELETE {}",
1942 from_q, quote(constraint), quote(&from_col), to_q, quote(&to_col), on_update, on_delete
1943 )),
1944 safety: MigrationSafety::BestEffort,
1945 risk: MigrationRisk::None,
1946 risk_detail: Some("PostgreSQL has no ADD CONSTRAINT IF NOT EXISTS; ignored if constraint already exists.".into()),
1947 });
1948 }
1949
1950 for (i, s) in steps.iter_mut().enumerate() {
1952 s.step = i + 1;
1953 }
1954
1955 Ok(MigrationPlan { steps })
1956}
1957
1958#[allow(clippy::too_many_arguments)]
1964pub async fn execute_migration_plan(
1965 migration_pool: &Pool,
1966 config_pool: &Pool,
1967 plan: &MigrationPlan,
1968 migration_plan_id: &str,
1969 package_id: &str,
1970 tenant_id: &str,
1971 from_version: Option<&str>,
1972 to_version: &str,
1973) -> Result<MigrationExecutionResult, AppError> {
1974 let mut applied = 0usize;
1975 let mut warned = 0usize;
1976 let mut warnings: Vec<String> = Vec::new();
1977
1978 for step in &plan.steps {
1979 let op = step.operation.to_string();
1980 let safety_str = format!("{:?}", step.safety);
1981 let risk_str = format!("{:?}", step.risk);
1982
1983 match step.safety {
1984 MigrationSafety::WarnOnly => {
1985 let msg = step
1986 .risk_detail
1987 .clone()
1988 .unwrap_or_else(|| step.description.clone());
1989 tracing::warn!(step = step.step, %op, "migration plan warning (no DDL)");
1990 warnings.push(format!("[Step {}] {}", step.step, msg));
1991 let _ = crate::store::insert_migration_audit(
1992 config_pool,
1993 migration_plan_id,
1994 package_id,
1995 tenant_id,
1996 from_version,
1997 to_version,
1998 step.step as i32,
1999 &op,
2000 &step.schema,
2001 step.table.as_deref(),
2002 &step.object,
2003 &step.object_type,
2004 &step.description,
2005 step.ddl.as_deref(),
2006 &safety_str,
2007 &risk_str,
2008 "skipped",
2009 None,
2010 )
2011 .await;
2012 warned += 1;
2013 }
2014 MigrationSafety::Safe | MigrationSafety::BestEffort => {
2015 if let Some(ref sql) = step.ddl {
2016 tracing::info!(step = step.step, %op, %sql, "executing migration step");
2017 match sqlx::query(sql).execute(migration_pool).await {
2018 Ok(_) => {
2019 let _ = crate::store::insert_migration_audit(
2020 config_pool,
2021 migration_plan_id,
2022 package_id,
2023 tenant_id,
2024 from_version,
2025 to_version,
2026 step.step as i32,
2027 &op,
2028 &step.schema,
2029 step.table.as_deref(),
2030 &step.object,
2031 &step.object_type,
2032 &step.description,
2033 step.ddl.as_deref(),
2034 &safety_str,
2035 &risk_str,
2036 "applied",
2037 None,
2038 )
2039 .await;
2040 applied += 1;
2041 }
2042 Err(e) => {
2043 let err_str = e.to_string();
2044 if matches!(step.safety, MigrationSafety::BestEffort) {
2045 tracing::warn!(step = step.step, %op, error = %e, "migration step failed (best-effort, continuing)");
2046 let msg = format!(
2047 "[Step {}] {} — Error: {}",
2048 step.step, step.description, err_str
2049 );
2050 warnings.push(msg);
2051 let _ = crate::store::insert_migration_audit(
2052 config_pool,
2053 migration_plan_id,
2054 package_id,
2055 tenant_id,
2056 from_version,
2057 to_version,
2058 step.step as i32,
2059 &op,
2060 &step.schema,
2061 step.table.as_deref(),
2062 &step.object,
2063 &step.object_type,
2064 &step.description,
2065 step.ddl.as_deref(),
2066 &safety_str,
2067 &risk_str,
2068 "warned",
2069 Some(&err_str),
2070 )
2071 .await;
2072 warned += 1;
2073 } else {
2074 let _ = crate::store::insert_migration_audit(
2075 config_pool,
2076 migration_plan_id,
2077 package_id,
2078 tenant_id,
2079 from_version,
2080 to_version,
2081 step.step as i32,
2082 &op,
2083 &step.schema,
2084 step.table.as_deref(),
2085 &step.object,
2086 &step.object_type,
2087 &step.description,
2088 step.ddl.as_deref(),
2089 &safety_str,
2090 &risk_str,
2091 "failed",
2092 Some(&err_str),
2093 )
2094 .await;
2095 return Err(AppError::Db(e));
2096 }
2097 }
2098 }
2099 }
2100 }
2101 }
2102 }
2103
2104 Ok(MigrationExecutionResult {
2105 applied,
2106 warned,
2107 warnings,
2108 })
2109}
2110
2111pub fn history_table_ddl(
2115 schema_name: &str,
2116 table_name: &str,
2117 pk_col: &str,
2118 source_cols: &[&ColumnConfig],
2119 dialect: &dyn Dialect,
2120) -> String {
2121 let history_name = format!("{}_history", table_name);
2122 let history_full = format!("{}.{}", quote(schema_name), quote(&history_name));
2123
2124 let mut col_defs: Vec<String> = Vec::new();
2125 col_defs.push(format!(
2126 "{} {} NOT NULL DEFAULT {}",
2127 quote("_history_id"),
2128 "UUID",
2129 dialect.uuid_default_expr()
2130 ));
2131 col_defs.push(format!("{} BIGINT NOT NULL", quote("_version")));
2132 col_defs.push(format!("{} TEXT NOT NULL", quote("_operation")));
2133 col_defs.push(format!(
2134 "{} {} NOT NULL DEFAULT {}",
2135 quote("_recorded_at"),
2136 dialect.audit_timestamp_type(),
2137 dialect.now_fn()
2138 ));
2139 col_defs.push(format!(
2140 "{} {}",
2141 quote("_valid_from"),
2142 dialect.audit_timestamp_type()
2143 ));
2144 col_defs.push(format!(
2145 "{} {}",
2146 quote("_valid_to"),
2147 dialect.audit_timestamp_type()
2148 ));
2149
2150 let config_col_names: HashSet<&str> = source_cols.iter().map(|c| c.name.as_str()).collect();
2151 for c in source_cols {
2152 let typ = dialect.ddl_type(&parse_canonical(&c.type_));
2153 col_defs.push(format!("{} {}", quote(&c.name), typ));
2154 }
2155 let audit_ts = dialect.audit_timestamp_type();
2156 for (name, typ) in [
2157 ("created_at", audit_ts),
2158 ("updated_at", audit_ts),
2159 ("archived_at", audit_ts),
2160 ("created_by", "TEXT"),
2161 ("updated_by", "TEXT"),
2162 ] {
2163 if !config_col_names.contains(name) {
2164 col_defs.push(format!("{} {}", quote(name), typ));
2165 }
2166 }
2167 col_defs.push(format!("PRIMARY KEY ({})", quote("_history_id")));
2168
2169 let history_full_quoted = format!("{}.{}", quote(schema_name), quote(&history_name));
2170 let idx_sql = format!(
2171 "-- index: CREATE INDEX IF NOT EXISTS {} ON {} ({}, {})",
2172 quote(&format!("{}_history_{}_idx", table_name, pk_col)),
2173 history_full_quoted,
2174 quote(pk_col),
2175 quote("_version")
2176 );
2177
2178 format!(
2179 "CREATE TABLE IF NOT EXISTS {} (\n {}\n)\n{}",
2180 history_full,
2181 col_defs.join(",\n "),
2182 idx_sql
2183 )
2184}
2185
2186fn history_index_ddl(schema_name: &str, table_name: &str, pk_col: &str) -> String {
2188 format!(
2189 "CREATE INDEX IF NOT EXISTS {} ON {}.{} ({}, {} DESC)",
2190 quote(&format!("{}_history_{}_idx", table_name, pk_col)),
2191 quote(schema_name),
2192 quote(&format!("{}_history", table_name)),
2193 quote(pk_col),
2194 quote("_version")
2195 )
2196}
2197
/// A column-level change on a base table that has to be mirrored onto its companion
/// (`_audit` / `_history`) tables.
///
/// All fields are borrowed string slices, so the value is cheap to copy. `ty` is
/// interpolated verbatim into the generated DDL, so it must already be a rendered SQL type
/// name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompanionColumnOp<'a> {
    /// Add a column called `name` with SQL type `ty`.
    Add { name: &'a str, ty: &'a str },
    /// Rename the column `old` to `new`.
    Rename { old: &'a str, new: &'a str },
    /// Change the type of column `name` to `ty`.
    AlterType { name: &'a str, ty: &'a str },
}
2207
2208fn enabled_companion_suffixes(table: &TableConfig) -> Vec<&'static str> {
2210 let mut suffixes = Vec::new();
2211 if table.audit_log {
2212 suffixes.push("audit");
2213 }
2214 if table.versioning.as_ref().is_some_and(|v| v.enabled) {
2215 suffixes.push("history");
2216 }
2217 suffixes
2218}
2219
2220fn companion_column_steps(
2227 schema: &str,
2228 table: &TableConfig,
2229 op: &CompanionColumnOp<'_>,
2230) -> Vec<MigrationStep> {
2231 let mut steps = Vec::new();
2232 for suffix in enabled_companion_suffixes(table) {
2233 let companion = format!("{}_{}", table.name, suffix);
2234 let full = format!("{}.{}", quote(schema), quote(&companion));
2235 let (operation, object, ddl, description, safety, risk, risk_detail) = match op {
2236 CompanionColumnOp::Add { name, ty } => (
2237 MigrationOperation::AddColumn,
2238 name.to_string(),
2239 format!(
2242 "ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} {}",
2243 full,
2244 quote(name),
2245 ty
2246 ),
2247 format!(
2248 "Sync {} table: add column \"{}\" to \"{}\".\"{}\"",
2249 suffix, name, schema, companion
2250 ),
2251 MigrationSafety::Safe,
2252 MigrationRisk::None,
2253 None,
2254 ),
2255 CompanionColumnOp::Rename { old, new } => (
2256 MigrationOperation::RenameColumn,
2257 new.to_string(),
2258 format!(
2259 "ALTER TABLE {} RENAME COLUMN {} TO {}",
2260 full,
2261 quote(old),
2262 quote(new)
2263 ),
2264 format!(
2265 "Sync {} table: rename column \"{}\" → \"{}\" on \"{}\".\"{}\"",
2266 suffix, old, new, schema, companion
2267 ),
2268 MigrationSafety::Safe,
2269 MigrationRisk::None,
2270 None,
2271 ),
2272 CompanionColumnOp::AlterType { name, ty } => (
2273 MigrationOperation::AlterColumnType,
2274 name.to_string(),
2275 format!(
2276 "ALTER TABLE {} ALTER COLUMN {} TYPE {} USING {}::{}",
2277 full,
2278 quote(name),
2279 ty,
2280 quote(name),
2281 ty
2282 ),
2283 format!(
2284 "Sync {} table: change type of \"{}\".\"{}\".\"{}\" → {}",
2285 suffix, schema, companion, name, ty
2286 ),
2287 MigrationSafety::BestEffort,
2288 MigrationRisk::MayFail,
2289 Some(format!(
2290 "USING {}::{} cast may fail for incompatible values in the {} table.",
2291 name, ty, suffix
2292 )),
2293 ),
2294 };
2295 steps.push(MigrationStep {
2296 step: 0,
2297 operation,
2298 schema: schema.to_string(),
2299 table: Some(companion.clone()),
2300 object,
2301 object_type: "column".into(),
2302 description,
2303 ddl: Some(ddl),
2304 safety,
2305 risk,
2306 risk_detail,
2307 });
2308 }
2309 steps
2310}
2311
2312fn audit_table_ddl(
2316 schema_name: &str,
2317 table_name: &str,
2318 source_cols: &[&ColumnConfig],
2319 dialect: &dyn Dialect,
2320) -> String {
2321 let audit_name = format!("{}_audit", table_name);
2322 let audit_full = format!("{}.{}", quote(schema_name), quote(&audit_name));
2323
2324 let mut col_defs: Vec<String> = Vec::new();
2325 col_defs.push(format!(
2326 "{} {} NOT NULL DEFAULT {}",
2327 quote("audit_id"),
2328 "UUID",
2329 dialect.uuid_default_expr()
2330 ));
2331 col_defs.push(format!("{} TEXT NOT NULL", quote("audit_action")));
2332 col_defs.push(format!(
2333 "{} {} NOT NULL DEFAULT {}",
2334 quote("audit_at"),
2335 dialect.audit_timestamp_type(),
2336 dialect.now_fn()
2337 ));
2338 col_defs.push(format!("{} TEXT", quote("audit_by")));
2339 col_defs.push(format!(
2340 "{} {}",
2341 quote("changed_fields"),
2342 dialect.sys_json_type()
2343 ));
2344
2345 let config_col_names: HashSet<&str> = source_cols.iter().map(|c| c.name.as_str()).collect();
2346 for c in source_cols {
2347 let typ = dialect.ddl_type(&parse_canonical(&c.type_));
2348 col_defs.push(format!("{} {}", quote(&c.name), typ));
2349 }
2350 let audit_ts = dialect.audit_timestamp_type();
2351 for (name, typ) in [
2352 ("created_at", audit_ts),
2353 ("updated_at", audit_ts),
2354 ("archived_at", audit_ts),
2355 ("created_by", "TEXT"),
2356 ("updated_by", "TEXT"),
2357 ] {
2358 if !config_col_names.contains(name) {
2359 col_defs.push(format!("{} {}", quote(name), typ));
2360 }
2361 }
2362 col_defs.push(format!("PRIMARY KEY ({})", quote("audit_id")));
2363
2364 format!(
2365 "CREATE TABLE IF NOT EXISTS {} (\n {}\n)",
2366 audit_full,
2367 col_defs.join(",\n ")
2368 )
2369}
2370
#[cfg(test)]
mod enum_recreate_tests {
    //! Tests for discovering enum-typed columns and for the rename → create → recast → drop
    //! step sequence emitted when an enum type has to be recreated.

    use super::*;

    /// Schema fixture without a comment.
    fn schema(id: &str, name: &str) -> SchemaConfig {
        SchemaConfig {
            id: id.into(),
            name: name.into(),
            comment: None,
        }
    }

    /// Table fixture: single-column primary key `id`, no constraints, no companions.
    fn table(id: &str, name: &str, schema_id: &str) -> TableConfig {
        TableConfig {
            id: id.into(),
            schema_id: Some(schema_id.into()),
            name: name.into(),
            comment: None,
            primary_key: PrimaryKeyConfig::Single("id".into()),
            unique: vec![],
            check: vec![],
            audit_log: false,
            versioning: None,
        }
    }

    /// Nullable column fixture with a simple type and an optional literal default.
    fn col(id: &str, table_id: &str, name: &str, ty: &str, default: Option<&str>) -> ColumnConfig {
        let default = default.map(|literal| ColumnDefaultConfig::Literal(literal.into()));
        ColumnConfig {
            id: id.into(),
            table_id: table_id.into(),
            name: name.into(),
            type_: ColumnTypeConfig::Simple(ty.into()),
            nullable: true,
            default,
            comment: None,
            asset: None,
            extensible: false,
        }
    }

    /// Enum fixture with the given labels and no comment.
    fn enum_cfg(id: &str, name: &str, schema_id: &str, values: &[&str]) -> EnumConfig {
        EnumConfig {
            id: id.into(),
            schema_id: Some(schema_id.into()),
            name: name.into(),
            values: values.iter().map(ToString::to_string).collect(),
            comment: None,
        }
    }

    /// Collects the DDL of every step that carries one, preserving step order.
    fn ddls(steps: &[MigrationStep]) -> Vec<String> {
        steps
            .iter()
            .filter_map(|step| step.ddl.as_ref())
            .cloned()
            .collect()
    }

    #[test]
    fn finds_scalar_and_array_dependent_columns() {
        let cfg = FullConfig {
            schemas: vec![schema("s1", "app")],
            tables: vec![table("t_orders", "orders", "s1")],
            columns: vec![
                col(
                    "c1",
                    "t_orders",
                    "status",
                    "order_status",
                    Some("'pending'"),
                ),
                col("c2", "t_orders", "tags", "order_status[]", None),
                // Not enum-typed: must not be reported as a dependent column.
                col("c3", "t_orders", "name", "text", None),
            ],
            ..FullConfig::default()
        };
        let e = enum_cfg("e1", "order_status", "s1", &["pending", "shipped"]);
        let new_tables: HashMap<&str, &TableConfig> = cfg
            .tables
            .iter()
            .map(|tbl| (tbl.id.as_str(), tbl))
            .collect();
        let new_schemas: HashMap<&str, &SchemaConfig> = cfg
            .schemas
            .iter()
            .map(|sch| (sch.id.as_str(), sch))
            .collect();

        let deps = enum_dependent_columns(&e, &cfg, &new_tables, &new_schemas, None);
        assert_eq!(deps.len(), 2);

        let scalar = deps.iter().find(|d| d.column == "status").unwrap();
        assert_eq!(scalar.schema, "app");
        assert_eq!(scalar.table, "orders");
        assert!(!scalar.is_array);
        assert_eq!(scalar.default.as_deref(), Some("'pending'"));

        let arr = deps.iter().find(|d| d.column == "tags").unwrap();
        assert!(arr.is_array);
        assert!(arr.default.is_none());
    }

    #[test]
    fn recreate_sequence_emits_rename_create_recast_drop() {
        let e = enum_cfg("e1", "order_status", "s1", &["pending", "shipped"]);
        let deps = vec![
            EnumColumnRef {
                schema: "app".into(),
                table: "orders".into(),
                column: "status".into(),
                default: Some("'pending'".into()),
                is_array: false,
            },
            EnumColumnRef {
                schema: "app".into(),
                table: "orders".into(),
                column: "tags".into(),
                default: None,
                is_array: true,
            },
        ];
        let mut steps = Vec::new();
        recreate_enum_steps(&mut steps, "app", &e, &["cancelled"], &deps);
        let sql = ddls(&steps);

        // The first step is a warning only and carries no DDL.
        assert!(matches!(steps[0].safety, MigrationSafety::WarnOnly));
        assert!(steps[0].ddl.is_none());

        // Leading statements, in emission order: rename old type, create new type,
        // recast the scalar column (drop default / alter / restore default), recast the array.
        let expected_prefix = [
            r#"ALTER TYPE "app"."order_status" RENAME TO "order_status__arch_old""#,
            r#"CREATE TYPE "app"."order_status" AS ENUM ('pending', 'shipped')"#,
            r#"ALTER TABLE "app"."orders" ALTER COLUMN "status" DROP DEFAULT"#,
            r#"ALTER TABLE "app"."orders" ALTER COLUMN "status" TYPE "app"."order_status" USING "status"::text::"app"."order_status""#,
            r#"ALTER TABLE "app"."orders" ALTER COLUMN "status" SET DEFAULT 'pending'"#,
            r#"ALTER TABLE "app"."orders" ALTER COLUMN "tags" TYPE "app"."order_status"[] USING "tags"::text[]::"app"."order_status"[]"#,
        ];
        for (position, want) in expected_prefix.iter().enumerate() {
            assert_eq!(sql[position], *want);
        }

        // The archived type is dropped last.
        assert_eq!(
            *sql.last().unwrap(),
            r#"DROP TYPE IF EXISTS "app"."order_status__arch_old""#
        );
    }
}
2518
#[cfg(all(test, feature = "sqlite"))]
mod companion_sync_tests {
    //! Tests that column changes on a base table are mirrored onto its `_audit` and
    //! `_history` companion tables in the computed migration plan.

    use super::*;
    use crate::db::sqlite::SqliteDialect;

    /// Schema fixture without a comment.
    fn schema(id: &str, name: &str) -> SchemaConfig {
        SchemaConfig {
            id: id.into(),
            name: name.into(),
            comment: None,
        }
    }

    /// Table fixture in schema `s1` with the requested companion features.
    fn table(id: &str, name: &str, audit: bool, versioning: bool) -> TableConfig {
        let versioning = if versioning {
            Some(VersioningConfig {
                enabled: true,
                keep_versions: None,
            })
        } else {
            None
        };
        TableConfig {
            id: id.into(),
            schema_id: Some("s1".into()),
            name: name.into(),
            comment: None,
            primary_key: PrimaryKeyConfig::Single("id".into()),
            unique: vec![],
            check: vec![],
            audit_log: audit,
            versioning,
        }
    }

    /// Nullable column fixture on table `t1` with a simple type and no default.
    fn col(id: &str, name: &str, ty: &str) -> ColumnConfig {
        ColumnConfig {
            id: id.into(),
            table_id: "t1".into(),
            name: name.into(),
            type_: ColumnTypeConfig::Simple(ty.into()),
            nullable: true,
            default: None,
            comment: None,
            asset: None,
            extensible: false,
        }
    }

    /// Baseline config: schema `app`, table `orders` with columns `id` and `status`.
    fn base(audit: bool, versioning: bool) -> FullConfig {
        FullConfig {
            schemas: vec![schema("s1", "app")],
            tables: vec![table("t1", "orders", audit, versioning)],
            columns: vec![col("c0", "id", "uuid"), col("c1", "status", "text")],
            ..FullConfig::default()
        }
    }

    /// Computes the migration plan from `old` to `new` and returns the DDL of every step
    /// that carries one, in plan order.
    fn plan(old: &FullConfig, new: &FullConfig) -> Vec<String> {
        let dialect = SqliteDialect;
        let cross_package = HashMap::new();
        let migration =
            compute_migration_plan(old, new, None, None, &dialect, &cross_package).unwrap();
        migration
            .steps
            .into_iter()
            .filter_map(|step| step.ddl)
            .collect()
    }

    /// True when the plan contains exactly this statement.
    fn has_statement(sql: &[String], statement: &str) -> bool {
        sql.iter().any(|s| s == statement)
    }

    /// True when any statement of the plan contains `fragment`.
    fn mentions(sql: &[String], fragment: &str) -> bool {
        sql.iter().any(|s| s.contains(fragment))
    }

    #[test]
    fn add_column_syncs_audit_and_history() {
        let old = base(true, true);
        let mut new = base(true, true);
        new.columns.push(col("c2", "note", "text"));

        let sql = plan(&old, &new);
        assert!(has_statement(
            &sql,
            r#"ALTER TABLE "app"."orders" ADD COLUMN "note" TEXT"#
        ));
        assert!(has_statement(
            &sql,
            r#"ALTER TABLE "app"."orders_audit" ADD COLUMN IF NOT EXISTS "note" TEXT"#
        ));
        assert!(has_statement(
            &sql,
            r#"ALTER TABLE "app"."orders_history" ADD COLUMN IF NOT EXISTS "note" TEXT"#
        ));
    }

    #[test]
    fn rename_column_syncs_companions() {
        let old = base(true, true);
        let mut new = base(true, true);
        new.columns[1].name = "state".into();

        let sql = plan(&old, &new);
        assert!(has_statement(
            &sql,
            r#"ALTER TABLE "app"."orders_audit" RENAME COLUMN "status" TO "state""#
        ));
        assert!(has_statement(
            &sql,
            r#"ALTER TABLE "app"."orders_history" RENAME COLUMN "status" TO "state""#
        ));
    }

    #[test]
    fn alter_type_syncs_companions() {
        let old = base(true, false);
        let mut new = base(true, false);
        new.columns[1].type_ = ColumnTypeConfig::Simple("integer".into());

        let sql = plan(&old, &new);
        assert!(has_statement(
            &sql,
            r#"ALTER TABLE "app"."orders_audit" ALTER COLUMN "status" TYPE INTEGER USING "status"::INTEGER"#
        ));
        // Versioning is disabled here, so the history companion must stay untouched.
        assert!(!mentions(&sql, "orders_history"));
    }

    #[test]
    fn no_companion_steps_when_features_disabled() {
        let old = base(false, false);
        let mut new = base(false, false);
        new.columns.push(col("c2", "note", "text"));

        let sql = plan(&old, &new);
        assert!(has_statement(
            &sql,
            r#"ALTER TABLE "app"."orders" ADD COLUMN "note" TEXT"#
        ));
        assert!(!mentions(&sql, "orders_audit"));
        assert!(!mentions(&sql, "orders_history"));
    }

    #[test]
    fn nullability_change_does_not_touch_companions() {
        let old = base(true, true);
        let mut new = base(true, true);
        new.columns[1].nullable = false;

        let sql = plan(&old, &new);
        assert!(mentions(&sql, "SET NOT NULL"));
        // Nullability applies to the base table only.
        assert!(!mentions(&sql, "orders_audit"));
        assert!(!mentions(&sql, "orders_history"));
    }
}