Skip to main content

ferrule_core/
migrate.rs

1//! Lightweight, multi-backend SQL migration runner.
2//!
3//! A `migrations/` directory of timestamp-ordered `.up.sql` / `.down.sql`
4//! files is tracked in a `ferrule_migrations` table inside the target
5//! database.  Pure SQL, no ORM, no DSL.
6//!
7//! File naming: `YYYYMMDDHHMMSS_<name>.{up,down}.sql` — lex sort = order.
8
9use ferrule_sql::quote_string;
10use ferrule_sql::{Connection, Dialect, SqlError};
11use sha2::{Digest, Sha256};
12use std::collections::{HashMap, HashSet};
13use std::path::PathBuf;
14
15/// A single discovered migration file.
16#[derive(Debug, Clone)]
17pub struct MigrationFile {
18    pub version: String,
19    pub path: PathBuf,
20    pub direction: Direction,
21}
22
/// Which half of a migration pair a file holds: the forward script
/// (`*.up.sql`) or its rollback (`*.down.sql`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Direction {
    /// Forward migration, read from a `.up.sql` file.
    Up,
    /// Rollback migration, read from a `.down.sql` file.
    Down,
}
28
29/// Migration engine bound to an open connection.
30pub struct MigrationEngine {
31    conn: Box<dyn Connection>,
32    migrations_dir: PathBuf,
33    dialect: Dialect,
34}
35
36impl MigrationEngine {
37    pub fn new(conn: Box<dyn Connection>, migrations_dir: PathBuf, dialect: Dialect) -> Self {
38        Self {
39            conn,
40            migrations_dir,
41            dialect,
42        }
43    }
44
45    /// Ensure the `ferrule_migrations` tracking table exists.
46    ///
47    /// The DDL is dialect-specific because the canonical column types,
48    /// the timestamp default, and the "create if absent" idiom differ
49    /// across backends:
50    ///
51    /// - **SQLite / Postgres** — `TEXT` keys are valid and
52    ///   `CREATE TABLE IF NOT EXISTS` is supported.
53    /// - **MySQL** — `TEXT` cannot be a `PRIMARY KEY` without a prefix
54    ///   length, so the keyed columns use `VARCHAR`.
55    /// - **MSSQL** — `CREATE TABLE IF NOT EXISTS` is not valid T-SQL,
56    ///   `TEXT` cannot key a table, and the `TIMESTAMP` type is a
57    ///   rowversion (not a datetime); we guard creation with
58    ///   `IF OBJECT_ID(...) IS NULL` and use `DATETIME2`.
59    /// - **Oracle** — has no `TEXT` type and `IF NOT EXISTS` is
60    ///   unsupported pre-23c, so creation runs inside a PL/SQL block
61    ///   that swallows ORA-00955 (name already used).
62    pub fn ensure_migration_table(&mut self) -> Result<(), SqlError> {
63        let sql = match self.dialect {
64            Dialect::Sqlite | Dialect::Postgres => {
65                r#"CREATE TABLE IF NOT EXISTS ferrule_migrations (
66    version TEXT PRIMARY KEY,
67    applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
68    checksum TEXT NOT NULL
69)"#
70                .to_string()
71            }
72            Dialect::MySql => r#"CREATE TABLE IF NOT EXISTS ferrule_migrations (
73    version VARCHAR(255) PRIMARY KEY,
74    applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
75    checksum VARCHAR(64) NOT NULL
76)"#
77            .to_string(),
78            Dialect::MsSql => r#"IF OBJECT_ID(N'ferrule_migrations', N'U') IS NULL
79CREATE TABLE ferrule_migrations (
80    version NVARCHAR(255) PRIMARY KEY,
81    applied_at DATETIME2 DEFAULT SYSUTCDATETIME(),
82    checksum NVARCHAR(64) NOT NULL
83)"#
84            .to_string(),
85            Dialect::Oracle => {
86                // ORA-00955 ("name is already used by an existing object")
87                // is the table-already-exists signal; swallow it so the
88                // call is idempotent like the other dialects.
89                r#"BEGIN
90    EXECUTE IMMEDIATE 'CREATE TABLE ferrule_migrations (
91        version VARCHAR2(255) PRIMARY KEY,
92        applied_at TIMESTAMP DEFAULT SYSTIMESTAMP,
93        checksum VARCHAR2(64) NOT NULL
94    )';
95EXCEPTION
96    WHEN OTHERS THEN
97        IF SQLCODE != -955 THEN
98            RAISE;
99        END IF;
100END;"#
101                    .to_string()
102            }
103        };
104        self.conn.execute(&sql)?;
105        Ok(())
106    }
107
108    /// Return the list of migrations that have **not** yet been applied,
109    /// sorted lexicographically by version.
110    pub fn pending_migrations(&mut self) -> Result<Vec<MigrationFile>, SqlError> {
111        let applied = self.applied_versions()?;
112        let mut pending = self.scan_dir(Direction::Up)?;
113        pending.retain(|m| !applied.contains(&m.version));
114        Ok(pending)
115    }
116
117    /// Apply a single migration (`.up.sql`).
118    ///
119    /// The migration script and the `ferrule_migrations` tracking-row
120    /// `INSERT` are committed as a single unit on backends with
121    /// transactional DDL (SQLite, Postgres, MSSQL): both succeed or both
122    /// roll back, so a mid-script failure can never leave the migration
123    /// recorded-but-partial or applied-but-untracked. On MySQL and Oracle,
124    /// DDL implicitly commits, so the two steps run best-effort and a
125    /// failure in the middle can leave the schema partially applied — see
126    /// `apply_atomic` for the per-dialect details.
127    pub fn apply_up(&mut self, file: &MigrationFile) -> Result<(), SqlError> {
128        let sql = std::fs::read_to_string(&file.path).map_err(|e| {
129            SqlError::QueryFailed(format!(
130                "cannot read migration {}: {}",
131                file.path.display(),
132                e
133            ))
134        })?;
135        let checksum = hex_digest(&sql);
136
137        validate_version(&file.version)?;
138        let track = format!(
139            "INSERT INTO ferrule_migrations (version, checksum) VALUES ({}, {})",
140            quote_string(&file.version),
141            quote_string(&checksum)
142        );
143        self.apply_atomic(&sql, &track)
144    }
145
146    /// Rollback a single migration (`.down.sql`).
147    ///
148    /// The rollback script and the `ferrule_migrations` tracking-row
149    /// `DELETE` are committed together on backends with transactional DDL
150    /// (SQLite, Postgres, MSSQL): the row is removed only if the entire
151    /// down script succeeds, so a mid-script failure can never leave the
152    /// schema half-rolled-back while the row still marks the migration
153    /// applied. On MySQL and Oracle, DDL implicitly commits, so the two
154    /// steps run best-effort — see `apply_atomic`.
155    pub fn apply_down(&mut self, file: &MigrationFile) -> Result<(), SqlError> {
156        let sql = std::fs::read_to_string(&file.path).map_err(|e| {
157            SqlError::QueryFailed(format!(
158                "cannot read migration {}: {}",
159                file.path.display(),
160                e
161            ))
162        })?;
163
164        validate_version(&file.version)?;
165        let track = format!(
166            "DELETE FROM ferrule_migrations WHERE version = {}",
167            quote_string(&file.version)
168        );
169        self.apply_atomic(&sql, &track)
170    }
171
172    /// Run a migration `script` and its tracking-table statement `track`
173    /// (the `INSERT` for an up, the `DELETE` for a down) as a single unit.
174    ///
175    /// Atomicity depends on whether the backend supports transactional DDL:
176    ///
177    /// - **SQLite / Postgres / MSSQL** — DDL participates in transactions,
178    ///   so the script and the tracking statement are wrapped in one
179    ///   `BEGIN`/`COMMIT` batch. If any statement fails, an explicit
180    ///   `ROLLBACK` discards every change (schema and tracking row) so the
181    ///   migration is left exactly as it was before the attempt. MSSQL
182    ///   additionally sets `XACT_ABORT ON`, which makes a runtime error
183    ///   abort the whole batch (T-SQL does not roll back on error by
184    ///   default).
185    /// - **MySQL / Oracle** — DDL implicitly commits, so wrapping it in a
186    ///   transaction would not protect it. The script and the tracking
187    ///   statement run as two separate operations (best-effort). A failure
188    ///   partway through the script can therefore leave the schema partially
189    ///   changed and the tracking row out of sync; this is an inherent
190    ///   limitation of these engines, not a bug in the runner. Oracle does
191    ///   not autocommit DML, so an explicit `COMMIT` persists the
192    ///   tracking-row write (and any DML in the script); MySQL autocommits,
193    ///   so it needs none.
194    fn apply_atomic(&mut self, script: &str, track: &str) -> Result<(), SqlError> {
195        match self.dialect {
196            Dialect::Sqlite | Dialect::Postgres | Dialect::MsSql => {
197                let (begin, prelude) = match self.dialect {
198                    Dialect::MsSql => ("BEGIN TRANSACTION;", "SET XACT_ABORT ON;\n"),
199                    _ => ("BEGIN;", ""),
200                };
201                // Separate the migration script from the tracking statement
202                // with exactly one `;`. A bare/empty statement (`;` on its own)
203                // makes SQLite's per-statement execution path fail with the
204                // misleading "not an error", so only emit a terminator when the
205                // script actually has content.
206                let script = script.trim();
207                let body = if script.is_empty() {
208                    String::new()
209                } else if script.ends_with(';') {
210                    format!("{script}\n")
211                } else {
212                    format!("{script};\n")
213                };
214                let batch = format!("{prelude}{begin}\n{body}{track};\nCOMMIT;");
215                // SQLite and Postgres run the whole batch through
216                // `execute_multi` (SQLite splits it statement-by-statement;
217                // Postgres sends it as one `simple_query`). MSSQL must use
218                // `execute` instead: its `execute_multi` runs the batch via
219                // tiberius' result-returning `query` path, whose metadata pass
220                // executes a DDL batch twice (the second `CREATE TABLE` then
221                // fails "already exists"). `execute` uses the plain
222                // non-resultset path and runs the batch exactly once.
223                let run = match self.dialect {
224                    Dialect::MsSql => self.conn.execute(&batch).map(|_| ()),
225                    _ => self.conn.execute_multi(&batch).map(|_| ()),
226                };
227                match run {
228                    Ok(()) => Ok(()),
229                    Err(e) => {
230                        // Discard any partial work and return the original
231                        // error. The rollback is best-effort: if it also
232                        // fails (e.g. the connection is gone, or XACT_ABORT
233                        // already rolled back) the caller still sees the
234                        // underlying migration failure.
235                        let _ = self.conn.execute("ROLLBACK;");
236                        Err(e)
237                    }
238                }
239            }
240            Dialect::MySql | Dialect::Oracle => {
241                self.conn.execute_multi(script)?;
242                self.conn.execute(track)?;
243                // Oracle does not autocommit DML: without an explicit COMMIT
244                // the tracking-row INSERT/DELETE (and any DML in the script)
245                // is rolled back when the connection closes. MySQL autocommits
246                // each statement, so this is Oracle-only.
247                if self.dialect == Dialect::Oracle {
248                    self.conn.execute("COMMIT")?;
249                }
250                Ok(())
251            }
252        }
253    }
254
255    /// Read the last N applied versions from the tracking table,
256    /// ordered by most-recent first.
257    ///
258    /// The ordering uses `version DESC` as a deterministic tiebreak after
259    /// `applied_at DESC`: `applied_at` has second granularity, and
260    /// `migrate up` can apply a whole batch inside a single second, so
261    /// without the tiebreak `down` could roll back an arbitrary member of
262    /// that batch rather than the genuinely newest one.
263    ///
264    /// The row-limit clause is dialect-specific: SQLite, Postgres, and
265    /// MySQL accept `LIMIT n`; MSSQL uses `SELECT TOP n`; Oracle (12c+)
266    /// uses `FETCH FIRST n ROWS ONLY`.
267    pub fn last_applied(&mut self, n: usize) -> Result<Vec<AppliedMigration>, SqlError> {
268        let order = "ORDER BY applied_at DESC, version DESC";
269        let sql = match self.dialect {
270            Dialect::Sqlite | Dialect::Postgres | Dialect::MySql => {
271                format!("SELECT version, checksum FROM ferrule_migrations {order} LIMIT {n}")
272            }
273            Dialect::MsSql => {
274                format!("SELECT TOP {n} version, checksum FROM ferrule_migrations {order}")
275            }
276            Dialect::Oracle => {
277                format!(
278                    "SELECT version, checksum FROM ferrule_migrations {order} FETCH FIRST {n} ROWS ONLY"
279                )
280            }
281        };
282        self.query_applied(&sql)
283    }
284
285    /// Read **every** applied migration from the tracking table, ordered
286    /// most-recent first (`applied_at DESC, version DESC`).
287    ///
288    /// Unlike [`MigrationEngine::last_applied`] this applies no row cap, so
289    /// `migrate history` and `migrate verify` see the full set rather than a
290    /// silently truncated window. The ordering is identical to
291    /// `last_applied` and needs no dialect-specific limit clause, so the
292    /// same query runs on all backends.
293    pub fn all_applied(&mut self) -> Result<Vec<AppliedMigration>, SqlError> {
294        let sql =
295            "SELECT version, checksum FROM ferrule_migrations ORDER BY applied_at DESC, version DESC";
296        self.query_applied(sql)
297    }
298
299    /// Run a `SELECT version, checksum FROM ferrule_migrations ...` query
300    /// and collect the rows into [`AppliedMigration`]s.
301    fn query_applied(&mut self, sql: &str) -> Result<Vec<AppliedMigration>, SqlError> {
302        let result = self.conn.query(sql)?;
303        let mut out = Vec::with_capacity(result.rows.len());
304        for row in result.rows {
305            let version = row[0].to_string();
306            let checksum = row[1].to_string();
307            out.push(AppliedMigration { version, checksum });
308        }
309        Ok(out)
310    }
311
312    /// Verify that a single migration's `.up.sql` file on disk still matches
313    /// the checksum recorded in the database. Returns `Ok(())` if clean,
314    /// `Err` on drift.
315    ///
316    /// The on-disk file is resolved by reusing [`MigrationEngine::scan_dir`]
317    /// (`Direction::Up`) and matching the derived version **exactly**, the
318    /// same way `apply_up` and `pending_migrations` identify files. An
319    /// earlier `name.starts_with(version)` prefix match could bind the
320    /// wrong file (e.g. version `2026` matching `20260602_x.up.sql`),
321    /// reporting spurious drift or masking real drift.
322    pub fn verify_checksum(&mut self, version: &str) -> Result<(), SqlError> {
323        validate_version(version)?;
324        let sql = format!(
325            "SELECT checksum FROM ferrule_migrations WHERE version = {}",
326            quote_string(version)
327        );
328        let result = self.conn.query(&sql)?;
329        let db_checksum = result
330            .rows
331            .first()
332            .map(|r| r[0].to_string())
333            .ok_or_else(|| {
334                SqlError::QueryFailed(format!(
335                    "migration '{}' not found in tracking table",
336                    version
337                ))
338            })?;
339
340        let up_files = self.scan_dir(Direction::Up)?;
341        let file = up_files
342            .iter()
343            .find(|f| f.version == version)
344            .ok_or_else(|| {
345                SqlError::QueryFailed(format!(
346                    "migration file for version '{}' not found in {}",
347                    version,
348                    self.migrations_dir.display()
349                ))
350            })?;
351
352        let content = std::fs::read_to_string(&file.path).map_err(|e| {
353            SqlError::QueryFailed(format!(
354                "cannot read migration file {}: {}",
355                file.path.display(),
356                e
357            ))
358        })?;
359        let file_checksum = hex_digest(&content);
360
361        if db_checksum != file_checksum {
362            return Err(SqlError::QueryFailed(format!(
363                "checksum mismatch for migration '{}':\n  db:    {}\n  file:  {}\n  The migration file was edited after it was applied.",
364                version, db_checksum, file_checksum
365            )));
366        }
367        Ok(())
368    }
369
370    // ------------------------------------------------------------------
371    // Internal helpers
372    // ------------------------------------------------------------------
373
374    pub fn applied_versions(&mut self) -> Result<HashSet<String>, SqlError> {
375        let sql = "SELECT version FROM ferrule_migrations";
376        let result = self.conn.query(sql)?;
377        let mut set = HashSet::with_capacity(result.rows.len());
378        for row in result.rows {
379            set.insert(row[0].to_string());
380        }
381        Ok(set)
382    }
383
384    /// Scan the migrations directory for files in the given `direction`,
385    /// returning them sorted lexicographically by version.
386    ///
387    /// The version is the substring before the first `_` in the file stem
388    /// (`20260602120000_add_users.up.sql` -> `20260602120000`). Because
389    /// two distinct files can collapse onto the same version under that
390    /// rule, this detects the collision up front and returns a
391    /// [`SqlError::QueryFailed`] naming the conflicting files. Surfacing
392    /// it here — before any `apply_up` runs — prevents a duplicate version
393    /// from being discovered only when the second tracking-row `INSERT`
394    /// hits the primary-key constraint mid-run, which would leave one
395    /// migration's DDL applied but untracked.
396    pub fn scan_dir(&self, direction: Direction) -> Result<Vec<MigrationFile>, SqlError> {
397        let ext = match direction {
398            Direction::Up => ".up.sql",
399            Direction::Down => ".down.sql",
400        };
401        let mut files = Vec::new();
402        let entries = std::fs::read_dir(&self.migrations_dir).map_err(|e| {
403            SqlError::QueryFailed(format!(
404                "cannot read migrations directory '{}': {}",
405                self.migrations_dir.display(),
406                e
407            ))
408        })?;
409        for entry in entries {
410            let entry = entry.map_err(|e| {
411                SqlError::QueryFailed(format!("cannot read directory entry: {}", e))
412            })?;
413            let name = entry.file_name();
414            let name = name.to_string_lossy();
415            if name.ends_with(ext) {
416                let stem = name.strip_suffix(ext).unwrap_or(&name);
417                let version = stem.split('_').next().unwrap_or(stem).to_string();
418                files.push(MigrationFile {
419                    version,
420                    path: entry.path(),
421                    direction,
422                });
423            }
424        }
425        files.sort_by(|a, b| a.version.cmp(&b.version));
426
427        // Reject two files that derive the same version: applying both
428        // would commit the first migration's DDL then violate the
429        // tracking table's primary key on the second.
430        let mut seen: HashMap<&str, &PathBuf> = HashMap::with_capacity(files.len());
431        for file in &files {
432            if let Some(prev) = seen.insert(file.version.as_str(), &file.path) {
433                return Err(SqlError::QueryFailed(format!(
434                    "duplicate migration version '{}' derived from two files:\n  {}\n  {}\nRename one so each version (the text before the first '_') is unique.",
435                    file.version,
436                    prev.display(),
437                    file.path.display()
438                )));
439            }
440        }
441
442        Ok(files)
443    }
444
445    /// Build a `version -> file-checksum` map by scanning the up migrations
446    /// directory exactly once.
447    ///
448    /// The checksum is the SHA-256 hex digest of the `.up.sql` file's
449    /// contents, identical to what [`MigrationEngine::apply_up`] records in
450    /// the tracking table. This lets `verify` compare every applied
451    /// migration against on-disk content using the checksums it already has
452    /// from the applied-list query — one directory scan and zero extra
453    /// `SELECT`s, instead of re-reading the directory and re-querying the
454    /// database once per applied migration.
455    pub fn on_disk_checksums(&self) -> Result<HashMap<String, String>, SqlError> {
456        let files = self.scan_dir(Direction::Up)?;
457        let mut map = HashMap::with_capacity(files.len());
458        for file in files {
459            let content = std::fs::read_to_string(&file.path).map_err(|e| {
460                SqlError::QueryFailed(format!(
461                    "cannot read migration file {}: {}",
462                    file.path.display(),
463                    e
464                ))
465            })?;
466            map.insert(file.version, hex_digest(&content));
467        }
468        Ok(map)
469    }
470
471    /// Verify that every supplied applied migration still matches its
472    /// `.up.sql` file on disk, using checksums already held in hand.
473    ///
474    /// `applied` is the list returned by [`MigrationEngine::last_applied`]
475    /// (or any caller-built list of applied versions + recorded checksums).
476    /// The directory is scanned once via
477    /// [`MigrationEngine::on_disk_checksums`]; no per-migration queries are
478    /// issued. The returned vector lists every drifted migration with a
479    /// human-readable reason — empty means all clean.
480    pub fn verify_applied(
481        &self,
482        applied: &[AppliedMigration],
483    ) -> Result<Vec<ChecksumDrift>, SqlError> {
484        let on_disk = self.on_disk_checksums()?;
485        let mut drift = Vec::new();
486        for migration in applied {
487            match on_disk.get(&migration.version) {
488                None => drift.push(ChecksumDrift {
489                    version: migration.version.clone(),
490                    reason: format!(
491                        "migration file for version '{}' not found in {}",
492                        migration.version,
493                        self.migrations_dir.display()
494                    ),
495                }),
496                Some(file_checksum) if *file_checksum != migration.checksum => {
497                    drift.push(ChecksumDrift {
498                        version: migration.version.clone(),
499                        reason: format!(
500                            "checksum mismatch:\n      db:   {}\n      file: {}\n      The migration file was edited after it was applied.",
501                            migration.checksum, file_checksum
502                        ),
503                    });
504                }
505                Some(_) => {}
506            }
507        }
508        Ok(drift)
509    }
510}
511
/// A migration that has been applied, as read from the tracking table.
///
/// Derives `PartialEq`/`Eq` so applied lists can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppliedMigration {
    /// Version key as stored in the tracking table's `version` column.
    pub version: String,
    /// Checksum stored in the tracking table's `checksum` column.
    pub checksum: String,
}
518
/// A drifted migration found by [`MigrationEngine::verify_applied`]:
/// either its `.up.sql` file is missing on disk, or its on-disk checksum
/// no longer matches the value recorded when it was applied.
///
/// Derives `PartialEq`/`Eq` so drift reports can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChecksumDrift {
    /// Version of the drifted migration.
    pub version: String,
    /// Human-readable explanation (may span several lines).
    pub reason: String,
}
527
528/// Generate a hex SHA-256 digest of the input string.
529fn hex_digest(input: &str) -> String {
530    let mut hasher = Sha256::new();
531    hasher.update(input.as_bytes());
532    let result = hasher.finalize();
533    hex::encode(result)
534}
535
536/// Reject a migration version that cannot be safely interpolated into a
537/// single-quoted SQL literal.
538///
539/// Versions are interpolated into the tracking-table `INSERT`/`DELETE`/`SELECT`
540/// statements and quoted with [`ferrule_sql::quote_string`], which doubles
541/// embedded `'`. That alone is enough for SQLite, Postgres, MSSQL, and Oracle,
542/// but MySQL interprets `\` as an escape inside string literals by default, so
543/// a backslash in a version could still break out of the literal. The version
544/// originates from a filename stem (the text before the first `_`), which the
545/// naming convention defines as timestamp digits, so any `'` or `\` indicates a
546/// malformed name rather than a legitimate version; reject it before building
547/// the statement.
548fn validate_version(version: &str) -> Result<(), SqlError> {
549    if version.contains('\'') || version.contains('\\') {
550        return Err(SqlError::QueryFailed(format!(
551            "migration version '{version}' contains a quote or backslash; \
552             rename the file so the version (the text before the first '_') \
553             has no ' or \\ characters"
554        )));
555    }
556    Ok(())
557}
558
#[cfg(all(test, feature = "sqlite"))]
mod tests {
    //! SQLite-backed integration tests for the migration engine.
    //!
    //! SQLite needs no external container, so these exercise the real
    //! apply/rollback/verify code paths end-to-end (including the
    //! transactional-atomicity batch in `apply_atomic`).

