1use crate::db::LiveSchema;
8use crate::graph::SchemaGraph;
9use crate::parser::ParsedStatement;
10use crate::types::{DetectedOperation, FkImpact, MigrationReport, RiskLevel};
11use chrono::Utc;
12use std::collections::{HashMap, HashSet};
13
/// Scores parsed migration statements for operational risk: lock acquisition,
/// table rewrites, and irreversible data loss.
pub struct RiskEngine {
    /// Estimated row count per table name; drives size-sensitive heuristics
    /// (large tables escalate scores for rewrites and index builds).
    pub row_counts: HashMap<String, u64>,
    /// Optional metadata snapshot from a live database (table sizes, row
    /// estimates); used to enrich recommendations when present.
    pub live_schema: Option<LiveSchema>,
    /// Target PostgreSQL major version; several checks (ADD COLUMN DEFAULT,
    /// SET NOT NULL) are version-dependent.
    pub pg_version: u32,
}
30
31impl RiskEngine {
32 pub fn new(row_counts: HashMap<String, u64>) -> Self {
33 Self {
34 row_counts,
35 live_schema: None,
36 pg_version: 14,
37 }
38 }
39
40 pub fn with_pg_version(mut self, version: u32) -> Self {
44 self.pg_version = version;
45 self
46 }
47
48 pub fn with_live_schema(mut row_counts: HashMap<String, u64>, live: LiveSchema) -> Self {
51 for (name, meta) in &live.tables {
53 row_counts.insert(name.clone(), meta.estimated_rows.max(0) as u64);
54 }
55 Self {
56 row_counts,
57 live_schema: Some(live),
58 pg_version: 14,
59 }
60 }
61
    /// Analyzes a parsed migration file and assembles a full `MigrationReport`.
    ///
    /// Runs two passes over `statements`: the first builds a `SchemaGraph`
    /// (tables, columns, FKs, indexes) so the second pass can reason about
    /// cross-table impact — e.g. which tables reference one being dropped.
    /// The second pass scores each statement into `DetectedOperation`s, from
    /// which the aggregate report fields are derived.
    pub fn analyze(&self, file: &str, statements: &[ParsedStatement]) -> MigrationReport {
        let mut graph = SchemaGraph::new();
        let mut operations: Vec<DetectedOperation> = Vec::new();
        let mut fk_impacts: Vec<FkImpact> = Vec::new();

        // Pass 1: register every statement's schema objects before scoring,
        // so later statements can see objects created by earlier ones.
        for stmt in statements {
            self.populate_graph(&mut graph, stmt);
        }

        // Pass 2: score each statement against the now-complete graph.
        for stmt in statements {
            let ops = self.evaluate(stmt, &graph, &mut fk_impacts);
            operations.extend(ops);
        }

        // Saturating sum: many high-risk operations must not wrap around to a
        // deceptively low total.
        let score: u32 = operations
            .iter()
            .fold(0u32, |acc, operation| acc.saturating_add(operation.score));
        let overall_risk = RiskLevel::from_score(score);

        // Sorted + deduped union of every table touched by any operation.
        let mut affected_tables: Vec<String> = operations
            .iter()
            .flat_map(|o| o.tables.iter().cloned())
            .collect();
        affected_tables.sort();
        affected_tables.dedup();

        let index_rebuild_required = operations.iter().any(|o| o.index_rebuild);
        let requires_maintenance_window = overall_risk >= RiskLevel::High;

        // Operations may repeat identical warnings (e.g. multiple drops);
        // keep only the first occurrence of each, in emission order.
        let warnings: Vec<String> = Self::dedupe_preserve_order(
            operations
                .iter()
                .filter_map(|o| o.warning.clone())
                .collect(),
        );

        let recommendations = Self::dedupe_preserve_order(self.build_recommendations(
            &operations,
            &affected_tables,
            overall_risk,
        ));
        // A guard (manual confirmation) is demanded by any single operation
        // scoring >= 40 or rated High-or-worse on its own.
        let guard_required = operations
            .iter()
            .any(|o| o.score >= 40 || o.risk_level >= RiskLevel::High);

        let estimated_lock_seconds = self.estimate_lock_seconds(&operations, &affected_tables);

        MigrationReport {
            file: file.to_string(),
            overall_risk,
            score,
            affected_tables,
            operations,
            warnings,
            recommendations,
            fk_impacts,
            estimated_lock_seconds,
            index_rebuild_required,
            requires_maintenance_window,
            analyzed_at: Utc::now().to_rfc3339(),
            pg_version: self.pg_version,
            guard_required,
            guard_decisions: Vec::new(),
        }
    }
133
134 fn dedupe_preserve_order(items: Vec<String>) -> Vec<String> {
135 let mut seen = HashSet::new();
136 let mut deduped = Vec::new();
137 for item in items {
138 if seen.insert(item.clone()) {
139 deduped.push(item);
140 }
141 }
142 deduped
143 }
144
145 fn populate_graph(&self, graph: &mut SchemaGraph, stmt: &ParsedStatement) {
150 match stmt {
151 ParsedStatement::CreateTable {
152 table,
153 columns,
154 foreign_keys,
155 ..
156 } => {
157 let rows = self.row_counts.get(table).copied();
158 graph.add_table(table, rows);
159 for col in columns {
160 graph.add_column(table, &col.name, &col.data_type, col.nullable);
161 }
162 for fk in foreign_keys {
163 graph.add_foreign_key(
164 table,
165 &fk.ref_table,
166 fk.constraint_name.clone(),
167 fk.columns.clone(),
168 fk.ref_columns.clone(),
169 fk.on_delete_cascade,
170 fk.on_update_cascade,
171 );
172 }
173 }
174 ParsedStatement::AlterTableAddForeignKey { table, fk } => {
175 graph.add_foreign_key(
176 table,
177 &fk.ref_table,
178 fk.constraint_name.clone(),
179 fk.columns.clone(),
180 fk.ref_columns.clone(),
181 fk.on_delete_cascade,
182 fk.on_update_cascade,
183 );
184 }
185 ParsedStatement::CreateIndex {
186 index_name,
187 table,
188 unique,
189 ..
190 } => {
191 let name = index_name
192 .clone()
193 .unwrap_or_else(|| format!("unnamed_idx_{}", table));
194 graph.add_table(table, self.row_counts.get(table).copied());
195 graph.add_index(&name, table, *unique);
196 }
197 _ => {}
198 }
199 }
200
    /// Maps a single parsed statement to zero or more scored operations.
    ///
    /// Scores are heuristic: each arm assigns a base score, escalated by
    /// table size (`row_counts`), FK fan-out from `graph`, or missing
    /// safety modifiers (e.g. `CONCURRENTLY`). Foreign-key side effects are
    /// accumulated into `fk_impacts` as they are discovered.
    fn evaluate(
        &self,
        stmt: &ParsedStatement,
        graph: &SchemaGraph,
        fk_impacts: &mut Vec<FkImpact>,
    ) -> Vec<DetectedOperation> {
        match stmt {
            // DROP TABLE: one operation per table; base 100, escalated by
            // incoming FK references (+20 each) and downstream FK reach
            // (+10 each).
            ParsedStatement::DropTable {
                tables, cascade, ..
            } => {
                let mut ops = Vec::new();
                for table in tables {
                    let refs = graph.tables_referencing(table);
                    let ref_count = refs.len();
                    let downstream = graph.fk_downstream(table);

                    let mut score = 100u32;
                    let mut extra = String::new();

                    if ref_count > 0 {
                        score += (ref_count as u32) * 20;
                        extra =
                            format!(" Referenced by {} table(s): {}", ref_count, refs.join(", "));
                        // Record each referencing table as an impacted FK.
                        // NOTE(review): constraint name is synthesized here,
                        // not read from the graph — presumably the real name
                        // is unknown at this point; confirm against SchemaGraph.
                        for r in &refs {
                            fk_impacts.push(FkImpact {
                                constraint_name: format!("{}_fk", r),
                                from_table: r.clone(),
                                to_table: table.clone(),
                                cascade: *cascade,
                            });
                        }
                    }
                    if !downstream.is_empty() {
                        score += (downstream.len() as u32) * 10;
                    }

                    ops.push(DetectedOperation {
                        description: format!("DROP TABLE {}{}", table, extra),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "Dropping '{}' is irreversible. Cascade: {}.{}",
                            table,
                            cascade,
                            if !downstream.is_empty() {
                                format!(" Downstream tables affected: {}", downstream.join(", "))
                            } else {
                                String::new()
                            }
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    });
                }
                ops
            }

            // DROP COLUMN: fixed High risk — irreversible and app-breaking.
            ParsedStatement::AlterTableDropColumn { table, column, .. } => {
                vec![DetectedOperation {
                    description: format!("ALTER TABLE {} DROP COLUMN {}", table, column),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::High,
                    score: 60,
                    warning: Some(format!(
                        "Dropping column '{}.{}' is irreversible and may break application code",
                        table, column
                    )),
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // ALTER COLUMN TYPE: classify the *target* type only. Expanding
            // to VARCHAR/TEXT is treated as metadata-only; anything else is
            // assumed to rewrite the table, scaled by row count.
            // NOTE(review): classification inspects only the new type, not
            // the current one — a VARCHAR *shrink* would also be rated safe;
            // the warning text asks the operator to verify compatibility.
            ParsedStatement::AlterTableAlterColumnType {
                table,
                column,
                new_type,
            } => {
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                let upper_type = new_type.to_uppercase();

                let is_safe_varchar_expansion = upper_type.starts_with("VARCHAR")
                    || upper_type.starts_with("CHARACTER VARYING")
                    || upper_type == "TEXT";
                let is_safe_numeric_precision_increase =
                    upper_type.starts_with("NUMERIC") || upper_type.starts_with("DECIMAL");

                // Precedence matters: the NUMERIC/DECIMAL branch wins over
                // the large-table branch, capping those changes at Medium.
                let (score, risk_level, is_safe) = if is_safe_varchar_expansion {
                    (15, RiskLevel::Low, true)
                } else if is_safe_numeric_precision_increase {
                    (30, RiskLevel::Medium, false)
                } else if rows > 1_000_000 {
                    (90, RiskLevel::High, false)
                } else {
                    (40, RiskLevel::Medium, false)
                };

                let row_note = if rows > 0 {
                    format!(" (~{} rows)", rows)
                } else {
                    String::new()
                };

                let warning = if is_safe {
                    Some(format!(
                        "Type change '{}.{}' → {} is likely a metadata-only operation on PG9.2+ \
                         (expanding VARCHAR or converting to TEXT). Verify the current type is compatible.",
                        table, column, new_type
                    ))
                } else {
                    Some(format!(
                        "Type change on '{}.{}' → {} requires a full table rewrite on ALL PostgreSQL versions{}. \
                         Use the 4-step zero-downtime pattern: add new column, backfill, drop old, rename.",
                        table, column, new_type, row_note
                    ))
                };

                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} ALTER COLUMN {} TYPE {}{}",
                        table, column, new_type, row_note
                    ),
                    tables: vec![table.clone()],
                    risk_level,
                    score,
                    warning,
                    acquires_lock: !is_safe,
                    index_rebuild: !is_safe,
                }]
            }

            // ADD COLUMN: three cases — NOT NULL without DEFAULT (fails on
            // populated tables), DEFAULT on pre-PG11 (full table rewrite),
            // otherwise effectively free.
            ParsedStatement::AlterTableAddColumn { table, column } => {
                if !column.nullable && !column.has_default {
                    let rows = self.row_counts.get(table).copied().unwrap_or(0);
                    let score = if rows > 0 { 50 } else { 25 };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ADD COLUMN {} {} NOT NULL (no default)",
                            table, column.name, column.data_type
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "Adding NOT NULL column '{}.{}' without a DEFAULT will fail if the table has existing rows",
                            table, column.name
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                } else if column.has_default && self.pg_version < 11 {
                    let rows = self.row_counts.get(table).copied().unwrap_or(0);
                    let score = if rows > 1_000_000 { 80 } else { 45 };
                    let row_note = if rows > 0 {
                        format!(" (~{} rows)", rows)
                    } else {
                        String::new()
                    };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ADD COLUMN {} {} WITH DEFAULT (PG{} — table rewrite{})",
                            table, column.name, column.data_type, self.pg_version, row_note
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "PostgreSQL {} rewrites the ENTIRE table when adding a column with a DEFAULT value{}. \
                             Upgrade to PG11+ where this is a metadata-only operation.",
                            self.pg_version, row_note
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                } else {
                    // Nullable column, or DEFAULT on PG11+: metadata-only.
                    let pg_note = if column.has_default {
                        format!(" (metadata-only on PG{})", self.pg_version)
                    } else {
                        String::new()
                    };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ADD COLUMN {} {}{}",
                            table, column.name, column.data_type, pg_note
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::Low,
                        score: 5,
                        warning: None,
                        acquires_lock: false,
                        index_rebuild: false,
                    }]
                }
            }

            // CREATE INDEX: near-free with CONCURRENTLY; otherwise scaled by
            // table size (blocking SHARE lock for the whole build).
            ParsedStatement::CreateIndex {
                index_name,
                table,
                unique,
                concurrently,
                columns,
            } => {
                let name = index_name.as_deref().unwrap_or("unnamed");
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                let score: u32 = if *concurrently {
                    5
                } else if rows > 1_000_000 {
                    70
                } else {
                    20
                };

                let warning = if !concurrently {
                    Some(format!(
                        "CREATE INDEX on '{}' without CONCURRENTLY will hold a SHARE lock for the duration of the build (cols: {})",
                        table, columns.join(", ")
                    ))
                } else {
                    None
                };

                vec![DetectedOperation {
                    description: format!(
                        "CREATE {}INDEX {} ON {} ({})",
                        if *unique { "UNIQUE " } else { "" },
                        name,
                        table,
                        columns.join(", ")
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning,
                    acquires_lock: !concurrently,
                    index_rebuild: true,
                }]
            }

            // DROP INDEX: cheap; the non-concurrent form still takes an
            // ACCESS EXCLUSIVE lock. No table names are recorded because the
            // statement only names indexes.
            ParsedStatement::DropIndex {
                names,
                concurrently,
                ..
            } => {
                let score: u32 = if *concurrently { 2 } else { 10 };
                let warning = if !concurrently {
                    Some(format!(
                        "DROP INDEX without CONCURRENTLY acquires an ACCESS EXCLUSIVE lock: {}",
                        names.join(", ")
                    ))
                } else {
                    None
                };
                vec![DetectedOperation {
                    description: format!("DROP INDEX {}", names.join(", ")),
                    tables: vec![],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning,
                    acquires_lock: !concurrently,
                    index_rebuild: false,
                }]
            }

            // ADD FOREIGN KEY: records the FK impact and flags ON DELETE
            // CASCADE (silent multi-table deletes) as the riskier variant.
            // Both the referencing and referenced tables are affected.
            ParsedStatement::AlterTableAddForeignKey { table, fk } => {
                let cascade_note = if fk.on_delete_cascade {
                    " (ON DELETE CASCADE)"
                } else {
                    ""
                };
                let score = if fk.on_delete_cascade { 30 } else { 15 };
                fk_impacts.push(FkImpact {
                    constraint_name: fk
                        .constraint_name
                        .clone()
                        .unwrap_or_else(|| format!("{}_fk", table)),
                    from_table: table.clone(),
                    to_table: fk.ref_table.clone(),
                    cascade: fk.on_delete_cascade,
                });
                vec![DetectedOperation {
                    description: format!(
                        "ADD FOREIGN KEY {}.({}) → {}.({}){}",
                        table,
                        fk.columns.join(", "),
                        fk.ref_table,
                        fk.ref_columns.join(", "),
                        cascade_note
                    ),
                    tables: vec![table.clone(), fk.ref_table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: if fk.on_delete_cascade {
                        Some(format!(
                            "ON DELETE CASCADE on '{}.{}' can silently delete rows in '{}' when the parent is deleted",
                            table, fk.columns.join(", "), fk.ref_table
                        ))
                    } else {
                        None
                    },
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // DROP CONSTRAINT: low risk unless CASCADE can take dependent
            // objects with it.
            ParsedStatement::AlterTableDropConstraint {
                table,
                constraint,
                cascade,
            } => {
                let score = if *cascade { 25 } else { 10 };
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} DROP CONSTRAINT {}{}",
                        table,
                        constraint,
                        if *cascade { " CASCADE" } else { "" }
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: if *cascade {
                        Some(format!(
                            "Dropping constraint '{}' with CASCADE may drop dependent objects",
                            constraint
                        ))
                    } else {
                        None
                    },
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // RENAME COLUMN: cheap in the database, breaking for callers.
            ParsedStatement::AlterTableRenameColumn { table, old, new } => {
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} RENAME COLUMN {} TO {}",
                        table, old, new
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::High,
                    score: 55,
                    warning: Some(format!(
                        "Renaming column '{}.{}' is a breaking change for any downstream code that references the old name",
                        table, old
                    )),
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // RENAME TABLE: both the old and new names count as affected.
            ParsedStatement::AlterTableRenameTable { old, new } => {
                vec![DetectedOperation {
                    description: format!("ALTER TABLE {} RENAME TO {}", old, new),
                    tables: vec![old.clone(), new.clone()],
                    risk_level: RiskLevel::High,
                    score: 65,
                    warning: Some(format!(
                        "Renaming table '{}' to '{}' breaks all queries, ORMs, and FK constraints referencing the old name",
                        old, new
                    )),
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // SET NOT NULL: always a full-table validation scan; scored
            // lower on PG12+ (where a validated CHECK constraint can skip
            // the scan — the advice text points at that pattern).
            ParsedStatement::AlterTableSetNotNull { table, column } => {
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                if self.pg_version >= 12 {
                    let score = if rows > 1_000_000 { 40 } else { 15 };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL (PG{})",
                            table, column, self.pg_version
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "SET NOT NULL on '{}.{}' still scans the entire table on PG{}. \
                             Use a CHECK constraint with NOT VALID first, then VALIDATE CONSTRAINT \
                             to minimize lock time on large tables.",
                            table, column, self.pg_version
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                } else {
                    let score = if rows > 1_000_000 { 55 } else { 25 };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
                            table, column
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "SET NOT NULL on '{}.{}' requires a full table scan to validate existing rows \
                             and holds an ACCESS EXCLUSIVE lock throughout.",
                            table, column
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                }
            }

            // CREATE TABLE: creating a new object is essentially free.
            ParsedStatement::CreateTable { table, .. } => {
                vec![DetectedOperation {
                    description: format!("CREATE TABLE {}", table),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::Low,
                    score: 2,
                    warning: None,
                    acquires_lock: false,
                    index_rebuild: false,
                }]
            }

            // ADD PRIMARY KEY: builds a unique index over the whole table.
            ParsedStatement::AlterTableAddPrimaryKey { table, columns } => {
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                let score = if rows > 1_000_000 { 80 } else { 35 };
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} ADD PRIMARY KEY ({})",
                        table,
                        columns.join(", ")
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: Some(format!(
                        "Adding PRIMARY KEY to '{}' builds an index over the entire table",
                        table
                    )),
                    acquires_lock: true,
                    index_rebuild: true,
                }]
            }

            // TRUNCATE: always Critical — instant, unlogged-from-the-app's
            // perspective data destruction; CASCADE pulls in referencing
            // tables from the graph.
            ParsedStatement::Truncate { tables, cascade } => {
                let mut ops = Vec::new();
                for table in tables {
                    let refs = graph.tables_referencing(table);
                    let ref_note = if !refs.is_empty() && *cascade {
                        format!(" CASCADE will also truncate: {}", refs.join(", "))
                    } else {
                        String::new()
                    };

                    ops.push(DetectedOperation {
                        description: format!("TRUNCATE TABLE {}{}", table, ref_note),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::Critical,
                        score: 120,
                        warning: Some(format!(
                            "TRUNCATE '{}' instantly destroys ALL data in the table.{} \
                             This cannot be undone without a backup — there is no rollback for TRUNCATE.",
                            table, ref_note
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    });
                }
                ops
            }

            // REINDEX: CONCURRENTLY (PG12+) is low-impact; otherwise it
            // blocks under ACCESS EXCLUSIVE for the whole rebuild.
            ParsedStatement::Reindex {
                target_type,
                target_name,
                concurrently,
            } => {
                let score = if *concurrently { 15 } else { 80 };
                let risk = if *concurrently {
                    RiskLevel::Low
                } else {
                    RiskLevel::High
                };

                vec![DetectedOperation {
                    description: format!(
                        "REINDEX{} {} {}",
                        if *concurrently { " CONCURRENTLY" } else { "" },
                        target_type,
                        target_name
                    ),
                    // NOTE(review): target_name may be an index or schema,
                    // not necessarily a table — confirm how consumers use
                    // the `tables` field for REINDEX operations.
                    tables: vec![target_name.clone()],
                    risk_level: risk,
                    score,
                    warning: if *concurrently {
                        Some(format!(
                            "REINDEX CONCURRENTLY on '{}' allows concurrent reads/writes (PG12+)",
                            target_name
                        ))
                    } else {
                        Some(format!(
                            "REINDEX '{}' without CONCURRENTLY holds ACCESS EXCLUSIVE lock. \
                             On large tables this can take hours. Use REINDEX CONCURRENTLY (PG12+).",
                            target_name
                        ))
                    },
                    acquires_lock: !concurrently,
                    index_rebuild: true,
                }]
            }

            // CLUSTER: full table rewrite under ACCESS EXCLUSIVE; a bare
            // CLUSTER (no table) re-clusters everything previously clustered.
            ParsedStatement::Cluster { table, index } => {
                let desc = match (table, index) {
                    (Some(t), Some(i)) => format!("CLUSTER {} USING {}", t, i),
                    (Some(t), None) => format!("CLUSTER {}", t),
                    _ => "CLUSTER".to_string(),
                };
                let table_name = table.clone().unwrap_or_else(|| "ALL TABLES".to_string());
                vec![DetectedOperation {
                    description: desc,
                    tables: vec![table_name.clone()],
                    risk_level: RiskLevel::Critical,
                    score: 100,
                    warning: Some(format!(
                        "CLUSTER completely rewrites '{}' to match index order while holding \
                         ACCESS EXCLUSIVE lock. This can take hours on large tables and blocks \
                         all reads and writes.",
                        table_name
                    )),
                    acquires_lock: true,
                    index_rebuild: true,
                }]
            }

            // Statements the parser could not model: fall back to keyword
            // sniffing of the raw SQL.
            ParsedStatement::Other { raw } => self.evaluate_other_statement(raw),
            // Anything else modeled by the parser but not scored here is
            // considered risk-free.
            _ => vec![],
        }
    }
769
770 fn evaluate_other_statement(&self, raw: &str) -> Vec<DetectedOperation> {
771 let upper = raw.to_uppercase();
772 let is_flagged_unmodelled = upper.contains("UNMODELLED DDL");
773
774 let (score, warning, lock_likely) = if upper.contains("DROP DATABASE")
775 || upper.contains("DROP SCHEMA")
776 || upper.contains("TRUNCATE")
777 {
778 (
779 90,
780 "Unmodelled destructive DDL detected — high blast radius and likely irreversible"
781 .to_string(),
782 true,
783 )
784 } else if upper.contains("DROP TABLE") || upper.contains("DROP COLUMN") {
785 (
786 80,
787 "Unmodelled DROP operation detected — manual review required before execution"
788 .to_string(),
789 true,
790 )
791 } else if upper.contains("ALTER TABLE") || upper.contains("CREATE POLICY") {
792 (
793 35,
794 "Unmodelled DDL may acquire locks or change access semantics — review migration plan"
795 .to_string(),
796 true,
797 )
798 } else if is_flagged_unmodelled {
799 (
800 30,
801 "Unmodelled DDL — manual review required before running".to_string(),
802 true,
803 )
804 } else {
805 return vec![];
806 };
807
808 vec![DetectedOperation {
809 description: raw.chars().take(100).collect(),
810 tables: vec![],
811 risk_level: RiskLevel::from_score(score),
812 score,
813 warning: Some(warning),
814 acquires_lock: lock_likely,
815 index_rebuild: false,
816 }]
817 }
818
819 fn build_recommendations(
824 &self,
825 ops: &[DetectedOperation],
826 _tables: &[String],
827 overall: RiskLevel,
828 ) -> Vec<String> {
829 let mut rec = Vec::new();
830
831 let has_drop_table = ops.iter().any(|o| o.description.contains("DROP TABLE"));
832 let has_drop_column = ops.iter().any(|o| o.description.contains("DROP COLUMN"));
833 let has_type_change = ops
834 .iter()
835 .any(|o| o.description.contains("TYPE ") && o.acquires_lock);
836 let has_index_without_concurrent = ops
837 .iter()
838 .any(|o| o.description.contains("CREATE") && o.index_rebuild && o.acquires_lock);
839 let has_not_null_no_default = ops
840 .iter()
841 .any(|o| o.description.contains("NOT NULL (no default)"));
842 let has_rename = ops.iter().any(|o| o.description.contains("RENAME"));
843 let has_cascade = ops.iter().any(|o| o.description.contains("CASCADE"));
844
845 if has_drop_table || has_drop_column {
846 rec.push("Deploy in two phases: first deploy app code that no longer reads the column/table, then drop it in a later migration".to_string());
847 rec.push("Take a full database backup before running this migration".to_string());
848 }
849
850 if has_type_change {
851 rec.push("Use a background migration: add a new column with the new type, backfill in batches, then swap and drop the old column".to_string());
852 }
853
854 if has_index_without_concurrent {
855 rec.push("Use CREATE INDEX CONCURRENTLY to build indexes without locking the table for writes".to_string());
856 }
857
858 if has_not_null_no_default {
859 rec.push("Add a DEFAULT value first, deploy the app change, then remove the default in a follow-up migration if needed".to_string());
860 }
861
862 if has_rename {
863 rec.push("Avoid renaming tables/columns in a single step; use a backward-compatible alias or view transition strategy".to_string());
864 }
865
866 if has_cascade {
867 rec.push("Review all ON DELETE CASCADE constraints — a single delete can silently remove rows across many tables".to_string());
868 }
869
870 if overall >= RiskLevel::High {
871 rec.push("Schedule this migration during a low-traffic maintenance window".to_string());
872 rec.push(
873 "Test this migration on a staging environment with production-sized data"
874 .to_string(),
875 );
876 }
877
878 if overall >= RiskLevel::Medium {
879 let large: Vec<&str> = self
880 .row_counts
881 .iter()
882 .filter(|(_, &v)| v > 100_000)
883 .map(|(k, _)| k.as_str())
884 .collect();
885 if !large.is_empty() {
886 rec.push(format!(
887 "Large tables detected ({}): consider batching long-running operations",
888 large.join(", ")
889 ));
890 }
891 }
892
893 if rec.is_empty() {
894 rec.push(
895 "No specific recommendations – this migration looks safe to deploy".to_string(),
896 );
897 }
898
899 if let Some(live) = &self.live_schema {
901 for table in _tables {
902 if let Some(meta) = live.tables.get(table) {
903 let mb = meta.total_size_bytes / (1024 * 1024);
904 if mb > 1000 {
905 rec.push(format!(
906 "Table '{}' is {} on disk — ensure you have at least 2× free disk space for any rewrite operations",
907 table, meta.total_size_pretty
908 ));
909 }
910 }
911 }
912 }
913
914 rec
915 }
916
917 fn estimate_lock_seconds(&self, ops: &[DetectedOperation], _tables: &[String]) -> Option<u64> {
922 let locking_ops: Vec<&DetectedOperation> = ops.iter().filter(|o| o.acquires_lock).collect();
923
924 if locking_ops.is_empty() {
925 return None;
926 }
927
928 let mut total_secs: u64 = 0;
930 for op in &locking_ops {
931 let mut secs = 1u64;
932 for table in &op.tables {
933 if let Some(&rows) = self.row_counts.get(table) {
934 let row_factor = rows / 100_000;
935 if op.index_rebuild {
936 secs += row_factor * 5; } else {
938 secs += row_factor;
939 }
940 }
941 }
942 total_secs += secs;
943 }
944
945 Some(total_secs.max(1))
946 }
947}