Skip to main content

forge_runtime/pg/migration/
runner.rs

1//! Migration runner with mesh-safe locking.
2//!
3//! Ensures only one node runs migrations at a time using PostgreSQL advisory locks.
4//!
5//! # Migration Types
6//!
7//! 1. **System migrations** (`__forge_vXXX`): Internal FORGE schema changes.
8//!    Versioned numerically and always run before user migrations.
9//!
10//! 2. **User migrations** (`XXXX_name.sql`): Application-specific schema changes.
11//!    Sorted alphabetically by name.
12
13// Migrations execute user-supplied DDL/DML strings, so the compile-time query
14// macros can't see the schema for those statements. Runtime sqlx::query is
15// required for the user-SQL execution path only; the runner's own bookkeeping
16// (forge_system_migrations) uses compile-time macros.
17#![allow(clippy::disallowed_methods)]
18
19use forge_core::error::{ForgeError, Result};
20use sqlx::{PgPool, Postgres};
21use std::collections::HashMap;
22use std::path::Path;
23use std::time::{Duration, Instant};
24use tracing::{debug, info, warn};
25
26use super::builtin::extract_version;
27
28/// Advisory lock ID for migrations, derived from "FORGE" in ASCII.
29const MIGRATION_LOCK_ID: i64 = 0x464F524745;
30
31/// Bootstrap SQL embedded at compile time. Creates `forge_system_migrations`.
32/// Runs idempotently on every startup before any tracked migration applies.
33const BOOTSTRAP_SQL: &str = include_str!("../../../migrations/system/v000_bootstrap.sql");
34
/// Timing parameters for the migration runner's advisory-lock acquisition.
///
/// The defaults bound the wait so that a stuck lock fails loudly (for example
/// in CI) instead of hanging silently.
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Maximum total time to wait for the migration advisory lock.
    pub lock_acquire_timeout: Duration,
    /// Delay between consecutive `pg_try_advisory_lock` attempts.
    pub lock_poll_interval: Duration,
    /// Minimum spacing between WARN logs that name the current lock holder.
    pub lock_warn_interval: Duration,
}

impl Default for MigrationConfig {
    /// Five-minute deadline, two-second polling, one warning per thirty seconds.
    fn default() -> Self {
        let lock_acquire_timeout = Duration::from_secs(300);
        let lock_poll_interval = Duration::from_secs(2);
        let lock_warn_interval = Duration::from_secs(30);
        MigrationConfig {
            lock_acquire_timeout,
            lock_poll_interval,
            lock_warn_interval,
        }
    }
}
57
/// A single forward-only migration (there is no down SQL).
#[derive(Debug, Clone)]
pub struct Migration {
    /// Stable version identifier — for system migrations `__forge_vNNN`,
    /// for user migrations the file stem like `0001_create_users`.
    pub version: String,
    /// SQL to execute for upgrade.
    pub up_sql: String,
    /// Wrap the migration in a transaction. Defaults to true.
    /// Set to false for statements PostgreSQL refuses to run inside a tx
    /// (`CREATE INDEX CONCURRENTLY`, `ALTER TYPE ... ADD VALUE`, `VACUUM`,
    /// `REINDEX CONCURRENTLY`) via the `-- @transactional false` directive.
    pub transactional: bool,
}

impl Migration {
    /// Create a transactional migration from SQL that needs no directive
    /// parsing. The SQL is stored verbatim.
    pub fn new(version: impl Into<String>, sql: impl Into<String>) -> Self {
        Self {
            version: version.into(),
            up_sql: sql.into(),
            transactional: true,
        }
    }

