Skip to main content

schema_risk/
locks.rs

1//! Lock simulation and migration execution timeline.
2//!
3//! PostgreSQL has a well-defined lock hierarchy.  Every DDL operation acquires
4//! a specific lock mode.  This module encodes those rules so the tool can tell
5//! the user exactly what will be blocked — and for how long.
6
7use crate::parser::ParsedStatement;
8use crate::types::RiskLevel;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12// ─────────────────────────────────────────────
13// PostgreSQL lock hierarchy (8 modes)
14// ─────────────────────────────────────────────
15//
16// Strongest → weakest:
17//   ACCESS EXCLUSIVE  — blocks everything (ALTER TABLE, DROP, TRUNCATE)
18//   EXCLUSIVE         — blocks reads + writes (rare in DDL)
19//   SHARE ROW EXCLUSIVE
20//   SHARE UPDATE EXCLUSIVE — used by VACUUM, ANALYZE
21//   SHARE             — blocks writes; CREATE INDEX (non-concurrent)
22//   ROW SHARE         — SELECT FOR UPDATE
23//   ROW EXCLUSIVE     — DML (INSERT, UPDATE, DELETE)
24//   ACCESS SHARE      — plain SELECT
25//
26// Conflict matrix summary we care about:
27//   ACCESS EXCLUSIVE conflicts with everything.
28//   SHARE conflicts with ROW EXCLUSIVE and stronger.
29//   ACCESS SHARE conflicts only with ACCESS EXCLUSIVE.
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
32#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
33pub enum LockMode {
34    AccessShare,          // 1 — SELECT
35    RowShare,             // 2 — SELECT FOR UPDATE
36    RowExclusive,         // 3 — DML
37    ShareUpdateExclusive, // 4 — VACUUM, CREATE INDEX CONCURRENTLY phase 1
38    Share,                // 5 — CREATE INDEX (blocking)
39    ShareRowExclusive,    // 6 — CREATE TRIGGER
40    Exclusive,            // 7 — rare
41    AccessExclusive,      // 8 — ALTER TABLE, DROP, TRUNCATE
42}
43
44impl LockMode {
45    pub fn name(&self) -> &'static str {
46        match self {
47            LockMode::AccessShare => "ACCESS SHARE",
48            LockMode::RowShare => "ROW SHARE",
49            LockMode::RowExclusive => "ROW EXCLUSIVE",
50            LockMode::ShareUpdateExclusive => "SHARE UPDATE EXCLUSIVE",
51            LockMode::Share => "SHARE",
52            LockMode::ShareRowExclusive => "SHARE ROW EXCLUSIVE",
53            LockMode::Exclusive => "EXCLUSIVE",
54            LockMode::AccessExclusive => "ACCESS EXCLUSIVE",
55        }
56    }
57
58    /// Does this lock block ordinary reads (SELECT)?
59    pub fn blocks_reads(&self) -> bool {
60        *self >= LockMode::AccessExclusive
61    }
62
63    /// Does this lock block ordinary writes (INSERT / UPDATE / DELETE)?
64    pub fn blocks_writes(&self) -> bool {
65        *self >= LockMode::Share
66    }
67
68    /// Human-readable impact string.
69    pub fn impact(&self) -> &'static str {
70        match self {
71            LockMode::AccessShare => "no blocking",
72            LockMode::RowShare => "blocks ACCESS EXCLUSIVE only",
73            LockMode::RowExclusive => "blocks SHARE and stronger",
74            LockMode::ShareUpdateExclusive => "blocks DDL; allows reads+writes",
75            LockMode::Share => "blocks writes (DML blocked)",
76            LockMode::ShareRowExclusive => "blocks writes + most DDL",
77            LockMode::Exclusive => "blocks reads + writes",
78            LockMode::AccessExclusive => "blocks ALL access (reads + writes)",
79        }
80    }
81}
82
83// ─────────────────────────────────────────────
84// Lock event for a single statement
85// ─────────────────────────────────────────────
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct LockEvent {
89    /// SQL statement description
90    pub statement: String,
91    /// Affected table(s)
92    pub tables: Vec<String>,
93    /// The lock mode acquired
94    pub lock_mode: LockMode,
95    /// True if ordinary SELECT queries will be blocked
96    pub blocks_reads: bool,
97    /// True if INSERT/UPDATE/DELETE will be blocked
98    pub blocks_writes: bool,
99    /// Human-readable impact
100    pub impact: String,
101    /// Estimated hold duration in seconds
102    pub estimated_hold_secs: u64,
103    /// Safety recommendation for this specific operation
104    pub safe_alternative: Option<String>,
105}
106
107// ─────────────────────────────────────────────
108// Timeline step
109// ─────────────────────────────────────────────
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct TimelineStep {
113    /// Offset from migration start in seconds
114    pub offset_secs: u64,
115    /// Human-readable event description
116    pub event: String,
117    /// Which lock is held at this moment (None = free)
118    pub lock: Option<LockMode>,
119    /// Which tables are affected
120    pub tables: Vec<String>,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct MigrationTimeline {
125    /// Total estimated duration in seconds
126    pub total_secs: u64,
127    pub steps: Vec<TimelineStep>,
128    pub lock_events: Vec<LockEvent>,
129    /// Risk assessment of the whole lock pattern
130    pub lock_risk: RiskLevel,
131    /// Maximum concurrent lock hold duration  
132    pub max_lock_hold_secs: u64,
133}
134
135// ─────────────────────────────────────────────
136// The simulator
137// ─────────────────────────────────────────────
138
139pub struct LockSimulator {
140    /// row_counts: table_name → estimated rows
141    row_counts: HashMap<String, u64>,
142}
143
144impl LockSimulator {
145    pub fn new(row_counts: HashMap<String, u64>) -> Self {
146        Self { row_counts }
147    }
148
149    /// Simulate the full migration and return a timeline.
150    pub fn simulate(&self, statements: &[ParsedStatement]) -> MigrationTimeline {
151        let mut lock_events: Vec<LockEvent> = Vec::new();
152        for stmt in statements {
153            if let Some(ev) = self.lock_for(stmt) {
154                lock_events.push(ev);
155            }
156        }
157
158        let timeline = self.build_timeline(&lock_events);
159        let total_secs = timeline.last().map(|s| s.offset_secs).unwrap_or(0);
160        let max_lock_hold_secs = lock_events
161            .iter()
162            .map(|e| e.estimated_hold_secs)
163            .max()
164            .unwrap_or(0);
165
166        let lock_risk = self.assess_lock_risk(&lock_events, max_lock_hold_secs);
167
168        MigrationTimeline {
169            total_secs,
170            steps: timeline,
171            lock_events,
172            lock_risk,
173            max_lock_hold_secs,
174        }
175    }
176
177    // ── Derive the lock event for a single parsed statement ──────────────
178
179    fn lock_for(&self, stmt: &ParsedStatement) -> Option<LockEvent> {
180        match stmt {
181            // ── DROP TABLE: ACCESS EXCLUSIVE ──────────────────────────────
182            ParsedStatement::DropTable { tables, .. } => {
183                let hold = self.row_based_hold(tables, 1, 5);
184                Some(LockEvent {
185                    statement: format!("DROP TABLE {}", tables.join(", ")),
186                    tables: tables.clone(),
187                    lock_mode: LockMode::AccessExclusive,
188                    blocks_reads: true,
189                    blocks_writes: true,
190                    impact: LockMode::AccessExclusive.impact().to_string(),
191                    estimated_hold_secs: hold,
192                    safe_alternative: None, // DROP TABLE has no safe alternative
193                })
194            }
195
196            // ── ALTER TABLE ADD COLUMN ─────────────────────────────────────
197            ParsedStatement::AlterTableAddColumn { table, column } => {
198                // In PG 11+ adding a NOT NULL column with a volatile default
199                // requires a full table rewrite → long ACCESS EXCLUSIVE lock.
200                // Adding a nullable/defaulted column is instant in PG 11+.
201                let is_instant = column.nullable || column.has_default;
202                let hold = if is_instant {
203                    1
204                } else {
205                    self.row_based_hold(std::slice::from_ref(table), 1, 8)
206                };
207                Some(LockEvent {
208                    statement: format!("ALTER TABLE {} ADD COLUMN {}", table, column.name),
209                    tables: vec![table.clone()],
210                    lock_mode: LockMode::AccessExclusive,
211                    blocks_reads: true,
212                    blocks_writes: true,
213                    impact: LockMode::AccessExclusive.impact().to_string(),
214                    estimated_hold_secs: hold,
215                    safe_alternative: if !column.nullable && !column.has_default {
216                        Some(format!(
217                            "ALTER TABLE {} ADD COLUMN {} {} DEFAULT <expr> NOT NULL; \
218                             -- then in a separate migration: ALTER TABLE {} ALTER COLUMN {} DROP DEFAULT",
219                            table, column.name, column.data_type, table, column.name
220                        ))
221                    } else {
222                        None
223                    },
224                })
225            }
226
227            // ── ALTER TABLE DROP COLUMN ───────────────────────────────────
228            ParsedStatement::AlterTableDropColumn { table, column, .. } => {
229                let hold = self.row_based_hold(std::slice::from_ref(table), 1, 10);
230                Some(LockEvent {
231                    statement: format!("ALTER TABLE {} DROP COLUMN {}", table, column),
232                    tables: vec![table.clone()],
233                    lock_mode: LockMode::AccessExclusive,
234                    blocks_reads: true,
235                    blocks_writes: true,
236                    impact: LockMode::AccessExclusive.impact().to_string(),
237                    estimated_hold_secs: hold,
238                    safe_alternative: Some(
239                        "Phase 1: remove all app code that reads this column → deploy. \
240                         Phase 2: run this DROP COLUMN in the next migration."
241                            .to_string(),
242                    ),
243                })
244            }
245
246            // ── ALTER COLUMN TYPE ─────────────────────────────────────────
247            // Full table rewrite → long ACCESS EXCLUSIVE.
248            ParsedStatement::AlterTableAlterColumnType { table, column, new_type } => {
249                let hold = self.row_based_hold(std::slice::from_ref(table), 2, 15);
250                Some(LockEvent {
251                    statement: format!(
252                        "ALTER TABLE {} ALTER COLUMN {} TYPE {}",
253                        table, column, new_type
254                    ),
255                    tables: vec![table.clone()],
256                    lock_mode: LockMode::AccessExclusive,
257                    blocks_reads: true,
258                    blocks_writes: true,
259                    impact: LockMode::AccessExclusive.impact().to_string(),
260                    estimated_hold_secs: hold,
261                    safe_alternative: Some(format!(
262                        "Background migration: \
263                         1. ADD COLUMN {col}_new {ty} \
264                         2. Backfill in batches: UPDATE {tbl} SET {col}_new = {col}::text LIMIT 10000 \
265                         3. Deploy app with dual-read \
266                         4. DROP COLUMN {col}; RENAME COLUMN {col}_new TO {col}",
267                        col = column, ty = new_type, tbl = table
268                    )),
269                })
270            }
271
272            // ── SET NOT NULL ──────────────────────────────────────────────
273            // PostgreSQL must scan every row → can be slow.
274            ParsedStatement::AlterTableSetNotNull { table, column } => {
275                let hold = self.row_based_hold(std::slice::from_ref(table), 1, 10);
276                Some(LockEvent {
277                    statement: format!(
278                        "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
279                        table, column
280                    ),
281                    tables: vec![table.clone()],
282                    lock_mode: LockMode::AccessExclusive,
283                    blocks_reads: true,
284                    blocks_writes: true,
285                    impact: LockMode::AccessExclusive.impact().to_string(),
286                    estimated_hold_secs: hold,
287                    safe_alternative: Some(format!(
288                        "Use a NOT VALID CHECK constraint first: \
289                         ALTER TABLE {tbl} ADD CONSTRAINT {col}_not_null CHECK ({col} IS NOT NULL) NOT VALID; \
290                         -- validate in background: \
291                         ALTER TABLE {tbl} VALIDATE CONSTRAINT {col}_not_null;",
292                        tbl = table, col = column
293                    )),
294                })
295            }
296
297            // ── CREATE INDEX (blocking) ───────────────────────────────────
298            ParsedStatement::CreateIndex {
299                index_name,
300                table,
301                columns,
302                unique,
303                concurrently,
304            } if !concurrently => {
305                let hold = self.row_based_hold(std::slice::from_ref(table), 1, 20);
306                let name = index_name.as_deref().unwrap_or("unnamed");
307                Some(LockEvent {
308                    statement: format!(
309                        "CREATE {}INDEX {} ON {} ({})",
310                        if *unique { "UNIQUE " } else { "" },
311                        name,
312                        table,
313                        columns.join(", ")
314                    ),
315                    tables: vec![table.clone()],
316                    lock_mode: LockMode::Share,
317                    blocks_reads: false,
318                    blocks_writes: true,
319                    impact: LockMode::Share.impact().to_string(),
320                    estimated_hold_secs: hold,
321                    safe_alternative: Some(format!(
322                        "CREATE {}INDEX CONCURRENTLY {} ON {} ({});",
323                        if *unique { "UNIQUE " } else { "" },
324                        name,
325                        table,
326                        columns.join(", ")
327                    )),
328                })
329            }
330
331            // ── CREATE INDEX CONCURRENTLY ─────────────────────────────────
332            ParsedStatement::CreateIndex {
333                index_name,
334                table,
335                columns,
336                unique,
337                concurrently: true,
338            } => {
339                // CONCURRENTLY takes SHARE UPDATE EXCLUSIVE (allows reads+writes)
340                let hold = self.row_based_hold(std::slice::from_ref(table), 2, 30);
341                let name = index_name.as_deref().unwrap_or("unnamed");
342                Some(LockEvent {
343                    statement: format!(
344                        "CREATE {}INDEX CONCURRENTLY {} ON {} ({})",
345                        if *unique { "UNIQUE " } else { "" },
346                        name,
347                        table,
348                        columns.join(", ")
349                    ),
350                    tables: vec![table.clone()],
351                    lock_mode: LockMode::ShareUpdateExclusive,
352                    blocks_reads: false,
353                    blocks_writes: false,
354                    impact: LockMode::ShareUpdateExclusive.impact().to_string(),
355                    estimated_hold_secs: hold,
356                    safe_alternative: None, // already optimal
357                })
358            }
359
360            // ── ADD FOREIGN KEY ───────────────────────────────────────────
361            ParsedStatement::AlterTableAddForeignKey { table, fk } => {
362                let hold = self.row_based_hold(&[table.clone(), fk.ref_table.clone()], 1, 5);
363                Some(LockEvent {
364                    statement: format!(
365                        "ALTER TABLE {} ADD FOREIGN KEY ({}) REFERENCES {}({})",
366                        table,
367                        fk.columns.join(", "),
368                        fk.ref_table,
369                        fk.ref_columns.join(", ")
370                    ),
371                    tables: vec![table.clone(), fk.ref_table.clone()],
372                    lock_mode: LockMode::ShareRowExclusive,
373                    blocks_reads: false,
374                    blocks_writes: true,
375                    impact: LockMode::ShareRowExclusive.impact().to_string(),
376                    estimated_hold_secs: hold,
377                    safe_alternative: Some(format!(
378                        "ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {}({}) NOT VALID; \
379                         -- then in a separate session (low traffic): \
380                         ALTER TABLE {} VALIDATE CONSTRAINT {};",
381                        table,
382                        fk.constraint_name.as_deref().unwrap_or("fk_name"),
383                        fk.columns.join(", "),
384                        fk.ref_table,
385                        fk.ref_columns.join(", "),
386                        table,
387                        fk.constraint_name.as_deref().unwrap_or("fk_name"),
388                    )),
389                })
390            }
391
392            // ── RENAME TABLE ──────────────────────────────────────────────
393            ParsedStatement::AlterTableRenameTable { old, new } => {
394                Some(LockEvent {
395                    statement: format!("ALTER TABLE {} RENAME TO {}", old, new),
396                    tables: vec![old.clone()],
397                    lock_mode: LockMode::AccessExclusive,
398                    blocks_reads: true,
399                    blocks_writes: true,
400                    impact: LockMode::AccessExclusive.impact().to_string(),
401                    estimated_hold_secs: 1,
402                    safe_alternative: Some(format!(
403                        "Create a view: CREATE VIEW {} AS SELECT * FROM {}; \
404                         then migrate app code to use the new name before dropping the view.",
405                        new, old
406                    )),
407                })
408            }
409
410            // ── RENAME COLUMN ─────────────────────────────────────────────
411            ParsedStatement::AlterTableRenameColumn { table, old, new } => {
412                Some(LockEvent {
413                    statement: format!(
414                        "ALTER TABLE {} RENAME COLUMN {} TO {}",
415                        table, old, new
416                    ),
417                    tables: vec![table.clone()],
418                    lock_mode: LockMode::AccessExclusive,
419                    blocks_reads: true,
420                    blocks_writes: true,
421                    impact: LockMode::AccessExclusive.impact().to_string(),
422                    estimated_hold_secs: 1,
423                    safe_alternative: Some(format!(
424                        "Phase 1: ADD COLUMN {new} type; sync writes in app code to both columns. \
425                         Phase 2: backfill. Phase 3: remove old references. Phase 4: DROP COLUMN {old}.",
426                        old = old, new = new
427                    )),
428                })
429            }
430
431            // ── DROP INDEX ────────────────────────────────────────────────
432            ParsedStatement::DropIndex { names, concurrently, .. } => {
433                Some(LockEvent {
434                    statement: format!("DROP INDEX {}", names.join(", ")),
435                    tables: vec![],
436                    lock_mode: if *concurrently {
437                        LockMode::ShareUpdateExclusive
438                    } else {
439                        LockMode::AccessExclusive
440                    },
441                    blocks_reads: !concurrently,
442                    blocks_writes: true,
443                    impact: if *concurrently {
444                        LockMode::ShareUpdateExclusive.impact().to_string()
445                    } else {
446                        LockMode::AccessExclusive.impact().to_string()
447                    },
448                    estimated_hold_secs: 1,
449                    safe_alternative: if !concurrently {
450                        Some(format!("DROP INDEX CONCURRENTLY {};", names.join(", ")))
451                    } else {
452                        None
453                    },
454                })
455            }
456
457            // ── ANYTHING ELSE that touches a table ────────────────────────
458            ParsedStatement::AlterTableDropConstraint { table, constraint, .. } => {
459                Some(LockEvent {
460                    statement: format!(
461                        "ALTER TABLE {} DROP CONSTRAINT {}",
462                        table, constraint
463                    ),
464                    tables: vec![table.clone()],
465                    lock_mode: LockMode::AccessExclusive,
466                    blocks_reads: true,
467                    blocks_writes: true,
468                    impact: LockMode::AccessExclusive.impact().to_string(),
469                    estimated_hold_secs: 1,
470                    safe_alternative: None,
471                })
472            }
473
474            ParsedStatement::AlterTableAddPrimaryKey { table, columns } => {
475                let hold = self.row_based_hold(std::slice::from_ref(table), 2, 25);
476                Some(LockEvent {
477                    statement: format!(
478                        "ALTER TABLE {} ADD PRIMARY KEY ({})",
479                        table,
480                        columns.join(", ")
481                    ),
482                    tables: vec![table.clone()],
483                    lock_mode: LockMode::AccessExclusive,
484                    blocks_reads: true,
485                    blocks_writes: true,
486                    impact: LockMode::AccessExclusive.impact().to_string(),
487                    estimated_hold_secs: hold,
488                    safe_alternative: Some(format!(
489                        "CREATE UNIQUE INDEX CONCURRENTLY pkey_idx ON {} ({});\n\
490                         ALTER TABLE {} ADD CONSTRAINT {}_pkey PRIMARY KEY USING INDEX pkey_idx;",
491                        table,
492                        columns.join(", "),
493                        table,
494                        table,
495                    )),
496                })
497            }
498
499            // CREATE TABLE, Other — no lock events worth surfacing
500            _ => None,
501        }
502    }
503
504    // ── Row-count-based hold time heuristic ──────────────────────────────
505    // base_secs: minimum hold regardless of rows
506    // secs_per_million: how many extra seconds per million rows
507    fn row_based_hold(&self, tables: &[String], base_secs: u64, secs_per_million: u64) -> u64 {
508        let max_rows: u64 = tables
509            .iter()
510            .filter_map(|t| self.row_counts.get(t))
511            .max()
512            .copied()
513            .unwrap_or(0);
514
515        let millions = max_rows / 1_000_000;
516        base_secs + millions * secs_per_million
517    }
518
519    // ── Build the human-readable timeline from lock events ───────────────
520    fn build_timeline(&self, events: &[LockEvent]) -> Vec<TimelineStep> {
521        let mut steps: Vec<TimelineStep> = Vec::new();
522        let mut offset: u64 = 0;
523
524        steps.push(TimelineStep {
525            offset_secs: 0,
526            event: "Migration started".to_string(),
527            lock: None,
528            tables: vec![],
529        });
530
531        for ev in events {
532            // Acquire lock
533            steps.push(TimelineStep {
534                offset_secs: offset,
535                event: format!("Acquire {} lock — {}", ev.lock_mode.name(), ev.statement),
536                lock: Some(ev.lock_mode),
537                tables: ev.tables.clone(),
538            });
539
540            // Execute statement (halfway through hold time)
541            let exec_offset = offset + ev.estimated_hold_secs / 2 + 1;
542            steps.push(TimelineStep {
543                offset_secs: exec_offset,
544                event: format!("Execute: {}", ev.statement),
545                lock: Some(ev.lock_mode),
546                tables: ev.tables.clone(),
547            });
548
549            // Release lock
550            offset += ev.estimated_hold_secs;
551            steps.push(TimelineStep {
552                offset_secs: offset,
553                event: format!("Release {} lock", ev.lock_mode.name()),
554                lock: None,
555                tables: ev.tables.clone(),
556            });
557
558            offset += 1; // 1s pause between statements
559        }
560
561        steps.push(TimelineStep {
562            offset_secs: offset,
563            event: "Migration complete".to_string(),
564            lock: None,
565            tables: vec![],
566        });
567
568        steps
569    }
570
571    // ── Overall lock risk assessment ─────────────────────────────────────
572    fn assess_lock_risk(&self, events: &[LockEvent], max_hold: u64) -> RiskLevel {
573        let has_read_block = events.iter().any(|e| e.blocks_reads);
574        let has_write_block = events.iter().any(|e| e.blocks_writes);
575
576        match (has_read_block, has_write_block, max_hold) {
577            (true, _, secs) if secs > 30 => RiskLevel::Critical,
578            (true, _, _) => RiskLevel::High,
579            (false, true, secs) if secs > 60 => RiskLevel::High,
580            (false, true, _) => RiskLevel::Medium,
581            _ => RiskLevel::Low,
582        }
583    }
584}