sqlx_core_guts/mysql/
migrate.rs

1use crate::connection::ConnectOptions;
2use crate::error::Error;
3use crate::executor::Executor;
4use crate::migrate::MigrateError;
5use crate::migrate::{AppliedMigration, Migration};
6use crate::migrate::{Migrate, MigrateDatabase};
7use crate::mysql::{MySql, MySqlConnectOptions, MySqlConnection};
8use crate::query::query;
9use crate::query_as::query_as;
10use crate::query_scalar::query_scalar;
11use futures_core::future::BoxFuture;
12use std::str::FromStr;
13use std::time::Duration;
14use std::time::Instant;
15
16fn parse_for_maintenance(url: &str) -> Result<(MySqlConnectOptions, String), Error> {
17    let mut options = MySqlConnectOptions::from_str(url)?;
18
19    let database = if let Some(database) = &options.database {
20        database.to_owned()
21    } else {
22        return Err(Error::Configuration(
23            "DATABASE_URL does not specify a database".into(),
24        ));
25    };
26
27    // switch us to <no> database for create/drop commands
28    options.database = None;
29
30    Ok((options, database))
31}
32
33impl MigrateDatabase for MySql {
34    fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
35        Box::pin(async move {
36            let (options, database) = parse_for_maintenance(url)?;
37            let mut conn = options.connect().await?;
38
39            let _ = conn
40                .execute(&*format!("CREATE DATABASE `{}`", database))
41                .await?;
42
43            Ok(())
44        })
45    }
46
47    fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
48        Box::pin(async move {
49            let (options, database) = parse_for_maintenance(url)?;
50            let mut conn = options.connect().await?;
51
52            let exists: bool = query_scalar(
53                "select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)",
54            )
55            .bind(database)
56            .fetch_one(&mut conn)
57            .await?;
58
59            Ok(exists)
60        })
61    }
62
63    fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
64        Box::pin(async move {
65            let (options, database) = parse_for_maintenance(url)?;
66            let mut conn = options.connect().await?;
67
68            let _ = conn
69                .execute(&*format!("DROP DATABASE IF EXISTS `{}`", database,))
70                .await?;
71
72            Ok(())
73        })
74    }
75}
76
77impl Migrate for MySqlConnection {
78    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
79        Box::pin(async move {
80            // language=MySQL
81            self.execute(
82                r#"
83CREATE TABLE IF NOT EXISTS _sqlx_migrations (
84    version BIGINT PRIMARY KEY,
85    description TEXT NOT NULL,
86    installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
87    success BOOLEAN NOT NULL,
88    checksum BLOB NOT NULL,
89    execution_time BIGINT NOT NULL
90);
91                "#,
92            )
93            .await?;
94
95            Ok(())
96        })
97    }
98
99    fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>, MigrateError>> {
100        Box::pin(async move {
101            // language=SQL
102            let row = query_as(
103                "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
104            )
105            .fetch_optional(self)
106            .await?;
107
108            Ok(row)
109        })
110    }
111
112    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
113        Box::pin(async move {
114            // language=SQL
115            let row: Option<(i64,)> = query_as(
116                "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
117            )
118            .fetch_optional(self)
119            .await?;
120
121            Ok(row.map(|r| r.0))
122        })
123    }
124
125    fn list_applied_migrations(
126        &mut self,
127    ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
128        Box::pin(async move {
129            // language=SQL
130            let rows: Vec<(i64, Vec<u8>)> =
131                query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
132                    .fetch_all(self)
133                    .await?;
134
135            let migrations = rows
136                .into_iter()
137                .map(|(version, checksum)| AppliedMigration {
138                    version,
139                    checksum: checksum.into(),
140                })
141                .collect();
142
143            Ok(migrations)
144        })
145    }
146
147    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
148        Box::pin(async move {
149            let database_name = current_database(self).await?;
150            let lock_id = generate_lock_id(&database_name);
151
152            // create an application lock over the database
153            // this function will not return until the lock is acquired
154
155            // https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
156            // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE
157
158            // language=MySQL
159            let _ = query("SELECT GET_LOCK(?, -1)")
160                .bind(lock_id)
161                .execute(self)
162                .await?;
163
164            Ok(())
165        })
166    }
167
168    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
169        Box::pin(async move {
170            let database_name = current_database(self).await?;
171            let lock_id = generate_lock_id(&database_name);
172
173            // language=MySQL
174            let _ = query("SELECT RELEASE_LOCK(?)")
175                .bind(lock_id)
176                .execute(self)
177                .await?;
178
179            Ok(())
180        })
181    }
182
183    fn validate<'e: 'm, 'm>(
184        &'e mut self,
185        migration: &'m Migration,
186    ) -> BoxFuture<'m, Result<(), MigrateError>> {
187        Box::pin(async move {
188            // language=SQL
189            let checksum: Option<Vec<u8>> =
190                query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = ?")
191                    .bind(migration.version)
192                    .fetch_optional(self)
193                    .await?;
194
195            if let Some(checksum) = checksum {
196                return if checksum == &*migration.checksum {
197                    Ok(())
198                } else {
199                    Err(MigrateError::VersionMismatch(migration.version))
200                };
201            } else {
202                Err(MigrateError::VersionMissing(migration.version))
203            }
204        })
205    }
206
207    fn apply<'e: 'm, 'm>(
208        &'e mut self,
209        migration: &'m Migration,
210    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
211        Box::pin(async move {
212            let start = Instant::now();
213
214            let res = self.execute(&*migration.sql).await;
215
216            let elapsed = start.elapsed();
217
218            // language=MySQL
219            let _ = query(
220                r#"
221    INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
222    VALUES ( ?, ?, ?, ?, ? )
223                "#,
224            )
225            .bind(migration.version)
226            .bind(&*migration.description)
227            .bind(res.is_ok())
228            .bind(&*migration.checksum)
229            .bind(elapsed.as_nanos() as i64)
230            .execute(self)
231            .await?;
232
233            res?;
234
235            Ok(elapsed)
236        })
237    }
238
239    fn revert<'e: 'm, 'm>(
240        &'e mut self,
241        migration: &'m Migration,
242    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
243        Box::pin(async move {
244            let start = Instant::now();
245
246            self.execute(&*migration.sql).await?;
247
248            let elapsed = start.elapsed();
249
250            // language=SQL
251            let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#)
252                .bind(migration.version)
253                .execute(self)
254                .await?;
255
256            Ok(elapsed)
257        })
258    }
259}
260
261async fn current_database(conn: &mut MySqlConnection) -> Result<String, MigrateError> {
262    // language=MySQL
263    Ok(query_scalar("SELECT DATABASE()").fetch_one(conn).await?)
264}
265
266// inspired from rails: https://github.com/rails/rails/blob/6e49cc77ab3d16c06e12f93158eaf3e507d4120e/activerecord/lib/active_record/migration.rb#L1308
267fn generate_lock_id(database_name: &str) -> String {
268    const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
269    // 0x3d32ad9e chosen by fair dice roll
270    format!(
271        "{:x}",
272        0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
273    )
274}