use arkflow_core::input::{register_input_builder, Ack, Input, InputBuilder};
use arkflow_core::{Error, MessageBatch};
use async_trait::async_trait;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message as KafkaMessage;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct KafkaInputConfig {
18 pub brokers: Vec<String>,
20 pub topics: Vec<String>,
22 pub consumer_group: String,
24 pub client_id: Option<String>,
26 pub start_from_latest: bool,
28}
29
/// Kafka input source: consumes messages from the configured topics.
pub struct KafkaInput {
    config: KafkaInputConfig,
    // `None` until `connect()` succeeds; `close()` takes it back to `None`.
    consumer: Arc<RwLock<Option<StreamConsumer>>>,
}
35
36impl KafkaInput {
37 pub fn new(config: KafkaInputConfig) -> Result<Self, Error> {
39 Ok(Self {
40 config,
41 consumer: Arc::new(RwLock::new(None)),
42 })
43 }
44}
45
46#[async_trait]
47impl Input for KafkaInput {
48 async fn connect(&self) -> Result<(), Error> {
49 let mut client_config = ClientConfig::new();
50
51 client_config.set("bootstrap.servers", &self.config.brokers.join(","));
53
54 client_config.set("group.id", &self.config.consumer_group);
56
57 if let Some(client_id) = &self.config.client_id {
59 client_config.set("client.id", client_id);
60 }
61
62 if self.config.start_from_latest {
64 client_config.set("auto.offset.reset", "latest");
65 } else {
66 client_config.set("auto.offset.reset", "earliest");
67 }
68
69 let consumer: StreamConsumer = client_config
71 .create()
72 .map_err(|e| Error::Connection(format!("Unable to create a Kafka consumer: {}", e)))?;
73
74 let x: Vec<&str> = self
76 .config
77 .topics
78 .iter()
79 .map(|topic| topic.as_str())
80 .collect();
81 consumer.subscribe(&x).map_err(|e| {
82 Error::Connection(format!("You cannot subscribe to a Kafka topic: {}", e))
83 })?;
84
85 let consumer_arc = self.consumer.clone();
87 let mut consumer_guard = consumer_arc.write().await;
88 *consumer_guard = Some(consumer);
89
90 Ok(())
91 }
92
93 async fn read(&self) -> Result<(MessageBatch, Arc<dyn Ack>), Error> {
94 let consumer_arc = self.consumer.clone();
95 let consumer_guard = consumer_arc.read().await;
96 if consumer_guard.is_none() {
97 return Err(Error::Connection("The input is not connected".to_string()));
98 }
99 let consumer = consumer_guard.as_ref().unwrap();
100
101 match consumer.recv().await {
102 Ok(kafka_message) => {
103 let payload = kafka_message.payload().ok_or_else(|| {
105 Error::Process("The Kafka message has no content".to_string())
106 })?;
107
108 let mut binary_data = Vec::new();
109 binary_data.push(payload.to_vec());
110 let msg_batch = MessageBatch::new_binary(binary_data);
111
112 let topic = kafka_message.topic().to_string();
114 let partition = kafka_message.partition();
115 let offset = kafka_message.offset();
116
117 let ack = KafkaAck {
118 consumer: self.consumer.clone(),
119 topic,
120 partition,
121 offset,
122 };
123
124 Ok((msg_batch, Arc::new(ack)))
125 }
126 Err(e) => Err(Error::Connection(format!(
127 "Error receiving Kafka message: {}",
128 e
129 ))),
130 }
131 }
132
133 async fn close(&self) -> Result<(), Error> {
134 let mut consumer_guard = self.consumer.write().await;
135 if let Some(consumer) = consumer_guard.take() {
136 if let Err(e) = consumer.unassign() {
137 tracing::warn!("Error unassigning Kafka consumer: {}", e);
138 }
139 }
140 Ok(())
141 }
142}
143
/// Acknowledgement handle for a single Kafka message.
///
/// Holds the (topic, partition, offset) of the message plus a shared handle
/// to the consumer so the offset can be stored when `ack` is called.
pub struct KafkaAck {
    consumer: Arc<RwLock<Option<StreamConsumer>>>,
    topic: String,
    partition: i32,
    offset: i64,
}
151
152#[async_trait]
153impl Ack for KafkaAck {
154 async fn ack(&self) {
155 let consumer_mutex_guard = self.consumer.read().await;
157 if let Some(v) = &*consumer_mutex_guard {
158 if let Err(e) = v.store_offset(&self.topic, self.partition, self.offset) {
159 tracing::error!("Error committing Kafka offset: {}", e);
160 }
161 }
162 }
163}
164
165pub(crate) struct KafkaInputBuilder;
166impl InputBuilder for KafkaInputBuilder {
167 fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Input>, Error> {
168 if config.is_none() {
169 return Err(Error::Config(
170 "Kafka input configuration is missing".to_string(),
171 ));
172 }
173 let config: KafkaInputConfig = serde_json::from_value(config.clone().unwrap())?;
174 Ok(Arc::new(KafkaInput::new(config)?))
175 }
176}
177
/// Registers the Kafka input builder under the `"kafka"` input type so the
/// framework can construct it from configuration.
pub fn init() {
    register_input_builder("kafka", Arc::new(KafkaInputBuilder));
}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Convenience constructor for a config pointing at a local broker.
    /// None of these tests opens a network connection.
    fn sample_config(client_id: Option<String>, start_from_latest: bool) -> KafkaInputConfig {
        KafkaInputConfig {
            brokers: vec!["localhost:9092".to_string()],
            topics: vec!["test-topic".to_string()],
            consumer_group: "test-group".to_string(),
            client_id,
            start_from_latest,
        }
    }

    /// `new` stores the config verbatim and starts disconnected.
    #[tokio::test]
    async fn test_kafka_input_new() {
        let config = sample_config(Some("test-client".to_string()), false);

        let input = KafkaInput::new(config);
        assert!(input.is_ok());
        let input = input.unwrap();
        assert_eq!(input.config.brokers, vec!["localhost:9092".to_string()]);
        assert_eq!(input.config.topics, vec!["test-topic".to_string()]);
        assert_eq!(input.config.consumer_group, "test-group".to_string());
        assert_eq!(input.config.client_id, Some("test-client".to_string()));
        assert!(!input.config.start_from_latest);
    }

    /// `read` before `connect` must fail with the "not connected" error.
    #[tokio::test]
    async fn test_kafka_input_read_not_connected() {
        let input = KafkaInput::new(sample_config(None, true)).unwrap();
        let result = input.read().await;
        assert!(result.is_err());
        match result {
            Err(Error::Connection(msg)) => {
                assert_eq!(msg, "The input is not connected");
            }
            _ => panic!("Expected Connection error"),
        }
    }

    /// Acking with no live consumer is a silent no-op and must not panic.
    #[tokio::test]
    async fn test_kafka_ack() {
        let input = KafkaInput::new(sample_config(None, true)).unwrap();
        let ack = KafkaAck {
            consumer: input.consumer.clone(),
            topic: "test-topic".to_string(),
            partition: 0,
            offset: 100,
        };

        ack.ack().await;
    }
}