sqlx_core_oldapi/mysql/
migrate.rs1use crate::connection::{ConnectOptions, Connection};
2use crate::error::{Error, Result};
3use crate::executor::Executor;
4use crate::migrate::{AppliedMigration, Migration};
5use crate::migrate::{Migrate, MigrateDatabase};
6use crate::migrate::{MigrateError, MigrateResult};
7use crate::mysql::{MySql, MySqlConnectOptions, MySqlConnection};
8use crate::query::query;
9use crate::query_as::query_as;
10use crate::query_scalar::query_scalar;
11use futures_core::future::BoxFuture;
12use std::str::FromStr;
13use std::time::Duration;
14use std::time::Instant;
15
16fn parse_for_maintenance(url: &str) -> Result<(MySqlConnectOptions, String)> {
17 let mut options = MySqlConnectOptions::from_str(url)?;
18
19 let database = if let Some(database) = &options.database {
20 database.to_owned()
21 } else {
22 return Err(Error::Configuration(
23 "DATABASE_URL does not specify a database".into(),
24 ));
25 };
26
27 options.database = None;
29
30 Ok((options, database))
31}
32
33impl MigrateDatabase for MySql {
34 fn create_database(url: &str) -> BoxFuture<'_, Result<()>> {
35 Box::pin(async move {
36 let (options, database) = parse_for_maintenance(url)?;
37 let mut conn = options.connect().await?;
38
39 let _ = conn
40 .execute(&*format!("CREATE DATABASE `{}`", database))
41 .await?;
42
43 Ok(())
44 })
45 }
46
47 fn database_exists(url: &str) -> BoxFuture<'_, Result<bool>> {
48 Box::pin(async move {
49 let (options, database) = parse_for_maintenance(url)?;
50 let mut conn = options.connect().await?;
51
52 let exists: bool = query_scalar(
53 "select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)",
54 )
55 .bind(database)
56 .fetch_one(&mut conn)
57 .await?;
58
59 Ok(exists)
60 })
61 }
62
63 fn drop_database(url: &str) -> BoxFuture<'_, Result<()>> {
64 Box::pin(async move {
65 let (options, database) = parse_for_maintenance(url)?;
66 let mut conn = options.connect().await?;
67
68 let _ = conn
69 .execute(&*format!("DROP DATABASE IF EXISTS `{}`", database,))
70 .await?;
71
72 Ok(())
73 })
74 }
75}
76
77impl Migrate for MySqlConnection {
78 fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<()>> {
79 Box::pin(async move {
80 self.execute(
82 r#"
83CREATE TABLE IF NOT EXISTS _sqlx_migrations (
84 version BIGINT PRIMARY KEY,
85 description TEXT NOT NULL,
86 installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
87 success BOOLEAN NOT NULL,
88 checksum BLOB NOT NULL,
89 execution_time BIGINT NOT NULL
90);
91 "#,
92 )
93 .await?;
94
95 Ok(())
96 })
97 }
98
99 fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>>> {
100 Box::pin(async move {
101 let row = query_as(
103 "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
104 )
105 .fetch_optional(self)
106 .await?;
107
108 Ok(row)
109 })
110 }
111
112 fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>>> {
113 Box::pin(async move {
114 let row: Option<(i64,)> = query_as(
116 "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
117 )
118 .fetch_optional(self)
119 .await?;
120
121 Ok(row.map(|r| r.0))
122 })
123 }
124
125 fn list_applied_migrations(&mut self) -> BoxFuture<'_, Result<Vec<AppliedMigration>>> {
126 Box::pin(async move {
127 let rows: Vec<(i64, Vec<u8>)> =
129 query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
130 .fetch_all(self)
131 .await?;
132
133 let migrations = rows
134 .into_iter()
135 .map(|(version, checksum)| AppliedMigration {
136 version,
137 checksum: checksum.into(),
138 })
139 .collect();
140
141 Ok(migrations)
142 })
143 }
144
145 fn lock(&mut self) -> BoxFuture<'_, Result<()>> {
146 Box::pin(async move {
147 let database_name = current_database(self).await?;
148 let lock_id = generate_lock_id(&database_name);
149
150 let _ = query("SELECT GET_LOCK(?, -1)")
158 .bind(lock_id)
159 .execute(self)
160 .await?;
161
162 Ok(())
163 })
164 }
165
166 fn unlock(&mut self) -> BoxFuture<'_, Result<()>> {
167 Box::pin(async move {
168 let database_name = current_database(self).await?;
169 let lock_id = generate_lock_id(&database_name);
170
171 let _ = query("SELECT RELEASE_LOCK(?)")
173 .bind(lock_id)
174 .execute(self)
175 .await?;
176
177 Ok(())
178 })
179 }
180
181 fn validate<'e: 'm, 'm>(
182 &'e mut self,
183 migration: &'m Migration,
184 ) -> BoxFuture<'m, MigrateResult<()>> {
185 Box::pin(async move {
186 let checksum: Option<Vec<u8>> =
188 query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = ?")
189 .bind(migration.version)
190 .fetch_optional(self)
191 .await
192 .map_err(MigrateError::AccessMigrationMetadata)?;
193
194 if let Some(checksum) = checksum {
195 return if checksum == &*migration.checksum {
196 Ok(())
197 } else {
198 Err(MigrateError::VersionMismatch(migration.version))
199 };
200 } else {
201 Err(MigrateError::VersionMissing(migration.version))
202 }
203 })
204 }
205
206 fn apply<'e: 'm, 'm>(
207 &'e mut self,
208 migration: &'m Migration,
209 ) -> BoxFuture<'m, Result<Duration>> {
210 Box::pin(async move {
211 let mut tx = self.begin().await?;
217 let start = Instant::now();
218
219 let _ = query(
227 r#"
228 INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
229 VALUES ( ?, ?, FALSE, ?, -1 )
230 "#,
231 )
232 .bind(migration.version)
233 .bind(&*migration.description)
234 .bind(&*migration.checksum)
235 .execute(&mut tx)
236 .await?;
237
238 let _ = tx.execute(&*migration.sql).await?;
239
240 let _ = query(
242 r#"
243 UPDATE _sqlx_migrations
244 SET success = TRUE
245 WHERE version = ?
246 "#,
247 )
248 .bind(migration.version)
249 .execute(&mut tx)
250 .await?;
251
252 tx.commit().await?;
253
254 let elapsed = start.elapsed();
259
260 let _ = query(
261 r#"
262 UPDATE _sqlx_migrations
263 SET execution_time = ?
264 WHERE version = ?
265 "#,
266 )
267 .bind(elapsed.as_nanos() as i64)
268 .bind(migration.version)
269 .execute(self)
270 .await?;
271
272 Ok(elapsed)
273 })
274 }
275
276 fn revert<'e: 'm, 'm>(
277 &'e mut self,
278 migration: &'m Migration,
279 ) -> BoxFuture<'m, Result<Duration>> {
280 Box::pin(async move {
281 let mut tx = self.begin().await?;
284 let start = Instant::now();
285
286 let _ = query(
294 r#"
295 UPDATE _sqlx_migrations
296 SET success = FALSE
297 WHERE version = ?
298 "#,
299 )
300 .bind(migration.version)
301 .execute(&mut tx)
302 .await?;
303
304 tx.execute(&*migration.sql).await?;
305
306 let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#)
308 .bind(migration.version)
309 .execute(&mut tx)
310 .await?;
311
312 tx.commit().await?;
313
314 let elapsed = start.elapsed();
315
316 Ok(elapsed)
317 })
318 }
319}
320
321async fn current_database(conn: &mut MySqlConnection) -> Result<String> {
322 Ok(query_scalar("SELECT DATABASE()").fetch_one(conn).await?)
324}
325
326fn generate_lock_id(database_name: &str) -> String {
328 const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
329 format!(
331 "{:x}",
332 0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
333 )
334}