1use crate::db::LiveSchema;
8use crate::graph::SchemaGraph;
9use crate::parser::ParsedStatement;
10use crate::types::{DetectedOperation, FkImpact, MigrationReport, RiskLevel};
11use chrono::Utc;
12
/// Scores the operational risk of a SQL migration by combining parsed
/// statements with per-table row-count estimates and, optionally, a live
/// schema snapshot taken from the target database.
pub struct RiskEngine {
    // Estimated row count per table name; drives lock-time and risk scoring.
    pub row_counts: std::collections::HashMap<String, u64>,
    // Live schema snapshot, when available; enables disk-size-based
    // recommendations on top of the row-count heuristics.
    pub live_schema: Option<LiveSchema>,
}
24
25impl RiskEngine {
26 pub fn new(row_counts: std::collections::HashMap<String, u64>) -> Self {
27 Self {
28 row_counts,
29 live_schema: None,
30 }
31 }
32
33 pub fn with_live_schema(
36 mut row_counts: std::collections::HashMap<String, u64>,
37 live: LiveSchema,
38 ) -> Self {
39 for (name, meta) in &live.tables {
41 row_counts.insert(name.clone(), meta.estimated_rows.max(0) as u64);
42 }
43 Self {
44 row_counts,
45 live_schema: Some(live),
46 }
47 }
48
49 pub fn analyze(&self, file: &str, statements: &[ParsedStatement]) -> MigrationReport {
52 let mut graph = SchemaGraph::new();
53 let mut operations: Vec<DetectedOperation> = Vec::new();
54 let mut fk_impacts: Vec<FkImpact> = Vec::new();
55
56 for stmt in statements {
58 self.populate_graph(&mut graph, stmt);
59 }
60
61 for stmt in statements {
63 let ops = self.evaluate(stmt, &graph, &mut fk_impacts);
64 operations.extend(ops);
65 }
66
67 let score: u32 = operations.iter().map(|o| o.score).sum();
69 let overall_risk = RiskLevel::from_score(score);
70
71 let mut affected_tables: Vec<String> = operations
72 .iter()
73 .flat_map(|o| o.tables.iter().cloned())
74 .collect();
75 affected_tables.sort();
76 affected_tables.dedup();
77
78 let index_rebuild_required = operations.iter().any(|o| o.index_rebuild);
79 let requires_maintenance_window = overall_risk >= RiskLevel::High;
80
81 let warnings: Vec<String> = operations
82 .iter()
83 .filter_map(|o| o.warning.clone())
84 .collect();
85
86 let recommendations =
87 self.build_recommendations(&operations, &affected_tables, overall_risk);
88
89 let estimated_lock_seconds = self.estimate_lock_seconds(&operations, &affected_tables);
91
92 MigrationReport {
93 file: file.to_string(),
94 overall_risk,
95 score,
96 affected_tables,
97 operations,
98 warnings,
99 recommendations,
100 fk_impacts,
101 estimated_lock_seconds,
102 index_rebuild_required,
103 requires_maintenance_window,
104 analyzed_at: Utc::now().to_rfc3339(),
105 guard_required: false,
106 guard_decisions: Vec::new(),
107 }
108 }
109
110 fn populate_graph(&self, graph: &mut SchemaGraph, stmt: &ParsedStatement) {
115 match stmt {
116 ParsedStatement::CreateTable {
117 table,
118 columns,
119 foreign_keys,
120 ..
121 } => {
122 let rows = self.row_counts.get(table).copied();
123 graph.add_table(table, rows);
124 for col in columns {
125 graph.add_column(table, &col.name, &col.data_type, col.nullable);
126 }
127 for fk in foreign_keys {
128 graph.add_foreign_key(
129 table,
130 &fk.ref_table,
131 fk.constraint_name.clone(),
132 fk.columns.clone(),
133 fk.ref_columns.clone(),
134 fk.on_delete_cascade,
135 fk.on_update_cascade,
136 );
137 }
138 }
139 ParsedStatement::AlterTableAddForeignKey { table, fk } => {
140 graph.add_foreign_key(
141 table,
142 &fk.ref_table,
143 fk.constraint_name.clone(),
144 fk.columns.clone(),
145 fk.ref_columns.clone(),
146 fk.on_delete_cascade,
147 fk.on_update_cascade,
148 );
149 }
150 ParsedStatement::CreateIndex {
151 index_name,
152 table,
153 unique,
154 ..
155 } => {
156 let name = index_name
157 .clone()
158 .unwrap_or_else(|| format!("unnamed_idx_{}", table));
159 graph.add_table(table, self.row_counts.get(table).copied());
160 graph.add_index(&name, table, *unique);
161 }
162 _ => {}
163 }
164 }
165
    /// Second-pass scoring: converts one parsed statement into zero or more
    /// `DetectedOperation`s, consulting the schema graph for cross-table
    /// effects and appending any foreign-key fallout to `fk_impacts`.
    ///
    /// NOTE: the `description` strings produced here are load-bearing —
    /// `build_recommendations` keyword-matches against them.
    fn evaluate(
        &self,
        stmt: &ParsedStatement,
        graph: &SchemaGraph,
        fk_impacts: &mut Vec<FkImpact>,
    ) -> Vec<DetectedOperation> {
        match stmt {
            // DROP TABLE: baseline 100 per table, +20 per table still
            // referencing the victim, +10 per downstream FK dependent.
            ParsedStatement::DropTable {
                tables, cascade, ..
            } => {
                let mut ops = Vec::new();
                for table in tables {
                    let refs = graph.tables_referencing(table);
                    let ref_count = refs.len();
                    let downstream = graph.fk_downstream(table);

                    let mut score = 100u32;
                    let mut extra = String::new();

                    if ref_count > 0 {
                        score += (ref_count as u32) * 20;
                        extra =
                            format!(" Referenced by {} table(s): {}", ref_count, refs.join(", "));
                        // Real constraint names are not tracked by the graph
                        // here, so a synthetic "<table>_fk" placeholder is
                        // recorded per referencing table.
                        for r in &refs {
                            fk_impacts.push(FkImpact {
                                constraint_name: format!("{}_fk", r),
                                from_table: r.clone(),
                                to_table: table.clone(),
                                cascade: *cascade,
                            });
                        }
                    }
                    if !downstream.is_empty() {
                        score += (downstream.len() as u32) * 10;
                    }

                    ops.push(DetectedOperation {
                        description: format!("DROP TABLE {}{}", table, extra),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "Dropping '{}' is irreversible. Cascade: {}.{}",
                            table,
                            cascade,
                            if !downstream.is_empty() {
                                format!(" Downstream tables affected: {}", downstream.join(", "))
                            } else {
                                String::new()
                            }
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    });
                }
                ops
            }

            // DROP COLUMN: fixed High risk — irreversible data loss.
            ParsedStatement::AlterTableDropColumn { table, column, .. } => {
                vec![DetectedOperation {
                    description: format!("ALTER TABLE {} DROP COLUMN {}", table, column),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::High,
                    score: 60,
                    warning: Some(format!(
                        "Dropping column '{}.{}' is irreversible and may break application code",
                        table, column
                    )),
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // Column type change: scored by table size — a rewrite of a
            // >1M-row table is near-critical.
            ParsedStatement::AlterTableAlterColumnType {
                table,
                column,
                new_type,
            } => {
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                let score = if rows > 1_000_000 { 90 } else { 40 };
                let row_note = if rows > 0 {
                    format!(" (~{} rows)", rows)
                } else {
                    String::new()
                };
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} ALTER COLUMN {} TYPE {}{}",
                        table, column, new_type, row_note
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: Some(format!(
                        "Type change on '{}.{}' may cause data loss and requires a full table rewrite{}",
                        table, column, row_note
                    )),
                    acquires_lock: true,
                    index_rebuild: true,
                }]
            }

            // ADD COLUMN: only risky when NOT NULL without a DEFAULT on a
            // table that may already contain rows; otherwise near-free.
            ParsedStatement::AlterTableAddColumn { table, column } => {
                if !column.nullable && !column.has_default {
                    let rows = self.row_counts.get(table).copied().unwrap_or(0);
                    let score = if rows > 0 { 50 } else { 25 };
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ADD COLUMN {} {} NOT NULL (no default)",
                            table, column.name, column.data_type
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::from_score(score),
                        score,
                        warning: Some(format!(
                            "Adding NOT NULL column '{}.{}' without a DEFAULT will fail if the table has existing rows",
                            table, column.name
                        )),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                } else {
                    vec![DetectedOperation {
                        description: format!(
                            "ALTER TABLE {} ADD COLUMN {} {}",
                            table, column.name, column.data_type
                        ),
                        tables: vec![table.clone()],
                        risk_level: RiskLevel::Low,
                        score: 5,
                        warning: None,
                        acquires_lock: false,
                        index_rebuild: false,
                    }]
                }
            }

            // CREATE INDEX: CONCURRENTLY is cheap; a plain build on a
            // large table blocks writes for the whole build.
            ParsedStatement::CreateIndex {
                index_name,
                table,
                unique,
                concurrently,
                columns,
            } => {
                let name = index_name.as_deref().unwrap_or("unnamed");
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                let score: u32 = if *concurrently {
                    5
                } else if rows > 1_000_000 {
                    70
                } else {
                    20
                };

                let warning = if !concurrently {
                    Some(format!(
                        "CREATE INDEX on '{}' without CONCURRENTLY will hold a SHARE lock for the duration of the build (cols: {})",
                        table, columns.join(", ")
                    ))
                } else {
                    None
                };

                vec![DetectedOperation {
                    description: format!(
                        "CREATE {}INDEX {} ON {} ({})",
                        if *unique { "UNIQUE " } else { "" },
                        name,
                        table,
                        columns.join(", ")
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning,
                    acquires_lock: !concurrently,
                    index_rebuild: true,
                }]
            }

            // DROP INDEX: cheap, but non-concurrent takes an exclusive
            // lock. No table association is available, so `tables` stays
            // empty (and this op is invisible to lock estimation).
            ParsedStatement::DropIndex {
                names,
                concurrently,
                ..
            } => {
                let score: u32 = if *concurrently { 2 } else { 10 };
                let warning = if !concurrently {
                    Some(format!(
                        "DROP INDEX without CONCURRENTLY acquires an ACCESS EXCLUSIVE lock: {}",
                        names.join(", ")
                    ))
                } else {
                    None
                };
                vec![DetectedOperation {
                    description: format!("DROP INDEX {}", names.join(", ")),
                    tables: vec![],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning,
                    acquires_lock: !concurrently,
                    index_rebuild: false,
                }]
            }

            // ADD FOREIGN KEY: doubled score for ON DELETE CASCADE, and
            // the impact is recorded for the report's FK section.
            ParsedStatement::AlterTableAddForeignKey { table, fk } => {
                let cascade_note = if fk.on_delete_cascade {
                    " (ON DELETE CASCADE)"
                } else {
                    ""
                };
                let score = if fk.on_delete_cascade { 30 } else { 15 };
                fk_impacts.push(FkImpact {
                    constraint_name: fk
                        .constraint_name
                        .clone()
                        .unwrap_or_else(|| format!("{}_fk", table)),
                    from_table: table.clone(),
                    to_table: fk.ref_table.clone(),
                    cascade: fk.on_delete_cascade,
                });
                vec![DetectedOperation {
                    description: format!(
                        "ADD FOREIGN KEY {}.({}) → {}.({}){}",
                        table,
                        fk.columns.join(", "),
                        fk.ref_table,
                        fk.ref_columns.join(", "),
                        cascade_note
                    ),
                    // Both endpoints are affected: child and parent table.
                    tables: vec![table.clone(), fk.ref_table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: if fk.on_delete_cascade {
                        Some(format!(
                            "ON DELETE CASCADE on '{}.{}' can silently delete rows in '{}' when the parent is deleted",
                            table, fk.columns.join(", "), fk.ref_table
                        ))
                    } else {
                        None
                    },
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // DROP CONSTRAINT: low risk unless CASCADE may take dependent
            // objects with it.
            ParsedStatement::AlterTableDropConstraint {
                table,
                constraint,
                cascade,
            } => {
                let score = if *cascade { 25 } else { 10 };
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} DROP CONSTRAINT {}{}",
                        table,
                        constraint,
                        if *cascade { " CASCADE" } else { "" }
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: if *cascade {
                        Some(format!(
                            "Dropping constraint '{}' with CASCADE may drop dependent objects",
                            constraint
                        ))
                    } else {
                        None
                    },
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // RENAME COLUMN: fixed High — fast in the database, but a
            // breaking change for application code.
            ParsedStatement::AlterTableRenameColumn { table, old, new } => {
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} RENAME COLUMN {} TO {}",
                        table, old, new
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::High,
                    score: 55,
                    warning: Some(format!(
                        "Renaming column '{}.{}' is a breaking change for any downstream code that references the old name",
                        table, old
                    )),
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // RENAME TABLE: fixed High, slightly above column rename.
            // Both the old and new names are listed as affected.
            ParsedStatement::AlterTableRenameTable { old, new } => {
                vec![DetectedOperation {
                    description: format!("ALTER TABLE {} RENAME TO {}", old, new),
                    tables: vec![old.clone(), new.clone()],
                    risk_level: RiskLevel::High,
                    score: 65,
                    warning: Some(format!(
                        "Renaming table '{}' to '{}' breaks all queries, ORMs, and FK constraints referencing the old name",
                        old, new
                    )),
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // SET NOT NULL: validation scans the whole table, so score
            // scales with row count.
            ParsedStatement::AlterTableSetNotNull { table, column } => {
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                let score = if rows > 1_000_000 { 45 } else { 20 };
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
                        table, column
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: Some(format!(
                        "SET NOT NULL on '{}.{}' requires a full table scan to validate existing rows",
                        table, column
                    )),
                    acquires_lock: true,
                    index_rebuild: false,
                }]
            }

            // CREATE TABLE: essentially free — no existing data involved.
            ParsedStatement::CreateTable { table, .. } => {
                vec![DetectedOperation {
                    description: format!("CREATE TABLE {}", table),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::Low,
                    score: 2,
                    warning: None,
                    acquires_lock: false,
                    index_rebuild: false,
                }]
            }

            // ADD PRIMARY KEY: builds a unique index over the full table.
            ParsedStatement::AlterTableAddPrimaryKey { table, columns } => {
                let rows = self.row_counts.get(table).copied().unwrap_or(0);
                let score = if rows > 1_000_000 { 80 } else { 35 };
                vec![DetectedOperation {
                    description: format!(
                        "ALTER TABLE {} ADD PRIMARY KEY ({})",
                        table,
                        columns.join(", ")
                    ),
                    tables: vec![table.clone()],
                    risk_level: RiskLevel::from_score(score),
                    score,
                    warning: Some(format!(
                        "Adding PRIMARY KEY to '{}' builds an index over the entire table",
                        table
                    )),
                    acquires_lock: true,
                    index_rebuild: true,
                }]
            }

            // NOTE(review): this assumes the parser embeds the literal
            // marker "Unmodelled DDL" in `raw` for statements it could not
            // model — confirm against the parser; otherwise this branch
            // never fires and unmodelled DDL silently scores zero.
            ParsedStatement::Other { raw } => {
                if raw.contains("Unmodelled DDL") {
                    vec![DetectedOperation {
                        // Truncate to the first 100 chars to keep the report readable.
                        description: raw.chars().take(100).collect(),
                        tables: vec![],
                        risk_level: RiskLevel::Medium,
                        score: 30,
                        warning: Some(
                            "Unmodelled DDL — manual review required before running".to_string(),
                        ),
                        acquires_lock: true,
                        index_rebuild: false,
                    }]
                } else {
                    vec![]
                }
            }
            // Any remaining statement kinds (DML etc.) carry no DDL risk.
            _ => vec![],
        }
    }
567
568 fn build_recommendations(
573 &self,
574 ops: &[DetectedOperation],
575 _tables: &[String],
576 overall: RiskLevel,
577 ) -> Vec<String> {
578 let mut rec = Vec::new();
579
580 let has_drop_table = ops.iter().any(|o| o.description.contains("DROP TABLE"));
581 let has_drop_column = ops.iter().any(|o| o.description.contains("DROP COLUMN"));
582 let has_type_change = ops
583 .iter()
584 .any(|o| o.description.contains("TYPE ") && o.acquires_lock);
585 let has_index_without_concurrent = ops
586 .iter()
587 .any(|o| o.description.contains("CREATE") && o.index_rebuild && o.acquires_lock);
588 let has_not_null_no_default = ops
589 .iter()
590 .any(|o| o.description.contains("NOT NULL (no default)"));
591 let has_rename = ops.iter().any(|o| o.description.contains("RENAME"));
592 let has_cascade = ops.iter().any(|o| o.description.contains("CASCADE"));
593
594 if has_drop_table || has_drop_column {
595 rec.push("Deploy in two phases: first deploy app code that no longer reads the column/table, then drop it in a later migration".to_string());
596 rec.push("Take a full database backup before running this migration".to_string());
597 }
598
599 if has_type_change {
600 rec.push("Use a background migration: add a new column with the new type, backfill in batches, then swap and drop the old column".to_string());
601 }
602
603 if has_index_without_concurrent {
604 rec.push("Use CREATE INDEX CONCURRENTLY to build indexes without locking the table for writes".to_string());
605 }
606
607 if has_not_null_no_default {
608 rec.push("Add a DEFAULT value first, deploy the app change, then remove the default in a follow-up migration if needed".to_string());
609 }
610
611 if has_rename {
612 rec.push("Avoid renaming tables/columns in a single step; use a backward-compatible alias or view transition strategy".to_string());
613 }
614
615 if has_cascade {
616 rec.push("Review all ON DELETE CASCADE constraints — a single delete can silently remove rows across many tables".to_string());
617 }
618
619 if overall >= RiskLevel::High {
620 rec.push("Schedule this migration during a low-traffic maintenance window".to_string());
621 rec.push(
622 "Test this migration on a staging environment with production-sized data"
623 .to_string(),
624 );
625 }
626
627 if overall >= RiskLevel::Medium {
628 let large: Vec<&str> = self
629 .row_counts
630 .iter()
631 .filter(|(_, &v)| v > 100_000)
632 .map(|(k, _)| k.as_str())
633 .collect();
634 if !large.is_empty() {
635 rec.push(format!(
636 "Large tables detected ({}): consider batching long-running operations",
637 large.join(", ")
638 ));
639 }
640 }
641
642 if rec.is_empty() {
643 rec.push(
644 "No specific recommendations – this migration looks safe to deploy".to_string(),
645 );
646 }
647
648 if let Some(live) = &self.live_schema {
650 for table in _tables {
651 if let Some(meta) = live.tables.get(table) {
652 let mb = meta.total_size_bytes / (1024 * 1024);
653 if mb > 1000 {
654 rec.push(format!(
655 "Table '{}' is {} on disk — ensure you have at least 2× free disk space for any rewrite operations",
656 table, meta.total_size_pretty
657 ));
658 }
659 }
660 }
661 }
662
663 rec
664 }
665
666 fn estimate_lock_seconds(&self, ops: &[DetectedOperation], _tables: &[String]) -> Option<u64> {
671 let locking_ops: Vec<&DetectedOperation> = ops.iter().filter(|o| o.acquires_lock).collect();
672
673 if locking_ops.is_empty() {
674 return None;
675 }
676
677 let mut total_secs: u64 = 0;
679 for op in &locking_ops {
680 let mut secs = 1u64;
681 for table in &op.tables {
682 if let Some(&rows) = self.row_counts.get(table) {
683 let row_factor = rows / 100_000;
684 if op.index_rebuild {
685 secs += row_factor * 5; } else {
687 secs += row_factor;
688 }
689 }
690 }
691 total_secs += secs;
692 }
693
694 Some(total_secs.max(1))
695 }
696}