swiftide_integrations/kafka/
mod.rs

1//! Kafka is a distributed streaming platform.
2//!
//! This module provides a Kafka integration for Swiftide: a loader that ingests
//! messages from Kafka topics for use in RAG, and a persist target that writes
//! indexed nodes to a topic.
5//!
6//! Can be configured with [`ClientConfig`].
7//!
8//! # Example
9//!
10//! ```no_run
11//! # use swiftide_integrations::kafka::*;
12//! let kafka = Kafka::builder()
13//!     .client_config(ClientConfig::new())
//!     .topic("hello-kafka")
15//!     .build().unwrap();
16//! ```
17
18use anyhow::{Context, Result};
19use derive_builder::Builder;
20use rdkafka::{
21    admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
22    client::DefaultClientContext,
23    consumer::{Consumer, StreamConsumer},
24    producer::FutureProducer,
25};
26use swiftide_core::indexing::TextNode;
27
28pub use rdkafka::config::ClientConfig;
29
30mod loader;
31mod persist;
32
33#[derive(Debug, Clone, Builder)]
34#[builder(setter(into, strip_option))]
35pub struct Kafka {
36    client_config: ClientConfig,
37    topic: String,
38    #[builder(default)]
39    /// Customize the key used for persisting nodes
40    persist_key_fn: Option<fn(&TextNode) -> Result<String>>,
41    #[builder(default)]
42    /// Customize the value used for persisting nodes
43    persist_payload_fn: Option<fn(&TextNode) -> Result<String>>,
44    #[builder(default = "1")]
45    partition: i32,
46    #[builder(default = "1")]
47    factor: i32,
48    #[builder(default)]
49    create_topic_if_not_exists: bool,
50    #[builder(default = "32")]
51    batch_size: usize,
52}
53
54impl Kafka {
55    pub fn from_client_config(config: impl Into<ClientConfig>, topic: impl Into<String>) -> Kafka {
56        Kafka {
57            client_config: config.into(),
58            topic: topic.into(),
59            persist_key_fn: None,
60            persist_payload_fn: None,
61            partition: 1,
62            factor: 1,
63            create_topic_if_not_exists: false,
64            batch_size: 32,
65        }
66    }
67
68    pub fn builder() -> KafkaBuilder {
69        KafkaBuilder::default()
70    }
71
72    fn producer(&self) -> Result<FutureProducer<DefaultClientContext>> {
73        self.client_config
74            .create()
75            .context("Failed to create producer")
76    }
77
78    fn topic_exists(&self) -> Result<bool> {
79        let consumer: StreamConsumer = self
80            .client_config
81            .create()
82            .context("Failed to create consumer")?;
83        let metadata = consumer.fetch_metadata(Some(&self.topic), None)?;
84        Ok(!metadata.topics().is_empty())
85    }
86
87    async fn create_topic(&self) -> Result<()> {
88        let admin_client: AdminClient<DefaultClientContext> = self
89            .client_config
90            .create()
91            .context("Failed to create admin client")?;
92        admin_client
93            .create_topics(
94                vec![&NewTopic::new(
95                    &self.topic,
96                    self.partition,
97                    TopicReplication::Fixed(self.factor),
98                )],
99                &AdminOptions::new(),
100            )
101            .await?;
102        Ok(())
103    }
104
105    /// Generates a ky for a given node to be persisted in Kafka.
106    fn persist_key_for_node(&self, node: &TextNode) -> Result<String> {
107        if let Some(key_fn) = self.persist_key_fn {
108            key_fn(node)
109        } else {
110            let hash = node.id();
111            Ok(format!("{}:{}", node.path.to_string_lossy(), hash))
112        }
113    }
114
115    /// Generates a value for a given node to be persisted in Kafka.
116    /// By default, the node is serialized as JSON.
117    /// If a custom function is provided, it is used to generate the value.
118    /// Otherwise, the node is serialized as JSON.
119    fn persist_value_for_node(&self, node: &TextNode) -> Result<String> {
120        if let Some(value_fn) = self.persist_payload_fn {
121            value_fn(node)
122        } else {
123            Ok(serde_json::to_string(node)?)
124        }
125    }
126
127    fn node_to_key_payload(&self, node: &TextNode) -> Result<(String, String)> {
128        let key = self.persist_key_for_node(node).map_err(|e| {
129            anyhow::anyhow!("persist_key_for_node failed: {:?} (node: {:?})", e, node)
130        })?;
131        let payload = self.persist_value_for_node(node).map_err(|e| {
132            anyhow::anyhow!("persist_value_for_node failed: {:?} (node: {:?})", e, node)
133        })?;
134
135        Ok((key, payload))
136    }
137}