cratestack_sqlx/
migrations.rs1use cratestack_core::CoolError;
11use sha2::{Digest, Sha256};
12
/// DDL for the bookkeeping table that records which migrations have run.
///
/// One row per applied migration: `id` is the primary key, `checksum` holds
/// the raw digest bytes, and `applied_at` defaults to the insertion time.
/// The statement uses `IF NOT EXISTS`, so executing it repeatedly is harmless.
pub const MIGRATIONS_TABLE_DDL: &str = r#"
CREATE TABLE IF NOT EXISTS cratestack_migrations (
    id TEXT PRIMARY KEY,
    description TEXT NOT NULL,
    checksum BYTEA NOT NULL,
    applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
"#;
21
/// A single schema migration as supplied by the caller.
#[derive(Debug, Clone)]
pub struct Migration {
    /// Unique identifier; stored as the primary key of `cratestack_migrations`.
    pub id: String,
    /// Human-readable summary, recorded alongside the id when applied.
    pub description: String,
    /// SQL script executed when the migration is applied.
    pub up: String,
    /// Optional reverse script. Nothing in this module executes it.
    pub down: Option<String>,
}
34
35impl Migration {
36 pub fn checksum(&self) -> [u8; 32] {
37 let mut hasher = Sha256::new();
38 hasher.update(self.id.as_bytes());
39 hasher.update(b"\0");
40 hasher.update(self.description.as_bytes());
41 hasher.update(b"\0");
42 hasher.update(self.up.as_bytes());
43 hasher.finalize().into()
44 }
45}
46
/// Where a migration stands relative to the `cratestack_migrations` table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MigrationStatus {
    /// No row with this migration's id has been recorded yet.
    Pending,
    /// A row exists and its stored checksum equals the current checksum.
    Applied,
    /// A row exists but its stored checksum differs from the current one.
    ChecksumMismatch,
}
53
54#[derive(Debug, Clone)]
55pub struct MigrationState {
56 pub id: String,
57 pub status: MigrationStatus,
58}
59
60pub async fn ensure_migrations_table(pool: &sqlx::PgPool) -> Result<(), CoolError> {
61 for statement in MIGRATIONS_TABLE_DDL
62 .split(';')
63 .map(str::trim)
64 .filter(|s| !s.is_empty())
65 {
66 sqlx::query(statement)
67 .execute(pool)
68 .await
69 .map_err(|error| CoolError::Database(error.to_string()))?;
70 }
71 Ok(())
72}
73
74pub async fn status(
78 pool: &sqlx::PgPool,
79 migrations: &[Migration],
80) -> Result<Vec<MigrationState>, CoolError> {
81 ensure_migrations_table(pool).await?;
82 let rows = sqlx::query_as::<_, (String, Vec<u8>)>(
83 "SELECT id, checksum FROM cratestack_migrations ORDER BY id",
84 )
85 .fetch_all(pool)
86 .await
87 .map_err(|error| CoolError::Database(error.to_string()))?;
88
89 let mut applied: std::collections::HashMap<String, Vec<u8>> = std::collections::HashMap::new();
90 for (id, checksum) in rows {
91 applied.insert(id, checksum);
92 }
93
94 Ok(migrations
95 .iter()
96 .map(|m| {
97 let id = m.id.clone();
98 match applied.get(&id) {
99 Some(stored) if stored.as_slice() == m.checksum().as_slice() => MigrationState {
100 id,
101 status: MigrationStatus::Applied,
102 },
103 Some(_) => MigrationState {
104 id,
105 status: MigrationStatus::ChecksumMismatch,
106 },
107 None => MigrationState {
108 id,
109 status: MigrationStatus::Pending,
110 },
111 }
112 })
113 .collect())
114}
115
116pub async fn apply_pending(
121 pool: &sqlx::PgPool,
122 migrations: &[Migration],
123) -> Result<Vec<String>, CoolError> {
124 let states = status(pool, migrations).await?;
125 for (state, migration) in states.iter().zip(migrations) {
126 if state.status == MigrationStatus::ChecksumMismatch {
127 return Err(CoolError::Internal(format!(
128 "migration `{}` is recorded as applied but its SQL has changed; \
129 resolve drift before continuing",
130 migration.id
131 )));
132 }
133 }
134
135 let mut applied = Vec::new();
136 for (state, migration) in states.iter().zip(migrations) {
137 if state.status != MigrationStatus::Pending {
138 continue;
139 }
140 let mut tx = pool
141 .begin()
142 .await
143 .map_err(|error| CoolError::Database(error.to_string()))?;
144 for statement in migration
154 .up
155 .split(';')
156 .map(str::trim)
157 .filter(|s| !s.is_empty())
158 {
159 sqlx::query(statement)
160 .execute(&mut *tx)
161 .await
162 .map_err(|error| CoolError::Database(error.to_string()))?;
163 }
164 sqlx::query(
165 "INSERT INTO cratestack_migrations (id, description, checksum) VALUES ($1, $2, $3)",
166 )
167 .bind(&migration.id)
168 .bind(&migration.description)
169 .bind(migration.checksum().as_slice())
170 .execute(&mut *tx)
171 .await
172 .map_err(|error| CoolError::Database(error.to_string()))?;
173 tx.commit()
174 .await
175 .map_err(|error| CoolError::Database(error.to_string()))?;
176 applied.push(migration.id.clone());
177 }
178
179 Ok(applied)
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fixture migration with a description derived from `id` and no
    /// `down` script.
    fn migration(id: &str, up: &str) -> Migration {
        let description = format!("migration {id}");
        Migration {
            id: id.to_owned(),
            description,
            up: up.to_owned(),
            down: None,
        }
    }

    /// Editing only the `up` script must produce a different checksum.
    #[test]
    fn checksum_changes_when_up_sql_changes() {
        let original = migration("20260101000000_init", "CREATE TABLE a (id INT);");
        let edited = Migration {
            up: "CREATE TABLE a (id BIGINT);".to_owned(),
            ..original.clone()
        };
        assert_ne!(original.checksum(), edited.checksum());
    }

    /// Two migrations with identical fields must hash to the same checksum.
    #[test]
    fn checksum_is_stable_for_same_inputs() {
        let first = migration("20260101000000_init", "CREATE TABLE a (id INT);");
        let second = first.clone();
        assert_eq!(first.checksum(), second.checksum());
    }
}