// code0_flow/flow_queue/connection.rs

1use lapin::Connection;
2
3pub async fn build_connection(rabbitmq_url: &str) -> Connection {
4    match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await {
5        Ok(env) => env,
6        Err(error) => panic!(
7            "Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
8            error
9        ),
10    }
11}
12
#[cfg(test)]
mod tests {
    use crate::flow_queue::connection::build_connection;
    use testcontainers::core::{IntoContainerPort, WaitFor};
    use testcontainers::runners::AsyncRunner;
    use testcontainers::GenericImage;

    /// Generates a `#[tokio::test]` named `$test_name` that starts a RabbitMQ
    /// container, waits for its startup message on stdout, and then awaits
    /// `$consumer`, passing it the AMQP URL (as a `String`) of the running
    /// broker.
    ///
    /// `$consumer` must be an expression callable with a single `String`
    /// argument that returns a future.
    macro_rules! rabbitmq_container_test {
        ($test_name:ident, $consumer:expr) => {
            #[tokio::test]
            async fn $test_name() {
                // Port the container exposes for AMQP traffic.
                let port: u16 = 5672;
                let image_name = "rabbitmq";
                let wait_message = "Server startup complete";

                // The handle stays bound until the end of the test so the
                // container is still alive while `$consumer` runs.
                let container = GenericImage::new(image_name, "latest")
                    .with_exposed_port(port.tcp())
                    .with_wait_for(WaitFor::message_on_stdout(wait_message))
                    .start()
                    .await
                    .unwrap();

                // The mapped port is looked up for the host's IPv4 interface,
                // so address the broker through the IPv4 loopback explicitly:
                // `localhost` may resolve to `::1`, where the mapped port can
                // differ.
                let host_port = container.get_host_port_ipv4(port).await.unwrap();
                let url = format!("amqp://guest:guest@127.0.0.1:{}", host_port);

                $consumer(url).await;
            }
        };
    }

    // Smoke test: the container starts and a broker URL can be built.
    rabbitmq_container_test!(
        test_rabbitmq_startup,
        (|url: String| async move {
            println!("RabbitMQ started with the url: {}", url);
        })
    );

    // `build_connection` panics on failure, so completing without a panic
    // means the connection to the containerised broker was established.
    rabbitmq_container_test!(
        test_rabbitmq_connection,
        (|url: String| async move {
            build_connection(&url).await;
        })
    );
}