swiftide_integrations/kafka/
loader.rs1use futures_util::{StreamExt as _, stream};
2use rdkafka::{
3 Message,
4 consumer::{Consumer, StreamConsumer},
5 message::BorrowedMessage,
6};
7use swiftide_core::{Loader, indexing::IndexingStream, indexing::Node};
8
9use super::Kafka;
10
11impl Loader for Kafka {
12 #[tracing::instrument]
13 fn into_stream(self) -> IndexingStream {
14 let client_config = self.client_config;
15 let topic = self.topic.clone();
16
17 let consumer: StreamConsumer = client_config
18 .create()
19 .expect("Failed to create Kafka consumer");
20
21 consumer
22 .subscribe(&[&topic])
23 .expect("Failed to subscribe to topic");
24
25 let swiftide_stream = stream::unfold(consumer, |consumer| async move {
26 loop {
27 match consumer.recv().await {
28 Ok(message) => {
29 if let Some(Ok(payload)) = message.payload_view::<str>() {
31 let mut node = Node::new(payload);
32 msg_metadata(&mut node, &message);
33 tracing::trace!(?node, ?payload, "received message");
34 return Some((Ok(node), consumer));
35 }
36 tracing::debug!("Skipping message with invalid payload");
38 }
39 Err(e) => return Some((Err(anyhow::Error::from(e)), consumer)),
40 }
41 }
42 });
43
44 swiftide_stream.boxed().into()
45 }
46
47 fn into_stream_boxed(self: Box<Self>) -> IndexingStream {
48 (*self).into_stream()
49 }
50}
51
52fn msg_metadata(node: &mut Node, message: &BorrowedMessage) {
53 node.metadata
55 .insert("kafka_topic", message.topic().to_string());
56
57 node.metadata
58 .insert("kafka_partition", message.partition().to_string());
59 node.metadata
60 .insert("kafka_offset", message.offset().to_string());
61
62 if let Some(timestamp) = message.timestamp().to_millis() {
64 node.metadata
65 .insert("kafka_timestamp", timestamp.to_string());
66 }
67
68 if let Some(Ok(key)) = message.key_view::<str>() {
70 node.metadata.insert("kafka_key", key.to_string());
71 }
72}
73
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;
    use crate::kafka::Kafka;
    use anyhow::Result;
    use futures_util::TryStreamExt;
    use rdkafka::{
        ClientConfig,
        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
        client::DefaultClientContext,
        producer::{FutureProducer, FutureRecord, Producer},
    };
    use testcontainers::{ContainerAsync, runners::AsyncRunner};
    use testcontainers_modules::kafka::apache::{self};

    /// Test fixture bundling a containerised Kafka broker with a client configuration that
    /// points at it.
    struct KafkaBroker {
        /// Container handle; never read, held so it lives as long as the fixture.
        _broker: ContainerAsync<apache::Kafka>,
        /// Partition count used for topics created through [`KafkaBroker::create_topic`].
        partitions: i32,
        /// Replication factor used for topics created through [`KafkaBroker::create_topic`].
        replicas: i32,
        /// Configuration shared by the admin client, the producer and the loader under test.
        client_config: ClientConfig,
    }

    impl KafkaBroker {
        /// Starts a Kafka container and prepares a client configuration for it.
        ///
        /// The configuration targets the container's mapped port on `127.0.0.1`, uses the
        /// consumer group `group_id` and sets `auto.offset.reset` to `earliest`.
        ///
        /// # Errors
        ///
        /// Returns an error if the container fails to start or its mapped port cannot be
        /// resolved.
        pub async fn start() -> Result<Self> {
            const PARTITIONS: i32 = 1;
            const REPLICAS: i32 = 1;

            let kafka_node = apache::Kafka::default().start().await?;
            let bootstrap_servers = format!(
                "127.0.0.1:{}",
                kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
            );

            let mut client_config = ClientConfig::new();
            client_config.set("bootstrap.servers", &bootstrap_servers);
            client_config.set("group.id", "group_id");
            client_config.set("auto.offset.reset", "earliest");

            let broker = KafkaBroker {
                _broker: kafka_node,
                client_config,
                partitions: PARTITIONS,
                replicas: REPLICAS,
            };

            Ok(broker)
        }

        /// Creates `topic` on the broker with the fixture's partition and replica settings.
        ///
        /// # Errors
        ///
        /// Returns an error if the admin request itself fails, or if the broker reports a
        /// failure for the requested topic.
        pub async fn create_topic(&self, topic: impl AsRef<str>) -> Result<()> {
            let admin = self.admin_client();
            let new_topic = NewTopic {
                name: topic.as_ref(),
                num_partitions: self.partitions,
                replication: TopicReplication::Fixed(self.replicas),
                config: vec![],
            };

            let results = admin
                .create_topics(&[new_topic], &AdminOptions::default())
                .await?;

            // A successful request still carries one result per topic; a rejected topic must
            // not be mistaken for a created one.
            for result in results {
                if let Err((name, code)) = result {
                    anyhow::bail!("failed to create topic {name}: {code}");
                }
            }
            Ok(())
        }

        /// Builds an admin client from the fixture's configuration.
        ///
        /// # Panics
        ///
        /// Panics if the client cannot be created.
        fn admin_client(&self) -> AdminClient<DefaultClientContext> {
            self.client_config.create().unwrap()
        }

        /// Builds a producer from the fixture's configuration.
        ///
        /// # Panics
        ///
        /// Panics if the producer cannot be created.
        fn producer(&self) -> FutureProducer {
            self.client_config.create().unwrap()
        }
    }

    /// Publishes one record and checks that the loader yields it as a node with that payload.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn test_kafka_loader() {
        const TOPIC_NAME: &str = "topic";
        let kafka_broker = KafkaBroker::start().await.unwrap();
        kafka_broker.create_topic(TOPIC_NAME).await.unwrap();

        let producer = kafka_broker.producer();
        producer
            .send(
                FutureRecord::to(TOPIC_NAME).payload("payload").key("key"),
                Duration::from_secs(0),
            )
            .await
            .unwrap();
        producer.flush(Duration::from_secs(0)).unwrap();

        let loader = Kafka::builder()
            .client_config(kafka_broker.client_config.clone())
            .topic(TOPIC_NAME)
            .build()
            .unwrap();

        // The first item of the stream must be the record produced above.
        let node: Node = loader.into_stream().try_next().await.unwrap().unwrap();
        assert_eq!(node.chunk, "payload");
    }
}