use std::{ops::Deref, str::FromStr, sync::OnceLock, time::Duration};
use futures_util::TryStreamExt;
use sqlx_core::{
connection::Connection,
error::DatabaseError,
executor::Executor,
pool::{Pool, PoolOptions},
sql_str::AssertSqlSafe,
testing::{FixtureSnapshot, TestArgs, TestContext, TestSupport},
Error,
};
use crate::{
connection::ExaConnection, database::Exasol, options::ExaConnectOptions, ExaQueryResult,
SqlxResult,
};
/// Shared connection pool used for test bookkeeping.
///
/// Initialized on first use by `test_context` (via `get_or_init`) and read by
/// `TestSupport::cleanup_test`, which panics if it has not been initialized yet.
static MASTER_POOL: OnceLock<Pool<Exasol>> = OnceLock::new();
/// Test-harness hooks for Exasol: per-test schema setup and cleanup, tracked in the
/// `"_sqlx_tests"."_sqlx_test_databases"` bookkeeping table.
impl TestSupport for Exasol {
    /// Builds the [`TestContext`] for one test by delegating to the module-level
    /// `test_context` function.
    async fn test_context(args: &TestArgs) -> SqlxResult<TestContext<Self>> {
        test_context(args).await
    }

    /// Drops the schema named `db_name` and removes its bookkeeping row, using a
    /// connection acquired from `MASTER_POOL`.
    ///
    /// # Panics
    ///
    /// Panics if `MASTER_POOL` has not been initialized.
    ///
    /// # Errors
    ///
    /// Returns any error raised while acquiring the connection or running the cleanup.
    async fn cleanup_test(db_name: &str) -> SqlxResult<()> {
        let mut conn = MASTER_POOL
            .get()
            .expect("cleanup_test() invoked outside `#[sqlx_exasol::test]")
            .acquire()
            .await?;

        do_cleanup(&mut conn, db_name).await
    }