    /// Parse migration file content into a [`Migration`].
    ///
    /// Recognized directives in the leading comment block (first 20 lines,
    /// stopping at the first non-blank, non-comment line):
    /// - `-- @up` (legacy marker, stripped if present)
    /// - `-- @transactional false` opts out of the wrapping transaction.
    ///   `false`, `no` and `0` disable the transaction; an optional `=` may
    ///   separate name and value. The directive name is matched
    ///   case-insensitively, and only the first word of the value is
    ///   inspected, so a trailing explanation on the same line is allowed
    ///   (`-- @transactional false (uses CONCURRENTLY)`).
    ///
    /// The `@` prefix is required so plain English comments
    /// (`-- transactional design ...`) don't accidentally trigger directives.
    pub fn parse(version: impl Into<String>, content: &str) -> Self {
        let mut transactional = true;
        for line in content.lines().take(20) {
            let line = line.trim();
            if line.is_empty() {
                continue;
            }
            if !line.starts_with("--") {
                break;
            }
            let body = line.trim_start_matches("--").trim();
            let Some(directive) = body.strip_prefix('@') else {
                continue;
            };
            // `@UP` is accepted in either case, so treat the directive name
            // the same way instead of silently ignoring `@Transactional`.
            let directive = directive.to_ascii_lowercase();
            if let Some(rest) = directive.strip_prefix("transactional") {
                // Only the first word is the value; anything after it is a
                // free-form note and must not turn `false` back into `true`.
                let value = rest
                    .trim_start()
                    .trim_start_matches('=')
                    .split_whitespace()
                    .next()
                    .unwrap_or_default();
                transactional = !matches!(value, "false" | "no" | "0");
            }
        }

        // NOTE: this normalisation is deliberately left exactly as it was.
        // Checksums of applied migrations are computed over `up_sql`, so any
        // change to the bytes produced here would make existing databases
        // fail checksum verification.
        let up_sql = content
            .replace("-- @up", "")
            .replace("--@up", "")
            .replace("-- @UP", "")
            .replace("--@UP", "")
            .trim()
            .to_string();
        Self {
            version: version.into(),
            up_sql,
            transactional,
        }
    }
}
129
130/// Migration runner that handles both built-in and user migrations.
131pub struct MigrationRunner {
132    pool: PgPool,
133    config: MigrationConfig,
134}
135
136impl MigrationRunner {
137    pub fn new(pool: PgPool) -> Self {
138        Self::with_config(pool, MigrationConfig::default())
139    }
140
141    pub fn with_config(pool: PgPool, config: MigrationConfig) -> Self {
142        Self { pool, config }
143    }
144
145    /// Run all pending migrations with mesh-safe locking.
146    pub async fn run(&self, user_migrations: Vec<Migration>) -> Result<()> {
147        let mut lock_conn = self.acquire_lock_connection().await?;
148
149        let result = self.run_migrations_inner(user_migrations).await;
150
151        if let Err(e) = self.release_lock_connection(&mut lock_conn).await {
152            warn!("Failed to release migration lock: {}", e);
153        }
154
155        result
156    }
157
158    async fn run_migrations_inner(&self, user_migrations: Vec<Migration>) -> Result<()> {
159        self.bootstrap_tracking_table().await?;
160
161        let applied = self.applied_versions().await?;
162        debug!(
163            "Already applied migrations: {:?}",
164            applied.keys().collect::<Vec<_>>()
165        );
166
167        let max_applied_version = self.get_max_system_version(&applied);
168        debug!("Max applied system version: {:?}", max_applied_version);
169
170        let system_migrations = super::builtin::get_system_migrations();
171
172        let max_known_version = system_migrations.iter().map(|m| m.version).max();
173        if let (Some(applied_max), Some(known_max)) = (max_applied_version, max_known_version)
174            && applied_max > known_max
175        {
176            return Err(ForgeError::internal(format!(
177                "Database is at system migration v{applied_max} but this binary only knows up to v{known_max}. \
178                 Refusing to start — running an older binary on a newer schema risks data loss. \
179                 Upgrade the binary or restore the database to a compatible version."
180            )));
181        }
182
183        let known_user_versions: std::collections::HashSet<&str> =
184            user_migrations.iter().map(|m| m.version.as_str()).collect();
185        let mut unknown_applied: Vec<&str> = applied
186            .keys()
187            .filter(|v| {
188                !super::builtin::is_system_migration(v) && !known_user_versions.contains(v.as_str())
189            })
190            .map(|v| v.as_str())
191            .collect();
192        if !unknown_applied.is_empty() {
193            unknown_applied.sort_unstable();
194            return Err(ForgeError::internal(format!(
195                "Database has {} user migration(s) this binary does not know about: [{}]. \
196                 Refusing to start — the database schema is ahead of this binary. \
197                 Deploy the latest binary version.",
198                unknown_applied.len(),
199                unknown_applied.join(", "),
200            )));
201        }
202
203        let mut new_migrations_applied = false;
204
205        for sys_migration in system_migrations {
206            let migration = sys_migration.to_migration();
207            if let Some(recorded) = applied.get(&migration.version) {
208                verify_checksum(&migration, recorded)?;
209                debug!(
210                    "Skipping system migration {} (already applied, checksum verified)",
211                    migration.version
212                );
213                continue;
214            }
215            info!(
216                "Applying system migration: {} ({})",
217                migration.version, sys_migration.description
218            );
219            self.apply_migration(&migration).await?;
220            new_migrations_applied = true;
221        }
222
223        for migration in user_migrations {
224            if let Some(recorded) = applied.get(&migration.version) {
225                verify_checksum(&migration, recorded)?;
226                debug!(
227                    "Skipping user migration {} (already applied, checksum verified)",
228                    migration.version
229                );
230                continue;
231            }
232            self.apply_migration(&migration).await?;
233            new_migrations_applied = true;
234        }
235
236        if new_migrations_applied {
237            self.notify_schema_changed().await;
238        }
239
240        Ok(())
241    }
242
243    /// Broadcast a schema-changed notification over `forge_schema_changed` so
244    /// any node listening (monitoring, ops tooling, or a future auto-restart
245    /// mechanism) can react without polling.
246    ///
247    /// This is best-effort: a failure to notify must never abort the migration
248    /// sequence — the schema changes already committed are the authoritative
249    /// state. We log the error and move on.
250    async fn notify_schema_changed(&self) {
251        match sqlx::query("SELECT pg_notify('forge_schema_changed', 'migrations_applied')")
252            .execute(&self.pool)
253            .await
254        {
255            Ok(_) => debug!("Schema change notification sent"),
256            Err(e) => warn!(error = %e, "Failed to send schema change notification (non-fatal)"),
257        }
258    }
259
260    fn get_max_system_version(&self, applied: &HashMap<String, String>) -> Option<u32> {
261        applied.keys().filter_map(|v| extract_version(v)).max()
262    }
263
264    async fn acquire_lock_connection(&self) -> Result<sqlx::pool::PoolConnection<Postgres>> {
265        debug!(
266            timeout_secs = self.config.lock_acquire_timeout.as_secs(),
267            "Acquiring migration lock..."
268        );
269        let mut conn = self
270            .pool
271            .acquire()
272            .await
273            .map_err(|e| ForgeError::internal_with("Failed to acquire lock connection", e))?;
274
275        let classid = (MIGRATION_LOCK_ID >> 32) as i32;
276        let objid = (MIGRATION_LOCK_ID & 0xFFFF_FFFF) as i32;
277
278        let deadline = Instant::now() + self.config.lock_acquire_timeout;
279        // Backdate so the first contended iteration logs immediately.
280        let mut last_warn = Instant::now()
281            .checked_sub(self.config.lock_warn_interval)
282            .unwrap_or_else(Instant::now);
283
284        loop {
285            let acquired = sqlx::query_scalar!(
286                r#"SELECT pg_try_advisory_lock($1) AS "acquired!""#,
287                MIGRATION_LOCK_ID
288            )
289            .fetch_one(&mut *conn)
290            .await
291            .map_err(|e| ForgeError::internal_with("Failed to attempt migration lock", e))?;
292
293            if acquired {
294                debug!("Migration lock acquired");
295                return Ok(conn);
296            }
297
298            let now = Instant::now();
299            if now >= deadline {
300                let holder = lookup_lock_holder(&mut conn, classid, objid).await;
301                return Err(ForgeError::internal(format!(
302                    "Timed out after {:?} waiting for migration lock (holder pid: {:?}). \
303                     Another node is likely running migrations or stalled holding the lock.",
304                    self.config.lock_acquire_timeout, holder
305                )));
306            }
307
308            if now.duration_since(last_warn) >= self.config.lock_warn_interval {
309                let holder = lookup_lock_holder(&mut conn, classid, objid).await;
310                warn!(
311                    holder_pid = ?holder,
312                    "Still waiting for migration lock — another node is holding it"
313                );
314                last_warn = now;
315            }
316
317            tokio::time::sleep(self.config.lock_poll_interval).await;
318        }
319    }
320
321    async fn release_lock_connection(
322        &self,
323        conn: &mut sqlx::pool::PoolConnection<Postgres>,
324    ) -> Result<()> {
325        sqlx::query_scalar!("SELECT pg_advisory_unlock($1)", MIGRATION_LOCK_ID)
326            .fetch_one(&mut **conn)
327            .await
328            .map_err(|e| ForgeError::internal_with("Failed to release migration lock", e))?;
329        debug!("Migration lock released");
330        Ok(())
331    }
332
333    async fn bootstrap_tracking_table(&self) -> Result<()> {
334        let mut conn =
335            self.pool.acquire().await.map_err(|e| {
336                ForgeError::internal_with("Failed to acquire bootstrap connection", e)
337            })?;
338        for statement in split_sql_statements(BOOTSTRAP_SQL) {
339            let stmt = statement.trim();
340            if is_empty_or_comment_only(stmt) {
341                continue;
342            }
343            sqlx::query(stmt)
344                .execute(&mut *conn)
345                .await
346                .map_err(|e| ForgeError::internal_with("Bootstrap failed", e))?;
347        }
348        Ok(())
349    }
350
351    async fn applied_versions(&self) -> Result<HashMap<String, String>> {
352        let rows = sqlx::query!("SELECT version, checksum FROM forge_system_migrations")
353            .fetch_all(&self.pool)
354            .await
355            .map_err(|e| ForgeError::internal_with("Failed to get applied migrations", e))?;
356
357        Ok(rows
358            .into_iter()
359            .map(|row| (row.version, row.checksum))
360            .collect())
361    }
362
363    async fn apply_migration(&self, migration: &Migration) -> Result<()> {
364        if migration.transactional {
365            self.apply_transactional(migration).await
366        } else {
367            self.apply_non_transactional(migration).await
368        }
369    }
370
371    async fn apply_transactional(&self, migration: &Migration) -> Result<()> {
372        info!("Applying migration: {}", migration.version);
373        let start = Instant::now();
374
375        let mut tx = self.pool.begin().await.map_err(|e| {
376            ForgeError::internal_with(
377                format!(
378                    "Failed to begin migration transaction for '{}'",
379                    migration.version
380                ),
381                e,
382            )
383        })?;
384
385        // SET LOCAL scopes these timeouts to this transaction only.
386        sqlx::query("SET LOCAL lock_timeout = '5s'")
387            .execute(&mut *tx)
388            .await
389            .map_err(|e| ForgeError::internal_with("Failed to set lock_timeout", e))?;
390        sqlx::query("SET LOCAL statement_timeout = '5min'")
391            .execute(&mut *tx)
392            .await
393            .map_err(|e| ForgeError::internal_with("Failed to set statement_timeout", e))?;
394
395        for statement in split_sql_statements(&migration.up_sql) {
396            let statement = statement.trim();
397            if is_empty_or_comment_only(statement) {
398                continue;
399            }
400
401            sqlx::query(statement)
402                .execute(&mut *tx)
403                .await
404                .map_err(|e| {
405                    ForgeError::internal_with(
406                        format!("Failed to apply migration '{}'", migration.version),
407                        e,
408                    )
409                })?;
410        }
411
412        let checksum = crate::stable_hash::sha256_hex(migration.up_sql.as_bytes());
413        sqlx::query!(
414            "INSERT INTO forge_system_migrations (version, checksum) VALUES ($1, $2)",
415            migration.version,
416            checksum,
417        )
418        .execute(&mut *tx)
419        .await
420        .map_err(|e| {
421            ForgeError::internal_with(
422                format!("Failed to record migration '{}'", migration.version),
423                e,
424            )
425        })?;
426
427        tx.commit().await.map_err(|e| {
428            ForgeError::internal_with(
429                format!("Failed to commit migration '{}'", migration.version),
430                e,
431            )
432        })?;
433
434        info!(
435            "Migration applied: {} ({:?})",
436            migration.version,
437            start.elapsed()
438        );
439        Ok(())
440    }
441
442    /// Run a migration without wrapping it in a transaction. PostgreSQL
443    /// rejects statements like `CREATE INDEX CONCURRENTLY`,
444    /// `ALTER TYPE ... ADD VALUE`, `VACUUM`, and `REINDEX CONCURRENTLY`
445    /// inside a transaction block, so opt-in migrations skip the BEGIN.
446    ///
447    /// Tradeoffs the migration author must accept:
448    /// - A partial failure leaves the schema half-applied and the
449    ///   bookkeeping row missing, so the next run will retry from the top.
450    /// - Even if all DDL succeeds, the bookkeeping `INSERT` runs on a
451    ///   *fresh* pool connection — if that insert fails, the migration is
452    ///   re-run on the next startup despite already having taken effect.
453    ///
454    /// Migrations using this mode must be authored idempotently
455    /// (`IF NOT EXISTS`, `ADD VALUE IF NOT EXISTS`, and so on).
456    async fn apply_non_transactional(&self, migration: &Migration) -> Result<()> {
457        info!(
458            "Applying non-transactional migration: {}",
459            migration.version
460        );
461        let start = Instant::now();
462
463        let mut conn = self.pool.acquire().await.map_err(|e| {
464            ForgeError::internal_with(
465                format!(
466                    "Failed to acquire connection for migration '{}'",
467                    migration.version
468                ),
469                e,
470            )
471        })?;
472
473        // Session-level SET without LOCAL — no transaction to scope to.
474        // Statement timeout is 30 min because CREATE INDEX CONCURRENTLY and
475        // REINDEX CONCURRENTLY routinely run 10+ minutes on production tables.
476        sqlx::query("SET lock_timeout = '5s'")
477            .execute(&mut *conn)
478            .await
479            .map_err(|e| ForgeError::internal_with("Failed to set lock_timeout", e))?;
480        sqlx::query("SET statement_timeout = '30min'")
481            .execute(&mut *conn)
482            .await
483            .map_err(|e| ForgeError::internal_with("Failed to set statement_timeout", e))?;
484
485        let exec_result: Result<()> = async {
486            for statement in split_sql_statements(&migration.up_sql) {
487                let statement = statement.trim();
488                if is_empty_or_comment_only(statement) {
489                    continue;
490                }
491                sqlx::query(statement)
492                    .execute(&mut *conn)
493                    .await
494                    .map_err(|e| {
495                        ForgeError::internal_with(
496                            format!("Failed to apply migration '{}'", migration.version),
497                            e,
498                        )
499                    })?;
500            }
501            Ok(())
502        }
503        .await;
504
505        // Always reset before returning the connection to the pool — even on
506        // failure. A failed RESET is rare but operators need visibility into it.
507        if let Err(e) = sqlx::query("RESET lock_timeout").execute(&mut *conn).await {
508            warn!(error = %e, "Failed to RESET lock_timeout after non-tx migration");
509        }
510        if let Err(e) = sqlx::query("RESET statement_timeout")
511            .execute(&mut *conn)
512            .await
513        {
514            warn!(error = %e, "Failed to RESET statement_timeout after non-tx migration");
515        }
516        drop(conn);
517
518        exec_result?;
519
520        let checksum = crate::stable_hash::sha256_hex(migration.up_sql.as_bytes());
521        sqlx::query!(
522            "INSERT INTO forge_system_migrations (version, checksum) VALUES ($1, $2)",
523            migration.version,
524            checksum,
525        )
526        .execute(&self.pool)
527        .await
528        .map_err(|e| {
529            ForgeError::internal_with(
530                format!("Failed to record migration '{}'", migration.version),
531                e,
532            )
533        })?;
534
535        info!(
536            "Non-transactional migration applied: {} ({:?})",
537            migration.version,
538            start.elapsed()
539        );
540        Ok(())
541    }
542
543    /// Get the status of all migrations. Each applied row is annotated with a
544    /// `DriftStatus` so `forge migrate status` can distinguish three cases
545    /// without triggering a full apply:
546    ///
547    /// - source still matches the recorded checksum,
548    /// - source has been edited (drifted),
549    /// - source file is gone entirely (the version is no longer in
550    ///   `available` — usually a deleted migration file).
551    pub async fn status(&self, available: &[Migration]) -> Result<MigrationStatus> {
552        self.bootstrap_tracking_table().await?;
553
554        let applied = self.applied_versions().await?;
555
556        let available_by_version: HashMap<&str, &Migration> =
557            available.iter().map(|m| (m.version.as_str(), m)).collect();
558
559        let applied_list: Vec<AppliedMigration> = sqlx::query!(
560            "SELECT version, applied_at, checksum FROM forge_system_migrations ORDER BY applied_at ASC"
561        )
562        .fetch_all(&self.pool)
563        .await
564        .map_err(|e| ForgeError::internal_with("Failed to get migrations", e))?
565        .into_iter()
566        .map(|row| {
567            let drift = match available_by_version.get(row.version.as_str()) {
568                None => DriftStatus::SourceMissing,
569                Some(m) => {
570                    let computed = crate::stable_hash::sha256_hex(m.up_sql.as_bytes());
571                    if computed == row.checksum {
572                        DriftStatus::Unchanged
573                    } else {
574                        DriftStatus::Drifted {
575                            current_checksum: computed,
576                        }
577                    }
578                }
579            };
580            AppliedMigration {
581                version: row.version,
582                applied_at: row.applied_at,
583                checksum: row.checksum,
584                drift,
585            }
586        })
587        .collect();
588
589        let pending: Vec<String> = available
590            .iter()
591            .filter(|m| !applied.contains_key(&m.version))
592            .map(|m| m.version.clone())
593            .collect();
594
595        Ok(MigrationStatus {
596            applied: applied_list,
597            pending,
598        })
599    }
600}
601
602/// Information about an applied migration.
603#[derive(Debug, Clone)]
604pub struct AppliedMigration {
605    pub version: String,
606    pub applied_at: chrono::DateTime<chrono::Utc>,
607    pub checksum: String,
608    /// Relationship between the recorded checksum and the on-disk source.
609    /// `forge migrate status` uses this to flag tampering or missing files
610    /// without attempting a full apply.
611    pub drift: DriftStatus,
612}
613
/// Outcome of comparing an applied migration's recorded checksum with the
/// source currently available for that version.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DriftStatus {
    /// Source is present and hashes to the recorded checksum.
    Unchanged,
    /// Source is present but hashes differently from the recorded checksum —
    /// the file was edited after being applied.
    Drifted {
        /// SHA-256 of the current source. The originally recorded value
        /// stays in `AppliedMigration::checksum`.
        current_checksum: String,
    },
    /// A bookkeeping row exists but no migration with this version was passed
    /// to `status()` — typically the file was deleted from the migrations
    /// directory.
    SourceMissing,
}
633
634/// Status of migrations.
635#[derive(Debug, Clone)]
636pub struct MigrationStatus {
637    pub applied: Vec<AppliedMigration>,
638    pub pending: Vec<String>,
639}
640
641/// Verify that a migration's source SQL matches the checksum recorded
642/// when it was originally applied. This catches the silent-drift case
643/// where someone edits an already-applied migration file: without the
644/// check, `MigrationRunner` would happily skip the migration based on
645/// version alone, and the file's new contents would never run.
646fn verify_checksum(migration: &Migration, recorded: &str) -> Result<()> {
647    let computed = crate::stable_hash::sha256_hex(migration.up_sql.as_bytes());
648    if computed != recorded {
649        return Err(ForgeError::internal(format!(
650            "Migration '{}' has changed since it was applied. \
651             Recorded checksum: {recorded}, but current file checksum: {computed}. \
652             Migrations are immutable once applied — revert the file or create a new migration.",
653            migration.version
654        )));
655    }
656    Ok(())
657}
658
659/// Look up the PID currently holding the migration advisory lock, if any.
660/// Returns `None` if the holder released between the failed try-lock and
661/// this lookup, or if the SELECT itself failed — both are best-effort
662/// diagnostics, never used as a correctness signal.
663async fn lookup_lock_holder(
664    conn: &mut sqlx::pool::PoolConnection<Postgres>,
665    classid: i32,
666    objid: i32,
667) -> Option<i32> {
668    sqlx::query_scalar!(
669        r#"SELECT pid AS "pid!" FROM pg_locks
670           WHERE locktype = 'advisory'
671             AND classid::int = $1
672             AND objid::int = $2
673             AND granted
674           LIMIT 1"#,
675        classid,
676        objid
677    )
678    .fetch_optional(&mut **conn)
679    .await
680    .ok()
681    .flatten()
682}
683
/// Report whether `stmt` contains nothing executable: every line is either
/// blank or a `--` line comment. An empty string qualifies.
///
/// `split_sql_statements` yields such fragments when a migration ends with a
/// trailing comment; they are skipped so PostgreSQL is never sent an empty
/// query. Block comments (`/* ... */`) are not recognised here.
fn is_empty_or_comment_only(stmt: &str) -> bool {
    for line in stmt.lines() {
        let line = line.trim();
        if !line.is_empty() && !line.starts_with("--") {
            return false;
        }
    }
    true
}
695
/// Split SQL text into individual statements at top-level semicolons.
///
/// A semicolon does not end a statement when it appears inside:
/// - a `--` line comment,
/// - a `/* ... */` block comment (block comments nest, as in PostgreSQL),
/// - a single-quoted string literal (`''` is an escaped quote),
/// - a double-quoted identifier (`""` is an escaped quote),
/// - a dollar-quoted string (`$$ ... $$` or `$tag$ ... $tag$`), which is how
///   PL/pgSQL function bodies containing semicolons are kept intact.
///
/// Each returned statement is trimmed and has its trailing semicolon removed;
/// empty fragments are dropped. Comments are kept as part of the statement
/// text they belong to, so a trailing comment after the last semicolon comes
/// back as its own comment-only fragment.
///
/// Backslash escapes in `E'...'` strings are not interpreted.
fn split_sql_statements(sql: &str) -> Vec<String> {
    /// Move the accumulated text into `statements` (if non-empty once trimmed
    /// and stripped of trailing semicolons) and reset the accumulator.
    fn flush(current: &mut String, statements: &mut Vec<String>) {
        let stmt = current.trim().trim_end_matches(';').trim();
        if !stmt.is_empty() {
            statements.push(stmt.to_string());
        }
        current.clear();
    }

    let mut statements = Vec::new();
    let mut current = String::new();
    let mut in_dollar_quote = false;
    // Full opening delimiter including both `$`, e.g. `$$` or `$fn$`.
    let mut dollar_tag = String::new();
    let mut in_line_comment = false;
    let mut block_comment_depth = 0usize;
    let mut in_string_literal = false;
    let mut in_quoted_ident = false;
    let mut chars = sql.chars().peekable();

    while let Some(c) = chars.next() {
        current.push(c);

        if in_line_comment {
            if c == '\n' {
                in_line_comment = false;
            }
            continue;
        }

        if block_comment_depth > 0 {
            // PostgreSQL block comments nest, so track depth instead of
            // ending at the first `*/`.
            if c == '/' && chars.next_if_eq(&'*').is_some() {
                current.push('*');
                block_comment_depth += 1;
            } else if c == '*' && chars.next_if_eq(&'/').is_some() {
                current.push('/');
                block_comment_depth -= 1;
            }
            continue;
        }

        if in_string_literal {
            if c == '\'' {
                if chars.next_if_eq(&'\'').is_some() {
                    // `''` is an escaped quote, not the end of the literal.
                    current.push('\'');
                } else {
                    in_string_literal = false;
                }
            }
            continue;
        }

        if in_quoted_ident {
            if c == '"' {
                if chars.next_if_eq(&'"').is_some() {
                    // `""` is an escaped quote inside the identifier.
                    current.push('"');
                } else {
                    in_quoted_ident = false;
                }
            }
            continue;
        }

        if in_dollar_quote {
            if c == '$' {
                // Look ahead without consuming. Only when the text really
                // continues with the rest of the opening delimiter is it
                // consumed. Otherwise this `$` is ordinary body text (e.g. the
                // `$1` in `$$SELECT $1$$`) and the following `$` must remain
                // available as the start of the real closing delimiter.
                let tail = &dollar_tag[1..];
                let mut lookahead = chars.clone();
                if tail.chars().all(|expected| lookahead.next() == Some(expected)) {
                    current.push_str(tail);
                    chars = lookahead;
                    in_dollar_quote = false;
                    dollar_tag.clear();
                }
            }
            continue;
        }

        if c == '-' && chars.next_if_eq(&'-').is_some() {
            current.push('-');
            in_line_comment = true;
            continue;
        }

        if c == '/' && chars.next_if_eq(&'*').is_some() {
            current.push('*');
            block_comment_depth = 1;
            continue;
        }

        if c == '\'' {
            in_string_literal = true;
            continue;
        }

        if c == '"' {
            in_quoted_ident = true;
            continue;
        }

        if c == '$' {
            // Candidate opening delimiter: `$`, optional tag characters, `$`.
            let mut tag = String::from("$");
            while let Some(next) =
                chars.next_if(|&n| n == '$' || n == '_' || n.is_alphanumeric())
            {
                tag.push(next);
                current.push(next);
                if next == '$' {
                    break;
                }
            }
            // A lone `$` or an unterminated `$name` (e.g. `$1`) is not a
            // delimiter.
            if tag.len() >= 2 && tag.ends_with('$') {
                in_dollar_quote = true;
                dollar_tag = tag;
            }
            continue;
        }

        if c == ';' {
            flush(&mut current, &mut statements);
        }
    }

    // Whatever follows the last semicolon (or an unterminated construct).
    flush(&mut current, &mut statements);

    statements
}
819
820/// Load user migrations from a directory.
821///
822/// Migrations must be named with a zero-padded numeric prefix followed by
823/// an underscore and a slug, e.g. `0001_create_users.sql`. All migrations
824/// in the directory must use the same prefix width, otherwise alphabetic
825/// ordering silently desynchronizes from numeric ordering.
826pub fn load_migrations_from_dir(dir: &Path) -> Result<Vec<Migration>> {
827    if !dir.exists() {
828        debug!("Migrations directory does not exist: {:?}", dir);
829        return Ok(Vec::new());
830    }
831
832    let mut migrations = Vec::new();
833    let mut prefix_width: Option<usize> = None;
834    let mut seen_versions: std::collections::HashSet<u64> = std::collections::HashSet::new();
835
836    let entries = std::fs::read_dir(dir).map_err(ForgeError::Io)?;
837
838    for entry in entries {
839        let entry = entry.map_err(ForgeError::Io)?;
840        let path = entry.path();
841
842        if path.extension().map(|e| e == "sql").unwrap_or(false) {
843            let name = path
844                .file_stem()
845                .and_then(|s| s.to_str())
846                .ok_or_else(|| ForgeError::config("Invalid migration filename"))?
847                .to_string();
848
849            let (digits, version) = parse_migration_prefix(&name)?;
850
851            match prefix_width {
852                Some(w) if w != digits.len() => {
853                    return Err(ForgeError::config(format!(
854                        "Inconsistent migration prefix width: {} uses {} digits but earlier migrations use {}. \
855                         Pad all migration filenames to the same width (e.g. 0001_*.sql).",
856                        name,
857                        digits.len(),
858                        w,
859                    )));
860                }
861                None => prefix_width = Some(digits.len()),
862                _ => {}
863            }
864
865            if !seen_versions.insert(version) {
866                return Err(ForgeError::config(format!(
867                    "Duplicate migration version {} for {}",
868                    version, name
869                )));
870            }
871
872            let content = std::fs::read_to_string(&path).map_err(ForgeError::Io)?;
873
874            migrations.push((version, Migration::parse(name, &content)));
875        }
876    }
877
878    migrations.sort_by_key(|(v, _)| *v);
879
880    debug!("Loaded {} user migrations", migrations.len());
881    Ok(migrations.into_iter().map(|(_, m)| m).collect())
882}
883
884/// Split a migration filename like `0001_create_users` into its digit prefix
885/// and parsed numeric version. Errors out for missing or non-numeric prefixes
886/// rather than letting them sort lexically and silently run out of order.
887fn parse_migration_prefix(name: &str) -> Result<(&str, u64)> {
888    let digits_end = name
889        .find(|c: char| !c.is_ascii_digit())
890        .unwrap_or(name.len());
891    if digits_end == 0 {
892        return Err(ForgeError::config(format!(
893            "Migration {} is missing a numeric prefix (expected NNNN_name.sql)",
894            name
895        )));
896    }
897    let digits = name.get(..digits_end).unwrap_or("");
898    let version: u64 = digits.parse().map_err(|_| {
899        ForgeError::config(format!(
900            "Migration {} has an unparseable numeric prefix",
901            name
902        ))
903    })?;
904    Ok((digits, version))
905}
906
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::panic)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::fs;
    use tempfile::TempDir;

    /// Creates a scratch directory holding the given `(file name, contents)`
    /// pairs, written in the order listed.
    fn dir_with_files(files: &[(&str, &str)]) -> TempDir {
        let dir = TempDir::new().unwrap();
        for &(file_name, contents) in files {
            fs::write(dir.path().join(file_name), contents).unwrap();
        }
        dir
    }

    // ---- load_migrations_from_dir -------------------------------------

    #[test]
    fn test_load_migrations_from_empty_dir() {
        let empty = dir_with_files(&[]);
        let loaded = load_migrations_from_dir(empty.path()).unwrap();
        assert!(loaded.is_empty());
    }

    #[test]
    fn test_load_migrations_from_nonexistent_dir() {
        let missing = Path::new("/nonexistent/path");
        let loaded = load_migrations_from_dir(missing).unwrap();
        assert!(loaded.is_empty());
    }

    #[test]
    fn test_load_migrations_sorted() {
        // Written out of order on purpose; the loader must sort them.
        let dir = dir_with_files(&[
            ("0002_second.sql", "SELECT 2;"),
            ("0001_first.sql", "SELECT 1;"),
            ("0003_third.sql", "SELECT 3;"),
        ]);

        let loaded = load_migrations_from_dir(dir.path()).unwrap();
        assert_eq!(loaded.len(), 3);
        for (idx, expected) in ["0001_first", "0002_second", "0003_third"]
            .iter()
            .enumerate()
        {
            assert_eq!(loaded[idx].version, *expected);
        }
    }

    #[test]
    fn test_load_migrations_rejects_mixed_prefix_widths() {
        let dir = dir_with_files(&[
            ("0001_first.sql", "SELECT 1;"),
            ("2_second.sql", "SELECT 2;"),
        ]);

        let msg = load_migrations_from_dir(dir.path())
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains("Inconsistent migration prefix width") || msg.contains("digits"),
            "unexpected error: {msg}"
        );
    }

    #[test]
    fn test_load_migrations_rejects_missing_prefix() {
        let dir = dir_with_files(&[("create_users.sql", "SELECT 1;")]);

        let failure = load_migrations_from_dir(dir.path()).unwrap_err();
        assert!(failure.to_string().contains("missing a numeric prefix"));
    }

    #[test]
    fn test_load_migrations_rejects_duplicate_versions() {
        let dir = dir_with_files(&[("0001_a.sql", "SELECT 1;"), ("0001_b.sql", "SELECT 2;")]);

        let failure = load_migrations_from_dir(dir.path()).unwrap_err();
        assert!(failure.to_string().contains("Duplicate migration version"));
    }

    #[test]
    fn test_load_migrations_ignores_non_sql() {
        let dir = dir_with_files(&[
            ("0001_migration.sql", "SELECT 1;"),
            ("readme.txt", "Not a migration"),
            ("backup.sql.bak", "Backup"),
        ]);

        let loaded = load_migrations_from_dir(dir.path()).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].version, "0001_migration");
    }

    // ---- Migration::new / Migration::parse ----------------------------

    #[test]
    fn test_migration_new() {
        let migration = Migration::new("test", "SELECT 1");
        assert_eq!(migration.version, "test");
        assert_eq!(migration.up_sql, "SELECT 1");
    }

    #[test]
    fn test_migration_parse_strips_up_marker() {
        let parsed = Migration::parse("0001_test", "-- @up\nCREATE TABLE users (id INT);");
        assert_eq!(parsed.version, "0001_test");
        assert_eq!(parsed.up_sql, "CREATE TABLE users (id INT);");
        assert!(
            parsed.transactional,
            "default should be transactional=true"
        );
    }

    #[test]
    fn test_migration_parse_no_directive_defaults_transactional() {
        let parsed = Migration::parse("0001_test", "CREATE TABLE x (id INT);");
        assert!(parsed.transactional);
    }

    #[test]
    fn test_migration_parse_transactional_false_directive() {
        let source = "-- @transactional false\nCREATE INDEX CONCURRENTLY idx_x ON t(c);";
        let parsed = Migration::parse("0001_idx", source);
        assert!(!parsed.transactional);
        assert!(parsed.up_sql.contains("CREATE INDEX CONCURRENTLY"));
    }

    #[test]
    fn test_migration_parse_transactional_no_space_directive() {
        let source = "--@transactional false\nCREATE INDEX CONCURRENTLY idx_x ON t(c);";
        let parsed = Migration::parse("0001_idx", source);
        assert!(!parsed.transactional);
    }

    #[test]
    fn test_migration_parse_transactional_equals_form() {
        let parsed = Migration::parse("0001_vac", "-- @transactional = false\nVACUUM ANALYZE;");
        assert!(!parsed.transactional);
    }

    #[test]
    fn test_migration_parse_transactional_uppercase_value() {
        let parsed = Migration::parse("0001_vac", "-- @transactional FALSE\nVACUUM;");
        assert!(!parsed.transactional);
    }

    #[test]
    fn test_migration_parse_transactional_true_explicit() {
        let parsed = Migration::parse(
            "0001_t",
            "-- @transactional true\nCREATE TABLE t (id INT);",
        );
        assert!(parsed.transactional);
    }

    #[test]
    fn test_migration_parse_directive_only_in_leading_block() {
        // The directive belongs to the file header. Once real SQL has
        // started, a later `-- @transactional false` must be ignored.
        let source = "CREATE TABLE t (id INT);\n-- @transactional false\nCREATE INDEX i ON t(id);";
        let parsed = Migration::parse("0001_t", source);
        assert!(parsed.transactional);
    }

    #[test]
    fn test_migration_parse_requires_at_prefix() {
        // A comment that merely spells "transactional false" is prose; only
        // the `@`-prefixed form counts as a directive.
        let source = "-- transactional false (this is just prose)\nCREATE TABLE t (id INT);";
        let parsed = Migration::parse("0001_t", source);
        assert!(parsed.transactional);
    }

    #[test]
    fn test_migration_parse_prose_only_no_directive() {
        let source = "-- This migration creates a transactional ledger\nCREATE TABLE t (id INT);";
        let parsed = Migration::parse("0001_t", source);
        assert!(parsed.transactional);
    }

    #[test]
    fn test_migration_parse_directive_after_blank_lines_in_header() {
        let source =
            "\n\n-- file header\n-- @transactional false\nCREATE INDEX CONCURRENTLY i ON t(id);";
        let parsed = Migration::parse("0001_t", source);
        assert!(!parsed.transactional);
    }

    #[test]
    fn test_migration_new_defaults_transactional() {
        let migration = Migration::new("v", "SELECT 1");
        assert!(migration.transactional);
    }

    // ---- helpers -------------------------------------------------------

    #[test]
    fn test_is_empty_or_comment_only() {
        for &sql in &["", "-- just a comment", "-- one\n-- two\n   "] {
            assert!(is_empty_or_comment_only(sql));
        }
        for &sql in &["SELECT 1", "-- header\nSELECT 1"] {
            assert!(!is_empty_or_comment_only(sql));
        }
    }

    #[tokio::test]
    async fn test_get_max_system_version_prefers_highest_applied_version() {
        // The pool is lazy, so it is built without connecting to a database.
        let lazy_pool = sqlx::postgres::PgPoolOptions::new()
            .max_connections(1)
            .connect_lazy("postgres://localhost/nonexistent")
            .expect("lazy pool must build");
        let runner = MigrationRunner::new(lazy_pool);

        let applied: HashMap<String, String> = [
            ("__forge_v003", "checksum-3"),
            ("__forge_v001", "checksum-1"),
            ("0001_user_schema", "checksum-u"),
        ]
        .iter()
        .map(|&(version, checksum)| (version.to_string(), checksum.to_string()))
        .collect();

        assert_eq!(runner.get_max_system_version(&applied), Some(3));
    }

    // ---- verify_checksum ----------------------------------------------

    #[test]
    fn test_verify_checksum_matches() {
        let migration = Migration::new("0001_test", "CREATE TABLE t (id INT);");
        let checksum = crate::stable_hash::sha256_hex(migration.up_sql.as_bytes());
        verify_checksum(&migration, &checksum).expect("matching checksum should pass");
    }

    #[test]
    fn test_verify_checksum_catches_system_migration_drift() {
        // Pins `verify_checksum` against a bundled system migration so that a
        // future refactor cannot quietly exempt built-ins from the check.
        let bundled = super::super::builtin::get_system_migrations();
        let migration = bundled
            .first()
            .expect("at least one system migration is bundled")
            .to_migration();

        let current = crate::stable_hash::sha256_hex(migration.up_sql.as_bytes());
        verify_checksum(&migration, &current)
            .expect("matching checksum must pass for system migrations");

        let msg = verify_checksum(&migration, "stale-checksum")
            .unwrap_err()
            .to_string();
        assert!(
            msg.contains(&migration.version),
            "drift error must name the system migration: {msg}"
        );
    }

    #[test]
    fn test_verify_checksum_mismatch_reports_versions() {
        let migration = Migration::new("0001_test", "CREATE TABLE t (id INT);");
        let msg = verify_checksum(&migration, "deadbeef-old-checksum")
            .unwrap_err()
            .to_string();
        let lowered = msg.to_lowercase();

        assert!(
            msg.contains("0001_test"),
            "error should name the migration: {msg}"
        );
        assert!(
            msg.contains("deadbeef-old-checksum"),
            "error should include recorded checksum: {msg}"
        );
        assert!(
            lowered.contains("changed") || lowered.contains("immutable"),
            "error should explain the drift policy: {msg}"
        );
    }

    // ---- split_sql_statements -----------------------------------------

    #[test]
    fn test_split_simple_statements() {
        let pieces = super::split_sql_statements("SELECT 1; SELECT 2; SELECT 3;");
        assert_eq!(pieces, ["SELECT 1", "SELECT 2", "SELECT 3"]);
    }

    #[test]
    fn test_split_with_dollar_quoted_function() {
        let sql = r#"
CREATE FUNCTION test() RETURNS void AS $$
BEGIN
    SELECT 1;
    SELECT 2;
END;
$$ LANGUAGE plpgsql;

SELECT 3;
"#;
        let pieces = super::split_sql_statements(sql);
        assert_eq!(pieces.len(), 2);

        let function_stmt = &pieces[0];
        assert!(function_stmt.contains("CREATE FUNCTION"));
        assert!(function_stmt.contains("$$ LANGUAGE plpgsql"));
        assert!(pieces[1].contains("SELECT 3"));
    }

    #[test]
    fn test_split_preserves_dollar_quote_content() {
        let sql = r#"
CREATE FUNCTION notify() RETURNS trigger AS $$
DECLARE
    row_id TEXT;
BEGIN
    row_id := NEW.id::TEXT;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
"#;
        let pieces = super::split_sql_statements(sql);
        assert_eq!(pieces.len(), 1);
        assert!(pieces[0].contains("row_id := NEW.id::TEXT"));
    }
}
1206
// Database-backed tests for the migration runner. Compiled only for test
// builds with the `testcontainers` feature enabled; every test obtains its
// own `IsolatedTestDb` through `setup_db`.
#[cfg(all(test, feature = "testcontainers"))]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::panic,
    clippy::disallowed_methods
)]
mod integration_tests {
    use super::*;
    use forge_core::testing::{IsolatedTestDb, TestDatabase};

