use colored::Colorize;
use env_logger::{Builder, Env};
use std::any::Any;
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::{io::Write, sync::OnceLock, thread};
use testcontainers::bollard::Docker;
use testcontainers::bollard::query_parameters::{
InspectContainerOptionsBuilder, RemoveContainerOptionsBuilder,
};
use testcontainers::bollard::secret::ContainerInspectResponse;
use thiserror::Error;
use tokio::sync::Mutex;
use tokio::task;
use tracing::info;
use crate::Server;
use crate::settings::Settings;
/// Result alias used by the test helpers; the error type defaults to [`TestError`].
pub type Result<T, E = TestError> = std::result::Result<T, E>;
/// Errors produced while preparing the test environment.
#[derive(Debug, Error)]
pub enum TestError {
    /// A mount source path could not be turned into an absolute UTF-8 path.
    /// The payload is a human-readable description of the failure.
    #[error("Failed to get absolute path for mount source : {0}")]
    AbsolutePathConversion(String),
    /// A container could not be created, started or queried.
    /// The payload is the stringified underlying error.
    #[error("Failed to create container : {0}")]
    ContainerCreation(String),
    /// Free-form error; the payload is displayed verbatim.
    #[error("{0}")]
    Custom(String),
}
/// Commands sent over the container notifier channel to the background
/// task spawned by `setup`.
enum ContainerCommands {
    /// Ask the background task to remove every registered container.
    Stop,
}
/// Both ends of a `std::sync::mpsc` channel bundled together so that the
/// pair can be stored in a single `static`.
struct Channel<T> {
    /// Sending half of the channel.
    tx: Sender<T>,
    /// Receiving half, guarded by a Tokio mutex. `Receiver` is not `Sync`,
    /// so it needs a lock around it before it can live in a shared static.
    rx: Mutex<Receiver<T>>,
}
/// Creates a fresh unbounded mpsc channel and wraps both halves in a [`Channel`].
fn channel<T>() -> Channel<T> {
    let (sender, receiver) = mpsc::channel::<T>();
    let rx = Mutex::new(receiver);
    Channel { tx: sender, rx }
}
/// Lazily-initialised registry mapping a logical container name to its Docker container id.
static CONTAINERS: OnceLock<Mutex<HashMap<String, String>>> = OnceLock::new();
/// Process-wide channel carrying [`ContainerCommands`]; created on first use.
static CONTAINER_NOTIFIER_CHANNEL: OnceLock<Channel<ContainerCommands>> = OnceLock::new();

/// Returns the shared container-command channel, creating it on the first call.
fn container_notifier_channel() -> &'static Channel<ContainerCommands> {
    CONTAINER_NOTIFIER_CHANNEL.get_or_init(channel::<ContainerCommands>)
}
/// Process-wide channel used to signal that the shutdown sequence has run; created on first use.
static SHUTDOWN_NOTIFIER_CHANNEL: OnceLock<Channel<()>> = OnceLock::new();

/// Returns the shared shutdown channel, creating it on the first call.
fn shutdown_notifier_channel() -> &'static Channel<()> {
    SHUTDOWN_NOTIFIER_CHANNEL.get_or_init(channel::<()>)
}
/// Process-wide channel used to signal that initialization has finished; created on first use.
static INITIALIZE_NOTIFIER_CHANNEL: OnceLock<Channel<()>> = OnceLock::new();

/// Returns the shared initialization channel, creating it on the first call.
fn initialize_notifier_channel() -> &'static Channel<()> {
    INITIALIZE_NOTIFIER_CHANNEL.get_or_init(channel::<()>)
}
/// Lazily-created Docker client shared by every helper in this module.
static DOCKER_CLIENT: OnceLock<Docker> = OnceLock::new();

