testcontainers_modules/kafka/
apache.rs

1use std::{borrow::Cow, collections::HashMap};
2
3use testcontainers::{
4    core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
5    Image,
6};
7
/// Docker image for the GraalVM (native) flavour of Apache Kafka (the default).
const KAFKA_NATIVE_IMAGE_NAME: &str = "apache/kafka-native";
/// Docker image for the JVM flavour of Apache Kafka (see [`Kafka::with_jvm_image`]).
const KAFKA_IMAGE_NAME: &str = "apache/kafka";
/// Image tag (Kafka version) used for both flavours.
const TAG: &str = "3.8.0";

/// Port that [`Apache Kafka`] uses internally.
/// Can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`]
///
/// [`Apache Kafka`]: https://kafka.apache.org/
pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9092);

/// Path inside the container where the deferred start script is written by
/// `exec_after_start`; the container's command loops until this file appears.
const START_SCRIPT: &str = "/opt/kafka/testcontainers_start.sh";

/// The default Replication Factor to use.
pub const DEFAULT_INTERNAL_TOPIC_RF: usize = 1;

/// The default cluster id to use.
pub const DEFAULT_CLUSTER_ID: &str = "5L6g3nShT-eMCtK--X86sw";

/// The default broker id.
pub const DEFAULT_BROKER_ID: usize = 1;
28
/// Module to work with [`Apache Kafka`] broker
///
/// Starts an instance of the Apache Kafka broker with Apache Kafka Raft
/// (KRaft), the consensus protocol, enabled.
///
/// This module is based on the official [`Apache Kafka docker image`](https://hub.docker.com/r/apache/kafka)
///
/// Module comes in two flavours:
///
/// - [`Apache Kafka GraalVM docker image`](https://hub.docker.com/r/apache/kafka-native), which is default as it provides faster startup and lower memory consumption.
/// - [`Apache Kafka JVM docker image`](https://hub.docker.com/r/apache/kafka)
///
/// # Example
/// ```
/// use testcontainers_modules::{kafka::apache, testcontainers::runners::SyncRunner};
/// let kafka_node = apache::Kafka::default().start().unwrap();
/// // connect to kafka server to send/receive messages
/// ```
///
/// [`Apache Kafka`]: https://kafka.apache.org/
#[derive(Debug, Clone)]
pub struct Kafka {
    /// Environment variables handed to the container (KRaft listener/broker config).
    env_vars: HashMap<String, String>,
    /// Image to run: `apache/kafka-native` (default) or `apache/kafka`.
    image_name: String,
}
54
55impl Default for Kafka {
56    fn default() -> Self {
57        let mut env_vars = HashMap::new();
58        env_vars.insert(
59            "KAFKA_LISTENERS".to_owned(),
60            format!(
61                "PLAINTEXT://0.0.0.0:{},BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094",
62                KAFKA_PORT.as_u16()
63            ),
64        );
65        env_vars.insert("CLUSTER_ID".to_owned(), DEFAULT_CLUSTER_ID.to_owned());
66        env_vars.insert(
67            "KAFKA_PROCESS_ROLES".to_owned(),
68            "broker,controller".to_owned(),
69        );
70
71        env_vars.insert(
72            "KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(),
73            "CONTROLLER".to_owned(),
74        );
75        env_vars.insert(
76            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
77            "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT".to_owned(),
78        );
79        env_vars.insert(
80            "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
81            "BROKER".to_owned(),
82        );
83        env_vars.insert(
84            "KAFKA_ADVERTISED_LISTENERS".to_owned(),
85            format!(
86                "PLAINTEXT://localhost:{},BROKER://localhost:9092",
87                KAFKA_PORT.as_u16()
88            ),
89        );
90        env_vars.insert("KAFKA_BROKER_ID".to_owned(), DEFAULT_BROKER_ID.to_string());
91        env_vars.insert(
92            "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
93            DEFAULT_INTERNAL_TOPIC_RF.to_string(),
94        );
95        env_vars.insert(
96            "KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(),
97            format!("{DEFAULT_BROKER_ID}@localhost:9094").to_owned(),
98        );
99
100        Self {
101            env_vars,
102            image_name: KAFKA_NATIVE_IMAGE_NAME.to_string(),
103        }
104    }
105}
106
107impl Kafka {
108    /// Switches default image to `apache/kafka` instead of `apache/kafka-native`
109    pub fn with_jvm_image(mut self) -> Self {
110        self.image_name = KAFKA_IMAGE_NAME.to_string();
111
112        self
113    }
114}
115
impl Image for Kafka {
    fn name(&self) -> &str {
        self.image_name.as_str()
    }

    fn tag(&self) -> &str {
        TAG
    }