    /// Builds a `TestDatabase` from the environment and derives an
    /// `IsolatedTestDb` for `test_name`. Panics if either step fails.
    async fn setup_db(test_name: &str) -> IsolatedTestDb {
        let base = TestDatabase::from_env()
            .await
            .expect("Failed to create test database");
        base.isolated(test_name)
            .await
            .expect("Failed to create isolated db")
    }

    /// CREATE INDEX CONCURRENTLY is rejected by PG inside a transaction
    /// block. The non-transactional path opens no BEGIN, so it must succeed.
    #[tokio::test]
    async fn non_transactional_migration_runs_create_index_concurrently() {
        let db = setup_db("mig_non_tx_create_index").await;
        let runner = MigrationRunner::new(db.pool().clone());

        let setup = Migration::new(
            "0001_setup",
            "CREATE TABLE items (id INT PRIMARY KEY, name TEXT);",
        );
        let concurrent = Migration::parse(
            "0002_index",
            "-- @transactional false\nCREATE INDEX CONCURRENTLY items_name_idx ON items(name);",
        );
        assert!(!concurrent.transactional);

        runner
            .run(vec![setup, concurrent])
            .await
            .expect("migrations apply cleanly");

        // The index must actually exist in the catalog afterwards.
        let exists = sqlx::query_scalar!(
            r#"SELECT EXISTS(
                SELECT 1 FROM pg_indexes
                WHERE schemaname='public' AND tablename='items' AND indexname='items_name_idx'
            ) AS "exists!""#
        )
        .fetch_one(db.pool())
        .await
        .unwrap();
        assert!(exists, "index should be created");

