Skip to main content

schema_risk/
engine.rs

1//! Risk analysis engine.
2//!
3//! Each rule is a pure function that inspects one `ParsedStatement` and
4//! returns a `DetectedOperation`.  The engine aggregates them into a final
5//! `MigrationReport`.
6
7use crate::db::LiveSchema;
8use crate::graph::SchemaGraph;
9use crate::parser::ParsedStatement;
10use crate::types::{DetectedOperation, FkImpact, MigrationReport, RiskLevel};
11use chrono::Utc;
12use std::collections::{HashMap, HashSet};
13
14// ─────────────────────────────────────────────
15// Engine
16// ─────────────────────────────────────────────
17
18pub struct RiskEngine {
19    /// Estimated rows per table – supplied by the user via --table-rows flag
20    /// OR imported from the live database via --db-url.
21    pub row_counts: HashMap<String, u64>,
22    /// Optional live schema snapshot fetched via --db-url.
23    pub live_schema: Option<LiveSchema>,
24    /// Target PostgreSQL major version (e.g. 14 for PG14).
25    /// Rules adapt their scoring based on this value — e.g. ADD COLUMN with
26    /// a DEFAULT is metadata-only on PG11+ but triggers a full table rewrite
27    /// on PG10 and below.  Defaults to 14 (current PostgreSQL LTS).
28    pub pg_version: u32,
29}
30
31impl RiskEngine {
32    pub fn new(row_counts: HashMap<String, u64>) -> Self {
33        Self {
34            row_counts,
35            live_schema: None,
36            pg_version: 14,
37        }
38    }
39
40    /// Set the target PostgreSQL major version for version-aware scoring.
41    ///
42    /// Example: `.with_pg_version(11)` activates PG11+ metadata-only rules.
43    pub fn with_pg_version(mut self, version: u32) -> Self {
44        self.pg_version = version;
45        self
46    }
47
48    /// Create an engine seeded from a live database snapshot.
49    /// Row counts from `live` override any manually provided `row_counts`.
50    pub fn with_live_schema(mut row_counts: HashMap<String, u64>, live: LiveSchema) -> Self {
51        // Merge live row counts (live wins)
52        for (name, meta) in &live.tables {
53            row_counts.insert(name.clone(), meta.estimated_rows.max(0) as u64);
54        }
55        Self {
56            row_counts,
57            live_schema: Some(live),
58            pg_version: 14,
59        }
60    }
61
62    /// Run every rule against the parsed statements and build a graph, then
63    /// return the final report for the file.
64    pub fn analyze(&self, file: &str, statements: &[ParsedStatement]) -> MigrationReport {
65        let mut graph = SchemaGraph::new();
66        let mut operations: Vec<DetectedOperation> = Vec::new();
67        let mut fk_impacts: Vec<FkImpact> = Vec::new();
68
69        // ── Pass 1: populate the schema graph ────────────────────────────
70        for stmt in statements {
71            self.populate_graph(&mut graph, stmt);
72        }
73
74        // ── Pass 2: evaluate every rule ──────────────────────────────────
75        for stmt in statements {
76            let ops = self.evaluate(stmt, &graph, &mut fk_impacts);
77            operations.extend(ops);
78        }
79
80        // ── Aggregate results ────────────────────────────────────────────
81        let score: u32 = operations
82            .iter()
83            .fold(0u32, |acc, operation| acc.saturating_add(operation.score));
84        let overall_risk = RiskLevel::from_score(score);
85
86        let mut affected_tables: Vec<String> = operations
87            .iter()
88            .flat_map(|o| o.tables.iter().cloned())
89            .collect();
90        affected_tables.sort();
91        affected_tables.dedup();
92
93        let index_rebuild_required = operations.iter().any(|o| o.index_rebuild);
94        let requires_maintenance_window = overall_risk >= RiskLevel::High;
95
96        let warnings: Vec<String> = Self::dedupe_preserve_order(
97            operations
98                .iter()
99                .filter_map(|o| o.warning.clone())
100                .collect(),
101        );
102
103        let recommendations = Self::dedupe_preserve_order(self.build_recommendations(
104            &operations,
105            &affected_tables,
106            overall_risk,
107        ));
108        let guard_required = operations
109            .iter()
110            .any(|o| o.score >= 40 || o.risk_level >= RiskLevel::High);
111
112        // Lock estimate: rough heuristic based on table size
113        let estimated_lock_seconds = self.estimate_lock_seconds(&operations, &affected_tables);
114
115        MigrationReport {
116            file: file.to_string(),
117            overall_risk,
118            score,
119            affected_tables,
120            operations,
121            warnings,
122            recommendations,
123            fk_impacts,
124            estimated_lock_seconds,
125            index_rebuild_required,
126            requires_maintenance_window,
127            analyzed_at: Utc::now().to_rfc3339(),
128            pg_version: self.pg_version,
129            guard_required,
130            guard_decisions: Vec::new(),
131        }
132    }
133
134    fn dedupe_preserve_order(items: Vec<String>) -> Vec<String> {
135        let mut seen = HashSet::new();
136        let mut deduped = Vec::new();
137        for item in items {
138            if seen.insert(item.clone()) {
139                deduped.push(item);
140            }
141        }
142        deduped
143    }
144
145    // ─────────────────────────────────────────────────────────────────────
146    // Graph population pass
147    // ─────────────────────────────────────────────────────────────────────
148
149    fn populate_graph(&self, graph: &mut SchemaGraph, stmt: &ParsedStatement) {
150        match stmt {
151            ParsedStatement::CreateTable {
152                table,
153                columns,
154                foreign_keys,
155                ..
156            } => {
157                let rows = self.row_counts.get(table).copied();
158                graph.add_table(table, rows);
159                for col in columns {
160                    graph.add_column(table, &col.name, &col.data_type, col.nullable);
161                }
162                for fk in foreign_keys {
163                    graph.add_foreign_key(
164                        table,
165                        &fk.ref_table,
166                        fk.constraint_name.clone(),
167                        fk.columns.clone(),
168                        fk.ref_columns.clone(),
169                        fk.on_delete_cascade,
170                        fk.on_update_cascade,
171                    );
172                }
173            }
174            ParsedStatement::AlterTableAddForeignKey { table, fk } => {
175                graph.add_foreign_key(
176                    table,
177                    &fk.ref_table,
178                    fk.constraint_name.clone(),
179                    fk.columns.clone(),
180                    fk.ref_columns.clone(),
181                    fk.on_delete_cascade,
182                    fk.on_update_cascade,
183                );
184            }
185            ParsedStatement::CreateIndex {
186                index_name,
187                table,
188                unique,
189                ..
190            } => {
191                let name = index_name
192                    .clone()
193                    .unwrap_or_else(|| format!("unnamed_idx_{}", table));
194                graph.add_table(table, self.row_counts.get(table).copied());
195                graph.add_index(&name, table, *unique);
196            }
197            _ => {}
198        }
199    }
200
201    // ─────────────────────────────────────────────────────────────────────
202    // Rule evaluation pass
203    // ─────────────────────────────────────────────────────────────────────
204
205    fn evaluate(
206        &self,
207        stmt: &ParsedStatement,
208        graph: &SchemaGraph,
209        fk_impacts: &mut Vec<FkImpact>,
210    ) -> Vec<DetectedOperation> {
211        match stmt {
212            // ── DROP TABLE  ──────────────────────────────────────────────────
213            ParsedStatement::DropTable {
214                tables, cascade, ..
215            } => {
216                let mut ops = Vec::new();
217                for table in tables {
218                    // Who references this table?
219                    let refs = graph.tables_referencing(table);
220                    let ref_count = refs.len();
221                    let downstream = graph.fk_downstream(table);
222
223                    let mut score = 100u32;
224                    let mut extra = String::new();
225
226                    if ref_count > 0 {
227                        score += (ref_count as u32) * 20;
228                        extra =
229                            format!(" Referenced by {} table(s): {}", ref_count, refs.join(", "));
230                        for r in &refs {
231                            fk_impacts.push(FkImpact {
232                                constraint_name: format!("{}_fk", r),
233                                from_table: r.clone(),
234                                to_table: table.clone(),
235                                cascade: *cascade,
236                            });
237                        }
238                    }
239                    if !downstream.is_empty() {
240                        score += (downstream.len() as u32) * 10;
241                    }
242
243                    ops.push(DetectedOperation {
244                        description: format!("DROP TABLE {}{}", table, extra),
245                        tables: vec![table.clone()],
246                        risk_level: RiskLevel::from_score(score),
247                        score,
248                        warning: Some(format!(
249                            "Dropping '{}' is irreversible. Cascade: {}.{}",
250                            table,
251                            cascade,
252                            if !downstream.is_empty() {
253                                format!(" Downstream tables affected: {}", downstream.join(", "))
254                            } else {
255                                String::new()
256                            }
257                        )),
258                        acquires_lock: true,
259                        index_rebuild: false,
260                    });
261                }
262                ops
263            }
264
265            // ── DROP COLUMN  ─────────────────────────────────────────────────
266            ParsedStatement::AlterTableDropColumn { table, column, .. } => {
267                vec![DetectedOperation {
268                    description: format!("ALTER TABLE {} DROP COLUMN {}", table, column),
269                    tables: vec![table.clone()],
270                    risk_level: RiskLevel::High,
271                    score: 60,
272                    warning: Some(format!(
273                        "Dropping column '{}.{}' is irreversible and may break application code",
274                        table, column
275                    )),
276                    acquires_lock: true,
277                    index_rebuild: false,
278                }]
279            }
280
281            // ── ALTER COLUMN TYPE ────────────────────────────────────────────
282            ParsedStatement::AlterTableAlterColumnType {
283                table,
284                column,
285                new_type,
286            } => {
287                let rows = self.row_counts.get(table).copied().unwrap_or(0);
288                let upper_type = new_type.to_uppercase();
289
290                // Detect safe type conversions that are metadata-only on PG9.2+
291                // Reference: https://www.postgresql.org/docs/current/sql-altertable.html
292                let is_safe_varchar_expansion = upper_type.starts_with("VARCHAR")
293                    || upper_type.starts_with("CHARACTER VARYING")
294                    || upper_type == "TEXT";
295                let is_safe_numeric_precision_increase =
296                    upper_type.starts_with("NUMERIC") || upper_type.starts_with("DECIMAL");
297
298                // VARCHAR(n) -> VARCHAR(m) where m > n is metadata-only
299                // VARCHAR(n) -> TEXT is metadata-only
300                // NUMERIC(p,s) -> NUMERIC(p',s) where p' > p is metadata-only (same scale)
301                let (score, risk_level, is_safe) = if is_safe_varchar_expansion {
302                    // Expanding VARCHAR or converting to TEXT is safe
303                    (15, RiskLevel::Low, true)
304                } else if is_safe_numeric_precision_increase {
305                    // Increasing numeric precision MAY be safe (depends on scale)
306                    // We score it medium because we can't verify the old type
307                    (30, RiskLevel::Medium, false)
308                } else if rows > 1_000_000 {
309                    (90, RiskLevel::High, false)
310                } else {
311                    (40, RiskLevel::Medium, false)
312                };
313
314                let row_note = if rows > 0 {
315                    format!(" (~{} rows)", rows)
316                } else {
317                    String::new()
318                };
319
320                let warning = if is_safe {
321                    Some(format!(
322                        "Type change '{}.{}' → {} is likely a metadata-only operation on PG9.2+ \
323                         (expanding VARCHAR or converting to TEXT). Verify the current type is compatible.",
324                        table, column, new_type
325                    ))
326                } else {
327                    Some(format!(
328                        "Type change on '{}.{}' → {} requires a full table rewrite on ALL PostgreSQL versions{}. \
329                         Use the 4-step zero-downtime pattern: add new column, backfill, drop old, rename.",
330                        table, column, new_type, row_note
331                    ))
332                };
333
334                vec![DetectedOperation {
335                    description: format!(
336                        "ALTER TABLE {} ALTER COLUMN {} TYPE {}{}",
337                        table, column, new_type, row_note
338                    ),
339                    tables: vec![table.clone()],
340                    risk_level,
341                    score,
342                    warning,
343                    acquires_lock: !is_safe,
344                    index_rebuild: !is_safe,
345                }]
346            }
347
348            // ── ADD COLUMN ───────────────────────────────────────────────────
349            ParsedStatement::AlterTableAddColumn { table, column } => {
350                if !column.nullable && !column.has_default {
351                    // NOT NULL with no DEFAULT — always risky on non-empty tables
352                    let rows = self.row_counts.get(table).copied().unwrap_or(0);
353                    let score = if rows > 0 { 50 } else { 25 };
354                    vec![DetectedOperation {
355                        description: format!(
356                            "ALTER TABLE {} ADD COLUMN {} {} NOT NULL (no default)",
357                            table, column.name, column.data_type
358                        ),
359                        tables: vec![table.clone()],
360                        risk_level: RiskLevel::from_score(score),
361                        score,
362                        warning: Some(format!(
363                            "Adding NOT NULL column '{}.{}' without a DEFAULT will fail if the table has existing rows",
364                            table, column.name
365                        )),
366                        acquires_lock: true,
367                        index_rebuild: false,
368                    }]
369                } else if column.has_default && self.pg_version < 11 {
370                    // PG10 and below: ADD COLUMN WITH DEFAULT triggers a full table rewrite
371                    let rows = self.row_counts.get(table).copied().unwrap_or(0);
372                    let score = if rows > 1_000_000 { 80 } else { 45 };
373                    let row_note = if rows > 0 {
374                        format!(" (~{} rows)", rows)
375                    } else {
376                        String::new()
377                    };
378                    vec![DetectedOperation {
379                        description: format!(
380                            "ALTER TABLE {} ADD COLUMN {} {} WITH DEFAULT (PG{} — table rewrite{})",
381                            table, column.name, column.data_type, self.pg_version, row_note
382                        ),
383                        tables: vec![table.clone()],
384                        risk_level: RiskLevel::from_score(score),
385                        score,
386                        warning: Some(format!(
387                            "PostgreSQL {} rewrites the ENTIRE table when adding a column with a DEFAULT value{}. \
388                             Upgrade to PG11+ where this is a metadata-only operation.",
389                            self.pg_version, row_note
390                        )),
391                        acquires_lock: true,
392                        index_rebuild: false,
393                    }]
394                } else {
395                    // PG11+: ADD COLUMN with constant DEFAULT is metadata-only — safe!
396                    let pg_note = if column.has_default {
397                        format!(" (metadata-only on PG{})", self.pg_version)
398                    } else {
399                        String::new()
400                    };
401                    vec![DetectedOperation {
402                        description: format!(
403                            "ALTER TABLE {} ADD COLUMN {} {}{}",
404                            table, column.name, column.data_type, pg_note
405                        ),
406                        tables: vec![table.clone()],
407                        risk_level: RiskLevel::Low,
408                        score: 5,
409                        warning: None,
410                        acquires_lock: false,
411                        index_rebuild: false,
412                    }]
413                }
414            }
415
416            // ── CREATE INDEX (without CONCURRENTLY) ──────────────────────────
417            ParsedStatement::CreateIndex {
418                index_name,
419                table,
420                unique,
421                concurrently,
422                columns,
423            } => {
424                let name = index_name.as_deref().unwrap_or("unnamed");
425                let rows = self.row_counts.get(table).copied().unwrap_or(0);
426                let score: u32 = if *concurrently {
427                    5
428                } else if rows > 1_000_000 {
429                    70
430                } else {
431                    20
432                };
433
434                let warning = if !concurrently {
435                    Some(format!(
436                        "CREATE INDEX on '{}' without CONCURRENTLY will hold a SHARE lock for the duration of the build (cols: {})",
437                        table, columns.join(", ")
438                    ))
439                } else {
440                    None
441                };
442
443                vec![DetectedOperation {
444                    description: format!(
445                        "CREATE {}INDEX {} ON {} ({})",
446                        if *unique { "UNIQUE " } else { "" },
447                        name,
448                        table,
449                        columns.join(", ")
450                    ),
451                    tables: vec![table.clone()],
452                    risk_level: RiskLevel::from_score(score),
453                    score,
454                    warning,
455                    acquires_lock: !concurrently,
456                    index_rebuild: true,
457                }]
458            }
459
460            // ── DROP INDEX ───────────────────────────────────────────────────
461            ParsedStatement::DropIndex {
462                names,
463                concurrently,
464                ..
465            } => {
466                let score: u32 = if *concurrently { 2 } else { 10 };
467                let warning = if !concurrently {
468                    Some(format!(
469                        "DROP INDEX without CONCURRENTLY acquires an ACCESS EXCLUSIVE lock: {}",
470                        names.join(", ")
471                    ))
472                } else {
473                    None
474                };
475                vec![DetectedOperation {
476                    description: format!("DROP INDEX {}", names.join(", ")),
477                    tables: vec![],
478                    risk_level: RiskLevel::from_score(score),
479                    score,
480                    warning,
481                    acquires_lock: !concurrently,
482                    index_rebuild: false,
483                }]
484            }
485
486            // ── ADD FOREIGN KEY ──────────────────────────────────────────────
487            ParsedStatement::AlterTableAddForeignKey { table, fk } => {
488                let cascade_note = if fk.on_delete_cascade {
489                    " (ON DELETE CASCADE)"
490                } else {
491                    ""
492                };
493                let score = if fk.on_delete_cascade { 30 } else { 15 };
494                fk_impacts.push(FkImpact {
495                    constraint_name: fk
496                        .constraint_name
497                        .clone()
498                        .unwrap_or_else(|| format!("{}_fk", table)),
499                    from_table: table.clone(),
500                    to_table: fk.ref_table.clone(),
501                    cascade: fk.on_delete_cascade,
502                });
503                vec![DetectedOperation {
504                    description: format!(
505                        "ADD FOREIGN KEY {}.({}) → {}.({}){}",
506                        table,
507                        fk.columns.join(", "),
508                        fk.ref_table,
509                        fk.ref_columns.join(", "),
510                        cascade_note
511                    ),
512                    tables: vec![table.clone(), fk.ref_table.clone()],
513                    risk_level: RiskLevel::from_score(score),
514                    score,
515                    warning: if fk.on_delete_cascade {
516                        Some(format!(
517                            "ON DELETE CASCADE on '{}.{}' can silently delete rows in '{}' when the parent is deleted",
518                            table, fk.columns.join(", "), fk.ref_table
519                        ))
520                    } else {
521                        None
522                    },
523                    acquires_lock: true,
524                    index_rebuild: false,
525                }]
526            }
527
528            // ── DROP CONSTRAINT ──────────────────────────────────────────────
529            ParsedStatement::AlterTableDropConstraint {
530                table,
531                constraint,
532                cascade,
533            } => {
534                let score = if *cascade { 25 } else { 10 };
535                vec![DetectedOperation {
536                    description: format!(
537                        "ALTER TABLE {} DROP CONSTRAINT {}{}",
538                        table,
539                        constraint,
540                        if *cascade { " CASCADE" } else { "" }
541                    ),
542                    tables: vec![table.clone()],
543                    risk_level: RiskLevel::from_score(score),
544                    score,
545                    warning: if *cascade {
546                        Some(format!(
547                            "Dropping constraint '{}' with CASCADE may drop dependent objects",
548                            constraint
549                        ))
550                    } else {
551                        None
552                    },
553                    acquires_lock: true,
554                    index_rebuild: false,
555                }]
556            }
557
558            // ── RENAME COLUMN ────────────────────────────────────────────────
559            ParsedStatement::AlterTableRenameColumn { table, old, new } => {
560                vec![DetectedOperation {
561                    description: format!(
562                        "ALTER TABLE {} RENAME COLUMN {} TO {}",
563                        table, old, new
564                    ),
565                    tables: vec![table.clone()],
566                    risk_level: RiskLevel::High,
567                    score: 55,
568                    warning: Some(format!(
569                        "Renaming column '{}.{}' is a breaking change for any downstream code that references the old name",
570                        table, old
571                    )),
572                    acquires_lock: true,
573                    index_rebuild: false,
574                }]
575            }
576
577            // ── RENAME TABLE ─────────────────────────────────────────────────
578            ParsedStatement::AlterTableRenameTable { old, new } => {
579                vec![DetectedOperation {
580                    description: format!("ALTER TABLE {} RENAME TO {}", old, new),
581                    tables: vec![old.clone(), new.clone()],
582                    risk_level: RiskLevel::High,
583                    score: 65,
584                    warning: Some(format!(
585                        "Renaming table '{}' to '{}' breaks all queries, ORMs, and FK constraints referencing the old name",
586                        old, new
587                    )),
588                    acquires_lock: true,
589                    index_rebuild: false,
590                }]
591            }
592
593            // ── SET NOT NULL ─────────────────────────────────────────────────
594            ParsedStatement::AlterTableSetNotNull { table, column } => {
595                let rows = self.row_counts.get(table).copied().unwrap_or(0);
596                if self.pg_version >= 12 {
597                    // PG12+: can use NOT VALID CHECK constraint then VALIDATE to reduce lock window
598                    let score = if rows > 1_000_000 { 40 } else { 15 };
599                    vec![DetectedOperation {
600                        description: format!(
601                            "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL (PG{})",
602                            table, column, self.pg_version
603                        ),
604                        tables: vec![table.clone()],
605                        risk_level: RiskLevel::from_score(score),
606                        score,
607                        warning: Some(format!(
608                            "SET NOT NULL on '{}.{}' still scans the entire table on PG{}. \
609                             Use a CHECK constraint with NOT VALID first, then VALIDATE CONSTRAINT \
610                             to minimize lock time on large tables.",
611                            table, column, self.pg_version
612                        )),
613                        acquires_lock: true,
614                        index_rebuild: false,
615                    }]
616                } else {
617                    let score = if rows > 1_000_000 { 55 } else { 25 };
618                    vec![DetectedOperation {
619                        description: format!(
620                            "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
621                            table, column
622                        ),
623                        tables: vec![table.clone()],
624                        risk_level: RiskLevel::from_score(score),
625                        score,
626                        warning: Some(format!(
627                            "SET NOT NULL on '{}.{}' requires a full table scan to validate existing rows \
628                             and holds an ACCESS EXCLUSIVE lock throughout.",
629                            table, column
630                        )),
631                        acquires_lock: true,
632                        index_rebuild: false,
633                    }]
634                }
635            }
636
637            // ── CREATE TABLE ─────────────────────────────────────────────────
638            ParsedStatement::CreateTable { table, .. } => {
639                vec![DetectedOperation {
640                    description: format!("CREATE TABLE {}", table),
641                    tables: vec![table.clone()],
642                    risk_level: RiskLevel::Low,
643                    score: 2,
644                    warning: None,
645                    acquires_lock: false,
646                    index_rebuild: false,
647                }]
648            }
649
650            // ── ADD PRIMARY KEY ──────────────────────────────────────────────
651            ParsedStatement::AlterTableAddPrimaryKey { table, columns } => {
652                let rows = self.row_counts.get(table).copied().unwrap_or(0);
653                let score = if rows > 1_000_000 { 80 } else { 35 };
654                vec![DetectedOperation {
655                    description: format!(
656                        "ALTER TABLE {} ADD PRIMARY KEY ({})",
657                        table,
658                        columns.join(", ")
659                    ),
660                    tables: vec![table.clone()],
661                    risk_level: RiskLevel::from_score(score),
662                    score,
663                    warning: Some(format!(
664                        "Adding PRIMARY KEY to '{}' builds an index over the entire table",
665                        table
666                    )),
667                    acquires_lock: true,
668                    index_rebuild: true,
669                }]
670            }
671
672            // ── TRUNCATE ─────────────────────────────────────────────────────
673            ParsedStatement::Truncate { tables, cascade } => {
674                let mut ops = Vec::new();
675                for table in tables {
676                    let refs = graph.tables_referencing(table);
677                    let ref_note = if !refs.is_empty() && *cascade {
678                        format!(" CASCADE will also truncate: {}", refs.join(", "))
679                    } else {
680                        String::new()
681                    };
682
683                    ops.push(DetectedOperation {
684                        description: format!("TRUNCATE TABLE {}{}", table, ref_note),
685                        tables: vec![table.clone()],
686                        risk_level: RiskLevel::Critical,
687                        score: 120,
688                        warning: Some(format!(
689                            "TRUNCATE '{}' instantly destroys ALL data in the table.{} \
690                             This cannot be undone without a backup — there is no rollback for TRUNCATE.",
691                            table, ref_note
692                        )),
693                        acquires_lock: true,
694                        index_rebuild: false,
695                    });
696                }
697                ops
698            }
699
700            // ── REINDEX ──────────────────────────────────────────────────────
701            ParsedStatement::Reindex {
702                target_type,
703                target_name,
704                concurrently,
705            } => {
706                let score = if *concurrently { 15 } else { 80 };
707                let risk = if *concurrently {
708                    RiskLevel::Low
709                } else {
710                    RiskLevel::High
711                };
712
713                vec![DetectedOperation {
714                    description: format!(
715                        "REINDEX{} {} {}",
716                        if *concurrently { " CONCURRENTLY" } else { "" },
717                        target_type,
718                        target_name
719                    ),
720                    tables: vec![target_name.clone()],
721                    risk_level: risk,
722                    score,
723                    warning: if *concurrently {
724                        Some(format!(
725                            "REINDEX CONCURRENTLY on '{}' allows concurrent reads/writes (PG12+)",
726                            target_name
727                        ))
728                    } else {
729                        Some(format!(
730                            "REINDEX '{}' without CONCURRENTLY holds ACCESS EXCLUSIVE lock. \
731                             On large tables this can take hours. Use REINDEX CONCURRENTLY (PG12+).",
732                            target_name
733                        ))
734                    },
735                    acquires_lock: !concurrently,
736                    index_rebuild: true,
737                }]
738            }
739
740            // ── CLUSTER ──────────────────────────────────────────────────────
741            ParsedStatement::Cluster { table, index } => {
742                let desc = match (table, index) {
743                    (Some(t), Some(i)) => format!("CLUSTER {} USING {}", t, i),
744                    (Some(t), None) => format!("CLUSTER {}", t),
745                    _ => "CLUSTER".to_string(),
746                };
747                let table_name = table.clone().unwrap_or_else(|| "ALL TABLES".to_string());
748                vec![DetectedOperation {
749                    description: desc,
750                    tables: vec![table_name.clone()],
751                    risk_level: RiskLevel::Critical,
752                    score: 100,
753                    warning: Some(format!(
754                        "CLUSTER completely rewrites '{}' to match index order while holding \
755                         ACCESS EXCLUSIVE lock. This can take hours on large tables and blocks \
756                         all reads and writes.",
757                        table_name
758                    )),
759                    acquires_lock: true,
760                    index_rebuild: true,
761                }]
762            }
763
764            // ── OTHER (unmodelled DDL) — B-01 fix ────────────────────────────
765            ParsedStatement::Other { raw } => self.evaluate_other_statement(raw),
766            _ => vec![],
767        }
768    }
769
770    fn evaluate_other_statement(&self, raw: &str) -> Vec<DetectedOperation> {
771        let upper = raw.to_uppercase();
772        let is_flagged_unmodelled = upper.contains("UNMODELLED DDL");
773
774        let (score, warning, lock_likely) = if upper.contains("DROP DATABASE")
775            || upper.contains("DROP SCHEMA")
776            || upper.contains("TRUNCATE")
777        {
778            (
779                90,
780                "Unmodelled destructive DDL detected — high blast radius and likely irreversible"
781                    .to_string(),
782                true,
783            )
784        } else if upper.contains("DROP TABLE") || upper.contains("DROP COLUMN") {
785            (
786                80,
787                "Unmodelled DROP operation detected — manual review required before execution"
788                    .to_string(),
789                true,
790            )
791        } else if upper.contains("ALTER TABLE") || upper.contains("CREATE POLICY") {
792            (
793                35,
794                "Unmodelled DDL may acquire locks or change access semantics — review migration plan"
795                    .to_string(),
796                true,
797            )
798        } else if is_flagged_unmodelled {
799            (
800                30,
801                "Unmodelled DDL — manual review required before running".to_string(),
802                true,
803            )
804        } else {
805            return vec![];
806        };
807
808        vec![DetectedOperation {
809            description: raw.chars().take(100).collect(),
810            tables: vec![],
811            risk_level: RiskLevel::from_score(score),
812            score,
813            warning: Some(warning),
814            acquires_lock: lock_likely,
815            index_rebuild: false,
816        }]
817    }
818
819    // ─────────────────────────────────────────────────────────────────────
820    // Recommendation engine
821    // ─────────────────────────────────────────────────────────────────────
822
823    fn build_recommendations(
824        &self,
825        ops: &[DetectedOperation],
826        _tables: &[String],
827        overall: RiskLevel,
828    ) -> Vec<String> {
829        let mut rec = Vec::new();
830
831        let has_drop_table = ops.iter().any(|o| o.description.contains("DROP TABLE"));
832        let has_drop_column = ops.iter().any(|o| o.description.contains("DROP COLUMN"));
833        let has_type_change = ops
834            .iter()
835            .any(|o| o.description.contains("TYPE ") && o.acquires_lock);
836        let has_index_without_concurrent = ops
837            .iter()
838            .any(|o| o.description.contains("CREATE") && o.index_rebuild && o.acquires_lock);
839        let has_not_null_no_default = ops
840            .iter()
841            .any(|o| o.description.contains("NOT NULL (no default)"));
842        let has_rename = ops.iter().any(|o| o.description.contains("RENAME"));
843        let has_cascade = ops.iter().any(|o| o.description.contains("CASCADE"));
844
845        if has_drop_table || has_drop_column {
846            rec.push("Deploy in two phases: first deploy app code that no longer reads the column/table, then drop it in a later migration".to_string());
847            rec.push("Take a full database backup before running this migration".to_string());
848        }
849
850        if has_type_change {
851            rec.push("Use a background migration: add a new column with the new type, backfill in batches, then swap and drop the old column".to_string());
852        }
853
854        if has_index_without_concurrent {
855            rec.push("Use CREATE INDEX CONCURRENTLY to build indexes without locking the table for writes".to_string());
856        }
857
858        if has_not_null_no_default {
859            rec.push("Add a DEFAULT value first, deploy the app change, then remove the default in a follow-up migration if needed".to_string());
860        }
861
862        if has_rename {
863            rec.push("Avoid renaming tables/columns in a single step; use a backward-compatible alias or view transition strategy".to_string());
864        }
865
866        if has_cascade {
867            rec.push("Review all ON DELETE CASCADE constraints — a single delete can silently remove rows across many tables".to_string());
868        }
869
870        if overall >= RiskLevel::High {
871            rec.push("Schedule this migration during a low-traffic maintenance window".to_string());
872            rec.push(
873                "Test this migration on a staging environment with production-sized data"
874                    .to_string(),
875            );
876        }
877
878        if overall >= RiskLevel::Medium {
879            let large: Vec<&str> = self
880                .row_counts
881                .iter()
882                .filter(|(_, &v)| v > 100_000)
883                .map(|(k, _)| k.as_str())
884                .collect();
885            if !large.is_empty() {
886                rec.push(format!(
887                    "Large tables detected ({}): consider batching long-running operations",
888                    large.join(", ")
889                ));
890            }
891        }
892
893        if rec.is_empty() {
894            rec.push(
895                "No specific recommendations – this migration looks safe to deploy".to_string(),
896            );
897        }
898
899        // Live-schema-aware additions
900        if let Some(live) = &self.live_schema {
901            for table in _tables {
902                if let Some(meta) = live.tables.get(table) {
903                    let mb = meta.total_size_bytes / (1024 * 1024);
904                    if mb > 1000 {
905                        rec.push(format!(
906                            "Table '{}' is {} on disk — ensure you have at least 2× free disk space for any rewrite operations",
907                            table, meta.total_size_pretty
908                        ));
909                    }
910                }
911            }
912        }
913
914        rec
915    }
916
917    // ─────────────────────────────────────────────────────────────────────
918    // Lock duration heuristic
919    // ─────────────────────────────────────────────────────────────────────
920
921    fn estimate_lock_seconds(&self, ops: &[DetectedOperation], _tables: &[String]) -> Option<u64> {
922        let locking_ops: Vec<&DetectedOperation> = ops.iter().filter(|o| o.acquires_lock).collect();
923
924        if locking_ops.is_empty() {
925            return None;
926        }
927
928        // Rough model: 1s base + 1s per 100k rows for rebuild/type-change ops
929        let mut total_secs: u64 = 0;
930        for op in &locking_ops {
931            let mut secs = 1u64;
932            for table in &op.tables {
933                if let Some(&rows) = self.row_counts.get(table) {
934                    let row_factor = rows / 100_000;
935                    if op.index_rebuild {
936                        secs += row_factor * 5; // index builds ~5s / 100k rows
937                    } else {
938                        secs += row_factor;
939                    }
940                }
941            }
942            total_secs += secs;
943        }
944
945        Some(total_secs.max(1))
946    }
947}