Skip to main content

cratestack_sqlx/
migrations.rs

1//! Forward-only migration runner.
2//!
3//! Phase 3 ships the foundations: a tracking table, a runner that applies
//! pending migrations in order, and a checksum guard so the checksum recorded
//! in the database can be cross-checked against the SQL a deployment ships
6//! today. Banks running zero-downtime migrations write their migrations by
7//! hand (the contract under regulation is "the change is reviewable as a
8//! SQL diff") — schema-diff-driven generation is out of scope here.
9
10use cratestack_core::CoolError;
11use sha2::{Digest, Sha256};
12
/// DDL for the tracking table that records applied migrations.
///
/// Each row holds a migration's `id`, its `description`, the `checksum`
/// bytes stored when it was applied, and an `applied_at` timestamp the
/// database fills in through `DEFAULT NOW()`. The `IF NOT EXISTS` clause
/// lets the statement be executed repeatedly.
pub const MIGRATIONS_TABLE_DDL: &str = r"
CREATE TABLE IF NOT EXISTS cratestack_migrations (
    id TEXT PRIMARY KEY,
    description TEXT NOT NULL,
    checksum BYTEA NOT NULL,
    applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
";
21
/// One hand-written migration step, kept in source control next to the
/// schema.
///
/// The runner applies every entry of the supplied slice that has no row in
/// `cratestack_migrations` yet. The `down` script is carried along for
/// reference only: the runner never executes it, because
/// irreversible-by-default is the safe banking posture.
#[derive(Debug, Clone)]
pub struct Migration {
    /// Sortable identifier, conventionally `YYYYMMDDHHMMSS_<slug>`. Used as
    /// the primary key of the tracking row.
    pub id: String,
    /// Human-readable summary stored alongside the tracking row.
    pub description: String,
    /// SQL executed when the migration is applied.
    pub up: String,
    /// Optional rollback SQL; recorded on the struct but never run by the
    /// runner.
    pub down: Option<String>,
}
34
35impl Migration {
36    pub fn checksum(&self) -> [u8; 32] {
37        let mut hasher = Sha256::new();
38        hasher.update(self.id.as_bytes());
39        hasher.update(b"\0");
40        hasher.update(self.description.as_bytes());
41        hasher.update(b"\0");
42        hasher.update(self.up.as_bytes());
43        hasher.finalize().into()
44    }
45}
46
/// Where a migration stands relative to the tracking table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationStatus {
    /// No tracking row exists for the migration's id.
    Pending,
    /// A tracking row exists and its checksum equals the migration's.
    Applied,
    /// A tracking row exists but its checksum differs from the migration's.
    ChecksumMismatch,
}
53
54#[derive(Debug, Clone)]
55pub struct MigrationState {
56    pub id: String,
57    pub status: MigrationStatus,
58}
59
60pub async fn ensure_migrations_table(pool: &sqlx::PgPool) -> Result<(), CoolError> {
61    for statement in MIGRATIONS_TABLE_DDL
62        .split(';')
63        .map(str::trim)
64        .filter(|s| !s.is_empty())
65    {
66        sqlx::query(statement)
67            .execute(pool)
68            .await
69            .map_err(|error| CoolError::Database(error.to_string()))?;
70    }
71    Ok(())
72}
73
74/// Inspect each migration in `migrations` against `cratestack_migrations`
75/// and report which are pending / applied / drifted. Use before `apply` to
76/// surface drift to the operator without changing state.
77pub async fn status(
78    pool: &sqlx::PgPool,
79    migrations: &[Migration],
80) -> Result<Vec<MigrationState>, CoolError> {
81    ensure_migrations_table(pool).await?;
82    let rows = sqlx::query_as::<_, (String, Vec<u8>)>(
83        "SELECT id, checksum FROM cratestack_migrations ORDER BY id",
84    )
85    .fetch_all(pool)
86    .await
87    .map_err(|error| CoolError::Database(error.to_string()))?;
88
89    let mut applied: std::collections::HashMap<String, Vec<u8>> = std::collections::HashMap::new();
90    for (id, checksum) in rows {
91        applied.insert(id, checksum);
92    }
93
94    Ok(migrations
95        .iter()
96        .map(|m| {
97            let id = m.id.clone();
98            match applied.get(&id) {
99                Some(stored) if stored.as_slice() == m.checksum().as_slice() => MigrationState {
100                    id,
101                    status: MigrationStatus::Applied,
102                },
103                Some(_) => MigrationState {
104                    id,
105                    status: MigrationStatus::ChecksumMismatch,
106                },
107                None => MigrationState {
108                    id,
109                    status: MigrationStatus::Pending,
110                },
111            }
112        })
113        .collect())
114}
115
116/// Apply every pending migration in the input slice, in order. Each
117/// migration runs inside its own transaction; checksum drift aborts the
118/// whole apply (banks treat drift as a release-process failure to be
119/// resolved by humans, not silently overwritten).
120pub async fn apply_pending(
121    pool: &sqlx::PgPool,
122    migrations: &[Migration],
123) -> Result<Vec<String>, CoolError> {
124    let states = status(pool, migrations).await?;
125    for (state, migration) in states.iter().zip(migrations) {
126        if state.status == MigrationStatus::ChecksumMismatch {
127            return Err(CoolError::Internal(format!(
128                "migration `{}` is recorded as applied but its SQL has changed; \
129                 resolve drift before continuing",
130                migration.id
131            )));
132        }
133    }
134
135    let mut applied = Vec::new();
136    for (state, migration) in states.iter().zip(migrations) {
137        if state.status != MigrationStatus::Pending {
138            continue;
139        }
140        let mut tx = pool
141            .begin()
142            .await
143            .map_err(|error| CoolError::Database(error.to_string()))?;
144        // PG prepared statements only carry one command per round-trip,
145        // so a multi-statement migration like
146        //   CREATE TABLE foo (...);
147        //   CREATE INDEX bar ON foo (id);
148        // would fail before being recorded. Other DDL helpers in this
149        // crate (audit::ensure_audit_table, idempotency ensure_schema)
150        // already split on `;`; this loop does the same inside the
151        // migration's transaction so partial state can't survive a
152        // mid-script failure.
153        for statement in migration
154            .up
155            .split(';')
156            .map(str::trim)
157            .filter(|s| !s.is_empty())
158        {
159            sqlx::query(statement)
160                .execute(&mut *tx)
161                .await
162                .map_err(|error| CoolError::Database(error.to_string()))?;
163        }
164        sqlx::query(
165            "INSERT INTO cratestack_migrations (id, description, checksum) VALUES ($1, $2, $3)",
166        )
167        .bind(&migration.id)
168        .bind(&migration.description)
169        .bind(migration.checksum().as_slice())
170        .execute(&mut *tx)
171        .await
172        .map_err(|error| CoolError::Database(error.to_string()))?;
173        tx.commit()
174            .await
175            .map_err(|error| CoolError::Database(error.to_string()))?;
176        applied.push(migration.id.clone());
177    }
178
179    Ok(applied)
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a migration with a description derived from `id` and no
    /// `down` script.
    fn migration(id: &str, up: &str) -> Migration {
        let description = format!("migration {id}");
        Migration {
            id: id.to_owned(),
            description,
            up: up.to_owned(),
            down: None,
        }
    }

    #[test]
    fn checksum_changes_when_up_sql_changes() {
        let original = migration("20260101000000_init", "CREATE TABLE a (id INT);");
        // Same id and description, different `up` script.
        let edited = Migration {
            up: "CREATE TABLE a (id BIGINT);".to_owned(),
            ..original.clone()
        };
        assert_ne!(original.checksum(), edited.checksum());
    }

    #[test]
    fn checksum_is_stable_for_same_inputs() {
        let first = migration("20260101000000_init", "CREATE TABLE a (id INT);");
        let second = first.clone();
        assert_eq!(first.checksum(), second.checksum());
    }
}