/// Returns the shared Docker client, connecting with the default settings on first use.
///
/// # Panics
///
/// Panics if the connection to the Docker daemon cannot be established.
pub(crate) fn docker_client() -> &'static Docker {
    fn connect() -> Docker {
        let connection = Docker::connect_with_defaults();
        connection.expect("Failed to connect to Docker daemon.")
    }

    DOCKER_CLIENT.get_or_init(connect)
}
/// Looks up a registered container by its logical `name` and inspects it through Docker.
///
/// Returns `None` when no container was registered under `name` or when the
/// inspect call fails.
pub async fn get_container(name: &str) -> Option<ContainerInspectResponse> {
    let registry = CONTAINERS.get_or_init(|| Mutex::new(HashMap::new()));
    // The lock guard is a temporary of this statement, so it is released
    // before the Docker request below is awaited.
    let container_id = registry.lock().await.get(name).cloned()?;

    let options = InspectContainerOptionsBuilder::default().build();
    docker_client()
        .inspect_container(container_id.as_str(), Some(options))
        .await
        .ok()
}
/// Registers the Docker container `id` under the logical `name`.
///
/// An existing entry with the same name is replaced.
pub async fn add_container(name: &str, id: String) {
    let registry = CONTAINERS.get_or_init(|| Mutex::new(HashMap::new()));
    let mut containers = registry.lock().await;
    containers.insert(name.to_string(), id);
}
pub fn setup<F, P, Fut, PostFut>(init: F, post_init: P)
where
F: FnOnce() -> Fut + Send + 'static,
P: FnOnce() -> PostFut + Send + 'static,
Fut: Future<Output = (Vec<Box<dyn Any + Send>>, Settings)> + Send + 'static,
PostFut: Future<Output = ()> + Send + 'static,
{
configure_log();
let ascii_art = r#"
____ __ __ _ ______ __
/ _/___ / /_ ___ ___ _ ____ ___ _ / /_ (_)___ ___ /_ __/___ ___ / /_ ___
_/ / / _ \/ __// -_)/ _ `// __// _ `// __// // _ \ / _ \ / / / -_)(_-</ __/(_-<
/___//_//_/\__/ \__/ \_, //_/ \_,_/ \__//_/ \___//_//_/ /_/ \__//___/\__//___/
/___/
"#;
println!("{}", ascii_art);
info!("Initializing Test Environment ...");
thread::spawn(move || {
let body = async move {
let (mut _containers, settings) = init().await;
info!("Starting Server ...");
let result = Server::new_with_settings(settings).await;
match result {
Ok(server) => {
let result = server.intialize_database().await;
if let Ok(server) = result {
info!("{}", "Server started successfully!".bright_blue());
Server::set_global(server);
}
}
Err(e) => {
panic!("Failed to start server: {}", e);
}
}
info!("Processing Post Initialization Tasks...");
post_init().await;
initialize_notifier_channel()
.tx
.send(())
.expect("Failed to send setup signal.");
let _ = task::spawn_blocking(move || {
let rx = container_notifier_channel()
.rx
.blocking_lock()
.recv()
.expect("Failed to receive container command notification.");
match rx {
ContainerCommands::Stop => {
info!("Shutting Down Test Environment. Stopping Containers...");
CONTAINERS
.get_or_init(|| Mutex::new(HashMap::new()))
.blocking_lock()
.iter()
.for_each(|(name, container)| {
execute_blocking(async || {
info!(
"Stopping Container with name {} and id {}",
name.bright_blue(),
(&container[..13]).bright_blue()
);
let opts = RemoveContainerOptionsBuilder::default()
.force(true)
.v(true)
.build();
let res = docker_client()
.remove_container(container, Some(opts))
.await;
if res.is_err() {
info!("Failed to remove container: {:?}", res);
}
});
});
info!("All containers have been successfully stopped.");
}
}
})
.await;
shutdown_notifier_channel()
.tx
.send(())
.expect("Failed to send shutdown signal.");
info!(
"{}",
"The test environment has been shut down successfully.".bright_green()
);
};
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Cannot create Tests Tokio Runtime.")
.block_on(body);
});
initialize_notifier_channel()
.rx
.blocking_lock()
.recv()
.expect("Failed to receive setup signal.");
info!(
"{} {}",
"The test environment has been initialized successfully.",
"Starting Tests...".bright_green()
);
}
/// Requests the shutdown of the test environment and waits for its completion.
///
/// Sends [`ContainerCommands::Stop`] to the background task (a send failure is
/// ignored) and then blocks until the shutdown signal is received.
///
/// # Panics
///
/// Panics if the receiving end of the shutdown channel is already locked.
pub fn teardown() {
    let _ = container_notifier_channel()
        .tx
        .send(ContainerCommands::Stop);

    match shutdown_notifier_channel().rx.try_lock() {
        Ok(receiver) => {
            let _ = receiver.recv();
        }
        Err(_) => panic!("Failed to receive shutdown signal."),
    }
}
/// Runs the future produced by `future` to completion on a freshly created
/// Tokio runtime, blocking the current thread meanwhile.
///
/// # Panics
///
/// Panics if the runtime cannot be created.
pub(crate) fn execute_blocking<F, Fut>(future: F)
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = ()>,
{
    tokio::runtime::Runtime::new()
        .expect("Cannot create Tokio Runtime.")
        .block_on(future());
}
fn configure_log() {
let level = Env::default().default_filter_or("info,actix_web=error,actix_web_prom=error");
let _ = Builder::from_env(level)
.format(|buf, record| {
let level = match record.level() {
log::Level::Info => record.level().as_str().bright_green(),
log::Level::Debug => record.level().as_str().bright_blue(),
log::Level::Trace => record.level().as_str().bright_cyan(),
log::Level::Warn => record.level().as_str().bright_yellow(),
log::Level::Error => record.level().as_str().bright_red(),
};
let datetime = chrono::Local::now()
.format("%d-%m-%YT%H:%M:%S%.3f%:z")
.to_string()
.white();
writeln!(
buf,
"{:<24} {:<5} [{:<40}] - {}",
datetime, level, record.module_path().unwrap_or("unknown").blue(), record.args() )
})
.try_init();
}
/// Factory functions for the Docker containers used by the tests.
///
/// Every container that starts successfully is registered through
/// `super::add_container`.
pub mod containers {
    use colored::Colorize;
    use std::{fs, path::Path, time::Duration};
    use testcontainers::{
        ContainerAsync, CopyDataSource, GenericImage, ImageExt,
        core::{Mount, WaitFor, ports::IntoContainerPort, wait::HttpWaitStrategy},
        runners::AsyncRunner,
    };
    use testcontainers_modules::postgres::Postgres;
    use tracing::{debug, info};
    use crate::test::Result;
    use crate::test::TestError;

