Skip to main content

streamling_e2e/resources/
kafka.rs

1//! Kafka resource manager for creating isolated topics per test.
2
3use crate::{E2eError, Result};
4use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
5use rdkafka::client::DefaultClientContext;
6use rdkafka::config::ClientConfig;
7use rdkafka::consumer::{Consumer, StreamConsumer};
8use rdkafka::message::{Header, Message, OwnedHeaders};
9use rdkafka::producer::{FutureProducer, FutureRecord};
10use rdkafka::util::Timeout;
11use schema_registry_converter::async_impl::avro::{AvroDecoder, AvroEncoder};
12use schema_registry_converter::async_impl::schema_registry::{post_schema, SrSettings};
13use schema_registry_converter::schema_registry_common::{
14    SchemaType, SubjectNameStrategy, SuppliedSchema,
15};
16use serde::Serialize;
17use std::time::Duration;
18use tracing::info;
19
/// Kafka resource manager.
///
/// Bundles everything one test needs to talk to Kafka: the broker address, the schema
/// registry URL, the name of the topic created for the test, and the clients built for it
/// in `new`. Dropping the value triggers a best-effort deletion of the topic (see the
/// `Drop` implementation).
pub struct KafkaResource {
    /// Kafka broker address (passed as `bootstrap.servers` to every client).
    pub broker: String,
    /// Schema registry URL
    pub schema_registry_url: String,
    /// Name of the isolated topic
    pub topic: String,
    /// Admin client created in `new`; `cleanup` uses it to delete the topic.
    #[allow(dead_code)]
    admin_client: AdminClient<DefaultClientContext>,
    /// Producer for sending messages
    producer: FutureProducer,
    /// Schema registry settings, shared by schema registration and Avro encoding.
    sr_settings: SrSettings,
}
36
37impl KafkaResource {
38    /// Create a new Kafka resource with an isolated topic
39    pub async fn new(broker: &str, schema_registry_url: &str, topic: &str) -> Result<Self> {
40        // Create admin client
41        let admin_client: AdminClient<DefaultClientContext> = ClientConfig::new()
42            .set("bootstrap.servers", broker)
43            .set("socket.timeout.ms", "10000")
44            .create()
45            .map_err(|e| E2eError::Kafka(e.to_string()))?;
46
47        // Create the topic
48        let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
49        let opts = AdminOptions::new().request_timeout(Some(Duration::from_secs(30)));
50
51        admin_client
52            .create_topics(&[new_topic], &opts)
53            .await
54            .map_err(|e| E2eError::Kafka(e.to_string()))?;
55
56        info!("Created Kafka topic: {}", topic);
57
58        // Create producer
59        let producer: FutureProducer = ClientConfig::new()
60            .set("bootstrap.servers", broker)
61            .set("message.timeout.ms", "30000")
62            .create()
63            .map_err(|e| E2eError::Kafka(e.to_string()))?;
64
65        // Schema registry settings
66        let sr_settings = SrSettings::new(schema_registry_url.to_string());
67
68        Ok(Self {
69            broker: broker.to_string(),
70            schema_registry_url: schema_registry_url.to_string(),
71            topic: topic.to_string(),
72            admin_client,
73            producer,
74            sr_settings,
75        })
76    }
77
78    /// Register an Avro schema for the topic
79    pub async fn register_schema(&self, schema: &str) -> Result<u32> {
80        let subject_strategy = SubjectNameStrategy::TopicNameStrategy(self.topic.clone(), false);
81
82        let supplied_schema = SuppliedSchema {
83            name: None,
84            schema_type: SchemaType::Avro,
85            schema: schema.to_string(),
86            references: vec![],
87        };
88
89        let result = post_schema(
90            &self.sr_settings,
91            subject_strategy.get_subject().unwrap(),
92            supplied_schema,
93        )
94        .await
95        .map_err(|e| E2eError::Kafka(e.to_string()))?;
96
97        info!(
98            "Registered schema for topic {}: id={}",
99            self.topic, result.id
100        );
101        Ok(result.id)
102    }
103
104    /// Produce JSON records to the topic (without schema registry)
105    pub async fn produce_json_records<T: Serialize>(&self, records: &[T]) -> Result<()> {
106        for record in records {
107            let payload = serde_json::to_vec(record).map_err(|e| E2eError::Kafka(e.to_string()))?;
108
109            let kafka_record = FutureRecord::to(&self.topic).payload(&payload).key("");
110
111            self.producer
112                .send(kafka_record, Timeout::After(Duration::from_secs(15)))
113                .await
114                .map_err(|(e, _)| E2eError::Kafka(e.to_string()))?;
115        }
116
117        info!(
118            "Produced {} JSON records to topic {}",
119            records.len(),
120            self.topic
121        );
122        Ok(())
123    }
124
125    /// Produce Avro records to the topic (with schema registry)
126    /// Uses dbz.op='c' (create/insert) header by default
127    pub async fn produce_avro_records<T: Serialize>(&self, records: &[T]) -> Result<()> {
128        self.produce_avro_records_with_op(records, "c").await
129    }
130
131    /// Produce Avro records with a specific debezium operation type
132    /// op: "c" = create/insert, "u" = update, "d" = delete
133    pub async fn produce_avro_records_with_op<T: Serialize>(
134        &self,
135        records: &[T],
136        op: &str,
137    ) -> Result<()> {
138        let encoder = AvroEncoder::new(self.sr_settings.clone());
139        let subject_strategy = SubjectNameStrategy::TopicNameStrategy(self.topic.clone(), false);
140
141        for record in records {
142            let payload = encoder
143                .encode_struct(record, &subject_strategy)
144                .await
145                .map_err(|e| E2eError::Kafka(e.to_string()))?;
146
147            let headers = OwnedHeaders::new().insert(Header {
148                key: "dbz.op",
149                value: Some(op),
150            });
151
152            let kafka_record = FutureRecord::to(&self.topic)
153                .payload(&payload)
154                .key("")
155                .headers(headers);
156
157            self.producer
158                .send(kafka_record, Timeout::After(Duration::from_secs(15)))
159                .await
160                .map_err(|(e, _)| E2eError::Kafka(e.to_string()))?;
161        }
162
163        info!(
164            "Produced {} Avro records to topic {} (op={})",
165            records.len(),
166            self.topic,
167            op
168        );
169        Ok(())
170    }
171
172    /// Produce raw bytes to the topic
173    pub async fn produce_raw(&self, records: &[Vec<u8>]) -> Result<()> {
174        for payload in records {
175            let kafka_record = FutureRecord::to(&self.topic)
176                .payload(payload.as_slice())
177                .key("");
178
179            self.producer
180                .send(kafka_record, Timeout::After(Duration::from_secs(15)))
181                .await
182                .map_err(|(e, _)| E2eError::Kafka(e.to_string()))?;
183        }
184
185        info!(
186            "Produced {} raw records to topic {}",
187            records.len(),
188            self.topic
189        );
190        Ok(())
191    }
192
193    /// Inspect messages in an existing topic without creating it
194    /// Returns (messages, highest_offset) where messages are (offset, key, id_string) tuples
195    pub async fn inspect_topic_messages(
196        broker: &str,
197        schema_registry_url: &str,
198        topic: &str,
199        max_messages: usize,
200        max_show: usize,
201    ) -> Result<(Vec<(i64, String, String)>, Option<i64>)> {
202        use apache_avro::types::Value;
203        use rdkafka::Offset;
204
205        // Create consumer for inspection
206        let consumer: StreamConsumer = ClientConfig::new()
207            .set("bootstrap.servers", broker)
208            .set("group.id", format!("inspect-{}", uuid::Uuid::new_v4()))
209            .set("enable.partition.eof", "false")
210            .set("session.timeout.ms", "6000")
211            .set("enable.auto.commit", "false")
212            .set("auto.offset.reset", "earliest")
213            .create()
214            .map_err(|e| E2eError::Kafka(e.to_string()))?;
215
216        // Check if topic exists
217        let topic_exists = match consumer.fetch_metadata(Some(topic), Duration::from_secs(5)) {
218            Ok(metadata) => {
219                !metadata.topics().is_empty() && !metadata.topics()[0].partitions().is_empty()
220            }
221            Err(_) => false,
222        };
223
224        if !topic_exists {
225            return Ok((vec![], None));
226        }
227
228        consumer
229            .subscribe(&[topic])
230            .map_err(|e| E2eError::Kafka(e.to_string()))?;
231
232        // Poll to trigger partition assignment
233        let assignment_timeout = Duration::from_secs(10);
234        let assignment_start = std::time::Instant::now();
235        let mut assignment = None;
236        let mut poll_count = 0;
237
238        while assignment_start.elapsed() < assignment_timeout {
239            if let Ok(assigned) = consumer.assignment() {
240                if assigned.count() > 0 {
241                    assignment = Some(assigned);
242                    break;
243                }
244            }
245
246            match tokio::time::timeout(Duration::from_millis(500), consumer.recv()).await {
247                Ok(Ok(msg)) => {
248                    if let Ok(assigned) = consumer.assignment() {
249                        if assigned.count() > 0 {
250                            assignment = Some(assigned);
251                            break;
252                        }
253                    }
254                    drop(msg);
255                }
256                Ok(Err(e)) => {
257                    let err_str = e.to_string();
258                    if err_str.contains("Partition EOF") {
259                        if let Ok(assigned) = consumer.assignment() {
260                            if assigned.count() > 0 {
261                                assignment = Some(assigned);
262                                break;
263                            }
264                        }
265                    }
266                }
267                Err(_) => {}
268            }
269
270            poll_count += 1;
271            if poll_count % 5 == 0 {
272                tokio::time::sleep(Duration::from_millis(100)).await;
273            }
274        }
275
276        let mut results = Vec::new();
277        let mut highest_offset: Option<i64> = None;
278
279        if let Some(assignment_list) = assignment {
280            // Seek to beginning
281            for tp in assignment_list.elements() {
282                let _ = consumer.seek(
283                    tp.topic(),
284                    tp.partition(),
285                    Offset::Beginning,
286                    Duration::from_secs(5),
287                );
288            }
289
290            // Create Avro decoder
291            let sr_settings = SrSettings::new(schema_registry_url.to_string());
292            let decoder = AvroDecoder::new(sr_settings);
293
294            // Consume messages
295            let timeout = Duration::from_secs(5);
296            let start = std::time::Instant::now();
297            let mut message_count = 0;
298
299            while message_count < max_messages && start.elapsed() < timeout {
300                match tokio::time::timeout(Duration::from_millis(1000), consumer.recv()).await {
301                    Ok(Ok(msg)) => {
302                        message_count += 1;
303                        let offset = msg.offset();
304
305                        // Track highest offset
306                        highest_offset =
307                            Some(highest_offset.map(|h| h.max(offset)).unwrap_or(offset));
308
309                        let key = msg
310                            .key()
311                            .map(|k| String::from_utf8_lossy(k).to_string())
312                            .unwrap_or_default();
313
314                        // Decode Avro message to extract ID
315                        let id_str = match msg.payload() {
316                            Some(payload) => match decoder.decode(Some(payload)).await {
317                                Ok(decoded_result) => {
318                                    // Unwrap Union if present
319                                    let value = match &decoded_result.value {
320                                        Value::Union(_, inner) => inner.as_ref(),
321                                        other => other,
322                                    };
323
324                                    match value {
325                                        Value::Record(fields) => {
326                                            let parts: Vec<String> = fields
327                                                .iter()
328                                                .map(|(name, fv)| {
329                                                    let val_str = match fv {
330                                                        Value::Int(i) => i.to_string(),
331                                                        Value::Long(l) => l.to_string(),
332                                                        Value::String(s) => s.clone(),
333                                                        Value::Union(_, v) => format!("{:?}", v),
334                                                        _ => format!("{:?}", fv),
335                                                    };
336                                                    format!("{}={}", name, val_str)
337                                                })
338                                                .collect();
339                                            if parts.is_empty() {
340                                                format!("[{} bytes]", payload.len())
341                                            } else {
342                                                parts.join(",")
343                                            }
344                                        }
345                                        _ => format!("[{} bytes]", payload.len()),
346                                    }
347                                }
348                                Err(_) => format!("[{} bytes]", payload.len()),
349                            },
350                            None => "[0 bytes]".to_string(),
351                        };
352
353                        if message_count <= max_show {
354                            results.push((offset, key, id_str));
355                        }
356                    }
357                    Ok(Err(e)) => {
358                        if e.to_string().contains("Partition EOF") {
359                            break;
360                        }
361                        break;
362                    }
363                    Err(_) => break,
364                }
365            }
366        }
367
368        Ok((results, highest_offset))
369    }
370
371    /// Delete the topic (can be called explicitly if needed)
372    #[allow(dead_code)]
373    pub async fn cleanup(&self) -> Result<()> {
374        let opts = AdminOptions::new().request_timeout(Some(Duration::from_secs(30)));
375
376        self.admin_client
377            .delete_topics(&[&self.topic], &opts)
378            .await
379            .map_err(|e| E2eError::Kafka(e.to_string()))?;
380
381        info!("Deleted Kafka topic: {}", self.topic);
382        Ok(())
383    }
384}
385
386impl Drop for KafkaResource {
387    fn drop(&mut self) {
388        let topic = self.topic.clone();
389        let broker = self.broker.clone();
390
391        let delete = async move {
392            let admin_client: std::result::Result<AdminClient<DefaultClientContext>, _> =
393                ClientConfig::new()
394                    .set("bootstrap.servers", &broker)
395                    .set("socket.timeout.ms", "5000")
396                    .create();
397
398            if let Ok(client) = admin_client {
399                let opts = AdminOptions::new().request_timeout(Some(Duration::from_secs(10)));
400                if let Err(e) = client.delete_topics(&[&topic], &opts).await {
401                    tracing::warn!("Failed to delete Kafka topic {}: {}", topic, e);
402                } else {
403                    info!("Deleted Kafka topic: {}", topic);
404                }
405            }
406        };
407
408        // Run cleanup on a dedicated thread with its own runtime so it works regardless of the
409        // caller's runtime flavor (current-thread or multi-thread) and doesn't race shutdown.
410        std::thread::spawn(move || {
411            tokio::runtime::Builder::new_current_thread()
412                .enable_all()
413                .build()
414                .expect("cleanup runtime")
415                .block_on(delete);
416        })
417        .join()
418        .ok();
419    }
420}