cratestack_sqlx/
migrations.rs1use crate::sqlx;
6use cratestack_core::CoolError;
7use sha2::{Digest, Sha256};
8
/// DDL that creates the `cratestack_migrations` bookkeeping table when it is
/// missing.
///
/// The table holds one row per applied migration: its `id` (primary key), its
/// `description`, the raw `checksum` bytes, and an `applied_at` timestamp that
/// defaults to `NOW()`.
pub const MIGRATIONS_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS cratestack_migrations (
    id TEXT PRIMARY KEY,
    description TEXT NOT NULL,
    checksum BYTEA NOT NULL,
    applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"#;
17
/// One schema migration supplied by the application.
#[derive(Clone, Debug)]
pub struct Migration {
    /// Identifier of the migration; stored as the primary key of the
    /// `cratestack_migrations` row once applied.
    pub id: String,
    /// Human-readable summary, recorded next to the id when applied.
    pub description: String,
    /// SQL script executed when the migration is applied.
    pub up: String,
    /// Optional counterpart to `up` — presumably the revert script; nothing in
    /// this file executes it and it is not part of the checksum.
    pub down: Option<String>,
}
29
30impl Migration {
31 pub fn checksum(&self) -> [u8; 32] {
32 let mut hasher = Sha256::new();
33 hasher.update(self.id.as_bytes());
34 hasher.update(b"\0");
35 hasher.update(self.description.as_bytes());
36 hasher.update(b"\0");
37 hasher.update(self.up.as_bytes());
38 hasher.finalize().into()
39 }
40}
41
/// Where a known migration stands relative to the `cratestack_migrations`
/// table.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MigrationStatus {
    /// No row with this migration's id is recorded.
    Pending,
    /// A row is recorded and its checksum equals the migration's current one.
    Applied,
    /// A row is recorded but its checksum differs from the current one.
    ChecksumMismatch,
}
48
49#[derive(Debug, Clone)]
50pub struct MigrationState {
51 pub id: String,
52 pub status: MigrationStatus,
53}
54
55pub async fn ensure_migrations_table(pool: &sqlx::PgPool) -> Result<(), CoolError> {
56 for statement in MIGRATIONS_TABLE_DDL
57 .split(';')
58 .map(str::trim)
59 .filter(|s| !s.is_empty())
60 {
61 sqlx::query(statement)
62 .execute(pool)
63 .await
64 .map_err(|error| CoolError::Database(error.to_string()))?;
65 }
66 Ok(())
67}
68
69pub async fn status(
73 pool: &sqlx::PgPool,
74 migrations: &[Migration],
75) -> Result<Vec<MigrationState>, CoolError> {
76 ensure_migrations_table(pool).await?;
77 let rows = sqlx::query_as::<_, (String, Vec<u8>)>(
78 "SELECT id, checksum FROM cratestack_migrations ORDER BY id",
79 )
80 .fetch_all(pool)
81 .await
82 .map_err(|error| CoolError::Database(error.to_string()))?;
83
84 let mut applied: std::collections::HashMap<String, Vec<u8>> = std::collections::HashMap::new();
85 for (id, checksum) in rows {
86 applied.insert(id, checksum);
87 }
88
89 Ok(migrations
90 .iter()
91 .map(|m| {
92 let id = m.id.clone();
93 match applied.get(&id) {
94 Some(stored) if stored.as_slice() == m.checksum().as_slice() => MigrationState {
95 id,
96 status: MigrationStatus::Applied,
97 },
98 Some(_) => MigrationState {
99 id,
100 status: MigrationStatus::ChecksumMismatch,
101 },
102 None => MigrationState {
103 id,
104 status: MigrationStatus::Pending,
105 },
106 }
107 })
108 .collect())
109}
110
111pub async fn apply_pending(
116 pool: &sqlx::PgPool,
117 migrations: &[Migration],
118) -> Result<Vec<String>, CoolError> {
119 let states = status(pool, migrations).await?;
120 for (state, migration) in states.iter().zip(migrations) {
121 if state.status == MigrationStatus::ChecksumMismatch {
122 return Err(CoolError::Internal(format!(
123 "migration `{}` is recorded as applied but its SQL has changed; \
124 resolve drift before continuing",
125 migration.id
126 )));
127 }
128 }
129
130 let mut applied = Vec::new();
131 for (state, migration) in states.iter().zip(migrations) {
132 if state.status != MigrationStatus::Pending {
133 continue;
134 }
135 let mut tx = pool
136 .begin()
137 .await
138 .map_err(|error| CoolError::Database(error.to_string()))?;
139 for statement in migration
144 .up
145 .split(';')
146 .map(str::trim)
147 .filter(|s| !s.is_empty())
148 {
149 sqlx::query(statement)
150 .execute(&mut *tx)
151 .await
152 .map_err(|error| CoolError::Database(error.to_string()))?;
153 }
154 sqlx::query(
155 "INSERT INTO cratestack_migrations (id, description, checksum) VALUES ($1, $2, $3)",
156 )
157 .bind(&migration.id)
158 .bind(&migration.description)
159 .bind(migration.checksum().as_slice())
160 .execute(&mut *tx)
161 .await
162 .map_err(|error| CoolError::Database(error.to_string()))?;
163 tx.commit()
164 .await
165 .map_err(|error| CoolError::Database(error.to_string()))?;
166 applied.push(migration.id.clone());
167 }
168
169 Ok(applied)
170}
171
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fixture migration with the given id and `up` SQL, a derived
    /// description and no `down` script.
    fn migration(id: &str, up: &str) -> Migration {
        let description = format!("migration {id}");
        Migration {
            id: String::from(id),
            description,
            up: String::from(up),
            down: None,
        }
    }

    #[test]
    fn checksum_changes_when_up_sql_changes() {
        let original = migration("20260101000000_init", "CREATE TABLE a (id INT);");
        // Same id and description, different `up` script.
        let altered = Migration {
            up: "CREATE TABLE a (id BIGINT);".to_owned(),
            ..original.clone()
        };
        assert_ne!(original.checksum(), altered.checksum());
    }

    #[test]
    fn checksum_is_stable_for_same_inputs() {
        let first = migration("20260101000000_init", "CREATE TABLE a (id INT);");
        let second = first.clone();
        assert_eq!(first.checksum(), second.checksum());
    }
}