testcontainers_modules/kafka/
confluent.rs1use std::{borrow::Cow, collections::HashMap};
2
3use testcontainers::{
4 core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
5 Image,
6};
7
/// Docker image repository returned by `Image::name` for [`Kafka`].
const NAME: &str = "confluentinc/cp-kafka";
/// Docker image tag returned by `Image::tag` for [`Kafka`].
const TAG: &str = "6.1.1";
/// TCP port, inside the container, on which the `PLAINTEXT` listener is bound.
///
/// This is the only port the image exposes; resolve the host-side port it is mapped to
/// (for example with `get_host_port_ipv4(KAFKA_PORT)`) to build a bootstrap server address.
pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9093);
/// TCP client port, inside the container, of the ZooKeeper instance that the image command
/// starts next to the broker.
///
/// The broker reaches it via `localhost`; the image does not list it among its exposed ports.
pub const ZOOKEEPER_PORT: ContainerPort = ContainerPort::Tcp(2181);
20
/// Test image for a single [Apache Kafka](https://kafka.apache.org/) broker based on the
/// `confluentinc/cp-kafka` Docker image.
///
/// The value only carries the environment variables that are handed to the container.
/// [`Kafka::default`] fills them in for a one-broker setup whose ZooKeeper runs inside the
/// same container, with the client-facing listener bound to [`KAFKA_PORT`].
///
/// Once the container is up, ask it for the host port mapped to [`KAFKA_PORT`] and use
/// `127.0.0.1:<host port>` as the bootstrap server.
#[derive(Debug, Clone)]
pub struct Kafka {
    /// Environment variables passed to the container (name -> value).
    env_vars: HashMap<String, String>,
}
27
28impl Default for Kafka {
29 fn default() -> Self {
30 let mut env_vars = HashMap::new();
31
32 env_vars.insert(
33 "KAFKA_ZOOKEEPER_CONNECT".to_owned(),
34 format!("localhost:{}", ZOOKEEPER_PORT.as_u16()),
35 );
36 env_vars.insert(
37 "KAFKA_LISTENERS".to_owned(),
38 format!(
39 "PLAINTEXT://0.0.0.0:{port},BROKER://0.0.0.0:9092",
40 port = KAFKA_PORT.as_u16(),
41 ),
42 );
43 env_vars.insert(
44 "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
45 "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT".to_owned(),
46 );
47 env_vars.insert(
48 "KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
49 "BROKER".to_owned(),
50 );
51 env_vars.insert(
52 "KAFKA_ADVERTISED_LISTENERS".to_owned(),
53 format!(
54 "PLAINTEXT://localhost:{port},BROKER://localhost:9092",
55 port = KAFKA_PORT.as_u16(),
56 ),
57 );
58 env_vars.insert("KAFKA_BROKER_ID".to_owned(), "1".to_owned());
59 env_vars.insert(
60 "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
61 "1".to_owned(),
62 );
63
64 Self { env_vars }
65 }
66}
67
68impl Image for Kafka {
69 fn name(&self) -> &str {
70 NAME
71 }
72
73 fn tag(&self) -> &str {
74 TAG
75 }
76
77 fn ready_conditions(&self) -> Vec<WaitFor> {
78 vec![WaitFor::message_on_stdout("Creating new log file")]
79 }
80
81 fn env_vars(
82 &self,
83 ) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
84 &self.env_vars
85 }
86
87 fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
88 vec![
89 "/bin/bash".to_owned(),
90 "-c".to_owned(),
91 format!(
92 r#"
93echo 'clientPort={ZOOKEEPER_PORT}' > zookeeper.properties;
94echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties;
95echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties;
96zookeeper-server-start zookeeper.properties &
97. /etc/confluent/docker/bash-config &&
98/etc/confluent/docker/configure &&
99/etc/confluent/docker/launch"#,
100 ZOOKEEPER_PORT = ZOOKEEPER_PORT.as_u16()
101 ),
102 ]
103 }
104
105 fn expose_ports(&self) -> &[ContainerPort] {
106 &[KAFKA_PORT]
107 }
108
109 fn exec_after_start(
110 &self,
111 cs: ContainerState,
112 ) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
113 let mut commands = vec![];
114 let cmd = vec![
115 "kafka-configs".to_string(),
116 "--alter".to_string(),
117 "--bootstrap-server".to_string(),
118 "0.0.0.0:9092".to_string(),
119 "--entity-type".to_string(),
120 "brokers".to_string(),
121 "--entity-name".to_string(),
122 "1".to_string(),
123 "--add-config".to_string(),
124 format!(
125 "advertised.listeners=[PLAINTEXT://127.0.0.1:{},BROKER://localhost:9092]",
126 cs.host_port_ipv4(KAFKA_PORT)?
127 ),
128 ];
129 let ready_conditions = vec![WaitFor::message_on_stdout(
130 "Checking need to trigger auto leader balancing",
131 )];
132 commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));
133 Ok(commands)
134 }
135}
136
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures::StreamExt;
    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        producer::{FutureProducer, FutureRecord},
        ClientConfig, Message,
    };
    use testcontainers::runners::AsyncRunner;

    use crate::kafka;

    /// Starts the Kafka container, produces five keyed messages to one topic and checks
    /// that a fresh consumer group reads them back in the order they were sent.
    #[tokio::test]
    async fn produce_and_consume_messages() -> Result<(), Box<dyn std::error::Error + 'static>> {
        let _ = pretty_env_logger::try_init();
        let kafka_node = kafka::Kafka::default().start().await?;

        let host_port = kafka_node.get_host_port_ipv4(kafka::KAFKA_PORT).await?;
        let bootstrap_servers = format!("127.0.0.1:{}", host_port);

        let producer: FutureProducer = ClientConfig::new()
            .set("bootstrap.servers", &bootstrap_servers)
            .set("message.timeout.ms", "5000")
            .create()
            .expect("Failed to create Kafka FutureProducer");

        let consumer: StreamConsumer = ClientConfig::new()
            .set("group.id", "testcontainer-rs")
            .set("bootstrap.servers", &bootstrap_servers)
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "false")
            // The messages are produced before the consumer subscribes.
            .set("auto.offset.reset", "earliest")
            .create()
            .expect("Failed to create Kafka StreamConsumer");

        let topic = "test-topic";

        let number_of_messages_to_produce = 5_usize;
        let mut expected: Vec<String> = Vec::with_capacity(number_of_messages_to_produce);
        for i in 0..number_of_messages_to_produce {
            expected.push(format!("Message {i}"));
        }

        for (i, message) in expected.iter().enumerate() {
            let key = format!("Key {i}");
            let record = FutureRecord::to(topic).payload(message).key(&key);
            let delivery = producer.send(record, Duration::from_secs(0)).await;
            delivery.unwrap();
        }

        consumer
            .subscribe(&[topic])
            .expect("Failed to subscribe to a topic");

        let mut message_stream = consumer.stream();
        for produced in expected {
            // Bound the wait so a broken broker fails the test instead of hanging it.
            let next_item = tokio::time::timeout(Duration::from_secs(10), message_stream.next())
                .await
                .unwrap()
                .unwrap();
            let borrowed_message = next_item.unwrap();
            let payload = borrowed_message.payload_view::<str>().unwrap().unwrap();

            assert_eq!(produced, payload);
        }

        Ok(())
    }
}