    use super::*;
    use ferrule_sql::ConnectOptions;
    use ferrule_sql::DatabaseUrl;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Per-process counter combined with the PID in `test_dir`, so each
    // `test_dir()` call in this process gets a distinct directory name.
    static CTR: AtomicU64 = AtomicU64::new(0);

    /// A throwaway temp directory (migrations + db file) cleaned up on drop.
    struct TestDir {
        // Root directory removed on drop.
        base: PathBuf,
        // Migrations directory handed to the engine (`base/mig`).
        mig: PathBuf,
        // SQLite database file path (`base/test.db`).
        db: PathBuf,
    }

    impl Drop for TestDir {
        // Best-effort cleanup: errors removing the tree are ignored.
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.base);
        }
    }

    /// Create a fresh `<tmp>/ferrule-migrate-test-<pid>-<n>` tree with an
    /// empty `mig/` subdirectory; the database file is not created here.
    fn test_dir() -> TestDir {
        let pid = std::process::id();
        let n = CTR.fetch_add(1, Ordering::SeqCst);
        let base = std::env::temp_dir().join(format!("ferrule-migrate-test-{pid}-{n}"));
        let mig = base.join("mig");
        std::fs::create_dir_all(&mig).expect("create temp migrations dir");
        let db = base.join("test.db");
        TestDir { base, mig, db }
    }

    /// Open a SQLite-backed engine on `t.db` that reads migrations from
    /// `t.mig`.
    fn engine(t: &TestDir) -> MigrationEngine {
        let url =
            DatabaseUrl::parse(&format!("sqlite://{}", t.db.display())).expect("parse sqlite url");
        let conn =
            ferrule_sql::connect(&url, &ConnectOptions::default(), None).expect("connect sqlite");
        MigrationEngine::new(conn, t.mig.clone(), Dialect::Sqlite)
    }

    /// Write `<stem>.up.sql` and `<stem>.down.sql` into the migrations dir.
    fn write_pair(t: &TestDir, stem: &str, up: &str, down: &str) {
        std::fs::write(t.mig.join(format!("{stem}.up.sql")), up).expect("write up");
        std::fs::write(t.mig.join(format!("{stem}.down.sql")), down).expect("write down");
    }

    /// Full happy path: apply one migration, check it is recorded and
    /// drift-free, then roll it back and check the schema and row are gone.
    #[test]
    fn lifecycle_up_verify_down() {
        let t = test_dir();
        write_pair(
            &t,
            "20240101_users",
            "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);\n\
             INSERT INTO users (name) VALUES ('a');\n",
            "DROP TABLE users;\n",
        );
        let mut eng = engine(&t);
        eng.ensure_migration_table().unwrap();

        let pending = eng.pending_migrations().unwrap();
        assert_eq!(pending.len(), 1);
        eng.apply_up(&pending[0]).unwrap();

        // Recorded, schema + data present, no drift, nothing left pending.
        assert!(eng.applied_versions().unwrap().contains("20240101"));
        let rows = eng.conn.query("SELECT name FROM users").unwrap();
        assert_eq!(rows.rows.len(), 1);
        let all = eng.all_applied().unwrap();
        assert!(eng.verify_applied(&all).unwrap().is_empty());
        assert!(eng.pending_migrations().unwrap().is_empty());

        // Roll back: tracking row gone and the table dropped.
        let downs = eng.scan_dir(Direction::Down).unwrap();
        eng.apply_down(&downs[0]).unwrap();
        assert!(eng.applied_versions().unwrap().is_empty());
        let tbls = eng
            .conn
            .query("SELECT name FROM sqlite_master WHERE type='table' AND name='users'")
            .unwrap();
        assert_eq!(tbls.rows.len(), 0);
    }

    /// A migration whose second statement fails must leave neither the
    /// first statement's table nor a tracking row behind.
    #[test]
    fn apply_up_rolls_back_on_failure() {
        // Regression guard: the second statement fails, so the whole
        // migration (the first statement + the tracking row) must roll back.
        let t = test_dir();
        write_pair(
            &t,
            "20240101_bad",
            "CREATE TABLE keep_me (id INTEGER);\nCREATE TABLE keep_me (id INTEGER);\n",
            "DROP TABLE keep_me;\n",
        );
        let mut eng = engine(&t);
        eng.ensure_migration_table().unwrap();
        let pending = eng.pending_migrations().unwrap();

        assert!(
            eng.apply_up(&pending[0]).is_err(),
            "apply_up must fail on the duplicate CREATE"
        );
        let tbls = eng
            .conn
            .query("SELECT name FROM sqlite_master WHERE type='table' AND name='keep_me'")
            .unwrap();
        assert_eq!(tbls.rows.len(), 0, "partial schema must be rolled back");
        assert!(
            eng.applied_versions().unwrap().is_empty(),
            "a failed migration must not be recorded"
        );
    }

    /// `20240101_a` and `20240101_b` both derive version `20240101`, so
    /// `scan_dir` must refuse the directory before anything is applied.
    #[test]
    fn duplicate_version_is_rejected() {
        let t = test_dir();
        write_pair(&t, "20240101_a", "CREATE TABLE a(x);\n", "DROP TABLE a;\n");
        write_pair(&t, "20240101_b", "CREATE TABLE b(x);\n", "DROP TABLE b;\n");
        let eng = engine(&t);
        assert!(
            eng.scan_dir(Direction::Up).is_err(),
            "two files deriving the same version must be rejected up front"
        );
    }

    /// Editing an applied `.up.sql` file must be reported as drift.
    #[test]
    fn verify_applied_detects_checksum_drift() {
        let t = test_dir();
        write_pair(&t, "20240101_e", "CREATE TABLE e(x);\n", "DROP TABLE e;\n");
        let mut eng = engine(&t);
        eng.ensure_migration_table().unwrap();
        let pending = eng.pending_migrations().unwrap();
        eng.apply_up(&pending[0]).unwrap();

        // Edit the applied file: verify must flag the checksum drift.
        std::fs::write(t.mig.join("20240101_e.up.sql"), "CREATE TABLE e(x, y);\n").unwrap();
        let all = eng.all_applied().unwrap();
        let drift = eng.verify_applied(&all).unwrap();
        assert_eq!(drift.len(), 1);
        assert_eq!(drift[0].version, "20240101");
    }

    /// Deleting an applied migration's files must be reported as drift.
    #[test]
    fn verify_applied_detects_missing_file() {
        let t = test_dir();
        write_pair(&t, "20240101_g", "CREATE TABLE g(x);\n", "DROP TABLE g;\n");
        let mut eng = engine(&t);
        eng.ensure_migration_table().unwrap();
        let pending = eng.pending_migrations().unwrap();
        eng.apply_up(&pending[0]).unwrap();

        // Delete the on-disk file: verify must report applied-but-missing drift.
        std::fs::remove_file(t.mig.join("20240101_g.up.sql")).unwrap();
        std::fs::remove_file(t.mig.join("20240101_g.down.sql")).unwrap();
        let all = eng.all_applied().unwrap();
        let drift = eng.verify_applied(&all).unwrap();
        assert_eq!(drift.len(), 1);
        assert_eq!(drift[0].version, "20240101");
    }
}