testcontainers_modules/kafka/
confluent.rs

1use std::{borrow::Cow, collections::HashMap};
2
3use testcontainers::{
4    core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
5    Image,
6};
7
8const NAME: &str = "confluentinc/cp-kafka";
9const TAG: &str = "6.1.1";
10/// Port that the [`Kafka`] part of the container has internally
11/// Can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`]
12///
13/// [`Kafka`]: https://kafka.apache.org/
14pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9093);
15/// Port that the [`Zookeeper`] part of the container has internally
16/// Can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`]
17///
18/// [`Zookeeper`]: https://zookeeper.apache.org/
19pub const ZOOKEEPER_PORT: ContainerPort = ContainerPort::Tcp(2181);
20
21/// Module to work with [`Apache Kafka`] inside of tests.
22///
23/// Starts an instance of Kafka based on the official [`Confluent Kafka docker image`].
24///
25/// This module provides a local Kafka instance with an embedded Zookeeper for testing purposes.
26/// The container exposes port 9093 for Kafka ([`KAFKA_PORT`]) and port 2181 for Zookeeper
27/// ([`ZOOKEEPER_PORT`]) by default.
28///
29/// The Kafka instance is pre-configured with:
30/// - Single broker setup (broker ID: 1)
31/// - Embedded Zookeeper
32/// - PLAINTEXT security protocol
33/// - Auto-created topics with replication factor 1
34///
35/// # Example
36/// ```
37/// use testcontainers_modules::{kafka::Kafka, testcontainers::runners::SyncRunner};
38///
39/// let kafka_instance = Kafka::default().start().unwrap();
40/// let host = kafka_instance.get_host().unwrap();
41/// let port = kafka_instance.get_host_port_ipv4(9093).unwrap();
42///
43/// // Use the Kafka bootstrap server at {host}:{port}
44/// ```
45///
46/// [`Apache Kafka`]: https://kafka.apache.org/
47/// [`Confluent Kafka docker image`]: https://hub.docker.com/r/confluentinc/cp-kafka
48#[derive(Debug, Clone)]
49pub struct Kafka {
50    env_vars: HashMap<String, String>,
51}
52
53impl Default for Kafka {
54    fn default() -> Self {
55        let mut env_vars = HashMap::new();
56
57        env_vars.insert(
58            "KAFKA_ZOOKEEPER_CONNECT".to_owned(),
59            format!("localhost:{}", ZOOKEEPER_PORT.as_u16()),
60        );
61        env_vars.insert(
62            "KAFKA_LISTENERS".to_owned(),
63            format!(
64                "PLAINTEXT://0.0.0.0:{port},BROKER://0.0.0.0:9092",
65                port = KAFKA_PORT.as_u16(),
66            ),
67        );
68        env_vars.insert(
69            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
70            "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_owned(),
71        );
72        env_vars.insert(
73            "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
74            "BROKER".to_owned(),
75        );
76        env_vars.insert(
77            "KAFKA_ADVERTISED_LISTENERS".to_owned(),
78            format!(
79                "PLAINTEXT://localhost:{port},BROKER://localhost:9092",
80                port = KAFKA_PORT.as_u16(),
81            ),
82        );
83        env_vars.insert("KAFKA_BROKER_ID".to_owned(), "1".to_owned());
84        env_vars.insert(
85            "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
86            "1".to_owned(),
87        );
88
89        Self { env_vars }
90    }
91}
92
93impl Image for Kafka {
94    fn name(&self) -> &str {
95        NAME
96    }
97
98    fn tag(&self) -> &str {
99        TAG
100    }
101
102    fn ready_conditions(&self) -> Vec<WaitFor> {
103        vec![WaitFor::message_on_stdout("Creating new log file")]
104    }
105
106    fn env_vars(
107        &self,
108    ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
109        &self.env_vars
110    }
111
112    fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
113        vec![
114            "/bin/bash".to_owned(),
115            "-c".to_owned(),
116            format!(
117                r#"
118echo 'clientPort={ZOOKEEPER_PORT}' > zookeeper.properties;
119echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties;
120echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties;
121zookeeper-server-start zookeeper.properties &
122. /etc/confluent/docker/bash-config &&
123/etc/confluent/docker/configure &&
124/etc/confluent/docker/launch"#,
125                ZOOKEEPER_PORT = ZOOKEEPER_PORT.as_u16()
126            ),
127        ]
128    }
129
130    fn expose_ports(&self) -> &[ContainerPort] {
131        &[KAFKA_PORT]
132    }
133
134    fn exec_after_start(
135        &self,
136        cs: ContainerState,
137    ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
138        let mut commands = vec![];
139        let cmd = vec![
140            "kafka-configs".to_string(),
141            "--alter".to_string(),
142            "--bootstrap-server".to_string(),
143            "0.0.0.0:9092".to_string(),
144            "--entity-type".to_string(),
145            "brokers".to_string(),
146            "--entity-name".to_string(),
147            "1".to_string(),
148            "--add-config".to_string(),
149            format!(
150                "advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]",
151                cs.host_port_ipv4(KAFKA_PORT)?
152            ),
153        ];
154        let ready_conditions = vec![WaitFor::message_on_stdout(
155            "Checking need to trigger auto leader balancing",
156        )];
157        commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));
158        Ok(commands)
159    }
160}
161
162#[cfg(test)]
163mod tests {
164    use std::time::Duration;
165
166    use futures::StreamExt;
167    use rdkafka::{
168        consumer::{Consumer, StreamConsumer},
169        producer::{FutureProducer, FutureRecord},
170        ClientConfig, Message,
171    };
172    use testcontainers::runners::AsyncRunner;
173
174    use crate::kafka;
175
176    #[tokio::test]
177    async fn produce_and_consume_messages() -> Result<(), Box<dyn std::error::Error + 'static>> {
178        let _ = pretty_env_logger::try_init();
179        let kafka_node = kafka::Kafka::default().start().await?;
180
181        let bootstrap_servers = format!(
182            "127.0.0.1:{}",
183            kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT).await?
184        );
185
186        let producer = ClientConfig::new()
187            .set("bootstrap.servers", &bootstrap_servers)
188            .set("message.timeout.ms", "5000")
189            .create::<FutureProducer>()
190            .expect("Failed to create Kafka FutureProducer");
191
192        let consumer = ClientConfig::new()
193            .set("group.id", "testcontainer-rs")
194            .set("bootstrap.servers", &bootstrap_servers)
195            .set("session.timeout.ms", "6000")
196            .set("enable.auto.commit", "false")
197            .set("auto.offset.reset", "earliest")
198            .create::<StreamConsumer>()
199            .expect("Failed to create Kafka StreamConsumer");
200
201        let topic = "test-topic";
202
203        let number_of_messages_to_produce = 5_usize;
204        let expected: Vec<String> = (0..number_of_messages_to_produce)
205            .map(|i| format!("Message {i}"))
206            .collect();
207
208        for (i, message) in expected.iter().enumerate() {
209            producer
210                .send(
211                    FutureRecord::to(topic)
212                        .payload(message)
213                        .key(&format!("Key {i}")),
214                    Duration::from_secs(0),
215                )
216                .await
217                .unwrap();
218        }
219
220        consumer
221            .subscribe(&[topic])
222            .expect("Failed to subscribe to a topic");
223
224        let mut message_stream = consumer.stream();
225        for produced in expected {
226            let borrowed_message =
227                tokio::time::timeout(Duration::from_secs(10), message_stream.next())
228                    .await
229                    .unwrap()
230                    .unwrap();
231
232            assert_eq!(
233                produced,
234                borrowed_message
235                    .unwrap()
236                    .payload_view::<str>()
237                    .unwrap()
238                    .unwrap()
239            );
240        }
241
242        Ok(())
243    }
244}