code0_flow/flow_queue/
connection.rs1use lapin::Connection;
2
3pub async fn build_connection(rabbitmq_url: &str) -> Connection {
4 match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await {
5 Ok(env) => env,
6 Err(error) => panic!(
7 "Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
8 error
9 ),
10 }
11}
12
13#[cfg(test)]
14mod tests {
15 use crate::flow_queue::connection::build_connection;
16 use testcontainers::core::{IntoContainerPort, WaitFor};
17 use testcontainers::runners::AsyncRunner;
18 use testcontainers::GenericImage;
19
20 macro_rules! rabbitmq_container_test {
21 ($test_name:ident, $consumer:expr) => {
22 #[tokio::test]
23 async fn $test_name() {
24 let port: u16 = 5672;
25 let image_name = "rabbitmq";
26 let wait_message = "Server startup complete";
27
28 let container = GenericImage::new(image_name, "latest")
29 .with_exposed_port(port.tcp())
30 .with_wait_for(WaitFor::message_on_stdout(wait_message))
31 .start()
32 .await
33 .unwrap();
34
35 let host_port = container.get_host_port_ipv4(port).await.unwrap();
36 let url = format!("amqp://guest:guest@localhost:{}", host_port);
37
38 $consumer(url).await;
39 }
40 };
41 }
42
43 rabbitmq_container_test!(
44 test_rabbitmq_startup,
45 (|url: String| async move {
46 println!("RabbitMQ started with the url: {}", url);
47 })
48 );
49
50 rabbitmq_container_test!(
51 test_rabbitmq_connection,
52 (|url: String| async move {
53 build_connection(&*url).await;
54 })
55 );
56}