// swiftide_integrations/kafka/persist.rs
use std::{sync::Arc, time::Duration};

use anyhow::Result;
use async_trait::async_trait;

use rdkafka::producer::FutureRecord;
use swiftide_core::{
    Persist,
    indexing::{IndexingStream, TextNode},
};

use super::Kafka;
13
14#[async_trait]
15impl Persist for Kafka {
16 type Input = String;
17 type Output = String;
18
19 async fn setup(&self) -> Result<()> {
20 if self.topic_exists()? {
21 return Ok(());
22 }
23 if !self.create_topic_if_not_exists {
24 return Err(anyhow::anyhow!("Topic {} does not exist", self.topic));
25 }
26 self.create_topic().await?;
27 Ok(())
28 }
29
30 fn batch_size(&self) -> Option<usize> {
31 Some(self.batch_size)
32 }
33
34 async fn store(&self, node: TextNode) -> Result<TextNode> {
35 let (key, payload) = self.node_to_key_payload(&node)?;
36 self.producer()?
37 .send(
38 FutureRecord::to(&self.topic).key(&key).payload(&payload),
39 Duration::from_secs(0),
40 )
41 .await
42 .map_err(|(e, _)| anyhow::anyhow!("Failed to send node: {:?}", e))?;
43 Ok(node)
44 }
45
46 async fn batch_store(&self, nodes: Vec<TextNode>) -> IndexingStream<String> {
47 let producer = Arc::new(self.producer().expect("Failed to create producer"));
48
49 for node in &nodes {
50 match self.node_to_key_payload(node) {
51 Ok((key, payload)) => {
52 if let Err(e) = producer
53 .send(
54 FutureRecord::to(&self.topic).payload(&payload).key(&key),
55 Duration::from_secs(0),
56 )
57 .await
58 {
59 return vec![Err(anyhow::anyhow!("failed to send node: {:?}", e))].into();
60 }
61 }
62 Err(e) => {
63 return vec![Err(e)].into();
64 }
65 }
66 }
67
68 IndexingStream::iter(nodes.into_iter().map(Ok))
69 }
70}
71
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::TryStreamExt;
    use rdkafka::ClientConfig;
    use testcontainers::runners::AsyncRunner;
    use testcontainers_modules::kafka::apache::{self};

    #[test_log::test(tokio::test)]
    async fn test_kafka_persist() {
        static TOPIC_NAME: &str = "topic";

        // Spin up a throwaway Kafka broker for the duration of the test.
        let container = apache::Kafka::default()
            .start()
            .await
            .expect("failed to start kafka");
        let port = container
            .get_host_port_ipv4(apache::KAFKA_PORT)
            .await
            .expect("failed to get kafka port");
        let brokers = format!("127.0.0.1:{}", port);

        let mut config = ClientConfig::new();
        config.set("bootstrap.servers", &brokers);

        let kafka = Kafka::builder()
            .client_config(config)
            .topic(TOPIC_NAME)
            .build()
            .unwrap();

        kafka.setup().await.unwrap();
        kafka.store(TextNode::new("chunk")).await.unwrap();
    }

    #[test_log::test(tokio::test)]
    async fn test_kafka_batch_persist() {
        static TOPIC_NAME: &str = "topic";

        // Spin up a throwaway Kafka broker for the duration of the test.
        let container = apache::Kafka::default()
            .start()
            .await
            .expect("failed to start kafka");
        let port = container
            .get_host_port_ipv4(apache::KAFKA_PORT)
            .await
            .expect("failed to get kafka port");
        let brokers = format!("127.0.0.1:{}", port);

        let mut config = ClientConfig::new();
        config.set("bootstrap.servers", &brokers);

        let kafka = Kafka::builder()
            .client_config(config)
            .topic(TOPIC_NAME)
            .create_topic_if_not_exists(true)
            .batch_size(2usize)
            .build()
            .unwrap();

        let nodes = vec![TextNode::default(); 6];

        kafka.setup().await.unwrap();

        let stored: Vec<TextNode> = kafka
            .batch_store(nodes.clone())
            .await
            .try_collect()
            .await
            .unwrap();

        // Every node should come back unchanged and in order.
        assert_eq!(stored.len(), nodes.len());
        for (got, expected) in stored.iter().zip(&nodes) {
            assert_eq!(got, expected);
        }
    }
}