    /// Resolves `path` to a canonical absolute path and returns it as a `String`.
    ///
    /// # Errors
    ///
    /// Returns [`TestError::AbsolutePathConversion`] when the path cannot be
    /// canonicalized or is not valid UTF-8.
    fn absolute_path(path: &str) -> Result<String> {
        let canonical = Path::new(path)
            .canonicalize()
            .map_err(|e| TestError::AbsolutePathConversion(e.to_string()))?;
        canonical.to_str().map(str::to_owned).ok_or_else(|| {
            TestError::AbsolutePathConversion("Path is not valid UTF-8".to_string())
        })
    }

    /// Starts a Keycloak container in dev mode and imports the realm stored at
    /// `realm_data_path`.
    ///
    /// The container is attached to `network` and is considered ready once
    /// `/health/ready` on port 9000 answers with status 200.
    ///
    /// Returns the running container together with the base HTTP URL
    /// (`http://host:port`) of the mapped port 8080.
    ///
    /// # Errors
    ///
    /// Returns [`TestError::ContainerCreation`] when the realm file cannot be
    /// read, the container fails to start, or its host/port cannot be resolved.
    pub async fn keycloak(
        realm_data_path: &str,
        network: &str,
    ) -> Result<(ContainerAsync<GenericImage>, String)> {
        let realm =
            fs::read(realm_data_path).map_err(|e| TestError::ContainerCreation(e.to_string()))?;
        let container = GenericImage::new("quay.io/keycloak/keycloak", "26.5.2")
            .with_exposed_port(8080.tcp())
            .with_exposed_port(9000.tcp())
            .with_wait_for(WaitFor::http(
                HttpWaitStrategy::new("/health/ready")
                    .with_port(9000.into())
                    .with_expected_status_code(200u16),
            ))
            .with_cmd(vec!["start-dev", "--import-realm"])
            .with_network(network)
            .with_copy_to(
                "/opt/keycloak/data/import/realm-export.json",
                CopyDataSource::Data(realm),
            )
            .with_startup_timeout(Duration::from_secs(60))
            .with_env_var("KC_BOOTSTRAP_ADMIN_USERNAME", "admin")
            .with_env_var("KC_BOOTSTRAP_ADMIN_PASSWORD", "123456")
            .with_env_var("KC_HTTP_ENABLED", "true")
            .with_env_var("KC_HTTP_HOST", "0.0.0.0")
            .with_env_var("KC_HEALTH_ENABLED", "true")
            .with_env_var("KC_CACHE", "local")
            .with_env_var("KC_FEATURES", "scripts")
            .with_env_var("TZ", "America/Sao_Paulo")
            .start()
            .await
            .map_err(|e| {
                info!("Error starting Keycloak container: {}", e.to_string());
                TestError::ContainerCreation(e.to_string())
            })?;
        let container_ip = container
            .get_host()
            .await
            .map_err(|e| TestError::ContainerCreation(e.to_string()))?
            .to_string();
        let container_port = container
            .get_host_port_ipv4(8080)
            .await
            .map_err(|e| TestError::ContainerCreation(e.to_string()))?
            .to_string();
        super::add_container("keycloak", container.id().to_string()).await;
        let uri = format!("http://{}:{}", container_ip, container_port);
        debug!("Keycloak Connection URL: {}", uri.bright_blue());
        Ok((container, uri))
    }

