sqlx_core_guts/mysql/
migrate.rs1use crate::connection::ConnectOptions;
2use crate::error::Error;
3use crate::executor::Executor;
4use crate::migrate::MigrateError;
5use crate::migrate::{AppliedMigration, Migration};
6use crate::migrate::{Migrate, MigrateDatabase};
7use crate::mysql::{MySql, MySqlConnectOptions, MySqlConnection};
8use crate::query::query;
9use crate::query_as::query_as;
10use crate::query_scalar::query_scalar;
11use futures_core::future::BoxFuture;
12use std::str::FromStr;
13use std::time::Duration;
14use std::time::Instant;
15
16fn parse_for_maintenance(url: &str) -> Result<(MySqlConnectOptions, String), Error> {
17 let mut options = MySqlConnectOptions::from_str(url)?;
18
19 let database = if let Some(database) = &options.database {
20 database.to_owned()
21 } else {
22 return Err(Error::Configuration(
23 "DATABASE_URL does not specify a database".into(),
24 ));
25 };
26
27 options.database = None;
29
30 Ok((options, database))
31}
32
33impl MigrateDatabase for MySql {
34 fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
35 Box::pin(async move {
36 let (options, database) = parse_for_maintenance(url)?;
37 let mut conn = options.connect().await?;
38
39 let _ = conn
40 .execute(&*format!("CREATE DATABASE `{}`", database))
41 .await?;
42
43 Ok(())
44 })
45 }
46
47 fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
48 Box::pin(async move {
49 let (options, database) = parse_for_maintenance(url)?;
50 let mut conn = options.connect().await?;
51
52 let exists: bool = query_scalar(
53 "select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)",
54 )
55 .bind(database)
56 .fetch_one(&mut conn)
57 .await?;
58
59 Ok(exists)
60 })
61 }
62
63 fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
64 Box::pin(async move {
65 let (options, database) = parse_for_maintenance(url)?;
66 let mut conn = options.connect().await?;
67
68 let _ = conn
69 .execute(&*format!("DROP DATABASE IF EXISTS `{}`", database,))
70 .await?;
71
72 Ok(())
73 })
74 }
75}
76
77impl Migrate for MySqlConnection {
78 fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
79 Box::pin(async move {
80 self.execute(
82 r#"
83CREATE TABLE IF NOT EXISTS _sqlx_migrations (
84 version BIGINT PRIMARY KEY,
85 description TEXT NOT NULL,
86 installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
87 success BOOLEAN NOT NULL,
88 checksum BLOB NOT NULL,
89 execution_time BIGINT NOT NULL
90);
91 "#,
92 )
93 .await?;
94
95 Ok(())
96 })
97 }
98
99 fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>, MigrateError>> {
100 Box::pin(async move {
101 let row = query_as(
103 "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
104 )
105 .fetch_optional(self)
106 .await?;
107
108 Ok(row)
109 })
110 }
111
112 fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
113 Box::pin(async move {
114 let row: Option<(i64,)> = query_as(
116 "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
117 )
118 .fetch_optional(self)
119 .await?;
120
121 Ok(row.map(|r| r.0))
122 })
123 }
124
125 fn list_applied_migrations(
126 &mut self,
127 ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
128 Box::pin(async move {
129 let rows: Vec<(i64, Vec<u8>)> =
131 query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
132 .fetch_all(self)
133 .await?;
134
135 let migrations = rows
136 .into_iter()
137 .map(|(version, checksum)| AppliedMigration {
138 version,
139 checksum: checksum.into(),
140 })
141 .collect();
142
143 Ok(migrations)
144 })
145 }
146
147 fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
148 Box::pin(async move {
149 let database_name = current_database(self).await?;
150 let lock_id = generate_lock_id(&database_name);
151
152 let _ = query("SELECT GET_LOCK(?, -1)")
160 .bind(lock_id)
161 .execute(self)
162 .await?;
163
164 Ok(())
165 })
166 }
167
168 fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
169 Box::pin(async move {
170 let database_name = current_database(self).await?;
171 let lock_id = generate_lock_id(&database_name);
172
173 let _ = query("SELECT RELEASE_LOCK(?)")
175 .bind(lock_id)
176 .execute(self)
177 .await?;
178
179 Ok(())
180 })
181 }
182
183 fn validate<'e: 'm, 'm>(
184 &'e mut self,
185 migration: &'m Migration,
186 ) -> BoxFuture<'m, Result<(), MigrateError>> {
187 Box::pin(async move {
188 let checksum: Option<Vec<u8>> =
190 query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = ?")
191 .bind(migration.version)
192 .fetch_optional(self)
193 .await?;
194
195 if let Some(checksum) = checksum {
196 return if checksum == &*migration.checksum {
197 Ok(())
198 } else {
199 Err(MigrateError::VersionMismatch(migration.version))
200 };
201 } else {
202 Err(MigrateError::VersionMissing(migration.version))
203 }
204 })
205 }
206
207 fn apply<'e: 'm, 'm>(
208 &'e mut self,
209 migration: &'m Migration,
210 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
211 Box::pin(async move {
212 let start = Instant::now();
213
214 let res = self.execute(&*migration.sql).await;
215
216 let elapsed = start.elapsed();
217
218 let _ = query(
220 r#"
221 INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
222 VALUES ( ?, ?, ?, ?, ? )
223 "#,
224 )
225 .bind(migration.version)
226 .bind(&*migration.description)
227 .bind(res.is_ok())
228 .bind(&*migration.checksum)
229 .bind(elapsed.as_nanos() as i64)
230 .execute(self)
231 .await?;
232
233 res?;
234
235 Ok(elapsed)
236 })
237 }
238
239 fn revert<'e: 'm, 'm>(
240 &'e mut self,
241 migration: &'m Migration,
242 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
243 Box::pin(async move {
244 let start = Instant::now();
245
246 self.execute(&*migration.sql).await?;
247
248 let elapsed = start.elapsed();
249
250 let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#)
252 .bind(migration.version)
253 .execute(self)
254 .await?;
255
256 Ok(elapsed)
257 })
258 }
259}
260
261async fn current_database(conn: &mut MySqlConnection) -> Result<String, MigrateError> {
262 Ok(query_scalar("SELECT DATABASE()").fetch_one(conn).await?)
264}
265
266fn generate_lock_id(database_name: &str) -> String {
268 const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
269 format!(
271 "{:x}",
272 0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
273 )
274}