arkflow_plugin/input/kafka.rs

//! Kafka input component
//!
//! Receives data from one or more Kafka topics.
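//!
//! A hedged example of the configuration this component accepts (field names
//! come from [`KafkaInputConfig`] below; the surrounding pipeline layout is an
//! assumption about how the host wires inputs, not something this file defines):
//!
//! ```yaml
//! input:
//!   type: kafka
//!   brokers:
//!     - "localhost:9092"
//!   topics:
//!     - "my-topic"
//!   consumer_group: "my-group"
//!   client_id: "my-client"      # optional
//!   start_from_latest: false
//! ```
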
use arkflow_core::input::{register_input_builder, Ack, Input, InputBuilder};
use arkflow_core::{Error, MessageBatch};
use async_trait::async_trait;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::message::Message as KafkaMessage;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::RwLock;

/// Kafka input configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaInputConfig {
    /// List of Kafka broker addresses
    pub brokers: Vec<String>,
    /// Topics to subscribe to
    pub topics: Vec<String>,
    /// Consumer group ID
    pub consumer_group: String,
    /// Client ID (optional)
    pub client_id: Option<String>,
    /// Whether to start consuming from the latest offset instead of the earliest
    pub start_from_latest: bool,
}

/// Kafka input component
pub struct KafkaInput {
    config: KafkaInputConfig,
    consumer: Arc<RwLock<Option<StreamConsumer>>>,
}

impl KafkaInput {
    /// Create a new Kafka input component
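    ///
    /// A minimal usage sketch (the values and the surrounding async context
    /// are illustrative, not part of this module):
    ///
    /// ```ignore
    /// let input = KafkaInput::new(KafkaInputConfig {
    ///     brokers: vec!["localhost:9092".into()],
    ///     topics: vec!["my-topic".into()],
    ///     consumer_group: "my-group".into(),
    ///     client_id: None,
    ///     start_from_latest: false,
    /// })?;
    /// input.connect().await?;
    /// let (batch, ack) = input.read().await?;
    /// ack.ack().await;
    /// input.close().await?;
    /// ```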
    pub fn new(config: KafkaInputConfig) -> Result<Self, Error> {
        Ok(Self {
            config,
            consumer: Arc::new(RwLock::new(None)),
        })
    }
}

#[async_trait]
impl Input for KafkaInput {
    async fn connect(&self) -> Result<(), Error> {
        let mut client_config = ClientConfig::new();

        // Configure the Kafka broker addresses
        client_config.set("bootstrap.servers", &self.config.brokers.join(","));

        // Set the consumer group ID
        client_config.set("group.id", &self.config.consumer_group);

        // Set the client ID, if configured
        if let Some(client_id) = &self.config.client_id {
            client_config.set("client.id", client_id);
        }

        // Set the offset reset policy
        if self.config.start_from_latest {
            client_config.set("auto.offset.reset", "latest");
        } else {
            client_config.set("auto.offset.reset", "earliest");
        }

        // Store offsets manually so that acknowledgment controls what gets
        // committed; librdkafka rejects manual offset stores while
        // `enable.auto.offset.store` is true.
        client_config.set("enable.auto.offset.store", "false");

        // Create the consumer
        let consumer: StreamConsumer = client_config
            .create()
            .map_err(|e| Error::Connection(format!("Unable to create a Kafka consumer: {}", e)))?;

        // Subscribe to the configured topics
        let topics: Vec<&str> = self
            .config
            .topics
            .iter()
            .map(|topic| topic.as_str())
            .collect();
        consumer.subscribe(&topics).map_err(|e| {
            Error::Connection(format!("Unable to subscribe to Kafka topics: {}", e))
        })?;

        // Store the consumer
        *self.consumer.write().await = Some(consumer);

        Ok(())
    }

    async fn read(&self) -> Result<(MessageBatch, Arc<dyn Ack>), Error> {
        let consumer_guard = self.consumer.read().await;
        let consumer = consumer_guard
            .as_ref()
            .ok_or_else(|| Error::Connection("The input is not connected".to_string()))?;

        match consumer.recv().await {
            Ok(kafka_message) => {
                // Build an internal message batch from the Kafka payload
                let payload = kafka_message.payload().ok_or_else(|| {
                    Error::Process("The Kafka message has no content".to_string())
                })?;

                let msg_batch = MessageBatch::new_binary(vec![payload.to_vec()]);

                // Create an acknowledgment object carrying the message position
                let topic = kafka_message.topic().to_string();
                let partition = kafka_message.partition();
                let offset = kafka_message.offset();

                let ack = KafkaAck {
                    consumer: self.consumer.clone(),
                    topic,
                    partition,
                    offset,
                };

                Ok((msg_batch, Arc::new(ack)))
            }
            Err(e) => Err(Error::Connection(format!(
                "Error receiving Kafka message: {}",
                e
            ))),
        }
    }

    async fn close(&self) -> Result<(), Error> {
        let mut consumer_guard = self.consumer.write().await;
        if let Some(consumer) = consumer_guard.take() {
            if let Err(e) = consumer.unassign() {
                tracing::warn!("Error unassigning Kafka consumer: {}", e);
            }
        }
        Ok(())
    }
}

/// Kafka message acknowledgment
pub struct KafkaAck {
    consumer: Arc<RwLock<Option<StreamConsumer>>>,
    topic: String,
    partition: i32,
    offset: i64,
}

#[async_trait]
impl Ack for KafkaAck {
    async fn ack(&self) {
        // Store the offset; with auto-commit enabled (the librdkafka default)
        // it is then committed in the background
        let consumer_guard = self.consumer.read().await;
        if let Some(consumer) = &*consumer_guard {
            if let Err(e) = consumer.store_offset(&self.topic, self.partition, self.offset) {
                tracing::error!("Error committing Kafka offset: {}", e);
            }
        }
    }
}

pub(crate) struct KafkaInputBuilder;
impl InputBuilder for KafkaInputBuilder {
    fn build(&self, config: &Option<serde_json::Value>) -> Result<Arc<dyn Input>, Error> {
        let Some(config) = config else {
            return Err(Error::Config(
                "Kafka input configuration is missing".to_string(),
            ));
        };
        let config: KafkaInputConfig = serde_json::from_value(config.clone())?;
        Ok(Arc::new(KafkaInput::new(config)?))
    }
}

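/// Register the Kafka input under the `"kafka"` type name.
///
/// A minimal host-side sketch (the call site and module path are assumptions;
/// only `init` itself is defined here):
///
/// ```ignore
/// arkflow_plugin::input::kafka::init();
/// // Pipelines can now resolve inputs configured with `type: kafka`.
/// ```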
pub fn init() {
    register_input_builder("kafka", Arc::new(KafkaInputBuilder));
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_kafka_input_new() {
        let config = KafkaInputConfig {
            brokers: vec!["localhost:9092".to_string()],
            topics: vec!["test-topic".to_string()],
            consumer_group: "test-group".to_string(),
            client_id: Some("test-client".to_string()),
            start_from_latest: false,
        };

        let input = KafkaInput::new(config);
        assert!(input.is_ok());
        let input = input.unwrap();
        assert_eq!(input.config.brokers, vec!["localhost:9092".to_string()]);
        assert_eq!(input.config.topics, vec!["test-topic".to_string()]);
        assert_eq!(input.config.consumer_group, "test-group".to_string());
        assert_eq!(input.config.client_id, Some("test-client".to_string()));
        assert!(!input.config.start_from_latest);
    }

    #[tokio::test]
    async fn test_kafka_input_read_not_connected() {
        let config = KafkaInputConfig {
            brokers: vec!["localhost:9092".to_string()],
            topics: vec!["test-topic".to_string()],
            consumer_group: "test-group".to_string(),
            client_id: None,
            start_from_latest: true,
        };

        let input = KafkaInput::new(config).unwrap();
        // Reading in the unconnected state should return an error
        let result = input.read().await;
        assert!(result.is_err());
        match result {
            Err(Error::Connection(msg)) => {
                assert_eq!(msg, "The input is not connected");
            }
            _ => panic!("Expected Connection error"),
        }
    }

    #[tokio::test]
    async fn test_kafka_ack() {
        let config = KafkaInputConfig {
            brokers: vec!["localhost:9092".to_string()],
            topics: vec!["test-topic".to_string()],
            consumer_group: "test-group".to_string(),
            client_id: None,
            start_from_latest: true,
        };

        let input = KafkaInput::new(config).unwrap();
        let ack = KafkaAck {
            consumer: input.consumer.clone(),
            topic: "test-topic".to_string(),
            partition: 0,
            offset: 100,
        };

        // Acknowledging without an actual consumer should be a no-op;
        // this mainly verifies that the ack method does not panic
        ack.ack().await;
    }
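
    #[tokio::test]
    async fn test_kafka_input_close_not_connected() {
        // A hedged addition: closing an input that was never connected
        // should be a no-op and succeed, since the consumer slot is empty.
        let config = KafkaInputConfig {
            brokers: vec!["localhost:9092".to_string()],
            topics: vec!["test-topic".to_string()],
            consumer_group: "test-group".to_string(),
            client_id: None,
            start_from_latest: false,
        };
        let input = KafkaInput::new(config).unwrap();
        assert!(input.close().await.is_ok());
    }

    #[tokio::test]
    async fn test_kafka_input_builder() {
        // A hedged addition: exercises KafkaInputBuilder with an inline JSON
        // config (field values are illustrative) and with a missing config.
        let builder = KafkaInputBuilder;
        let config = serde_json::json!({
            "brokers": ["localhost:9092"],
            "topics": ["test-topic"],
            "consumer_group": "test-group",
            "client_id": null,
            "start_from_latest": false
        });
        assert!(builder.build(&Some(config)).is_ok());
        assert!(builder.build(&None).is_err());
    }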
}