// camel-test 0.10.0
//
// Testing utilities for rust-camel.
#![allow(dead_code)]

use std::time::Duration;

use testcontainers::{
    ContainerAsync, GenericImage, ImageExt,
    core::{ContainerPort, WaitFor},
    runners::AsyncRunner,
};
use tokio::net::TcpStream;
use tokio::sync::OnceCell;
use tokio::time::{Instant, sleep};

/// Container-side TCP port exposed on the Artemis image.
///
/// The container starters in this module expose this port and then resolve
/// the host port it is mapped to in order to build the broker URL.
pub const ARTEMIS_PORT: u16 = 61616;

/// Poll `127.0.0.1:port` until a TCP connection succeeds or `timeout` elapses.
///
/// A connection attempt is made immediately; after each failed attempt the
/// deadline is checked and, if it has not passed yet, the task waits 250 ms
/// before trying again. Because the deadline is only consulted after a
/// failure, at least one attempt is always made.
///
/// # Errors
///
/// Returns a human-readable message containing the port, the timeout and the
/// last connection error once the deadline has passed without a successful
/// connection.
async fn wait_for_broker_port(port: u16, timeout: Duration) -> Result<(), String> {
    let retry_interval = Duration::from_millis(250);
    let give_up_at = Instant::now() + timeout;

    loop {
        let err = match TcpStream::connect(("127.0.0.1", port)).await {
            // The connection is only a reachability probe; it is closed as
            // soon as it goes out of scope.
            Ok(_probe) => return Ok(()),
            Err(err) => err,
        };

        // Still inside the time budget: back off briefly and retry.
        if Instant::now() < give_up_at {
            sleep(retry_interval).await;
            continue;
        }

        return Err(format!(
            "Artemis broker port {port} did not become reachable within {:?}: {err}",
            timeout
        ));
    }
}

/// Shared Artemis container — started once and reused across all tests.
/// Uses `ANONYMOUS_LOGIN=true` for broad compatibility.
///
/// The tuple holds the started container handle together with its broker URL
/// (`tcp://127.0.0.1:<host port>`). It is populated lazily by
/// [`shared_artemis`].
static ARTEMIS: OnceCell<(ContainerAsync<GenericImage>, String)> = OnceCell::const_new();

/// Shared Artemis container with **mandatory auth** (no ANONYMOUS_LOGIN).
/// Used to test that the bridge authenticates correctly and that the health
/// check does not time out under real auth conditions.
///
/// The tuple holds the started container handle together with its broker URL
/// (`tcp://127.0.0.1:<host port>`). It is populated lazily by
/// [`shared_artemis_auth`].
static ARTEMIS_AUTH: OnceCell<(ContainerAsync<GenericImage>, String)> = OnceCell::const_new();

/// Artemis startup is relatively heavy and can be flaky when two containers
/// (anonymous + mandatory-auth variants) initialize at the same time.
/// Serialize startup to reduce CI nondeterminism while keeping test execution
/// itself parallel.
///
/// Both [`shared_artemis`] and [`shared_artemis_auth`] acquire this lock at
/// the start of their one-time initialisation and hold it until the broker
/// port has been probed and the broker URL has been built.
static ARTEMIS_STARTUP_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());

/// Return the shared anonymous-login Artemis container and its broker URL.
///
/// The first call starts an `apache/activemq-artemis:2.36.0-alpine` container
/// with `ANONYMOUS_LOGIN=true` (user/password `artemis`/`artemis`), waits for
/// the `AMQ221020` log line on stdout and then for the mapped host port to
/// accept TCP connections. Later calls return the same cached tuple.
///
/// # Panics
///
/// Panics if the container fails to start, if the host port mapping for
/// [`ARTEMIS_PORT`] cannot be resolved, or if the broker port does not become
/// reachable within 30 seconds.
pub async fn shared_artemis() -> &'static (ContainerAsync<GenericImage>, String) {
    let start_broker = || async {
        // Held until initialisation finishes so that the two Artemis
        // variants never boot concurrently.
        let _serialized = ARTEMIS_STARTUP_LOCK.lock().await;

        let base_image = GenericImage::new("apache/activemq-artemis", "2.36.0-alpine")
            .with_exposed_port(ContainerPort::Tcp(ARTEMIS_PORT))
            .with_wait_for(WaitFor::message_on_stdout("AMQ221020"));
        let request = base_image
            .with_env_var("ARTEMIS_USER", "artemis")
            .with_env_var("ARTEMIS_PASSWORD", "artemis")
            .with_env_var("ANONYMOUS_LOGIN", "true")
            .with_startup_timeout(Duration::from_secs(120));

        let broker = request
            .start()
            .await
            .expect("Artemis container failed to start");

        let port = broker
            .get_host_port_ipv4(ARTEMIS_PORT)
            .await
            .expect("Artemis port not available");

        // The readiness log line alone is not trusted: also confirm that the
        // mapped host port actually accepts connections.
        let reachability_budget = Duration::from_secs(30);
        wait_for_broker_port(port, reachability_budget)
            .await
            .expect("Artemis broker did not become reachable");

        let broker_url = format!("tcp://127.0.0.1:{port}");
        eprintln!("Artemis ready at: {broker_url}");
        (broker, broker_url)
    };

    ARTEMIS.get_or_init(start_broker).await
}

/// Return the shared **mandatory-auth** Artemis container and its broker URL.
///
/// ANONYMOUS_LOGIN is intentionally absent — the broker rejects
/// unauthenticated connections. Credentials: artemis / artemis.
///
/// The first call starts an `apache/activemq-artemis:2.36.0-alpine`
/// container, waits for the `AMQ221020` log line on stdout and then for the
/// mapped host port to accept TCP connections. Later calls return the same
/// cached tuple.
///
/// # Panics
///
/// Panics if the container fails to start, if the host port mapping for
/// [`ARTEMIS_PORT`] cannot be resolved, or if the broker port does not become
/// reachable within 30 seconds.
pub async fn shared_artemis_auth() -> &'static (ContainerAsync<GenericImage>, String) {
    let start_broker = || async {
        // Held until initialisation finishes so that the two Artemis
        // variants never boot concurrently.
        let _serialized = ARTEMIS_STARTUP_LOCK.lock().await;

        let base_image = GenericImage::new("apache/activemq-artemis", "2.36.0-alpine")
            .with_exposed_port(ContainerPort::Tcp(ARTEMIS_PORT))
            .with_wait_for(WaitFor::message_on_stdout("AMQ221020"));
        // ANONYMOUS_LOGIN intentionally not set → mandatory auth
        let request = base_image
            .with_env_var("ARTEMIS_USER", "artemis")
            .with_env_var("ARTEMIS_PASSWORD", "artemis")
            .with_startup_timeout(Duration::from_secs(120));

        let broker = request
            .start()
            .await
            .expect("Artemis auth container failed to start");

        let port = broker
            .get_host_port_ipv4(ARTEMIS_PORT)
            .await
            .expect("Artemis auth port not available");

        // The readiness log line alone is not trusted: also confirm that the
        // mapped host port actually accepts connections.
        let reachability_budget = Duration::from_secs(30);
        wait_for_broker_port(port, reachability_budget)
            .await
            .expect("Artemis auth broker did not become reachable");

        let broker_url = format!("tcp://127.0.0.1:{port}");
        eprintln!("Artemis (auth) ready at: {broker_url}");
        (broker, broker_url)
    };

    ARTEMIS_AUTH.get_or_init(start_broker).await
}