swiftide_integrations/kafka/
loader.rs1use futures_util::{StreamExt as _, stream};
2use rdkafka::{
3 Message,
4 consumer::{Consumer, StreamConsumer},
5 message::BorrowedMessage,
6};
7use swiftide_core::{Loader, indexing::IndexingStream, indexing::Node};
8
9use super::Kafka;
10
11impl Loader for Kafka {
12 type Output = String;
13
14 #[tracing::instrument]
15 fn into_stream(self) -> IndexingStream<String> {
16 let client_config = self.client_config;
17 let topic = self.topic.clone();
18
19 let consumer: StreamConsumer = client_config
20 .create()
21 .expect("Failed to create Kafka consumer");
22
23 consumer
24 .subscribe(&[&topic])
25 .expect("Failed to subscribe to topic");
26
27 let swiftide_stream = stream::unfold(consumer, |consumer| async move {
28 loop {
29 match consumer.recv().await {
30 Ok(message) => {
31 if let Some(Ok(payload)) = message.payload_view::<str>() {
33 let mut node = Node::<String>::new(payload);
34 msg_metadata(&mut node, &message);
35 tracing::trace!(?node, ?payload, "received message");
36 return Some((Ok(node), consumer));
37 }
38 tracing::debug!("Skipping message with invalid payload");
40 }
41 Err(e) => return Some((Err(anyhow::Error::from(e)), consumer)),
42 }
43 }
44 });
45
46 swiftide_stream.boxed().into()
47 }
48
49 fn into_stream_boxed(self: Box<Self>) -> IndexingStream<String> {
50 (*self).into_stream()
51 }
52}
53
54fn msg_metadata(node: &mut Node<String>, message: &BorrowedMessage) {
55 node.metadata
57 .insert("kafka_topic", message.topic().to_string());
58
59 node.metadata
60 .insert("kafka_partition", message.partition().to_string());
61 node.metadata
62 .insert("kafka_offset", message.offset().to_string());
63
64 if let Some(timestamp) = message.timestamp().to_millis() {
66 node.metadata
67 .insert("kafka_timestamp", timestamp.to_string());
68 }
69
70 if let Some(Ok(key)) = message.key_view::<str>() {
72 node.metadata.insert("kafka_key", key.to_string());
73 }
74}
75
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::kafka::Kafka;
    use anyhow::Result;
    use futures_util::TryStreamExt;
    use rdkafka::{
        ClientConfig,
        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
        client::DefaultClientContext,
        producer::{FutureProducer, FutureRecord, Producer},
    };
    use swiftide_core::indexing::TextNode;
    use testcontainers::{ContainerAsync, runners::AsyncRunner};
    use testcontainers_modules::kafka::apache::{self};

    /// A Kafka broker started in a test container, plus a client config pointing at it.
    struct KafkaBroker {
        // Never read; owning the container ties its lifetime to this struct.
        _broker: ContainerAsync<apache::Kafka>,
        partitions: i32,
        replicas: i32,
        client_config: ClientConfig,
    }

    impl KafkaBroker {
        /// Starts the container and builds a client config for it.
        ///
        /// The config sets:
        /// - `bootstrap.servers` to the mapped host port;
        /// - `group.id` to `group_id`;
        /// - `auto.offset.reset` to `earliest`. The test produces before the loader
        ///   subscribes, so this setting is required.
        pub async fn start() -> Result<Self> {
            const PARTITIONS: i32 = 1;
            const REPLICAS: i32 = 1;

            let kafka_node = apache::Kafka::default().start().await?;
            let bootstrap_servers = format!(
                "127.0.0.1:{}",
                kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
            );

            let mut client_config = ClientConfig::new();
            client_config.set("bootstrap.servers", &bootstrap_servers);
            client_config.set("group.id", "group_id");
            client_config.set("auto.offset.reset", "earliest");

            Ok(KafkaBroker {
                _broker: kafka_node,
                client_config,
                partitions: PARTITIONS,
                replicas: REPLICAS,
            })
        }

        /// Creates `topic` on the broker.
        ///
        /// # Errors
        ///
        /// Fails if the admin request fails, or if the broker reports a failure for
        /// the topic itself.
        pub async fn create_topic(&self, topic: impl AsRef<str>) -> Result<()> {
            let admin = self.admin_client();
            let results = admin
                .create_topics(
                    &[NewTopic {
                        name: topic.as_ref(),
                        num_partitions: self.partitions,
                        replication: TopicReplication::Fixed(self.replicas),
                        config: vec![],
                    }],
                    &AdminOptions::default(),
                )
                .await?;

            // The outer result only covers the admin request. Each topic reports its
            // own outcome, which would otherwise be silently ignored.
            for result in results {
                result.map_err(|(name, code)| {
                    anyhow::anyhow!("topic creation failed for {name}: {code:?}")
                })?;
            }
            Ok(())
        }

        fn admin_client(&self) -> AdminClient<DefaultClientContext> {
            self.client_config.create().unwrap()
        }

        fn producer(&self) -> FutureProducer {
            self.client_config.create().unwrap()
        }
    }

    /// Produces one message and checks that the loader yields it as a node whose chunk
    /// is the payload.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_kafka_loader() {
        const TOPIC_NAME: &str = "topic";
        let kafka_broker = KafkaBroker::start().await.unwrap();
        kafka_broker.create_topic(TOPIC_NAME).await.unwrap();

        let producer = kafka_broker.producer();
        producer
            .send(
                FutureRecord::to(TOPIC_NAME).payload("payload").key("key"),
                Duration::from_secs(0),
            )
            .await
            .unwrap();
        producer.flush(Duration::from_secs(0)).unwrap();

        let loader = Kafka::builder()
            .client_config(kafka_broker.client_config.clone())
            .topic(TOPIC_NAME)
            .build()
            .unwrap();

        let node: TextNode = loader.into_stream().try_next().await.unwrap().unwrap();
        assert_eq!(node.chunk, "payload");
    }
}