sqlx_exasol/
migrate.rs

1use std::{
2    str::FromStr,
3    time::{Duration, Instant},
4};
5
6use futures_core::future::BoxFuture;
7use sqlx_core::{
8    connection::{ConnectOptions, Connection},
9    executor::Executor,
10    migrate::{AppliedMigration, Migrate, MigrateDatabase, MigrateError, Migration},
11    query::query,
12    query_as::query_as,
13    query_scalar::query_scalar,
14};
15
16use crate::{
17    connection::{
18        websocket::future::{ExecuteBatch, WebSocketFuture},
19        ExaConnection,
20    },
21    database::Exasol,
22    options::ExaConnectOptions,
23    SqlxError, SqlxResult,
24};
25
26const LOCK_WARN: &str = "Exasol does not support database locking!";
27
28fn parse_for_maintenance(url: &str) -> SqlxResult<(ExaConnectOptions, String)> {
29    let mut options = ExaConnectOptions::from_str(url)?;
30
31    let database = options.schema.ok_or_else(|| {
32        SqlxError::Configuration("DATABASE_URL does not specify a database".into())
33    })?;
34
35    // switch to <no> database for create/drop commands
36    options.schema = None;
37
38    Ok((options, database))
39}
40
41impl MigrateDatabase for Exasol {
42    fn create_database(url: &str) -> BoxFuture<'_, SqlxResult<()>> {
43        Box::pin(async move {
44            let (options, database) = parse_for_maintenance(url)?;
45            let mut conn = options.connect().await?;
46
47            let query = format!("CREATE SCHEMA \"{}\"", database.replace('"', "\"\""));
48            let _ = conn.execute(&*query).await?;
49
50            Ok(())
51        })
52    }
53
54    fn database_exists(url: &str) -> BoxFuture<'_, SqlxResult<bool>> {
55        Box::pin(async move {
56            let (options, database) = parse_for_maintenance(url)?;
57            let mut conn = options.connect().await?;
58
59            let query = "SELECT true FROM exa_schemas WHERE schema_name = ?";
60            let exists: bool = query_scalar(query)
61                .bind(database)
62                .fetch_one(&mut conn)
63                .await?;
64
65            Ok(exists)
66        })
67    }
68
69    fn drop_database(url: &str) -> BoxFuture<'_, SqlxResult<()>> {
70        Box::pin(async move {
71            let (options, database) = parse_for_maintenance(url)?;
72            let mut conn = options.connect().await?;
73
74            let query = format!("DROP SCHEMA IF EXISTS `{database}`");
75            let _ = conn.execute(&*query).await?;
76
77            Ok(())
78        })
79    }
80}
81
82impl Migrate for ExaConnection {
83    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
84        Box::pin(async move {
85            let query = r#"
86            CREATE TABLE IF NOT EXISTS "_sqlx_migrations" (
87                version DECIMAL(20, 0),
88                description CLOB NOT NULL,
89                installed_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
90                success BOOLEAN NOT NULL,
91                checksum CLOB NOT NULL,
92                execution_time DECIMAL(20, 0) NOT NULL
93            );"#;
94
95            self.execute(query).await?;
96            Ok(())
97        })
98    }
99
100    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
101        Box::pin(async move {
102            let query = r#"
103            SELECT version
104            FROM "_sqlx_migrations"
105            WHERE success = false
106            ORDER BY version
107            LIMIT 1
108            "#;
109
110            let row: Option<(i64,)> = query_as(query).fetch_optional(self).await?;
111
112            Ok(row.map(|r| r.0))
113        })
114    }
115
116    fn list_applied_migrations(
117        &mut self,
118    ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
119        Box::pin(async move {
120            let query = r#"
121                SELECT version, checksum
122                FROM "_sqlx_migrations"
123                ORDER BY version
124                "#;
125
126            let rows: Vec<(i64, String)> = query_as(query).fetch_all(self).await?;
127
128            let mut migrations = Vec::with_capacity(rows.len());
129
130            for (version, checksum) in rows {
131                let checksum = hex::decode(checksum)
132                    .map_err(From::from)
133                    .map_err(MigrateError::Source)?
134                    .into();
135
136                let migration = AppliedMigration { version, checksum };
137                migrations.push(migration);
138            }
139
140            Ok(migrations)
141        })
142    }
143
144    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
145        Box::pin(async move {
146            tracing::warn!("{LOCK_WARN}");
147            Ok(())
148        })
149    }
150
151    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
152        Box::pin(async move {
153            tracing::warn!("{LOCK_WARN}");
154            Ok(())
155        })
156    }
157
158    fn apply<'e: 'm, 'm>(
159        &'e mut self,
160        migration: &'m Migration,
161    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
162        Box::pin(async move {
163            let mut tx = self.begin().await?;
164            let start = Instant::now();
165
166            ExecuteBatch::new(migration.sql.as_ref())
167                .future(&mut tx.ws)
168                .await?;
169
170            let checksum = hex::encode(&*migration.checksum);
171
172            let query_str = r#"
173            INSERT INTO "_sqlx_migrations" ( version, description, success, checksum, execution_time )
174            VALUES ( ?, ?, TRUE, ?, -1 );
175            "#;
176
177            let _ = query(query_str)
178                .bind(migration.version)
179                .bind(&*migration.description)
180                .bind(checksum)
181                .execute(&mut *tx)
182                .await?;
183
184            tx.commit().await?;
185
186            let elapsed = start.elapsed();
187
188            let query_str = r#"
189                UPDATE "_sqlx_migrations"
190                SET execution_time = ?
191                WHERE version = ?
192                "#;
193
194            let _ = query(query_str)
195                .bind(elapsed.as_nanos())
196                .bind(migration.version)
197                .execute(self)
198                .await?;
199
200            Ok(elapsed)
201        })
202    }
203
204    fn revert<'e: 'm, 'm>(
205        &'e mut self,
206        migration: &'m Migration,
207    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
208        Box::pin(async move {
209            let mut tx = self.begin().await?;
210            let start = Instant::now();
211
212            ExecuteBatch::new(migration.sql.as_ref())
213                .future(&mut tx.ws)
214                .await?;
215
216            let query_str = r#" DELETE FROM "_sqlx_migrations" WHERE version = ? "#;
217            let _ = query(query_str)
218                .bind(migration.version)
219                .execute(&mut *tx)
220                .await?;
221
222            tx.commit().await?;
223
224            let elapsed = start.elapsed();
225
226            Ok(elapsed)
227        })
228    }
229}