code0_flow/flow_queue/
connection.rs1use lapin::Connection;
2
3pub async fn build_connection(rabbitmq_url: &str) -> Connection {
4 match Connection::connect(rabbitmq_url, lapin::ConnectionProperties::default()).await {
5 Ok(env) => env,
6 Err(error) => {
7 log::error!(
8 "Cannot connect to FlowQueue (RabbitMQ) instance! Reason: {:?}",
9 error
10 );
11 panic!("Cannot connect to FlowQueue (RabbitMQ) instance!");
12 }
13 }
14}
15
#[cfg(test)]
mod tests {
    use crate::flow_queue::connection::build_connection;
    use testcontainers::GenericImage;
    use testcontainers::core::{IntoContainerPort, WaitFor};
    use testcontainers::runners::AsyncRunner;

    /// Generates a `#[tokio::test]` named `$test_name` that starts a `rabbitmq`
    /// container, builds the AMQP URL for its mapped host port and then awaits
    /// `$consumer(url)`.
    ///
    /// `$consumer` must be callable with a `String` and return a future.
    macro_rules! rabbitmq_container_test {
        ($test_name:ident, $consumer:expr) => {
            #[tokio::test]
            async fn $test_name() {
                let amqp_port: u16 = 5672;

                // Describe the image: expose the AMQP port and treat the
                // startup log line on stdout as the readiness signal.
                let image = GenericImage::new("rabbitmq", "latest")
                    .with_exposed_port(amqp_port.tcp())
                    .with_wait_for(WaitFor::message_on_stdout("Server startup complete"));

                // The handle stays bound until the end of the test body, i.e.
                // for the whole duration of the consumer call below.
                let rabbitmq = image.start().await.unwrap();

                // Ask which host port the container's AMQP port was mapped to.
                let mapped_port = rabbitmq.get_host_port_ipv4(amqp_port).await.unwrap();
                let amqp_url = format!("amqp://guest:guest@localhost:{}", mapped_port);

                $consumer(amqp_url).await;
            }
        };
    }

    // Smoke test: the container starts and a URL can be derived for it.
    rabbitmq_container_test!(
        test_rabbitmq_startup,
        (|url: String| async move {
            println!("RabbitMQ started with the url: {}", url);
        })
    );

    // `build_connection` panics on failure, so returning normally means the
    // connection attempt succeeded.
    rabbitmq_container_test!(
        test_rabbitmq_connection,
        (|url: String| async move {
            build_connection(url.as_str()).await;
        })
    );
}