Skip to main content

schema_risk/
engine.rs

1//! Risk analysis engine.
2//!
//! Each rule inspects one `ParsedStatement` and returns zero or more
//! `DetectedOperation`s, recording any foreign-key impacts it finds along the
//! way. The engine aggregates them into a final `MigrationReport`.
6
7use crate::db::LiveSchema;
8use crate::graph::SchemaGraph;
9use crate::parser::ParsedStatement;
10use crate::types::{DetectedOperation, FkImpact, MigrationReport, RiskLevel};
11use chrono::Utc;
12
13// ─────────────────────────────────────────────
14// Engine
15// ─────────────────────────────────────────────
16
17pub struct RiskEngine {
18    /// Estimated rows per table – supplied by the user via --table-rows flag
19    /// OR imported from the live database via --db-url.
20    pub row_counts: std::collections::HashMap<String, u64>,
21    /// Optional live schema snapshot fetched via --db-url.
22    pub live_schema: Option<LiveSchema>,
23}
24
25impl RiskEngine {
26    pub fn new(row_counts: std::collections::HashMap<String, u64>) -> Self {
27        Self {
28            row_counts,
29            live_schema: None,
30        }
31    }
32
33    /// Create an engine seeded from a live database snapshot.
34    /// Row counts from `live` override any manually provided `row_counts`.
35    pub fn with_live_schema(
36        mut row_counts: std::collections::HashMap<String, u64>,
37        live: LiveSchema,
38    ) -> Self {
39        // Merge live row counts (live wins)
40        for (name, meta) in &live.tables {
41            row_counts.insert(name.clone(), meta.estimated_rows.max(0) as u64);
42        }
43        Self {
44            row_counts,
45            live_schema: Some(live),
46        }
47    }
48
49    /// Run every rule against the parsed statements and build a graph, then
50    /// return the final report for the file.
51    pub fn analyze(&self, file: &str, statements: &[ParsedStatement]) -> MigrationReport {
52        let mut graph = SchemaGraph::new();
53        let mut operations: Vec<DetectedOperation> = Vec::new();
54        let mut fk_impacts: Vec<FkImpact> = Vec::new();
55
56        // ── Pass 1: populate the schema graph ────────────────────────────
57        for stmt in statements {
58            self.populate_graph(&mut graph, stmt);
59        }
60
61        // ── Pass 2: evaluate every rule ──────────────────────────────────
62        for stmt in statements {
63            let ops = self.evaluate(stmt, &graph, &mut fk_impacts);
64            operations.extend(ops);
65        }
66
67        // ── Aggregate results ────────────────────────────────────────────
68        let score: u32 = operations.iter().map(|o| o.score).sum();
69        let overall_risk = RiskLevel::from_score(score);
70
71        let mut affected_tables: Vec<String> = operations
72            .iter()
73            .flat_map(|o| o.tables.iter().cloned())
74            .collect();
75        affected_tables.sort();
76        affected_tables.dedup();
77
78        let index_rebuild_required = operations.iter().any(|o| o.index_rebuild);
79        let requires_maintenance_window = overall_risk >= RiskLevel::High;
80
81        let warnings: Vec<String> = operations
82            .iter()
83            .filter_map(|o| o.warning.clone())
84            .collect();
85
86        let recommendations =
87            self.build_recommendations(&operations, &affected_tables, overall_risk);
88
89        // Lock estimate: rough heuristic based on table size
90        let estimated_lock_seconds = self.estimate_lock_seconds(&operations, &affected_tables);
91
92        MigrationReport {
93            file: file.to_string(),
94            overall_risk,
95            score,
96            affected_tables,
97            operations,
98            warnings,
99            recommendations,
100            fk_impacts,
101            estimated_lock_seconds,
102            index_rebuild_required,
103            requires_maintenance_window,
104            analyzed_at: Utc::now().to_rfc3339(),
105            guard_required: false,
106            guard_decisions: Vec::new(),
107        }
108    }
109
110    // ─────────────────────────────────────────────────────────────────────
111    // Graph population pass
112    // ─────────────────────────────────────────────────────────────────────
113
114    fn populate_graph(&self, graph: &mut SchemaGraph, stmt: &ParsedStatement) {
115        match stmt {
116            ParsedStatement::CreateTable {
117                table,
118                columns,
119                foreign_keys,
120                ..
121            } => {
122                let rows = self.row_counts.get(table).copied();
123                graph.add_table(table, rows);
124                for col in columns {
125                    graph.add_column(table, &col.name, &col.data_type, col.nullable);
126                }
127                for fk in foreign_keys {
128                    graph.add_foreign_key(
129                        table,
130                        &fk.ref_table,
131                        fk.constraint_name.clone(),
132                        fk.columns.clone(),
133                        fk.ref_columns.clone(),
134                        fk.on_delete_cascade,
135                        fk.on_update_cascade,
136                    );
137                }
138            }
139            ParsedStatement::AlterTableAddForeignKey { table, fk } => {
140                graph.add_foreign_key(
141                    table,
142                    &fk.ref_table,
143                    fk.constraint_name.clone(),
144                    fk.columns.clone(),
145                    fk.ref_columns.clone(),
146                    fk.on_delete_cascade,
147                    fk.on_update_cascade,
148                );
149            }
150            ParsedStatement::CreateIndex {
151                index_name,
152                table,
153                unique,
154                ..
155            } => {
156                let name = index_name
157                    .clone()
158                    .unwrap_or_else(|| format!("unnamed_idx_{}", table));
159                graph.add_table(table, self.row_counts.get(table).copied());
160                graph.add_index(&name, table, *unique);
161            }
162            _ => {}
163        }
164    }
165
166    // ─────────────────────────────────────────────────────────────────────
167    // Rule evaluation pass
168    // ─────────────────────────────────────────────────────────────────────
169
170    fn evaluate(
171        &self,
172        stmt: &ParsedStatement,
173        graph: &SchemaGraph,
174        fk_impacts: &mut Vec<FkImpact>,
175    ) -> Vec<DetectedOperation> {
176        match stmt {
177            // ── DROP TABLE  ──────────────────────────────────────────────────
178            ParsedStatement::DropTable {
179                tables, cascade, ..
180            } => {
181                let mut ops = Vec::new();
182                for table in tables {
183                    // Who references this table?
184                    let refs = graph.tables_referencing(table);
185                    let ref_count = refs.len();
186                    let downstream = graph.fk_downstream(table);
187
188                    let mut score = 100u32;
189                    let mut extra = String::new();
190
191                    if ref_count > 0 {
192                        score += (ref_count as u32) * 20;
193                        extra =
194                            format!(" Referenced by {} table(s): {}", ref_count, refs.join(", "));
195                        for r in &refs {
196                            fk_impacts.push(FkImpact {
197                                constraint_name: format!("{}_fk", r),
198                                from_table: r.clone(),
199                                to_table: table.clone(),
200                                cascade: *cascade,
201                            });
202                        }
203                    }
204                    if !downstream.is_empty() {
205                        score += (downstream.len() as u32) * 10;
206                    }
207
208                    ops.push(DetectedOperation {
209                        description: format!("DROP TABLE {}{}", table, extra),
210                        tables: vec![table.clone()],
211                        risk_level: RiskLevel::from_score(score),
212                        score,
213                        warning: Some(format!(
214                            "Dropping '{}' is irreversible. Cascade: {}.{}",
215                            table,
216                            cascade,
217                            if !downstream.is_empty() {
218                                format!(" Downstream tables affected: {}", downstream.join(", "))
219                            } else {
220                                String::new()
221                            }
222                        )),
223                        acquires_lock: true,
224                        index_rebuild: false,
225                    });
226                }
227                ops
228            }
229
230            // ── DROP COLUMN  ─────────────────────────────────────────────────
231            ParsedStatement::AlterTableDropColumn { table, column, .. } => {
232                vec![DetectedOperation {
233                    description: format!("ALTER TABLE {} DROP COLUMN {}", table, column),
234                    tables: vec![table.clone()],
235                    risk_level: RiskLevel::High,
236                    score: 60,
237                    warning: Some(format!(
238                        "Dropping column '{}.{}' is irreversible and may break application code",
239                        table, column
240                    )),
241                    acquires_lock: true,
242                    index_rebuild: false,
243                }]
244            }
245
246            // ── ALTER COLUMN TYPE ────────────────────────────────────────────
247            ParsedStatement::AlterTableAlterColumnType {
248                table,
249                column,
250                new_type,
251            } => {
252                let rows = self.row_counts.get(table).copied().unwrap_or(0);
253                let score = if rows > 1_000_000 { 90 } else { 40 };
254                let row_note = if rows > 0 {
255                    format!(" (~{} rows)", rows)
256                } else {
257                    String::new()
258                };
259                vec![DetectedOperation {
260                    description: format!(
261                        "ALTER TABLE {} ALTER COLUMN {} TYPE {}{}",
262                        table, column, new_type, row_note
263                    ),
264                    tables: vec![table.clone()],
265                    risk_level: RiskLevel::from_score(score),
266                    score,
267                    warning: Some(format!(
268                        "Type change on '{}.{}' may cause data loss and requires a full table rewrite{}",
269                        table, column, row_note
270                    )),
271                    acquires_lock: true,
272                    index_rebuild: true,
273                }]
274            }
275
276            // ── ADD COLUMN (NOT NULL, no default)  ───────────────────────────
277            ParsedStatement::AlterTableAddColumn { table, column } => {
278                if !column.nullable && !column.has_default {
279                    let rows = self.row_counts.get(table).copied().unwrap_or(0);
280                    let score = if rows > 0 { 50 } else { 25 };
281                    vec![DetectedOperation {
282                        description: format!(
283                            "ALTER TABLE {} ADD COLUMN {} {} NOT NULL (no default)",
284                            table, column.name, column.data_type
285                        ),
286                        tables: vec![table.clone()],
287                        risk_level: RiskLevel::from_score(score),
288                        score,
289                        warning: Some(format!(
290                            "Adding NOT NULL column '{}.{}' without a DEFAULT will fail if the table has existing rows",
291                            table, column.name
292                        )),
293                        acquires_lock: true,
294                        index_rebuild: false,
295                    }]
296                } else {
297                    vec![DetectedOperation {
298                        description: format!(
299                            "ALTER TABLE {} ADD COLUMN {} {}",
300                            table, column.name, column.data_type
301                        ),
302                        tables: vec![table.clone()],
303                        risk_level: RiskLevel::Low,
304                        score: 5,
305                        warning: None,
306                        acquires_lock: false,
307                        index_rebuild: false,
308                    }]
309                }
310            }
311
312            // ── CREATE INDEX (without CONCURRENTLY) ──────────────────────────
313            ParsedStatement::CreateIndex {
314                index_name,
315                table,
316                unique,
317                concurrently,
318                columns,
319            } => {
320                let name = index_name.as_deref().unwrap_or("unnamed");
321                let rows = self.row_counts.get(table).copied().unwrap_or(0);
322                let score: u32 = if *concurrently {
323                    5
324                } else if rows > 1_000_000 {
325                    70
326                } else {
327                    20
328                };
329
330                let warning = if !concurrently {
331                    Some(format!(
332                        "CREATE INDEX on '{}' without CONCURRENTLY will hold a SHARE lock for the duration of the build (cols: {})",
333                        table, columns.join(", ")
334                    ))
335                } else {
336                    None
337                };
338
339                vec![DetectedOperation {
340                    description: format!(
341                        "CREATE {}INDEX {} ON {} ({})",
342                        if *unique { "UNIQUE " } else { "" },
343                        name,
344                        table,
345                        columns.join(", ")
346                    ),
347                    tables: vec![table.clone()],
348                    risk_level: RiskLevel::from_score(score),
349                    score,
350                    warning,
351                    acquires_lock: !concurrently,
352                    index_rebuild: true,
353                }]
354            }
355
356            // ── DROP INDEX ───────────────────────────────────────────────────
357            ParsedStatement::DropIndex {
358                names,
359                concurrently,
360                ..
361            } => {
362                let score: u32 = if *concurrently { 2 } else { 10 };
363                let warning = if !concurrently {
364                    Some(format!(
365                        "DROP INDEX without CONCURRENTLY acquires an ACCESS EXCLUSIVE lock: {}",
366                        names.join(", ")
367                    ))
368                } else {
369                    None
370                };
371                vec![DetectedOperation {
372                    description: format!("DROP INDEX {}", names.join(", ")),
373                    tables: vec![],
374                    risk_level: RiskLevel::from_score(score),
375                    score,
376                    warning,
377                    acquires_lock: !concurrently,
378                    index_rebuild: false,
379                }]
380            }
381
382            // ── ADD FOREIGN KEY ──────────────────────────────────────────────
383            ParsedStatement::AlterTableAddForeignKey { table, fk } => {
384                let cascade_note = if fk.on_delete_cascade {
385                    " (ON DELETE CASCADE)"
386                } else {
387                    ""
388                };
389                let score = if fk.on_delete_cascade { 30 } else { 15 };
390                fk_impacts.push(FkImpact {
391                    constraint_name: fk
392                        .constraint_name
393                        .clone()
394                        .unwrap_or_else(|| format!("{}_fk", table)),
395                    from_table: table.clone(),
396                    to_table: fk.ref_table.clone(),
397                    cascade: fk.on_delete_cascade,
398                });
399                vec![DetectedOperation {
400                    description: format!(
401                        "ADD FOREIGN KEY {}.({}) → {}.({}){}",
402                        table,
403                        fk.columns.join(", "),
404                        fk.ref_table,
405                        fk.ref_columns.join(", "),
406                        cascade_note
407                    ),
408                    tables: vec![table.clone(), fk.ref_table.clone()],
409                    risk_level: RiskLevel::from_score(score),
410                    score,
411                    warning: if fk.on_delete_cascade {
412                        Some(format!(
413                            "ON DELETE CASCADE on '{}.{}' can silently delete rows in '{}' when the parent is deleted",
414                            table, fk.columns.join(", "), fk.ref_table
415                        ))
416                    } else {
417                        None
418                    },
419                    acquires_lock: true,
420                    index_rebuild: false,
421                }]
422            }
423
424            // ── DROP CONSTRAINT ──────────────────────────────────────────────
425            ParsedStatement::AlterTableDropConstraint {
426                table,
427                constraint,
428                cascade,
429            } => {
430                let score = if *cascade { 25 } else { 10 };
431                vec![DetectedOperation {
432                    description: format!(
433                        "ALTER TABLE {} DROP CONSTRAINT {}{}",
434                        table,
435                        constraint,
436                        if *cascade { " CASCADE" } else { "" }
437                    ),
438                    tables: vec![table.clone()],
439                    risk_level: RiskLevel::from_score(score),
440                    score,
441                    warning: if *cascade {
442                        Some(format!(
443                            "Dropping constraint '{}' with CASCADE may drop dependent objects",
444                            constraint
445                        ))
446                    } else {
447                        None
448                    },
449                    acquires_lock: true,
450                    index_rebuild: false,
451                }]
452            }
453
454            // ── RENAME COLUMN ────────────────────────────────────────────────
455            ParsedStatement::AlterTableRenameColumn { table, old, new } => {
456                vec![DetectedOperation {
457                    description: format!(
458                        "ALTER TABLE {} RENAME COLUMN {} TO {}",
459                        table, old, new
460                    ),
461                    tables: vec![table.clone()],
462                    risk_level: RiskLevel::High,
463                    score: 55,
464                    warning: Some(format!(
465                        "Renaming column '{}.{}' is a breaking change for any downstream code that references the old name",
466                        table, old
467                    )),
468                    acquires_lock: true,
469                    index_rebuild: false,
470                }]
471            }
472
473            // ── RENAME TABLE ─────────────────────────────────────────────────
474            ParsedStatement::AlterTableRenameTable { old, new } => {
475                vec![DetectedOperation {
476                    description: format!("ALTER TABLE {} RENAME TO {}", old, new),
477                    tables: vec![old.clone(), new.clone()],
478                    risk_level: RiskLevel::High,
479                    score: 65,
480                    warning: Some(format!(
481                        "Renaming table '{}' to '{}' breaks all queries, ORMs, and FK constraints referencing the old name",
482                        old, new
483                    )),
484                    acquires_lock: true,
485                    index_rebuild: false,
486                }]
487            }
488
489            // ── SET NOT NULL ─────────────────────────────────────────────────
490            ParsedStatement::AlterTableSetNotNull { table, column } => {
491                let rows = self.row_counts.get(table).copied().unwrap_or(0);
492                let score = if rows > 1_000_000 { 45 } else { 20 };
493                vec![DetectedOperation {
494                    description: format!(
495                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
496                        table, column
497                    ),
498                    tables: vec![table.clone()],
499                    risk_level: RiskLevel::from_score(score),
500                    score,
501                    warning: Some(format!(
502                        "SET NOT NULL on '{}.{}' requires a full table scan to validate existing rows",
503                        table, column
504                    )),
505                    acquires_lock: true,
506                    index_rebuild: false,
507                }]
508            }
509
510            // ── CREATE TABLE ─────────────────────────────────────────────────
511            ParsedStatement::CreateTable { table, .. } => {
512                vec![DetectedOperation {
513                    description: format!("CREATE TABLE {}", table),
514                    tables: vec![table.clone()],
515                    risk_level: RiskLevel::Low,
516                    score: 2,
517                    warning: None,
518                    acquires_lock: false,
519                    index_rebuild: false,
520                }]
521            }
522
523            // ── ADD PRIMARY KEY ──────────────────────────────────────────────
524            ParsedStatement::AlterTableAddPrimaryKey { table, columns } => {
525                let rows = self.row_counts.get(table).copied().unwrap_or(0);
526                let score = if rows > 1_000_000 { 80 } else { 35 };
527                vec![DetectedOperation {
528                    description: format!(
529                        "ALTER TABLE {} ADD PRIMARY KEY ({})",
530                        table,
531                        columns.join(", ")
532                    ),
533                    tables: vec![table.clone()],
534                    risk_level: RiskLevel::from_score(score),
535                    score,
536                    warning: Some(format!(
537                        "Adding PRIMARY KEY to '{}' builds an index over the entire table",
538                        table
539                    )),
540                    acquires_lock: true,
541                    index_rebuild: true,
542                }]
543            }
544
545            // ── OTHER (unmodelled DDL) — B-01 fix ────────────────────────────
546            ParsedStatement::Other { raw } => {
547                if raw.contains("Unmodelled DDL") {
548                    // Belt-and-suspenders: parser flagged this as potentially dangerous
549                    vec![DetectedOperation {
550                        description: raw.chars().take(100).collect(),
551                        tables: vec![],
552                        risk_level: RiskLevel::Medium,
553                        score: 30,
554                        warning: Some(
555                            "Unmodelled DDL — manual review required before running".to_string(),
556                        ),
557                        acquires_lock: true,
558                        index_rebuild: false,
559                    }]
560                } else {
561                    vec![]
562                }
563            }
564            _ => vec![],
565        }
566    }
567
568    // ─────────────────────────────────────────────────────────────────────
569    // Recommendation engine
570    // ─────────────────────────────────────────────────────────────────────
571
572    fn build_recommendations(
573        &self,
574        ops: &[DetectedOperation],
575        _tables: &[String],
576        overall: RiskLevel,
577    ) -> Vec<String> {
578        let mut rec = Vec::new();
579
580        let has_drop_table = ops.iter().any(|o| o.description.contains("DROP TABLE"));
581        let has_drop_column = ops.iter().any(|o| o.description.contains("DROP COLUMN"));
582        let has_type_change = ops
583            .iter()
584            .any(|o| o.description.contains("TYPE ") && o.acquires_lock);
585        let has_index_without_concurrent = ops
586            .iter()
587            .any(|o| o.description.contains("CREATE") && o.index_rebuild && o.acquires_lock);
588        let has_not_null_no_default = ops
589            .iter()
590            .any(|o| o.description.contains("NOT NULL (no default)"));
591        let has_rename = ops.iter().any(|o| o.description.contains("RENAME"));
592        let has_cascade = ops.iter().any(|o| o.description.contains("CASCADE"));
593
594        if has_drop_table || has_drop_column {
595            rec.push("Deploy in two phases: first deploy app code that no longer reads the column/table, then drop it in a later migration".to_string());
596            rec.push("Take a full database backup before running this migration".to_string());
597        }
598
599        if has_type_change {
600            rec.push("Use a background migration: add a new column with the new type, backfill in batches, then swap and drop the old column".to_string());
601        }
602
603        if has_index_without_concurrent {
604            rec.push("Use CREATE INDEX CONCURRENTLY to build indexes without locking the table for writes".to_string());
605        }
606
607        if has_not_null_no_default {
608            rec.push("Add a DEFAULT value first, deploy the app change, then remove the default in a follow-up migration if needed".to_string());
609        }
610
611        if has_rename {
612            rec.push("Avoid renaming tables/columns in a single step; use a backward-compatible alias or view transition strategy".to_string());
613        }
614
615        if has_cascade {
616            rec.push("Review all ON DELETE CASCADE constraints — a single delete can silently remove rows across many tables".to_string());
617        }
618
619        if overall >= RiskLevel::High {
620            rec.push("Schedule this migration during a low-traffic maintenance window".to_string());
621            rec.push(
622                "Test this migration on a staging environment with production-sized data"
623                    .to_string(),
624            );
625        }
626
627        if overall >= RiskLevel::Medium {
628            let large: Vec<&str> = self
629                .row_counts
630                .iter()
631                .filter(|(_, &v)| v > 100_000)
632                .map(|(k, _)| k.as_str())
633                .collect();
634            if !large.is_empty() {
635                rec.push(format!(
636                    "Large tables detected ({}): consider batching long-running operations",
637                    large.join(", ")
638                ));
639            }
640        }
641
642        if rec.is_empty() {
643            rec.push(
644                "No specific recommendations – this migration looks safe to deploy".to_string(),
645            );
646        }
647
648        // Live-schema-aware additions
649        if let Some(live) = &self.live_schema {
650            for table in _tables {
651                if let Some(meta) = live.tables.get(table) {
652                    let mb = meta.total_size_bytes / (1024 * 1024);
653                    if mb > 1000 {
654                        rec.push(format!(
655                            "Table '{}' is {} on disk — ensure you have at least 2× free disk space for any rewrite operations",
656                            table, meta.total_size_pretty
657                        ));
658                    }
659                }
660            }
661        }
662
663        rec
664    }
665
666    // ─────────────────────────────────────────────────────────────────────
667    // Lock duration heuristic
668    // ─────────────────────────────────────────────────────────────────────
669
670    fn estimate_lock_seconds(&self, ops: &[DetectedOperation], _tables: &[String]) -> Option<u64> {
671        let locking_ops: Vec<&DetectedOperation> = ops.iter().filter(|o| o.acquires_lock).collect();
672
673        if locking_ops.is_empty() {
674            return None;
675        }
676
677        // Rough model: 1s base + 1s per 100k rows for rebuild/type-change ops
678        let mut total_secs: u64 = 0;
679        for op in &locking_ops {
680            let mut secs = 1u64;
681            for table in &op.tables {
682                if let Some(&rows) = self.row_counts.get(table) {
683                    let row_factor = rows / 100_000;
684                    if op.index_rebuild {
685                        secs += row_factor * 5; // index builds ~5s / 100k rows
686                    } else {
687                        secs += row_factor;
688                    }
689                }
690            }
691            total_secs += secs;
692        }
693
694        Some(total_secs.max(1))
695    }
696}