sqlx_core_guts/postgres/
migrate.rs1use crate::connection::{ConnectOptions, Connection};
2use crate::error::Error;
3use crate::executor::Executor;
4use crate::migrate::MigrateError;
5use crate::migrate::{AppliedMigration, Migration};
6use crate::migrate::{Migrate, MigrateDatabase};
7use crate::postgres::{PgConnectOptions, PgConnection, Postgres};
8use crate::query::query;
9use crate::query_as::query_as;
10use crate::query_scalar::query_scalar;
11use futures_core::future::BoxFuture;
12use std::str::FromStr;
13use std::time::Duration;
14use std::time::Instant;
15
16fn parse_for_maintenance(url: &str) -> Result<(PgConnectOptions, String), Error> {
17 let mut options = PgConnectOptions::from_str(url)?;
18
19 let database = options
21 .database
22 .as_deref()
23 .unwrap_or(&options.username)
24 .to_owned();
25
26 options.database = if database == "postgres" {
30 Some("template1".into())
31 } else {
32 Some("postgres".into())
33 };
34
35 Ok((options, database))
36}
37
38impl MigrateDatabase for Postgres {
39 fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
40 Box::pin(async move {
41 let (options, database) = parse_for_maintenance(url)?;
42 let mut conn = options.connect().await?;
43
44 let _ = conn
45 .execute(&*format!(
46 "CREATE DATABASE \"{}\"",
47 database.replace('"', "\"\"")
48 ))
49 .await?;
50
51 Ok(())
52 })
53 }
54
55 fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
56 Box::pin(async move {
57 let (options, database) = parse_for_maintenance(url)?;
58 let mut conn = options.connect().await?;
59
60 let exists: bool =
61 query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)")
62 .bind(database)
63 .fetch_one(&mut conn)
64 .await?;
65
66 Ok(exists)
67 })
68 }
69
70 fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
71 Box::pin(async move {
72 let (options, database) = parse_for_maintenance(url)?;
73 let mut conn = options.connect().await?;
74
75 let _ = conn
76 .execute(&*format!(
77 "DROP DATABASE IF EXISTS \"{}\"",
78 database.replace('"', "\"\"")
79 ))
80 .await?;
81
82 Ok(())
83 })
84 }
85}
86
87impl Migrate for PgConnection {
88 fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
89 Box::pin(async move {
90 self.execute(
92 r#"
93CREATE TABLE IF NOT EXISTS _sqlx_migrations (
94 version BIGINT PRIMARY KEY,
95 description TEXT NOT NULL,
96 installed_on TIMESTAMPTZ NOT NULL DEFAULT now(),
97 success BOOLEAN NOT NULL,
98 checksum BYTEA NOT NULL,
99 execution_time BIGINT NOT NULL
100);
101 "#,
102 )
103 .await?;
104
105 Ok(())
106 })
107 }
108
109 fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>, MigrateError>> {
110 Box::pin(async move {
111 let row = query_as(
113 "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
114 )
115 .fetch_optional(self)
116 .await?;
117
118 Ok(row)
119 })
120 }
121
122 fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
123 Box::pin(async move {
124 let row: Option<(i64,)> = query_as(
126 "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
127 )
128 .fetch_optional(self)
129 .await?;
130
131 Ok(row.map(|r| r.0))
132 })
133 }
134
135 fn list_applied_migrations(
136 &mut self,
137 ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
138 Box::pin(async move {
139 let rows: Vec<(i64, Vec<u8>)> =
141 query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
142 .fetch_all(self)
143 .await?;
144
145 let migrations = rows
146 .into_iter()
147 .map(|(version, checksum)| AppliedMigration {
148 version,
149 checksum: checksum.into(),
150 })
151 .collect();
152
153 Ok(migrations)
154 })
155 }
156
157 fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
158 Box::pin(async move {
159 let database_name = current_database(self).await?;
160 let lock_id = generate_lock_id(&database_name);
161
162 let _ = query("SELECT pg_advisory_lock($1)")
170 .bind(lock_id)
171 .execute(self)
172 .await?;
173
174 Ok(())
175 })
176 }
177
178 fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
179 Box::pin(async move {
180 let database_name = current_database(self).await?;
181 let lock_id = generate_lock_id(&database_name);
182
183 let _ = query("SELECT pg_advisory_unlock($1)")
185 .bind(lock_id)
186 .execute(self)
187 .await?;
188
189 Ok(())
190 })
191 }
192
193 fn validate<'e: 'm, 'm>(
194 &'e mut self,
195 migration: &'m Migration,
196 ) -> BoxFuture<'m, Result<(), MigrateError>> {
197 Box::pin(async move {
198 let checksum: Option<Vec<u8>> =
200 query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = $1")
201 .bind(migration.version)
202 .fetch_optional(self)
203 .await?;
204
205 if let Some(checksum) = checksum {
206 return if checksum == &*migration.checksum {
207 Ok(())
208 } else {
209 Err(MigrateError::VersionMismatch(migration.version))
210 };
211 } else {
212 Err(MigrateError::VersionMissing(migration.version))
213 }
214 })
215 }
216
217 fn apply<'e: 'm, 'm>(
218 &'e mut self,
219 migration: &'m Migration,
220 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
221 Box::pin(async move {
222 let mut tx = self.begin().await?;
223 let start = Instant::now();
224
225 let _ = tx.execute(&*migration.sql).await?;
226
227 tx.commit().await?;
228
229 let elapsed = start.elapsed();
230
231 let _ = query(
233 r#"
234 INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
235 VALUES ( $1, $2, TRUE, $3, $4 )
236 "#,
237 )
238 .bind(migration.version)
239 .bind(&*migration.description)
240 .bind(&*migration.checksum)
241 .bind(elapsed.as_nanos() as i64)
242 .execute(self)
243 .await?;
244
245 Ok(elapsed)
246 })
247 }
248
249 fn revert<'e: 'm, 'm>(
250 &'e mut self,
251 migration: &'m Migration,
252 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
253 Box::pin(async move {
254 let mut tx = self.begin().await?;
255 let start = Instant::now();
256
257 let _ = tx.execute(&*migration.sql).await?;
258
259 tx.commit().await?;
260
261 let elapsed = start.elapsed();
262
263 let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
265 .bind(migration.version)
266 .execute(self)
267 .await?;
268
269 Ok(elapsed)
270 })
271 }
272}
273
274async fn current_database(conn: &mut PgConnection) -> Result<String, MigrateError> {
275 Ok(query_scalar("SELECT current_database()")
277 .fetch_one(conn)
278 .await?)
279}
280
281fn generate_lock_id(database_name: &str) -> i64 {
283 const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
284 0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
286}