sqlx_exasol/
testing.rs

1use std::{ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
2
3use futures_core::future::BoxFuture;
4use futures_util::TryStreamExt;
5use sqlx_core::{
6    connection::Connection,
7    error::DatabaseError,
8    executor::Executor,
9    pool::{Pool, PoolOptions},
10    query, query_scalar,
11    testing::{FixtureSnapshot, TestArgs, TestContext, TestSupport},
12    Error,
13};
14
15use crate::{
16    connection::ExaConnection, database::Exasol, options::ExaConnectOptions, ExaQueryResult,
17};
18
19static MASTER_POOL: OnceLock<Pool<Exasol>> = OnceLock::new();
20
21impl TestSupport for Exasol {
22    fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
23        Box::pin(test_context(args))
24    }
25
26    fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> {
27        Box::pin(async move {
28            let mut conn = MASTER_POOL
29                .get()
30                .expect("cleanup_test() invoked outside `#[sqlx::test]")
31                .acquire()
32                .await?;
33
34            do_cleanup(&mut conn, db_name).await
35        })
36    }
37
38    fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
39        Box::pin(async move {
40            let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
41
42            let mut conn = ExaConnection::connect(&url).await?;
43
44            let query_str = r#"SELECT db_name FROM "_sqlx_tests"."_sqlx_test_databases";"#;
45            let db_names_to_delete: Vec<String> = query_scalar::query_scalar(query_str)
46                .fetch_all(&mut conn)
47                .await?;
48
49            if db_names_to_delete.is_empty() {
50                return Ok(None);
51            }
52
53            let mut deleted_db_names = Vec::with_capacity(db_names_to_delete.len());
54
55            for db_name in &db_names_to_delete {
56                let query_str = format!(r#"DROP SCHEMA IF EXISTS "{db_name}" CASCADE;"#);
57
58                match conn.execute(&*query_str).await {
59                    Ok(_deleted) => {
60                        deleted_db_names.push(db_name);
61                    }
62                    // Assume a database error just means the DB is still in use.
63                    Err(Error::Database(dbe)) => {
64                        eprintln!("could not clean test database {db_name}: {dbe}");
65                    }
66                    // Bubble up other errors
67                    Err(e) => return Err(e),
68                }
69            }
70
71            if deleted_db_names.is_empty() {
72                return Ok(None);
73            }
74
75            query::query(r#"DELETE FROM "_sqlx_tests"."_sqlx_test_databases" WHERE db_name = ?;"#)
76                .bind(&deleted_db_names)
77                .execute(&mut conn)
78                .await?;
79
80            conn.close().await.ok();
81            Ok(Some(db_names_to_delete.len()))
82        })
83    }
84
85    fn snapshot(
86        _conn: &mut Self::Connection,
87    ) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
88        // TODO: SQLx doesn't implement this yet either.
89        todo!()
90    }
91}
92
93async fn test_context(args: &TestArgs) -> Result<TestContext<Exasol>, Error> {
94    let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
95    let master_opts = ExaConnectOptions::from_str(&url).expect("failed to parse DATABASE_URL");
96
97    let master_pool = MASTER_POOL.get_or_init(|| {
98        PoolOptions::new()
99            // Exasol supports 100 connections.
100            // This should be more than enough for testing purposes.
101            .max_connections(20)
102            // Immediately close master connections. Tokio's I/O streams don't like hopping
103            // runtimes.
104            .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
105            .connect_lazy_with(master_opts.clone())
106    });
107
108    // Sanity checks:
109    assert_eq!(
110        master_pool.connect_options().hosts_details,
111        master_opts.hosts_details,
112        "DATABASE_URL changed at runtime, host differs"
113    );
114
115    assert_eq!(
116        master_pool.connect_options().schema,
117        master_opts.schema,
118        "DATABASE_URL changed at runtime, database differs"
119    );
120
121    let mut conn = master_pool.acquire().await?;
122
123    cleanup_old_dbs(&mut conn).await?;
124
125    let setup_res = conn
126        .execute_many(
127            r#"
128        CREATE SCHEMA IF NOT EXISTS "_sqlx_tests";
129        CREATE TABLE IF NOT EXISTS "_sqlx_tests"."_sqlx_test_databases" (
130            db_name CLOB NOT NULL,
131            test_path CLOB NOT NULL,
132            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
133        );"#,
134        )
135        .try_collect::<ExaQueryResult>()
136        .await;
137
138    if let Err(e) = setup_res {
139        match e
140            .as_database_error()
141            .and_then(DatabaseError::code)
142            .as_deref()
143        {
144            // Error code for when an object already exists.
145            //
146            // Multiple tests concurrenclty trying to create the test schema and table can cause a
147            // `GlobalTransactionRollback`, where the objects did not exist when creating them was
148            // attempted but they got created by another test before the current one could create
149            // them.
150            //
151            // This means that, in spite of the failure, the objects now exist, which is what we
152            // wanted all along.
153            Some("40001") => Ok(()),
154            _ => Err(e),
155        }?;
156    }
157
158    let db_name = Exasol::db_name(args);
159    do_cleanup(&mut conn, &db_name).await?;
160
161    let mut tx = conn.begin().await?;
162
163    let query_str = r#"
164        INSERT INTO "_sqlx_tests"."_sqlx_test_databases" (db_name, test_path)
165        VALUES (?, ?)"#;
166
167    query::query(query_str)
168        .bind(&db_name)
169        .bind(args.test_path)
170        .execute(&mut *tx)
171        .await?;
172
173    tx.execute(&*format!(r#"CREATE SCHEMA "{db_name}";"#))
174        .await?;
175    tx.commit().await?;
176
177    eprintln!("created database {db_name}");
178
179    let mut connect_opts = master_pool.connect_options().deref().clone();
180
181    connect_opts.schema = Some(db_name.clone());
182
183    Ok(TestContext {
184        pool_opts: PoolOptions::new()
185            // Don't allow a single test to take all the connections.
186            // Most tests shouldn't require more than 5 connections concurrently,
187            // or else they're likely doing too much in one test.
188            .max_connections(5)
189            // Close connections ASAP if left in the idle queue.
190            .idle_timeout(Some(Duration::from_secs(1)))
191            .parent(master_pool.clone()),
192        connect_opts,
193        db_name,
194    })
195}
196
197async fn do_cleanup(conn: &mut ExaConnection, db_name: &str) -> Result<(), Error> {
198    conn.execute(&*format!(r#"DROP SCHEMA IF EXISTS "{db_name}" CASCADE"#))
199        .await?;
200
201    query::query(r#"DELETE FROM "_sqlx_tests"."_sqlx_test_databases" WHERE db_name = ?;"#)
202        .bind(db_name)
203        .execute(&mut *conn)
204        .await?;
205
206    Ok(())
207}
208
209/// Pre <0.8.4, test databases were stored by integer ID.
210async fn cleanup_old_dbs(conn: &mut ExaConnection) -> Result<(), Error> {
211    let res =
212        query_scalar::query_scalar(r#"SELECT db_id FROM "_sqlx_tests"."_sqlx_test_databases";"#)
213            .fetch_all(&mut *conn)
214            .await;
215
216    let db_ids: Vec<u64> = match res {
217        Ok(db_ids) => db_ids,
218        Err(e) => {
219            return match e
220                .as_database_error()
221                .and_then(DatabaseError::code)
222                .as_deref()
223            {
224                // Common error code for when an object does not exist.
225                //
226                // Applies to both a missing `_sqlx_test_databases` table,
227                // in which case no cleanup is needed OR a missing `db_id`
228                // column, in which case the table has already been migrated.
229                Some("42000") => Ok(()),
230                _ => Err(e),
231            };
232        }
233    };
234
235    // Drop old-style test databases.
236    for id in db_ids {
237        let query = format!(r#"DROP SCHEMA IF EXISTS "_sqlx_test_database_{id}" CASCADE"#);
238        match conn.execute(&*query).await {
239            Ok(_deleted) => (),
240            // Assume a database error just means the DB is still in use.
241            Err(Error::Database(dbe)) => {
242                eprintln!("could not clean old test database _sqlx_test_database_{id}: {dbe}");
243            }
244            // Bubble up other errors
245            Err(e) => return Err(e),
246        }
247    }
248
249    conn.execute(r#"DROP TABLE IF EXISTS "_sqlx_tests"."_sqlx_test_databases";"#)
250        .await?;
251
252    Ok(())
253}