swiftide_integrations/kafka/
mod.rs1use anyhow::{Context, Result};
19use derive_builder::Builder;
20use rdkafka::{
21 admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
22 client::DefaultClientContext,
23 consumer::{Consumer, StreamConsumer},
24 producer::FutureProducer,
25};
26use swiftide_core::indexing::TextNode;
27
28pub use rdkafka::config::ClientConfig;
29
30mod loader;
31mod persist;
32
33#[derive(Debug, Clone, Builder)]
34#[builder(setter(into, strip_option))]
35pub struct Kafka {
36 client_config: ClientConfig,
37 topic: String,
38 #[builder(default)]
39 persist_key_fn: Option<fn(&TextNode) -> Result<String>>,
41 #[builder(default)]
42 persist_payload_fn: Option<fn(&TextNode) -> Result<String>>,
44 #[builder(default = "1")]
45 partition: i32,
46 #[builder(default = "1")]
47 factor: i32,
48 #[builder(default)]
49 create_topic_if_not_exists: bool,
50 #[builder(default = "32")]
51 batch_size: usize,
52}
53
54impl Kafka {
55 pub fn from_client_config(config: impl Into<ClientConfig>, topic: impl Into<String>) -> Kafka {
56 Kafka {
57 client_config: config.into(),
58 topic: topic.into(),
59 persist_key_fn: None,
60 persist_payload_fn: None,
61 partition: 1,
62 factor: 1,
63 create_topic_if_not_exists: false,
64 batch_size: 32,
65 }
66 }
67
68 pub fn builder() -> KafkaBuilder {
69 KafkaBuilder::default()
70 }
71
72 fn producer(&self) -> Result<FutureProducer<DefaultClientContext>> {
73 self.client_config
74 .create()
75 .context("Failed to create producer")
76 }
77
78 fn topic_exists(&self) -> Result<bool> {
79 let consumer: StreamConsumer = self
80 .client_config
81 .create()
82 .context("Failed to create consumer")?;
83 let metadata = consumer.fetch_metadata(Some(&self.topic), None)?;
84 Ok(!metadata.topics().is_empty())
85 }
86
87 async fn create_topic(&self) -> Result<()> {
88 let admin_client: AdminClient<DefaultClientContext> = self
89 .client_config
90 .create()
91 .context("Failed to create admin client")?;
92 admin_client
93 .create_topics(
94 vec![&NewTopic::new(
95 &self.topic,
96 self.partition,
97 TopicReplication::Fixed(self.factor),
98 )],
99 &AdminOptions::new(),
100 )
101 .await?;
102 Ok(())
103 }
104
105 fn persist_key_for_node(&self, node: &TextNode) -> Result<String> {
107 if let Some(key_fn) = self.persist_key_fn {
108 key_fn(node)
109 } else {
110 let hash = node.id();
111 Ok(format!("{}:{}", node.path.to_string_lossy(), hash))
112 }
113 }
114
115 fn persist_value_for_node(&self, node: &TextNode) -> Result<String> {
120 if let Some(value_fn) = self.persist_payload_fn {
121 value_fn(node)
122 } else {
123 Ok(serde_json::to_string(node)?)
124 }
125 }
126
127 fn node_to_key_payload(&self, node: &TextNode) -> Result<(String, String)> {
128 let key = self.persist_key_for_node(node).map_err(|e| {
129 anyhow::anyhow!("persist_key_for_node failed: {:?} (node: {:?})", e, node)
130 })?;
131 let payload = self.persist_value_for_node(node).map_err(|e| {
132 anyhow::anyhow!("persist_value_for_node failed: {:?} (node: {:?})", e, node)
133 })?;
134
135 Ok((key, payload))
136 }
137}