#[cfg(feature = "testcontainers")]
use rstest::*;
#[cfg(feature = "testcontainers")]
use std::sync::Arc;
#[cfg(feature = "testcontainers")]
use testcontainers::{
ImageExt,
core::{ContainerPort, WaitFor},
runners::AsyncRunner,
};
#[cfg(feature = "testcontainers")]
pub use testcontainers::{ContainerAsync, GenericImage};
/// Reports whether `port` can currently be bound on the IPv4 loopback interface.
///
/// The probe binds a temporary TCP listener on `127.0.0.1:<port>`. The listener
/// is dropped as soon as the check finishes, so a `true` result is only a
/// snapshot: another process may still claim the port afterwards.
#[cfg(feature = "testcontainers")]
async fn is_port_available(port: u16) -> bool {
    let loopback = std::net::SocketAddr::from(([127, 0, 0, 1], port));
    tokio::net::TcpListener::bind(loopback).await.is_ok()
}
/// Reports whether the six consecutive ports `base_port..=base_port + 5` can all
/// be bound on the loopback interface.
///
/// Ports are probed in ascending order and the check stops at the first port
/// that is unavailable.
///
/// A base port so close to `u16::MAX` that the range would not fit is reported
/// as unavailable instead of overflowing the port arithmetic.
#[cfg(feature = "testcontainers")]
async fn is_port_range_available(base_port: u16) -> bool {
    for offset in 0..6u16 {
        // `checked_add` guards against bases above 65530 (e.g. supplied via an
        // environment variable), which would otherwise overflow `u16`.
        let Some(port) = base_port.checked_add(offset) else {
            return false;
        };
        if !is_port_available(port).await {
            return false;
        }
    }
    true
}
#[cfg(feature = "testcontainers")]
/// Reads the connection-pool settings used by the database fixtures.
///
/// Returns `(max_connections, acquire_timeout_secs)`:
/// - `TEST_MAX_CONNECTIONS` supplies the first value, defaulting to `5`.
/// - `TEST_ACQUIRE_TIMEOUT_SECS` supplies the second value, defaulting to `60`.
///
/// A variable that is unset, not valid Unicode, or not parseable as the target
/// integer type silently falls back to its default.
fn get_pool_config() -> (u32, u64) {
    /// Parses the environment variable `key`, falling back when it is absent or malformed.
    fn env_or<T: std::str::FromStr>(key: &str, fallback: T) -> T {
        match std::env::var(key) {
            Ok(raw) => raw.parse().unwrap_or(fallback),
            Err(_) => fallback,
        }
    }

    (
        env_or("TEST_MAX_CONNECTIONS", 5),
        env_or("TEST_ACQUIRE_TIMEOUT_SECS", 60),
    )
}
/// Opens a database-agnostic `sqlx::AnyPool` for `database_url`.
///
/// Pool size and acquire timeout come from [`get_pool_config`]; the pool keeps
/// at least one connection, drops idle connections after 600 seconds and
/// recycles connections after 1800 seconds.
///
/// # Errors
/// Returns the `sqlx::Error` produced while connecting.
#[cfg(feature = "testcontainers")]
pub async fn create_test_any_pool(database_url: &str) -> Result<sqlx::AnyPool, sqlx::Error> {
    use std::time::Duration;

    let (max_conns, timeout_secs) = get_pool_config();
    let options = sqlx::any::AnyPoolOptions::new()
        .max_connections(max_conns)
        .min_connections(1)
        .acquire_timeout(Duration::from_secs(timeout_secs))
        .idle_timeout(Duration::from_secs(600))
        .max_lifetime(Duration::from_secs(1800));
    options.connect(database_url).await
}
/// Picks a base port with six free consecutive ports for a Redis cluster.
///
/// Candidates are probed in this order, skipping duplicates:
/// 1. `REDIS_CLUSTER_BASE_PORT` from the environment, if it parses as a port;
/// 2. a PID-derived port in `17000..=17090` (spreads concurrent test processes);
/// 3. the fixed bases `17000`, `27000`, `37000`, `47000`.
///
/// If none is free, `20000..60000` is scanned in steps of 1000.
///
/// A candidate whose six-port range would exceed `u16::MAX` is ignored with a
/// warning instead of overflowing the port arithmetic.
///
/// # Panics
/// Panics when no free range of six ports can be found.
#[fixture]
#[cfg(feature = "testcontainers")]
pub async fn redis_cluster_base_port() -> u16 {
    // Offset of the last port in a six-port range (`base..=base + 5`).
    const LAST_OFFSET: u16 = 5;

    let pid = std::process::id();
    // `(pid % 10) * 10` is at most 90, so the narrowing cast cannot truncate.
    let pid_offset = ((pid % 10) * 10) as u16;
    let pid_based_port = 17000 + pid_offset;
    let env_preferred = std::env::var("REDIS_CLUSTER_BASE_PORT")
        .ok()
        .and_then(|raw| raw.parse::<u16>().ok());

    let mut candidates: Vec<u16> = Vec::new();
    let preferred = env_preferred
        .into_iter()
        .chain([pid_based_port, 17000, 27000, 37000, 47000]);
    for port in preferred {
        if port.checked_add(LAST_OFFSET).is_none() {
            eprintln!(
                "WARNING: Ignoring Redis Cluster base port {}: the 6-port range would exceed 65535",
                port
            );
            continue;
        }
        // Probing the same range twice is wasted work; keep the first occurrence.
        if !candidates.contains(&port) {
            candidates.push(port);
        }
    }

    for &candidate in &candidates {
        if is_port_range_available(candidate).await {
            eprintln!(
                "Using Redis Cluster port range: {}-{} (PID: {}, offset: {})",
                candidate,
                candidate + LAST_OFFSET,
                pid,
                if candidate == pid_based_port {
                    format!("{} [PID-based]", pid_offset)
                } else {
                    "N/A".to_string()
                }
            );
            return candidate;
        }
    }

    eprintln!("WARNING: All preferred port ranges are occupied. Searching 20000-60000...");
    for base in (20000u16..60000).step_by(1000) {
        if is_port_range_available(base).await {
            eprintln!(
                "Found available port range: {}-{} (searched from 20000)",
                base,
                base + LAST_OFFSET
            );
            return base;
        }
    }

    panic!(
        "Failed to find 6 consecutive available ports. Please free up some ports and try again."
    );
}
use fs2::FileExt;
/// Guard that holds an exclusive advisory lock on a file for as long as it lives.
///
/// The lock is released when the guard is dropped.
pub struct FileLockGuard {
    /// Open handle to the lock file; the lock is tied to this handle.
    file: std::fs::File,
}

impl FileLockGuard {
    /// Opens `lock_path` (creating it if needed, never truncating it) and takes
    /// an exclusive lock on it.
    ///
    /// The `lock_exclusive` call waits until the lock can be acquired.
    ///
    /// # Errors
    /// Returns the I/O error from opening the file or from acquiring the lock.
    pub fn new(lock_path: impl Into<std::path::PathBuf>) -> std::io::Result<Self> {
        let mut open_options = std::fs::OpenOptions::new();
        open_options.write(true).create(true).truncate(false);

        let target: std::path::PathBuf = lock_path.into();
        let handle = open_options.open(&target)?;
        handle.lock_exclusive()?;
        Ok(Self { file: handle })
    }
}

