Skip to main content

schema_risk/
engine.rs

1//! Risk analysis engine.
2//!
3//! Each rule is a pure function that inspects one `ParsedStatement` and
4//! returns a `DetectedOperation`.  The engine aggregates them into a final
5//! `MigrationReport`.
6
7use crate::db::LiveSchema;
8use crate::graph::SchemaGraph;
9use crate::parser::ParsedStatement;
10use crate::types::{DetectedOperation, FkImpact, MigrationReport, RiskLevel};
11use chrono::Utc;
12
13// ─────────────────────────────────────────────
14// Engine
15// ─────────────────────────────────────────────
16
17pub struct RiskEngine {
18    /// Estimated rows per table – supplied by the user via --table-rows flag
19    /// OR imported from the live database via --db-url.
20    pub row_counts: std::collections::HashMap<String, u64>,
21    /// Optional live schema snapshot fetched via --db-url.
22    pub live_schema: Option<LiveSchema>,
23    /// Target PostgreSQL major version (e.g. 14 for PG14).
24    /// Rules adapt their scoring based on this value — e.g. ADD COLUMN with
25    /// a DEFAULT is metadata-only on PG11+ but triggers a full table rewrite
26    /// on PG10 and below.  Defaults to 14 (current PostgreSQL LTS).
27    pub pg_version: u32,
28}
29
30impl RiskEngine {
31    pub fn new(row_counts: std::collections::HashMap<String, u64>) -> Self {
32        Self {
33            row_counts,
34            live_schema: None,
35            pg_version: 14,
36        }
37    }
38
39    /// Set the target PostgreSQL major version for version-aware scoring.
40    ///
41    /// Example: `.with_pg_version(11)` activates PG11+ metadata-only rules.
42    pub fn with_pg_version(mut self, version: u32) -> Self {
43        self.pg_version = version;
44        self
45    }
46
47    /// Create an engine seeded from a live database snapshot.
48    /// Row counts from `live` override any manually provided `row_counts`.
49    pub fn with_live_schema(
50        mut row_counts: std::collections::HashMap<String, u64>,
51        live: LiveSchema,
52    ) -> Self {
53        // Merge live row counts (live wins)
54        for (name, meta) in &live.tables {
55            row_counts.insert(name.clone(), meta.estimated_rows.max(0) as u64);
56        }
57        Self {
58            row_counts,
59            live_schema: Some(live),
60            pg_version: 14,
61        }
62    }
63
64    /// Run every rule against the parsed statements and build a graph, then
65    /// return the final report for the file.
66    pub fn analyze(&self, file: &str, statements: &[ParsedStatement]) -> MigrationReport {
67        let mut graph = SchemaGraph::new();
68        let mut operations: Vec<DetectedOperation> = Vec::new();
69        let mut fk_impacts: Vec<FkImpact> = Vec::new();
70
71        // ── Pass 1: populate the schema graph ────────────────────────────
72        for stmt in statements {
73            self.populate_graph(&mut graph, stmt);
74        }
75
76        // ── Pass 2: evaluate every rule ──────────────────────────────────
77        for stmt in statements {
78            let ops = self.evaluate(stmt, &graph, &mut fk_impacts);
79            operations.extend(ops);
80        }
81
82        // ── Aggregate results ────────────────────────────────────────────
83        let score: u32 = operations.iter().map(|o| o.score).sum();
84        let overall_risk = RiskLevel::from_score(score);
85
86        let mut affected_tables: Vec<String> = operations
87            .iter()
88            .flat_map(|o| o.tables.iter().cloned())
89            .collect();
90        affected_tables.sort();
91        affected_tables.dedup();
92
93        let index_rebuild_required = operations.iter().any(|o| o.index_rebuild);
94        let requires_maintenance_window = overall_risk >= RiskLevel::High;
95
96        let warnings: Vec<String> = operations
97            .iter()
98            .filter_map(|o| o.warning.clone())
99            .collect();
100
101        let recommendations =
102            self.build_recommendations(&operations, &affected_tables, overall_risk);
103
104        // Lock estimate: rough heuristic based on table size
105        let estimated_lock_seconds = self.estimate_lock_seconds(&operations, &affected_tables);
106
107        MigrationReport {
108            file: file.to_string(),
109            overall_risk,
110            score,
111            affected_tables,
112            operations,
113            warnings,
114            recommendations,
115            fk_impacts,
116            estimated_lock_seconds,
117            index_rebuild_required,
118            requires_maintenance_window,
119            analyzed_at: Utc::now().to_rfc3339(),
120            pg_version: self.pg_version,
121            guard_required: false,
122            guard_decisions: Vec::new(),
123        }
124    }
125
126    // ─────────────────────────────────────────────────────────────────────
127    // Graph population pass
128    // ─────────────────────────────────────────────────────────────────────
129
130    fn populate_graph(&self, graph: &mut SchemaGraph, stmt: &ParsedStatement) {
131        match stmt {
132            ParsedStatement::CreateTable {
133                table,
134                columns,
135                foreign_keys,
136                ..
137            } => {
138                let rows = self.row_counts.get(table).copied();
139                graph.add_table(table, rows);
140                for col in columns {
141                    graph.add_column(table, &col.name, &col.data_type, col.nullable);
142                }
143                for fk in foreign_keys {
144                    graph.add_foreign_key(
145                        table,
146                        &fk.ref_table,
147                        fk.constraint_name.clone(),
148                        fk.columns.clone(),
149                        fk.ref_columns.clone(),
150                        fk.on_delete_cascade,
151                        fk.on_update_cascade,
152                    );
153                }
154            }
155            ParsedStatement::AlterTableAddForeignKey { table, fk } => {
156                graph.add_foreign_key(
157                    table,
158                    &fk.ref_table,
159                    fk.constraint_name.clone(),
160                    fk.columns.clone(),
161                    fk.ref_columns.clone(),
162                    fk.on_delete_cascade,
163                    fk.on_update_cascade,
164                );
165            }
166            ParsedStatement::CreateIndex {
167                index_name,
168                table,
169                unique,
170                ..
171            } => {
172                let name = index_name
173                    .clone()
174                    .unwrap_or_else(|| format!("unnamed_idx_{}", table));
175                graph.add_table(table, self.row_counts.get(table).copied());
176                graph.add_index(&name, table, *unique);
177            }
178            _ => {}
179        }
180    }
181
182    // ─────────────────────────────────────────────────────────────────────
183    // Rule evaluation pass
184    // ─────────────────────────────────────────────────────────────────────
185
186    fn evaluate(
187        &self,
188        stmt: &ParsedStatement,
189        graph: &SchemaGraph,
190        fk_impacts: &mut Vec<FkImpact>,
191    ) -> Vec<DetectedOperation> {
192        match stmt {
193            // ── DROP TABLE  ──────────────────────────────────────────────────
194            ParsedStatement::DropTable {
195                tables, cascade, ..
196            } => {
197                let mut ops = Vec::new();
198                for table in tables {
199                    // Who references this table?
200                    let refs = graph.tables_referencing(table);
201                    let ref_count = refs.len();
202                    let downstream = graph.fk_downstream(table);
203
204                    let mut score = 100u32;
205                    let mut extra = String::new();
206
207                    if ref_count > 0 {
208                        score += (ref_count as u32) * 20;
209                        extra =
210                            format!(" Referenced by {} table(s): {}", ref_count, refs.join(", "));
211                        for r in &refs {
212                            fk_impacts.push(FkImpact {
213                                constraint_name: format!("{}_fk", r),
214                                from_table: r.clone(),
215                                to_table: table.clone(),
216                                cascade: *cascade,
217                            });
218                        }
219                    }
220                    if !downstream.is_empty() {
221                        score += (downstream.len() as u32) * 10;
222                    }
223
224                    ops.push(DetectedOperation {
225                        description: format!("DROP TABLE {}{}", table, extra),
226                        tables: vec![table.clone()],
227                        risk_level: RiskLevel::from_score(score),
228                        score,
229                        warning: Some(format!(
230                            "Dropping '{}' is irreversible. Cascade: {}.{}",
231                            table,
232                            cascade,
233                            if !downstream.is_empty() {
234                                format!(" Downstream tables affected: {}", downstream.join(", "))
235                            } else {
236                                String::new()
237                            }
238                        )),
239                        acquires_lock: true,
240                        index_rebuild: false,
241                    });
242                }
243                ops
244            }
245
246            // ── DROP COLUMN  ─────────────────────────────────────────────────
247            ParsedStatement::AlterTableDropColumn { table, column, .. } => {
248                vec![DetectedOperation {
249                    description: format!("ALTER TABLE {} DROP COLUMN {}", table, column),
250                    tables: vec![table.clone()],
251                    risk_level: RiskLevel::High,
252                    score: 60,
253                    warning: Some(format!(
254                        "Dropping column '{}.{}' is irreversible and may break application code",
255                        table, column
256                    )),
257                    acquires_lock: true,
258                    index_rebuild: false,
259                }]
260            }
261
262            // ── ALTER COLUMN TYPE ────────────────────────────────────────────
263            ParsedStatement::AlterTableAlterColumnType {
264                table,
265                column,
266                new_type,
267            } => {
268                let rows = self.row_counts.get(table).copied().unwrap_or(0);
269                let score = if rows > 1_000_000 { 90 } else { 40 };
270                let row_note = if rows > 0 {
271                    format!(" (~{} rows)", rows)
272                } else {
273                    String::new()
274                };
275                vec![DetectedOperation {
276                    description: format!(
277                        "ALTER TABLE {} ALTER COLUMN {} TYPE {}{}",
278                        table, column, new_type, row_note
279                    ),
280                    tables: vec![table.clone()],
281                    risk_level: RiskLevel::from_score(score),
282                    score,
283                    warning: Some(format!(
284                        "Type change on '{}.{}' → {} requires a full table rewrite on ALL PostgreSQL versions{}. \
285                         Use the 4-step zero-downtime pattern: add new column, backfill, drop old, rename.",
286                        table, column, new_type, row_note
287                    )),
288                    acquires_lock: true,
289                    index_rebuild: true,
290                }]
291            }
292
293            // ── ADD COLUMN ───────────────────────────────────────────────────
294            ParsedStatement::AlterTableAddColumn { table, column } => {
295                if !column.nullable && !column.has_default {
296                    // NOT NULL with no DEFAULT — always risky on non-empty tables
297                    let rows = self.row_counts.get(table).copied().unwrap_or(0);
298                    let score = if rows > 0 { 50 } else { 25 };
299                    vec![DetectedOperation {
300                        description: format!(
301                            "ALTER TABLE {} ADD COLUMN {} {} NOT NULL (no default)",
302                            table, column.name, column.data_type
303                        ),
304                        tables: vec![table.clone()],
305                        risk_level: RiskLevel::from_score(score),
306                        score,
307                        warning: Some(format!(
308                            "Adding NOT NULL column '{}.{}' without a DEFAULT will fail if the table has existing rows",
309                            table, column.name
310                        )),
311                        acquires_lock: true,
312                        index_rebuild: false,
313                    }]
314                } else if column.has_default && self.pg_version < 11 {
315                    // PG10 and below: ADD COLUMN WITH DEFAULT triggers a full table rewrite
316                    let rows = self.row_counts.get(table).copied().unwrap_or(0);
317                    let score = if rows > 1_000_000 { 80 } else { 45 };
318                    let row_note = if rows > 0 {
319                        format!(" (~{} rows)", rows)
320                    } else {
321                        String::new()
322                    };
323                    vec![DetectedOperation {
324                        description: format!(
325                            "ALTER TABLE {} ADD COLUMN {} {} WITH DEFAULT (PG{} — table rewrite{})",
326                            table, column.name, column.data_type, self.pg_version, row_note
327                        ),
328                        tables: vec![table.clone()],
329                        risk_level: RiskLevel::from_score(score),
330                        score,
331                        warning: Some(format!(
332                            "PostgreSQL {} rewrites the ENTIRE table when adding a column with a DEFAULT value{}. \
333                             Upgrade to PG11+ where this is a metadata-only operation.",
334                            self.pg_version, row_note
335                        )),
336                        acquires_lock: true,
337                        index_rebuild: false,
338                    }]
339                } else {
340                    // PG11+: ADD COLUMN with constant DEFAULT is metadata-only — safe!
341                    let pg_note = if column.has_default {
342                        format!(" (metadata-only on PG{})", self.pg_version)
343                    } else {
344                        String::new()
345                    };
346                    vec![DetectedOperation {
347                        description: format!(
348                            "ALTER TABLE {} ADD COLUMN {} {}{}",
349                            table, column.name, column.data_type, pg_note
350                        ),
351                        tables: vec![table.clone()],
352                        risk_level: RiskLevel::Low,
353                        score: 5,
354                        warning: None,
355                        acquires_lock: false,
356                        index_rebuild: false,
357                    }]
358                }
359            }
360
361            // ── CREATE INDEX (without CONCURRENTLY) ──────────────────────────
362            ParsedStatement::CreateIndex {
363                index_name,
364                table,
365                unique,
366                concurrently,
367                columns,
368            } => {
369                let name = index_name.as_deref().unwrap_or("unnamed");
370                let rows = self.row_counts.get(table).copied().unwrap_or(0);
371                let score: u32 = if *concurrently {
372                    5
373                } else if rows > 1_000_000 {
374                    70
375                } else {
376                    20
377                };
378
379                let warning = if !concurrently {
380                    Some(format!(
381                        "CREATE INDEX on '{}' without CONCURRENTLY will hold a SHARE lock for the duration of the build (cols: {})",
382                        table, columns.join(", ")
383                    ))
384                } else {
385                    None
386                };
387
388                vec![DetectedOperation {
389                    description: format!(
390                        "CREATE {}INDEX {} ON {} ({})",
391                        if *unique { "UNIQUE " } else { "" },
392                        name,
393                        table,
394                        columns.join(", ")
395                    ),
396                    tables: vec![table.clone()],
397                    risk_level: RiskLevel::from_score(score),
398                    score,
399                    warning,
400                    acquires_lock: !concurrently,
401                    index_rebuild: true,
402                }]
403            }
404
405            // ── DROP INDEX ───────────────────────────────────────────────────
406            ParsedStatement::DropIndex {
407                names,
408                concurrently,
409                ..
410            } => {
411                let score: u32 = if *concurrently { 2 } else { 10 };
412                let warning = if !concurrently {
413                    Some(format!(
414                        "DROP INDEX without CONCURRENTLY acquires an ACCESS EXCLUSIVE lock: {}",
415                        names.join(", ")
416                    ))
417                } else {
418                    None
419                };
420                vec![DetectedOperation {
421                    description: format!("DROP INDEX {}", names.join(", ")),
422                    tables: vec![],
423                    risk_level: RiskLevel::from_score(score),
424                    score,
425                    warning,
426                    acquires_lock: !concurrently,
427                    index_rebuild: false,
428                }]
429            }
430
431            // ── ADD FOREIGN KEY ──────────────────────────────────────────────
432            ParsedStatement::AlterTableAddForeignKey { table, fk } => {
433                let cascade_note = if fk.on_delete_cascade {
434                    " (ON DELETE CASCADE)"
435                } else {
436                    ""
437                };
438                let score = if fk.on_delete_cascade { 30 } else { 15 };
439                fk_impacts.push(FkImpact {
440                    constraint_name: fk
441                        .constraint_name
442                        .clone()
443                        .unwrap_or_else(|| format!("{}_fk", table)),
444                    from_table: table.clone(),
445                    to_table: fk.ref_table.clone(),
446                    cascade: fk.on_delete_cascade,
447                });
448                vec![DetectedOperation {
449                    description: format!(
450                        "ADD FOREIGN KEY {}.({}) → {}.({}){}",
451                        table,
452                        fk.columns.join(", "),
453                        fk.ref_table,
454                        fk.ref_columns.join(", "),
455                        cascade_note
456                    ),
457                    tables: vec![table.clone(), fk.ref_table.clone()],
458                    risk_level: RiskLevel::from_score(score),
459                    score,
460                    warning: if fk.on_delete_cascade {
461                        Some(format!(
462                            "ON DELETE CASCADE on '{}.{}' can silently delete rows in '{}' when the parent is deleted",
463                            table, fk.columns.join(", "), fk.ref_table
464                        ))
465                    } else {
466                        None
467                    },
468                    acquires_lock: true,
469                    index_rebuild: false,
470                }]
471            }
472
473            // ── DROP CONSTRAINT ──────────────────────────────────────────────
474            ParsedStatement::AlterTableDropConstraint {
475                table,
476                constraint,
477                cascade,
478            } => {
479                let score = if *cascade { 25 } else { 10 };
480                vec![DetectedOperation {
481                    description: format!(
482                        "ALTER TABLE {} DROP CONSTRAINT {}{}",
483                        table,
484                        constraint,
485                        if *cascade { " CASCADE" } else { "" }
486                    ),
487                    tables: vec![table.clone()],
488                    risk_level: RiskLevel::from_score(score),
489                    score,
490                    warning: if *cascade {
491                        Some(format!(
492                            "Dropping constraint '{}' with CASCADE may drop dependent objects",
493                            constraint
494                        ))
495                    } else {
496                        None
497                    },
498                    acquires_lock: true,
499                    index_rebuild: false,
500                }]
501            }
502
503            // ── RENAME COLUMN ────────────────────────────────────────────────
504            ParsedStatement::AlterTableRenameColumn { table, old, new } => {
505                vec![DetectedOperation {
506                    description: format!(
507                        "ALTER TABLE {} RENAME COLUMN {} TO {}",
508                        table, old, new
509                    ),
510                    tables: vec![table.clone()],
511                    risk_level: RiskLevel::High,
512                    score: 55,
513                    warning: Some(format!(
514                        "Renaming column '{}.{}' is a breaking change for any downstream code that references the old name",
515                        table, old
516                    )),
517                    acquires_lock: true,
518                    index_rebuild: false,
519                }]
520            }
521
522            // ── RENAME TABLE ─────────────────────────────────────────────────
523            ParsedStatement::AlterTableRenameTable { old, new } => {
524                vec![DetectedOperation {
525                    description: format!("ALTER TABLE {} RENAME TO {}", old, new),
526                    tables: vec![old.clone(), new.clone()],
527                    risk_level: RiskLevel::High,
528                    score: 65,
529                    warning: Some(format!(
530                        "Renaming table '{}' to '{}' breaks all queries, ORMs, and FK constraints referencing the old name",
531                        old, new
532                    )),
533                    acquires_lock: true,
534                    index_rebuild: false,
535                }]
536            }
537
538            // ── SET NOT NULL ─────────────────────────────────────────────────
539            ParsedStatement::AlterTableSetNotNull { table, column } => {
540                let rows = self.row_counts.get(table).copied().unwrap_or(0);
541                if self.pg_version >= 12 {
542                    // PG12+: can use NOT VALID CHECK constraint then VALIDATE to reduce lock window
543                    let score = if rows > 1_000_000 { 40 } else { 15 };
544                    vec![DetectedOperation {
545                        description: format!(
546                            "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL (PG{})",
547                            table, column, self.pg_version
548                        ),
549                        tables: vec![table.clone()],
550                        risk_level: RiskLevel::from_score(score),
551                        score,
552                        warning: Some(format!(
553                            "SET NOT NULL on '{}.{}' still scans the entire table on PG{}. \
554                             Use a CHECK constraint with NOT VALID first, then VALIDATE CONSTRAINT \
555                             to minimize lock time on large tables.",
556                            table, column, self.pg_version
557                        )),
558                        acquires_lock: true,
559                        index_rebuild: false,
560                    }]
561                } else {
562                    let score = if rows > 1_000_000 { 55 } else { 25 };
563                    vec![DetectedOperation {
564                        description: format!(
565                            "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
566                            table, column
567                        ),
568                        tables: vec![table.clone()],
569                        risk_level: RiskLevel::from_score(score),
570                        score,
571                        warning: Some(format!(
572                            "SET NOT NULL on '{}.{}' requires a full table scan to validate existing rows \
573                             and holds an ACCESS EXCLUSIVE lock throughout.",
574                            table, column
575                        )),
576                        acquires_lock: true,
577                        index_rebuild: false,
578                    }]
579                }
580            }
581
582            // ── CREATE TABLE ─────────────────────────────────────────────────
583            ParsedStatement::CreateTable { table, .. } => {
584                vec![DetectedOperation {
585                    description: format!("CREATE TABLE {}", table),
586                    tables: vec![table.clone()],
587                    risk_level: RiskLevel::Low,
588                    score: 2,
589                    warning: None,
590                    acquires_lock: false,
591                    index_rebuild: false,
592                }]
593            }
594
595            // ── ADD PRIMARY KEY ──────────────────────────────────────────────
596            ParsedStatement::AlterTableAddPrimaryKey { table, columns } => {
597                let rows = self.row_counts.get(table).copied().unwrap_or(0);
598                let score = if rows > 1_000_000 { 80 } else { 35 };
599                vec![DetectedOperation {
600                    description: format!(
601                        "ALTER TABLE {} ADD PRIMARY KEY ({})",
602                        table,
603                        columns.join(", ")
604                    ),
605                    tables: vec![table.clone()],
606                    risk_level: RiskLevel::from_score(score),
607                    score,
608                    warning: Some(format!(
609                        "Adding PRIMARY KEY to '{}' builds an index over the entire table",
610                        table
611                    )),
612                    acquires_lock: true,
613                    index_rebuild: true,
614                }]
615            }
616
617            // ── OTHER (unmodelled DDL) — B-01 fix ────────────────────────────
618            ParsedStatement::Other { raw } => {
619                if raw.contains("Unmodelled DDL") {
620                    // Belt-and-suspenders: parser flagged this as potentially dangerous
621                    vec![DetectedOperation {
622                        description: raw.chars().take(100).collect(),
623                        tables: vec![],
624                        risk_level: RiskLevel::Medium,
625                        score: 30,
626                        warning: Some(
627                            "Unmodelled DDL — manual review required before running".to_string(),
628                        ),
629                        acquires_lock: true,
630                        index_rebuild: false,
631                    }]
632                } else {
633                    vec![]
634                }
635            }
636            _ => vec![],
637        }
638    }
639
640    // ─────────────────────────────────────────────────────────────────────
641    // Recommendation engine
642    // ─────────────────────────────────────────────────────────────────────
643
644    fn build_recommendations(
645        &self,
646        ops: &[DetectedOperation],
647        _tables: &[String],
648        overall: RiskLevel,
649    ) -> Vec<String> {
650        let mut rec = Vec::new();
651
652        let has_drop_table = ops.iter().any(|o| o.description.contains("DROP TABLE"));
653        let has_drop_column = ops.iter().any(|o| o.description.contains("DROP COLUMN"));
654        let has_type_change = ops
655            .iter()
656            .any(|o| o.description.contains("TYPE ") && o.acquires_lock);
657        let has_index_without_concurrent = ops
658            .iter()
659            .any(|o| o.description.contains("CREATE") && o.index_rebuild && o.acquires_lock);
660        let has_not_null_no_default = ops
661            .iter()
662            .any(|o| o.description.contains("NOT NULL (no default)"));
663        let has_rename = ops.iter().any(|o| o.description.contains("RENAME"));
664        let has_cascade = ops.iter().any(|o| o.description.contains("CASCADE"));
665
666        if has_drop_table || has_drop_column {
667            rec.push("Deploy in two phases: first deploy app code that no longer reads the column/table, then drop it in a later migration".to_string());
668            rec.push("Take a full database backup before running this migration".to_string());
669        }
670
671        if has_type_change {
672            rec.push("Use a background migration: add a new column with the new type, backfill in batches, then swap and drop the old column".to_string());
673        }
674
675        if has_index_without_concurrent {
676            rec.push("Use CREATE INDEX CONCURRENTLY to build indexes without locking the table for writes".to_string());
677        }
678
679        if has_not_null_no_default {
680            rec.push("Add a DEFAULT value first, deploy the app change, then remove the default in a follow-up migration if needed".to_string());
681        }
682
683        if has_rename {
684            rec.push("Avoid renaming tables/columns in a single step; use a backward-compatible alias or view transition strategy".to_string());
685        }
686
687        if has_cascade {
688            rec.push("Review all ON DELETE CASCADE constraints — a single delete can silently remove rows across many tables".to_string());
689        }
690
691        if overall >= RiskLevel::High {
692            rec.push("Schedule this migration during a low-traffic maintenance window".to_string());
693            rec.push(
694                "Test this migration on a staging environment with production-sized data"
695                    .to_string(),
696            );
697        }
698
699        if overall >= RiskLevel::Medium {
700            let large: Vec<&str> = self
701                .row_counts
702                .iter()
703                .filter(|(_, &v)| v > 100_000)
704                .map(|(k, _)| k.as_str())
705                .collect();
706            if !large.is_empty() {
707                rec.push(format!(
708                    "Large tables detected ({}): consider batching long-running operations",
709                    large.join(", ")
710                ));
711            }
712        }
713
714        if rec.is_empty() {
715            rec.push(
716                "No specific recommendations – this migration looks safe to deploy".to_string(),
717            );
718        }
719
720        // Live-schema-aware additions
721        if let Some(live) = &self.live_schema {
722            for table in _tables {
723                if let Some(meta) = live.tables.get(table) {
724                    let mb = meta.total_size_bytes / (1024 * 1024);
725                    if mb > 1000 {
726                        rec.push(format!(
727                            "Table '{}' is {} on disk — ensure you have at least 2× free disk space for any rewrite operations",
728                            table, meta.total_size_pretty
729                        ));
730                    }
731                }
732            }
733        }
734
735        rec
736    }
737
738    // ─────────────────────────────────────────────────────────────────────
739    // Lock duration heuristic
740    // ─────────────────────────────────────────────────────────────────────
741
742    fn estimate_lock_seconds(&self, ops: &[DetectedOperation], _tables: &[String]) -> Option<u64> {
743        let locking_ops: Vec<&DetectedOperation> = ops.iter().filter(|o| o.acquires_lock).collect();
744
745        if locking_ops.is_empty() {
746            return None;
747        }
748
749        // Rough model: 1s base + 1s per 100k rows for rebuild/type-change ops
750        let mut total_secs: u64 = 0;
751        for op in &locking_ops {
752            let mut secs = 1u64;
753            for table in &op.tables {
754                if let Some(&rows) = self.row_counts.get(table) {
755                    let row_factor = rows / 100_000;
756                    if op.index_rebuild {
757                        secs += row_factor * 5; // index builds ~5s / 100k rows
758                    } else {
759                        secs += row_factor;
760                    }
761                }
762            }
763            total_secs += secs;
764        }
765
766        Some(total_secs.max(1))
767    }
768}