    /// Starts a Postgres container and returns it with a matching connection URL.
    ///
    /// * `path` – optional host directory bind-mounted at
    ///   `/docker-entrypoint-initdb.d`.
    /// * `database` – database name, `postgres` when `None`.
    /// * `network` – Docker network name, `test_network` when `None`.
    /// * `user` / `password` – credentials, both `postgres` when `None`.
    ///
    /// The database name, user and password are applied to the container
    /// itself as well as to the returned URL, so the URL always carries
    /// credentials the server was configured with.
    ///
    /// # Errors
    ///
    /// Returns [`TestError::AbsolutePathConversion`] when `path` cannot be
    /// resolved and [`TestError::ContainerCreation`] when the container fails
    /// to start or its host/port cannot be resolved.
    pub async fn postgres(
        path: Option<String>,
        database: Option<String>,
        network: Option<String>,
        user: Option<String>,
        password: Option<String>,
    ) -> Result<(ContainerAsync<Postgres>, String)> {
        info!("Creating Default Postgres Container...");
        let mut entry_point = None;
        if let Some(path) = path {
            let mount_source_str = absolute_path(&path)?;
            entry_point = Some(Mount::bind_mount(
                mount_source_str.as_str(),
                "/docker-entrypoint-initdb.d",
            ));
        }

        // Resolve the defaults once so the container configuration and the
        // connection URL cannot drift apart.
        let database = database.unwrap_or_else(|| "postgres".to_string());
        let user = user.unwrap_or_else(|| "postgres".to_string());
        let password = password.unwrap_or_else(|| "postgres".to_string());

        let image = Postgres::default()
            .with_db_name(database.as_str())
            .with_user(user.as_str())
            .with_password(password.as_str());
        let mut builder = image
            .with_network(network.unwrap_or_else(|| "test_network".to_string()))
            .with_startup_timeout(Duration::from_secs(30));
        if let Some(entry_point) = entry_point {
            builder = builder.with_mount(entry_point);
        }

        let container = builder
            .start()
            .await
            .map_err(|e| TestError::ContainerCreation(e.to_string()))?;
        let container_ip = container
            .get_host()
            .await
            .map_err(|e| TestError::ContainerCreation(e.to_string()))?;
        let container_port = container
            .get_host_port_ipv4(5432)
            .await
            .map_err(|e| TestError::ContainerCreation(e.to_string()))?;
        let uri = format!(
            "postgres://{}:{}@{}:{}/{}",
            user, password, container_ip, container_port, database
        );
        debug!("Default Postgres Connection URL: {}", uri.bright_blue());
        super::add_container("postgres", container.id().to_string()).await;
        Ok((container, uri))
    }
}