testcontainers_modules/kafka/
confluent.rs

use std::{borrow::Cow, collections::HashMap};

use testcontainers::{
    core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
    Image,
};

const NAME: &str = "confluentinc/cp-kafka";
const TAG: &str = "6.1.1";
/// Port that the [`Kafka`] part of the container listens on internally.
/// It can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`].
///
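/// For example, to bind it to a fixed host port instead of a random one (a minimal sketch; the
/// host port `9094` below is an arbitrary choice for illustration):
/// ```no_run
/// use testcontainers_modules::{
///     kafka::{Kafka, KAFKA_PORT},
///     testcontainers::{core::ImageExt, runners::SyncRunner},
/// };
///
/// // Remap the container-internal Kafka port (9093) to host port 9094.
/// let kafka = Kafka::default()
///     .with_mapped_port(9094, KAFKA_PORT)
///     .start()
///     .unwrap();
/// ```
///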
/// [`Kafka`]: https://kafka.apache.org/
pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9093);
/// Port that the [`Zookeeper`] part of the container listens on internally.
/// It can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`].
///
/// [`Zookeeper`]: https://zookeeper.apache.org/
pub const ZOOKEEPER_PORT: ContainerPort = ContainerPort::Tcp(2181);

/// The default replication factor to use for internal topics.
pub const DEFAULT_INTERNAL_TOPIC_RF: usize = 1;

/// The default cluster ID to use.
pub const DEFAULT_CLUSTER_ID: &str = "5L6g3nShT-eMCtK--X86sw";

/// The default broker ID.
pub const DEFAULT_BROKER_ID: usize = 1;

/// Module to work with [`Apache Kafka`] inside of tests.
///
/// Starts an instance of Kafka based on the official [`Confluent Kafka docker image`].
///
/// This module provides a local Kafka instance with an embedded Zookeeper for testing purposes.
/// Inside the container, Kafka listens on port 9093 ([`KAFKA_PORT`]) and the embedded Zookeeper
/// on port 2181 ([`ZOOKEEPER_PORT`]); only the Kafka port is exposed by default.
///
/// The Kafka instance is pre-configured with:
/// - Single broker setup (broker ID: 1)
/// - Embedded Zookeeper
/// - PLAINTEXT security protocol
/// - Topic auto-creation, with the internal offsets topic at replication factor 1
///
/// # Example
/// ```
/// use testcontainers_modules::{
///     kafka::{Kafka, KAFKA_PORT},
///     testcontainers::runners::SyncRunner,
/// };
///
/// let kafka_instance = Kafka::default().start().unwrap();
/// let host = kafka_instance.get_host().unwrap();
/// let port = kafka_instance.get_host_port_ipv4(KAFKA_PORT).unwrap();
///
/// // Use the Kafka bootstrap server at {host}:{port}
/// ```
///
/// [`Apache Kafka`]: https://kafka.apache.org/
/// [`Confluent Kafka docker image`]: https://hub.docker.com/r/confluentinc/cp-kafka
#[derive(Debug, Clone)]
pub struct Kafka {
    env_vars: HashMap<String, String>,
}

impl Default for Kafka {
    fn default() -> Self {
        let mut env_vars = HashMap::new();

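        // Zookeeper is started inside the same container (see `cmd`), so the broker reaches it
        // on localhost.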
        env_vars.insert(
            "KAFKA_ZOOKEEPER_CONNECT".to_owned(),
            format!("localhost:{}", ZOOKEEPER_PORT.as_u16()),
        );
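        // Two listeners: `PLAINTEXT` on 9093 (KAFKA_PORT) for clients connecting from outside the
        // container, and `BROKER` on 9092 for inter-broker traffic inside it.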
        env_vars.insert(
            "KAFKA_LISTENERS".to_owned(),
            format!(
                "PLAINTEXT://0.0.0.0:{port},BROKER://0.0.0.0:9092",
                port = KAFKA_PORT.as_u16(),
            ),
        );
        env_vars.insert(
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
            "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_owned(),
        );
        env_vars.insert(
            "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
            "BROKER".to_owned(),
        );
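        // These advertised listeners are placeholders; `exec_after_start` rewrites
        // `advertised.listeners` with the actual mapped host port once the container is running.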
        env_vars.insert(
            "KAFKA_ADVERTISED_LISTENERS".to_owned(),
            format!(
                "PLAINTEXT://localhost:{port},BROKER://localhost:9092",
                port = KAFKA_PORT.as_u16(),
            ),
        );
        env_vars.insert("KAFKA_BROKER_ID".to_owned(), DEFAULT_BROKER_ID.to_string());
        env_vars.insert(
            "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
            DEFAULT_INTERNAL_TOPIC_RF.to_string(),
        );
        env_vars.insert("CLUSTER_ID".to_owned(), DEFAULT_CLUSTER_ID.to_owned());

        Self { env_vars }
    }
}

impl Image for Kafka {
    fn name(&self) -> &str {
        NAME
    }

    fn tag(&self) -> &str {
        TAG
    }

    fn ready_conditions(&self) -> Vec<WaitFor> {
        vec![WaitFor::message_on_stdout("Creating new log file")]
    }

    fn env_vars(
        &self,
    ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
        &self.env_vars
    }

    fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
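        // Write a minimal Zookeeper config, start Zookeeper in the background, then run the stock
        // Confluent configure/launch scripts to bring up the broker.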
        vec![
            "/bin/bash".to_owned(),
            "-c".to_owned(),
            format!(
                r#"
echo 'clientPort={ZOOKEEPER_PORT}' > zookeeper.properties;
echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties;
echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties;
zookeeper-server-start zookeeper.properties &
. /etc/confluent/docker/bash-config &&
/etc/confluent/docker/configure &&
/etc/confluent/docker/launch"#,
                ZOOKEEPER_PORT = ZOOKEEPER_PORT.as_u16()
            ),
        ]
    }

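    // Only the Kafka client port is exposed by default; `ZOOKEEPER_PORT` stays container-internal
    // unless it is mapped explicitly.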
    fn expose_ports(&self) -> &[ContainerPort] {
        &[KAFKA_PORT]
    }

    fn exec_after_start(
        &self,
        cs: ContainerState,
    ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
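        // Now that the container is running, the mapped host port for `KAFKA_PORT` is known, so
        // rewrite `advertised.listeners` via `kafka-configs` to the address external clients use.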
        let mut commands = vec![];
        let cmd = vec![
            "kafka-configs".to_string(),
            "--alter".to_string(),
            "--bootstrap-server".to_string(),
            "0.0.0.0:9092".to_string(),
            "--entity-type".to_string(),
            "brokers".to_string(),
            "--entity-name".to_string(),
            DEFAULT_BROKER_ID.to_string(),
            "--add-config".to_string(),
            format!(
                "advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]",
                cs.host_port_ipv4(KAFKA_PORT)?
            ),
        ];
        let ready_conditions = vec![WaitFor::message_on_stdout(
            "Checking need to trigger auto leader balancing",
        )];
        commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));
        Ok(commands)
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        producer::{FutureProducer, FutureRecord},
        ClientConfig, Message,
    };
    use testcontainers::runners::AsyncRunner;

    use crate::kafka;

    #[tokio::test]
    async fn produce_and_consume_messages() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let _ = pretty_env_logger::try_init();
        let kafka_node = kafka::Kafka::default().start().await?;

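        // `exec_after_start` advertised `PLAINTEXT://127.0.0.1:<mapped port>`, so connect to that
        // exact address.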
        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT).await?
        );

        let producer = ClientConfig::new()
            .set("bootstrap.servers", &bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .create::<FutureProducer>()
            .expect("Failed to create Kafka FutureProducer");

        let consumer = ClientConfig::new()
            .set("group.id", "testcontainer-rs")
            .set("bootstrap.servers", &bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer>()
            .expect("Failed to create Kafka StreamConsumer");

        let topic = "test-topic";

        let number_of_messages_to_produce = 5_usize;
        let expected: Vec<String> = (0..number_of_messages_to_produce)
            .map(|i| format!("Message {i}"))
            .collect();

        for (i, message) in expected.iter().enumerate() {
            producer
                .send(
                    FutureRecord::to(topic)
                        .payload(message)
                        .key(&format!("Key {i}")),
                    Duration::from_secs(0),
                )
                .await
                .unwrap();
        }

        consumer
            .subscribe(&[topic])
            .expect("Failed to subscribe to a topic");

        let mut message_stream = consumer.stream();
        for produced in expected {
            let borrowed_message =
                tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                    .await
                    .unwrap()
                    .unwrap();

            assert_eq!(
                produced,
                borrowed_message
                    .unwrap()
                    .payload_view::<str>()
                    .unwrap()
                    .unwrap()
            );
        }

        Ok(())
254}