testcontainers_modules/kafka/apache.rs

use std::{borrow::Cow, collections::HashMap};

use testcontainers::{
    core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
    Image,
};

const KAFKA_NATIVE_IMAGE_NAME: &str = "apache/kafka-native";
const KAFKA_IMAGE_NAME: &str = "apache/kafka";
const TAG: &str = "3.8.0";

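/// Port on which the containerised Kafka broker accepts client (PLAINTEXT) connections.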
pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9092);

const START_SCRIPT: &str = "/opt/kafka/testcontainers_start.sh";
const DEFAULT_INTERNAL_TOPIC_RF: usize = 1;
const DEFAULT_CLUSTER_ID: &str = "5L6g3nShT-eMCtK--X86sw";
const DEFAULT_BROKER_ID: usize = 1;

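/// Apache Kafka image running in KRaft mode (combined broker/controller, no ZooKeeper).
/// Defaults to the `apache/kafka-native` (GraalVM) image; see [`Kafka::with_jvm_image`]
/// for the JVM-based `apache/kafka` image.
///
/// A minimal usage sketch, mirroring the tests at the bottom of this file (the crate
/// path `testcontainers_modules::kafka::apache` is assumed from the file location):
///
/// ```rust,no_run
/// use testcontainers::runners::AsyncRunner;
/// use testcontainers_modules::kafka::apache;
///
/// # async fn run() -> Result<(), Box<dyn std::error::Error>> {
/// let kafka = apache::Kafka::default().start().await?;
/// let bootstrap_servers = format!(
///     "127.0.0.1:{}",
///     kafka.get_host_port_ipv4(apache::KAFKA_PORT).await?
/// );
/// // `bootstrap_servers` can now be handed to any Kafka client, e.g. rdkafka's ClientConfig.
/// # Ok(())
/// # }
/// ```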
#[derive(Debug, Clone)]
pub struct Kafka {
    env_vars: HashMap<String, String>,
    image_name: String,
}

impl Default for Kafka {
    fn default() -> Self {
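        // Single-node KRaft configuration: the same process acts as broker and controller,
        // with PLAINTEXT (client), BROKER (inter-broker) and CONTROLLER listeners on
        // ports 9092, 9093 and 9094 respectively.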
        let mut env_vars = HashMap::new();
        env_vars.insert(
            "KAFKA_LISTENERS".to_owned(),
            format!(
                "PLAINTEXT://0.0.0.0:{},BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094",
                KAFKA_PORT.as_u16()
            ),
        );
        env_vars.insert("CLUSTER_ID".to_owned(), DEFAULT_CLUSTER_ID.to_owned());
        env_vars.insert(
            "KAFKA_PROCESS_ROLES".to_owned(),
            "broker,controller".to_owned(),
        );

        env_vars.insert(
            "KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(),
            "CONTROLLER".to_owned(),
        );
        env_vars.insert(
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
            "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT".to_owned(),
        );
        env_vars.insert(
            "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
            "BROKER".to_owned(),
        );
        env_vars.insert(
            "KAFKA_ADVERTISED_LISTENERS".to_owned(),
            format!(
                "PLAINTEXT://localhost:{},BROKER://localhost:9092",
                KAFKA_PORT.as_u16()
            ),
        );
        env_vars.insert("KAFKA_BROKER_ID".to_owned(), DEFAULT_BROKER_ID.to_string());
        env_vars.insert(
            "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
            DEFAULT_INTERNAL_TOPIC_RF.to_string(),
        );
        env_vars.insert(
            "KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(),
            format!("{DEFAULT_BROKER_ID}@localhost:9094"),
        );

        Self {
            env_vars,
            image_name: KAFKA_NATIVE_IMAGE_NAME.to_string(),
        }
    }
}

impl Kafka {
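    /// Switches from the default `apache/kafka-native` (GraalVM) image to the
    /// JVM-based `apache/kafka` image.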
    pub fn with_jvm_image(mut self) -> Self {
        self.image_name = KAFKA_IMAGE_NAME.to_string();

        self
    }
}

impl Image for Kafka {
    fn name(&self) -> &str {
        self.image_name.as_str()
    }

    fn tag(&self) -> &str {
        TAG
    }

    fn ready_conditions(&self) -> Vec<WaitFor> {
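        // No startup wait conditions here: readiness is checked by the exec command in
        // `exec_after_start`, which waits for "Kafka Server started" on stdout.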
        vec![]
    }

    fn entrypoint(&self) -> Option<&str> {
        Some("bash")
    }

    fn env_vars(
        &self,
    ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
        &self.env_vars
    }

    fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
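        // With `bash` as the entrypoint, this command blocks until `exec_after_start`
        // has written the start script, then makes it executable and runs it.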
        vec![
            "-c".to_string(),
            format!("while [ ! -f {START_SCRIPT} ]; do sleep 0.1; done; chmod 755 {START_SCRIPT} && {START_SCRIPT}"),
        ]
        .into_iter()
    }

    fn expose_ports(&self) -> &[ContainerPort] {
        &[KAFKA_PORT]
    }

    fn exec_after_start(
        &self,
        cs: ContainerState,
    ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
        let mut commands = vec![];
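        // Generate the start script: re-export KAFKA_ADVERTISED_LISTENERS so the PLAINTEXT
        // listener advertises the host port Docker mapped for KAFKA_PORT, then invoke the
        // image's stock /etc/kafka/docker/run script.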
        let cmd = vec![
            "sh".to_string(),
            "-c".to_string(),
            format!(
                "echo '#!/usr/bin/env bash\nexport KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:{},BROKER://localhost:9093\n/etc/kafka/docker/run \n' > {}",
                cs.host_port_ipv4(KAFKA_PORT)?,
                START_SCRIPT
            ),
        ];
        let ready_conditions = vec![WaitFor::message_on_stdout("Kafka Server started")];
        commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));

        Ok(commands)
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        producer::{FutureProducer, FutureRecord},
        ClientConfig, Message,
    };
    use testcontainers::runners::AsyncRunner;

    use crate::kafka::apache;

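    // End-to-end round trip against the default `apache/kafka-native` (GraalVM) image:
    // produce five messages and assert the consumer reads the same payloads back.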
    #[tokio::test]
    async fn produce_and_consume_messages_graalvm(
    ) -> Result<(), Box<dyn std::error::Error + 'static>> {
        let _ = pretty_env_logger::try_init();
        let kafka_node = apache::Kafka::default().start().await?;

        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
        );

        let producer = ClientConfig::new()
            .set("bootstrap.servers", &bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .create::<FutureProducer>()
            .expect("Failed to create Kafka FutureProducer");

        let consumer = ClientConfig::new()
            .set("group.id", "testcontainer-rs")
            .set("bootstrap.servers", &bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer>()
            .expect("Failed to create Kafka StreamConsumer");

        let topic = "test-topic";

        let number_of_messages_to_produce = 5_usize;
        let expected: Vec<String> = (0..number_of_messages_to_produce)
            .map(|i| format!("Message {i}"))
            .collect();

        for (i, message) in expected.iter().enumerate() {
            producer
                .send(
                    FutureRecord::to(topic)
                        .payload(message)
                        .key(&format!("Key {i}")),
                    Duration::from_secs(0),
                )
                .await
                .unwrap();
        }

        consumer
            .subscribe(&[topic])
            .expect("Failed to subscribe to a topic");

        let mut message_stream = consumer.stream();
        for produced in expected {
            let borrowed_message =
                tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                    .await
                    .unwrap()
                    .unwrap();

            assert_eq!(
                produced,
                borrowed_message
                    .unwrap()
                    .payload_view::<str>()
                    .unwrap()
                    .unwrap()
            );
        }

        Ok(())
    }

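    // Same round trip as above, but running the JVM-based `apache/kafka` image
    // selected via `with_jvm_image`.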
    #[tokio::test]
    async fn produce_and_consume_messages_jvm() -> Result<(), Box<dyn std::error::Error + 'static>>
    {
        let _ = pretty_env_logger::try_init();
        let kafka_node = apache::Kafka::default().with_jvm_image().start().await?;

        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
        );

        let producer = ClientConfig::new()
            .set("bootstrap.servers", &bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .create::<FutureProducer>()
            .expect("Failed to create Kafka FutureProducer");

        let consumer = ClientConfig::new()
            .set("group.id", "testcontainer-rs")
            .set("bootstrap.servers", &bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer>()
            .expect("Failed to create Kafka StreamConsumer");

        let topic = "test-topic";

        let number_of_messages_to_produce = 5_usize;
        let expected: Vec<String> = (0..number_of_messages_to_produce)
            .map(|i| format!("Message {i}"))
            .collect();

        for (i, message) in expected.iter().enumerate() {
            producer
                .send(
                    FutureRecord::to(topic)
                        .payload(message)
                        .key(&format!("Key {i}")),
                    Duration::from_secs(0),
                )
                .await
                .unwrap();
        }

        consumer
            .subscribe(&[topic])
            .expect("Failed to subscribe to a topic");

        let mut message_stream = consumer.stream();
        for produced in expected {
            let borrowed_message =
                tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                    .await
                    .unwrap()
                    .unwrap();

            assert_eq!(
                produced,
                borrowed_message
                    .unwrap()
                    .payload_view::<str>()
                    .unwrap()
                    .unwrap()
            );
        }

        Ok(())
    }
}