1use crate::parser::ParsedStatement;
8use crate::types::RiskLevel;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
/// Table-level lock modes, declared from weakest to strongest.
///
/// The declaration order is significant: the derived `PartialOrd`/`Ord`
/// compare variants by position, and `blocks_reads` / `blocks_writes` rely on
/// that ordering. With `rename_all = "SCREAMING_SNAKE_CASE"` serde
/// (de)serializes the variants as e.g. `ACCESS_SHARE`.
///
/// NOTE(review): the names mirror PostgreSQL's table-level lock modes —
/// presumably PostgreSQL is the target database; confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum LockMode {
    /// `ACCESS SHARE` — the weakest mode in the ordering.
    AccessShare,
    /// `ROW SHARE`.
    RowShare,
    /// `ROW EXCLUSIVE`.
    RowExclusive,
    /// `SHARE UPDATE EXCLUSIVE` — strongest mode for which `blocks_writes` is still `false`.
    ShareUpdateExclusive,
    /// `SHARE` — weakest mode for which `blocks_writes` is `true`.
    Share,
    /// `SHARE ROW EXCLUSIVE`.
    ShareRowExclusive,
    /// `EXCLUSIVE`.
    Exclusive,
    /// `ACCESS EXCLUSIVE` — the strongest mode; the only one for which `blocks_reads` is `true`.
    AccessExclusive,
}
43
44impl LockMode {
45 pub fn name(&self) -> &'static str {
46 match self {
47 LockMode::AccessShare => "ACCESS SHARE",
48 LockMode::RowShare => "ROW SHARE",
49 LockMode::RowExclusive => "ROW EXCLUSIVE",
50 LockMode::ShareUpdateExclusive => "SHARE UPDATE EXCLUSIVE",
51 LockMode::Share => "SHARE",
52 LockMode::ShareRowExclusive => "SHARE ROW EXCLUSIVE",
53 LockMode::Exclusive => "EXCLUSIVE",
54 LockMode::AccessExclusive => "ACCESS EXCLUSIVE",
55 }
56 }
57
58 pub fn blocks_reads(&self) -> bool {
60 *self >= LockMode::AccessExclusive
61 }
62
63 pub fn blocks_writes(&self) -> bool {
65 *self >= LockMode::Share
66 }
67
68 pub fn impact(&self) -> &'static str {
70 match self {
71 LockMode::AccessShare => "no blocking",
72 LockMode::RowShare => "blocks ACCESS EXCLUSIVE only",
73 LockMode::RowExclusive => "blocks SHARE and stronger",
74 LockMode::ShareUpdateExclusive => "blocks DDL; allows reads+writes",
75 LockMode::Share => "blocks writes (DML blocked)",
76 LockMode::ShareRowExclusive => "blocks writes + most DDL",
77 LockMode::Exclusive => "blocks reads + writes",
78 LockMode::AccessExclusive => "blocks ALL access (reads + writes)",
79 }
80 }
81}
82
/// One lock taken by a single migration statement, as produced by
/// `LockSimulator::lock_for`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockEvent {
    /// Textual rendering of the statement that takes the lock.
    pub statement: String,
    /// Names of the tables the lock applies to (may be empty, e.g. for `DROP INDEX`).
    pub tables: Vec<String>,
    /// Lock mode acquired by the statement.
    pub lock_mode: LockMode,
    /// Whether reads are blocked while the lock is held.
    pub blocks_reads: bool,
    /// Whether writes are blocked while the lock is held.
    pub blocks_writes: bool,
    /// Human-readable description of what the lock blocks.
    pub impact: String,
    /// Estimated number of seconds the lock is held.
    pub estimated_hold_secs: u64,
    /// Suggested less disruptive way to achieve the same change, if one is known.
    pub safe_alternative: Option<String>,
}
106
/// A single entry in the simulated migration timeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimelineStep {
    /// Seconds elapsed since the start of the migration (the first step is at 0).
    pub offset_secs: u64,
    /// Human-readable description of what happens at this step.
    pub event: String,
    /// Lock mode held at this step; `None` for the start, release and completion steps.
    pub lock: Option<LockMode>,
    /// Tables affected at this step (empty for the start and completion steps).
    pub tables: Vec<String>,
}
122
/// Result of simulating a whole migration: the step-by-step timeline plus
/// aggregate lock statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationTimeline {
    /// Offset of the last timeline step, in seconds (0 when there are no steps).
    pub total_secs: u64,
    /// Ordered timeline steps, from "Migration started" to "Migration complete".
    pub steps: Vec<TimelineStep>,
    /// One entry per statement that takes a lock, in statement order.
    pub lock_events: Vec<LockEvent>,
    /// Overall lock risk assessed from the lock events.
    pub lock_risk: RiskLevel,
    /// Largest `estimated_hold_secs` among `lock_events` (0 when there are none).
    pub max_lock_hold_secs: u64,
}
134
/// Simulates the locks a migration takes and estimates how long they are held.
pub struct LockSimulator {
    /// Row count per table name; used by `row_based_hold` to scale hold-time
    /// estimates. Tables missing from the map contribute 0 rows.
    row_counts: HashMap<String, u64>,
}
143
144impl LockSimulator {
145 pub fn new(row_counts: HashMap<String, u64>) -> Self {
146 Self { row_counts }
147 }
148
149 pub fn simulate(&self, statements: &[ParsedStatement]) -> MigrationTimeline {
151 let mut lock_events: Vec<LockEvent> = Vec::new();
152 for stmt in statements {
153 if let Some(ev) = self.lock_for(stmt) {
154 lock_events.push(ev);
155 }
156 }
157
158 let timeline = self.build_timeline(&lock_events);
159 let total_secs = timeline.last().map(|s| s.offset_secs).unwrap_or(0);
160 let max_lock_hold_secs = lock_events
161 .iter()
162 .map(|e| e.estimated_hold_secs)
163 .max()
164 .unwrap_or(0);
165
166 let lock_risk = self.assess_lock_risk(&lock_events, max_lock_hold_secs);
167
168 MigrationTimeline {
169 total_secs,
170 steps: timeline,
171 lock_events,
172 lock_risk,
173 max_lock_hold_secs,
174 }
175 }
176
177 fn lock_for(&self, stmt: &ParsedStatement) -> Option<LockEvent> {
180 match stmt {
181 ParsedStatement::DropTable { tables, .. } => {
183 let hold = self.row_based_hold(tables, 1, 5);
184 Some(LockEvent {
185 statement: format!("DROP TABLE {}", tables.join(", ")),
186 tables: tables.clone(),
187 lock_mode: LockMode::AccessExclusive,
188 blocks_reads: true,
189 blocks_writes: true,
190 impact: LockMode::AccessExclusive.impact().to_string(),
191 estimated_hold_secs: hold,
192 safe_alternative: None, })
194 }
195
196 ParsedStatement::AlterTableAddColumn { table, column } => {
198 let is_instant = column.nullable || column.has_default;
202 let hold = if is_instant {
203 1
204 } else {
205 self.row_based_hold(std::slice::from_ref(table), 1, 8)
206 };
207 Some(LockEvent {
208 statement: format!("ALTER TABLE {} ADD COLUMN {}", table, column.name),
209 tables: vec![table.clone()],
210 lock_mode: LockMode::AccessExclusive,
211 blocks_reads: true,
212 blocks_writes: true,
213 impact: LockMode::AccessExclusive.impact().to_string(),
214 estimated_hold_secs: hold,
215 safe_alternative: if !column.nullable && !column.has_default {
216 Some(format!(
217 "ALTER TABLE {} ADD COLUMN {} {} DEFAULT <expr> NOT NULL; \
218 -- then in a separate migration: ALTER TABLE {} ALTER COLUMN {} DROP DEFAULT",
219 table, column.name, column.data_type, table, column.name
220 ))
221 } else {
222 None
223 },
224 })
225 }
226
227 ParsedStatement::AlterTableDropColumn { table, column, .. } => {
229 let hold = self.row_based_hold(std::slice::from_ref(table), 1, 10);
230 Some(LockEvent {
231 statement: format!("ALTER TABLE {} DROP COLUMN {}", table, column),
232 tables: vec![table.clone()],
233 lock_mode: LockMode::AccessExclusive,
234 blocks_reads: true,
235 blocks_writes: true,
236 impact: LockMode::AccessExclusive.impact().to_string(),
237 estimated_hold_secs: hold,
238 safe_alternative: Some(
239 "Phase 1: remove all app code that reads this column → deploy. \
240 Phase 2: run this DROP COLUMN in the next migration."
241 .to_string(),
242 ),
243 })
244 }
245
246 ParsedStatement::AlterTableAlterColumnType { table, column, new_type } => {
249 let hold = self.row_based_hold(std::slice::from_ref(table), 2, 15);
250 Some(LockEvent {
251 statement: format!(
252 "ALTER TABLE {} ALTER COLUMN {} TYPE {}",
253 table, column, new_type
254 ),
255 tables: vec![table.clone()],
256 lock_mode: LockMode::AccessExclusive,
257 blocks_reads: true,
258 blocks_writes: true,
259 impact: LockMode::AccessExclusive.impact().to_string(),
260 estimated_hold_secs: hold,
261 safe_alternative: Some(format!(
262 "Background migration: \
263 1. ADD COLUMN {col}_new {ty} \
264 2. Backfill in batches: UPDATE {tbl} SET {col}_new = {col}::text LIMIT 10000 \
265 3. Deploy app with dual-read \
266 4. DROP COLUMN {col}; RENAME COLUMN {col}_new TO {col}",
267 col = column, ty = new_type, tbl = table
268 )),
269 })
270 }
271
272 ParsedStatement::AlterTableSetNotNull { table, column } => {
275 let hold = self.row_based_hold(std::slice::from_ref(table), 1, 10);
276 Some(LockEvent {
277 statement: format!(
278 "ALTER TABLE {} ALTER COLUMN {} SET NOT NULL",
279 table, column
280 ),
281 tables: vec![table.clone()],
282 lock_mode: LockMode::AccessExclusive,
283 blocks_reads: true,
284 blocks_writes: true,
285 impact: LockMode::AccessExclusive.impact().to_string(),
286 estimated_hold_secs: hold,
287 safe_alternative: Some(format!(
288 "Use a NOT VALID CHECK constraint first: \
289 ALTER TABLE {tbl} ADD CONSTRAINT {col}_not_null CHECK ({col} IS NOT NULL) NOT VALID; \
290 -- validate in background: \
291 ALTER TABLE {tbl} VALIDATE CONSTRAINT {col}_not_null;",
292 tbl = table, col = column
293 )),
294 })
295 }
296
297 ParsedStatement::CreateIndex {
299 index_name,
300 table,
301 columns,
302 unique,
303 concurrently,
304 } if !concurrently => {
305 let hold = self.row_based_hold(std::slice::from_ref(table), 1, 20);
306 let name = index_name.as_deref().unwrap_or("unnamed");
307 Some(LockEvent {
308 statement: format!(
309 "CREATE {}INDEX {} ON {} ({})",
310 if *unique { "UNIQUE " } else { "" },
311 name,
312 table,
313 columns.join(", ")
314 ),
315 tables: vec![table.clone()],
316 lock_mode: LockMode::Share,
317 blocks_reads: false,
318 blocks_writes: true,
319 impact: LockMode::Share.impact().to_string(),
320 estimated_hold_secs: hold,
321 safe_alternative: Some(format!(
322 "CREATE {}INDEX CONCURRENTLY {} ON {} ({});",
323 if *unique { "UNIQUE " } else { "" },
324 name,
325 table,
326 columns.join(", ")
327 )),
328 })
329 }
330
331 ParsedStatement::CreateIndex {
333 index_name,
334 table,
335 columns,
336 unique,
337 concurrently: true,
338 } => {
339 let hold = self.row_based_hold(std::slice::from_ref(table), 2, 30);
341 let name = index_name.as_deref().unwrap_or("unnamed");
342 Some(LockEvent {
343 statement: format!(
344 "CREATE {}INDEX CONCURRENTLY {} ON {} ({})",
345 if *unique { "UNIQUE " } else { "" },
346 name,
347 table,
348 columns.join(", ")
349 ),
350 tables: vec![table.clone()],
351 lock_mode: LockMode::ShareUpdateExclusive,
352 blocks_reads: false,
353 blocks_writes: false,
354 impact: LockMode::ShareUpdateExclusive.impact().to_string(),
355 estimated_hold_secs: hold,
356 safe_alternative: None, })
358 }
359
360 ParsedStatement::AlterTableAddForeignKey { table, fk } => {
362 let hold = self.row_based_hold(&[table.clone(), fk.ref_table.clone()], 1, 5);
363 Some(LockEvent {
364 statement: format!(
365 "ALTER TABLE {} ADD FOREIGN KEY ({}) REFERENCES {}({})",
366 table,
367 fk.columns.join(", "),
368 fk.ref_table,
369 fk.ref_columns.join(", ")
370 ),
371 tables: vec![table.clone(), fk.ref_table.clone()],
372 lock_mode: LockMode::ShareRowExclusive,
373 blocks_reads: false,
374 blocks_writes: true,
375 impact: LockMode::ShareRowExclusive.impact().to_string(),
376 estimated_hold_secs: hold,
377 safe_alternative: Some(format!(
378 "ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {}({}) NOT VALID; \
379 -- then in a separate session (low traffic): \
380 ALTER TABLE {} VALIDATE CONSTRAINT {};",
381 table,
382 fk.constraint_name.as_deref().unwrap_or("fk_name"),
383 fk.columns.join(", "),
384 fk.ref_table,
385 fk.ref_columns.join(", "),
386 table,
387 fk.constraint_name.as_deref().unwrap_or("fk_name"),
388 )),
389 })
390 }
391
392 ParsedStatement::AlterTableRenameTable { old, new } => {
394 Some(LockEvent {
395 statement: format!("ALTER TABLE {} RENAME TO {}", old, new),
396 tables: vec![old.clone()],
397 lock_mode: LockMode::AccessExclusive,
398 blocks_reads: true,
399 blocks_writes: true,
400 impact: LockMode::AccessExclusive.impact().to_string(),
401 estimated_hold_secs: 1,
402 safe_alternative: Some(format!(
403 "Create a view: CREATE VIEW {} AS SELECT * FROM {}; \
404 then migrate app code to use the new name before dropping the view.",
405 new, old
406 )),
407 })
408 }
409
410 ParsedStatement::AlterTableRenameColumn { table, old, new } => {
412 Some(LockEvent {
413 statement: format!(
414 "ALTER TABLE {} RENAME COLUMN {} TO {}",
415 table, old, new
416 ),
417 tables: vec![table.clone()],
418 lock_mode: LockMode::AccessExclusive,
419 blocks_reads: true,
420 blocks_writes: true,
421 impact: LockMode::AccessExclusive.impact().to_string(),
422 estimated_hold_secs: 1,
423 safe_alternative: Some(format!(
424 "Phase 1: ADD COLUMN {new} type; sync writes in app code to both columns. \
425 Phase 2: backfill. Phase 3: remove old references. Phase 4: DROP COLUMN {old}.",
426 old = old, new = new
427 )),
428 })
429 }
430
431 ParsedStatement::DropIndex { names, concurrently, .. } => {
433 Some(LockEvent {
434 statement: format!("DROP INDEX {}", names.join(", ")),
435 tables: vec![],
436 lock_mode: if *concurrently {
437 LockMode::ShareUpdateExclusive
438 } else {
439 LockMode::AccessExclusive
440 },
441 blocks_reads: !concurrently,
442 blocks_writes: true,
443 impact: if *concurrently {
444 LockMode::ShareUpdateExclusive.impact().to_string()
445 } else {
446 LockMode::AccessExclusive.impact().to_string()
447 },
448 estimated_hold_secs: 1,
449 safe_alternative: if !concurrently {
450 Some(format!("DROP INDEX CONCURRENTLY {};", names.join(", ")))
451 } else {
452 None
453 },
454 })
455 }
456
457 ParsedStatement::AlterTableDropConstraint { table, constraint, .. } => {
459 Some(LockEvent {
460 statement: format!(
461 "ALTER TABLE {} DROP CONSTRAINT {}",
462 table, constraint
463 ),
464 tables: vec![table.clone()],
465 lock_mode: LockMode::AccessExclusive,
466 blocks_reads: true,
467 blocks_writes: true,
468 impact: LockMode::AccessExclusive.impact().to_string(),
469 estimated_hold_secs: 1,
470 safe_alternative: None,
471 })
472 }
473
474 ParsedStatement::AlterTableAddPrimaryKey { table, columns } => {
475 let hold = self.row_based_hold(std::slice::from_ref(table), 2, 25);
476 Some(LockEvent {
477 statement: format!(
478 "ALTER TABLE {} ADD PRIMARY KEY ({})",
479 table,
480 columns.join(", ")
481 ),
482 tables: vec![table.clone()],
483 lock_mode: LockMode::AccessExclusive,
484 blocks_reads: true,
485 blocks_writes: true,
486 impact: LockMode::AccessExclusive.impact().to_string(),
487 estimated_hold_secs: hold,
488 safe_alternative: Some(format!(
489 "CREATE UNIQUE INDEX CONCURRENTLY pkey_idx ON {} ({});\n\
490 ALTER TABLE {} ADD CONSTRAINT {}_pkey PRIMARY KEY USING INDEX pkey_idx;",
491 table,
492 columns.join(", "),
493 table,
494 table,
495 )),
496 })
497 }
498
499 _ => None,
501 }
502 }
503
504 fn row_based_hold(&self, tables: &[String], base_secs: u64, secs_per_million: u64) -> u64 {
508 let max_rows: u64 = tables
509 .iter()
510 .filter_map(|t| self.row_counts.get(t))
511 .max()
512 .copied()
513 .unwrap_or(0);
514
515 let millions = max_rows / 1_000_000;
516 base_secs + millions * secs_per_million
517 }
518
519 fn build_timeline(&self, events: &[LockEvent]) -> Vec<TimelineStep> {
521 let mut steps: Vec<TimelineStep> = Vec::new();
522 let mut offset: u64 = 0;
523
524 steps.push(TimelineStep {
525 offset_secs: 0,
526 event: "Migration started".to_string(),
527 lock: None,
528 tables: vec![],
529 });
530
531 for ev in events {
532 steps.push(TimelineStep {
534 offset_secs: offset,
535 event: format!("Acquire {} lock — {}", ev.lock_mode.name(), ev.statement),
536 lock: Some(ev.lock_mode),
537 tables: ev.tables.clone(),
538 });
539
540 let exec_offset = offset + ev.estimated_hold_secs / 2 + 1;
542 steps.push(TimelineStep {
543 offset_secs: exec_offset,
544 event: format!("Execute: {}", ev.statement),
545 lock: Some(ev.lock_mode),
546 tables: ev.tables.clone(),
547 });
548
549 offset += ev.estimated_hold_secs;
551 steps.push(TimelineStep {
552 offset_secs: offset,
553 event: format!("Release {} lock", ev.lock_mode.name()),
554 lock: None,
555 tables: ev.tables.clone(),
556 });
557
558 offset += 1; }
560
561 steps.push(TimelineStep {
562 offset_secs: offset,
563 event: "Migration complete".to_string(),
564 lock: None,
565 tables: vec![],
566 });
567
568 steps
569 }
570
571 fn assess_lock_risk(&self, events: &[LockEvent], max_hold: u64) -> RiskLevel {
573 let has_read_block = events.iter().any(|e| e.blocks_reads);
574 let has_write_block = events.iter().any(|e| e.blocks_writes);
575
576 match (has_read_block, has_write_block, max_hold) {
577 (true, _, secs) if secs > 30 => RiskLevel::Critical,
578 (true, _, _) => RiskLevel::High,
579 (false, true, secs) if secs > 60 => RiskLevel::High,
580 (false, true, _) => RiskLevel::Medium,
581 _ => RiskLevel::Low,
582 }
583 }
584}