sqlx_core_oldapi/postgres/
migrate.rs

1use crate::connection::{ConnectOptions, Connection};
2use crate::error::Result;
3use crate::executor::Executor;
4use crate::migrate::{AppliedMigration, Migration};
5use crate::migrate::{Migrate, MigrateDatabase};
6use crate::migrate::{MigrateError, MigrateResult};
7use crate::postgres::{PgConnectOptions, PgConnection, Postgres};
8use crate::query::query;
9use crate::query_as::query_as;
10use crate::query_scalar::query_scalar;
11use futures_core::future::BoxFuture;
12use std::str::FromStr;
13use std::time::Duration;
14use std::time::Instant;
15
16fn parse_for_maintenance(url: &str) -> Result<(PgConnectOptions, String)> {
17    let mut options = PgConnectOptions::from_str(url)?;
18
19    // pull out the name of the database to create
20    let database = options
21        .database
22        .as_deref()
23        .unwrap_or(&options.username)
24        .to_owned();
25
26    // switch us to the maintenance database
27    // use `postgres` _unless_ the database is postgres, in which case, use `template1`
28    // this matches the behavior of the `createdb` util
29    options.database = if database == "postgres" {
30        Some("template1".into())
31    } else {
32        Some("postgres".into())
33    };
34
35    Ok((options, database))
36}
37
38impl MigrateDatabase for Postgres {
39    fn create_database(url: &str) -> BoxFuture<'_, Result<()>> {
40        Box::pin(async move {
41            let (options, database) = parse_for_maintenance(url)?;
42            let mut conn = options.connect().await?;
43
44            let _ = conn
45                .execute(&*format!(
46                    "CREATE DATABASE \"{}\"",
47                    database.replace('"', "\"\"")
48                ))
49                .await?;
50
51            Ok(())
52        })
53    }
54
55    fn database_exists(url: &str) -> BoxFuture<'_, Result<bool>> {
56        Box::pin(async move {
57            let (options, database) = parse_for_maintenance(url)?;
58            let mut conn = options.connect().await?;
59
60            let exists: bool =
61                query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)")
62                    .bind(database)
63                    .fetch_one(&mut conn)
64                    .await?;
65
66            Ok(exists)
67        })
68    }
69
70    fn drop_database(url: &str) -> BoxFuture<'_, Result<()>> {
71        Box::pin(async move {
72            let (options, database) = parse_for_maintenance(url)?;
73            let mut conn = options.connect().await?;
74
75            let _ = conn
76                .execute(&*format!(
77                    "DROP DATABASE IF EXISTS \"{}\"",
78                    database.replace('"', "\"\"")
79                ))
80                .await?;
81
82            Ok(())
83        })
84    }
85}
86
87impl Migrate for PgConnection {
88    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<()>> {
89        Box::pin(async move {
90            // language=SQL
91            self.execute(
92                r#"
93CREATE TABLE IF NOT EXISTS _sqlx_migrations (
94    version BIGINT PRIMARY KEY,
95    description TEXT NOT NULL,
96    installed_on TIMESTAMPTZ NOT NULL DEFAULT now(),
97    success BOOLEAN NOT NULL,
98    checksum BYTEA NOT NULL,
99    execution_time BIGINT NOT NULL
100);
101                "#,
102            )
103            .await?;
104
105            Ok(())
106        })
107    }
108
109    fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>>> {
110        Box::pin(async move {
111            // language=SQL
112            let row = query_as(
113                "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
114            )
115            .fetch_optional(self)
116            .await?;
117
118            Ok(row)
119        })
120    }
121
122    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>>> {
123        Box::pin(async move {
124            // language=SQL
125            let row: Option<(i64,)> = query_as(
126                "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
127            )
128            .fetch_optional(self)
129            .await?;
130
131            Ok(row.map(|r| r.0))
132        })
133    }
134
135    fn list_applied_migrations(&mut self) -> BoxFuture<'_, Result<Vec<AppliedMigration>>> {
136        Box::pin(async move {
137            // language=SQL
138            let rows: Vec<(i64, Vec<u8>)> =
139                query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
140                    .fetch_all(self)
141                    .await?;
142
143            let migrations = rows
144                .into_iter()
145                .map(|(version, checksum)| AppliedMigration {
146                    version,
147                    checksum: checksum.into(),
148                })
149                .collect();
150
151            Ok(migrations)
152        })
153    }
154
155    fn lock(&mut self) -> BoxFuture<'_, Result<()>> {
156        Box::pin(async move {
157            let database_name = current_database(self).await?;
158            let lock_id = generate_lock_id(&database_name);
159
160            // create an application lock over the database
161            // this function will not return until the lock is acquired
162
163            // https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
164            // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE
165
166            // language=SQL
167            let _ = query("SELECT pg_advisory_lock($1)")
168                .bind(lock_id)
169                .execute(self)
170                .await?;
171
172            Ok(())
173        })
174    }
175
176    fn unlock(&mut self) -> BoxFuture<'_, Result<()>> {
177        Box::pin(async move {
178            let database_name = current_database(self).await?;
179            let lock_id = generate_lock_id(&database_name);
180
181            // language=SQL
182            let _ = query("SELECT pg_advisory_unlock($1)")
183                .bind(lock_id)
184                .execute(self)
185                .await?;
186
187            Ok(())
188        })
189    }
190
191    fn validate<'e: 'm, 'm>(
192        &'e mut self,
193        migration: &'m Migration,
194    ) -> BoxFuture<'m, MigrateResult<()>> {
195        Box::pin(async move {
196            // language=SQL
197            let checksum: Option<Vec<u8>> =
198                query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = $1")
199                    .bind(migration.version)
200                    .fetch_optional(self)
201                    .await
202                    .map_err(MigrateError::AccessMigrationMetadata)?;
203
204            if let Some(checksum) = checksum {
205                if checksum == *migration.checksum {
206                    Ok(())
207                } else {
208                    Err(MigrateError::VersionMismatch(migration.version))
209                }
210            } else {
211                Err(MigrateError::VersionMissing(migration.version))
212            }
213        })
214    }
215
216    fn apply<'e: 'm, 'm>(
217        &'e mut self,
218        migration: &'m Migration,
219    ) -> BoxFuture<'m, Result<Duration>> {
220        Box::pin(async move {
221            let mut tx = self.begin().await?;
222            let start = Instant::now();
223
224            // Use a single transaction for the actual migration script and the essential bookeeping so we never
225            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
226            // The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
227            // data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
228            // and update it once the actual transaction completed.
229            let _ = tx.execute(&*migration.sql).await?;
230
231            // language=SQL
232            let _ = query(
233                r#"
234    INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
235    VALUES ( $1, $2, TRUE, $3, -1 )
236                "#,
237            )
238            .bind(migration.version)
239            .bind(&*migration.description)
240            .bind(&*migration.checksum)
241            .execute(&mut tx)
242            .await?;
243
244            tx.commit().await?;
245
246            // Update `elapsed_time`.
247            // NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
248            //       this small risk since this value is not super important.
249
250            let elapsed = start.elapsed();
251
252            // language=SQL
253            let _ = query(
254                r#"
255    UPDATE _sqlx_migrations
256    SET execution_time = $1
257    WHERE version = $2
258                "#,
259            )
260            .bind(i64::try_from(elapsed.as_nanos()).map_err(crate::error::Error::IntegerOverflow)?)
261            .bind(migration.version)
262            .execute(self)
263            .await?;
264
265            Ok(elapsed)
266        })
267    }
268
269    fn revert<'e: 'm, 'm>(
270        &'e mut self,
271        migration: &'m Migration,
272    ) -> BoxFuture<'m, Result<Duration>> {
273        Box::pin(async move {
274            // Use a single transaction for the actual migration script and the essential bookeeping so we never
275            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
276            let mut tx = self.begin().await?;
277            let start = Instant::now();
278
279            let _ = tx.execute(&*migration.sql).await?;
280
281            // language=SQL
282            let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
283                .bind(migration.version)
284                .execute(&mut tx)
285                .await?;
286
287            tx.commit().await?;
288
289            let elapsed = start.elapsed();
290
291            Ok(elapsed)
292        })
293    }
294}
295
296async fn current_database(conn: &mut PgConnection) -> Result<String> {
297    // language=SQL
298    query_scalar("SELECT current_database()")
299        .fetch_one(conn)
300        .await
301}
302
303// inspired from rails: https://github.com/rails/rails/blob/6e49cc77ab3d16c06e12f93158eaf3e507d4120e/activerecord/lib/active_record/migration.rb#L1308
304fn generate_lock_id(database_name: &str) -> i64 {
305    const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
306    // 0x3d32ad9e chosen by fair dice roll
307    0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
308}