impl Drop for FileLockGuard {
    /// Releases the lock; a failure to unlock is deliberately ignored because
    /// nothing useful can be done about it while dropping.
    fn drop(&mut self) {
        let _ = self.file.unlock();
    }
}
/// Starts a `postgres:16-alpine` container and returns a verified connection pool.
///
/// The container runs with `POSTGRES_HOST_AUTH_METHOD=trust`, so the `postgres`
/// user connects without a password. Both the host-port lookup and the pool
/// creation (including a `SELECT 1` health check) are retried up to seven times
/// with exponential backoff (`200ms * 2^n`).
///
/// # Returns
/// `(container, pool, host_port, database_url)`.
///
/// # Panics
/// Panics if the container cannot be started, the mapped port cannot be
/// resolved, or no healthy connection can be established within the retries.
#[fixture]
pub async fn postgres_container() -> (ContainerAsync<GenericImage>, Arc<sqlx::PgPool>, u16, String)
{
    use std::time::Duration;
    use testcontainers::core::IntoContainerPort;

    const MAX_PORT_RETRIES: u32 = 7;
    const MAX_CONNECT_RETRIES: u32 = 7;

    let request = GenericImage::new("postgres", "16-alpine")
        .with_exposed_port(5432.tcp())
        .with_wait_for(WaitFor::message_on_stderr(
            "database system is ready to accept connections",
        ))
        .with_startup_timeout(Duration::from_secs(120))
        .with_env_var("POSTGRES_HOST_AUTH_METHOD", "trust");
    let postgres = request
        .start()
        .await
        .expect("Failed to start PostgreSQL container");

    // Short grace period before asking for the mapped port.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let mut port_attempts: u32 = 0;
    let port = loop {
        let err = match postgres.get_host_port_ipv4(5432).await {
            Ok(mapped) => break mapped,
            Err(err) => err,
        };
        if port_attempts >= MAX_PORT_RETRIES {
            panic!(
                "Failed to get PostgreSQL port after {} retries: {}",
                MAX_PORT_RETRIES, err
            );
        }
        port_attempts += 1;
        eprintln!(
            "PostgreSQL port query attempt {} of {} failed: {:?}",
            port_attempts, MAX_PORT_RETRIES, err
        );
        tokio::time::sleep(Duration::from_millis(200 * 2_u64.pow(port_attempts))).await;
    };

    let database_url = format!(
        "postgres://postgres@localhost:{}/postgres?sslmode=disable",
        port
    );
    let (max_conns, timeout_secs) = get_pool_config();
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Connection failures and health-check failures share one retry budget.
    let mut failures: u32 = 0;
    let pool = loop {
        let connected = sqlx::postgres::PgPoolOptions::new()
            .max_connections(max_conns)
            .min_connections(1)
            .acquire_timeout(Duration::from_secs(timeout_secs))
            .idle_timeout(Duration::from_secs(600))
            .max_lifetime(Duration::from_secs(1800))
            .test_before_acquire(false)
            .connect(&database_url)
            .await;

        let candidate = match connected {
            Ok(candidate) => candidate,
            Err(err) => {
                if failures >= MAX_CONNECT_RETRIES {
                    panic!(
                        "Failed to connect to PostgreSQL after {} retries: {}",
                        MAX_CONNECT_RETRIES, err
                    );
                }
                eprintln!(
                    "PostgreSQL connection attempt {} of {} failed: {:?}",
                    failures + 1,
                    MAX_CONNECT_RETRIES,
                    err
                );
                failures += 1;
                tokio::time::sleep(Duration::from_millis(200 * 2_u64.pow(failures))).await;
                continue;
            }
        };

        // A pool can be created before the server really accepts queries, so
        // confirm with a trivial statement before handing it out.
        match sqlx::query("SELECT 1").fetch_one(&candidate).await {
            Ok(_) => break candidate,
            Err(err) => {
                if failures >= MAX_CONNECT_RETRIES {
                    panic!(
                        "PostgreSQL pool created but health check failed after {} retries: {}",
                        MAX_CONNECT_RETRIES, err
                    );
                }
                eprintln!(
                    "PostgreSQL health check attempt {} of {} failed: {:?}",
                    failures + 1,
                    MAX_CONNECT_RETRIES,
                    err
                );
                failures += 1;
                tokio::time::sleep(Duration::from_millis(200 * 2_u64.pow(failures))).await;
            }
        }
    };

    (postgres, Arc::new(pool), port, database_url)
}
/// Starts a single-node, in-memory CockroachDB (`v23.1.0`) container and
/// returns a pool connected to its `defaultdb` database.
///
/// A temporary single-connection pool against the `postgres` database is used
/// to run `CREATE DATABASE IF NOT EXISTS defaultdb` and is closed before the
/// returned pool is created.
///
/// # Returns
/// `(container, pool, host_port, database_url)`.
///
/// # Panics
/// Panics if the container cannot be started, the mapped port cannot be
/// resolved, or any of the connections / the bootstrap statement fails.
pub async fn cockroachdb_container()
-> (ContainerAsync<GenericImage>, Arc<sqlx::PgPool>, u16, String) {
    use sqlx::postgres::PgPoolOptions;
    use std::time::Duration;
    use testcontainers::core::IntoContainerPort;

    let start_args = vec![
        "start-single-node".to_string(),
        "--insecure".to_string(),
        "--store=type=mem,size=1GiB".to_string(),
    ];
    let cockroachdb = GenericImage::new("cockroachdb/cockroach", "v23.1.0")
        .with_exposed_port(26257.tcp())
        .with_wait_for(WaitFor::message_on_stderr("initialized new cluster"))
        .with_cmd(start_args)
        .start()
        .await
        .expect("Failed to start CockroachDB container");
    let port = cockroachdb
        .get_host_port_ipv4(26257)
        .await
        .expect("Failed to get CockroachDB port");

    // Bootstrap: make sure `defaultdb` exists, using a short-lived admin pool.
    let bootstrap_url = format!("postgresql://root@127.0.0.1:{}/postgres", port);
    let bootstrap_pool = PgPoolOptions::new()
        .max_connections(1)
        .connect(&bootstrap_url)
        .await
        .expect("Failed to connect to CockroachDB postgres database");
    sqlx::query("CREATE DATABASE IF NOT EXISTS defaultdb")
        .execute(&bootstrap_pool)
        .await
        .expect("Failed to create defaultdb");
    bootstrap_pool.close().await;

    let database_url = format!("postgresql://root@127.0.0.1:{}/defaultdb", port);
    let (max_conns, timeout_secs) = get_pool_config();
    let pool = PgPoolOptions::new()
        .max_connections(max_conns)
        .min_connections(1)
        .acquire_timeout(Duration::from_secs(timeout_secs))
        .idle_timeout(Duration::from_secs(30))
        .max_lifetime(Duration::from_secs(120))
        .connect(&database_url)
        .await
        .expect("Failed to connect to CockroachDB defaultdb");

    (cockroachdb, Arc::new(pool), port, database_url)
}
/// Starts a single Redis container, retrying the start up to three times.
///
/// Failed attempts are logged to stderr and separated by a two-second pause.
///
/// # Returns
/// `(container, host_port, redis_url)`.
///
/// # Panics
/// Panics with the last error if every attempt fails.
#[fixture]
pub async fn redis_container() -> (ContainerAsync<GenericImage>, u16, String) {
    const MAX_RETRIES: u32 = 3;
    const RETRY_DELAY_MS: u64 = 2000;

    let mut last_error = None;
    for attempt in 1..=MAX_RETRIES {
        let err = match try_start_redis_container().await {
            Ok(started) => return started,
            Err(err) => err,
        };
        eprintln!(
            "Redis container start attempt {} of {} failed: {:?}",
            attempt, MAX_RETRIES, err
        );
        last_error = Some(err);
        // No point in waiting after the final attempt.
        if attempt != MAX_RETRIES {
            tokio::time::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
        }
    }

    panic!(
        "Failed to start Redis container after {} attempts: {:?}",
        MAX_RETRIES, last_error
    );
}
/// Makes one attempt at starting a `redis:7-alpine` container.
///
/// Waits for the "Ready to accept connections" log line on stdout, then
/// resolves the host port mapped to the container's port 6379.
///
/// # Errors
/// Returns the error from starting the container or from the port lookup.
async fn try_start_redis_container()
-> Result<(ContainerAsync<GenericImage>, u16, String), Box<dyn std::error::Error>> {
    use testcontainers::core::IntoContainerPort;

    let image = GenericImage::new("redis", "7-alpine")
        .with_exposed_port(6379.tcp())
        .with_wait_for(WaitFor::message_on_stdout("Ready to accept connections"));
    let redis = image.start().await?;
    let port = redis.get_host_port_ipv4(6379).await?;
    Ok((redis, port, format!("redis://localhost:{}", port)))
}
/// A running Redis cluster container together with the host ports of its nodes.
pub struct RedisClusterContainer {
    /// Handle to the container running the cluster.
    pub container: ContainerAsync<GenericImage>,
    /// Host ports on which the individual cluster nodes are reachable.
    pub node_ports: Vec<u16>,
}

