1use crate::config::types::*;
5use crate::config::{validate, FullConfig};
6use crate::db::parse_canonical;
7use crate::db::pool::Pool;
8use crate::db::Dialect;
9use crate::error::AppError;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12
13fn quote(s: &str) -> String {
14 format!("\"{}\"", s.replace('\\', "\\\\").replace('"', "\\\""))
15}
16
17pub const RLS_TENANT_COLUMN: &str = "tenant_id";
19
20pub async fn apply_migrations(
25 pool: &Pool,
26 config: &FullConfig,
27 schema_override: Option<&str>,
28 rls_tenant_column: Option<&str>,
29 dialect: &dyn Dialect,
30) -> Result<(), AppError> {
31 validate(config)?;
32 let default_sid = config
33 .schemas
34 .first()
35 .map(|s| s.id.as_str())
36 .ok_or_else(|| {
37 AppError::Config(crate::error::ConfigError::Validation(
38 "at least one schema required".into(),
39 ))
40 })?;
41
42 if let Some(s) = schema_override {
43 let name = quote(s);
44 sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", name))
45 .execute(pool)
46 .await?;
47 }
48
49 let schemas_by_id: HashMap<_, _> = config.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
50 let tables_by_id: HashMap<_, _> = config.tables.iter().map(|t| (t.id.as_str(), t)).collect();
51 let columns_by_table: HashMap<_, Vec<&ColumnConfig>> =
52 config.columns.iter().fold(HashMap::new(), |mut m, c| {
53 m.entry(c.table_id.as_str()).or_default().push(c);
54 m
55 });
56
57 if schema_override.is_none() {
59 for s in &config.schemas {
60 let name = quote(&s.name);
61 let comment = s
62 .comment
63 .as_ref()
64 .map(|c| format!("COMMENT ON SCHEMA {} IS '{}'", name, c.replace('\'', "''")));
65 sqlx::query(&format!("CREATE SCHEMA IF NOT EXISTS {}", name))
66 .execute(pool)
67 .await?;
68 if let Some(sql) = comment {
69 let _ = sqlx::query(&sql).execute(pool).await;
70 }
71 }
72 }
73
74 for e in &config.enums {
75 let sid = e.schema_id.as_deref().unwrap_or(default_sid);
76 let schema = schemas_by_id.get(sid).ok_or_else(|| {
77 AppError::Config(crate::error::ConfigError::MissingReference {
78 kind: "schema",
79 id: sid.to_string(),
80 })
81 })?;
82 let schema_name = quote(schema_override.unwrap_or(&schema.name));
83 let type_name = quote(&e.name);
84 if dialect.supports_named_enum_types() {
85 let values: Vec<String> = e
86 .values
87 .iter()
88 .map(|v| format!("'{}'", v.replace('\'', "''")))
89 .collect();
90 let sql = format!(
91 "CREATE TYPE {}.{} AS ENUM ({})",
92 schema_name,
93 type_name,
94 values.join(", ")
95 );
96 let _ = sqlx::query(&sql).execute(pool).await;
97 }
98 }
99
100 for t in &config.tables {
101 let sid = t.schema_id.as_deref().unwrap_or(default_sid);
102 let schema = schemas_by_id.get(sid).ok_or_else(|| {
103 AppError::Config(crate::error::ConfigError::MissingReference {
104 kind: "schema",
105 id: sid.to_string(),
106 })
107 })?;
108 let schema_name = quote(schema_override.unwrap_or(&schema.name));
109 let table_name = quote(&t.name);
110 let full_name = format!("{}.{}", schema_name, table_name);
111
112 let cols = columns_by_table
113 .get(t.id.as_str())
114 .map(|v| v.as_slice())
115 .unwrap_or(&[]);
116 let mut col_defs: Vec<String> = Vec::new();
117 for c in cols {
118 let typ = dialect.ddl_type(&parse_canonical(&c.type_));
119 let mut def = format!("{} {}", quote(&c.name), typ);
120 if !c.nullable {
121 def.push_str(" NOT NULL");
122 }
123 if let Some(ref d) = c.default {
124 def.push_str(" DEFAULT ");
125 match d {
126 ColumnDefaultConfig::Literal(s) => def.push_str(s),
127 ColumnDefaultConfig::Expression { expression } => def.push_str(expression),
128 }
129 }
130 col_defs.push(def);
131 }
132
133 let config_col_names: HashSet<&str> = cols.iter().map(|c| c.name.as_str()).collect();
134 let ts_default = format!(
135 "{} NOT NULL DEFAULT {}",
136 dialect.sys_timestamp_type(),
137 dialect.now_fn()
138 );
139 let ts_nullable = dialect.sys_timestamp_type().to_string();
140 for (name, def_suffix) in [
141 ("created_at", ts_default.as_str()),
142 ("updated_at", ts_default.as_str()),
143 ("archived_at", ts_nullable.as_str()),
144 ("created_by", "TEXT"),
145 ("updated_by", "TEXT"),
146 ] {
147 if !config_col_names.contains(name) {
148 col_defs.push(format!("{} {}", quote(name), def_suffix));
149 }
150 }
151
152 let pk_cols = match &t.primary_key {
153 PrimaryKeyConfig::Single(s) => vec![quote(s)],
154 PrimaryKeyConfig::Composite(v) => v.iter().map(|s| quote(s)).collect::<Vec<_>>(),
155 };
156 let pk_def = format!("PRIMARY KEY ({})", pk_cols.join(", "));
157 col_defs.push(pk_def);
158
159 for u in &t.unique {
160 let cols: Vec<String> = u.iter().map(|s| quote(s)).collect();
161 col_defs.push(format!("UNIQUE ({})", cols.join(", ")));
162 }
163 for ch in &t.check {
164 col_defs.push(format!(
165 "CONSTRAINT {} CHECK ({})",
166 quote(&ch.name),
167 ch.expression
168 ));
169 }
170
171 let sql = format!(
172 "CREATE TABLE IF NOT EXISTS {} (\n {}\n)",
173 full_name,
174 col_defs.join(",\n ")
175 );
176 sqlx::query(&sql).execute(pool).await?;
177
178 if t.audit_log {
179 let schema_raw = schema_override.unwrap_or(&schema.name);
180 let audit_sql = audit_table_ddl(schema_raw, &t.name, cols, dialect);
181 sqlx::query(&audit_sql).execute(pool).await?;
182 let pk_col = match &t.primary_key {
183 PrimaryKeyConfig::Single(s) => s.clone(),
184 PrimaryKeyConfig::Composite(v) => v[0].clone(),
185 };
186 let audit_full = format!(
187 "{}.{}",
188 quote(schema_raw),
189 quote(&format!("{}_audit", t.name))
190 );
191 let idx_sql = format!(
192 "CREATE INDEX IF NOT EXISTS {} ON {} ({}, {})",
193 quote(&format!("{}_audit_record_idx", t.name)),
194 audit_full,
195 quote(&pk_col),
196 quote("audit_at")
197 );
198 let _ = sqlx::query(&idx_sql).execute(pool).await;
199 }
200
201 if let Some(col) = rls_tenant_column {
202 if dialect.supports_rls() {
203 if !config_col_names.contains(col) {
204 let q_col = quote(col);
205 let add_col = format!(
206 "ALTER TABLE {} ADD COLUMN IF NOT EXISTS {} TEXT",
207 full_name, q_col
208 );
209 sqlx::query(&add_col).execute(pool).await?;
210 }
211 let enable_rls = format!("ALTER TABLE {} ENABLE ROW LEVEL SECURITY", full_name);
212 sqlx::query(&enable_rls).execute(pool).await?;
213 let q_col = quote(col);
214 let setting = "current_setting('app.tenant_id', true)";
215 let cond = format!("{} = {}", q_col, setting);
216 let policy_prefix = format!("rls_tenant_{}", t.name);
217 let policies: &[(&str, &str, Option<&str>, Option<&str>)] = &[
218 ("select", "SELECT", Some(cond.as_str()), None),
219 ("insert", "INSERT", None, Some(cond.as_str())),
220 ("update", "UPDATE", Some(cond.as_str()), Some(cond.as_str())),
221 ("delete", "DELETE", Some(cond.as_str()), None),
222 ];
223 for (suffix, cmd, using_cond, with_check) in policies.iter() {
224 let policy_name = format!("{}_{}", policy_prefix, suffix);
225 let drop_sql = format!(
226 "DROP POLICY IF EXISTS {} ON {}",
227 quote(&policy_name),
228 full_name
229 );
230 let _ = sqlx::query(&drop_sql).execute(pool).await;
231 let create_sql = match (using_cond, with_check) {
232 (Some(u), Some(w)) => format!(
233 "CREATE POLICY {} ON {} FOR {} USING ( {} ) WITH CHECK ( {} )",
234 quote(&policy_name),
235 full_name,
236 cmd,
237 u,
238 w
239 ),
240 (Some(u), None) => format!(
241 "CREATE POLICY {} ON {} FOR {} USING ( {} )",
242 quote(&policy_name),
243 full_name,
244 cmd,
245 u
246 ),
247 (None, Some(w)) => format!(
248 "CREATE POLICY {} ON {} FOR {} WITH CHECK ( {} )",
249 quote(&policy_name),
250 full_name,
251 cmd,
252 w
253 ),
254 (None, None) => continue,
255 };
256 sqlx::query(&create_sql).execute(pool).await?;
257 }
258 } else {
259 tracing::warn!(table = %full_name, dialect = %dialect.name(), "RLS requested but not supported by this dialect; skipping");
260 }
261 }
262 }
263
264 for idx in &config.indexes {
265 let sid = idx.schema_id.as_deref().unwrap_or(default_sid);
266 let schema = schemas_by_id.get(sid).ok_or_else(|| {
267 AppError::Config(crate::error::ConfigError::MissingReference {
268 kind: "schema",
269 id: sid.to_string(),
270 })
271 })?;
272 let table = tables_by_id.get(idx.table_id.as_str()).ok_or_else(|| {
273 AppError::Config(crate::error::ConfigError::MissingReference {
274 kind: "table",
275 id: idx.table_id.clone(),
276 })
277 })?;
278 let schema_name = quote(schema_override.unwrap_or(&schema.name));
279 let table_name = quote(&table.name);
280 let full_table = format!("{}.{}", schema_name, table_name);
281 let index_name = quote(&idx.name);
282
283 let mut col_parts: Vec<String> = Vec::new();
284 for col in &idx.columns {
285 match col {
286 IndexColumnEntry::Name(n) => col_parts.push(quote(n)),
287 IndexColumnEntry::Spec {
288 name, direction, ..
289 } => {
290 let dir = direction
291 .as_deref()
292 .map(|d| format!(" {}", d.to_uppercase()))
293 .unwrap_or_default();
294 col_parts.push(format!("{}{}", quote(name), dir));
295 }
296 IndexColumnEntry::Expression { expression } => col_parts.push(expression.clone()),
297 }
298 }
299 let method = idx.method.as_deref().unwrap_or("btree");
300 let unique = if idx.unique { "UNIQUE " } else { "" };
301 let include: String = if idx.include.is_empty() {
302 String::new()
303 } else {
304 let inc: Vec<String> = idx.include.iter().map(|s| quote(s)).collect();
305 format!(" INCLUDE ({})", inc.join(", "))
306 };
307 let where_clause: String = idx
308 .where_
309 .as_ref()
310 .map(|w| format!(" WHERE {}", w))
311 .unwrap_or_default();
312
313 let sql = format!(
314 "CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({}){}{}",
315 unique,
316 index_name,
317 full_table,
318 method,
319 col_parts.join(", "),
320 include,
321 where_clause
322 );
323 let _ = sqlx::query(&sql).execute(pool).await;
324 }
325
326 for rel in &config.relationships {
327 let from_sid = rel.from_schema_id.as_str();
328 let to_sid = rel.to_schema_id.as_str();
329 let from_schema = schemas_by_id.get(from_sid).ok_or_else(|| {
330 AppError::Config(crate::error::ConfigError::MissingReference {
331 kind: "schema",
332 id: from_sid.to_string(),
333 })
334 })?;
335 let from_table = tables_by_id
336 .get(rel.from_table_id.as_str())
337 .ok_or_else(|| {
338 AppError::Config(crate::error::ConfigError::MissingReference {
339 kind: "table",
340 id: rel.from_table_id.clone(),
341 })
342 })?;
343 let to_schema = schemas_by_id.get(to_sid).ok_or_else(|| {
344 AppError::Config(crate::error::ConfigError::MissingReference {
345 kind: "schema",
346 id: to_sid.to_string(),
347 })
348 })?;
349 let to_table = tables_by_id.get(rel.to_table_id.as_str()).ok_or_else(|| {
350 AppError::Config(crate::error::ConfigError::MissingReference {
351 kind: "table",
352 id: rel.to_table_id.clone(),
353 })
354 })?;
355
356 let from_schema_name = schema_override.unwrap_or(&from_schema.name);
357 let to_schema_name = schema_override.unwrap_or(&to_schema.name);
358
359 let from_col = config
360 .columns
361 .iter()
362 .find(|c| c.id == rel.from_column_id)
363 .map(|c| c.name.as_str())
364 .ok_or_else(|| {
365 AppError::Config(crate::error::ConfigError::MissingReference {
366 kind: "column",
367 id: rel.from_column_id.clone(),
368 })
369 })?;
370 let to_col = config
371 .columns
372 .iter()
373 .find(|c| c.id == rel.to_column_id)
374 .map(|c| c.name.as_str())
375 .ok_or_else(|| {
376 AppError::Config(crate::error::ConfigError::MissingReference {
377 kind: "column",
378 id: rel.to_column_id.clone(),
379 })
380 })?;
381
382 let from_full = format!("{}.{}", quote(from_schema_name), quote(&from_table.name));
383 let to_full = format!("{}.{}", quote(to_schema_name), quote(&to_table.name));
384 let constraint_name = rel.name.as_deref().unwrap_or(&rel.id);
385 let on_update = rel.on_update.as_deref().unwrap_or("NO ACTION");
386 let on_delete = rel.on_delete.as_deref().unwrap_or("NO ACTION");
387
388 let sql = format!(
389 "ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {} ({}) ON UPDATE {} ON DELETE {}",
390 from_full,
391 quote(constraint_name),
392 quote(from_col),
393 to_full,
394 quote(to_col),
395 on_update,
396 on_delete
397 );
398 let _ = sqlx::query(&sql).execute(pool).await;
399 }
400
401 Ok(())
402}
403
404pub async fn revert_migrations(
407 pool: &Pool,
408 config: &FullConfig,
409 schema_override: Option<&str>,
410) -> Result<(), AppError> {
411 let default_sid = config
412 .schemas
413 .first()
414 .map(|s| s.id.as_str())
415 .ok_or_else(|| {
416 AppError::Config(crate::error::ConfigError::Validation(
417 "at least one schema required".into(),
418 ))
419 })?;
420
421 let schemas_by_id: HashMap<_, _> = config.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
422
423 for t in &config.tables {
425 let sid = t.schema_id.as_deref().unwrap_or(default_sid);
426 let schema = schemas_by_id.get(sid).ok_or_else(|| {
427 AppError::Config(crate::error::ConfigError::MissingReference {
428 kind: "schema",
429 id: sid.to_string(),
430 })
431 })?;
432 let schema_raw = schema_override.unwrap_or(&schema.name);
433 let schema_name = quote(schema_raw);
434 let table_name = quote(&t.name);
435 let full_name = format!("{}.{}", schema_name, table_name);
436 if t.audit_log {
437 let audit_full = format!("{}.{}", schema_name, quote(&format!("{}_audit", t.name)));
438 let _ = sqlx::query(&format!("DROP TABLE IF EXISTS {} CASCADE", audit_full))
439 .execute(pool)
440 .await;
441 }
442 let drop_sql = format!("DROP TABLE IF EXISTS {} CASCADE", full_name);
443 let _ = sqlx::query(&drop_sql).execute(pool).await;
444 }
445
446 for e in &config.enums {
448 let sid = e.schema_id.as_deref().unwrap_or(default_sid);
449 let schema = schemas_by_id.get(sid).ok_or_else(|| {
450 AppError::Config(crate::error::ConfigError::MissingReference {
451 kind: "schema",
452 id: sid.to_string(),
453 })
454 })?;
455 let schema_name = quote(schema_override.unwrap_or(&schema.name));
456 let type_name = quote(&e.name);
457 let drop_sql = format!("DROP TYPE IF EXISTS {}.{} CASCADE", schema_name, type_name);
458 let _ = sqlx::query(&drop_sql).execute(pool).await;
459 }
460
461 if schema_override.is_none() {
463 for s in &config.schemas {
464 if s.name.eq_ignore_ascii_case("public") {
465 continue;
466 }
467 let schema_name = quote(&s.name);
468 let drop_sql = format!("DROP SCHEMA IF EXISTS {} CASCADE", schema_name);
469 let _ = sqlx::query(&drop_sql).execute(pool).await;
470 }
471 }
472
473 Ok(())
474}
475
476#[derive(Debug, Clone, Serialize, Deserialize)]
479#[serde(rename_all = "snake_case")]
480pub enum MigrationOperation {
481 CreateSchema,
482 CreateEnum,
483 DropEnum,
484 AddEnumValue,
485 RemoveEnumValue,
486 CreateTable,
487 DropTable,
488 AddColumn,
489 DropColumn,
490 RenameColumn,
491 AlterColumnType,
492 BackfillNulls,
493 SetNotNull,
494 DropNotNull,
495 SetDefault,
496 DropDefault,
497 CreateIndex,
498 DropIndex,
499 AddForeignKey,
500 DropForeignKey,
501}
502
503impl std::fmt::Display for MigrationOperation {
504 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
505 let s = serde_json::to_value(self)
506 .ok()
507 .and_then(|v| v.as_str().map(String::from))
508 .unwrap_or_else(|| format!("{:?}", self));
509 write!(f, "{}", s)
510 }
511}
512
513#[derive(Debug, Clone, Serialize, Deserialize)]
515#[serde(rename_all = "snake_case")]
516pub enum MigrationSafety {
517 Safe,
519 BestEffort,
521 WarnOnly,
523}
524
525#[derive(Debug, Clone, Serialize, Deserialize)]
527#[serde(rename_all = "snake_case")]
528pub enum MigrationRisk {
529 None,
530 MayFail,
532 ExistingNullsMustBeAbsent,
534 DataWillBeModified,
536 ManualActionRequired,
538}
539
540#[derive(Debug, Clone, Serialize, Deserialize)]
542pub struct MigrationStep {
543 pub step: usize,
544 pub operation: MigrationOperation,
545 pub schema: String,
546 pub table: Option<String>,
547 pub object: String,
549 pub object_type: String,
551 pub description: String,
552 pub ddl: Option<String>,
554 pub safety: MigrationSafety,
555 pub risk: MigrationRisk,
556 pub risk_detail: Option<String>,
557}
558
559#[derive(Debug, Clone, Serialize, Deserialize)]
561pub struct MigrationPlan {
562 pub steps: Vec<MigrationStep>,
563}
564
565#[derive(Debug, Clone, Serialize)]
566pub struct MigrationSummary {
567 pub total: usize,
568 pub safe: usize,
569 pub best_effort: usize,
570 pub warn_only: usize,
571}
572
573impl MigrationPlan {
574 pub fn summary(&self) -> MigrationSummary {
575 let (mut safe, mut best_effort, mut warn_only) = (0, 0, 0);
576 for s in &self.steps {
577 match s.safety {
578 MigrationSafety::Safe => safe += 1,
579 MigrationSafety::BestEffort => best_effort += 1,
580 MigrationSafety::WarnOnly => warn_only += 1,
581 }
582 }
583 MigrationSummary {
584 total: self.steps.len(),
585 safe,
586 best_effort,
587 warn_only,
588 }
589 }
590}
591
592pub struct MigrationExecutionResult {
594 pub applied: usize,
595 pub warned: usize,
596 pub warnings: Vec<String>,
597}
598
599fn default_str(d: &ColumnDefaultConfig) -> String {
600 match d {
601 ColumnDefaultConfig::Literal(s) => s.clone(),
602 ColumnDefaultConfig::Expression { expression } => expression.clone(),
603 }
604}
605
606pub fn compute_migration_plan(
612 old: &FullConfig,
613 new: &FullConfig,
614 schema_override: Option<&str>,
615 _rls_tenant_column: Option<&str>,
616 dialect: &dyn Dialect,
617) -> Result<MigrationPlan, AppError> {
618 validate(new)?;
619
620 let default_old_sid = old.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
621 let default_new_sid = new.schemas.first().map(|s| s.id.as_str()).unwrap_or("");
622
623 let old_schemas: HashMap<&str, &SchemaConfig> =
624 old.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
625 let new_schemas: HashMap<&str, &SchemaConfig> =
626 new.schemas.iter().map(|s| (s.id.as_str(), s)).collect();
627 let old_tables: HashMap<&str, &TableConfig> =
628 old.tables.iter().map(|t| (t.id.as_str(), t)).collect();
629 let new_tables: HashMap<&str, &TableConfig> =
630 new.tables.iter().map(|t| (t.id.as_str(), t)).collect();
631 let old_columns: HashMap<&str, &ColumnConfig> =
632 old.columns.iter().map(|c| (c.id.as_str(), c)).collect();
633 let old_enums: HashMap<&str, &EnumConfig> =
634 old.enums.iter().map(|e| (e.id.as_str(), e)).collect();
635 let new_enums: HashMap<&str, &EnumConfig> =
636 new.enums.iter().map(|e| (e.id.as_str(), e)).collect();
637 let old_indexes: HashMap<&str, &IndexConfig> =
638 old.indexes.iter().map(|i| (i.id.as_str(), i)).collect();
639 let new_indexes: HashMap<&str, &IndexConfig> =
640 new.indexes.iter().map(|i| (i.id.as_str(), i)).collect();
641 let old_rels: HashMap<&str, &RelationshipConfig> = old
642 .relationships
643 .iter()
644 .map(|r| (r.id.as_str(), r))
645 .collect();
646 let new_rels: HashMap<&str, &RelationshipConfig> = new
647 .relationships
648 .iter()
649 .map(|r| (r.id.as_str(), r))
650 .collect();
651
652 let mut steps: Vec<MigrationStep> = Vec::new();
653
654 let schema_name_for = |sid: &str, schemas: &HashMap<&str, &SchemaConfig>| -> String {
655 schema_override.map(String::from).unwrap_or_else(|| {
656 schemas
657 .get(sid)
658 .map(|s| s.name.clone())
659 .unwrap_or_else(|| sid.to_string())
660 })
661 };
662
663 if schema_override.is_none() {
665 for s in &new.schemas {
666 if !old_schemas.contains_key(s.id.as_str()) {
667 steps.push(MigrationStep {
668 step: 0,
669 operation: MigrationOperation::CreateSchema,
670 schema: s.name.clone(),
671 table: None,
672 object: s.name.clone(),
673 object_type: "schema".into(),
674 description: format!("Create schema \"{}\"", s.name),
675 ddl: Some(format!("CREATE SCHEMA IF NOT EXISTS {}", quote(&s.name))),
676 safety: MigrationSafety::Safe,
677 risk: MigrationRisk::None,
678 risk_detail: None,
679 });
680 }
681 }
682 }
683
684 for new_enum in &new.enums {
686 let sid = new_enum.schema_id.as_deref().unwrap_or(default_new_sid);
687 let schema = schema_name_for(sid, &new_schemas);
688
689 if let Some(old_enum) = old_enums.get(new_enum.id.as_str()) {
690 let old_vals: HashSet<&str> = old_enum.values.iter().map(String::as_str).collect();
691 let new_vals: HashSet<&str> = new_enum.values.iter().map(String::as_str).collect();
692 for val in new_enum
693 .values
694 .iter()
695 .map(String::as_str)
696 .filter(|v| !old_vals.contains(v))
697 {
698 steps.push(MigrationStep {
699 step: 0,
700 operation: MigrationOperation::AddEnumValue,
701 schema: schema.clone(),
702 table: None,
703 object: format!("{}:{}", new_enum.name, val),
704 object_type: "enum_value".into(),
705 description: format!(
706 "Add value '{}' to enum \"{}\".\"{}\"",
707 val, schema, new_enum.name
708 ),
709 ddl: Some(format!(
710 "ALTER TYPE {}.{} ADD VALUE IF NOT EXISTS '{}'",
711 quote(&schema),
712 quote(&new_enum.name),
713 val.replace('\'', "''")
714 )),
715 safety: MigrationSafety::Safe,
716 risk: MigrationRisk::None,
717 risk_detail: None,
718 });
719 }
720 for val in old_enum
721 .values
722 .iter()
723 .map(String::as_str)
724 .filter(|v| !new_vals.contains(v))
725 {
726 steps.push(MigrationStep {
727 step: 0,
728 operation: MigrationOperation::RemoveEnumValue,
729 schema: schema.clone(),
730 table: None,
731 object: format!("{}:{}", new_enum.name, val),
732 object_type: "enum_value".into(),
733 description: format!("Enum value '{}' removed from config on \"{}\".\"{}\"", val, schema, new_enum.name),
734 ddl: None,
735 safety: MigrationSafety::WarnOnly,
736 risk: MigrationRisk::ManualActionRequired,
737 risk_detail: Some(format!(
738 "PostgreSQL does not support removing enum values. '{}' was removed from config but NOT from the database type. Recreate the type manually if needed.",
739 val
740 )),
741 });
742 }
743 } else {
744 let values: Vec<String> = new_enum
745 .values
746 .iter()
747 .map(|v| format!("'{}'", v.replace('\'', "''")))
748 .collect();
749 steps.push(MigrationStep {
750 step: 0,
751 operation: MigrationOperation::CreateEnum,
752 schema: schema.clone(),
753 table: None,
754 object: new_enum.name.clone(),
755 object_type: "enum".into(),
756 description: format!("Create enum type \"{}\".\"{}\"", schema, new_enum.name),
757 ddl: Some(format!("CREATE TYPE {}.{} AS ENUM ({})", quote(&schema), quote(&new_enum.name), values.join(", "))),
758 safety: MigrationSafety::BestEffort,
759 risk: MigrationRisk::None,
760 risk_detail: Some("PostgreSQL has no CREATE TYPE IF NOT EXISTS; ignored if the type already exists.".into()),
761 });
762 }
763 }
764 for old_enum in &old.enums {
765 if !new_enums.contains_key(old_enum.id.as_str()) {
766 let sid = old_enum.schema_id.as_deref().unwrap_or(default_old_sid);
767 let schema = schema_name_for(sid, &old_schemas);
768 steps.push(MigrationStep {
769 step: 0,
770 operation: MigrationOperation::DropEnum,
771 schema: schema.clone(),
772 table: None,
773 object: old_enum.name.clone(),
774 object_type: "enum".into(),
775 description: format!("Enum \"{}\".\"{}\" removed from config", schema, old_enum.name),
776 ddl: None,
777 safety: MigrationSafety::WarnOnly,
778 risk: MigrationRisk::ManualActionRequired,
779 risk_detail: Some("Enum type NOT dropped from database (data safety). Run DROP TYPE manually if intended.".into()),
780 });
781 }
782 }
783
784 let added_table_ids: HashSet<&str> = new
786 .tables
787 .iter()
788 .filter(|t| !old_tables.contains_key(t.id.as_str()))
789 .map(|t| t.id.as_str())
790 .collect();
791
792 let cols_by_table: HashMap<&str, Vec<&ColumnConfig>> =
793 new.columns.iter().fold(HashMap::new(), |mut m, c| {
794 m.entry(c.table_id.as_str()).or_default().push(c);
795 m
796 });
797
798 for new_table in &new.tables {
799 if !added_table_ids.contains(new_table.id.as_str()) {
800 continue;
801 }
802 let sid = new_table.schema_id.as_deref().unwrap_or(default_new_sid);
803 let schema = schema_name_for(sid, &new_schemas);
804 let full = format!("{}.{}", quote(&schema), quote(&new_table.name));
805
806 let cols = cols_by_table
807 .get(new_table.id.as_str())
808 .map(|v| v.as_slice())
809 .unwrap_or(&[]);
810 let mut col_defs: Vec<String> = Vec::new();
811 for c in cols {
812 let typ = dialect.ddl_type(&parse_canonical(&c.type_));
813 let mut def = format!("{} {}", quote(&c.name), typ);
814 if !c.nullable {
815 def.push_str(" NOT NULL");
816 }
817 if let Some(ref d) = c.default {
818 def.push_str(" DEFAULT ");
819 match d {
820 ColumnDefaultConfig::Literal(s) => def.push_str(s),
821 ColumnDefaultConfig::Expression { expression } => def.push_str(expression),
822 }
823 }
824 col_defs.push(def);
825 }
826 let cfg_col_names: HashSet<&str> = cols.iter().map(|c| c.name.as_str()).collect();
827 for (name, suf) in [
832 ("created_at", "TIMESTAMPTZ NOT NULL DEFAULT NOW()"),
833 ("updated_at", "TIMESTAMPTZ NOT NULL DEFAULT NOW()"),
834 ("archived_at", "TIMESTAMPTZ"),
835 ("created_by", "TEXT"),
836 ("updated_by", "TEXT"),
837 ] {
838 if !cfg_col_names.contains(name) {
839 col_defs.push(format!("{} {}", quote(name), suf));
840 }
841 }
842 let pk_cols = match &new_table.primary_key {
843 PrimaryKeyConfig::Single(s) => vec![quote(s)],
844 PrimaryKeyConfig::Composite(v) => v.iter().map(|s| quote(s)).collect(),
845 };
846 col_defs.push(format!("PRIMARY KEY ({})", pk_cols.join(", ")));
847 for u in &new_table.unique {
848 col_defs.push(format!(
849 "UNIQUE ({})",
850 u.iter().map(|s| quote(s)).collect::<Vec<_>>().join(", ")
851 ));
852 }
853 for ch in &new_table.check {
854 col_defs.push(format!(
855 "CONSTRAINT {} CHECK ({})",
856 quote(&ch.name),
857 ch.expression
858 ));
859 }
860
861 steps.push(MigrationStep {
862 step: 0,
863 operation: MigrationOperation::CreateTable,
864 schema: schema.clone(),
865 table: Some(new_table.name.clone()),
866 object: new_table.name.clone(),
867 object_type: "table".into(),
868 description: format!("Create table \"{}\".\"{}\"", schema, new_table.name),
869 ddl: Some(format!(
870 "CREATE TABLE IF NOT EXISTS {} (\n {}\n)",
871 full,
872 col_defs.join(",\n ")
873 )),
874 safety: MigrationSafety::Safe,
875 risk: MigrationRisk::None,
876 risk_detail: None,
877 });
878 if new_table.audit_log {
879 let audit_ddl = audit_table_ddl(&schema, &new_table.name, cols, dialect);
880 steps.push(MigrationStep {
881 step: 0,
882 operation: MigrationOperation::CreateTable,
883 schema: schema.clone(),
884 table: Some(format!("{}_audit", new_table.name)),
885 object: format!("{}_audit", new_table.name),
886 object_type: "table".into(),
887 description: format!(
888 "Create audit table \"{}\".\"{}_audit\"",
889 schema, new_table.name
890 ),
891 ddl: Some(audit_ddl),
892 safety: MigrationSafety::Safe,
893 risk: MigrationRisk::None,
894 risk_detail: None,
895 });
896 }
897 }
898
899 for new_table in &new.tables {
901 if added_table_ids.contains(new_table.id.as_str()) {
902 continue;
903 }
904 if let Some(old_table) = old_tables.get(new_table.id.as_str()) {
905 if !old_table.audit_log && new_table.audit_log {
906 let sid = new_table.schema_id.as_deref().unwrap_or(default_new_sid);
907 let schema = schema_name_for(sid, &new_schemas);
908 let cols = cols_by_table
909 .get(new_table.id.as_str())
910 .map(|v| v.as_slice())
911 .unwrap_or(&[]);
912 let audit_ddl = audit_table_ddl(&schema, &new_table.name, cols, dialect);
913 steps.push(MigrationStep {
914 step: 0,
915 operation: MigrationOperation::CreateTable,
916 schema: schema.clone(),
917 table: Some(format!("{}_audit", new_table.name)),
918 object: format!("{}_audit", new_table.name),
919 object_type: "table".into(),
920 description: format!(
921 "Enable audit log: create \"{}\".\"{}_audit\"",
922 schema, new_table.name
923 ),
924 ddl: Some(audit_ddl),
925 safety: MigrationSafety::Safe,
926 risk: MigrationRisk::None,
927 risk_detail: None,
928 });
929 }
930 }
931 }
932
933 for old_table in &old.tables {
934 if !new_tables.contains_key(old_table.id.as_str()) {
935 let sid = old_table.schema_id.as_deref().unwrap_or(default_old_sid);
936 let schema = schema_name_for(sid, &old_schemas);
937 steps.push(MigrationStep {
938 step: 0,
939 operation: MigrationOperation::DropTable,
940 schema: schema.clone(),
941 table: Some(old_table.name.clone()),
942 object: old_table.name.clone(),
943 object_type: "table".into(),
944 description: format!("Table \"{}\".\"{}\" removed from config", schema, old_table.name),
945 ddl: None,
946 safety: MigrationSafety::WarnOnly,
947 risk: MigrationRisk::ManualActionRequired,
948 risk_detail: Some("Table NOT dropped from database (data safety). Run DROP TABLE manually if intended.".into()),
949 });
950 }
951 }
952
953 for new_col in &new.columns {
955 if added_table_ids.contains(new_col.table_id.as_str()) {
956 continue;
957 }
958 let table = match new_tables.get(new_col.table_id.as_str()) {
959 Some(t) => t,
960 None => continue,
961 };
962 let sid = table.schema_id.as_deref().unwrap_or(default_new_sid);
963 let schema = schema_name_for(sid, &new_schemas);
964 let full = format!("{}.{}", quote(&schema), quote(&table.name));
965
966 if let Some(old_col) = old_columns.get(new_col.id.as_str()) {
967 if old_col.table_id != new_col.table_id {
968 steps.push(MigrationStep {
969 step: 0,
970 operation: MigrationOperation::AddColumn,
971 schema: schema.clone(),
972 table: Some(table.name.clone()),
973 object: new_col.name.clone(),
974 object_type: "column".into(),
975 description: format!("Column \"{}\" (id: {}) appears to have moved tables — manual migration required", new_col.name, new_col.id),
976 ddl: None,
977 safety: MigrationSafety::WarnOnly,
978 risk: MigrationRisk::ManualActionRequired,
979 risk_detail: Some(format!("Cannot automate column move from table {} to {}.", old_col.table_id, new_col.table_id)),
980 });
981 continue;
982 }
983
984 if old_col.name != new_col.name {
986 steps.push(MigrationStep {
987 step: 0,
988 operation: MigrationOperation::RenameColumn,
989 schema: schema.clone(),
990 table: Some(table.name.clone()),
991 object: new_col.name.clone(),
992 object_type: "column".into(),
993 description: format!(
994 "Rename column \"{}\" → \"{}\" on \"{}\".\"{}\"",
995 old_col.name, new_col.name, schema, table.name
996 ),
997 ddl: Some(format!(
998 "ALTER TABLE {} RENAME COLUMN {} TO {}",
999 full,
1000 quote(&old_col.name),
1001 quote(&new_col.name)
1002 )),
1003 safety: MigrationSafety::Safe,
1004 risk: MigrationRisk::None,
1005 risk_detail: None,
1006 });
1007 }
1008
1009 let old_type = dialect.ddl_type(&parse_canonical(&old_col.type_));
1011 let new_type = dialect.ddl_type(&parse_canonical(&new_col.type_));
1012 if old_type.to_uppercase() != new_type.to_uppercase() {
1013 let col_name = &new_col.name;
1014 steps.push(MigrationStep {
1015 step: 0,
1016 operation: MigrationOperation::AlterColumnType,
1017 schema: schema.clone(),
1018 table: Some(table.name.clone()),
1019 object: col_name.clone(),
1020 object_type: "column".into(),
1021 description: format!("Change type of \"{}\".\"{}\".\"{}\": {} → {}", schema, table.name, col_name, old_type, new_type),
1022 ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} TYPE {} USING {}::{}", full, quote(col_name), new_type, quote(col_name), new_type)),
1023 safety: MigrationSafety::BestEffort,
1024 risk: MigrationRisk::MayFail,
1025 risk_detail: Some(format!("USING {}::{} cast may fail for incompatible values. Provide a custom USING expression if needed.", col_name, new_type)),
1026 });
1027 }
1028
1029 if old_col.nullable && !new_col.nullable {
1031 if let Some(ref d) = new_col.default {
1032 let default_val = default_str(d);
1033 steps.push(MigrationStep {
1035 step: 0,
1036 operation: MigrationOperation::BackfillNulls,
1037 schema: schema.clone(),
1038 table: Some(table.name.clone()),
1039 object: new_col.name.clone(),
1040 object_type: "column".into(),
1041 description: format!("Backfill NULLs in \"{}\".\"{}\".\"{}\": SET {} = {} WHERE {} IS NULL", schema, table.name, new_col.name, new_col.name, default_val, new_col.name),
1042 ddl: Some(format!("UPDATE {} SET {} = {} WHERE {} IS NULL", full, quote(&new_col.name), default_val, quote(&new_col.name))),
1043 safety: MigrationSafety::Safe,
1044 risk: MigrationRisk::DataWillBeModified,
1045 risk_detail: Some(format!("Existing NULLs in column \"{}\" will be set to {} before NOT NULL is enforced.", new_col.name, default_val)),
1046 });
1047 steps.push(MigrationStep {
1049 step: 0,
1050 operation: MigrationOperation::SetNotNull,
1051 schema: schema.clone(),
1052 table: Some(table.name.clone()),
1053 object: new_col.name.clone(),
1054 object_type: "column".into(),
1055 description: format!("Set NOT NULL on \"{}\".\"{}\".\"{}\": NULLs pre-filled with default ({})", schema, table.name, new_col.name, default_val),
1056 ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} SET NOT NULL", full, quote(&new_col.name))),
1057 safety: MigrationSafety::Safe,
1058 risk: MigrationRisk::None,
1059 risk_detail: None,
1060 });
1061 } else {
1062 steps.push(MigrationStep {
1064 step: 0,
1065 operation: MigrationOperation::SetNotNull,
1066 schema: schema.clone(),
1067 table: Some(table.name.clone()),
1068 object: new_col.name.clone(),
1069 object_type: "column".into(),
1070 description: format!("Set NOT NULL on \"{}\".\"{}\".\"{}\": no default configured — will fail if NULLs exist", schema, table.name, new_col.name),
1071 ddl: Some(format!("ALTER TABLE {} ALTER COLUMN {} SET NOT NULL", full, quote(&new_col.name))),
1072 safety: MigrationSafety::BestEffort,
1073 risk: MigrationRisk::ExistingNullsMustBeAbsent,
1074 risk_detail: Some(format!(
1075 "No default value configured for column \"{}\". Add a default to the config to enable automatic NULL backfill before enforcing NOT NULL.",
1076 new_col.name
1077 )),
1078 });
1079 }
1080 }
1081
1082 if !old_col.nullable && new_col.nullable {
1084 steps.push(MigrationStep {
1085 step: 0,
1086 operation: MigrationOperation::DropNotNull,
1087 schema: schema.clone(),
1088 table: Some(table.name.clone()),
1089 object: new_col.name.clone(),
1090 object_type: "column".into(),
1091 description: format!(
1092 "Drop NOT NULL on \"{}\".\"{}\".\"{}\": column becomes nullable",
1093 schema, table.name, new_col.name
1094 ),
1095 ddl: Some(format!(
1096 "ALTER TABLE {} ALTER COLUMN {} DROP NOT NULL",
1097 full,
1098 quote(&new_col.name)
1099 )),
1100 safety: MigrationSafety::Safe,
1101 risk: MigrationRisk::None,
1102 risk_detail: None,
1103 });
1104 }
1105
1106 let old_def = old_col.default.as_ref().map(default_str);
1108 let new_def = new_col.default.as_ref().map(default_str);
1109 if old_def != new_def {
1110 match &new_col.default {
1111 Some(d) => {
1112 let val = default_str(d);
1113 steps.push(MigrationStep {
1114 step: 0,
1115 operation: MigrationOperation::SetDefault,
1116 schema: schema.clone(),
1117 table: Some(table.name.clone()),
1118 object: new_col.name.clone(),
1119 object_type: "column".into(),
1120 description: format!(
1121 "Set DEFAULT {} on \"{}\".\"{}\".\"{}\": was {}",
1122 val,
1123 schema,
1124 table.name,
1125 new_col.name,
1126 old_def.as_deref().unwrap_or("none")
1127 ),
1128 ddl: Some(format!(
1129 "ALTER TABLE {} ALTER COLUMN {} SET DEFAULT {}",
1130 full,
1131 quote(&new_col.name),
1132 val
1133 )),
1134 safety: MigrationSafety::Safe,
1135 risk: MigrationRisk::None,
1136 risk_detail: None,
1137 });
1138 }
1139 None => {
1140 steps.push(MigrationStep {
1141 step: 0,
1142 operation: MigrationOperation::DropDefault,
1143 schema: schema.clone(),
1144 table: Some(table.name.clone()),
1145 object: new_col.name.clone(),
1146 object_type: "column".into(),
1147 description: format!(
1148 "Drop DEFAULT on \"{}\".\"{}\".\"{}\": was {}",
1149 schema,
1150 table.name,
1151 new_col.name,
1152 old_def.as_deref().unwrap_or("none")
1153 ),
1154 ddl: Some(format!(
1155 "ALTER TABLE {} ALTER COLUMN {} DROP DEFAULT",
1156 full,
1157 quote(&new_col.name)
1158 )),
1159 safety: MigrationSafety::Safe,
1160 risk: MigrationRisk::None,
1161 risk_detail: None,
1162 });
1163 }
1164 }
1165 }
1166 } else {
1167 let new_type = dialect.ddl_type(&parse_canonical(&new_col.type_));
1169 let mut col_def = format!("{} {}", quote(&new_col.name), new_type);
1170 if !new_col.nullable {
1171 col_def.push_str(" NOT NULL");
1172 }
1173 if let Some(ref d) = new_col.default {
1174 col_def.push_str(" DEFAULT ");
1175 match d {
1176 ColumnDefaultConfig::Literal(s) => col_def.push_str(s),
1177 ColumnDefaultConfig::Expression { expression } => col_def.push_str(expression),
1178 }
1179 }
1180 steps.push(MigrationStep {
1181 step: 0,
1182 operation: MigrationOperation::AddColumn,
1183 schema: schema.clone(),
1184 table: Some(table.name.clone()),
1185 object: new_col.name.clone(),
1186 object_type: "column".into(),
1187 description: format!(
1188 "Add column \"{}\" {} to \"{}\".\"{}\"",
1189 new_col.name, new_type, schema, table.name
1190 ),
1191 ddl: Some(format!("ALTER TABLE {} ADD COLUMN {}", full, col_def)),
1192 safety: MigrationSafety::Safe,
1193 risk: MigrationRisk::None,
1194 risk_detail: None,
1195 });
1196 }
1197 }
1198
1199 for old_col in &old.columns {
1201 if new.columns.iter().any(|c| c.id == old_col.id) {
1202 continue;
1203 }
1204 if !new_tables.contains_key(old_col.table_id.as_str()) {
1205 continue;
1206 }
1207 let table_name = old_tables
1208 .get(old_col.table_id.as_str())
1209 .map(|t| t.name.as_str())
1210 .unwrap_or(&old_col.table_id);
1211 let sid = old_tables
1212 .get(old_col.table_id.as_str())
1213 .and_then(|t| t.schema_id.as_deref())
1214 .unwrap_or(default_old_sid);
1215 let schema = schema_name_for(sid, &old_schemas);
1216 steps.push(MigrationStep {
1217 step: 0,
1218 operation: MigrationOperation::DropColumn,
1219 schema: schema.clone(),
1220 table: Some(table_name.to_string()),
1221 object: old_col.name.clone(),
1222 object_type: "column".into(),
1223 description: format!("Column \"{}\" removed from config on \"{}\".\"{}\"", old_col.name, schema, table_name),
1224 ddl: None,
1225 safety: MigrationSafety::WarnOnly,
1226 risk: MigrationRisk::ManualActionRequired,
1227 risk_detail: Some("Column NOT dropped from database (data safety). Run ALTER TABLE DROP COLUMN manually if intended.".into()),
1228 });
1229 }
1230
1231 for old_idx in &old.indexes {
1233 if !new_indexes.contains_key(old_idx.id.as_str()) {
1234 let sid = old_idx.schema_id.as_deref().unwrap_or(default_old_sid);
1235 let schema = schema_name_for(sid, &old_schemas);
1236 steps.push(MigrationStep {
1237 step: 0,
1238 operation: MigrationOperation::DropIndex,
1239 schema: schema.clone(),
1240 table: old_tables
1241 .get(old_idx.table_id.as_str())
1242 .map(|t| t.name.clone()),
1243 object: old_idx.name.clone(),
1244 object_type: "index".into(),
1245 description: format!("Drop index \"{}\" in schema \"{}\"", old_idx.name, schema),
1246 ddl: Some(format!(
1247 "DROP INDEX IF EXISTS {}.{}",
1248 quote(&schema),
1249 quote(&old_idx.name)
1250 )),
1251 safety: MigrationSafety::Safe,
1252 risk: MigrationRisk::None,
1253 risk_detail: None,
1254 });
1255 }
1256 }
1257 for new_idx in &new.indexes {
1258 if old_indexes.contains_key(new_idx.id.as_str())
1259 || added_table_ids.contains(new_idx.table_id.as_str())
1260 {
1261 continue;
1262 }
1263 let sid = new_idx.schema_id.as_deref().unwrap_or(default_new_sid);
1264 let schema = match new_schemas.get(sid) {
1265 Some(s) => schema_override.unwrap_or(&s.name).to_string(),
1266 None => continue,
1267 };
1268 let table = match new_tables.get(new_idx.table_id.as_str()) {
1269 Some(t) => t,
1270 None => continue,
1271 };
1272 let full_table = format!("{}.{}", quote(&schema), quote(&table.name));
1273 let mut col_parts: Vec<String> = Vec::new();
1274 for col in &new_idx.columns {
1275 match col {
1276 IndexColumnEntry::Name(n) => col_parts.push(quote(n)),
1277 IndexColumnEntry::Spec {
1278 name, direction, ..
1279 } => {
1280 let dir = direction
1281 .as_deref()
1282 .map(|d| format!(" {}", d.to_uppercase()))
1283 .unwrap_or_default();
1284 col_parts.push(format!("{}{}", quote(name), dir));
1285 }
1286 IndexColumnEntry::Expression { expression } => col_parts.push(expression.clone()),
1287 }
1288 }
1289 let method = new_idx.method.as_deref().unwrap_or("btree");
1290 let unique_kw = if new_idx.unique { "UNIQUE " } else { "" };
1291 let include = if new_idx.include.is_empty() {
1292 String::new()
1293 } else {
1294 format!(
1295 " INCLUDE ({})",
1296 new_idx
1297 .include
1298 .iter()
1299 .map(|s| quote(s))
1300 .collect::<Vec<_>>()
1301 .join(", ")
1302 )
1303 };
1304 let where_clause = new_idx
1305 .where_
1306 .as_ref()
1307 .map(|w| format!(" WHERE {}", w))
1308 .unwrap_or_default();
1309 steps.push(MigrationStep {
1310 step: 0,
1311 operation: MigrationOperation::CreateIndex,
1312 schema: schema.clone(),
1313 table: Some(table.name.clone()),
1314 object: new_idx.name.clone(),
1315 object_type: "index".into(),
1316 description: format!(
1317 "Create {}index \"{}\" on \"{}\".\"{}\"",
1318 if new_idx.unique { "unique " } else { "" },
1319 new_idx.name,
1320 schema,
1321 table.name
1322 ),
1323 ddl: Some(format!(
1324 "CREATE {}INDEX IF NOT EXISTS {} ON {} USING {} ({}){}{}",
1325 unique_kw,
1326 quote(&new_idx.name),
1327 full_table,
1328 method,
1329 col_parts.join(", "),
1330 include,
1331 where_clause
1332 )),
1333 safety: MigrationSafety::Safe,
1334 risk: MigrationRisk::None,
1335 risk_detail: None,
1336 });
1337 }
1338
1339 for old_rel in &old.relationships {
1341 if !new_rels.contains_key(old_rel.id.as_str()) {
1342 let from_schema = old_schemas
1343 .get(old_rel.from_schema_id.as_str())
1344 .map(|s| s.name.as_str())
1345 .unwrap_or(&old_rel.from_schema_id);
1346 let from_table = old_tables
1347 .get(old_rel.from_table_id.as_str())
1348 .map(|t| t.name.as_str())
1349 .unwrap_or(&old_rel.from_table_id);
1350 let constraint = old_rel.name.as_deref().unwrap_or(&old_rel.id);
1351 let schema_q = quote(schema_override.unwrap_or(from_schema));
1352 steps.push(MigrationStep {
1353 step: 0,
1354 operation: MigrationOperation::DropForeignKey,
1355 schema: schema_override.unwrap_or(from_schema).to_string(),
1356 table: Some(from_table.to_string()),
1357 object: constraint.to_string(),
1358 object_type: "foreign_key".into(),
1359 description: format!(
1360 "Drop FK \"{}\" from \"{}\".\"{}\"",
1361 constraint,
1362 schema_override.unwrap_or(from_schema),
1363 from_table
1364 ),
1365 ddl: Some(format!(
1366 "ALTER TABLE {}.{} DROP CONSTRAINT IF EXISTS {}",
1367 schema_q,
1368 quote(from_table),
1369 quote(constraint)
1370 )),
1371 safety: MigrationSafety::Safe,
1372 risk: MigrationRisk::None,
1373 risk_detail: None,
1374 });
1375 }
1376 }
1377 for new_rel in &new.relationships {
1378 if old_rels.contains_key(new_rel.id.as_str())
1379 || added_table_ids.contains(new_rel.from_table_id.as_str())
1380 || added_table_ids.contains(new_rel.to_table_id.as_str())
1381 {
1382 continue;
1383 }
1384 let from_schema = match new_schemas.get(new_rel.from_schema_id.as_str()) {
1385 Some(s) => s,
1386 None => continue,
1387 };
1388 let from_table = match new_tables.get(new_rel.from_table_id.as_str()) {
1389 Some(t) => t,
1390 None => continue,
1391 };
1392 let to_schema = match new_schemas.get(new_rel.to_schema_id.as_str()) {
1393 Some(s) => s,
1394 None => continue,
1395 };
1396 let to_table = match new_tables.get(new_rel.to_table_id.as_str()) {
1397 Some(t) => t,
1398 None => continue,
1399 };
1400 let from_col = new
1401 .columns
1402 .iter()
1403 .find(|c| c.id == new_rel.from_column_id)
1404 .map(|c| c.name.clone())
1405 .unwrap_or_else(|| new_rel.from_column_id.clone());
1406 let to_col = new
1407 .columns
1408 .iter()
1409 .find(|c| c.id == new_rel.to_column_id)
1410 .map(|c| c.name.clone())
1411 .unwrap_or_else(|| new_rel.to_column_id.clone());
1412 let from_q = format!(
1413 "{}.{}",
1414 quote(schema_override.unwrap_or(&from_schema.name)),
1415 quote(&from_table.name)
1416 );
1417 let to_q = format!(
1418 "{}.{}",
1419 quote(schema_override.unwrap_or(&to_schema.name)),
1420 quote(&to_table.name)
1421 );
1422 let constraint = new_rel.name.as_deref().unwrap_or(&new_rel.id);
1423 let on_update = new_rel.on_update.as_deref().unwrap_or("NO ACTION");
1424 let on_delete = new_rel.on_delete.as_deref().unwrap_or("NO ACTION");
1425 steps.push(MigrationStep {
1426 step: 0,
1427 operation: MigrationOperation::AddForeignKey,
1428 schema: schema_override.unwrap_or(&from_schema.name).to_string(),
1429 table: Some(from_table.name.clone()),
1430 object: constraint.to_string(),
1431 object_type: "foreign_key".into(),
1432 description: format!("Add FK \"{}\" on \"{}\".\"{}\" → \"{}\".\"{}\"", constraint, schema_override.unwrap_or(&from_schema.name), from_table.name, schema_override.unwrap_or(&to_schema.name), to_table.name),
1433 ddl: Some(format!("ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {} ({}) ON UPDATE {} ON DELETE {}", from_q, quote(constraint), quote(&from_col), to_q, quote(&to_col), on_update, on_delete)),
1434 safety: MigrationSafety::BestEffort,
1435 risk: MigrationRisk::None,
1436 risk_detail: Some("PostgreSQL has no ADD CONSTRAINT IF NOT EXISTS; ignored if constraint already exists.".into()),
1437 });
1438 }
1439
1440 for (i, s) in steps.iter_mut().enumerate() {
1442 s.step = i + 1;
1443 }
1444
1445 Ok(MigrationPlan { steps })
1446}
1447
1448#[allow(clippy::too_many_arguments)]
1454pub async fn execute_migration_plan(
1455 migration_pool: &Pool,
1456 config_pool: &Pool,
1457 plan: &MigrationPlan,
1458 migration_plan_id: &str,
1459 package_id: &str,
1460 tenant_id: &str,
1461 from_version: Option<&str>,
1462 to_version: &str,
1463) -> Result<MigrationExecutionResult, AppError> {
1464 let mut applied = 0usize;
1465 let mut warned = 0usize;
1466 let mut warnings: Vec<String> = Vec::new();
1467
1468 for step in &plan.steps {
1469 let op = step.operation.to_string();
1470 let safety_str = format!("{:?}", step.safety);
1471 let risk_str = format!("{:?}", step.risk);
1472
1473 match step.safety {
1474 MigrationSafety::WarnOnly => {
1475 let msg = step
1476 .risk_detail
1477 .clone()
1478 .unwrap_or_else(|| step.description.clone());
1479 tracing::warn!(step = step.step, %op, "migration plan warning (no DDL)");
1480 warnings.push(format!("[Step {}] {}", step.step, msg));
1481 let _ = crate::store::insert_migration_audit(
1482 config_pool,
1483 migration_plan_id,
1484 package_id,
1485 tenant_id,
1486 from_version,
1487 to_version,
1488 step.step as i32,
1489 &op,
1490 &step.schema,
1491 step.table.as_deref(),
1492 &step.object,
1493 &step.object_type,
1494 &step.description,
1495 step.ddl.as_deref(),
1496 &safety_str,
1497 &risk_str,
1498 "skipped",
1499 None,
1500 )
1501 .await;
1502 warned += 1;
1503 }
1504 MigrationSafety::Safe | MigrationSafety::BestEffort => {
1505 if let Some(ref sql) = step.ddl {
1506 tracing::info!(step = step.step, %op, %sql, "executing migration step");
1507 match sqlx::query(sql).execute(migration_pool).await {
1508 Ok(_) => {
1509 let _ = crate::store::insert_migration_audit(
1510 config_pool,
1511 migration_plan_id,
1512 package_id,
1513 tenant_id,
1514 from_version,
1515 to_version,
1516 step.step as i32,
1517 &op,
1518 &step.schema,
1519 step.table.as_deref(),
1520 &step.object,
1521 &step.object_type,
1522 &step.description,
1523 step.ddl.as_deref(),
1524 &safety_str,
1525 &risk_str,
1526 "applied",
1527 None,
1528 )
1529 .await;
1530 applied += 1;
1531 }
1532 Err(e) => {
1533 let err_str = e.to_string();
1534 if matches!(step.safety, MigrationSafety::BestEffort) {
1535 tracing::warn!(step = step.step, %op, error = %e, "migration step failed (best-effort, continuing)");
1536 let msg = format!(
1537 "[Step {}] {} — Error: {}",
1538 step.step, step.description, err_str
1539 );
1540 warnings.push(msg);
1541 let _ = crate::store::insert_migration_audit(
1542 config_pool,
1543 migration_plan_id,
1544 package_id,
1545 tenant_id,
1546 from_version,
1547 to_version,
1548 step.step as i32,
1549 &op,
1550 &step.schema,
1551 step.table.as_deref(),
1552 &step.object,
1553 &step.object_type,
1554 &step.description,
1555 step.ddl.as_deref(),
1556 &safety_str,
1557 &risk_str,
1558 "warned",
1559 Some(&err_str),
1560 )
1561 .await;
1562 warned += 1;
1563 } else {
1564 let _ = crate::store::insert_migration_audit(
1565 config_pool,
1566 migration_plan_id,
1567 package_id,
1568 tenant_id,
1569 from_version,
1570 to_version,
1571 step.step as i32,
1572 &op,
1573 &step.schema,
1574 step.table.as_deref(),
1575 &step.object,
1576 &step.object_type,
1577 &step.description,
1578 step.ddl.as_deref(),
1579 &safety_str,
1580 &risk_str,
1581 "failed",
1582 Some(&err_str),
1583 )
1584 .await;
1585 return Err(AppError::Db(e));
1586 }
1587 }
1588 }
1589 }
1590 }
1591 }
1592 }
1593
1594 Ok(MigrationExecutionResult {
1595 applied,
1596 warned,
1597 warnings,
1598 })
1599}
1600
1601fn audit_table_ddl(
1605 schema_name: &str,
1606 table_name: &str,
1607 source_cols: &[&ColumnConfig],
1608 dialect: &dyn Dialect,
1609) -> String {
1610 let audit_name = format!("{}_audit", table_name);
1611 let audit_full = format!("{}.{}", quote(schema_name), quote(&audit_name));
1612
1613 let mut col_defs: Vec<String> = Vec::new();
1614 col_defs.push(format!(
1615 "{} {} NOT NULL DEFAULT {}",
1616 quote("audit_id"),
1617 "UUID",
1618 dialect.uuid_default_expr()
1619 ));
1620 col_defs.push(format!("{} TEXT NOT NULL", quote("audit_action")));
1621 col_defs.push(format!(
1622 "{} {} NOT NULL DEFAULT {}",
1623 quote("audit_at"),
1624 dialect.audit_timestamp_type(),
1625 dialect.now_fn()
1626 ));
1627 col_defs.push(format!("{} TEXT", quote("audit_by")));
1628 col_defs.push(format!(
1629 "{} {}",
1630 quote("changed_fields"),
1631 dialect.sys_json_type()
1632 ));
1633
1634 let config_col_names: HashSet<&str> = source_cols.iter().map(|c| c.name.as_str()).collect();
1635 for c in source_cols {
1636 let typ = dialect.ddl_type(&parse_canonical(&c.type_));
1637 col_defs.push(format!("{} {}", quote(&c.name), typ));
1638 }
1639 let audit_ts = dialect.audit_timestamp_type();
1640 for (name, typ) in [
1641 ("created_at", audit_ts),
1642 ("updated_at", audit_ts),
1643 ("archived_at", audit_ts),
1644 ("created_by", "TEXT"),
1645 ("updated_by", "TEXT"),
1646 ] {
1647 if !config_col_names.contains(name) {
1648 col_defs.push(format!("{} {}", quote(name), typ));
1649 }
1650 }
1651 col_defs.push(format!("PRIMARY KEY ({})", quote("audit_id")));
1652
1653 format!(
1654 "CREATE TABLE IF NOT EXISTS {} (\n {}\n)",
1655 audit_full,
1656 col_defs.join(",\n ")
1657 )
1658}