sqlx_core_guts/postgres/
migrate.rs

1use crate::connection::{ConnectOptions, Connection};
2use crate::error::Error;
3use crate::executor::Executor;
4use crate::migrate::MigrateError;
5use crate::migrate::{AppliedMigration, Migration};
6use crate::migrate::{Migrate, MigrateDatabase};
7use crate::postgres::{PgConnectOptions, PgConnection, Postgres};
8use crate::query::query;
9use crate::query_as::query_as;
10use crate::query_scalar::query_scalar;
11use futures_core::future::BoxFuture;
12use std::str::FromStr;
13use std::time::Duration;
14use std::time::Instant;
15
16fn parse_for_maintenance(url: &str) -> Result<(PgConnectOptions, String), Error> {
17    let mut options = PgConnectOptions::from_str(url)?;
18
19    // pull out the name of the database to create
20    let database = options
21        .database
22        .as_deref()
23        .unwrap_or(&options.username)
24        .to_owned();
25
26    // switch us to the maintenance database
27    // use `postgres` _unless_ the database is postgres, in which case, use `template1`
28    // this matches the behavior of the `createdb` util
29    options.database = if database == "postgres" {
30        Some("template1".into())
31    } else {
32        Some("postgres".into())
33    };
34
35    Ok((options, database))
36}
37
38impl MigrateDatabase for Postgres {
39    fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
40        Box::pin(async move {
41            let (options, database) = parse_for_maintenance(url)?;
42            let mut conn = options.connect().await?;
43
44            let _ = conn
45                .execute(&*format!(
46                    "CREATE DATABASE \"{}\"",
47                    database.replace('"', "\"\"")
48                ))
49                .await?;
50
51            Ok(())
52        })
53    }
54
55    fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
56        Box::pin(async move {
57            let (options, database) = parse_for_maintenance(url)?;
58            let mut conn = options.connect().await?;
59
60            let exists: bool =
61                query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)")
62                    .bind(database)
63                    .fetch_one(&mut conn)
64                    .await?;
65
66            Ok(exists)
67        })
68    }
69
70    fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
71        Box::pin(async move {
72            let (options, database) = parse_for_maintenance(url)?;
73            let mut conn = options.connect().await?;
74
75            let _ = conn
76                .execute(&*format!(
77                    "DROP DATABASE IF EXISTS \"{}\"",
78                    database.replace('"', "\"\"")
79                ))
80                .await?;
81
82            Ok(())
83        })
84    }
85}
86
87impl Migrate for PgConnection {
88    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
89        Box::pin(async move {
90            // language=SQL
91            self.execute(
92                r#"
93CREATE TABLE IF NOT EXISTS _sqlx_migrations (
94    version BIGINT PRIMARY KEY,
95    description TEXT NOT NULL,
96    installed_on TIMESTAMPTZ NOT NULL DEFAULT now(),
97    success BOOLEAN NOT NULL,
98    checksum BYTEA NOT NULL,
99    execution_time BIGINT NOT NULL
100);
101                "#,
102            )
103            .await?;
104
105            Ok(())
106        })
107    }
108
109    fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>, MigrateError>> {
110        Box::pin(async move {
111            // language=SQL
112            let row = query_as(
113                "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
114            )
115            .fetch_optional(self)
116            .await?;
117
118            Ok(row)
119        })
120    }
121
122    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
123        Box::pin(async move {
124            // language=SQL
125            let row: Option<(i64,)> = query_as(
126                "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
127            )
128            .fetch_optional(self)
129            .await?;
130
131            Ok(row.map(|r| r.0))
132        })
133    }
134
135    fn list_applied_migrations(
136        &mut self,
137    ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
138        Box::pin(async move {
139            // language=SQL
140            let rows: Vec<(i64, Vec<u8>)> =
141                query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
142                    .fetch_all(self)
143                    .await?;
144
145            let migrations = rows
146                .into_iter()
147                .map(|(version, checksum)| AppliedMigration {
148                    version,
149                    checksum: checksum.into(),
150                })
151                .collect();
152
153            Ok(migrations)
154        })
155    }
156
157    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
158        Box::pin(async move {
159            let database_name = current_database(self).await?;
160            let lock_id = generate_lock_id(&database_name);
161
162            // create an application lock over the database
163            // this function will not return until the lock is acquired
164
165            // https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
166            // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE
167
168            // language=SQL
169            let _ = query("SELECT pg_advisory_lock($1)")
170                .bind(lock_id)
171                .execute(self)
172                .await?;
173
174            Ok(())
175        })
176    }
177
178    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
179        Box::pin(async move {
180            let database_name = current_database(self).await?;
181            let lock_id = generate_lock_id(&database_name);
182
183            // language=SQL
184            let _ = query("SELECT pg_advisory_unlock($1)")
185                .bind(lock_id)
186                .execute(self)
187                .await?;
188
189            Ok(())
190        })
191    }
192
193    fn validate<'e: 'm, 'm>(
194        &'e mut self,
195        migration: &'m Migration,
196    ) -> BoxFuture<'m, Result<(), MigrateError>> {
197        Box::pin(async move {
198            // language=SQL
199            let checksum: Option<Vec<u8>> =
200                query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = $1")
201                    .bind(migration.version)
202                    .fetch_optional(self)
203                    .await?;
204
205            if let Some(checksum) = checksum {
206                return if checksum == &*migration.checksum {
207                    Ok(())
208                } else {
209                    Err(MigrateError::VersionMismatch(migration.version))
210                };
211            } else {
212                Err(MigrateError::VersionMissing(migration.version))
213            }
214        })
215    }
216
217    fn apply<'e: 'm, 'm>(
218        &'e mut self,
219        migration: &'m Migration,
220    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
221        Box::pin(async move {
222            let mut tx = self.begin().await?;
223            let start = Instant::now();
224
225            let _ = tx.execute(&*migration.sql).await?;
226
227            tx.commit().await?;
228
229            let elapsed = start.elapsed();
230
231            // language=SQL
232            let _ = query(
233                r#"
234    INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
235    VALUES ( $1, $2, TRUE, $3, $4 )
236                "#,
237            )
238            .bind(migration.version)
239            .bind(&*migration.description)
240            .bind(&*migration.checksum)
241            .bind(elapsed.as_nanos() as i64)
242            .execute(self)
243            .await?;
244
245            Ok(elapsed)
246        })
247    }
248
249    fn revert<'e: 'm, 'm>(
250        &'e mut self,
251        migration: &'m Migration,
252    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
253        Box::pin(async move {
254            let mut tx = self.begin().await?;
255            let start = Instant::now();
256
257            let _ = tx.execute(&*migration.sql).await?;
258
259            tx.commit().await?;
260
261            let elapsed = start.elapsed();
262
263            // language=SQL
264            let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
265                .bind(migration.version)
266                .execute(self)
267                .await?;
268
269            Ok(elapsed)
270        })
271    }
272}
273
274async fn current_database(conn: &mut PgConnection) -> Result<String, MigrateError> {
275    // language=SQL
276    Ok(query_scalar("SELECT current_database()")
277        .fetch_one(conn)
278        .await?)
279}
280
281// inspired from rails: https://github.com/rails/rails/blob/6e49cc77ab3d16c06e12f93158eaf3e507d4120e/activerecord/lib/active_record/migration.rb#L1308
282fn generate_lock_id(database_name: &str) -> i64 {
283    const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
284    // 0x3d32ad9e chosen by fair dice roll
285    0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
286}