impl std::fmt::Debug for RedisClusterContainer {
    /// Prints the node ports and a fixed `"<ContainerAsync>"` placeholder in
    /// place of the container handle.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("RedisClusterContainer");
        out.field("node_ports", &self.node_ports);
        out.field("container", &"<ContainerAsync>");
        out.finish()
    }
}
/// Acquires the Redis cluster file lock.
///
/// The lock file is `reinhardt_redis_cluster.lock` inside the system temporary
/// directory; the lock is held until the returned guard is dropped.
///
/// # Panics
/// Panics if the lock file cannot be opened or locked.
#[fixture]
pub fn redis_cluster_lock() -> FileLockGuard {
    let mut lock_file = std::env::temp_dir();
    lock_file.push("reinhardt_redis_cluster.lock");
    FileLockGuard::new(lock_file).expect("Failed to acquire Redis cluster lock")
}
/// Fixture that takes ownership of a [`FileLockGuard`] and does nothing else.
///
/// The parameter is presumably resolved by rstest to the `redis_cluster_lock`
/// fixture (verify against the rstest version in use).
///
/// NOTE(review): the guard is moved into this function and dropped when it
/// returns, and `Drop for FileLockGuard` unlocks the file. The lock is
/// therefore already released by the time dependent fixtures run — confirm
/// this is intended.
#[fixture]
pub async fn redis_cluster_cleanup(_redis_cluster_lock: FileLockGuard) {
    // Intentionally empty: the only effect is dropping the guard on return.
}
/// Makes one attempt at starting a six-node `grokzen/redis-cluster:7.0.10`
/// container whose nodes listen on `base_port..=base_port + 5`.
///
/// Each node port is mapped to the identical host port, and `INITIAL_PORT` is
/// set to `base_port`. After the container reports "Cluster state changed: ok",
/// every node port is probed with a TCP connect; the probe is repeated up to
/// 30 times with a 500 ms pause between rounds so the nodes get time to come up.
///
/// # Errors
/// Returns an error when
/// - `base_port + 5` does not fit into a `u16`,
/// - the container fails to start, or
/// - the node ports are still not accepting connections after all probe rounds.
async fn try_start_redis_cluster(
    base_port: u16,
) -> Result<(ContainerAsync<GenericImage>, Vec<u16>), Box<dyn std::error::Error>> {
    const NODE_COUNT: u16 = 6;
    const MAX_RETRIES: u32 = 30;
    const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(500);

    // Compute the node ports up front so an out-of-range base is reported as an
    // error instead of overflowing the port arithmetic.
    let node_ports: Vec<u16> = (0..NODE_COUNT)
        .map(|offset| base_port.checked_add(offset))
        .collect::<Option<_>>()
        .ok_or_else(|| {
            format!(
                "Redis cluster base port {} leaves no room for {} consecutive ports",
                base_port, NODE_COUNT
            )
        })?;

    let request = GenericImage::new("grokzen/redis-cluster", "7.0.10")
        .with_wait_for(WaitFor::message_on_stdout("Cluster state changed: ok"))
        .with_startup_timeout(std::time::Duration::from_secs(600))
        .with_env_var("IP", "0.0.0.0")
        .with_env_var("INITIAL_PORT", base_port.to_string());
    // Map every node port to the same port number on the host.
    let request = node_ports.iter().fold(request, |request, &port| {
        request.with_mapped_port(port, ContainerPort::Tcp(port))
    });
    let cluster = request.start().await?;

    for attempt in 1..=MAX_RETRIES {
        let mut all_ready = true;
        for &port in &node_ports {
            if tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
                .await
                .is_err()
            {
                all_ready = false;
                break;
            }
        }
        if all_ready {
            eprintln!("All Redis cluster ports ready after {} attempts", attempt);
            return Ok((cluster, node_ports));
        }
        // A refused connection fails immediately; without a pause all retries
        // would be exhausted within milliseconds.
        if attempt < MAX_RETRIES {
            tokio::time::sleep(RETRY_DELAY).await;
        }
    }

    Err(format!(
        "Redis cluster ports not ready after {} retries. Ports: {:?}",
        MAX_RETRIES, node_ports
    )
    .into())
}
/// Starts the Redis cluster container, moving to another port range when the
/// current one is taken.
///
/// Up to five attempts are made. Before each attempt the six-port range is
/// re-checked; if it is occupied, or the start fails with a message containing
/// "port is already allocated" or "address already in use", the base port is
/// advanced by 1000 and the next attempt begins.
///
/// # Returns
/// `(container, node_ports)`.
///
/// # Panics
/// Panics on any start failure that is not a port conflict, or when all
/// attempts are exhausted.
#[fixture]
pub async fn redis_cluster_ports_ready(
    #[future] redis_cluster_cleanup: (),
    #[future] redis_cluster_base_port: u16,
) -> (ContainerAsync<GenericImage>, Vec<u16>) {
    const MAX_PORT_RETRIES: usize = 5;
    const PORT_INCREMENT: u16 = 1000;

    redis_cluster_cleanup.await;
    let mut base_port = redis_cluster_base_port.await;

    for attempt in 1..=MAX_PORT_RETRIES {
        if attempt == 1 {
            eprintln!(
                "Using Redis Cluster port range: {}-{}",
                base_port,
                base_port + 5
            );
        } else {
            eprintln!(
                "Retrying Redis cluster start with port {} (attempt {}/{})",
                base_port, attempt, MAX_PORT_RETRIES
            );
        }

        if is_port_range_available(base_port).await {
            let err = match try_start_redis_cluster(base_port).await {
                Ok((container, node_ports)) => {
                    eprintln!(
                        "Redis cluster started successfully on ports {:?}",
                        node_ports
                    );
                    return (container, node_ports);
                }
                Err(err) => err,
            };
            eprintln!("Failed to start Redis cluster on port {}: {}", base_port, err);

            // Only port conflicts are worth retrying on a different range.
            let reason = err.to_string();
            let port_conflict = reason.contains("port is already allocated")
                || reason.contains("address already in use");
            if !port_conflict {
                panic!("Failed to start Redis cluster (non-port error): {}", err);
            }
        } else {
            eprintln!(
                "Port range {}-{} became unavailable, trying next range",
                base_port,
                base_port + 5
            );
        }

        base_port += PORT_INCREMENT;
    }

    panic!(
        "Failed to start Redis cluster after {} attempts. Last port tried: {}",
        MAX_PORT_RETRIES, base_port
    );
}
/// Wraps the started cluster container and its node ports in a
/// [`RedisClusterContainer`], logging the ports to stderr.
#[fixture]
pub async fn redis_cluster_container(
    #[future] redis_cluster_ports_ready: (ContainerAsync<GenericImage>, Vec<u16>),
) -> RedisClusterContainer {
    let (container, node_ports) = redis_cluster_ports_ready.await;
    eprintln!("Redis cluster ready with ports: {:?}", node_ports);
    RedisClusterContainer {
        container,
        node_ports,
    }
}
/// Builds a cluster client for the running Redis cluster and verifies it with
/// a `PING`.
///
/// # Returns
/// `(container, client, node_urls)` where each URL has the form
/// `redis://127.0.0.1:<port>`.
///
/// # Panics
/// Panics if the client cannot be created, the connection fails, or the
/// `PING` does not succeed.
#[fixture]
pub async fn redis_cluster(
    #[future] redis_cluster_container: RedisClusterContainer,
) -> (
    RedisClusterContainer,
    Arc<redis::cluster::ClusterClient>,
    Vec<String>,
) {
    let container = redis_cluster_container.await;

    let mut cluster_nodes: Vec<String> = Vec::with_capacity(container.node_ports.len());
    for port in &container.node_ports {
        cluster_nodes.push(format!("redis://127.0.0.1:{}", port));
    }

    let client = redis::cluster::ClusterClient::new(cluster_nodes.clone())
        .expect("Failed to create cluster client");
    let mut connection = client
        .get_async_connection()
        .await
        .expect("Failed to connect to cluster");
    let ping = redis::cmd("PING");
    ping.query_async::<String>(&mut connection)
        .await
        .expect("Failed to PING cluster");
    eprintln!("Redis cluster connection verified");

    (container, Arc::new(client), cluster_nodes)
}
/// Builds a cluster client for the running Redis cluster and verifies it with
/// a `PING`; same as `redis_cluster` but with the tuple in client-first order.
///
/// # Returns
/// `(client, node_urls, container)` where each URL has the form
/// `redis://127.0.0.1:<port>`.
///
/// # Panics
/// Panics if the client cannot be created, the connection fails, or the
/// `PING` does not succeed.
#[fixture]
pub async fn redis_cluster_client(
    #[future] redis_cluster_container: RedisClusterContainer,
) -> (
    Arc<redis::cluster::ClusterClient>,
    Vec<String>,
    RedisClusterContainer,
) {
    let container = redis_cluster_container.await;

    let mut cluster_nodes: Vec<String> = Vec::with_capacity(container.node_ports.len());
    for port in &container.node_ports {
        cluster_nodes.push(format!("redis://127.0.0.1:{}", port));
    }

    let client = redis::cluster::ClusterClient::new(cluster_nodes.clone())
        .expect("Failed to create cluster client");
    let mut connection = client
        .get_async_connection()
        .await
        .expect("Failed to connect to cluster");
    let ping = redis::cmd("PING");
    ping.query_async::<String>(&mut connection)
        .await
        .expect("Failed to PING cluster");
    eprintln!("Redis cluster client created");

    (Arc::new(client), cluster_nodes, container)
}
/// Returns the `redis://127.0.0.1:<port>` URL of every cluster node together
/// with the container, without opening any connection.
#[fixture]
pub async fn redis_cluster_urls(
    #[future] redis_cluster_container: RedisClusterContainer,
) -> (Vec<String>, RedisClusterContainer) {
    let container = redis_cluster_container.await;

    let mut cluster_nodes: Vec<String> = Vec::with_capacity(container.node_ports.len());
    for port in &container.node_ports {
        cluster_nodes.push(format!("redis://127.0.0.1:{}", port));
    }

    (cluster_nodes, container)
}
/// Self-contained Redis cluster fixture based on `neohq/redis-cluster:latest`.
///
/// Steps, in order:
/// 1. Take the `reinhardt_redis_cluster.lock` file lock (held until this
///    function returns).
/// 2. Best-effort `docker stop` / `docker rm` of every existing container whose
///    ancestor is `neohq/redis-cluster:latest`; failures are ignored.
/// 3. Start a fresh container exposing ports 7000-7005 and resolve their host
///    ports.
/// 4. Poll the host ports with TCP connects (up to 30 rounds, 500 ms apart).
/// 5. Poll `CLUSTER INFO` on the first node until it reports
///    `cluster_state:ok` (up to 60 rounds, 1 s apart).
/// 6. Create a cluster client and verify it with `PING`.
///
/// The pauses in steps 4 and 5 matter: a refused connection fails immediately,
/// so polling without a delay would exhaust every retry within milliseconds.
///
/// NOTE(review): `FileLockGuard::new` uses a synchronous lock call inside this
/// async function — confirm that stalling the executor thread is acceptable.
///
/// # Returns
/// `(container, client, node_urls)`.
///
/// # Panics
/// Panics if the lock cannot be taken, the container cannot be started, a host
/// port cannot be resolved, either readiness poll times out, or the final
/// client connection / `PING` fails.
#[fixture]
pub async fn redis_cluster_fixture() -> (
    RedisClusterContainer,
    Arc<redis::cluster::ClusterClient>,
    Vec<String>,
) {
    use std::time::Duration;
    use testcontainers::core::IntoContainerPort;

    const CONTAINER_PORTS: [u16; 6] = [7000, 7001, 7002, 7003, 7004, 7005];
    const PORT_POLL_ATTEMPTS: u32 = 30;
    const PORT_POLL_DELAY: Duration = Duration::from_millis(500);
    const CLUSTER_POLL_ATTEMPTS: u32 = 60;
    const CLUSTER_POLL_DELAY: Duration = Duration::from_secs(1);

    let _lock = {
        let lock_path = std::env::temp_dir().join("reinhardt_redis_cluster.lock");
        FileLockGuard::new(lock_path).expect("Failed to acquire Redis cluster lock")
    };

    // Remove leftovers from earlier runs; every docker failure is ignored on purpose.
    let listing = tokio::process::Command::new("docker")
        .args([
            "ps",
            "-a",
            "--filter",
            "ancestor=neohq/redis-cluster:latest",
            "--format",
            "{{.ID}}",
        ])
        .output()
        .await;
    if let Ok(listing) = listing {
        let container_ids = String::from_utf8_lossy(&listing.stdout);
        for container_id in container_ids
            .lines()
            .map(str::trim)
            .filter(|id| !id.is_empty())
        {
            eprintln!(
                "Stopping existing Redis cluster container: {}",
                container_id
            );
            let _ = tokio::process::Command::new("docker")
                .args(["stop", container_id])
                .output()
                .await;
            let _ = tokio::process::Command::new("docker")
                .args(["rm", container_id])
                .output()
                .await;
        }
    }

    let image = CONTAINER_PORTS.iter().fold(
        GenericImage::new("neohq/redis-cluster", "latest"),
        |image, &port| image.with_exposed_port(port.tcp()),
    );
    let cluster = image
        .with_wait_for(WaitFor::message_on_stdout("[OK] All 16384 slots covered."))
        .with_startup_timeout(Duration::from_secs(600))
        .start()
        .await
        .expect("Failed to start Redis cluster container");

    let mut node_ports: Vec<u16> = Vec::with_capacity(CONTAINER_PORTS.len());
    for container_port in CONTAINER_PORTS {
        let host_port = cluster
            .get_host_port_ipv4(container_port)
            .await
            .unwrap_or_else(|e| {
                panic!("Failed to get port for node {}: {:?}", container_port, e)
            });
        node_ports.push(host_port);
    }

    // Phase 1: every mapped port must accept TCP connections.
    let mut ports_ready = false;
    for attempt in 1..=PORT_POLL_ATTEMPTS {
        let mut all_ready = true;
        for &port in &node_ports {
            if tokio::net::TcpStream::connect(format!("127.0.0.1:{}", port))
                .await
                .is_err()
            {
                all_ready = false;
                break;
            }
        }
        if all_ready {
            eprintln!("All Redis cluster ports ready after {} attempts", attempt);
            ports_ready = true;
            break;
        }
        if attempt < PORT_POLL_ATTEMPTS {
            tokio::time::sleep(PORT_POLL_DELAY).await;
        }
    }
    if !ports_ready {
        panic!(
            "Redis cluster ports not ready after {} retries. Ports: {:?}",
            PORT_POLL_ATTEMPTS, node_ports
        );
    }

    // Phase 2: the cluster itself must report a healthy state.
    let mut cluster_ready = false;
    for attempt in 1..=CLUSTER_POLL_ATTEMPTS {
        let client_result = redis::Client::open(format!("redis://127.0.0.1:{}", node_ports[0]));
        if let Ok(client) = client_result
            && let Ok(mut conn) = client.get_multiplexed_async_connection().await
            && let Ok(info) = redis::cmd("CLUSTER")
                .arg("INFO")
                .query_async::<String>(&mut conn)
                .await
            && info.contains("cluster_state:ok")
        {
            eprintln!(
                "Redis cluster fully initialized after {} attempts",
                attempt
            );
            cluster_ready = true;
            break;
        }
        if attempt < CLUSTER_POLL_ATTEMPTS {
            tokio::time::sleep(CLUSTER_POLL_DELAY).await;
        }
    }
    if !cluster_ready {
        panic!(
            "Redis cluster not initialized after {} retries. Ports: {:?}",
            CLUSTER_POLL_ATTEMPTS, node_ports
        );
    }

    let cluster_nodes: Vec<String> = node_ports
        .iter()
        .map(|&port| format!("redis://127.0.0.1:{}", port))
        .collect();
    let client = redis::cluster::ClusterClient::new(cluster_nodes.clone())
        .expect("Failed to create cluster client");
    let mut conn = client
        .get_async_connection()
        .await
        .expect("Failed to connect to cluster");
    redis::cmd("PING")
        .query_async::<String>(&mut conn)
        .await
        .expect("Failed to PING cluster");
    eprintln!("Redis cluster connection verified");

    let container = RedisClusterContainer {
        container: cluster,
        node_ports,
    };
    (container, Arc::new(client), cluster_nodes)
}
/// Makes one attempt at starting a `mongo:7.0` container.
///
/// Waits (up to 60 seconds) for the "Waiting for connections" log line on
/// stdout, then resolves the host port mapped to the container's port 27017.
///
/// # Returns
/// `(container, connection_string, host_port)`.
///
/// # Errors
/// Returns the error from starting the container or from the port lookup.
async fn try_start_mongodb_container()
-> Result<(ContainerAsync<GenericImage>, String, u16), Box<dyn std::error::Error>> {
    use testcontainers::core::IntoContainerPort;

    let request = GenericImage::new("mongo", "7.0")
        .with_exposed_port(27017.tcp())
        .with_wait_for(WaitFor::message_on_stdout("Waiting for connections"))
        .with_startup_timeout(std::time::Duration::from_secs(60));
    let mongo = request.start().await?;
    let port = mongo.get_host_port_ipv4(27017).await?;
    Ok((mongo, format!("mongodb://127.0.0.1:{}", port), port))
}
/// Starts a MongoDB container, retrying the start up to three times.
///
/// Failed attempts are logged to stderr and separated by a two-second pause.
///
/// # Returns
/// `(container, connection_string, host_port)`.
///
/// # Panics
/// Panics with the last error if every attempt fails.
#[fixture]
pub async fn mongodb_container() -> (ContainerAsync<GenericImage>, String, u16) {
    const MAX_RETRIES: u32 = 3;
    const RETRY_DELAY_MS: u64 = 2000;

    let mut last_error = None;
    for attempt in 1..=MAX_RETRIES {
        let err = match try_start_mongodb_container().await {
            Ok(started) => return started,
            Err(err) => err,
        };
        eprintln!(
            "MongoDB container start attempt {} of {} failed: {:?}",
            attempt, MAX_RETRIES, err
        );
        last_error = Some(err);
        // No point in waiting after the final attempt.
        if attempt != MAX_RETRIES {
            tokio::time::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
        }
    }

    panic!(
        "Failed to start MongoDB container after {} attempts: {:?}",
        MAX_RETRIES, last_error
    );
}
/// Starts a `localstack/localstack:latest` container with `SERVICES=s3,dynamodb`.
///
/// Waits for the "Ready." log line on stdout, then resolves the host port
/// mapped to the container's port 4566.
///
/// # Returns
/// `(container, host_port, endpoint_url)`.
///
/// # Panics
/// Panics if the container cannot be started or the port cannot be resolved.
#[fixture]
pub async fn localstack_fixture() -> (ContainerAsync<GenericImage>, u16, String) {
    use testcontainers::core::IntoContainerPort;

    let request = GenericImage::new("localstack/localstack", "latest")
        .with_exposed_port(4566.tcp())
        .with_wait_for(WaitFor::message_on_stdout("Ready."))
        .with_env_var("SERVICES", "s3,dynamodb");
    let localstack = request
        .start()
        .await
        .expect("Failed to start LocalStack container");
    let port = localstack
        .get_host_port_ipv4(4566)
        .await
        .expect("Failed to get LocalStack port");

    (localstack, port, format!("http://localhost:{}", port))
}
/// Starts a PostgreSQL container and applies the migrations supplied by the
/// provider type `P`.
///
/// Nothing is executed when `P::migrations()` is empty.
///
/// # Returns
/// The container handle and a shared `DatabaseConnection` to it.
///
/// # Errors
/// Returns a descriptive error if connecting or applying the migrations fails.
///
/// # Panics
/// Panics if `postgres_container` fails to start the database.
#[cfg(feature = "testcontainers")]
pub async fn postgres_with_migrations_from<P: reinhardt_db::migrations::MigrationProvider>()
-> Result<
    (
        ContainerAsync<GenericImage>,
        std::sync::Arc<reinhardt_db::DatabaseConnection>,
    ),
    Box<dyn std::error::Error>,
