1use crate::db::LiveSchema;
8use crate::graph::SchemaGraph;
9use crate::parser::ParsedStatement;
10use crate::types::{DetectedOperation, FkImpact, MigrationReport, RiskLevel};
11use chrono::Utc;
12
/// Heuristic risk scorer for SQL migrations: combines parsed DDL
/// statements with per-table row estimates (and optionally a live
/// schema snapshot) to produce a `MigrationReport`.
pub struct RiskEngine {
    // Estimated row count per table name; used to scale operation scores
    // and lock-time estimates.
    pub row_counts: std::collections::HashMap<String, u64>,
    // Snapshot of the live database schema, when one was captured
    // (populated only via `with_live_schema`).
    pub live_schema: Option<LiveSchema>,
    // Target PostgreSQL major version; constructors default this to 14.
    pub pg_version: u32,
}
29
30impl RiskEngine {
31 pub fn new(row_counts: std::collections::HashMap<String, u64>) -> Self {
32 Self {
33 row_counts,
34 live_schema: None,
35 pg_version: 14,
36 }
37 }
38
39 pub fn with_pg_version(mut self, version: u32) -> Self {
43 self.pg_version = version;
44 self
45 }
46
47 pub fn with_live_schema(
50 mut row_counts: std::collections::HashMap<String, u64>,
51 live: LiveSchema,
52 ) -> Self {
53 for (name, meta) in &live.tables {
55 row_counts.insert(name.clone(), meta.estimated_rows.max(0) as u64);
56 }
57 Self {
58 row_counts,
59 live_schema: Some(live),
60 pg_version: 14,
61 }
62 }
63
64 pub fn analyze(&self, file: &str, statements: &[ParsedStatement]) -> MigrationReport {
67 let mut graph = SchemaGraph::new();
68 let mut operations: Vec<DetectedOperation> = Vec::new();
69 let mut fk_impacts: Vec<FkImpact> = Vec::new();
70
71 for stmt in statements {
73 self.populate_graph(&mut graph, stmt);
74 }
75
76 for stmt in statements {
78 let ops = self.evaluate(stmt, &graph, &mut fk_impacts);
79 operations.extend(ops);
80 }
81
82 let score: u32 = operations.iter().map(|o| o.score).sum();
84 let overall_risk = RiskLevel::from_score(score);
85
86 let mut affected_tables: Vec<String> = operations
87 .iter()
88 .flat_map(|o| o.tables.iter().cloned())
89 .collect();
90 affected_tables.sort();
91 affected_tables.dedup();
92
93 let index_rebuild_required = operations.iter().any(|o| o.index_rebuild);
94 let requires_maintenance_window = overall_risk >= RiskLevel::High;
95
96 let warnings: Vec<String> = operations
97 .iter()
98 .filter_map(|o| o.warning.clone())
99 .collect();
100
101 let recommendations =
102 self.build_recommendations(&operations, &affected_tables, overall_risk);
103
104 let estimated_lock_seconds = self.estimate_lock_seconds(&operations, &affected_tables);
106
107 MigrationReport {
108 file: file.to_string(),
109 overall_risk,
110 score,
111 affected_tables,
112 operations,
113 warnings,
114 recommendations,
115 fk_impacts,
116 estimated_lock_seconds,
117 index_rebuild_required,
118 requires_maintenance_window,
119 analyzed_at: Utc::now().to_rfc3339(),
120 pg_version: self.pg_version,
121 guard_required: false,
122 guard_decisions: Vec::new(),
123 }
124 }
125
126 fn populate_graph(&self, graph: &mut SchemaGraph, stmt: &ParsedStatement) {
131 match stmt {
132 ParsedStatement::CreateTable {
133 table,
134 columns,
135 foreign_keys,
136 ..
137 } => {
138 let rows = self.row_counts.get(table).copied();
139 graph.add_table(table, rows);
140 for col in columns {
141 graph.add_column(table, &col.name, &col.data_type, col.nullable);
142 }
143 for fk in foreign_keys {
144 graph.add_foreign_key(
145 table,
146 &fk.ref_table,
147 fk.constraint_name.clone(),
148 fk.columns.clone(),
149 fk.ref_columns.clone(),
150 fk.on_delete_cascade,
151 fk.on_update_cascade,
152 );
153 }
154 }
155 ParsedStatement::AlterTableAddForeignKey { table, fk } => {
156 graph.add_foreign_key(
157 table,
158 &fk.ref_table,
159 fk.constraint_name.clone(),
160 fk.columns.clone(),
161 fk.ref_columns.clone(),
162 fk.on_delete_cascade,
163 fk.on_update_cascade,
164 );
165 }
166 ParsedStatement::CreateIndex {
167 index_name,
168 table,
169 unique,
170 ..
171 } => {
172 let name = index_name
173 .clone()
174 .unwrap_or_else(|| format!("unnamed_idx_{}", table));
175 graph.add_table(table, self.row_counts.get(table).copied());
176 graph.add_index(&name, table, *unique);
177 }
178 _ => {}
179 }
180 }
181
    /// Scores a single parsed statement, returning zero or more
    /// `DetectedOperation`s and appending foreign-key impact records to
    /// `fk_impacts` where relevant.
    ///
    /// Scores are additive heuristics (higher = riskier); each
    /// operation's level comes from `RiskLevel::from_score`. Row counts
    /// from `self.row_counts` scale several scores, and `self.pg_version`
    /// gates version-dependent behavior (PG11 metadata-only defaults,
    /// PG12 SET NOT NULL messaging).
    fn evaluate(
        &self,
        stmt: &ParsedStatement,
        graph: &SchemaGraph,
        fk_impacts: &mut Vec<FkImpact>,
    ) -> Vec<DetectedOperation> {
        match stmt {
            // DROP TABLE: baseline 100 points, +20 per table still
            // referencing the dropped one, +10 per downstream FK table.
            ParsedStatement::DropTable {
                tables, cascade, ..
            } => {
                let mut ops = Vec::new();
                for table in tables {
                    let refs = graph.tables_referencing(table);
                    let ref_count = refs.len();
                    let downstream = graph.fk_downstream(table);

                    let mut score = 100u32;
                    let mut extra = String::new();

                    if ref_count > 0 {
                        score += (ref_count as u32) * 20;
                        extra =
                            format!(" Referenced by {} table(s): {}", ref_count, refs.join(", "));
                        // NOTE(review): the constraint name here is
                        // synthesized ("<table>_fk"), not the real FK
                        // name — the graph only tells us which tables
                        // reference this one, not the constraint names.
                        for r in &refs {
                            fk_impacts.push(FkImpact {
                                constraint_name: format!("{}_fk", r),
                                from_table: r.clone(),
                                to_table: table.clone(),
                                cascade: *cascade,
                            });
                        }
                    }
                    if !downstream.is_empty() {
                        score += (downstream.len() as u32) * 10;
                    }

                    ops.push(DetectedOperation {
                        description: format!("DROP TABLE {}{}", table, extra),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "Dropping '{}' is irreversible. Cascade: {}.{}",
                            table,
                            cascade,
                            if !downstream.is_empty() {
                                format!(" Downstream tables affected: {}", downstream.join(", "))
                            } else {
                                String::new()
                            }
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    });
                }
                ops
            }

            // DROP COLUMN: fixed high score — irreversible data loss and
            // likely application breakage.
            ParsedStatement::AlterTableDropColumn { table, column, .. } => {
                vec![DetectedOperation {
                    description: format!("ALTER TABLE {} DROP COLUMN {}", table, column),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::High,
                    score: 60,
                    warning: Some(format!(
                        "Dropping column '{}.{}' is irreversible and may break application code",
                        table, column
                    )),
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // ALTER COLUMN TYPE: always treated as a full table rewrite;
            // score jumps at the 1M-row threshold.
            ParsedStatement::AlterTableAlterColumnType {
                table,
                column,
                new_type,
            } => {
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                let score = if rows > 1_000_000 { 90 } else { 40 };
                let row_note = if rows > 0 {
                    format!(" (~{} rows)", rows)
                } else {
                    String::new()
                };
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} ALTER COLUMN {} TYPE {}{}",
                        table, column, new_type, row_note
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: Some(format!(
                        "Type change on '{}.{}' → {} requires a full table rewrite on ALL PostgreSQL versions{}. \
                         Use the 4-step zero-downtime pattern: add new column, backfill, drop old, rename.",
                        table, column, new_type, row_note
                    )),
                    acquires_lock: true,
                    index_rebuild: true,
                }]
            }

            // ADD COLUMN: three cases — NOT NULL without default (will
            // fail on populated tables), default on pre-11 PG (table
            // rewrite), and the safe/metadata-only case.
            ParsedStatement::AlterTableAddColumn { table, column } => {
                if !column.nullable && !column.has_default {
                    let rows = self.row_counts.get(table).copied().unwrap_or(0);
                    let score = if rows > 0 { 50 } else { 25 };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ADD COLUMN {} {} NOT NULL (no default)",
                            table, column.name, column.data_type
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "Adding NOT NULL column '{}.{}' without a DEFAULT will fail if the table has existing rows",
                            table, column.name
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                } else if column.has_default && self.pg_version < 11 {
                    // PG < 11 rewrites the whole table for ADD COLUMN
                    // ... DEFAULT; PG11+ made it metadata-only.
                    let rows = self.row_counts.get(table).copied().unwrap_or(0);
                    let score = if rows > 1_000_000 { 80 } else { 45 };
                    let row_note = if rows > 0 {
                        format!(" (~{} rows)", rows)
                    } else {
                        String::new()
                    };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ADD COLUMN {} {} WITH DEFAULT (PG{} — table rewrite{})",
                            table, column.name, column.data_type, self.pg_version, row_note
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "PostgreSQL {} rewrites the ENTIRE table when adding a column with a DEFAULT value{}. \
                             Upgrade to PG11+ where this is a metadata-only operation.",
                            self.pg_version, row_note
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                } else {
                    // Safe case: nullable, or default on PG11+.
                    let pg_note = if column.has_default {
                        format!(" (metadata-only on PG{})", self.pg_version)
                    } else {
                        String::new()
                    };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ADD COLUMN {} {}{}",
                            table, column.name, column.data_type, pg_note
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::Low,
                        score: 5,
                        warning: None,
                        acquires_lock: false,
                        index_rebuild: false,
                    }]
                }
            }

            // CREATE INDEX: near-free with CONCURRENTLY; otherwise the
            // build holds a lock, scaled by table size.
            ParsedStatement::CreateIndex {
                index_name,
                table,
                unique,
                concurrently,
                columns,
            } => {
                let name = index_name.as_deref().unwrap_or("unnamed");
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                let score: u32 = if *concurrently {
                    5
                } else if rows > 1_000_000 {
                    70
                } else {
                    20
                };

                let warning = if !concurrently {
                    Some(format!(
                        "CREATE INDEX on '{}' without CONCURRENTLY will hold a SHARE lock for the duration of the build (cols: {})",
                        table, columns.join(", ")
                    ))
                } else {
                    None
                };

                vec![DetectedOperation {
                    description: format!(
                        "CREATE {}INDEX {} ON {} ({})",
                        if *unique { "UNIQUE " } else { "" },
                        name,
                        table,
                        columns.join(", ")
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning,
                    acquires_lock: !concurrently,
                    index_rebuild: true,
                }]
            }

            // DROP INDEX: low risk either way; non-concurrent variant
            // takes an ACCESS EXCLUSIVE lock.
            ParsedStatement::DropIndex {
                names,
                concurrently,
                ..
            } => {
                let score: u32 = if *concurrently { 2 } else { 10 };
                let warning = if !concurrently {
                    Some(format!(
                        "DROP INDEX without CONCURRENTLY acquires an ACCESS EXCLUSIVE lock: {}",
                        names.join(", ")
                    ))
                } else {
                    None
                };
                vec![DetectedOperation {
                    // No table names are known for a bare DROP INDEX,
                    // so `tables` is left empty.
                    description: format!("DROP INDEX {}", names.join(", ")),
                    tables: vec![],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning,
                    acquires_lock: !concurrently,
                    index_rebuild: false,
                }]
            }

            // ADD FOREIGN KEY: records an FkImpact; CASCADE doubles the
            // score because deletes can fan out silently.
            ParsedStatement::AlterTableAddForeignKey { table, fk } => {
                let cascade_note = if fk.on_delete_cascade {
                    " (ON DELETE CASCADE)"
                } else {
                    ""
                };
                let score = if fk.on_delete_cascade { 30 } else { 15 };
                fk_impacts.push(FkImpact {
                    constraint_name: fk
                        .constraint_name
                        .clone()
                        .unwrap_or_else(|| format!("{}_fk", table)),
                    from_table: table.clone(),
                    to_table: fk.ref_table.clone(),
                    cascade: fk.on_delete_cascade,
                });
                vec![DetectedOperation {
                    description: format!(
                        "ADD FOREIGN KEY {}.({}) → {}.({}){}",
                        table,
                        fk.columns.join(", "),
                        fk.ref_table,
                        fk.ref_columns.join(", "),
                        cascade_note
                    ),
                    // Both sides of the FK are considered affected.
                    tables: vec![table.clone(), fk.ref_table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: if fk.on_delete_cascade {
                        Some(format!(
                            "ON DELETE CASCADE on '{}.{}' can silently delete rows in '{}' when the parent is deleted",
                            table, fk.columns.join(", "), fk.ref_table
                        ))
                    } else {
                        None
                    },
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // DROP CONSTRAINT: modest risk, higher with CASCADE since
            // dependent objects may be dropped too.
            ParsedStatement::AlterTableDropConstraint {
                table,
                constraint,
                cascade,
            } => {
                let score = if *cascade { 25 } else { 10 };
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} DROP CONSTRAINT {}{}",
                        table,
                        constraint,
                        if *cascade { " CASCADE" } else { "" }
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: if *cascade {
                        Some(format!(
                            "Dropping constraint '{}' with CASCADE may drop dependent objects",
                            constraint
                        ))
                    } else {
                        None
                    },
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // RENAME COLUMN: fixed high score — cheap in the database
            // but a breaking change for downstream code.
            ParsedStatement::AlterTableRenameColumn { table, old, new } => {
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} RENAME COLUMN {} TO {}",
                        table, old, new
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::High,
                    score: 55,
                    warning: Some(format!(
                        "Renaming column '{}.{}' is a breaking change for any downstream code that references the old name",
                        table, old
                    )),
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // RENAME TABLE: both old and new names are reported as
            // affected tables.
            ParsedStatement::AlterTableRenameTable { old, new } => {
                vec![DetectedOperation {
                    description: format!("ALTER TABLE {} RENAME TO {}", old, new),
                    tables: vec![old.clone(), new.clone()],
                    risk_level: RiskLevel::High,
                    score: 65,
                    warning: Some(format!(
                        "Renaming table '{}' to '{}' breaks all queries, ORMs, and FK constraints referencing the old name",
                        old, new
                    )),
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // SET NOT NULL: lower scores on PG12+ (different advice in
            // the warning), but still a full-table validation scan.
            ParsedStatement::AlterTableSetNotNull { table, column } => {
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                if self.pg_version >= 12 {
                    let score = if rows > 1_000_000 { 40 } else { 15 };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL (PG{})",
                            table, column, self.pg_version
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "SET NOT NULL on '{}.{}' still scans the entire table on PG{}. \
                             Use a CHECK constraint with NOT VALID first, then VALIDATE CONSTRAINT \
                             to minimize lock time on large tables.",
                            table, column, self.pg_version
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                } else {
                    let score = if rows > 1_000_000 { 55 } else { 25 };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
                            table, column
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "SET NOT NULL on '{}.{}' requires a full table scan to validate existing rows \
                             and holds an ACCESS EXCLUSIVE lock throughout.",
                            table, column
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                }
            }

            // CREATE TABLE: essentially free — no existing data, no lock
            // contention.
            ParsedStatement::CreateTable { table, .. } => {
                vec![DetectedOperation {
                    description: format!("CREATE TABLE {}", table),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::Low,
                    score: 2,
                    warning: None,
                    acquires_lock: false,
                    index_rebuild: false,
                }]
            }

            // ADD PRIMARY KEY: builds a unique index over the whole
            // table; score scales with row count.
            ParsedStatement::AlterTableAddPrimaryKey { table, columns } => {
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                let score = if rows > 1_000_000 { 80 } else { 35 };
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} ADD PRIMARY KEY ({})",
                        table,
                        columns.join(", ")
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: Some(format!(
                        "Adding PRIMARY KEY to '{}' builds an index over the entire table",
                        table
                    )),
                    acquires_lock: true,
                    index_rebuild: true,
                }]
            }

            // Catch-all for statements the parser recognized but did not
            // model. NOTE(review): this relies on the parser embedding
            // the literal marker "Unmodelled DDL" in `raw` — confirm
            // against the parser; anything else falls through silently.
            ParsedStatement::Other { raw } => {
                if raw.contains("Unmodelled DDL") {
                    vec![DetectedOperation {
                        // Truncate to the first 100 chars for display.
                        description: raw.chars().take(100).collect(),
                        tables: vec![],
                        risk_level: RiskLevel::Medium,
                        score: 30,
                        warning: Some(
                            "Unmodelled DDL — manual review required before running".to_string(),
                        ),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                } else {
                    vec![]
                }
            }
            _ => vec![],
        }
    }
639
640 fn build_recommendations(
645 &self,
646 ops: &[DetectedOperation],
647 _tables: &[String],
648 overall: RiskLevel,
649 ) -> Vec<String> {
650 let mut rec = Vec::new();
651
652 let has_drop_table = ops.iter().any(|o| o.description.contains("DROP TABLE"));
653 let has_drop_column = ops.iter().any(|o| o.description.contains("DROP COLUMN"));
654 let has_type_change = ops
655 .iter()
656 .any(|o| o.description.contains("TYPE ") && o.acquires_lock);
657 let has_index_without_concurrent = ops
658 .iter()
659 .any(|o| o.description.contains("CREATE") && o.index_rebuild && o.acquires_lock);
660 let has_not_null_no_default = ops
661 .iter()
662 .any(|o| o.description.contains("NOT NULL (no default)"));
663 let has_rename = ops.iter().any(|o| o.description.contains("RENAME"));
664 let has_cascade = ops.iter().any(|o| o.description.contains("CASCADE"));
665
666 if has_drop_table || has_drop_column {
667 rec.push("Deploy in two phases: first deploy app code that no longer reads the column/table, then drop it in a later migration".to_string());
668 rec.push("Take a full database backup before running this migration".to_string());
669 }
670
671 if has_type_change {
672 rec.push("Use a background migration: add a new column with the new type, backfill in batches, then swap and drop the old column".to_string());
673 }
674
675 if has_index_without_concurrent {
676 rec.push("Use CREATE INDEX CONCURRENTLY to build indexes without locking the table for writes".to_string());
677 }
678
679 if has_not_null_no_default {
680 rec.push("Add a DEFAULT value first, deploy the app change, then remove the default in a follow-up migration if needed".to_string());
681 }
682
683 if has_rename {
684 rec.push("Avoid renaming tables/columns in a single step; use a backward-compatible alias or view transition strategy".to_string());
685 }
686
687 if has_cascade {
688 rec.push("Review all ON DELETE CASCADE constraints — a single delete can silently remove rows across many tables".to_string());
689 }
690
691 if overall >= RiskLevel::High {
692 rec.push("Schedule this migration during a low-traffic maintenance window".to_string());
693 rec.push(
694 "Test this migration on a staging environment with production-sized data"
695 .to_string(),
696 );
697 }
698
699 if overall >= RiskLevel::Medium {
700 let large: Vec<&str> = self
701 .row_counts
702 .iter()
703 .filter(|(_, &v)| v > 100_000)
704 .map(|(k, _)| k.as_str())
705 .collect();
706 if !large.is_empty() {
707 rec.push(format!(
708 "Large tables detected ({}): consider batching long-running operations",
709 large.join(", ")
710 ));
711 }
712 }
713
714 if rec.is_empty() {
715 rec.push(
716 "No specific recommendations – this migration looks safe to deploy".to_string(),
717 );
718 }
719
720 if let Some(live) = &self.live_schema {
722 for table in _tables {
723 if let Some(meta) = live.tables.get(table) {
724 let mb = meta.total_size_bytes / (1024 * 1024);
725 if mb > 1000 {
726 rec.push(format!(
727 "Table '{}' is {} on disk — ensure you have at least 2× free disk space for any rewrite operations",
728 table, meta.total_size_pretty
729 ));
730 }
731 }
732 }
733 }
734
735 rec
736 }
737
738 fn estimate_lock_seconds(&self, ops: &[DetectedOperation], _tables: &[String]) -> Option<u64> {
743 let locking_ops: Vec<&DetectedOperation> = ops.iter().filter(|o| o.acquires_lock).collect();
744
745 if locking_ops.is_empty() {
746 return None;
747 }
748
749 let mut total_secs: u64 = 0;
751 for op in &locking_ops {
752 let mut secs = 1u64;
753 for table in &op.tables {
754 if let Some(&rows) = self.row_counts.get(table) {
755 let row_factor = rows / 100_000;
756 if op.index_rebuild {
757 secs += row_factor * 5; } else {
759 secs += row_factor;
760 }
761 }
762 }
763 total_secs += secs;
764 }
765
766 Some(total_secs.max(1))
767 }
768}