cratestack_sqlx/
migrations.rs1use crate::sqlx;
10
11
12use cratestack_core::CoolError;
13use sha2::{Digest, Sha256};
14
15pub const MIGRATIONS_TABLE_DDL: &str = r#"
16CREATE TABLE IF NOT EXISTS cratestack_migrations (
17 id TEXT PRIMARY KEY,
18 description TEXT NOT NULL,
19 checksum BYTEA NOT NULL,
20 applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
21);
22"#;
23
24#[derive(Debug, Clone)]
29pub struct Migration {
30 pub id: String,
32 pub description: String,
33 pub up: String,
34 pub down: Option<String>,
35}
36
37impl Migration {
38 pub fn checksum(&self) -> [u8; 32] {
39 let mut hasher = Sha256::new();
40 hasher.update(self.id.as_bytes());
41 hasher.update(b"\0");
42 hasher.update(self.description.as_bytes());
43 hasher.update(b"\0");
44 hasher.update(self.up.as_bytes());
45 hasher.finalize().into()
46 }
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq)]
50pub enum MigrationStatus {
51 Pending,
52 Applied,
53 ChecksumMismatch,
54}
55
56#[derive(Debug, Clone)]
57pub struct MigrationState {
58 pub id: String,
59 pub status: MigrationStatus,
60}
61
62pub async fn ensure_migrations_table(pool: &sqlx::PgPool) -> Result<(), CoolError> {
63 for statement in MIGRATIONS_TABLE_DDL
64 .split(';')
65 .map(str::trim)
66 .filter(|s| !s.is_empty())
67 {
68 sqlx::query(statement)
69 .execute(pool)
70 .await
71 .map_err(|error| CoolError::Database(error.to_string()))?;
72 }
73 Ok(())
74}
75
76pub async fn status(
80 pool: &sqlx::PgPool,
81 migrations: &[Migration],
82) -> Result<Vec<MigrationState>, CoolError> {
83 ensure_migrations_table(pool).await?;
84 let rows = sqlx::query_as::<_, (String, Vec<u8>)>(
85 "SELECT id, checksum FROM cratestack_migrations ORDER BY id",
86 )
87 .fetch_all(pool)
88 .await
89 .map_err(|error| CoolError::Database(error.to_string()))?;
90
91 let mut applied: std::collections::HashMap<String, Vec<u8>> = std::collections::HashMap::new();
92 for (id, checksum) in rows {
93 applied.insert(id, checksum);
94 }
95
96 Ok(migrations
97 .iter()
98 .map(|m| {
99 let id = m.id.clone();
100 match applied.get(&id) {
101 Some(stored) if stored.as_slice() == m.checksum().as_slice() => MigrationState {
102 id,
103 status: MigrationStatus::Applied,
104 },
105 Some(_) => MigrationState {
106 id,
107 status: MigrationStatus::ChecksumMismatch,
108 },
109 None => MigrationState {
110 id,
111 status: MigrationStatus::Pending,
112 },
113 }
114 })
115 .collect())
116}
117
118pub async fn apply_pending(
123 pool: &sqlx::PgPool,
124 migrations: &[Migration],
125) -> Result<Vec<String>, CoolError> {
126 let states = status(pool, migrations).await?;
127 for (state, migration) in states.iter().zip(migrations) {
128 if state.status == MigrationStatus::ChecksumMismatch {
129 return Err(CoolError::Internal(format!(
130 "migration `{}` is recorded as applied but its SQL has changed; \
131 resolve drift before continuing",
132 migration.id
133 )));
134 }
135 }
136
137 let mut applied = Vec::new();
138 for (state, migration) in states.iter().zip(migrations) {
139 if state.status != MigrationStatus::Pending {
140 continue;
141 }
142 let mut tx = pool
143 .begin()
144 .await
145 .map_err(|error| CoolError::Database(error.to_string()))?;
146 for statement in migration
156 .up
157 .split(';')
158 .map(str::trim)
159 .filter(|s| !s.is_empty())
160 {
161 sqlx::query(statement)
162 .execute(&mut *tx)
163 .await
164 .map_err(|error| CoolError::Database(error.to_string()))?;
165 }
166 sqlx::query(
167 "INSERT INTO cratestack_migrations (id, description, checksum) VALUES ($1, $2, $3)",
168 )
169 .bind(&migration.id)
170 .bind(&migration.description)
171 .bind(migration.checksum().as_slice())
172 .execute(&mut *tx)
173 .await
174 .map_err(|error| CoolError::Database(error.to_string()))?;
175 tx.commit()
176 .await
177 .map_err(|error| CoolError::Database(error.to_string()))?;
178 applied.push(migration.id.clone());
179 }
180
181 Ok(applied)
182}
183
184#[cfg(test)]
185mod tests {
186 use super::*;
187
188 fn migration(id: &str, up: &str) -> Migration {
189 Migration {
190 id: id.to_owned(),
191 description: format!("migration {id}"),
192 up: up.to_owned(),
193 down: None,
194 }
195 }
196
197 #[test]
198 fn checksum_changes_when_up_sql_changes() {
199 let a = migration("20260101000000_init", "CREATE TABLE a (id INT);");
200 let mut b = a.clone();
201 b.up = "CREATE TABLE a (id BIGINT);".to_owned();
202 assert_ne!(a.checksum(), b.checksum());
203 }
204
205 #[test]
206 fn checksum_is_stable_for_same_inputs() {
207 let a = migration("20260101000000_init", "CREATE TABLE a (id INT);");
208 let b = a.clone();
209 assert_eq!(a.checksum(), b.checksum());
210 }
211}