> {
    use reinhardt_db::DatabaseConnection;
    use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;

    let (container, _pool, _port, database_url) = postgres_container().await;
    let db = match DatabaseConnection::connect_postgres(&database_url).await {
        Ok(db) => db,
        Err(e) => {
            return Err(format!("Failed to connect to PostgreSQL for migrations: {}", e).into());
        }
    };

    let pending = P::migrations();
    if !pending.is_empty() {
        let mut executor = DatabaseMigrationExecutor::new(db.inner().clone());
        if let Err(e) = executor.apply_migrations(&pending).await {
            return Err(format!("Failed to apply migrations: {}", e).into());
        }
    }

    Ok((container, std::sync::Arc::new(db)))
}
/// Starts a `mysql:8.0` container and returns a verified connection pool.
///
/// The server is created with root password `test` and database `test_db`.
/// Both the host-port lookup and the pool creation (including a `SELECT 1`
/// health check) are retried up to seven times with exponential backoff
/// (`200ms * 2^n`).
///
/// # Returns
/// `(container, pool, host_port, database_url)`.
///
/// # Panics
/// Panics if the container cannot be started, the mapped port cannot be
/// resolved, or no healthy connection can be established within the retries.
#[fixture]
#[cfg(feature = "testcontainers")]
pub async fn mysql_container() -> (
    ContainerAsync<GenericImage>,
    Arc<sqlx::MySqlPool>,
    u16,
    String,
) {
    use std::time::Duration;
    use testcontainers::core::IntoContainerPort;

    const MAX_PORT_RETRIES: u32 = 7;
    const MAX_CONNECT_RETRIES: u32 = 7;

    let request = GenericImage::new("mysql", "8.0")
        .with_exposed_port(3306.tcp())
        .with_wait_for(WaitFor::message_on_stderr(
            "port: 3306 MySQL Community Server",
        ))
        .with_startup_timeout(Duration::from_secs(120))
        .with_env_var("MYSQL_ROOT_PASSWORD", "test")
        .with_env_var("MYSQL_DATABASE", "test_db");
    let mysql = request
        .start()
        .await
        .expect("Failed to start MySQL container");

    // Short grace period before asking for the mapped port.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let mut port_attempts: u32 = 0;
    let port = loop {
        let err = match mysql.get_host_port_ipv4(3306).await {
            Ok(mapped) => break mapped,
            Err(err) => err,
        };
        if port_attempts >= MAX_PORT_RETRIES {
            panic!(
                "Failed to get MySQL port after {} retries: {}",
                MAX_PORT_RETRIES, err
            );
        }
        port_attempts += 1;
        eprintln!(
            "MySQL port query attempt {} of {} failed: {:?}",
            port_attempts, MAX_PORT_RETRIES, err
        );
        tokio::time::sleep(Duration::from_millis(200 * 2_u64.pow(port_attempts))).await;
    };

    let database_url = format!("mysql://root:test@localhost:{}/test_db", port);
    let (max_conns, timeout_secs) = get_pool_config();
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Connection failures and health-check failures share one retry budget.
    let mut failures: u32 = 0;
    let pool = loop {
        let connected = sqlx::mysql::MySqlPoolOptions::new()
            .max_connections(max_conns)
            .min_connections(1)
            .acquire_timeout(Duration::from_secs(timeout_secs))
            .idle_timeout(Duration::from_secs(600))
            .max_lifetime(Duration::from_secs(1800))
            .test_before_acquire(false)
            .connect(&database_url)
            .await;

        let candidate = match connected {
            Ok(candidate) => candidate,
            Err(err) => {
                if failures >= MAX_CONNECT_RETRIES {
                    panic!(
                        "Failed to connect to MySQL after {} retries: {}",
                        MAX_CONNECT_RETRIES, err
                    );
                }
                eprintln!(
                    "MySQL connection attempt {} of {} failed: {:?}",
                    failures + 1,
                    MAX_CONNECT_RETRIES,
                    err
                );
                failures += 1;
                tokio::time::sleep(Duration::from_millis(200 * 2_u64.pow(failures))).await;
                continue;
            }
        };

        // Confirm the pool is usable before handing it out.
        match sqlx::query("SELECT 1").fetch_one(&candidate).await {
            Ok(_) => break candidate,
            Err(err) => {
                if failures >= MAX_CONNECT_RETRIES {
                    panic!(
                        "MySQL health check failed after {} retries: {}",
                        MAX_CONNECT_RETRIES, err
                    );
                }
                eprintln!(
                    "MySQL health check attempt {} of {} failed: {:?}",
                    failures + 1,
                    MAX_CONNECT_RETRIES,
                    err
                );
                failures += 1;
                tokio::time::sleep(Duration::from_millis(200 * 2_u64.pow(failures))).await;
            }
        }
    };

    (mysql, Arc::new(pool), port, database_url)
}
/// Starts a MySQL container and applies the migrations supplied by the
/// provider type `P`.
///
/// Nothing is executed when `P::migrations()` is empty.
///
/// # Returns
/// The container handle and a shared `DatabaseConnection` to it.
///
/// # Panics
/// Panics if the container cannot be started, the connection fails, or the
/// migrations cannot be applied.
#[cfg(feature = "testcontainers")]
pub async fn mysql_with_migrations_from<P: reinhardt_db::migrations::MigrationProvider>() -> (
    ContainerAsync<GenericImage>,
    std::sync::Arc<reinhardt_db::DatabaseConnection>,
) {
    use reinhardt_db::DatabaseConnection;
    use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;

    let (container, _pool, _port, database_url) = mysql_container().await;
    let db = DatabaseConnection::connect_mysql(&database_url)
        .await
        .expect("Failed to connect to MySQL for migrations");

    let pending = P::migrations();
    if !pending.is_empty() {
        let mut executor = DatabaseMigrationExecutor::new(db.inner().clone());
        executor
            .apply_migrations(&pending)
            .await
            .expect("Failed to apply migrations");
    }

    (container, std::sync::Arc::new(db))
}
/// Opens an in-memory SQLite database (`sqlite::memory:`) and applies the
/// migrations supplied by the provider type `P`.
///
/// Nothing is executed when `P::migrations()` is empty.
///
/// # Panics
/// Panics if the connection fails or the migrations cannot be applied.
#[cfg(feature = "testcontainers")]
pub async fn sqlite_with_migrations_from<P: reinhardt_db::migrations::MigrationProvider>()
-> std::sync::Arc<reinhardt_db::DatabaseConnection> {
    use reinhardt_db::DatabaseConnection;
    use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;

    let db = DatabaseConnection::connect_sqlite("sqlite::memory:")
        .await
        .expect("Failed to connect to SQLite for migrations");

    let pending = P::migrations();
    if !pending.is_empty() {
        let mut executor = DatabaseMigrationExecutor::new(db.inner().clone());
        executor
            .apply_migrations(&pending)
            .await
            .expect("Failed to apply migrations");
    }

    std::sync::Arc::new(db)
}
/// Starts a PostgreSQL container and applies every migration found in the
/// global migration registry.
///
/// Nothing is executed when the registry is empty.
///
/// # Errors
/// Returns a descriptive error if connecting or applying the migrations fails.
///
/// # Panics
/// Panics if `postgres_container` fails to start the database.
#[cfg(feature = "testcontainers")]
#[deprecated(
    since = "0.1.0-rc.16",
    note = "Use `postgres_with_migrations_from_dir()` instead. \
This fixture requires `collect_migrations!` macro registration \
which is being deprecated in favor of `FilesystemSource`."
)]
#[allow(deprecated)]
#[rstest::fixture]
pub async fn postgres_with_all_migrations() -> Result<
    (
        ContainerAsync<GenericImage>,
        std::sync::Arc<reinhardt_db::DatabaseConnection>,
    ),
    Box<dyn std::error::Error>,
