// streamling_e2e/resources/kafka.rs
use std::time::Duration;

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::error::RDKafkaErrorCode;
use rdkafka::message::{Header, Message, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use schema_registry_converter::async_impl::avro::{AvroDecoder, AvroEncoder};
use schema_registry_converter::async_impl::schema_registry::{post_schema, SrSettings};
use schema_registry_converter::schema_registry_common::{
    SchemaType, SubjectNameStrategy, SuppliedSchema,
};
use serde::Serialize;
use tracing::{info, warn};

use crate::{E2eError, Result};

20pub struct KafkaResource {
22 pub broker: String,
24 pub schema_registry_url: String,
26 pub topic: String,
28 #[allow(dead_code)]
30 admin_client: AdminClient<DefaultClientContext>,
31 producer: FutureProducer,
33 sr_settings: SrSettings,
35}
36
37impl KafkaResource {
38 pub async fn new(broker: &str, schema_registry_url: &str, topic: &str) -> Result<Self> {
40 let admin_client: AdminClient<DefaultClientContext> = ClientConfig::new()
42 .set("bootstrap.servers", broker)
43 .set("socket.timeout.ms", "10000")
44 .create()
45 .map_err(|e| E2eError::Kafka(e.to_string()))?;
46
47 let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
49 let opts = AdminOptions::new().request_timeout(Some(Duration::from_secs(30)));
50
51 admin_client
52 .create_topics(&[new_topic], &opts)
53 .await
54 .map_err(|e| E2eError::Kafka(e.to_string()))?;
55
56 info!("Created Kafka topic: {}", topic);
57
58 let producer: FutureProducer = ClientConfig::new()
60 .set("bootstrap.servers", broker)
61 .set("message.timeout.ms", "30000")
62 .create()
63 .map_err(|e| E2eError::Kafka(e.to_string()))?;
64
65 let sr_settings = SrSettings::new(schema_registry_url.to_string());
67
68 Ok(Self {
69 broker: broker.to_string(),
70 schema_registry_url: schema_registry_url.to_string(),
71 topic: topic.to_string(),
72 admin_client,
73 producer,
74 sr_settings,
75 })
76 }
77
78 pub async fn register_schema(&self, schema: &str) -> Result<u32> {
80 let subject_strategy = SubjectNameStrategy::TopicNameStrategy(self.topic.clone(), false);
81
82 let supplied_schema = SuppliedSchema {
83 name: None,
84 schema_type: SchemaType::Avro,
85 schema: schema.to_string(),
86 references: vec![],
87 };
88
89 let result = post_schema(
90 &self.sr_settings,
91 subject_strategy.get_subject().unwrap(),
92 supplied_schema,
93 )
94 .await
95 .map_err(|e| E2eError::Kafka(e.to_string()))?;
96
97 info!(
98 "Registered schema for topic {}: id={}",
99 self.topic, result.id
100 );
101 Ok(result.id)
102 }
103
104 pub async fn produce_json_records<T: Serialize>(&self, records: &[T]) -> Result<()> {
106 for record in records {
107 let payload = serde_json::to_vec(record).map_err(|e| E2eError::Kafka(e.to_string()))?;
108
109 let kafka_record = FutureRecord::to(&self.topic).payload(&payload).key("");
110
111 self.producer
112 .send(kafka_record, Timeout::After(Duration::from_secs(15)))
113 .await
114 .map_err(|(e, _)| E2eError::Kafka(e.to_string()))?;
115 }
116
117 info!(
118 "Produced {} JSON records to topic {}",
119 records.len(),
120 self.topic
121 );
122 Ok(())
123 }
124
125 pub async fn produce_avro_records<T: Serialize>(&self, records: &[T]) -> Result<()> {
128 self.produce_avro_records_with_op(records, "c").await
129 }
130
131 pub async fn produce_avro_records_with_op<T: Serialize>(
134 &self,
135 records: &[T],
136 op: &str,
137 ) -> Result<()> {
138 let encoder = AvroEncoder::new(self.sr_settings.clone());
139 let subject_strategy = SubjectNameStrategy::TopicNameStrategy(self.topic.clone(), false);
140
141 for record in records {
142 let payload = encoder
143 .encode_struct(record, &subject_strategy)
144 .await
145 .map_err(|e| E2eError::Kafka(e.to_string()))?;
146
147 let headers = OwnedHeaders::new().insert(Header {
148 key: "dbz.op",
149 value: Some(op),
150 });
151
152 let kafka_record = FutureRecord::to(&self.topic)
153 .payload(&payload)
154 .key("")
155 .headers(headers);
156
157 self.producer
158 .send(kafka_record, Timeout::After(Duration::from_secs(15)))
159 .await
160 .map_err(|(e, _)| E2eError::Kafka(e.to_string()))?;
161 }
162
163 info!(
164 "Produced {} Avro records to topic {} (op={})",
165 records.len(),
166 self.topic,
167 op
168 );
169 Ok(())
170 }
171
172 pub async fn produce_raw(&self, records: &[Vec<u8>]) -> Result<()> {
174 for payload in records {
175 let kafka_record = FutureRecord::to(&self.topic)
176 .payload(payload.as_slice())
177 .key("");
178
179 self.producer
180 .send(kafka_record, Timeout::After(Duration::from_secs(15)))
181 .await
182 .map_err(|(e, _)| E2eError::Kafka(e.to_string()))?;
183 }
184
185 info!(
186 "Produced {} raw records to topic {}",
187 records.len(),
188 self.topic
189 );
190 Ok(())
191 }
192
193 pub async fn inspect_topic_messages(
196 broker: &str,
197 schema_registry_url: &str,
198 topic: &str,
199 max_messages: usize,
200 max_show: usize,
201 ) -> Result<(Vec<(i64, String, String)>, Option<i64>)> {
202 use apache_avro::types::Value;
203 use rdkafka::Offset;
204
205 let consumer: StreamConsumer = ClientConfig::new()
207 .set("bootstrap.servers", broker)
208 .set("group.id", format!("inspect-{}", uuid::Uuid::new_v4()))
209 .set("enable.partition.eof", "false")
210 .set("session.timeout.ms", "6000")
211 .set("enable.auto.commit", "false")
212 .set("auto.offset.reset", "earliest")
213 .create()
214 .map_err(|e| E2eError::Kafka(e.to_string()))?;
215
216 let topic_exists = match consumer.fetch_metadata(Some(topic), Duration::from_secs(5)) {
218 Ok(metadata) => {
219 !metadata.topics().is_empty() && !metadata.topics()[0].partitions().is_empty()
220 }
221 Err(_) => false,
222 };
223
224 if !topic_exists {
225 return Ok((vec![], None));
226 }
227
228 consumer
229 .subscribe(&[topic])
230 .map_err(|e| E2eError::Kafka(e.to_string()))?;
231
232 let assignment_timeout = Duration::from_secs(10);
234 let assignment_start = std::time::Instant::now();
235 let mut assignment = None;
236 let mut poll_count = 0;
237
238 while assignment_start.elapsed() < assignment_timeout {
239 if let Ok(assigned) = consumer.assignment() {
240 if assigned.count() > 0 {
241 assignment = Some(assigned);
242 break;
243 }
244 }
245
246 match tokio::time::timeout(Duration::from_millis(500), consumer.recv()).await {
247 Ok(Ok(msg)) => {
248 if let Ok(assigned) = consumer.assignment() {
249 if assigned.count() > 0 {
250 assignment = Some(assigned);
251 break;
252 }
253 }
254 drop(msg);
255 }
256 Ok(Err(e)) => {
257 let err_str = e.to_string();
258 if err_str.contains("Partition EOF") {
259 if let Ok(assigned) = consumer.assignment() {
260 if assigned.count() > 0 {
261 assignment = Some(assigned);
262 break;
263 }
264 }
265 }
266 }
267 Err(_) => {}
268 }
269
270 poll_count += 1;
271 if poll_count % 5 == 0 {
272 tokio::time::sleep(Duration::from_millis(100)).await;
273 }
274 }
275
276 let mut results = Vec::new();
277 let mut highest_offset: Option<i64> = None;
278
279 if let Some(assignment_list) = assignment {
280 for tp in assignment_list.elements() {
282 let _ = consumer.seek(
283 tp.topic(),
284 tp.partition(),
285 Offset::Beginning,
286 Duration::from_secs(5),
287 );
288 }
289
290 let sr_settings = SrSettings::new(schema_registry_url.to_string());
292 let decoder = AvroDecoder::new(sr_settings);
293
294 let timeout = Duration::from_secs(5);
296 let start = std::time::Instant::now();
297 let mut message_count = 0;
298
299 while message_count < max_messages && start.elapsed() < timeout {
300 match tokio::time::timeout(Duration::from_millis(1000), consumer.recv()).await {
301 Ok(Ok(msg)) => {
302 message_count += 1;
303 let offset = msg.offset();
304
305 highest_offset =
307 Some(highest_offset.map(|h| h.max(offset)).unwrap_or(offset));
308
309 let key = msg
310 .key()
311 .map(|k| String::from_utf8_lossy(k).to_string())
312 .unwrap_or_default();
313
314 let id_str = match msg.payload() {
316 Some(payload) => match decoder.decode(Some(payload)).await {
317 Ok(decoded_result) => {
318 let value = match &decoded_result.value {
320 Value::Union(_, inner) => inner.as_ref(),
321 other => other,
322 };
323
324 match value {
325 Value::Record(fields) => {
326 let parts: Vec<String> = fields
327 .iter()
328 .map(|(name, fv)| {
329 let val_str = match fv {
330 Value::Int(i) => i.to_string(),
331 Value::Long(l) => l.to_string(),
332 Value::String(s) => s.clone(),
333 Value::Union(_, v) => format!("{:?}", v),
334 _ => format!("{:?}", fv),
335 };
336 format!("{}={}", name, val_str)
337 })
338 .collect();
339 if parts.is_empty() {
340 format!("[{} bytes]", payload.len())
341 } else {
342 parts.join(",")
343 }
344 }
345 _ => format!("[{} bytes]", payload.len()),
346 }
347 }
348 Err(_) => format!("[{} bytes]", payload.len()),
349 },
350 None => "[0 bytes]".to_string(),
351 };
352
353 if message_count <= max_show {
354 results.push((offset, key, id_str));
355 }
356 }
357 Ok(Err(e)) => {
358 if e.to_string().contains("Partition EOF") {
359 break;
360 }
361 break;
362 }
363 Err(_) => break,
364 }
365 }
366 }
367
368 Ok((results, highest_offset))
369 }
370
371 #[allow(dead_code)]
373 pub async fn cleanup(&self) -> Result<()> {
374 let opts = AdminOptions::new().request_timeout(Some(Duration::from_secs(30)));
375
376 self.admin_client
377 .delete_topics(&[&self.topic], &opts)
378 .await
379 .map_err(|e| E2eError::Kafka(e.to_string()))?;
380
381 info!("Deleted Kafka topic: {}", self.topic);
382 Ok(())
383 }
384}
385
386impl Drop for KafkaResource {
387 fn drop(&mut self) {
388 let topic = self.topic.clone();
389 let broker = self.broker.clone();
390
391 let delete = async move {
392 let admin_client: std::result::Result<AdminClient<DefaultClientContext>, _> =
393 ClientConfig::new()
394 .set("bootstrap.servers", &broker)
395 .set("socket.timeout.ms", "5000")
396 .create();
397
398 if let Ok(client) = admin_client {
399 let opts = AdminOptions::new().request_timeout(Some(Duration::from_secs(10)));
400 if let Err(e) = client.delete_topics(&[&topic], &opts).await {
401 tracing::warn!("Failed to delete Kafka topic {}: {}", topic, e);
402 } else {
403 info!("Deleted Kafka topic: {}", topic);
404 }
405 }
406 };
407
408 std::thread::spawn(move || {
411 tokio::runtime::Builder::new_current_thread()
412 .enable_all()
413 .build()
414 .expect("cleanup runtime")
415 .block_on(delete);
416 })
417 .join()
418 .ok();
419 }
420}