swiftide_integrations/kafka/
loader.rs

1use futures_util::{StreamExt as _, stream};
2use rdkafka::{
3    Message,
4    consumer::{Consumer, StreamConsumer},
5    message::BorrowedMessage,
6};
7use swiftide_core::{Loader, indexing::IndexingStream, indexing::Node};
8
9use super::Kafka;
10
11impl Loader for Kafka {
12    #[tracing::instrument]
13    fn into_stream(self) -> IndexingStream {
14        let client_config = self.client_config;
15        let topic = self.topic.clone();
16
17        let consumer: StreamConsumer = client_config
18            .create()
19            .expect("Failed to create Kafka consumer");
20
21        consumer
22            .subscribe(&[&topic])
23            .expect("Failed to subscribe to topic");
24
25        let swiftide_stream = stream::unfold(consumer, |consumer| async move {
26            loop {
27                match consumer.recv().await {
28                    Ok(message) => {
29                        // only handle Some(Ok(s))
30                        if let Some(Ok(payload)) = message.payload_view::<str>() {
31                            let mut node = Node::new(payload);
32                            msg_metadata(&mut node, &message);
33                            tracing::trace!(?node, ?payload, "received message");
34                            return Some((Ok(node), consumer));
35                        }
36                        // otherwise, like a message with an invalid payload or payload is None
37                        tracing::debug!("Skipping message with invalid payload");
38                    }
39                    Err(e) => return Some((Err(anyhow::Error::from(e)), consumer)),
40                }
41            }
42        });
43
44        swiftide_stream.boxed().into()
45    }
46
47    fn into_stream_boxed(self: Box<Self>) -> IndexingStream {
48        (*self).into_stream()
49    }
50}
51
52fn msg_metadata(node: &mut Node, message: &BorrowedMessage) {
53    // Add Kafka-specific metadata
54    node.metadata
55        .insert("kafka_topic", message.topic().to_string());
56
57    node.metadata
58        .insert("kafka_partition", message.partition().to_string());
59    node.metadata
60        .insert("kafka_offset", message.offset().to_string());
61
62    // Add timestamp if present
63    if let Some(timestamp) = message.timestamp().to_millis() {
64        node.metadata
65            .insert("kafka_timestamp", timestamp.to_string());
66    }
67
68    // Add key if present
69    if let Some(Ok(key)) = message.key_view::<str>() {
70        node.metadata.insert("kafka_key", key.to_string());
71    }
72}
73
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::kafka::Kafka;
    use anyhow::Result;
    use futures_util::TryStreamExt;
    use rdkafka::{
        ClientConfig,
        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
        client::DefaultClientContext,
        producer::{FutureProducer, FutureRecord, Producer},
    };
    use testcontainers::{ContainerAsync, runners::AsyncRunner};
    use testcontainers_modules::kafka::apache::{self};

    /// A Kafka broker running in a test container, plus the settings used to talk to it.
    struct KafkaBroker {
        /// Held only so the container handle lives as long as the broker value.
        _broker: ContainerAsync<apache::Kafka>,
        /// Partition count used for topics created through [`KafkaBroker::create_topic`].
        partitions: i32,
        /// Replication factor used for topics created through [`KafkaBroker::create_topic`].
        replicas: i32,
        /// Client configuration pointing at the container's mapped port.
        client_config: ClientConfig,
    }

    impl KafkaBroker {
        /// Starts the container and prepares a client configuration for it.
        ///
        /// # Errors
        ///
        /// Returns an error if the container fails to start or its mapped port cannot be read.
        pub async fn start() -> Result<Self> {
            const PARTITIONS: i32 = 1;
            const REPLICAS: i32 = 1;

            let kafka_node = apache::Kafka::default().start().await?;
            let bootstrap_servers = format!(
                "127.0.0.1:{}",
                kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
            );

            let mut client_config = ClientConfig::new();
            client_config.set("bootstrap.servers", &bootstrap_servers);
            client_config.set("group.id", "group_id");
            // Lets a consumer that subscribes after the message was produced still read it.
            client_config.set("auto.offset.reset", "earliest");

            Ok(KafkaBroker {
                _broker: kafka_node,
                client_config,
                partitions: PARTITIONS,
                replicas: REPLICAS,
            })
        }

        /// Creates `topic` with the broker's partition and replication settings.
        ///
        /// # Errors
        ///
        /// Returns an error if the admin request fails, or if the broker reports a failure for
        /// the topic itself.
        pub async fn create_topic(&self, topic: impl AsRef<str>) -> Result<()> {
            let admin = self.admin_client();
            let outcomes = admin
                .create_topics(
                    &[NewTopic {
                        name: topic.as_ref(),
                        num_partitions: self.partitions,
                        replication: TopicReplication::Fixed(self.replicas),
                        config: vec![],
                    }],
                    &AdminOptions::default(),
                )
                .await?;

            // The request can succeed as a whole while an individual topic is rejected, so
            // each per-topic result has to be checked as well.
            for outcome in outcomes {
                outcome.map_err(|(name, code)| {
                    anyhow::anyhow!("failed to create topic {name}: {code}")
                })?;
            }
            Ok(())
        }

        /// Builds an admin client from the broker's client configuration.
        fn admin_client(&self) -> AdminClient<DefaultClientContext> {
            self.client_config
                .create()
                .expect("admin client creation failed")
        }

        /// Builds a producer from the broker's client configuration.
        fn producer(&self) -> FutureProducer {
            self.client_config
                .create()
                .expect("producer creation failed")
        }
    }

    /// Produces one message and checks that the loader yields it as a node.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_kafka_loader() {
        const TOPIC_NAME: &str = "topic";
        let kafka_broker = KafkaBroker::start().await.unwrap();
        kafka_broker.create_topic(TOPIC_NAME).await.unwrap();

        let producer = kafka_broker.producer();
        producer
            .send(
                FutureRecord::to(TOPIC_NAME).payload("payload").key("key"),
                Duration::from_secs(0),
            )
            .await
            .unwrap();
        // A zero timeout only succeeds if nothing is queued; allow a short wait instead.
        producer.flush(Duration::from_secs(5)).unwrap();

        let loader = Kafka::builder()
            .client_config(kafka_broker.client_config.clone())
            .topic(TOPIC_NAME)
            .build()
            .unwrap();

        let node: Node = loader.into_stream().try_next().await.unwrap().unwrap();
        assert_eq!(node.chunk, "payload");
    }
}