swiftide_integrations/kafka/
persist.rs

1use std::{sync::Arc, time::Duration};
2
3use anyhow::Result;
4use async_trait::async_trait;
5
6use rdkafka::producer::FutureRecord;
7use swiftide_core::{
8    Persist,
9    indexing::{IndexingStream, TextNode},
10};
11
12use super::Kafka;
13
14#[async_trait]
15impl Persist for Kafka {
16    type Input = String;
17    type Output = String;
18
19    async fn setup(&self) -> Result<()> {
20        if self.topic_exists()? {
21            return Ok(());
22        }
23        if !self.create_topic_if_not_exists {
24            return Err(anyhow::anyhow!("Topic {} does not exist", self.topic));
25        }
26        self.create_topic().await?;
27        Ok(())
28    }
29
30    fn batch_size(&self) -> Option<usize> {
31        Some(self.batch_size)
32    }
33
34    async fn store(&self, node: TextNode) -> Result<TextNode> {
35        let (key, payload) = self.node_to_key_payload(&node)?;
36        self.producer()?
37            .send(
38                FutureRecord::to(&self.topic).key(&key).payload(&payload),
39                Duration::from_secs(0),
40            )
41            .await
42            .map_err(|(e, _)| anyhow::anyhow!("Failed to send node: {:?}", e))?;
43        Ok(node)
44    }
45
46    async fn batch_store(&self, nodes: Vec<TextNode>) -> IndexingStream<String> {
47        let producer = Arc::new(self.producer().expect("Failed to create producer"));
48
49        for node in &nodes {
50            match self.node_to_key_payload(node) {
51                Ok((key, payload)) => {
52                    if let Err(e) = producer
53                        .send(
54                            FutureRecord::to(&self.topic).payload(&payload).key(&key),
55                            Duration::from_secs(0),
56                        )
57                        .await
58                    {
59                        return vec![Err(anyhow::anyhow!("failed to send node: {:?}", e))].into();
60                    }
61                }
62                Err(e) => {
63                    return vec![Err(e)].into();
64                }
65            }
66        }
67
68        IndexingStream::iter(nodes.into_iter().map(Ok))
69    }
70}
71
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::TryStreamExt;
    use rdkafka::ClientConfig;
    use testcontainers::runners::AsyncRunner;
    use testcontainers_modules::kafka::apache::{self};

    #[test_log::test(tokio::test)]
    async fn test_kafka_persist() {
        static TOPIC_NAME: &str = "topic";

        // Spin up a throwaway Kafka broker for the duration of the test.
        let container = apache::Kafka::default()
            .start()
            .await
            .expect("failed to start kafka");
        let port = container
            .get_host_port_ipv4(apache::KAFKA_PORT)
            .await
            .expect("failed to get kafka port");

        let mut config = ClientConfig::new();
        config.set("bootstrap.servers", format!("127.0.0.1:{port}"));
        let storage = Kafka::builder()
            .client_config(config)
            .topic(TOPIC_NAME)
            .build()
            .unwrap();

        // A single node should round-trip through setup + store without error.
        storage.setup().await.unwrap();
        storage.store(TextNode::new("chunk")).await.unwrap();
    }

    #[test_log::test(tokio::test)]
    async fn test_kafka_batch_persist() {
        static TOPIC_NAME: &str = "topic";

        // Spin up a throwaway Kafka broker for the duration of the test.
        let container = apache::Kafka::default()
            .start()
            .await
            .expect("failed to start kafka");
        let port = container
            .get_host_port_ipv4(apache::KAFKA_PORT)
            .await
            .expect("failed to get kafka port");

        let mut config = ClientConfig::new();
        config.set("bootstrap.servers", format!("127.0.0.1:{port}"));
        let storage = Kafka::builder()
            .client_config(config)
            .topic(TOPIC_NAME)
            .create_topic_if_not_exists(true)
            .batch_size(2usize)
            .build()
            .unwrap();

        let nodes = vec![TextNode::default(); 6];

        storage.setup().await.unwrap();

        // Every node must come back through the stream, in order.
        let stored: Vec<TextNode> = storage
            .batch_store(nodes.clone())
            .await
            .try_collect()
            .await
            .unwrap();

        assert_eq!(stored, nodes);
    }
}