1use std::{
2 str::FromStr,
3 time::{Duration, Instant},
4};
5
6use futures_core::future::BoxFuture;
7use sqlx_core::{
8 connection::{ConnectOptions, Connection},
9 executor::Executor,
10 migrate::{AppliedMigration, Migrate, MigrateDatabase, MigrateError, Migration},
11 query::query,
12 query_as::query_as,
13 query_scalar::query_scalar,
14};
15
16use crate::{
17 connection::{
18 websocket::future::{ExecuteBatch, WebSocketFuture},
19 ExaConnection,
20 },
21 database::Exasol,
22 options::ExaConnectOptions,
23 SqlxError, SqlxResult,
24};
25
/// Warning logged by [`Migrate::lock`] and [`Migrate::unlock`] in this module;
/// both methods only emit this message and return `Ok(())` without taking a lock.
const LOCK_WARN: &str = "Exasol does not support database locking!";
27
28fn parse_for_maintenance(url: &str) -> SqlxResult<(ExaConnectOptions, String)> {
29 let mut options = ExaConnectOptions::from_str(url)?;
30
31 let database = options.schema.ok_or_else(|| {
32 SqlxError::Configuration("DATABASE_URL does not specify a database".into())
33 })?;
34
35 options.schema = None;
37
38 Ok((options, database))
39}
40
41impl MigrateDatabase for Exasol {
42 fn create_database(url: &str) -> BoxFuture<'_, SqlxResult<()>> {
43 Box::pin(async move {
44 let (options, database) = parse_for_maintenance(url)?;
45 let mut conn = options.connect().await?;
46
47 let query = format!("CREATE SCHEMA \"{}\"", database.replace('"', "\"\""));
48 let _ = conn.execute(&*query).await?;
49
50 Ok(())
51 })
52 }
53
54 fn database_exists(url: &str) -> BoxFuture<'_, SqlxResult<bool>> {
55 Box::pin(async move {
56 let (options, database) = parse_for_maintenance(url)?;
57 let mut conn = options.connect().await?;
58
59 let query = "SELECT true FROM exa_schemas WHERE schema_name = ?";
60 let exists: bool = query_scalar(query)
61 .bind(database)
62 .fetch_one(&mut conn)
63 .await?;
64
65 Ok(exists)
66 })
67 }
68
69 fn drop_database(url: &str) -> BoxFuture<'_, SqlxResult<()>> {
70 Box::pin(async move {
71 let (options, database) = parse_for_maintenance(url)?;
72 let mut conn = options.connect().await?;
73
74 let query = format!("DROP SCHEMA IF EXISTS `{database}`");
75 let _ = conn.execute(&*query).await?;
76
77 Ok(())
78 })
79 }
80}
81
82impl Migrate for ExaConnection {
83 fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
84 Box::pin(async move {
85 let query = r#"
86 CREATE TABLE IF NOT EXISTS "_sqlx_migrations" (
87 version DECIMAL(20, 0),
88 description CLOB NOT NULL,
89 installed_on TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
90 success BOOLEAN NOT NULL,
91 checksum CLOB NOT NULL,
92 execution_time DECIMAL(20, 0) NOT NULL
93 );"#;
94
95 self.execute(query).await?;
96 Ok(())
97 })
98 }
99
100 fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
101 Box::pin(async move {
102 let query = r#"
103 SELECT version
104 FROM "_sqlx_migrations"
105 WHERE success = false
106 ORDER BY version
107 LIMIT 1
108 "#;
109
110 let row: Option<(i64,)> = query_as(query).fetch_optional(self).await?;
111
112 Ok(row.map(|r| r.0))
113 })
114 }
115
116 fn list_applied_migrations(
117 &mut self,
118 ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
119 Box::pin(async move {
120 let query = r#"
121 SELECT version, checksum
122 FROM "_sqlx_migrations"
123 ORDER BY version
124 "#;
125
126 let rows: Vec<(i64, String)> = query_as(query).fetch_all(self).await?;
127
128 let mut migrations = Vec::with_capacity(rows.len());
129
130 for (version, checksum) in rows {
131 let checksum = hex::decode(checksum)
132 .map_err(From::from)
133 .map_err(MigrateError::Source)?
134 .into();
135
136 let migration = AppliedMigration { version, checksum };
137 migrations.push(migration);
138 }
139
140 Ok(migrations)
141 })
142 }
143
144 fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
145 Box::pin(async move {
146 tracing::warn!("{LOCK_WARN}");
147 Ok(())
148 })
149 }
150
151 fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
152 Box::pin(async move {
153 tracing::warn!("{LOCK_WARN}");
154 Ok(())
155 })
156 }
157
158 fn apply<'e: 'm, 'm>(
159 &'e mut self,
160 migration: &'m Migration,
161 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
162 Box::pin(async move {
163 let mut tx = self.begin().await?;
164 let start = Instant::now();
165
166 ExecuteBatch::new(migration.sql.as_ref())
167 .future(&mut tx.ws)
168 .await?;
169
170 let checksum = hex::encode(&*migration.checksum);
171
172 let query_str = r#"
173 INSERT INTO "_sqlx_migrations" ( version, description, success, checksum, execution_time )
174 VALUES ( ?, ?, TRUE, ?, -1 );
175 "#;
176
177 let _ = query(query_str)
178 .bind(migration.version)
179 .bind(&*migration.description)
180 .bind(checksum)
181 .execute(&mut *tx)
182 .await?;
183
184 tx.commit().await?;
185
186 let elapsed = start.elapsed();
187
188 let query_str = r#"
189 UPDATE "_sqlx_migrations"
190 SET execution_time = ?
191 WHERE version = ?
192 "#;
193
194 let _ = query(query_str)
195 .bind(elapsed.as_nanos())
196 .bind(migration.version)
197 .execute(self)
198 .await?;
199
200 Ok(elapsed)
201 })
202 }
203
204 fn revert<'e: 'm, 'm>(
205 &'e mut self,
206 migration: &'m Migration,
207 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
208 Box::pin(async move {
209 let mut tx = self.begin().await?;
210 let start = Instant::now();
211
212 ExecuteBatch::new(migration.sql.as_ref())
213 .future(&mut tx.ws)
214 .await?;
215
216 let query_str = r#" DELETE FROM "_sqlx_migrations" WHERE version = ? "#;
217 let _ = query(query_str)
218 .bind(migration.version)
219 .execute(&mut *tx)
220 .await?;
221
222 tx.commit().await?;
223
224 let elapsed = start.elapsed();
225
226 Ok(elapsed)
227 })
228 }
229}