    fn ready_conditions(&self) -> Vec<WaitFor> {
        // the container is started with a custom command which waits
        // for a start script to be created in `exec_after_start`,
        // thus the container only needs to progress to `exec_after_start`.
        //
        // the actual wait for readiness is done in `exec_after_start`
        vec![]
    }

    fn entrypoint(&self) -> Option<&str> {
        // override the image entrypoint so the `cmd` below runs under bash
        Some("bash")
    }

    fn env_vars(
        &self,
    ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
        &self.env_vars
    }

    fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
        // the command runs a while (wait) loop until the start script is created.
        // the start script configures kafka with the mapped host port, which is
        // not yet available at container creation time.
        //
        // start script creation is performed in `exec_after_start`
        vec![
            "-c".to_string(),
            format!("while [ ! -f {START_SCRIPT}  ]; do sleep 0.1; done; chmod 755 {START_SCRIPT} && {START_SCRIPT}"),
        ]
        .into_iter()
    }

    fn expose_ports(&self) -> &[ContainerPort] {
        &[KAFKA_PORT]
    }

    fn exec_after_start(
        &self,
        cs: ContainerState,
    ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
        let mut commands = vec![];
        // with the container running, the host port which will accept kafka
        // connections is known, so we can proceed with creating a script which
        // starts the kafka broker with the correct advertised-listener port.
        //
        // note: the script will actually be executed by the wait process started in `cmd`
        let cmd = vec![
            "sh".to_string(),
            "-c".to_string(),
            format!(
                "echo '#!/usr/bin/env bash\nexport KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:{},BROKER://localhost:9093\n/etc/kafka/docker/run \n' > {}",
                cs.host_port_ipv4(KAFKA_PORT)?,
                START_SCRIPT
            ),
        ];
        let ready_conditions = vec![WaitFor::message_on_stdout("Kafka Server started")];
        // as the start script is executed by the `cmd` process, we need to look
        // for the message in the container log, not in this exec's output.
        commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));

        Ok(commands)
    }
}
188
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        producer::{FutureProducer, FutureRecord},
        ClientConfig, Message,
    };
    use testcontainers::runners::AsyncRunner;

    use crate::kafka::apache;

    /// Number of messages produced (and expected back) by the round-trip check.
    const MESSAGE_COUNT: usize = 5;

    /// Produces `MESSAGE_COUNT` messages to the broker at `bootstrap_servers`
    /// and asserts a consumer receives them back, in order, with the expected
    /// payloads. Shared by the GraalVM and JVM image tests (previously the two
    /// tests duplicated this entire body).
    async fn assert_produce_and_consume(bootstrap_servers: &str) {
        let producer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .create::<FutureProducer>()
            .expect("Failed to create Kafka FutureProducer");

        let consumer = ClientConfig::new()
            .set("group.id", "testcontainer-rs")
            .set("bootstrap.servers", bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer>()
            .expect("Failed to create Kafka StreamConsumer");

        let topic = "test-topic";

        let expected: Vec<String> = (0..MESSAGE_COUNT).map(|i| format!("Message {i}")).collect();

        for (i, message) in expected.iter().enumerate() {
            producer
                .send(
                    FutureRecord::to(topic)
                        .payload(message)
                        .key(&format!("Key {i}")),
                    Duration::from_secs(0),
                )
                .await
                .unwrap();
        }

        consumer
            .subscribe(&[topic])
            .expect("Failed to subscribe to a topic");

        let mut message_stream = consumer.stream();
        for produced in expected {
            // bound the wait so a broken broker fails the test instead of hanging
            let borrowed_message =
                tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                    .await
                    .unwrap()
                    .unwrap();

            assert_eq!(
                produced,
                borrowed_message
                    .unwrap()
                    .payload_view::<str>()
                    .unwrap()
                    .unwrap()
            );
        }
    }

    #[tokio::test]
    async fn produce_and_consume_messages_graalvm(
    ) -> Result<(), Box<dyn std::error::Error + 'static>> {
        let _ = pretty_env_logger::try_init();
        // default flavour: GraalVM-native image
        let kafka_node = apache::Kafka::default().start().await?;

        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
        );
        assert_produce_and_consume(&bootstrap_servers).await;

        Ok(())
    }

    #[tokio::test]
    async fn produce_and_consume_messages_jvm() -> Result<(), Box<dyn std::error::Error + 'static>>
    {
        let _ = pretty_env_logger::try_init();
        // JVM flavour via `with_jvm_image`
        let kafka_node = apache::Kafka::default().with_jvm_image().start().await?;

        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
        );
        assert_produce_and_consume(&bootstrap_servers).await;

        Ok(())
    }
}