// testcontainers_modules/kafka/apache.rs
use std::{borrow::Cow, collections::HashMap};
2
3use testcontainers::{
4 core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
5 Image,
6};
7
// Native (GraalVM) Kafka image — the default, faster to start.
const KAFKA_NATIVE_IMAGE_NAME: &str = "apache/kafka-native";
// JVM-based Kafka image, selected via `Kafka::with_jvm_image`.
const KAFKA_IMAGE_NAME: &str = "apache/kafka";
// Image tag used for both image variants.
const TAG: &str = "3.8.0";

/// Port the broker's PLAINTEXT listener binds to inside the container
/// (see `KAFKA_LISTENERS` in `Kafka::default`).
pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9092);

// Path of the start script written by `exec_after_start`; `cmd` busy-waits
// for this file to appear before executing it.
const START_SCRIPT: &str = "/opt/kafka/testcontainers_start.sh";

/// Replication factor for internal topics — 1, since this is a single-node cluster.
pub const DEFAULT_INTERNAL_TOPIC_RF: usize = 1;

/// Fixed KRaft cluster id passed via the `CLUSTER_ID` environment variable.
pub const DEFAULT_CLUSTER_ID: &str = "5L6g3nShT-eMCtK--X86sw";

/// Broker id used for `KAFKA_BROKER_ID` and the controller quorum voter entry.
pub const DEFAULT_BROKER_ID: usize = 1;
28
/// Testcontainers image for a single-node Apache Kafka broker running in
/// KRaft (ZooKeeper-less) mode. `Default` pre-populates a combined
/// broker+controller configuration; `with_jvm_image` switches from the
/// native (GraalVM) image to the JVM-based one.
#[derive(Debug, Clone)]
pub struct Kafka {
    // Environment variables handed to the container; filled in by `Default`.
    env_vars: HashMap<String, String>,
    // `apache/kafka-native` by default, `apache/kafka` after `with_jvm_image`.
    image_name: String,
}
54
55impl Default for Kafka {
56 fn default() -> Self {
57 let mut env_vars = HashMap::new();
58 env_vars.insert(
59 "KAFKA_LISTENERS".to_owned(),
60 format!(
61 "PLAINTEXT://0.0.0.0:{},BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094",
62 KAFKA_PORT.as_u16()
63 ),
64 );
65 env_vars.insert("CLUSTER_ID".to_owned(), DEFAULT_CLUSTER_ID.to_owned());
66 env_vars.insert(
67 "KAFKA_PROCESS_ROLES".to_owned(),
68 "broker,controller".to_owned(),
69 );
70
71 env_vars.insert(
72 "KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(),
73 "CONTROLLER".to_owned(),
74 );
75 env_vars.insert(
76 "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
77 "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT".to_owned(),
78 );
79 env_vars.insert(
80 "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
81 "BROKER".to_owned(),
82 );
83 env_vars.insert(
84 "KAFKA_ADVERTISED_LISTENERS".to_owned(),
85 format!(
86 "PLAINTEXT://localhost:{},BROKER://localhost:9092",
87 KAFKA_PORT.as_u16()
88 ),
89 );
90 env_vars.insert("KAFKA_BROKER_ID".to_owned(), DEFAULT_BROKER_ID.to_string());
91 env_vars.insert(
92 "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
93 DEFAULT_INTERNAL_TOPIC_RF.to_string(),
94 );
95 env_vars.insert(
96 "KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(),
97 format!("{DEFAULT_BROKER_ID}@localhost:9094").to_owned(),
98 );
99
100 Self {
101 env_vars,
102 image_name: KAFKA_NATIVE_IMAGE_NAME.to_string(),
103 }
104 }
105}
106
107impl Kafka {
108 pub fn with_jvm_image(mut self) -> Self {
110 self.image_name = KAFKA_IMAGE_NAME.to_string();
111
112 self
113 }
114}
115
116impl Image for Kafka {
117 fn name(&self) -> &str {
118 self.image_name.as_str()
119 }
120
121 fn tag(&self) -> &str {
122 TAG
123 }
124
125 fn ready_conditions(&self) -> Vec<WaitFor> {
126 vec![]
132 }
133
134 fn entrypoint(&self) -> Option<&str> {
135 Some("bash")
136 }
137
138 fn env_vars(
139 &self,
140 ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
141 &self.env_vars
142 }
143
144 fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
145 vec![
151 "-c".to_string(),
152 format!("while [ ! -f {START_SCRIPT} ]; do sleep 0.1; done; chmod 755 {START_SCRIPT} && {START_SCRIPT}"),
153 ]
154 .into_iter()
155 }
156
157 fn expose_ports(&self) -> &[ContainerPort] {
158 &[KAFKA_PORT]
159 }
160
161 fn exec_after_start(
162 &self,
163 cs: ContainerState,
164 ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
165 let mut commands = vec![];
166 let cmd = vec![
172 "sh".to_string(),
173 "-c".to_string(),
174 format!(
175 "echo '#!/usr/bin/env bash\nexport KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:{},BROKER://localhost:9093\n/etc/kafka/docker/run \n' > {}",
176 cs.host_port_ipv4(KAFKA_PORT)?,
177 START_SCRIPT
178 ),
179 ];
180 let ready_conditions = vec![WaitFor::message_on_stdout("Kafka Server started")];
181 commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));
184
185 Ok(commands)
186 }
187}
188
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        producer::{FutureProducer, FutureRecord},
        ClientConfig, Message,
    };
    use testcontainers::runners::AsyncRunner;

    use crate::kafka::apache;

    /// Shared round-trip exercised by both image-variant tests: produce five
    /// messages to `test-topic` and assert they are consumed back in order.
    /// Panics (failing the calling test) on any producer/consumer error or
    /// on a 10 s consume timeout.
    async fn produce_and_consume(bootstrap_servers: &str) {
        let producer = ClientConfig::new()
            .set("bootstrap.servers", bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .create::<FutureProducer>()
            .expect("Failed to create Kafka FutureProducer");

        let consumer = ClientConfig::new()
            .set("group.id", "testcontainer-rs")
            .set("bootstrap.servers", bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            // Start from the beginning so messages produced before the
            // subscription are still observed.
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer>()
            .expect("Failed to create Kafka StreamConsumer");

        let topic = "test-topic";

        let number_of_messages_to_produce = 5_usize;
        let expected: Vec<String> = (0..number_of_messages_to_produce)
            .map(|i| format!("Message {i}"))
            .collect();

        for (i, message) in expected.iter().enumerate() {
            producer
                .send(
                    FutureRecord::to(topic)
                        .payload(message)
                        .key(&format!("Key {i}")),
                    Duration::from_secs(0),
                )
                .await
                .unwrap();
        }

        consumer
            .subscribe(&[topic])
            .expect("Failed to subscribe to a topic");

        let mut message_stream = consumer.stream();
        for produced in expected {
            let borrowed_message =
                tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                    .await
                    .unwrap()
                    .unwrap();

            assert_eq!(
                produced,
                borrowed_message
                    .unwrap()
                    .payload_view::<str>()
                    .unwrap()
                    .unwrap()
            );
        }
    }

    /// End-to-end produce/consume against the default native (GraalVM) image.
    #[tokio::test]
    async fn produce_and_consume_messages_graalvm(
    ) -> Result<(), Box<dyn std::error::Error + 'static>> {
        let _ = pretty_env_logger::try_init();
        let kafka_node = apache::Kafka::default().start().await?;

        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
        );

        produce_and_consume(&bootstrap_servers).await;

        Ok(())
    }

    /// Same round-trip against the JVM-based `apache/kafka` image.
    #[tokio::test]
    async fn produce_and_consume_messages_jvm() -> Result<(), Box<dyn std::error::Error + 'static>>
    {
        let _ = pretty_env_logger::try_init();
        let kafka_node = apache::Kafka::default().with_jvm_image().start().await?;

        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
        );

        produce_and_consume(&bootstrap_servers).await;

        Ok(())
    }
}