Skip to main content

cratestack_sqlx/
migrations.rs

//! Forward-only migration runner.
//!
//! Phase 3 ships the foundations: a tracking table, a runner that applies
//! pending migrations in order, and a checksum guard so the checksum recorded
//! in the database can be cross-checked against the SQL a deployment ships
//! today. Banks running zero-downtime migrations write their migrations by
//! hand (the contract under regulation is "the change is reviewable as a
//! SQL diff") — schema-diff-driven generation is out of scope here.
9use crate::sqlx;
10
11
12use cratestack_core::CoolError;
13use sha2::{Digest, Sha256};
14
/// DDL for the `cratestack_migrations` tracking table.
///
/// One row per applied migration: its `id` (primary key), `description`,
/// the raw `checksum` bytes, and an `applied_at` timestamp that defaults to
/// `NOW()`. The statement uses `IF NOT EXISTS`, so it can be executed on
/// every startup.
pub const MIGRATIONS_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS cratestack_migrations (
    id TEXT PRIMARY KEY,
    description TEXT NOT NULL,
    checksum BYTEA NOT NULL,
    applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"#;
23
/// One hand-written, forward-only migration step.
///
/// Banks keep these in source control next to the schema. The runner applies
/// every entry of the caller's `migrations` slice that has no row in
/// `cratestack_migrations` yet. Rollback SQL can be carried in `down`, but
/// the runner never executes it — irreversible-by-default is the safe
/// banking posture.
#[derive(Clone, Debug)]
pub struct Migration {
    /// Sortable id, conventionally `YYYYMMDDHHMMSS_<slug>`.
    pub id: String,
    /// Human-readable summary; stored with the id and covered by the checksum.
    pub description: String,
    /// Forward SQL; may contain several `;`-terminated statements.
    pub up: String,
    /// Optional rollback SQL. Not executed by the runner and not part of the
    /// checksum.
    pub down: Option<String>,
}
36
37impl Migration {
38    pub fn checksum(&self) -> [u8; 32] {
39        let mut hasher = Sha256::new();
40        hasher.update(self.id.as_bytes());
41        hasher.update(b"\0");
42        hasher.update(self.description.as_bytes());
43        hasher.update(b"\0");
44        hasher.update(self.up.as_bytes());
45        hasher.finalize().into()
46    }
47}
48
/// Where a migration stands relative to the `cratestack_migrations` table.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MigrationStatus {
    /// No row with this id has been recorded yet.
    Pending,
    /// A row exists and its stored checksum equals the current one.
    Applied,
    /// A row exists but its stored checksum differs from the current one.
    ChecksumMismatch,
}
55
56#[derive(Debug, Clone)]
57pub struct MigrationState {
58    pub id: String,
59    pub status: MigrationStatus,
60}
61
62pub async fn ensure_migrations_table(pool: &sqlx::PgPool) -> Result<(), CoolError> {
63    for statement in MIGRATIONS_TABLE_DDL
64        .split(';')
65        .map(str::trim)
66        .filter(|s| !s.is_empty())
67    {
68        sqlx::query(statement)
69            .execute(pool)
70            .await
71            .map_err(|error| CoolError::Database(error.to_string()))?;
72    }
73    Ok(())
74}
75
76/// Inspect each migration in `migrations` against `cratestack_migrations`
77/// and report which are pending / applied / drifted. Use before `apply` to
78/// surface drift to the operator without changing state.
79pub async fn status(
80    pool: &sqlx::PgPool,
81    migrations: &[Migration],
82) -> Result<Vec<MigrationState>, CoolError> {
83    ensure_migrations_table(pool).await?;
84    let rows = sqlx::query_as::<_, (String, Vec<u8>)>(
85        "SELECT id, checksum FROM cratestack_migrations ORDER BY id",
86    )
87    .fetch_all(pool)
88    .await
89    .map_err(|error| CoolError::Database(error.to_string()))?;
90
91    let mut applied: std::collections::HashMap<String, Vec<u8>> = std::collections::HashMap::new();
92    for (id, checksum) in rows {
93        applied.insert(id, checksum);
94    }
95
96    Ok(migrations
97        .iter()
98        .map(|m| {
99            let id = m.id.clone();
100            match applied.get(&id) {
101                Some(stored) if stored.as_slice() == m.checksum().as_slice() => MigrationState {
102                    id,
103                    status: MigrationStatus::Applied,
104                },
105                Some(_) => MigrationState {
106                    id,
107                    status: MigrationStatus::ChecksumMismatch,
108                },
109                None => MigrationState {
110                    id,
111                    status: MigrationStatus::Pending,
112                },
113            }
114        })
115        .collect())
116}
117
118/// Apply every pending migration in the input slice, in order. Each
119/// migration runs inside its own transaction; checksum drift aborts the
120/// whole apply (banks treat drift as a release-process failure to be
121/// resolved by humans, not silently overwritten).
122pub async fn apply_pending(
123    pool: &sqlx::PgPool,
124    migrations: &[Migration],
125) -> Result<Vec<String>, CoolError> {
126    let states = status(pool, migrations).await?;
127    for (state, migration) in states.iter().zip(migrations) {
128        if state.status == MigrationStatus::ChecksumMismatch {
129            return Err(CoolError::Internal(format!(
130                "migration `{}` is recorded as applied but its SQL has changed; \
131                 resolve drift before continuing",
132                migration.id
133            )));
134        }
135    }
136
137    let mut applied = Vec::new();
138    for (state, migration) in states.iter().zip(migrations) {
139        if state.status != MigrationStatus::Pending {
140            continue;
141        }
142        let mut tx = pool
143            .begin()
144            .await
145            .map_err(|error| CoolError::Database(error.to_string()))?;
146        // PG prepared statements only carry one command per round-trip,
147        // so a multi-statement migration like
148        //   CREATE TABLE foo (...);
149        //   CREATE INDEX bar ON foo (id);
150        // would fail before being recorded. Other DDL helpers in this
151        // crate (audit::ensure_audit_table, idempotency ensure_schema)
152        // already split on `;`; this loop does the same inside the
153        // migration's transaction so partial state can't survive a
154        // mid-script failure.
155        for statement in migration
156            .up
157            .split(';')
158            .map(str::trim)
159            .filter(|s| !s.is_empty())
160        {
161            sqlx::query(statement)
162                .execute(&mut *tx)
163                .await
164                .map_err(|error| CoolError::Database(error.to_string()))?;
165        }
166        sqlx::query(
167            "INSERT INTO cratestack_migrations (id, description, checksum) VALUES ($1, $2, $3)",
168        )
169        .bind(&migration.id)
170        .bind(&migration.description)
171        .bind(migration.checksum().as_slice())
172        .execute(&mut *tx)
173        .await
174        .map_err(|error| CoolError::Database(error.to_string()))?;
175        tx.commit()
176            .await
177            .map_err(|error| CoolError::Database(error.to_string()))?;
178        applied.push(migration.id.clone());
179    }
180
181    Ok(applied)
182}
183
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a migration whose description is derived from its id and which
    /// carries no rollback SQL.
    fn migration(id: &str, up: &str) -> Migration {
        let description = format!("migration {id}");
        Migration {
            id: String::from(id),
            description,
            up: String::from(up),
            down: None,
        }
    }

    #[test]
    fn checksum_changes_when_up_sql_changes() {
        let original = migration("20260101000000_init", "CREATE TABLE a (id INT);");
        let edited = Migration {
            up: String::from("CREATE TABLE a (id BIGINT);"),
            ..original.clone()
        };
        assert_ne!(original.checksum(), edited.checksum());
    }

    #[test]
    fn checksum_is_stable_for_same_inputs() {
        let original = migration("20260101000000_init", "CREATE TABLE a (id INT);");
        let twin = original.clone();
        assert_eq!(original.checksum(), twin.checksum());
    }
}
211}