testcontainers_modules/rabbitmq/
mod.rs1use testcontainers::{core::WaitFor, Image};
2
/// Image name returned by `RabbitMq`'s `Image::name` implementation.
const NAME: &str = "rabbitmq";
/// Image tag returned by `RabbitMq`'s `Image::tag` implementation.
const TAG: &str = "3.8.22-management";
5
/// Handle type for the RabbitMQ image described by this module.
///
/// The type carries no configuration. Obtain a value with
/// `RabbitMq::default()`; the only field is private, so code outside this
/// module cannot build one with a struct literal.
#[derive(Clone, Debug, Default)]
pub struct RabbitMq {
    /// Zero-sized private marker that keeps construction inside this module
    /// (or behind `Default`).
    _priv: (),
}
39
40impl Image for RabbitMq {
41 fn name(&self) -> &str {
42 NAME
43 }
44
45 fn tag(&self) -> &str {
46 TAG
47 }
48
49 fn ready_conditions(&self) -> Vec<WaitFor> {
50 vec![WaitFor::message_on_stdout(
51 "Server startup complete; 4 plugins started.",
52 )]
53 }
54}
55
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::StreamExt;
    use lapin::{
        options::{
            BasicConsumeOptions, BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions,
            QueueDeclareOptions,
        },
        types::FieldTable,
        BasicProperties, Connection, ConnectionProperties, ExchangeKind,
    };

    use crate::{rabbitmq, testcontainers::runners::AsyncRunner};

    /// End-to-end round trip against a started `RabbitMq` container.
    ///
    /// Starts the image, connects over AMQP, declares a topic exchange and a
    /// queue bound to it, publishes one message and checks that the consumer
    /// receives it with the expected payload, exchange and routing key.
    ///
    /// # Errors
    ///
    /// Any container, connection, channel or decoding failure is propagated
    /// with `?`, so the test fails with the underlying error rather than
    /// panicking part-way through.
    #[tokio::test]
    async fn rabbitmq_produce_and_consume_messages(
    ) -> Result<(), Box<dyn std::error::Error + 'static>> {
        // Logger initialisation fails if a logger is already installed;
        // that is not an error for this test, so the result is discarded.
        let _ = pretty_env_logger::try_init();
        let rabbit_node = rabbitmq::RabbitMq::default().start().await?;

        // Build the URL from the host and the host port the runner mapped to
        // container port 5672.
        let amqp_url = format!(
            "amqp://{}:{}",
            rabbit_node.get_host().await?,
            rabbit_node.get_host_port_ipv4(5672).await?
        );

        let connection =
            Connection::connect(amqp_url.as_str(), ConnectionProperties::default()).await?;
        let channel = connection.create_channel().await?;

        assert!(channel.status().connected());

        channel
            .exchange_declare(
                "test_exchange",
                ExchangeKind::Topic,
                ExchangeDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;

        let queue = channel
            .queue_declare(
                "test_queue",
                QueueDeclareOptions::default(),
                FieldTable::default(),
            )
            .await?;

        // "#" is the topic wildcard matching every routing key, so the queue
        // receives whatever is published to the exchange.
        channel
            .queue_bind(
                queue.name().as_str(),
                "test_exchange",
                "#",
                QueueBindOptions::default(),
                FieldTable::default(),
            )
            .await?;

        // Register the consumer before publishing so the delivery is pushed
        // to it as soon as the message is routed.
        let mut consumer = channel
            .basic_consume(
                queue.name().as_str(),
                "test_consumer_tag",
                BasicConsumeOptions::default(),
                FieldTable::default(),
            )
            .await?;

        channel
            .basic_publish(
                "test_exchange",
                "routing-key",
                BasicPublishOptions::default(),
                b"Test Payload",
                BasicProperties::default(),
            )
            .await?;

        // Bound the wait so a lost message fails the test instead of hanging.
        let next_item = tokio::time::timeout(Duration::from_secs(10), consumer.next())
            .await?
            .ok_or("consumer stream ended before yielding a delivery")?;
        let delivery = next_item?;

        // Borrow the payload for the comparison instead of cloning it.
        assert_eq!(std::str::from_utf8(&delivery.data)?, "Test Payload");
        assert_eq!(delivery.exchange.as_str(), "test_exchange");
        assert_eq!(delivery.routing_key.as_str(), "routing-key");
        Ok(())
    }
}