        // The non-transactional path must still write its bookkeeping row.
        let recorded = sqlx::query_scalar!(
            r#"SELECT COUNT(*) AS "n!" FROM forge_system_migrations WHERE version='0002_index'"#
        )
        .fetch_one(db.pool())
        .await
        .unwrap();
        assert_eq!(recorded, 1, "non-tx migration must record bookkeeping row");
    }

    /// Trying to run CREATE INDEX CONCURRENTLY in the default (transactional)
    /// path must fail with PG's "cannot run inside a transaction block" error
    /// — proves we actually need the opt-out and aren't silently transacting.
    #[tokio::test]
    async fn transactional_migration_rejects_create_index_concurrently() {
        let db = setup_db("mig_tx_rejects_concurrent").await;
        let runner = MigrationRunner::new(db.pool().clone());

        let setup = Migration::new(
            "0001_setup",
            "CREATE TABLE items (id INT PRIMARY KEY, name TEXT);",
        );
        // No directive, so transactional defaults to true.
        let concurrent = Migration::new(
            "0002_index",
            "CREATE INDEX CONCURRENTLY items_name_idx ON items(name);",
        );
        assert!(concurrent.transactional);

        let err = runner.run(vec![setup, concurrent]).await.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("CONCURRENTLY") || msg.to_lowercase().contains("transaction"),
            "expected PG to reject concurrent index in tx, got: {msg}"
        );
    }

    /// Editing a migration after it has been applied must be caught by
    /// checksum verification. Without this check, the runner would silently
    /// skip the edited migration based on version alone, leaving the file
    /// out of sync with the database.
    #[tokio::test]
    async fn rerun_with_modified_sql_errors_with_checksum_drift() {
        let db = setup_db("mig_checksum_drift").await;
        let runner = MigrationRunner::new(db.pool().clone());

        let original = Migration::new("0001_users", "CREATE TABLE users (id INT PRIMARY KEY);");
        runner
            .run(vec![original])
            .await
            .expect("first run applies cleanly");

        // Same version, different SQL — simulates someone editing the
        // migration file after deploying.
        let tampered = Migration::new(
            "0001_users",
            "CREATE TABLE users (id INT PRIMARY KEY, name TEXT);",
        );
        let err = runner.run(vec![tampered]).await.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("0001_users") && msg.to_lowercase().contains("changed"),
            "expected drift error mentioning the migration, got: {msg}"
        );
    }

    /// `status()` must surface checksum drift via the `drift` field. Without
    /// this, `forge migrate status` silently reports a tampered migration as
    /// "applied" — operators learn about it only on the next `migrate up`.
    /// This drives the user-facing UX of the audit fix.
    #[tokio::test]
    async fn status_surfaces_checksum_drift_on_modified_migration() {
        let db = setup_db("mig_status_drift").await;
        let runner = MigrationRunner::new(db.pool().clone());

        let original = Migration::new("0001_users", "CREATE TABLE users (id INT PRIMARY KEY);");
        runner
            .run(vec![original])
            .await
            .expect("first run applies cleanly");

        // Same version with altered SQL: status is asked about the edited source.
        let tampered = Migration::new(
            "0001_users",
            "CREATE TABLE users (id INT PRIMARY KEY, email TEXT);",
        );
        let status = runner
            .status(std::slice::from_ref(&tampered))
            .await
            .expect("status must succeed even with drift");

        let row = status
            .applied
            .iter()
            .find(|a| a.version == "0001_users")
            .expect("applied row must exist");
        // Checksum of the edited SQL, i.e. what the drift report should carry.
        let expected = crate::stable_hash::sha256_hex(tampered.up_sql.as_bytes());
        match &row.drift {
            DriftStatus::Drifted { current_checksum } => {
                assert_eq!(
                    current_checksum, &expected,
                    "current_checksum must equal the *new* on-disk checksum",
                );
                assert_ne!(
                    current_checksum, &row.checksum,
                    "current_checksum must differ from the recorded checksum",
                );
            }
            other => panic!("expected DriftStatus::Drifted, got {other:?}"),
        }
    }

    /// `status()` must report `SourceMissing` for an applied migration whose
    /// file no longer exists on disk — that's a louder signal than "looks
    /// fine", which is what a single Option<String> conflated. Operators need
    /// to see deleted migrations as a distinct condition.
    #[tokio::test]
    async fn status_reports_source_missing_when_file_gone() {
        let db = setup_db("mig_status_missing").await;
        let runner = MigrationRunner::new(db.pool().clone());

        let m = Migration::new("0001_users", "CREATE TABLE users (id INT PRIMARY KEY);");
        runner
            .run(vec![m])
            .await
            .expect("first run applies cleanly");

        // Simulate the migration file being deleted: pass an empty slice.
        let status = runner
            .status(&[])
            .await
            .expect("status must succeed with missing source");

        let row = status
            .applied
            .iter()
            .find(|a| a.version == "0001_users")
            .expect("applied row must exist");
        assert_eq!(row.drift, DriftStatus::SourceMissing);
    }

    /// Sanity: when the on-disk source matches the recorded checksum,
    /// `drift` is `Unchanged` (not the same as `SourceMissing`).
    #[tokio::test]
    async fn status_reports_unchanged_when_source_matches() {
        let db = setup_db("mig_status_unchanged").await;
        let runner = MigrationRunner::new(db.pool().clone());

        let m = Migration::new("0001_users", "CREATE TABLE users (id INT PRIMARY KEY);");
        runner
            .run(vec![m.clone()])
            .await
            .expect("first run applies cleanly");

        let status = runner
            .status(std::slice::from_ref(&m))
            .await
            .expect("status must succeed for clean source");

        let row = status
            .applied
            .iter()
            .find(|a| a.version == "0001_users")
            .expect("applied row must exist");
        assert_eq!(row.drift, DriftStatus::Unchanged);
    }

    /// When another connection is already holding the migration advisory lock,
    /// the runner must time out with a helpful message rather than block
    /// forever. We grab the lock manually then run a runner with a tiny
    /// timeout and assert the error mentions the holder PID.
    #[tokio::test]
    async fn lock_acquire_times_out_when_another_holder_present() {
        let db = setup_db("mig_lock_timeout").await;

        // Hold the migration lock from a separate connection.
        let mut blocker = db.pool().acquire().await.unwrap();
        let acquired = sqlx::query_scalar!(
            r#"SELECT pg_try_advisory_lock($1) AS "ok!""#,
            MIGRATION_LOCK_ID
        )
        .fetch_one(&mut *blocker)
        .await
        .unwrap();
        assert!(acquired, "blocker must acquire the lock first");

        // Short deadline so the test finishes quickly; the warn interval is
        // set far beyond the 500 ms acquire timeout.
        let config = MigrationConfig {
            lock_acquire_timeout: Duration::from_millis(500),
            lock_poll_interval: Duration::from_millis(50),
            lock_warn_interval: Duration::from_secs(60),
        };
        let runner = MigrationRunner::with_config(db.pool().clone(), config);

        let err = runner.run(vec![]).await.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("Timed out") && msg.contains("migration lock"),
            "expected timeout error, got: {msg}"
        );
        assert!(
            msg.contains("holder pid"),
            "expected holder pid in error: {msg}"
        );

        // Release on the blocker side so test cleanup is clean.
        sqlx::query_scalar!("SELECT pg_advisory_unlock($1)", MIGRATION_LOCK_ID)
            .fetch_one(&mut *blocker)
            .await
            .unwrap();
    }

    /// Bring up the system schema (so `forge_validate_identifier` and the
    /// reactivity helpers exist) by running the migration runner with no
    /// user migrations, and return the isolated database handle.
    async fn db_with_system_schema(name: &str) -> IsolatedTestDb {
        let db = setup_db(name).await;
        MigrationRunner::new(db.pool().clone())
            .run(vec![])
            .await
            .expect("system migrations apply cleanly");
        db
    }

    /// Pull the message out of a Postgres error so assertions don't depend
    /// on the surrounding sqlx error wrapping.
    fn pg_message(err: &sqlx::Error) -> String {
        match err {
            sqlx::Error::Database(db_err) => db_err.message().to_string(),
            // Non-database errors fall back to their `Display` text.
            other => other.to_string(),
        }
    }

    /// `forge_validate_identifier('')` must fail with a message that
    /// mentions "empty".
    #[tokio::test]
    async fn validate_identifier_rejects_empty() {
        let db = db_with_system_schema("vid_empty").await;
        let err = sqlx::query("SELECT forge_validate_identifier('')")
            .execute(db.pool())
            .await
            .unwrap_err();
        let msg = pg_message(&err);
        assert!(
            msg.contains("empty"),
            "expected empty-name error, got: {msg}"
        );
    }

    /// A 64-byte name must be rejected with a message citing the 63-byte
    /// limit.
    #[tokio::test]
    async fn validate_identifier_rejects_overlong_name() {
        let db = db_with_system_schema("vid_overlong").await;
        // 64 ASCII bytes — one over PG's 63-byte budget.
        let name = "a".repeat(64);
        let err = sqlx::query(&format!("SELECT forge_validate_identifier('{name}')"))
            .execute(db.pool())
            .await
            .unwrap_err();
        let msg = pg_message(&err);
        assert!(
            msg.contains("63 bytes"),
            "expected 63-byte limit in error, got: {msg}",
        );
    }

    /// A name starting with `pg_` must be rejected; the message has to
    /// mention either `pg_` or "reserved".
    #[tokio::test]
    async fn validate_identifier_rejects_pg_prefix() {
        let db = db_with_system_schema("vid_pgprefix").await;
        let err = sqlx::query("SELECT forge_validate_identifier('pg_my_table')")
            .execute(db.pool())
            .await
            .unwrap_err();
        let msg = pg_message(&err);
        assert!(
            msg.contains("pg_") || msg.to_lowercase().contains("reserved"),
            "expected pg_ reservation error, got: {msg}",
        );
    }

    /// An ordinary identifier (`orders_2026`) must pass validation without
    /// error.
    #[tokio::test]
    async fn validate_identifier_accepts_valid_name() {
        let db = db_with_system_schema("vid_ok").await;
        sqlx::query("SELECT forge_validate_identifier('orders_2026')")
            .execute(db.pool())
            .await
            .expect("normal identifier must be accepted");
    }

    /// Starting with migrations the binary does not know about must fail.
    /// This simulates the rolling-deploy case: a newer binary ran user migration
    /// `0002_extra`; the older binary then starts up and must refuse rather than
    /// silently ignoring the unknown row.
    #[tokio::test]
    async fn startup_rejects_schema_ahead_of_binary() {
        let db = setup_db("mig_schema_ahead").await;
        let runner = MigrationRunner::new(db.pool().clone());

        // Newer binary ran both migrations.
        let m1 = Migration::new("0001_users", "CREATE TABLE users (id INT PRIMARY KEY);");
        let m2 = Migration::new("0002_extra", "CREATE TABLE extra (id INT PRIMARY KEY);");
        runner
            .run(vec![m1.clone(), m2])
            .await
            .expect("newer binary applies both cleanly");

        // Older binary only knows about 0001 — must refuse to start.
        let err = runner
            .run(vec![m1])
            .await
            .expect_err("older binary must refuse to start against a newer schema");
        let msg = err.to_string();
        assert!(
            msg.contains("0002_extra"),
            "error must name the unknown migration: {msg}",
        );
        assert!(
            msg.to_lowercase().contains("ahead") || msg.to_lowercase().contains("does not know"),
            "error must explain the schema-ahead condition: {msg}",
        );
    }

    /// `forge_enable_reactivity` must catch the case where the input passes
    /// (≤ 63 bytes) but the derived `forge_notify_<name>` trigger overflows.
    /// 51 bytes of input pushes the prefixed trigger name to 64.
    #[tokio::test]
    async fn enable_reactivity_rejects_derived_trigger_overflow() {
        let db = db_with_system_schema("enrx_overflow").await;
        let name = "a".repeat(51);
        let err = sqlx::query(&format!("SELECT forge_enable_reactivity('{name}')"))
            .execute(db.pool())
            .await
            .unwrap_err();
        let msg = pg_message(&err);
        assert!(
            msg.contains("63 bytes"),
            "expected derived-name overflow error, got: {msg}",
        );
    }
}