> {
    use reinhardt_db::DatabaseConnection;
    use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;
    use reinhardt_db::migrations::registry::{MigrationRegistry, global_registry};

    let (container, _pool, _port, database_url) = postgres_container().await;
    let db = match DatabaseConnection::connect_postgres(&database_url).await {
        Ok(db) => db,
        Err(e) => {
            return Err(format!("Failed to connect to PostgreSQL for migrations: {}", e).into());
        }
    };

    let registered = global_registry().all_migrations();
    if !registered.is_empty() {
        let mut executor = DatabaseMigrationExecutor::new(db.inner().clone());
        if let Err(e) = executor.apply_migrations(&registered).await {
            return Err(format!("Failed to apply migrations: {}", e).into());
        }
    }

    Ok((container, std::sync::Arc::new(db)))
}
/// Starts a PostgreSQL container and applies the registered migrations whose
/// `app_label` is listed in `app_labels`.
///
/// Nothing is executed when no registered migration matches.
///
/// # Errors
/// Returns a descriptive error if connecting or applying the migrations fails.
///
/// # Panics
/// Panics if `postgres_container` fails to start the database.
#[cfg(feature = "testcontainers")]
#[deprecated(
    since = "0.1.0-rc.16",
    note = "Use `postgres_with_migrations_from_dir()` instead. \
This function requires `collect_migrations!` macro registration \
which is being deprecated in favor of `FilesystemSource`."
)]
#[allow(deprecated)]
pub async fn postgres_with_apps_migrations(
    app_labels: &[&str],
) -> Result<
    (
        ContainerAsync<GenericImage>,
        std::sync::Arc<reinhardt_db::DatabaseConnection>,
    ),
    Box<dyn std::error::Error>,
