Skip to main content

cratestack_sqlx/
migrations.rs

1//! Forward-only migration runner with a checksum guard against drift.
2//! Banks write migrations by hand (the contract under regulation is "the
3//! change is reviewable as a SQL diff").
4
5use crate::sqlx;
6use cratestack_core::CoolError;
7use sha2::{Digest, Sha256};
8
/// DDL that creates the `cratestack_migrations` bookkeeping table when it
/// does not already exist.
///
/// Each row describes one applied migration: its `id` (primary key), its
/// `description`, the `checksum` bytes, and an `applied_at` timestamp that
/// defaults to `NOW()`.
///
/// [`ensure_migrations_table`] splits this text on `;` and executes every
/// non-empty piece, so a statement added here must not contain a `;` inside
/// its own body.
pub const MIGRATIONS_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS cratestack_migrations (
    id TEXT PRIMARY KEY,
    description TEXT NOT NULL,
    checksum BYTEA NOT NULL,
    applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"#;
17
/// A single migration step. The runner applies any rows not yet
/// present in `cratestack_migrations`. `down` is carried on the struct but
/// nothing in this module executes it or feeds it into the checksum —
/// irreversible-by-default is the safe banking posture.
#[derive(Debug, Clone)]
pub struct Migration {
    /// Sortable id, conventionally `YYYYMMDDHHMMSS_<slug>`. Becomes the
    /// primary key of the `cratestack_migrations` row once applied.
    pub id: String,
    /// Human-readable summary. Stored next to the id and covered by the
    /// checksum, so editing it after apply is reported as drift.
    pub description: String,
    /// Forward SQL. May hold several statements separated by `;`; they all
    /// run inside one transaction.
    pub up: String,
    /// Optional rollback SQL. Never executed by this module.
    pub down: Option<String>,
}
29
30impl Migration {
31    pub fn checksum(&self) -> [u8; 32] {
32        let mut hasher = Sha256::new();
33        hasher.update(self.id.as_bytes());
34        hasher.update(b"\0");
35        hasher.update(self.description.as_bytes());
36        hasher.update(b"\0");
37        hasher.update(self.up.as_bytes());
38        hasher.finalize().into()
39    }
40}
41
/// Where a migration stands relative to the `cratestack_migrations` table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationStatus {
    /// No row with this migration's id exists yet.
    Pending,
    /// A row exists and its stored checksum equals the migration's checksum.
    Applied,
    /// A row exists but its stored checksum differs from the migration's
    /// current checksum, i.e. the migration was edited after being applied.
    ChecksumMismatch,
}
48
/// Status report entry for one migration, as produced by [`status`].
#[derive(Debug, Clone)]
pub struct MigrationState {
    /// Id of the migration this entry describes (copied from `Migration::id`).
    pub id: String,
    /// How the migration compares to what the database has recorded.
    pub status: MigrationStatus,
}
54
55pub async fn ensure_migrations_table(pool: &sqlx::PgPool) -> Result<(), CoolError> {
56    for statement in MIGRATIONS_TABLE_DDL
57        .split(';')
58        .map(str::trim)
59        .filter(|s| !s.is_empty())
60    {
61        sqlx::query(statement)
62            .execute(pool)
63            .await
64            .map_err(|error| CoolError::Database(error.to_string()))?;
65    }
66    Ok(())
67}
68
69/// Inspect each migration in `migrations` against `cratestack_migrations`
70/// and report which are pending / applied / drifted. Use before `apply` to
71/// surface drift to the operator without changing state.
72pub async fn status(
73    pool: &sqlx::PgPool,
74    migrations: &[Migration],
75) -> Result<Vec<MigrationState>, CoolError> {
76    ensure_migrations_table(pool).await?;
77    let rows = sqlx::query_as::<_, (String, Vec<u8>)>(
78        "SELECT id, checksum FROM cratestack_migrations ORDER BY id",
79    )
80    .fetch_all(pool)
81    .await
82    .map_err(|error| CoolError::Database(error.to_string()))?;
83
84    let mut applied: std::collections::HashMap<String, Vec<u8>> = std::collections::HashMap::new();
85    for (id, checksum) in rows {
86        applied.insert(id, checksum);
87    }
88
89    Ok(migrations
90        .iter()
91        .map(|m| {
92            let id = m.id.clone();
93            match applied.get(&id) {
94                Some(stored) if stored.as_slice() == m.checksum().as_slice() => MigrationState {
95                    id,
96                    status: MigrationStatus::Applied,
97                },
98                Some(_) => MigrationState {
99                    id,
100                    status: MigrationStatus::ChecksumMismatch,
101                },
102                None => MigrationState {
103                    id,
104                    status: MigrationStatus::Pending,
105                },
106            }
107        })
108        .collect())
109}
110
111/// Apply every pending migration in the input slice in order. Each
112/// runs in its own transaction; checksum drift aborts the whole apply
113/// (banks treat drift as a release-process failure for humans, not a
114/// silent overwrite).
115pub async fn apply_pending(
116    pool: &sqlx::PgPool,
117    migrations: &[Migration],
118) -> Result<Vec<String>, CoolError> {
119    let states = status(pool, migrations).await?;
120    for (state, migration) in states.iter().zip(migrations) {
121        if state.status == MigrationStatus::ChecksumMismatch {
122            return Err(CoolError::Internal(format!(
123                "migration `{}` is recorded as applied but its SQL has changed; \
124                 resolve drift before continuing",
125                migration.id
126            )));
127        }
128    }
129
130    let mut applied = Vec::new();
131    for (state, migration) in states.iter().zip(migrations) {
132        if state.status != MigrationStatus::Pending {
133            continue;
134        }
135        let mut tx = pool
136            .begin()
137            .await
138            .map_err(|error| CoolError::Database(error.to_string()))?;
139        // PG prepared statements only carry one command per round-trip.
140        // Split on `;` inside the migration's transaction so partial
141        // state can't survive a mid-script failure (audit/idempotency
142        // DDL helpers do the same).
143        for statement in migration
144            .up
145            .split(';')
146            .map(str::trim)
147            .filter(|s| !s.is_empty())
148        {
149            sqlx::query(statement)
150                .execute(&mut *tx)
151                .await
152                .map_err(|error| CoolError::Database(error.to_string()))?;
153        }
154        sqlx::query(
155            "INSERT INTO cratestack_migrations (id, description, checksum) VALUES ($1, $2, $3)",
156        )
157        .bind(&migration.id)
158        .bind(&migration.description)
159        .bind(migration.checksum().as_slice())
160        .execute(&mut *tx)
161        .await
162        .map_err(|error| CoolError::Database(error.to_string()))?;
163        tx.commit()
164            .await
165            .map_err(|error| CoolError::Database(error.to_string()))?;
166        applied.push(migration.id.clone());
167    }
168
169    Ok(applied)
170}
171
#[cfg(test)]
mod tests {
    use super::*;

    const INIT_ID: &str = "20260101000000_init";
    const INIT_UP: &str = "CREATE TABLE a (id INT);";

    /// Build a migration without a `down` script whose description is
    /// derived from its id.
    fn fixture(id: &str, up: &str) -> Migration {
        Migration {
            id: String::from(id),
            description: format!("migration {id}"),
            up: String::from(up),
            down: None,
        }
    }

    #[test]
    fn checksum_changes_when_up_sql_changes() {
        let original = fixture(INIT_ID, INIT_UP);
        // Same id and description, different forward SQL.
        let edited = Migration {
            up: String::from("CREATE TABLE a (id BIGINT);"),
            ..original.clone()
        };
        assert_ne!(original.checksum(), edited.checksum());
    }

    #[test]
    fn checksum_is_stable_for_same_inputs() {
        let original = fixture(INIT_ID, INIT_UP);
        let copy = original.clone();
        assert_eq!(original.checksum(), copy.checksum());
    }
}