code0_flow/flow_queue/
connection.rs

1use lapin::Connection;
2
3pub async fn build_connection(rabbitmq_url: &str) -> Connection {
4    match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await {
5        Ok(env) => env,
6        Err(error) => {
7            log::error!(
8                "Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
9                error
10            );
11            panic!("Cannot connect to FlowQueue (RabbitMQ) instance!");
12        }
13    }
14}
15
16#[cfg(test)]
17mod tests {
18    use crate::flow_queue::connection::build_connection;
19    use testcontainers::GenericImage;
20    use testcontainers::core::{IntoContainerPort, WaitFor};
21    use testcontainers::runners::AsyncRunner;
22
23    macro_rules! rabbitmq_container_test {
24        ($test_name:ident, $consumer:expr) => {
25            #[tokio::test]
26            async fn $test_name() {
27                let port: u16 = 5672;
28                let image_name = "rabbitmq";
29                let wait_message = "Server startup complete";
30
31                let container = GenericImage::new(image_name, "latest")
32                    .with_exposed_port(port.tcp())
33                    .with_wait_for(WaitFor::message_on_stdout(wait_message))
34                    .start()
35                    .await
36                    .unwrap();
37
38                let host_port = container.get_host_port_ipv4(port).await.unwrap();
39                let url = format!("amqp://guest:guest@localhost:{}", host_port);
40
41                $consumer(url).await;
42            }
43        };
44    }
45
46    rabbitmq_container_test!(
47        test_rabbitmq_startup,
48        (|url: String| async move {
49            println!("RabbitMQ started with the url: {}", url);
50        })
51    );
52
53    rabbitmq_container_test!(
54        test_rabbitmq_connection,
55        (|url: String| async move {
56            build_connection(&*url).await;
57        })
58    );
59}