> {
    use reinhardt_db::DatabaseConnection;
    use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;
    use reinhardt_db::migrations::registry::{MigrationRegistry, global_registry};

    let (container, _pool, _port, database_url) = postgres_container().await;
    let db = match DatabaseConnection::connect_postgres(&database_url).await {
        Ok(db) => db,
        Err(e) => {
            return Err(format!("Failed to connect to PostgreSQL for migrations: {}", e).into());
        }
    };

    // Keep only the migrations belonging to the requested apps.
    let selected: Vec<_> = global_registry()
        .all_migrations()
        .into_iter()
        .filter(|migration| app_labels.contains(&migration.app_label.as_str()))
        .collect();
    if !selected.is_empty() {
        let mut executor = DatabaseMigrationExecutor::new(db.inner().clone());
        if let Err(e) = executor.apply_migrations(&selected).await {
            return Err(format!("Failed to apply migrations: {}", e).into());
        }
    }

    Ok((container, std::sync::Arc::new(db)))
}
/// Starts a PostgreSQL container, applies the migrations loaded from
/// `migrations_dir`, and re-initializes the ORM's database state for the new
/// database URL.
///
/// The ORM re-initialization runs even when the directory yields no migrations.
///
/// # Errors
/// Returns a descriptive error if connecting, loading the migrations, applying
/// them, or re-initializing the ORM fails.
///
/// # Panics
/// Panics if `postgres_container` fails to start the database.
#[cfg(feature = "testcontainers")]
pub async fn postgres_with_migrations_from_dir(
    migrations_dir: impl AsRef<std::path::Path>,
) -> Result<
    (
        ContainerAsync<GenericImage>,
        std::sync::Arc<reinhardt_db::DatabaseConnection>,
    ),
    Box<dyn std::error::Error>,
