1use std::{ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
2
3use futures_core::future::BoxFuture;
4use futures_util::TryStreamExt;
5use sqlx_core::{
6 connection::Connection,
7 error::DatabaseError,
8 executor::Executor,
9 pool::{Pool, PoolOptions},
10 query, query_scalar,
11 testing::{FixtureSnapshot, TestArgs, TestContext, TestSupport},
12 Error,
13};
14
15use crate::{
16 connection::ExaConnection, database::Exasol, options::ExaConnectOptions, ExaQueryResult,
17};
18
/// Process-wide connection pool for the database named by `DATABASE_URL`.
///
/// It is initialized lazily by `test_context()` on first use, read back by
/// `cleanup_test()`, and set as the parent of every per-test pool.
static MASTER_POOL: OnceLock<Pool<Exasol>> = OnceLock::new();
20
21impl TestSupport for Exasol {
22 fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
23 Box::pin(test_context(args))
24 }
25
26 fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> {
27 Box::pin(async move {
28 let mut conn = MASTER_POOL
29 .get()
30 .expect("cleanup_test() invoked outside `#[sqlx::test]")
31 .acquire()
32 .await?;
33
34 do_cleanup(&mut conn, db_name).await
35 })
36 }
37
38 fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
39 Box::pin(async move {
40 let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
41
42 let mut conn = ExaConnection::connect(&url).await?;
43
44 let query_str = r#"SELECT db_name FROM "_sqlx_tests"."_sqlx_test_databases";"#;
45 let db_names_to_delete: Vec<String> = query_scalar::query_scalar(query_str)
46 .fetch_all(&mut conn)
47 .await?;
48
49 if db_names_to_delete.is_empty() {
50 return Ok(None);
51 }
52
53 let mut deleted_db_names = Vec::with_capacity(db_names_to_delete.len());
54
55 for db_name in &db_names_to_delete {
56 let query_str = format!(r#"DROP SCHEMA IF EXISTS "{db_name}" CASCADE;"#);
57
58 match conn.execute(&*query_str).await {
59 Ok(_deleted) => {
60 deleted_db_names.push(db_name);
61 }
62 Err(Error::Database(dbe)) => {
64 eprintln!("could not clean test database {db_name}: {dbe}");
65 }
66 Err(e) => return Err(e),
68 }
69 }
70
71 if deleted_db_names.is_empty() {
72 return Ok(None);
73 }
74
75 query::query(r#"DELETE FROM "_sqlx_tests"."_sqlx_test_databases" WHERE db_name = ?;"#)
76 .bind(&deleted_db_names)
77 .execute(&mut conn)
78 .await?;
79
80 conn.close().await.ok();
81 Ok(Some(db_names_to_delete.len()))
82 })
83 }
84
85 fn snapshot(
86 _conn: &mut Self::Connection,
87 ) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
88 todo!()
90 }
91}
92
93async fn test_context(args: &TestArgs) -> Result<TestContext<Exasol>, Error> {
94 let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
95 let master_opts = ExaConnectOptions::from_str(&url).expect("failed to parse DATABASE_URL");
96
97 let master_pool = MASTER_POOL.get_or_init(|| {
98 PoolOptions::new()
99 .max_connections(20)
102 .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
105 .connect_lazy_with(master_opts.clone())
106 });
107
108 assert_eq!(
110 master_pool.connect_options().hosts_details,
111 master_opts.hosts_details,
112 "DATABASE_URL changed at runtime, host differs"
113 );
114
115 assert_eq!(
116 master_pool.connect_options().schema,
117 master_opts.schema,
118 "DATABASE_URL changed at runtime, database differs"
119 );
120
121 let mut conn = master_pool.acquire().await?;
122
123 cleanup_old_dbs(&mut conn).await?;
124
125 let setup_res = conn
126 .execute_many(
127 r#"
128 CREATE SCHEMA IF NOT EXISTS "_sqlx_tests";
129 CREATE TABLE IF NOT EXISTS "_sqlx_tests"."_sqlx_test_databases" (
130 db_name CLOB NOT NULL,
131 test_path CLOB NOT NULL,
132 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
133 );"#,
134 )
135 .try_collect::<ExaQueryResult>()
136 .await;
137
138 if let Err(e) = setup_res {
139 match e
140 .as_database_error()
141 .and_then(DatabaseError::code)
142 .as_deref()
143 {
144 Some("40001") => Ok(()),
154 _ => Err(e),
155 }?;
156 }
157
158 let db_name = Exasol::db_name(args);
159 do_cleanup(&mut conn, &db_name).await?;
160
161 let mut tx = conn.begin().await?;
162
163 let query_str = r#"
164 INSERT INTO "_sqlx_tests"."_sqlx_test_databases" (db_name, test_path)
165 VALUES (?, ?)"#;
166
167 query::query(query_str)
168 .bind(&db_name)
169 .bind(args.test_path)
170 .execute(&mut *tx)
171 .await?;
172
173 tx.execute(&*format!(r#"CREATE SCHEMA "{db_name}";"#))
174 .await?;
175 tx.commit().await?;
176
177 eprintln!("created database {db_name}");
178
179 let mut connect_opts = master_pool.connect_options().deref().clone();
180
181 connect_opts.schema = Some(db_name.clone());
182
183 Ok(TestContext {
184 pool_opts: PoolOptions::new()
185 .max_connections(5)
189 .idle_timeout(Some(Duration::from_secs(1)))
191 .parent(master_pool.clone()),
192 connect_opts,
193 db_name,
194 })
195}
196
197async fn do_cleanup(conn: &mut ExaConnection, db_name: &str) -> Result<(), Error> {
198 conn.execute(&*format!(r#"DROP SCHEMA IF EXISTS "{db_name}" CASCADE"#))
199 .await?;
200
201 query::query(r#"DELETE FROM "_sqlx_tests"."_sqlx_test_databases" WHERE db_name = ?;"#)
202 .bind(db_name)
203 .execute(&mut *conn)
204 .await?;
205
206 Ok(())
207}
208
209async fn cleanup_old_dbs(conn: &mut ExaConnection) -> Result<(), Error> {
211 let res =
212 query_scalar::query_scalar(r#"SELECT db_id FROM "_sqlx_tests"."_sqlx_test_databases";"#)
213 .fetch_all(&mut *conn)
214 .await;
215
216 let db_ids: Vec<u64> = match res {
217 Ok(db_ids) => db_ids,
218 Err(e) => {
219 return match e
220 .as_database_error()
221 .and_then(DatabaseError::code)
222 .as_deref()
223 {
224 Some("42000") => Ok(()),
230 _ => Err(e),
231 };
232 }
233 };
234
235 for id in db_ids {
237 let query = format!(r#"DROP SCHEMA IF EXISTS "_sqlx_test_database_{id}" CASCADE"#);
238 match conn.execute(&*query).await {
239 Ok(_deleted) => (),
240 Err(Error::Database(dbe)) => {
242 eprintln!("could not clean old test database _sqlx_test_database_{id}: {dbe}");
243 }
244 Err(e) => return Err(e),
246 }
247 }
248
249 conn.execute(r#"DROP TABLE IF EXISTS "_sqlx_tests"."_sqlx_test_databases";"#)
250 .await?;
251
252 Ok(())
253}