// testcontainers_modules/kafka/confluent.rs
use std::{borrow::Cow, collections::HashMap};
2
3use testcontainers::{
4 core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
5 Image,
6};
7
/// Docker image name for the Confluent Platform Kafka distribution.
const NAME: &str = "confluentinc/cp-kafka";
/// Pinned image tag used by default.
const TAG: &str = "6.1.1";
/// In-container TCP port of the `PLAINTEXT` listener used by external clients
/// (the `BROKER` inter-broker listener stays on 9092).
pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9093);
/// In-container TCP port of the Zookeeper instance started alongside the broker.
pub const ZOOKEEPER_PORT: ContainerPort = ContainerPort::Tcp(2181);
20
/// Testcontainers image for a single-node Confluent Kafka broker with an
/// embedded Zookeeper (see the `cmd` implementation), configured entirely
/// through environment variables.
#[derive(Debug, Clone)]
pub struct Kafka {
    // Broker configuration passed to the container as environment variables;
    // populated with a working single-node setup by `Default`.
    env_vars: HashMap<String, String>,
}
52
53impl Default for Kafka {
54 fn default() -> Self {
55 let mut env_vars = HashMap::new();
56
57 env_vars.insert(
58 "KAFKA_ZOOKEEPER_CONNECT".to_owned(),
59 format!("localhost:{}", ZOOKEEPER_PORT.as_u16()),
60 );
61 env_vars.insert(
62 "KAFKA_LISTENERS".to_owned(),
63 format!(
64 "PLAINTEXT://0.0.0.0:{port},BROKER://0.0.0.0:9092",
65 port = KAFKA_PORT.as_u16(),
66 ),
67 );
68 env_vars.insert(
69 "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
70 "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_owned(),
71 );
72 env_vars.insert(
73 "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
74 "BROKER".to_owned(),
75 );
76 env_vars.insert(
77 "KAFKA_ADVERTISED_LISTENERS".to_owned(),
78 format!(
79 "PLAINTEXT://localhost:{port},BROKER://localhost:9092",
80 port = KAFKA_PORT.as_u16(),
81 ),
82 );
83 env_vars.insert("KAFKA_BROKER_ID".to_owned(), "1".to_owned());
84 env_vars.insert(
85 "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
86 "1".to_owned(),
87 );
88
89 Self { env_vars }
90 }
91}
92
93impl Image for Kafka {
94 fn name(&self) -> &str {
95 NAME
96 }
97
98 fn tag(&self) -> &str {
99 TAG
100 }
101
102 fn ready_conditions(&self) -> Vec<WaitFor> {
103 vec![WaitFor::message_on_stdout("Creating new log file")]
104 }
105
106 fn env_vars(
107 &self,
108 ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
109 &self.env_vars
110 }
111
112 fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
113 vec![
114 "/bin/bash".to_owned(),
115 "-c".to_owned(),
116 format!(
117 r#"
118echo 'clientPort={ZOOKEEPER_PORT}' > zookeeper.properties;
119echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties;
120echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties;
121zookeeper-server-start zookeeper.properties &
122. /etc/confluent/docker/bash-config &&
123/etc/confluent/docker/configure &&
124/etc/confluent/docker/launch"#,
125 ZOOKEEPER_PORT = ZOOKEEPER_PORT.as_u16()
126 ),
127 ]
128 }
129
130 fn expose_ports(&self) -> &[ContainerPort] {
131 &[KAFKA_PORT]
132 }
133
134 fn exec_after_start(
135 &self,
136 cs: ContainerState,
137 ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
138 let mut commands = vec![];
139 let cmd = vec![
140 "kafka-configs".to_string(),
141 "--alter".to_string(),
142 "--bootstrap-server".to_string(),
143 "0.0.0.0:9092".to_string(),
144 "--entity-type".to_string(),
145 "brokers".to_string(),
146 "--entity-name".to_string(),
147 "1".to_string(),
148 "--add-config".to_string(),
149 format!(
150 "advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]",
151 cs.host_port_ipv4(KAFKA_PORT)?
152 ),
153 ];
154 let ready_conditions = vec![WaitFor::message_on_stdout(
155 "Checking need to trigger auto leader balancing",
156 )];
157 commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));
158 Ok(commands)
159 }
160}
161
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        producer::{FutureProducer, FutureRecord},
        ClientConfig, Message,
    };
    use testcontainers::runners::AsyncRunner;

    use crate::kafka;

    /// End-to-end smoke test: start the container, produce five messages to a
    /// topic, consume them back, and assert payloads round-trip in order.
    /// Requires a local Docker daemon.
    #[tokio::test]
    async fn produce_and_consume_messages() -> Result<(), Box<dyn std::error::Error + 'static>> {
        // Best-effort logger init; ignore the error if already initialized.
        let _ = pretty_env_logger::try_init();
        let kafka_node = kafka::Kafka::default().start().await?;

        // Connect via the host port Docker mapped to the in-container KAFKA_PORT.
        let bootstrap_servers = format!(
            "127.0.0.1:{}",
            kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT).await?
        );

        let producer = ClientConfig::new()
            .set("bootstrap.servers", &bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .create::<FutureProducer>()
            .expect("Failed to create Kafka FutureProducer");

        // auto.offset.reset=earliest so the consumer sees messages produced
        // before it subscribed; auto-commit disabled as offsets are irrelevant here.
        let consumer = ClientConfig::new()
            .set("group.id", "testcontainer-rs")
            .set("bootstrap.servers", &bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer>()
            .expect("Failed to create Kafka StreamConsumer");

        let topic = "test-topic";

        let number_of_messages_to_produce = 5_usize;
        let expected: Vec<String> = (0..number_of_messages_to_produce)
            .map(|i| format!("Message {i}"))
            .collect();

        for (i, message) in expected.iter().enumerate() {
            producer
                .send(
                    FutureRecord::to(topic)
                        .payload(message)
                        .key(&format!("Key {i}")),
                    Duration::from_secs(0),
                )
                .await
                .unwrap();
        }

        consumer
            .subscribe(&[topic])
            .expect("Failed to subscribe to a topic");

        // Read back exactly as many messages as were produced, each with a
        // 10-second timeout so a broken broker fails the test instead of hanging.
        let mut message_stream = consumer.stream();
        for produced in expected {
            let borrowed_message =
                tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                    .await
                    .unwrap()
                    .unwrap();

            assert_eq!(
                produced,
                borrowed_message
                    .unwrap()
                    .payload_view::<str>()
                    .unwrap()
                    .unwrap()
            );
        }

        Ok(())
    }
}