#![allow(dead_code)]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::Duration;
use mysql::prelude::*;
use mysql::{Conn, Opts, Pool, PooledConn};
use testcontainers::core::logs::LogFrame;
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, GenericImage, ImageExt};
use uuid::Uuid;
/// Process-wide slot for the shared MySQL test container.
///
/// Starts out as `None`; `mysql()` fills it on first use and
/// `stop_shared_mysql` takes the container back out at process exit.
static MYSQL_INNER: RwLock<Option<ContainerAsync<GenericImage>>> = RwLock::new(None);
/// Returns a read guard over the shared MySQL container, starting the
/// container on first use.
///
/// The guarded `Option` is always `Some` when this function returns.
///
/// # Panics
/// Panics if the lock is poisoned or if the container cannot be started.
async fn mysql() -> RwLockReadGuard<'static, Option<ContainerAsync<GenericImage>>> {
    // Fast path: once the container exists a shared read lock is sufficient.
    // Taking the exclusive lock on every call would force each caller to wait
    // for all outstanding read guards, some of which are held across awaits.
    {
        let existing = MYSQL_INNER.read().unwrap();
        if existing.is_some() {
            return existing;
        }
    }
    // Slow path: take the write lock and re-check, because another caller may
    // have initialized the slot between the two lock acquisitions.
    // NOTE: the write guard is intentionally held across the await so that
    // only one caller ever starts a container; other callers block meanwhile.
    {
        let mut slot = MYSQL_INNER.write().unwrap();
        if slot.is_none() {
            let created = create_mysql_image_async().await;
            *slot = Some(created);
        }
    }
    MYSQL_INNER.read().unwrap()
}
/// Starts a `mysql:8.4` container, waits for the server to report readiness,
/// raises `max_connections` to 1000 and returns the running container.
///
/// Readiness is detected from the container log: the
/// `/usr/sbin/mysqld: ready for connections` line only counts once a
/// `Temporary server started` line has been seen, so earlier readiness lines
/// are ignored.
///
/// # Panics
/// Panics if the container fails to start, if readiness is not observed
/// within the timeout, or if the admin connection / `SET GLOBAL` fails.
async fn create_mysql_image_async() -> ContainerAsync<GenericImage> {
    // Upper bound for the readiness wait; without it a missing log line would
    // hang the whole test run instead of failing with a clear message.
    const READY_TIMEOUT: Duration = Duration::from_secs(120);
    const READY_POLL_INTERVAL: Duration = Duration::from_millis(20);

    let temporary_server_started = Arc::new(AtomicBool::new(false));
    let mysql_ready = Arc::new(AtomicBool::new(false));
    let temp_clone = Arc::clone(&temporary_server_started);
    let mysql_clone = Arc::clone(&mysql_ready);
    // The consumer is moved into the image, so it works on clones of the flags.
    let log_consumer = move |log: &LogFrame| {
        let msg = format!("{:?}", log);
        if msg.contains("Temporary server started") {
            temp_clone.store(true, Ordering::SeqCst);
        } else if temp_clone.load(Ordering::SeqCst)
            && msg.contains("/usr/sbin/mysqld: ready for connections")
        {
            mysql_clone.store(true, Ordering::SeqCst);
        }
    };
    let image = GenericImage::new("mysql", "8.4")
        .with_log_consumer(log_consumer)
        .with_env_var("MYSQL_ROOT_PASSWORD", "rootpw")
        .with_env_var("MYSQL_DATABASE", "bootstrap");
    let started = AsyncRunner::start(image)
        .await
        .expect("failed to start mysql docker image");

    // Poll the readiness flag set by the log consumer, bounded by the timeout.
    tokio::time::timeout(READY_TIMEOUT, async {
        while !mysql_ready.load(Ordering::SeqCst) {
            tokio::time::sleep(READY_POLL_INTERVAL).await;
        }
    })
    .await
    .expect("timed out waiting for mysql to report ready for connections");

    let port = started.get_host_port_ipv4(3306).await.unwrap();
    let admin_url = format!("mysql://root:rootpw@127.0.0.1:{port}");
    let admin_pool =
        Pool::new(Opts::from_url(&admin_url).expect("parse admin url")).expect("create admin pool");
    let mut admin: PooledConn = admin_pool.get_conn().expect("failed to get admin conn");
    admin
        .query_drop("SET GLOBAL max_connections = 1000")
        .expect("failed to set max connections");
    started
}
/// Builds the root connection URL (without a database path) for the shared
/// MySQL container, using the host port mapped to container port 3306.
///
/// # Panics
/// Panics if the container slot is empty or the mapped port cannot be
/// resolved.
async fn mysql_base_url() -> String {
    let guard = mysql().await;
    let container = guard.as_ref().unwrap();
    let port = container.get_host_port_ipv4(3306).await.unwrap();
    format!("mysql://root:rootpw@127.0.0.1:{port}")
}
/// Returns the shared server's base URL with `/<db>` appended.
async fn url_with_db(db: &str) -> String {
    let base = mysql_base_url().await;
    format!("{base}/{db}")
}
pub async fn fresh_mysql_db() -> (Pool, String) {
let admin_url = url_with_db("bootstrap").await;
let admin_pool =
Pool::new(Opts::from_url(&admin_url).expect("parse admin url")).expect("create admin pool");
let mut admin: PooledConn = admin_pool.get_conn().expect("failed to get admin conn");
let db_name = format!("test_{}", Uuid::new_v4().simple());
let create_stmt = format!(
"CREATE DATABASE `{}` CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
db_name
);
if admin.exec_drop(&create_stmt, ()).is_err() {
admin
.exec_drop(
format!(
"CREATE DATABASE `{}` CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci",
db_name
),
(),
)
.expect("create db (fallback)");
}
let url = url_with_db(&db_name).await;
let pool = Pool::new(Opts::from_url(&url).expect("parse test url")).expect("create test pool");
(pool, db_name)
}
/// Creates a fresh test database and returns its pool along with one
/// standalone connection to it.
///
/// # Panics
/// Panics if the database cannot be created or no connection can be obtained.
pub async fn get_test_conn() -> (Pool, Conn) {
    let pool = fresh_mysql_db().await.0;
    // First unwrap the `Result` from the pool ...
    let pooled = pool.get_conn().unwrap();
    // ... then unwrap the pooled wrapper itself to obtain the plain `Conn`.
    let conn = pooled.unwrap();
    (pool, conn)
}
/// Process-exit hook that takes the shared MySQL container out of its slot
/// and drops it inside a Tokio runtime.
///
/// Cleanup is best effort: if no runtime can be created the hook returns
/// silently, and a poisoned lock is recovered rather than unwrapped so the
/// hook never panics during process teardown.
#[ctor::dtor]
fn stop_shared_mysql() {
    let Ok(rt) = tokio::runtime::Runtime::new() else {
        return;
    };
    rt.block_on(async {
        // A panic while the write guard was held (e.g. a failed container
        // start) poisons the lock; the slot is still safe to empty.
        let container = MYSQL_INNER
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();
        // The guard temporary is already released; drop the container (if any).
        drop(container);
    })
}