> {
    use reinhardt_db::DatabaseConnection;
    use reinhardt_db::migrations::FilesystemSource;
    use reinhardt_db::migrations::MigrationSource;
    use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;

    let (container, _pool, _port, database_url) = postgres_container().await;
    let db = match DatabaseConnection::connect_postgres(&database_url).await {
        Ok(db) => db,
        Err(e) => {
            return Err(format!("Failed to connect to PostgreSQL for migrations: {}", e).into());
        }
    };

    let source = FilesystemSource::new(migrations_dir);
    let loaded = match source.all_migrations().await {
        Ok(loaded) => loaded,
        Err(e) => {
            return Err(format!("Failed to load migrations from filesystem: {}", e).into());
        }
    };
    if !loaded.is_empty() {
        let mut executor = DatabaseMigrationExecutor::new(db.inner().clone());
        if let Err(e) = executor.apply_migrations(&loaded).await {
            return Err(format!("Failed to apply migrations: {}", e).into());
        }
    }

    if let Err(e) = reinhardt_db::orm::reinitialize_database(&database_url).await {
        return Err(format!("Failed to initialize ORM global state: {}", e).into());
    }

    Ok((container, std::sync::Arc::new(db)))
}
/// Starts a MySQL container and applies every migration found in the global
/// migration registry.
///
/// Nothing is executed when the registry is empty.
///
/// # Panics
/// Panics if the container cannot be started, the connection fails, or the
/// migrations cannot be applied.
#[cfg(feature = "testcontainers")]
#[deprecated(
    since = "0.1.0-rc.16",
    note = "Use filesystem-based migration loading instead. \
This fixture requires `collect_migrations!` macro registration \
which is being deprecated in favor of `FilesystemSource`."
)]
#[allow(deprecated)]
#[rstest::fixture]
pub async fn mysql_with_all_migrations() -> (
    ContainerAsync<GenericImage>,
    std::sync::Arc<reinhardt_db::DatabaseConnection>,
) {
    use reinhardt_db::DatabaseConnection;
    use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;
    use reinhardt_db::migrations::registry::{MigrationRegistry, global_registry};

    let (container, _pool, _port, database_url) = mysql_container().await;
    let db = DatabaseConnection::connect_mysql(&database_url)
        .await
        .expect("Failed to connect to MySQL for migrations");

    let registered = global_registry().all_migrations();
    if !registered.is_empty() {
        let mut executor = DatabaseMigrationExecutor::new(db.inner().clone());
        executor
            .apply_migrations(&registered)
            .await
            .expect("Failed to apply migrations");
    }

    (container, std::sync::Arc::new(db))
}
/// Starts a MySQL container and applies the registered migrations whose
/// `app_label` is listed in `app_labels`.
///
/// Nothing is executed when no registered migration matches.
///
/// # Panics
/// Panics if the container cannot be started, the connection fails, or the
/// migrations cannot be applied.
#[cfg(feature = "testcontainers")]
#[deprecated(
    since = "0.1.0-rc.16",
    note = "Use filesystem-based migration loading instead. \
This function requires `collect_migrations!` macro registration \
which is being deprecated in favor of `FilesystemSource`."
)]
#[allow(deprecated)]
pub async fn mysql_with_apps_migrations(
    app_labels: &[&str],
) -> (
    ContainerAsync<GenericImage>,
    std::sync::Arc<reinhardt_db::DatabaseConnection>,
) {
    use reinhardt_db::DatabaseConnection;
    use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;
    use reinhardt_db::migrations::registry::{MigrationRegistry, global_registry};

    let (container, _pool, _port, database_url) = mysql_container().await;
    let db = DatabaseConnection::connect_mysql(&database_url)
        .await
        .expect("Failed to connect to MySQL for migrations");

    // Keep only the migrations belonging to the requested apps.
    let selected: Vec<_> = global_registry()
        .all_migrations()
        .into_iter()
        .filter(|migration| app_labels.contains(&migration.app_label.as_str()))
        .collect();
    if !selected.is_empty() {
        let mut executor = DatabaseMigrationExecutor::new(db.inner().clone());
        executor
            .apply_migrations(&selected)
            .await
            .expect("Failed to apply migrations");
    }

    (container, std::sync::Arc::new(db))
}
/// Opens an in-memory SQLite database (`sqlite::memory:`) and applies every
/// migration found in the global migration registry.
///
/// Nothing is executed when the registry is empty.
///
/// # Panics
/// Panics if the connection fails or the migrations cannot be applied.
#[cfg(feature = "testcontainers")]
#[deprecated(
    since = "0.1.0-rc.16",
    note = "Use filesystem-based migration loading instead. \
This fixture requires `collect_migrations!` macro registration \
which is being deprecated in favor of `FilesystemSource`."
)]
#[allow(deprecated)]
#[rstest::fixture]
pub async fn sqlite_with_all_migrations() -> std::sync::Arc<reinhardt_db::DatabaseConnection> {
    use reinhardt_db::DatabaseConnection;
    use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;
    use reinhardt_db::migrations::registry::{MigrationRegistry, global_registry};

    let db = DatabaseConnection::connect_sqlite("sqlite::memory:")
        .await
        .expect("Failed to connect to SQLite for migrations");

    let registered = global_registry().all_migrations();
    if !registered.is_empty() {
        let mut executor = DatabaseMigrationExecutor::new(db.inner().clone());
        executor
            .apply_migrations(&registered)
            .await
            .expect("Failed to apply migrations");
    }

    std::sync::Arc::new(db)
}
/// Opens an in-memory SQLite database (`sqlite::memory:`) and applies the
/// registered migrations whose `app_label` is listed in `app_labels`.
///
/// Nothing is executed when no registered migration matches.
///
/// # Panics
/// Panics if the connection fails or the migrations cannot be applied.
#[cfg(feature = "testcontainers")]
#[deprecated(
    since = "0.1.0-rc.16",
    note = "Use filesystem-based migration loading instead. \
This function requires `collect_migrations!` macro registration \
which is being deprecated in favor of `FilesystemSource`."
)]
#[allow(deprecated)]
pub async fn sqlite_with_apps_migrations(
    app_labels: &[&str],
) -> std::sync::Arc<reinhardt_db::DatabaseConnection> {
    use reinhardt_db::DatabaseConnection;
    use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;
    use reinhardt_db::migrations::registry::{MigrationRegistry, global_registry};

    let db = DatabaseConnection::connect_sqlite("sqlite::memory:")
        .await
        .expect("Failed to connect to SQLite for migrations");

    // Keep only the migrations belonging to the requested apps.
    let selected: Vec<_> = global_registry()
        .all_migrations()
        .into_iter()
        .filter(|migration| app_labels.contains(&migration.app_label.as_str()))
        .collect();
    if !selected.is_empty() {
        let mut executor = DatabaseMigrationExecutor::new(db.inner().clone());
        executor
            .apply_migrations(&selected)
            .await
            .expect("Failed to apply migrations");
    }

    std::sync::Arc::new(db)
}
/// Fixture that starts a RabbitMQ container, retrying when the start fails.
///
/// Up to `MAX_RETRIES` (3) attempts are made through
/// `try_start_rabbitmq_container`, sleeping `RETRY_DELAY_MS` (2000 ms) between
/// consecutive attempts. Every failed attempt is reported on stderr.
///
/// Compiled only with the `testcontainers` feature, because the imports this
/// fixture relies on (`fixture`, `ContainerAsync`, `GenericImage`) are gated
/// behind that same feature.
///
/// # Returns
///
/// The tuple produced by `try_start_rabbitmq_container`: the container handle,
/// the host port mapped to container port 5672, and the `amqp://` URL built
/// from that port.
///
/// # Panics
///
/// Panics with the last recorded error once every attempt has failed.
#[cfg(feature = "testcontainers")]
#[fixture]
pub async fn rabbitmq_container() -> (ContainerAsync<GenericImage>, u16, String) {
    const MAX_RETRIES: u32 = 3;
    const RETRY_DELAY_MS: u64 = 2000;

    let mut last_error = None;
    for attempt in 0..MAX_RETRIES {
        match try_start_rabbitmq_container().await {
            Ok(result) => return result,
            Err(e) => {
                eprintln!(
                    "RabbitMQ container start attempt {} of {} failed: {:?}",
                    attempt + 1,
                    MAX_RETRIES,
                    e
                );
                last_error = Some(e);
                // No point in sleeping after the final attempt: we panic right away.
                if attempt < MAX_RETRIES - 1 {
                    tokio::time::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)).await;
                }
            }
        }
    }

    panic!(
        "Failed to start RabbitMQ container after {} attempts: {:?}",
        MAX_RETRIES, last_error
    );
}
/// Makes a single attempt to start a RabbitMQ container and resolve its host port.
///
/// Starts the `rabbitmq:3-management-alpine` image with container ports 5672
/// and 15672 exposed, waits for the `Server startup complete` message on
/// stdout (startup timeout: 120 s), then looks up the host port mapped to
/// container port 5672. The port lookup is retried up to 5 times with an
/// exponentially growing delay (200 ms, 400 ms, 800 ms, 1600 ms, 3200 ms).
///
/// Compiled only with the `testcontainers` feature, because the imports this
/// helper relies on (`ContainerAsync`, `GenericImage`, `WaitFor`) are gated
/// behind that same feature.
///
/// # Returns
///
/// The container handle, the mapped host port, and an
/// `amqp://localhost:<port>/%2f` URL.
///
/// # Errors
///
/// Returns an error if the container fails to start, or if the host port is
/// still unavailable after all lookup retries.
#[cfg(feature = "testcontainers")]
async fn try_start_rabbitmq_container()
-> Result<(ContainerAsync<GenericImage>, u16, String), Box<dyn std::error::Error>> {
    use testcontainers::core::IntoContainerPort;

    let rabbitmq = GenericImage::new("rabbitmq", "3-management-alpine")
        .with_exposed_port(5672.tcp())
        .with_exposed_port(15672.tcp())
        .with_wait_for(WaitFor::message_on_stdout("Server startup complete"))
        .with_startup_timeout(std::time::Duration::from_secs(120))
        .start()
        .await?;

    let mut port_retry = 0;
    let max_port_retries = 5;
    let port = loop {
        match rabbitmq.get_host_port_ipv4(5672).await {
            Ok(p) => break p,
            Err(_) if port_retry < max_port_retries => {
                port_retry += 1;
                // Exponential backoff: 100 ms * 2^retry, with retry counted from 1.
                let delay = std::time::Duration::from_millis(100 * 2_u64.pow(port_retry));
                tokio::time::sleep(delay).await;
            }
            Err(e) => {
                return Err(Box::new(std::io::Error::other(format!(
                    "Failed to get RabbitMQ port after {} retries: {}",
                    max_port_retries, e
                ))));
            }
        }
    };

    // `%2f` is a percent-encoded `/`; presumably it selects the broker's
    // default vhost (confirm against the AMQP client in use).
    let url = format!("amqp://localhost:{}/%2f", port);
    Ok((rabbitmq, port, url))
}
/// Cell holding the Kafka container handed out by `shared_kafka_container`.
///
/// It starts out empty (`const_new`) and is filled on the first call to
/// `shared_kafka_container`; later calls receive clones of the stored `Arc`.
#[cfg(feature = "testcontainers")]
static SHARED_KAFKA: tokio::sync::OnceCell<Arc<crate::containers::KafkaContainer>> =
tokio::sync::OnceCell::const_new();
/// Returns the shared Kafka container, creating it on first use.
///
/// The container is stored in the `SHARED_KAFKA` cell. The first caller
/// constructs it with `KafkaContainer::new()`; every caller receives a clone
/// of the same `Arc`.
#[cfg(feature = "testcontainers")]
pub async fn shared_kafka_container() -> Arc<crate::containers::KafkaContainer> {
    let shared = SHARED_KAFKA
        .get_or_init(|| async {
            let kafka = crate::containers::KafkaContainer::new().await;
            Arc::new(kafka)
        })
        .await;

    // Hand out another reference to the same container instance.
    Arc::clone(shared)
}
/// Fixture that yields the Kafka container returned by `shared_kafka_container`.
///
/// The fixture does no work of its own: it awaits `shared_kafka_container`
/// and returns the resulting `Arc` unchanged.
#[cfg(feature = "testcontainers")]
#[fixture]
pub async fn kafka_container() -> Arc<crate::containers::KafkaContainer> {
shared_kafka_container().await
}
/// Tests for the helpers and fixtures defined in this file.
#[cfg(all(test, feature = "testcontainers"))]
mod tests {
    use super::*;
    use rstest::*;