    /// Drops every schema registered in the bookkeeping table and removes the rows of
    /// the schemas that were actually dropped.
    ///
    /// Opens a dedicated connection from `DATABASE_URL` rather than using `MASTER_POOL`.
    ///
    /// Returns `Ok(None)` when no schema is registered or none could be dropped, and
    /// otherwise `Ok(Some(n))` where `n` is the number of schemas successfully dropped.
    ///
    /// # Panics
    ///
    /// Panics if `DATABASE_URL` is not set.
    ///
    /// # Errors
    ///
    /// Returns connection errors, errors from the bookkeeping queries, and any
    /// non-database error raised while dropping a schema.
    async fn cleanup_test_dbs() -> SqlxResult<Option<usize>> {
        let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
        let mut conn = ExaConnection::connect(&url).await?;

        let query = r#"SELECT db_name FROM "_sqlx_tests"."_sqlx_test_databases";"#;
        let db_names_to_delete: Vec<String> = sqlx_core::query_scalar::query_scalar(query)
            .fetch_all(&mut conn)
            .await?;

        if db_names_to_delete.is_empty() {
            return Ok(None);
        }

        let mut deleted_db_names = Vec::with_capacity(db_names_to_delete.len());

        for db_name in &db_names_to_delete {
            let query = format!(r#"DROP SCHEMA IF EXISTS "{db_name}" CASCADE;"#);

            match conn.execute(AssertSqlSafe(query)).await {
                Ok(_deleted) => {
                    deleted_db_names.push(db_name);
                }
                // A database-level failure is logged and skipped so the remaining
                // schemas still get a chance to be dropped; the row is kept.
                Err(Error::Database(dbe)) => {
                    eprintln!("could not clean test database {db_name}: {dbe}");
                }
                // Anything else (e.g. a broken connection) aborts the whole cleanup.
                Err(e) => return Err(e),
            }
        }

        if deleted_db_names.is_empty() {
            return Ok(None);
        }

        // NOTE(review): the whole list is bound to a single placeholder; this relies on
        // how the driver encodes a list parameter - confirm it deletes one row per name.
        sqlx_core::query::query(
            r#"DELETE FROM "_sqlx_tests"."_sqlx_test_databases" WHERE db_name = ?;"#,
        )
        .bind(&deleted_db_names)
        .execute(&mut conn)
        .await?;

        // Closing is best effort; a failure here does not invalidate the cleanup.
        conn.close().await.ok();

        // Report only the schemas that were really dropped, not every candidate.
        Ok(Some(deleted_db_names.len()))
    }

    /// Not implemented: the body is `todo!()`, so calling this panics.
    async fn snapshot(_conn: &mut Self::Connection) -> SqlxResult<FixtureSnapshot<Self>> {
        todo!()
    }
}
/// Prepares a dedicated schema for one test and returns the [`TestContext`] pointing at it.
///
/// In order, this:
/// 1. parses `DATABASE_URL` and initializes `MASTER_POOL` on first use,
/// 2. checks that the pool's hosts and schema still match the parsed URL,
/// 3. runs `cleanup_old_dbs`, then creates the `_sqlx_tests` schema and bookkeeping table,
/// 4. removes any leftover schema with the same name via `do_cleanup`,
/// 5. registers and creates the schema inside one transaction.
///
/// # Panics
///
/// Panics if `DATABASE_URL` is unset or cannot be parsed, or if its hosts/schema differ
/// from the options the shared pool was created with.
///
/// # Errors
///
/// Returns any error from acquiring a connection or from the statements above, except a
/// database error with code `40001` during bookkeeping setup, which is ignored.
async fn test_context(args: &TestArgs) -> Result<TestContext<Exasol>, Error> {
    let database_url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
    let url_opts =
        ExaConnectOptions::from_str(&database_url).expect("failed to parse DATABASE_URL");

    // NOTE(review): `after_release` returning `Ok(false)` presumably makes the pool
    // discard connections instead of reusing them - confirm against the pool docs.
    let pool = MASTER_POOL.get_or_init(|| {
        PoolOptions::new()
            .max_connections(20)
            .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
            .connect_lazy_with(url_opts.clone())
    });

    // The pool outlives a single call, so a changed URL would otherwise go unnoticed.
    assert_eq!(
        pool.connect_options().hosts,
        url_opts.hosts,
        "DATABASE_URL changed at runtime, host differs"
    );
    assert_eq!(
        pool.connect_options().schema,
        url_opts.schema,
        "DATABASE_URL changed at runtime, database differs"
    );

    #[allow(clippy::large_futures, reason = "silencing clippy")]
    let mut conn = pool.acquire().await?;

    cleanup_old_dbs(&mut conn).await?;

    let bootstrap_sql = r#"
CREATE SCHEMA IF NOT EXISTS "_sqlx_tests";
CREATE TABLE IF NOT EXISTS "_sqlx_tests"."_sqlx_test_databases" (
db_name CLOB NOT NULL,
test_path CLOB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);"#;
    let bootstrap = conn
        .execute_many(bootstrap_sql)
        .try_collect::<ExaQueryResult>()
        .await;

    if let Err(err) = bootstrap {
        // NOTE(review): `40001` is presumably a conflict raised when concurrent tests
        // race to create the bookkeeping objects - confirm the code's meaning.
        let tolerated = matches!(
            err.as_database_error()
                .and_then(DatabaseError::code)
                .as_deref(),
            Some("40001")
        );
        if !tolerated {
            return Err(err);
        }
    }

    let db_name = Exasol::db_name(args);

    // Start from a clean slate in case a previous run left this schema behind.
    do_cleanup(&mut conn, &db_name).await?;

    let register_sql = r#"
INSERT INTO "_sqlx_tests"."_sqlx_test_databases" (db_name, test_path)
VALUES (?, ?)"#;
    let create_schema = AssertSqlSafe(format!(r#"CREATE SCHEMA "{db_name}";"#));

    // Registering the row and creating the schema share one transaction.
    let mut tx = conn.begin().await?;
    sqlx_core::query::query(register_sql)
        .bind(&db_name)
        .bind(args.test_path)
        .execute(&mut *tx)
        .await?;
    tx.execute(create_schema).await?;
    tx.commit().await?;

    eprintln!("created database {db_name}");

    // Same connection settings as the shared pool, but targeting the new schema.
    let connect_opts = {
        let mut opts = pool.connect_options().deref().clone();
        opts.schema = Some(db_name.clone());
        opts
    };

    let pool_opts = PoolOptions::new()
        .max_connections(5)
        .idle_timeout(Some(Duration::from_secs(1)))
        .parent(pool.clone());

    Ok(TestContext {
        pool_opts,
        connect_opts,
        db_name,
    })
}
/// Drops the schema `db_name` (if it exists, with `CASCADE`) and then deletes its row
/// from the `"_sqlx_tests"."_sqlx_test_databases"` bookkeeping table.
///
/// # Errors
///
/// Returns the first error raised by either statement; the row is left in place when
/// dropping the schema fails.
async fn do_cleanup(conn: &mut ExaConnection, db_name: &str) -> Result<(), Error> {
    // The schema name cannot be bound as a parameter, so it is formatted into the SQL.
    let drop_schema = AssertSqlSafe(format!(r#"DROP SCHEMA IF EXISTS "{db_name}" CASCADE"#));
    conn.execute(drop_schema).await?;

    let forget_row = sqlx_core::query::query(
        r#"DELETE FROM "_sqlx_tests"."_sqlx_test_databases" WHERE db_name = ?;"#,
    )
    .bind(db_name);

    forget_row
        .execute(&mut *conn)
        .await
        .map(|_deleted| ())
}
async fn cleanup_old_dbs(conn: &mut ExaConnection) -> Result<(), Error> {
let res = sqlx_core::query_scalar::query_scalar(
r#"SELECT db_id FROM "_sqlx_tests"."_sqlx_test_databases";"#,
)
.fetch_all(&mut *conn)
.await;
let db_ids: Vec<i64> = match res {
Ok(db_ids) => db_ids,
Err(e) => {
return match e
.as_database_error()
.and_then(DatabaseError::code)
.as_deref()
{
Some("42000") => Ok(()),
_ => Err(e),
};
}
};
for id in db_ids {
let query = format!(r#"DROP SCHEMA IF EXISTS "_sqlx_test_database_{id}" CASCADE"#);
match conn.execute(AssertSqlSafe(query)).await {
Ok(_deleted) => (),
Err(Error::Database(dbe)) => {
eprintln!("could not clean old test database _sqlx_test_database_{id}: {dbe}");
}
Err(e) => return Err(e),
}
}
conn.execute(r#"DROP TABLE IF EXISTS "_sqlx_tests"."_sqlx_test_databases";"#)
.await?;
Ok(())
}