sqlx_core_oldapi/mysql/
migrate.rs

1use crate::connection::{ConnectOptions, Connection};
2use crate::error::{Error, Result};
3use crate::executor::Executor;
4use crate::migrate::{AppliedMigration, Migration};
5use crate::migrate::{Migrate, MigrateDatabase};
6use crate::migrate::{MigrateError, MigrateResult};
7use crate::mysql::{MySql, MySqlConnectOptions, MySqlConnection};
8use crate::query::query;
9use crate::query_as::query_as;
10use crate::query_scalar::query_scalar;
11use futures_core::future::BoxFuture;
12use std::str::FromStr;
13use std::time::Duration;
14use std::time::Instant;
15
16fn parse_for_maintenance(url: &str) -> Result<(MySqlConnectOptions, String)> {
17    let mut options = MySqlConnectOptions::from_str(url)?;
18
19    let database = if let Some(database) = &options.database {
20        database.to_owned()
21    } else {
22        return Err(Error::Configuration(
23            "DATABASE_URL does not specify a database".into(),
24        ));
25    };
26
27    // switch us to <no> database for create/drop commands
28    options.database = None;
29
30    Ok((options, database))
31}
32
33impl MigrateDatabase for MySql {
34    fn create_database(url: &str) -> BoxFuture<'_, Result<()>> {
35        Box::pin(async move {
36            let (options, database) = parse_for_maintenance(url)?;
37            let mut conn = options.connect().await?;
38
39            let _ = conn
40                .execute(&*format!("CREATE DATABASE `{}`", database))
41                .await?;
42
43            Ok(())
44        })
45    }
46
47    fn database_exists(url: &str) -> BoxFuture<'_, Result<bool>> {
48        Box::pin(async move {
49            let (options, database) = parse_for_maintenance(url)?;
50            let mut conn = options.connect().await?;
51
52            let exists: bool = query_scalar(
53                "select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)",
54            )
55            .bind(database)
56            .fetch_one(&mut conn)
57            .await?;
58
59            Ok(exists)
60        })
61    }
62
63    fn drop_database(url: &str) -> BoxFuture<'_, Result<()>> {
64        Box::pin(async move {
65            let (options, database) = parse_for_maintenance(url)?;
66            let mut conn = options.connect().await?;
67
68            let _ = conn
69                .execute(&*format!("DROP DATABASE IF EXISTS `{}`", database,))
70                .await?;
71
72            Ok(())
73        })
74    }
75}
76
77impl Migrate for MySqlConnection {
78    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<()>> {
79        Box::pin(async move {
80            // language=MySQL
81            self.execute(
82                r#"
83CREATE TABLE IF NOT EXISTS _sqlx_migrations (
84    version BIGINT PRIMARY KEY,
85    description TEXT NOT NULL,
86    installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
87    success BOOLEAN NOT NULL,
88    checksum BLOB NOT NULL,
89    execution_time BIGINT NOT NULL
90);
91                "#,
92            )
93            .await?;
94
95            Ok(())
96        })
97    }
98
99    fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>>> {
100        Box::pin(async move {
101            // language=SQL
102            let row = query_as(
103                "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
104            )
105            .fetch_optional(self)
106            .await?;
107
108            Ok(row)
109        })
110    }
111
112    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>>> {
113        Box::pin(async move {
114            // language=SQL
115            let row: Option<(i64,)> = query_as(
116                "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
117            )
118            .fetch_optional(self)
119            .await?;
120
121            Ok(row.map(|r| r.0))
122        })
123    }
124
125    fn list_applied_migrations(&mut self) -> BoxFuture<'_, Result<Vec<AppliedMigration>>> {
126        Box::pin(async move {
127            // language=SQL
128            let rows: Vec<(i64, Vec<u8>)> =
129                query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
130                    .fetch_all(self)
131                    .await?;
132
133            let migrations = rows
134                .into_iter()
135                .map(|(version, checksum)| AppliedMigration {
136                    version,
137                    checksum: checksum.into(),
138                })
139                .collect();
140
141            Ok(migrations)
142        })
143    }
144
145    fn lock(&mut self) -> BoxFuture<'_, Result<()>> {
146        Box::pin(async move {
147            let database_name = current_database(self).await?;
148            let lock_id = generate_lock_id(&database_name);
149
150            // create an application lock over the database
151            // this function will not return until the lock is acquired
152
153            // https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
154            // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE
155
156            // language=MySQL
157            let _ = query("SELECT GET_LOCK(?, -1)")
158                .bind(lock_id)
159                .execute(self)
160                .await?;
161
162            Ok(())
163        })
164    }
165
166    fn unlock(&mut self) -> BoxFuture<'_, Result<()>> {
167        Box::pin(async move {
168            let database_name = current_database(self).await?;
169            let lock_id = generate_lock_id(&database_name);
170
171            // language=MySQL
172            let _ = query("SELECT RELEASE_LOCK(?)")
173                .bind(lock_id)
174                .execute(self)
175                .await?;
176
177            Ok(())
178        })
179    }
180
181    fn validate<'e: 'm, 'm>(
182        &'e mut self,
183        migration: &'m Migration,
184    ) -> BoxFuture<'m, MigrateResult<()>> {
185        Box::pin(async move {
186            // language=SQL
187            let checksum: Option<Vec<u8>> =
188                query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = ?")
189                    .bind(migration.version)
190                    .fetch_optional(self)
191                    .await
192                    .map_err(MigrateError::AccessMigrationMetadata)?;
193
194            if let Some(checksum) = checksum {
195                return if checksum == &*migration.checksum {
196                    Ok(())
197                } else {
198                    Err(MigrateError::VersionMismatch(migration.version))
199                };
200            } else {
201                Err(MigrateError::VersionMissing(migration.version))
202            }
203        })
204    }
205
206    fn apply<'e: 'm, 'm>(
207        &'e mut self,
208        migration: &'m Migration,
209    ) -> BoxFuture<'m, Result<Duration>> {
210        Box::pin(async move {
211            // Use a single transaction for the actual migration script and the essential bookeeping so we never
212            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
213            // The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
214            // data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
215            // and update it once the actual transaction completed.
216            let mut tx = self.begin().await?;
217            let start = Instant::now();
218
219            // For MySQL we cannot really isolate migrations due to implicit commits caused by table modification, see
220            // https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html
221            //
222            // To somewhat try to detect this, we first insert the migration into the migration table with
223            // `success=FALSE` and later modify the flag.
224            //
225            // language=MySQL
226            let _ = query(
227                r#"
228    INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
229    VALUES ( ?, ?, FALSE, ?, -1 )
230                "#,
231            )
232            .bind(migration.version)
233            .bind(&*migration.description)
234            .bind(&*migration.checksum)
235            .execute(&mut tx)
236            .await?;
237
238            let _ = tx.execute(&*migration.sql).await?;
239
240            // language=MySQL
241            let _ = query(
242                r#"
243    UPDATE _sqlx_migrations
244    SET success = TRUE
245    WHERE version = ?
246                "#,
247            )
248            .bind(migration.version)
249            .execute(&mut tx)
250            .await?;
251
252            tx.commit().await?;
253
254            // Update `elapsed_time`.
255            // NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
256            //       this small risk since this value is not super important.
257
258            let elapsed = start.elapsed();
259
260            let _ = query(
261                r#"
262    UPDATE _sqlx_migrations
263    SET execution_time = ?
264    WHERE version = ?
265                "#,
266            )
267            .bind(elapsed.as_nanos() as i64)
268            .bind(migration.version)
269            .execute(self)
270            .await?;
271
272            Ok(elapsed)
273        })
274    }
275
276    fn revert<'e: 'm, 'm>(
277        &'e mut self,
278        migration: &'m Migration,
279    ) -> BoxFuture<'m, Result<Duration>> {
280        Box::pin(async move {
281            // Use a single transaction for the actual migration script and the essential bookeeping so we never
282            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
283            let mut tx = self.begin().await?;
284            let start = Instant::now();
285
286            // For MySQL we cannot really isolate migrations due to implicit commits caused by table modification, see
287            // https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html
288            //
289            // To somewhat try to detect this, we first insert the migration into the migration table with
290            // `success=FALSE` and later remove the migration altogether.
291            //
292            // language=MySQL
293            let _ = query(
294                r#"
295    UPDATE _sqlx_migrations
296    SET success = FALSE
297    WHERE version = ?
298                "#,
299            )
300            .bind(migration.version)
301            .execute(&mut tx)
302            .await?;
303
304            tx.execute(&*migration.sql).await?;
305
306            // language=SQL
307            let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#)
308                .bind(migration.version)
309                .execute(&mut tx)
310                .await?;
311
312            tx.commit().await?;
313
314            let elapsed = start.elapsed();
315
316            Ok(elapsed)
317        })
318    }
319}
320
321async fn current_database(conn: &mut MySqlConnection) -> Result<String> {
322    // language=MySQL
323    Ok(query_scalar("SELECT DATABASE()").fetch_one(conn).await?)
324}
325
326// inspired from rails: https://github.com/rails/rails/blob/6e49cc77ab3d16c06e12f93158eaf3e507d4120e/activerecord/lib/active_record/migration.rb#L1308
327fn generate_lock_id(database_name: &str) -> String {
328    const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
329    // 0x3d32ad9e chosen by fair dice roll
330    format!(
331        "{:x}",
332        0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
333    )
334}