    /// Value yielded by the `postgres_container` fixture as consumed by these
    /// tests: container handle, connection pool, port, and URL.
    type PostgresFixture = (ContainerAsync<GenericImage>, Arc<sqlx::PgPool>, u16, String);

    /// Both values returned by `get_pool_config` must be strictly positive.
    #[rstest]
    fn test_get_pool_config_defaults() {
        let (max_conns, timeout_secs) = get_pool_config();
        assert!(
            max_conns > 0,
            "Expected max_connections > 0, got: {}",
            max_conns
        );
        assert!(
            timeout_secs > 0,
            "Expected acquire_timeout > 0, got: {}",
            timeout_secs
        );
    }

    /// A port that was bound and then released must be reported as available.
    #[rstest]
    #[tokio::test]
    async fn test_is_port_available() {
        // Bind to port 0 to obtain a concrete port number, then release the
        // listener at the end of this block before probing the port.
        let port = {
            let probe = tokio::net::TcpListener::bind("127.0.0.1:0")
                .await
                .expect("Failed to bind to random port");
            let addr = probe.local_addr().unwrap();
            addr.port()
        };

        let is_free = is_port_available(port).await;
        assert!(is_free, "Expected released port {} to be available", port);
    }

    /// The pool provided by the fixture must be able to run `SELECT 1`.
    #[rstest]
    #[tokio::test]
    async fn test_postgres_container_connects(#[future] postgres_container: PostgresFixture) {
        // `_container` stays bound so the container handle lives until the test ends.
        let (_container, pg_pool, _port, _url) = postgres_container.await;

        let (one,): (i32,) = sqlx::query_as("SELECT 1")
            .fetch_one(pg_pool.as_ref())
            .await
            .expect("Failed to execute SELECT 1 on postgres container");
        assert_eq!(one, 1);
    }

    /// The port reported by the fixture must not be zero.
    #[rstest]
    #[tokio::test]
    async fn test_postgres_container_port_nonzero(#[future] postgres_container: PostgresFixture) {
        let (_container, _pool, mapped_port, _url) = postgres_container.await;
        assert!(
            mapped_port > 0,
            "Expected port to be non-zero, got: {}",
            mapped_port
        );
    }

    /// `create_test_any_pool` must succeed against the fixture's database URL.
    #[rstest]
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_create_test_any_pool(#[future] postgres_container: PostgresFixture) {
        let (_container, _pool, _port, db_url) = postgres_container.await;

        // The `Any` drivers are installed before the pool is created.
        sqlx::any::install_default_drivers();

        let outcome = create_test_any_pool(&db_url).await;
        assert!(
            outcome.is_ok(),
            "Expected create_test_any_pool to succeed, got: {:?}",
            outcome.err()
        );
    }
}