// NOTE(review): presumably needed because every test binary that includes this helper
// module uses only some of its items, which would otherwise trigger dead-code
// warnings — confirm.
#![allow(dead_code)]
use std::time::Duration;
use testcontainers::{
ContainerAsync, GenericImage, ImageExt,
core::{ContainerPort, WaitFor},
runners::AsyncRunner,
};
use tokio::net::TcpStream;
use tokio::sync::OnceCell;
use tokio::time::{Instant, sleep};
/// TCP port exposed inside the Artemis containers.
///
/// The helpers in this file expose this port on the container and look up the host
/// port mapped to it; the broker URL they return uses that host port, not this value.
pub const ARTEMIS_PORT: u16 = 61616;
async fn wait_for_broker_port(port: u16, timeout: Duration) -> Result<(), String> {
let deadline = Instant::now() + timeout;
loop {
match TcpStream::connect(("127.0.0.1", port)).await {
Ok(stream) => {
drop(stream);
return Ok(());
}
Err(err) => {
if Instant::now() >= deadline {
return Err(format!(
"Artemis broker port {port} did not become reachable within {:?}: {err}",
timeout
));
}
sleep(Duration::from_millis(250)).await;
}
}
}
}
/// Container handle and `tcp://` broker URL of the broker started by [`shared_artemis`];
/// empty until that function has completed once.
// NOTE(review): values stored in a `static` are never dropped, so the container handle's
// destructor does not run at process exit — confirm container cleanup is handled some
// other way.
static ARTEMIS: OnceCell<(ContainerAsync<GenericImage>, String)> = OnceCell::const_new();
/// Container handle and `tcp://` broker URL of the broker started by
/// [`shared_artemis_auth`]; empty until that function has completed once.
static ARTEMIS_AUTH: OnceCell<(ContainerAsync<GenericImage>, String)> = OnceCell::const_new();
/// Held by both initializers for the whole container startup, so the two containers
/// are never started concurrently.
static ARTEMIS_STARTUP_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
/// Returns the shared Artemis broker that is started with `ANONYMOUS_LOGIN=true`,
/// launching its container on first use.
///
/// The returned tuple holds the running container handle and the broker URL in the
/// form `tcp://127.0.0.1:<host port>`. Later calls return the same value without
/// starting another container.
///
/// # Panics
///
/// Panics if the container fails to start, if the host port mapped to
/// [`ARTEMIS_PORT`] cannot be resolved, or if that port does not accept a TCP
/// connection within 30 seconds.
pub async fn shared_artemis() -> &'static (ContainerAsync<GenericImage>, String) {
    /// Starts the container and waits until its broker port is reachable.
    async fn launch() -> (ContainerAsync<GenericImage>, String) {
        // Kept alive until this function returns so the other broker's startup
        // cannot overlap with this one.
        let _serialized = ARTEMIS_STARTUP_LOCK.lock().await;

        // Image-level settings first: exposed port and the log line to wait for.
        let base = GenericImage::new("apache/activemq-artemis", "2.36.0-alpine")
            .with_exposed_port(ContainerPort::Tcp(ARTEMIS_PORT))
            .with_wait_for(WaitFor::message_on_stdout("AMQ221020"));

        // Then the per-container settings: credentials, anonymous login, time limit.
        let request = base
            .with_env_var("ARTEMIS_USER", "artemis")
            .with_env_var("ARTEMIS_PASSWORD", "artemis")
            .with_env_var("ANONYMOUS_LOGIN", "true")
            .with_startup_timeout(Duration::from_secs(120));

        let broker = request
            .start()
            .await
            .expect("Artemis container failed to start");

        let port = broker
            .get_host_port_ipv4(ARTEMIS_PORT)
            .await
            .expect("Artemis port not available");

        // The log message alone is not trusted; also probe the mapped port.
        let reachable = wait_for_broker_port(port, Duration::from_secs(30)).await;
        reachable.expect("Artemis broker did not become reachable");

        let broker_url = format!("tcp://127.0.0.1:{port}");
        eprintln!("Artemis ready at: {broker_url}");
        (broker, broker_url)
    }

    ARTEMIS.get_or_init(launch).await
}
/// Returns the shared Artemis broker that is started without `ANONYMOUS_LOGIN`,
/// launching its container on first use.
///
/// The container is configured with user `artemis` / password `artemis` only
/// (presumably so that clients must authenticate — confirm against the image's
/// documentation). The returned tuple holds the running container handle and the
/// broker URL in the form `tcp://127.0.0.1:<host port>`. Later calls return the same
/// value without starting another container.
///
/// # Panics
///
/// Panics if the container fails to start, if the host port mapped to
/// [`ARTEMIS_PORT`] cannot be resolved, or if that port does not accept a TCP
/// connection within 30 seconds.
pub async fn shared_artemis_auth() -> &'static (ContainerAsync<GenericImage>, String) {
    /// Starts the container and waits until its broker port is reachable.
    async fn launch_auth() -> (ContainerAsync<GenericImage>, String) {
        // Kept alive until this function returns so the other broker's startup
        // cannot overlap with this one.
        let _serialized = ARTEMIS_STARTUP_LOCK.lock().await;

        // Image-level settings first: exposed port and the log line to wait for.
        let base = GenericImage::new("apache/activemq-artemis", "2.36.0-alpine")
            .with_exposed_port(ContainerPort::Tcp(ARTEMIS_PORT))
            .with_wait_for(WaitFor::message_on_stdout("AMQ221020"));

        // Then the per-container settings: credentials and the startup time limit.
        let request = base
            .with_env_var("ARTEMIS_USER", "artemis")
            .with_env_var("ARTEMIS_PASSWORD", "artemis")
            .with_startup_timeout(Duration::from_secs(120));

        let broker = request
            .start()
            .await
            .expect("Artemis auth container failed to start");

        let port = broker
            .get_host_port_ipv4(ARTEMIS_PORT)
            .await
            .expect("Artemis auth port not available");

        // The log message alone is not trusted; also probe the mapped port.
        let reachable = wait_for_broker_port(port, Duration::from_secs(30)).await;
        reachable.expect("Artemis auth broker did not become reachable");

        let broker_url = format!("tcp://127.0.0.1:{port}");
        eprintln!("Artemis (auth) ready at: {broker_url}");
        (broker, broker_url)
    }

    ARTEMIS_AUTH.get_or_init(launch_auth).await
}