use std::{borrow::Cow, collections::HashMap};

use testcontainers::{
    core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
    Image,
};
7
// Docker image name for the Confluent Kafka platform image.
const NAME: &str = "confluentinc/cp-kafka";
// Pinned image tag used by this module.
const TAG: &str = "6.1.1";
/// Container port the PLAINTEXT (host-facing) Kafka listener binds to.
pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9093);
/// Container port the embedded Zookeeper instance listens on.
pub const ZOOKEEPER_PORT: ContainerPort = ContainerPort::Tcp(2181);

/// Replication factor for Kafka's internal topics (1 — single-broker setup).
pub const DEFAULT_INTERNAL_TOPIC_RF: usize = 1;

/// Cluster id handed to the container via the `CLUSTER_ID` env var.
pub const DEFAULT_CLUSTER_ID: &str = "5L6g3nShT-eMCtK--X86sw";

/// Broker id set via `KAFKA_BROKER_ID`.
pub const DEFAULT_BROKER_ID: usize = 1;
29
/// Confluent `cp-kafka` image running a single broker with an embedded
/// Zookeeper. Configuration is driven entirely by the environment variables
/// populated in the [`Default`] impl.
#[derive(Debug, Clone)]
pub struct Kafka {
    // Environment variables passed to the container at startup.
    env_vars: HashMap<String, String>,
}
61
62impl Default for Kafka {
63 fn default() -> Self {
64 let mut env_vars = HashMap::new();
65
66 env_vars.insert(
67 "KAFKA_ZOOKEEPER_CONNECT".to_owned(),
68 format!("localhost:{}", ZOOKEEPER_PORT.as_u16()),
69 );
70 env_vars.insert(
71 "KAFKA_LISTENERS".to_owned(),
72 format!(
73 "PLAINTEXT://0.0.0.0:{port},BROKER://0.0.0.0:9092",
74 port = KAFKA_PORT.as_u16(),
75 ),
76 );
77 env_vars.insert(
78 "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
79 "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_owned(),
80 );
81 env_vars.insert(
82 "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
83 "BROKER".to_owned(),
84 );
85 env_vars.insert(
86 "KAFKA_ADVERTISED_LISTENERS".to_owned(),
87 format!(
88 "PLAINTEXT://localhost:{port},BROKER://localhost:9092",
89 port = KAFKA_PORT.as_u16(),
90 ),
91 );
92 env_vars.insert("KAFKA_BROKER_ID".to_owned(), DEFAULT_BROKER_ID.to_string());
93 env_vars.insert(
94 "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
95 DEFAULT_INTERNAL_TOPIC_RF.to_string(),
96 );
97 env_vars.insert("CLUSTER_ID".to_owned(), DEFAULT_CLUSTER_ID.to_owned());
98
99 Self { env_vars }
100 }
101}
102
103impl Image for Kafka {
104 fn name(&self) -> &str {
105 NAME
106 }
107
108 fn tag(&self) -> &str {
109 TAG
110 }
111
112 fn ready_conditions(&self) -> Vec<WaitFor> {
113 vec![WaitFor::message_on_stdout("Creating new log file")]
114 }
115
116 fn env_vars(
117 &self,
118 ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
119 &self.env_vars
120 }
121
122 fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
123 vec![
124 "/bin/bash".to_owned(),
125 "-c".to_owned(),
126 format!(
127 r#"
128echo 'clientPort={ZOOKEEPER_PORT}' > zookeeper.properties;
129echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties;
130echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties;
131zookeeper-server-start zookeeper.properties &
132. /etc/confluent/docker/bash-config &&
133/etc/confluent/docker/configure &&
134/etc/confluent/docker/launch"#,
135 ZOOKEEPER_PORT = ZOOKEEPER_PORT.as_u16()
136 ),
137 ]
138 }
139
140 fn expose_ports(&self) -> &[ContainerPort] {
141 &[KAFKA_PORT]
142 }
143
144 fn exec_after_start(
145 &self,
146 cs: ContainerState,
147 ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
148 let mut commands = vec![];
149 let cmd = vec![
150 "kafka-configs".to_string(),
151 "--alter".to_string(),
152 "--bootstrap-server".to_string(),
153 "0.0.0.0:9092".to_string(),
154 "--entity-type".to_string(),
155 "brokers".to_string(),
156 "--entity-name".to_string(),
157 "1".to_string(),
158 "--add-config".to_string(),
159 format!(
160 "advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]",
161 cs.host_port_ipv4(KAFKA_PORT)?
162 ),
163 ];
164 let ready_conditions = vec![WaitFor::message_on_stdout(
165 "Checking need to trigger auto leader balancing",
166 )];
167 commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));
168 Ok(commands)
169 }
170}
171
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        producer::{FutureProducer, FutureRecord},
        ClientConfig, Message,
    };
    use testcontainers::runners::AsyncRunner;

    use crate::kafka;

    /// End-to-end round trip: produce N messages to a topic, then consume
    /// them back and check payloads arrive in order.
    #[tokio::test]
    async fn produce_and_consume_messages() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let _ = pretty_env_logger::try_init();
        let container = kafka::Kafka::default().start().await?;

        let brokers = format!(
            "127.0.0.1:{}",
            container.get_host_port_ipv4(kafka::KAFKA_PORT).await?
        );

        let producer = ClientConfig::new()
            .set("bootstrap.servers", &brokers)
            .set("message.timeout.ms", "5000")
            .create::<FutureProducer>()
            .expect("Failed to create Kafka FutureProducer");

        let consumer = ClientConfig::new()
            .set("group.id", "testcontainer-rs")
            .set("bootstrap.servers", &brokers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            .set("auto.offset.reset", "earliest")
            .create::<StreamConsumer>()
            .expect("Failed to create Kafka StreamConsumer");

        let topic = "test-topic";

        let total_messages = 5_usize;
        let expected: Vec<String> = (0..total_messages)
            .map(|i| format!("Message {i}"))
            .collect();

        for (i, payload) in expected.iter().enumerate() {
            producer
                .send(
                    FutureRecord::to(topic)
                        .payload(payload)
                        .key(&format!("Key {i}")),
                    Duration::from_secs(0),
                )
                .await
                .unwrap();
        }

        consumer
            .subscribe(&[topic])
            .expect("Failed to subscribe to a topic");

        let mut stream = consumer.stream();
        for want in expected {
            // Bound each receive with a timeout so a broken broker fails the
            // test instead of hanging it.
            let delivery = tokio::time::timeout(Duration::from_secs(10), stream.next())
                .await
                .unwrap()
                .unwrap()
                .unwrap();

            let got = delivery.payload_view::<str>().unwrap().unwrap();
            assert_eq!(want, got);
        }

        Ok(())
    }
}