sqlx_core_oldapi/postgres/
migrate.rs1use crate::connection::{ConnectOptions, Connection};
2use crate::error::Result;
3use crate::executor::Executor;
4use crate::migrate::{AppliedMigration, Migration};
5use crate::migrate::{Migrate, MigrateDatabase};
6use crate::migrate::{MigrateError, MigrateResult};
7use crate::postgres::{PgConnectOptions, PgConnection, Postgres};
8use crate::query::query;
9use crate::query_as::query_as;
10use crate::query_scalar::query_scalar;
11use futures_core::future::BoxFuture;
12use std::str::FromStr;
13use std::time::Duration;
14use std::time::Instant;
15
16fn parse_for_maintenance(url: &str) -> Result<(PgConnectOptions, String)> {
17 let mut options = PgConnectOptions::from_str(url)?;
18
19 let database = options
21 .database
22 .as_deref()
23 .unwrap_or(&options.username)
24 .to_owned();
25
26 options.database = if database == "postgres" {
30 Some("template1".into())
31 } else {
32 Some("postgres".into())
33 };
34
35 Ok((options, database))
36}
37
38impl MigrateDatabase for Postgres {
39 fn create_database(url: &str) -> BoxFuture<'_, Result<()>> {
40 Box::pin(async move {
41 let (options, database) = parse_for_maintenance(url)?;
42 let mut conn = options.connect().await?;
43
44 let _ = conn
45 .execute(&*format!(
46 "CREATE DATABASE \"{}\"",
47 database.replace('"', "\"\"")
48 ))
49 .await?;
50
51 Ok(())
52 })
53 }
54
55 fn database_exists(url: &str) -> BoxFuture<'_, Result<bool>> {
56 Box::pin(async move {
57 let (options, database) = parse_for_maintenance(url)?;
58 let mut conn = options.connect().await?;
59
60 let exists: bool =
61 query_scalar("select exists(SELECT 1 from pg_database WHERE datname = $1)")
62 .bind(database)
63 .fetch_one(&mut conn)
64 .await?;
65
66 Ok(exists)
67 })
68 }
69
70 fn drop_database(url: &str) -> BoxFuture<'_, Result<()>> {
71 Box::pin(async move {
72 let (options, database) = parse_for_maintenance(url)?;
73 let mut conn = options.connect().await?;
74
75 let _ = conn
76 .execute(&*format!(
77 "DROP DATABASE IF EXISTS \"{}\"",
78 database.replace('"', "\"\"")
79 ))
80 .await?;
81
82 Ok(())
83 })
84 }
85}
86
87impl Migrate for PgConnection {
88 fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<()>> {
89 Box::pin(async move {
90 self.execute(
92 r#"
93CREATE TABLE IF NOT EXISTS _sqlx_migrations (
94 version BIGINT PRIMARY KEY,
95 description TEXT NOT NULL,
96 installed_on TIMESTAMPTZ NOT NULL DEFAULT now(),
97 success BOOLEAN NOT NULL,
98 checksum BYTEA NOT NULL,
99 execution_time BIGINT NOT NULL
100);
101 "#,
102 )
103 .await?;
104
105 Ok(())
106 })
107 }
108
109 fn version(&mut self) -> BoxFuture<'_, Result<Option<(i64, bool)>>> {
110 Box::pin(async move {
111 let row = query_as(
113 "SELECT version, NOT success FROM _sqlx_migrations ORDER BY version DESC LIMIT 1",
114 )
115 .fetch_optional(self)
116 .await?;
117
118 Ok(row)
119 })
120 }
121
122 fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>>> {
123 Box::pin(async move {
124 let row: Option<(i64,)> = query_as(
126 "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
127 )
128 .fetch_optional(self)
129 .await?;
130
131 Ok(row.map(|r| r.0))
132 })
133 }
134
135 fn list_applied_migrations(&mut self) -> BoxFuture<'_, Result<Vec<AppliedMigration>>> {
136 Box::pin(async move {
137 let rows: Vec<(i64, Vec<u8>)> =
139 query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
140 .fetch_all(self)
141 .await?;
142
143 let migrations = rows
144 .into_iter()
145 .map(|(version, checksum)| AppliedMigration {
146 version,
147 checksum: checksum.into(),
148 })
149 .collect();
150
151 Ok(migrations)
152 })
153 }
154
155 fn lock(&mut self) -> BoxFuture<'_, Result<()>> {
156 Box::pin(async move {
157 let database_name = current_database(self).await?;
158 let lock_id = generate_lock_id(&database_name);
159
160 let _ = query("SELECT pg_advisory_lock($1)")
168 .bind(lock_id)
169 .execute(self)
170 .await?;
171
172 Ok(())
173 })
174 }
175
176 fn unlock(&mut self) -> BoxFuture<'_, Result<()>> {
177 Box::pin(async move {
178 let database_name = current_database(self).await?;
179 let lock_id = generate_lock_id(&database_name);
180
181 let _ = query("SELECT pg_advisory_unlock($1)")
183 .bind(lock_id)
184 .execute(self)
185 .await?;
186
187 Ok(())
188 })
189 }
190
191 fn validate<'e: 'm, 'm>(
192 &'e mut self,
193 migration: &'m Migration,
194 ) -> BoxFuture<'m, MigrateResult<()>> {
195 Box::pin(async move {
196 let checksum: Option<Vec<u8>> =
198 query_scalar("SELECT checksum FROM _sqlx_migrations WHERE version = $1")
199 .bind(migration.version)
200 .fetch_optional(self)
201 .await
202 .map_err(MigrateError::AccessMigrationMetadata)?;
203
204 if let Some(checksum) = checksum {
205 if checksum == *migration.checksum {
206 Ok(())
207 } else {
208 Err(MigrateError::VersionMismatch(migration.version))
209 }
210 } else {
211 Err(MigrateError::VersionMissing(migration.version))
212 }
213 })
214 }
215
216 fn apply<'e: 'm, 'm>(
217 &'e mut self,
218 migration: &'m Migration,
219 ) -> BoxFuture<'m, Result<Duration>> {
220 Box::pin(async move {
221 let mut tx = self.begin().await?;
222 let start = Instant::now();
223
224 let _ = tx.execute(&*migration.sql).await?;
230
231 let _ = query(
233 r#"
234 INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
235 VALUES ( $1, $2, TRUE, $3, -1 )
236 "#,
237 )
238 .bind(migration.version)
239 .bind(&*migration.description)
240 .bind(&*migration.checksum)
241 .execute(&mut tx)
242 .await?;
243
244 tx.commit().await?;
245
246 let elapsed = start.elapsed();
251
252 let _ = query(
254 r#"
255 UPDATE _sqlx_migrations
256 SET execution_time = $1
257 WHERE version = $2
258 "#,
259 )
260 .bind(i64::try_from(elapsed.as_nanos()).map_err(crate::error::Error::IntegerOverflow)?)
261 .bind(migration.version)
262 .execute(self)
263 .await?;
264
265 Ok(elapsed)
266 })
267 }
268
269 fn revert<'e: 'm, 'm>(
270 &'e mut self,
271 migration: &'m Migration,
272 ) -> BoxFuture<'m, Result<Duration>> {
273 Box::pin(async move {
274 let mut tx = self.begin().await?;
277 let start = Instant::now();
278
279 let _ = tx.execute(&*migration.sql).await?;
280
281 let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = $1"#)
283 .bind(migration.version)
284 .execute(&mut tx)
285 .await?;
286
287 tx.commit().await?;
288
289 let elapsed = start.elapsed();
290
291 Ok(elapsed)
292 })
293 }
294}
295
296async fn current_database(conn: &mut PgConnection) -> Result<String> {
297 query_scalar("SELECT current_database()")
299 .fetch_one(conn)
300 .await
301}
302
303fn generate_lock_id(database_name: &str) -> i64 {
305 const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
306 0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
308}