swiftide_integrations/kafka/
loader.rs

1use futures_util::{StreamExt as _, stream};
2use rdkafka::{
3    Message,
4    consumer::{Consumer, StreamConsumer},
5    message::BorrowedMessage,
6};
7use swiftide_core::{Loader, indexing::IndexingStream, indexing::Node};
8
9use super::Kafka;
10
11impl Loader for Kafka {
12    type Output = String;
13
14    #[tracing::instrument]
15    fn into_stream(self) -> IndexingStream<String> {
16        let client_config = self.client_config;
17        let topic = self.topic.clone();
18
19        let consumer: StreamConsumer = client_config
20            .create()
21            .expect("Failed to create Kafka consumer");
22
23        consumer
24            .subscribe(&[&topic])
25            .expect("Failed to subscribe to topic");
26
27        let swiftide_stream = stream::unfold(consumer, |consumer| async move {
28            loop {
29                match consumer.recv().await {
30                    Ok(message) => {
31                        // only handle Some(Ok(s))
32                        if let Some(Ok(payload)) = message.payload_view::<str>() {
33                            let mut node = Node::<String>::new(payload);
34                            msg_metadata(&mut node, &message);
35                            tracing::trace!(?node, ?payload, "received message");
36                            return Some((Ok(node), consumer));
37                        }
38                        // otherwise, like a message with an invalid payload or payload is None
39                        tracing::debug!("Skipping message with invalid payload");
40                    }
41                    Err(e) => return Some((Err(anyhow::Error::from(e)), consumer)),
42                }
43            }
44        });
45
46        swiftide_stream.boxed().into()
47    }
48
49    fn into_stream_boxed(self: Box<Self>) -> IndexingStream<String> {
50        (*self).into_stream()
51    }
52}
53
54fn msg_metadata(node: &mut Node<String>, message: &BorrowedMessage) {
55    // Add Kafka-specific metadata
56    node.metadata
57        .insert("kafka_topic", message.topic().to_string());
58
59    node.metadata
60        .insert("kafka_partition", message.partition().to_string());
61    node.metadata
62        .insert("kafka_offset", message.offset().to_string());
63
64    // Add timestamp if present
65    if let Some(timestamp) = message.timestamp().to_millis() {
66        node.metadata
67            .insert("kafka_timestamp", timestamp.to_string());
68    }
69
70    // Add key if present
71    if let Some(Ok(key)) = message.key_view::<str>() {
72        node.metadata.insert("kafka_key", key.to_string());
73    }
74}
75
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::kafka::Kafka;
    use anyhow::Result;
    use futures_util::TryStreamExt;
    use rdkafka::{
        ClientConfig,
        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
        client::DefaultClientContext,
        producer::{FutureProducer, FutureRecord, Producer},
    };
    use swiftide_core::indexing::TextNode;
    use testcontainers::{ContainerAsync, runners::AsyncRunner};
    use testcontainers_modules::kafka::apache::{self};

    /// A single Kafka broker running in a test container, together with a client config
    /// pointing at it.
    struct KafkaBroker {
        // Kept only so the container stays alive as long as this handle does.
        _broker: ContainerAsync<apache::Kafka>,
        partitions: i32,
        replicas: i32,
        client_config: ClientConfig,
    }

    impl KafkaBroker {
        /// Starts the container and builds a client config that reads from the earliest
        /// offset, so messages produced before the consumer subscribes are still seen.
        pub async fn start() -> Result<Self> {
            const PARTITIONS: i32 = 1;
            const REPLICAS: i32 = 1;

            let kafka_node = apache::Kafka::default().start().await?;
            let bootstrap_servers = format!(
                "127.0.0.1:{}",
                kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
            );

            let mut client_config = ClientConfig::new();
            client_config.set("bootstrap.servers", &bootstrap_servers);
            client_config.set("group.id", "group_id");
            client_config.set("auto.offset.reset", "earliest");

            Ok(KafkaBroker {
                _broker: kafka_node,
                client_config,
                partitions: PARTITIONS,
                replicas: REPLICAS,
            })
        }

        /// Creates `topic` on the broker.
        ///
        /// Returns an error if the admin request fails, or if the broker rejects the topic
        /// itself (e.g. it already exists).
        pub async fn create_topic(&self, topic: impl AsRef<str>) -> Result<()> {
            let admin = self.admin_client();
            let results = admin
                .create_topics(
                    &[NewTopic {
                        name: topic.as_ref(),
                        num_partitions: self.partitions,
                        replication: TopicReplication::Fixed(self.replicas),
                        config: vec![],
                    }],
                    &AdminOptions::default(),
                )
                .await?;

            // The outer result covers only the request as a whole. Each topic's own outcome
            // is reported separately and has to be checked one by one.
            for result in results {
                if let Err((name, code)) = result {
                    anyhow::bail!("failed to create topic `{name}`: {code}");
                }
            }
            Ok(())
        }

        fn admin_client(&self) -> AdminClient<DefaultClientContext> {
            self.client_config.create().unwrap()
        }

        fn producer(&self) -> FutureProducer {
            self.client_config.create().unwrap()
        }
    }

    /// Produces one keyed message and checks that the loader emits it as a node.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_kafka_loader() {
        const TOPIC_NAME: &str = "topic";
        let kafka_broker = KafkaBroker::start().await.unwrap();
        kafka_broker.create_topic(TOPIC_NAME).await.unwrap();

        let producer = kafka_broker.producer();
        producer
            .send(
                FutureRecord::to(TOPIC_NAME).payload("payload").key("key"),
                Duration::from_secs(0),
            )
            .await
            .unwrap();
        producer.flush(Duration::from_secs(0)).unwrap();

        let loader = Kafka::builder()
            .client_config(kafka_broker.client_config.clone())
            .topic(TOPIC_NAME)
            .build()
            .unwrap();

        let node: TextNode = loader.into_stream().try_next().await.unwrap().unwrap();
        assert_eq!(node.chunk, "payload");
    }
}
178}