testcontainers_modules/rabbitmq/
mod.rs

1use testcontainers::{core::WaitFor, Image};
2
3const NAME: &str = "rabbitmq";
4const TAG: &str = "3.8.22-management";
5
6/// Module to work with [`RabbitMQ`] inside of tests.
7///
8/// Starts an instance of RabbitMQ with the [`management-plugin`] started by default,
9/// so you are able to use the [`RabbitMQ Management HTTP API`] to manage the configuration if the started [`RabbitMQ`] instance at test runtime.
10///
11/// This module is based on the official [`RabbitMQ docker image`].
12///
13/// # Example
14/// ```
15/// use testcontainers_modules::{rabbitmq, testcontainers::runners::SyncRunner};
16///
17/// let rabbitmq_instance = rabbitmq::RabbitMq::default().start().unwrap();
18///
19/// let amqp_url = format!(
20///     "amqp://{}:{}",
21///     rabbitmq_instance.get_host().unwrap(),
22///     rabbitmq_instance.get_host_port_ipv4(5672).unwrap()
23/// );
24///
25/// // do something with the started rabbitmq instance..
26/// ```
27///
28/// [`RabbitMQ`]: https://www.rabbitmq.com/
29/// [`management-plugin`]: https://www.rabbitmq.com/management.html
30/// [`RabbitMQ Management HTTP API`]: https://www.rabbitmq.com/management.html#http-api
31/// [`RabbitMQ docker image`]: https://hub.docker.com/_/rabbitmq
32#[derive(Debug, Default, Clone)]
33pub struct RabbitMq {
34    /// (remove if there is another variable)
35    /// Field is included to prevent this struct to be a unit struct.
36    /// This allows extending functionality (and thus further variables) without breaking changes
37    _priv: (),
38}
39
40impl Image for RabbitMq {
41    fn name(&self) -> &str {
42        NAME
43    }
44
45    fn tag(&self) -> &str {
46        TAG
47    }
48
49    fn ready_conditions(&self) -> Vec<WaitFor> {
50        vec![WaitFor::message_on_stdout(
51            "Server startup complete; 4 plugins started.",
52        )]
53    }
54}
55
56#[cfg(test)]
57mod tests {
58    use std::time::Duration;
59
60    use futures::StreamExt;
61    use lapin::{
62        options::{
63            BasicConsumeOptions, BasicPublishOptions, ExchangeDeclareOptions, QueueBindOptions,
64            QueueDeclareOptions,
65        },
66        types::FieldTable,
67        BasicProperties, Connection, ConnectionProperties, ExchangeKind,
68    };
69
70    use crate::{rabbitmq, testcontainers::runners::AsyncRunner};
71
72    #[tokio::test]
73    async fn rabbitmq_produce_and_consume_messages(
74    ) -> Result<(), Box<dyn std::error::Error + 'static>> {
75        let _ = pretty_env_logger::try_init();
76        let rabbit_node = rabbitmq::RabbitMq::default().start().await?;
77
78        let amqp_url = format!(
79            "amqp://{}:{}",
80            rabbit_node.get_host().await?,
81            rabbit_node.get_host_port_ipv4(5672).await?
82        );
83
84        let options = ConnectionProperties::default();
85        let connection = Connection::connect(amqp_url.as_str(), options)
86            .await
87            .unwrap();
88
89        let channel = connection.create_channel().await.unwrap();
90
91        assert!(channel.status().connected());
92
93        channel
94            .exchange_declare(
95                "test_exchange",
96                ExchangeKind::Topic,
97                ExchangeDeclareOptions::default(),
98                FieldTable::default(),
99            )
100            .await
101            .unwrap();
102
103        let queue = channel
104            .queue_declare(
105                "test_queue",
106                QueueDeclareOptions::default(),
107                FieldTable::default(),
108            )
109            .await
110            .unwrap();
111
112        channel
113            .queue_bind(
114                queue.name().as_str(),
115                "test_exchange",
116                "#",
117                QueueBindOptions::default(),
118                FieldTable::default(),
119            )
120            .await
121            .unwrap();
122
123        let mut consumer = channel
124            .basic_consume(
125                queue.name().as_str(),
126                "test_consumer_tag",
127                BasicConsumeOptions::default(),
128                FieldTable::default(),
129            )
130            .await
131            .unwrap();
132
133        channel
134            .basic_publish(
135                "test_exchange",
136                "routing-key",
137                BasicPublishOptions::default(),
138                b"Test Payload",
139                BasicProperties::default(),
140            )
141            .await
142            .unwrap();
143
144        let consumed = tokio::time::timeout(Duration::from_secs(10), consumer.next())
145            .await
146            .unwrap()
147            .unwrap();
148
149        let delivery = consumed.expect("Failed to consume delivery!");
150        assert_eq!(
151            String::from_utf8(delivery.data.clone()).unwrap(),
152            "Test Payload"
153        );
154        assert_eq!(delivery.exchange.as_str(), "test_exchange");
155        assert_eq!(delivery.routing_key.as_str(), "routing-key");
156        Ok(())
157    }
158}