use testcontainers::core::WaitFor;
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, GenericImage, ImageExt};
use testcontainers_modules::mysql::Mysql;
const TEST_WAIT_READY_KEY: &str = "_test_wait_ready";
#[async_trait::async_trait]
pub trait TestDatabase: Send + Sync {
fn connection_url(&self) -> String;
fn database_type(&self) -> &'static str;
async fn wait_ready(&self) -> Result<(), Box<dyn std::error::Error>>;
}
pub struct PostgresContainer {
#[allow(dead_code)]
container: ContainerAsync<GenericImage>,
host: String,
port: u16,
database: String,
username: String,
password: String,
}
pub async fn start_postgres() -> (PostgresContainer, String) {
let container = PostgresContainer::new().await;
let url = container.connection_url();
(container, url)
}
pub async fn start_postgres_with_credentials(
username: &str,
password: &str,
database: &str,
) -> (PostgresContainer, String) {
let container = PostgresContainer::with_credentials(username, password, database).await;
let url = container.connection_url();
(container, url)
}
impl PostgresContainer {
pub async fn new() -> Self {
Self::with_credentials("postgres", "postgres", "test").await
}
pub async fn with_credentials(username: &str, password: &str, database: &str) -> Self {
use testcontainers::core::IntoContainerPort;
let image = GenericImage::new("postgres", "17-alpine")
.with_exposed_port(5432.tcp())
.with_wait_for(WaitFor::message_on_stderr(
"database system is ready to accept connections",
))
.with_env_var("POSTGRES_USER", username)
.with_env_var("POSTGRES_PASSWORD", password)
.with_env_var("POSTGRES_DB", database);
let container = AsyncRunner::start(image)
.await
.expect("Failed to start PostgreSQL container");
let port = container
.get_host_port_ipv4(5432)
.await
.expect("Failed to get PostgreSQL port");
Self {
container,
host: "localhost".to_string(),
port,
database: database.to_string(),
username: username.to_string(),
password: password.to_string(),
}
}
pub fn port(&self) -> u16 {
self.port
}
}
#[async_trait::async_trait]
impl TestDatabase for PostgresContainer {
fn connection_url(&self) -> String {
format!(
"postgres://{}:{}@{}:{}/{}?sslmode=disable",
self.username, self.password, self.host, self.port, self.database
)
}
fn database_type(&self) -> &'static str {
"postgres"
}
async fn wait_ready(&self) -> Result<(), Box<dyn std::error::Error>> {
let url = self.connection_url();
let pool = sqlx::postgres::PgPool::connect(&url).await?;
sqlx::query("SELECT 1").execute(&pool).await?;
pool.close().await;
Ok(())
}
}
pub struct MySqlContainer {
#[allow(dead_code)]
container: ContainerAsync<Mysql>,
host: String,
port: u16,
database: String,
username: String,
password: String,
}
impl MySqlContainer {
pub async fn new() -> Self {
Self::with_credentials("root", "test", "test").await
}
pub async fn with_credentials(username: &str, password: &str, database: &str) -> Self {
let image = Mysql::default()
.with_tag("8.0")
.with_env_var("MYSQL_ROOT_PASSWORD", password)
.with_env_var("MYSQL_DATABASE", database);
let container = AsyncRunner::start(image)
.await
.expect("Failed to start MySQL container");
let port = container
.get_host_port_ipv4(3306)
.await
.expect("MySQL container port should be available after startup");
Self {
container,
host: "localhost".to_string(),
port,
database: database.to_string(),
username: username.to_string(),
password: password.to_string(),
}
}
pub fn port(&self) -> u16 {
self.port
}
}
#[async_trait::async_trait]
impl TestDatabase for MySqlContainer {
fn connection_url(&self) -> String {
format!(
"mysql://{}:{}@{}:{}/{}",
self.username, self.password, self.host, self.port, self.database
)
}
fn database_type(&self) -> &'static str {
"mysql"
}
async fn wait_ready(&self) -> Result<(), Box<dyn std::error::Error>> {
let url = self.connection_url();
let pool = sqlx::mysql::MySqlPool::connect(&url).await?;
sqlx::query("SELECT 1").execute(&pool).await?;
pool.close().await;
Ok(())
}
}
pub struct RedisContainer {
#[allow(dead_code)]
container: ContainerAsync<GenericImage>,
host: String,
port: u16,
}
pub async fn start_redis() -> (RedisContainer, String) {
let container = RedisContainer::new().await;
let url = container.connection_url();
(container, url)
}
impl RedisContainer {
pub async fn new() -> Self {
use testcontainers::core::IntoContainerPort;
let image = GenericImage::new("redis", "7-alpine")
.with_exposed_port(6379.tcp())
.with_wait_for(WaitFor::message_on_stdout("Ready to accept connections"));
let container = AsyncRunner::start(image)
.await
.expect("Failed to start Redis container");
let port = container
.get_host_port_ipv4(6379)
.await
.expect("Redis container port should be available after startup");
let redis_container = Self {
container,
host: "localhost".to_string(),
port,
};
redis_container
.wait_until_ready()
.await
.expect("Redis container failed to become ready");
redis_container
}
async fn wait_until_ready(&self) -> Result<(), Box<dyn std::error::Error>> {
use redis::AsyncCommands;
use tokio::time::{Duration, sleep};
let connection_url = self.connection_url();
for attempt in 1..=30 {
match redis::Client::open(connection_url.as_str()) {
Ok(client) => {
match client.get_multiplexed_async_connection().await {
Ok(mut conn) => {
match conn.ping::<String>().await {
Ok(_) => {
return Ok(());
}
Err(e) if attempt < 30 => {
eprintln!("Redis PING attempt {}/30 failed: {}", attempt, e);
sleep(Duration::from_millis(500)).await;
}
Err(e) => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!(
"Redis failed to become ready after 30 attempts: {}",
e
),
)));
}
}
}
Err(e) if attempt < 30 => {
eprintln!("Redis connection attempt {}/30 failed: {}", attempt, e);
sleep(Duration::from_millis(500)).await;
}
Err(e) => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!("Redis failed to become ready after 30 attempts: {}", e),
)));
}
}
}
Err(e) if attempt < 30 => {
eprintln!("Redis client creation attempt {}/30 failed: {}", attempt, e);
sleep(Duration::from_millis(500)).await;
}
Err(e) => {
return Err(Box::new(e));
}
}
}
Ok(())
}
pub fn connection_url(&self) -> String {
format!("redis://{}:{}", self.host, self.port)
}
pub fn port(&self) -> u16 {
self.port
}
}
pub struct MemcachedContainer {
#[allow(dead_code)]
container: ContainerAsync<GenericImage>,
host: String,
port: u16,
}
pub async fn start_memcached() -> (MemcachedContainer, String) {
let container = MemcachedContainer::new().await;
let url = container.connection_url();
(container, url)
}
impl MemcachedContainer {
pub async fn new() -> Self {
use testcontainers::core::IntoContainerPort;
let image = GenericImage::new("memcached", "1.6-alpine").with_exposed_port(11211.tcp());
let container = AsyncRunner::start(image)
.await
.expect("Failed to start Memcached container");
let port = container
.get_host_port_ipv4(11211)
.await
.expect("Memcached container port should be available after startup");
let instance = Self {
container,
host: "localhost".to_string(),
port,
};
instance
.wait_ready()
.await
.expect("Failed to wait for Memcached to be ready");
instance
}
pub fn connection_url(&self) -> String {
format!("{}:{}", self.host, self.port)
}
pub fn port(&self) -> u16 {
self.port
}
pub async fn wait_ready(&self) -> Result<(), Box<dyn std::error::Error>> {
use memcache_async::ascii::Protocol;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::sleep;
use tokio_util::compat::TokioAsyncReadCompatExt;
let max_attempts = 10;
let mut attempt = 0;
let base_delay = Duration::from_millis(100);
let test_key = TEST_WAIT_READY_KEY.to_string();
let test_value = b"ready";
while attempt < max_attempts {
match TcpStream::connect(format!("{}:{}", self.host, self.port)).await {
Ok(stream) => {
let compat_stream = stream.compat();
let mut proto = Protocol::new(compat_stream);
if let Ok(()) = proto.set(&test_key, test_value, 10).await {
if let Ok(retrieved) = proto.get(&test_key).await
&& retrieved == test_value
{
let _ = proto.delete(&test_key).await;
sleep(Duration::from_millis(50)).await;
return Ok(());
}
}
attempt += 1;
let delay = base_delay * 2_u32.pow(attempt.min(5));
sleep(delay).await;
}
Err(e) => {
attempt += 1;
if attempt >= max_attempts {
return Err(format!(
"Memcached not ready after {} attempts: {}",
max_attempts, e
)
.into());
}
let delay = base_delay * 2_u32.pow(attempt.min(5));
sleep(delay).await;
}
}
}
Err("Memcached not ready: set/get test failed after maximum retry attempts".into())
}
}
pub async fn with_memcached<F, Fut>(f: F) -> Result<(), Box<dyn std::error::Error>>
where
F: FnOnce(MemcachedContainer) -> Fut,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error>>>,
{
let container = MemcachedContainer::new().await;
f(container).await
}
pub async fn with_postgres<F, Fut>(f: F) -> Result<(), Box<dyn std::error::Error>>
where
F: FnOnce(PostgresContainer) -> Fut,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error>>>,
{
let container = PostgresContainer::new().await;
container.wait_ready().await?;
f(container).await
}
pub async fn with_mysql<F, Fut>(f: F) -> Result<(), Box<dyn std::error::Error>>
where
F: FnOnce(MySqlContainer) -> Fut,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error>>>,
{
let container = MySqlContainer::new().await;
container.wait_ready().await?;
f(container).await
}
pub async fn with_redis<F, Fut>(f: F) -> Result<(), Box<dyn std::error::Error>>
where
F: FnOnce(RedisContainer) -> Fut,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error>>>,
{
let container = RedisContainer::new().await;
f(container).await
}
pub struct RabbitMQContainer {
#[allow(dead_code)]
container: ContainerAsync<GenericImage>,
host: String,
port: u16,
management_port: u16,
username: String,
password: String,
}
pub async fn start_rabbitmq() -> (RabbitMQContainer, String, String) {
let container = RabbitMQContainer::new().await;
let url = container.connection_url();
let mgmt_url = container.management_url();
(container, url, mgmt_url)
}
impl RabbitMQContainer {
pub async fn new() -> Self {
Self::with_credentials("guest", "guest").await
}
pub async fn with_credentials(username: &str, password: &str) -> Self {
use testcontainers::core::IntoContainerPort;
let image = GenericImage::new("rabbitmq", "3-management-alpine")
.with_exposed_port(5672.tcp()) .with_exposed_port(15672.tcp()) .with_wait_for(WaitFor::message_on_stdout("Server startup complete"))
.with_env_var("RABBITMQ_DEFAULT_USER", username)
.with_env_var("RABBITMQ_DEFAULT_PASS", password);
let container = AsyncRunner::start(image)
.await
.expect("Failed to start RabbitMQ container");
let port = container
.get_host_port_ipv4(5672)
.await
.expect("RabbitMQ AMQP container port should be available after startup");
let management_port = container
.get_host_port_ipv4(15672)
.await
.expect("RabbitMQ management container port should be available after startup");
let rabbitmq_container = Self {
container,
host: "localhost".to_string(),
port,
management_port,
username: username.to_string(),
password: password.to_string(),
};
rabbitmq_container
.wait_until_ready()
.await
.expect("RabbitMQ container failed to become ready");
rabbitmq_container
}
async fn wait_until_ready(&self) -> Result<(), Box<dyn std::error::Error>> {
use tokio::time::{Duration, sleep};
let connection_url = self.connection_url();
for attempt in 1..=30 {
match lapin::Connection::connect(
&connection_url,
lapin::ConnectionProperties::default(),
)
.await
{
Ok(conn) => {
let _ = conn.close(200, "OK").await;
return Ok(());
}
Err(e) if attempt < 30 => {
eprintln!("RabbitMQ connection attempt {}/30 failed: {}", attempt, e);
sleep(Duration::from_millis(500)).await;
}
Err(e) => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!("RabbitMQ failed to become ready after 30 attempts: {}", e),
)));
}
}
}
Ok(())
}
pub fn connection_url(&self) -> String {
format!(
"amqp://{}:{}@{}:{}",
self.username, self.password, self.host, self.port
)
}
pub fn management_url(&self) -> String {
format!("http://{}:{}", self.host, self.management_port)
}
pub fn port(&self) -> u16 {
self.port
}
pub fn management_port(&self) -> u16 {
self.management_port
}
}
pub async fn with_rabbitmq<F, Fut>(f: F) -> Result<(), Box<dyn std::error::Error>>
where
F: FnOnce(RabbitMQContainer) -> Fut,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error>>>,
{
let container = RabbitMQContainer::new().await;
f(container).await
}
pub struct MailpitContainer {
#[allow(dead_code)]
container: ContainerAsync<GenericImage>,
host: String,
smtp_port: u16,
http_port: u16,
}
pub async fn start_mailpit() -> (MailpitContainer, String, String) {
let container = MailpitContainer::new().await;
let smtp_url = container.smtp_url();
let http_url = container.http_url();
(container, smtp_url, http_url)
}
impl MailpitContainer {
pub async fn new() -> Self {
use testcontainers::core::IntoContainerPort;
let image = GenericImage::new("axllent/mailpit", "latest")
.with_exposed_port(1025.tcp()) .with_exposed_port(8025.tcp()) .with_cmd(["--smtp-auth-accept-any", "--smtp-auth-allow-insecure"]);
let container = AsyncRunner::start(image)
.await
.expect("Failed to start Mailpit container");
let smtp_port = container
.get_host_port_ipv4(1025)
.await
.expect("Mailpit SMTP container port should be available after startup");
let http_port = container
.get_host_port_ipv4(8025)
.await
.expect("Mailpit HTTP container port should be available after startup");
let mailpit_container = Self {
container,
host: "localhost".to_string(),
smtp_port,
http_port,
};
mailpit_container
.wait_until_ready()
.await
.expect("Mailpit container failed to become ready");
mailpit_container
}
async fn wait_until_ready(&self) -> Result<(), Box<dyn std::error::Error>> {
use tokio::time::{Duration, sleep};
let http_url = format!("{}/api/v1/messages", self.http_url());
for attempt in 1..=30 {
match reqwest::get(&http_url).await {
Ok(response) if response.status().is_success() => {
return Ok(());
}
Ok(response) if attempt < 30 => {
eprintln!(
"Mailpit HTTP check attempt {}/30 failed with status: {}",
attempt,
response.status()
);
sleep(Duration::from_millis(500)).await;
}
Ok(response) => {
return Err(format!(
"Mailpit HTTP API not ready after 30 attempts, last status: {}",
response.status()
)
.into());
}
Err(e) if attempt < 30 => {
eprintln!("Mailpit HTTP check attempt {}/30 failed: {}", attempt, e);
sleep(Duration::from_millis(500)).await;
}
Err(e) => {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::ConnectionRefused,
format!("Mailpit failed to become ready after 30 attempts: {}", e),
)));
}
}
}
Ok(())
}
pub fn smtp_url(&self) -> String {
format!("smtp://{}:{}", self.host, self.smtp_port)
}
pub fn http_url(&self) -> String {
format!("http://{}:{}", self.host, self.http_port)
}
pub fn smtp_port(&self) -> u16 {
self.smtp_port
}
pub fn http_port(&self) -> u16 {
self.http_port
}
}
pub async fn with_mailpit<F, Fut>(f: F) -> Result<(), Box<dyn std::error::Error>>
where
F: FnOnce(MailpitContainer) -> Fut,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error>>>,
{
let container = MailpitContainer::new().await;
f(container).await
}
pub mod sqlite {
pub fn memory_url() -> &'static str {
"sqlite::memory:"
}
pub fn temp_file_url(name: &str) -> String {
assert!(!name.is_empty(), "temp_file_url: name must not be empty");
assert!(
!name.contains(".."),
"temp_file_url: name must not contain '..' (path traversal)"
);
assert!(
!name.contains('/') && !name.contains('\\'),
"temp_file_url: name must not contain path separators ('/' or '\\')"
);
assert!(
!name.contains('\0'),
"temp_file_url: name must not contain null bytes"
);
assert!(
name.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.'),
"temp_file_url: name must contain only alphanumeric characters, hyphens, underscores, or dots"
);
format!("sqlite:/tmp/{}.db", name)
}
}
#[cfg(test)]
mod tests {
use super::*;
use rstest::rstest;
#[rstest]
#[tokio::test]
async fn test_rabbitmq_connection_url_uses_default_credentials() {
let container = RabbitMQContainer::new().await;
let url = container.connection_url();
assert!(url.starts_with("amqp://guest:guest@"));
}
#[rstest]
#[tokio::test]
async fn test_rabbitmq_connection_url_uses_custom_credentials() {
let container = RabbitMQContainer::with_credentials("admin", "secret_pass").await;
let url = container.connection_url();
assert!(url.starts_with("amqp://admin:secret_pass@"));
}
#[tokio::test]
async fn test_postgres_container() {
with_postgres(|db| async move {
let url = db.connection_url();
assert!(url.starts_with("postgres://"));
assert_eq!(db.database_type(), "postgres");
Ok(())
})
.await
.unwrap();
}
#[tokio::test]
async fn test_mysql_container() {
with_mysql(|db| async move {
let url = db.connection_url();
assert!(url.starts_with("mysql://"));
assert_eq!(db.database_type(), "mysql");
Ok(())
})
.await
.unwrap();
}
#[tokio::test]
async fn test_redis_container() {
with_redis(|redis| async move {
let url = redis.connection_url();
assert!(url.starts_with("redis://"));
Ok(())
})
.await
.unwrap();
}
#[rstest]
fn test_temp_file_url_accepts_valid_name() {
let name = "test_db";
let url = sqlite::temp_file_url(name);
assert_eq!(url, "sqlite:/tmp/test_db.db");
}
#[rstest]
fn test_temp_file_url_accepts_name_with_dots_and_hyphens() {
let name = "my-test.db-v2";
let url = sqlite::temp_file_url(name);
assert_eq!(url, "sqlite:/tmp/my-test.db-v2.db");
}
#[rstest]
#[should_panic(expected = "must not be empty")]
fn test_temp_file_url_rejects_empty_name() {
let name = "";
sqlite::temp_file_url(name);
}
#[rstest]
#[should_panic(expected = "path traversal")]
fn test_temp_file_url_rejects_path_traversal() {
let name = "../../etc/passwd";
sqlite::temp_file_url(name);
}
#[rstest]
#[should_panic(expected = "path separators")]
fn test_temp_file_url_rejects_forward_slash() {
let name = "foo/bar";
sqlite::temp_file_url(name);
}
#[rstest]
#[should_panic(expected = "path separators")]
fn test_temp_file_url_rejects_backslash() {
let name = "foo\\bar";
sqlite::temp_file_url(name);
}
#[rstest]
#[should_panic(expected = "null bytes")]
fn test_temp_file_url_rejects_null_bytes() {
let name = "test\0db";
sqlite::temp_file_url(name);
}
#[rstest]
#[should_panic(expected = "alphanumeric")]
fn test_temp_file_url_rejects_special_characters() {
let name = "test db!@#";
sqlite::temp_file_url(name);
}
}
pub struct KafkaContainer {
#[allow(dead_code)] container: ContainerAsync<GenericImage>,
host: String,
port: u16,
}
pub async fn start_kafka() -> (KafkaContainer, Vec<String>) {
let container = KafkaContainer::new().await;
let brokers = container.brokers();
(container, brokers)
}
impl KafkaContainer {
pub async fn new() -> Self {
use testcontainers::core::IntoContainerPort;
let host_port = reserve_free_port();
let image = GenericImage::new("apache/kafka", "3.8.1")
.with_exposed_port(9092.tcp())
.with_wait_for(WaitFor::message_on_stdout("Kafka Server started"))
.with_env_var("KAFKA_NODE_ID", "0")
.with_env_var("KAFKA_PROCESS_ROLES", "controller,broker")
.with_env_var(
"KAFKA_LISTENERS",
"PLAINTEXT://:9092,CONTROLLER://:9093",
)
.with_env_var(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
)
.with_env_var("KAFKA_CONTROLLER_QUORUM_VOTERS", "0@localhost:9093")
.with_env_var("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
.with_env_var("KAFKA_INTER_BROKER_LISTENER_NAME", "PLAINTEXT")
.with_env_var(
"KAFKA_ADVERTISED_LISTENERS",
format!("PLAINTEXT://localhost:{host_port}"),
)
.with_mapped_port(host_port, 9092.tcp());
let container = AsyncRunner::start(image)
.await
.expect("Failed to start Kafka container");
let host = container
.get_host()
.await
.expect("Failed to get Kafka host")
.to_string();
Self {
container,
host,
port: host_port,
}
}
pub fn brokers(&self) -> Vec<String> {
vec![format!("{}:{}", self.host, self.port)]
}
}
fn reserve_free_port() -> u16 {
use std::net::TcpListener;
TcpListener::bind("127.0.0.1:0")
.expect("Failed to bind ephemeral port for Kafka")
.local_addr()
.expect("Failed to read local_addr")
.port()
}