testcontainers_modules/kafka/
apache.rs

1use std::{borrow::Cow, collections::HashMap};
2
3use testcontainers::{
4    core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
5    Image,
6};
7
// GraalVM (native) flavour of the official Apache Kafka image — the default,
// chosen for faster startup and lower memory consumption.
const KAFKA_NATIVE_IMAGE_NAME: &str = "apache/kafka-native";
// JVM flavour of the official Apache Kafka image (see `Kafka::with_jvm_image`).
const KAFKA_IMAGE_NAME: &str = "apache/kafka";
// Image tag used for both flavours.
const TAG: &str = "3.8.0";

/// Port that [`Apache Kafka`] uses internally.
/// Can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`]
///
/// [`Apache Kafka`]: https://kafka.apache.org/
pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9092);

// Path of the start script written by `exec_after_start`; the container's
// `cmd` busy-waits for this file to appear before launching the broker.
const START_SCRIPT: &str = "/opt/kafka/testcontainers_start.sh";
// Replication factor for Kafka's internal topics (1 — single-node cluster).
const DEFAULT_INTERNAL_TOPIC_RF: usize = 1;
// Pre-generated KRaft cluster id.
const DEFAULT_CLUSTER_ID: &str = "5L6g3nShT-eMCtK--X86sw";
// Node id used for both the broker and the controller quorum voter.
const DEFAULT_BROKER_ID: usize = 1;
22
23/// Module to work with [`Apache Kafka`] broker
24///
25/// Starts an instance of Apache Kafka broker, with Apache Kafka Raft (KRaft) is the consensus protocol
26/// enabled.
27///
28/// This module is based on the official [`Apache Kafka docker image`](https://hub.docker.com/r/apache/kafka)
29///
30/// Module comes in two flavours:
31///
32/// - [`Apache Kafka GraalVM docker image`](https://hub.docker.com/r/apache/kafka-native), which is default as it provides faster startup and lower memory consumption.
33/// - [`Apache Kafka JVM docker image`](https://hub.docker.com/r/apache/kafka)
34///
35/// # Example
36/// ```
37/// use testcontainers_modules::{kafka::apache, testcontainers::runners::SyncRunner};
38/// let kafka_node = apache::Kafka::default().start().unwrap();
39/// // connect to kafka server to send/receive messages
40/// ```
41///
42/// [`Apache Kafka`]: https://kafka.apache.org/
#[derive(Debug, Clone)]
pub struct Kafka {
    // Environment variables handed to the container; pre-populated by
    // `Default` with a single-node KRaft broker+controller configuration.
    env_vars: HashMap<String, String>,
    // Image name: `apache/kafka-native` by default, `apache/kafka` after
    // `with_jvm_image`.
    image_name: String,
}
48
49impl Default for Kafka {
50    fn default() -> Self {
51        let mut env_vars = HashMap::new();
52        env_vars.insert(
53            "KAFKA_LISTENERS".to_owned(),
54            format!(
55                "PLAINTEXT://0.0.0.0:{},BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094",
56                KAFKA_PORT.as_u16()
57            ),
58        );
59        env_vars.insert("CLUSTER_ID".to_owned(), DEFAULT_CLUSTER_ID.to_owned());
60        env_vars.insert(
61            "KAFKA_PROCESS_ROLES".to_owned(),
62            "broker,controller".to_owned(),
63        );
64
65        env_vars.insert(
66            "KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(),
67            "CONTROLLER".to_owned(),
68        );
69        env_vars.insert(
70            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
71            "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT".to_owned(),
72        );
73        env_vars.insert(
74            "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
75            "BROKER".to_owned(),
76        );
77        env_vars.insert(
78            "KAFKA_ADVERTISED_LISTENERS".to_owned(),
79            format!(
80                "PLAINTEXT://localhost:{},BROKER://localhost:9092",
81                KAFKA_PORT.as_u16()
82            ),
83        );
84        env_vars.insert("KAFKA_BROKER_ID".to_owned(), DEFAULT_BROKER_ID.to_string());
85        env_vars.insert(
86            "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
87            DEFAULT_INTERNAL_TOPIC_RF.to_string(),
88        );
89        env_vars.insert(
90            "KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(),
91            format!("{DEFAULT_BROKER_ID}@localhost:9094").to_owned(),
92        );
93
94        Self {
95            env_vars,
96            image_name: KAFKA_NATIVE_IMAGE_NAME.to_string(),
97        }
98    }
99}
100
101impl Kafka {
102    /// Switches default image to `apache/kafka` instead of `apache/kafka-native`
103    pub fn with_jvm_image(mut self) -> Self {
104        self.image_name = KAFKA_IMAGE_NAME.to_string();
105
106        self
107    }
108}
109
110impl Image for Kafka {
111    fn name(&self) -> &str {
112        self.image_name.as_str()
113    }
114
115    fn tag(&self) -> &str {
116        TAG
117    }
118
119    fn ready_conditions(&self) -> Vec<WaitFor> {
120        // container will be started with custom command which will wait
121        // for a start script to be created in `exec_after_start`,
122        // thus container needs to progress to `exec_after_start`
123        //
124        // actual wait for `ready_conditions` is be done in `exec_after_start`
125        vec![]
126    }
127
128    fn entrypoint(&self) -> Option<&str> {
129        Some("bash")
130    }
131
132    fn env_vars(
133        &self,
134    ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
135        &self.env_vars
136    }
137
138    fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
139        // command starts a while (wait) loop until start script is created.
140        // start script configures kafka with exposed port as is not
141        // available at container creation,
142        //
143        // start script creation is performed in `exec_after_start`
144        vec![
145            "-c".to_string(),
146            format!("while [ ! -f {START_SCRIPT}  ]; do sleep 0.1; done; chmod 755 {START_SCRIPT} && {START_SCRIPT}"),
147        ]
148        .into_iter()
149    }
150
151    fn expose_ports(&self) -> &[ContainerPort] {
152        &[KAFKA_PORT]
153    }
154
155    fn exec_after_start(
156        &self,
157        cs: ContainerState,
158    ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
159        let mut commands = vec![];
160        // with container running, port which will accept kafka connections is known
161        // so we can proceed with creating a script which starts kafka broker
162        // with correct port configuration.
163        //
164        // note: scrip will actually be executed by wait process started in `cmd`
165        let cmd = vec![
166            "sh".to_string(),
167            "-c".to_string(),
168            format!(
169                "echo '#!/usr/bin/env bash\nexport KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:{},BROKER://localhost:9093\n/etc/kafka/docker/run \n' > {}",
170                cs.host_port_ipv4(KAFKA_PORT)?,
171                START_SCRIPT
172            ),
173        ];
174        let ready_conditions = vec![WaitFor::message_on_stdout("Kafka Server started")];
175        // as start script will be executed by `cmd` process we need to look
176        // for the message in container log, not script output.
177        commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));
178
179        Ok(commands)
180    }
181}
182
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        producer::{FutureProducer, FutureRecord},
        ClientConfig, Message,
    };
    use testcontainers::runners::AsyncRunner;

    use crate::kafka::apache;

    /// Shared round-trip check: produces five messages to `test-topic` on the
    /// broker at `bootstrap_servers` and asserts they are consumed back in
    /// order. Panics (failing the calling test) on any produce/consume error
    /// or a 10s consume timeout.
    async fn assert_produce_and_consume(bootstrap_servers: &str) {
        let producer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .create::<FutureProducer>()
            .expect("Failed to create Kafka FutureProducer");

        let consumer = ClientConfig::new()
            .set("group.id", "testcontainer-rs")
            .set("bootstrap.servers", bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer>()
            .expect("Failed to create Kafka StreamConsumer");

        let topic = "test-topic";

        let number_of_messages_to_produce = 5_usize;
        let expected: Vec<String> = (0..number_of_messages_to_produce)
            .map(|i| format!("Message {i}"))
            .collect();

        for (i, message) in expected.iter().enumerate() {
            producer
                .send(
                    FutureRecord::to(topic)
                        .payload(message)
                        .key(&format!("Key {i}")),
                    Duration::from_secs(0),
                )
                .await
                .unwrap();
        }

        consumer
            .subscribe(&[topic])
            .expect("Failed to subscribe to a topic");

        let mut message_stream = consumer.stream();
        for produced in expected {
            let borrowed_message =
                tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                    .await
                    .unwrap()
                    .unwrap();

            assert_eq!(
                produced,
                borrowed_message
                    .unwrap()
                    .payload_view::<str>()
                    .unwrap()
                    .unwrap()
            );
        }
    }

    #[tokio::test]
    async fn produce_and_consume_messages_graalvm(
    ) -> Result<(), Box<dyn std::error::Error + 'static>> {
        let _ = pretty_env_logger::try_init();
        let kafka_node = apache::Kafka::default().start().await?;

        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
        );
        assert_produce_and_consume(&bootstrap_servers).await;

        Ok(())
    }

    #[tokio::test]
    async fn produce_and_consume_messages_jvm() -> Result<(), Box<dyn std::error::Error + 'static>>
    {
        let _ = pretty_env_logger::try_init();
        let kafka_node = apache::Kafka::default().with_jvm_image().start().await?;

        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
        );
        assert_produce_and_consume(&bootstrap_servers).await;

        Ok(())
    }
}