// File: cortex_sources/kafka/mod.rs

//! Kafka source implementations

use crate::types::SourceResult;
use cortex_ai::{
    flow::{source::Source, types::SourceOutput},
    FlowComponent, FlowError, FlowFuture,
};
use rdkafka::{
    consumer::{Consumer, StreamConsumer},
    ClientConfig, Message,
};
use std::marker::PhantomData;
use std::sync::Arc;
use std::{error::Error, time::Duration};

/// Settings used to build a [`KafkaSource`] consumer.
#[derive(Debug, Clone)]
pub struct KafkaConfig {
    /// Comma-separated broker addresses, e.g. `"host1:9092,host2:9092"`.
    pub bootstrap_servers: String,
    /// Name of the topic this source subscribes to.
    pub topic: String,
    /// Identifier of the consumer group the source joins.
    pub group_id: String,
    /// Offset policy when no committed offset exists (`"earliest"` or `"latest"`).
    pub auto_offset_reset: String,
    /// Broker-side session timeout for this consumer, in milliseconds.
    pub session_timeout_ms: u64,
}

31impl Default for KafkaConfig {
32    fn default() -> Self {
33        Self {
34            bootstrap_servers: "localhost:9092".to_string(),
35            topic: "default-topic".to_string(),
36            group_id: "cortex-consumer".to_string(),
37            auto_offset_reset: "earliest".to_string(),
38            session_timeout_ms: 6000,
39        }
40    }
41}
42
43/// A source that reads from Kafka
44pub struct KafkaSource<T> {
45    consumer: Arc<StreamConsumer>,
46    timeout: Duration,
47    _phantom: PhantomData<T>,
48}
49
50impl<T> KafkaSource<T>
51where
52    T: for<'a> TryFrom<Vec<u8>, Error = Box<dyn Error + Send + Sync>> + Send + Sync + 'static,
53{
54    /// Create a new Kafka source with the given configuration
55    ///
56    /// # Errors
57    ///
58    /// Returns an error if:
59    /// * Failed to create Kafka consumer
60    /// * Failed to subscribe to topic
61    /// * Invalid configuration parameters
62    pub fn new(config: &KafkaConfig) -> SourceResult<Self> {
63        let consumer: StreamConsumer = ClientConfig::new()
64            .set("group.id", &config.group_id)
65            .set("bootstrap.servers", &config.bootstrap_servers)
66            .set("enable.auto.commit", "true")
67            .set("auto.offset.reset", &config.auto_offset_reset)
68            .set("session.timeout.ms", config.session_timeout_ms.to_string())
69            .create()
70            .map_err(|e| FlowError::Source(e.to_string()))?;
71
72        consumer
73            .subscribe(&[&config.topic])
74            .map_err(|e| FlowError::Source(e.to_string()))?;
75
76        Ok(Self {
77            consumer: Arc::new(consumer),
78            timeout: Duration::from_secs(1),
79            _phantom: PhantomData,
80        })
81    }
82
83    /// Set the timeout for reading messages
84    ///
85    /// Returns a new `KafkaSource` with the updated timeout
86    #[must_use]
87    pub const fn with_timeout(mut self, timeout: Duration) -> Self {
88        self.timeout = timeout;
89        self
90    }
91}
92
93impl<T> FlowComponent for KafkaSource<T>
94where
95    T: for<'a> TryFrom<Vec<u8>, Error = Box<dyn Error + Send + Sync>> + Send + Sync + 'static,
96{
97    type Input = ();
98    type Output = T;
99    type Error = FlowError;
100}
101
102impl<T> Source for KafkaSource<T>
103where
104    T: for<'a> TryFrom<Vec<u8>, Error = Box<dyn Error + Send + Sync>> + Send + Sync + 'static,
105{
106    fn stream(&self) -> FlowFuture<'_, SourceOutput<Self::Output, Self::Error>, Self::Error> {
107        let (source_tx, source_rx) = flume::unbounded();
108        let (feedback_tx, feedback_rx) = flume::unbounded::<Result<T, FlowError>>();
109
110        let consumer = Arc::clone(&self.consumer);
111
112        // Spawn a task to handle message consumption
113        tokio::spawn({
114            async move {
115                loop {
116                    match consumer.recv().await {
117                        Ok(message) => {
118                            if let Some(payload) = message.payload() {
119                                match T::try_from(payload.to_vec()) {
120                                    Ok(item) => {
121                                        if source_tx.send(Ok(item)).is_err() {
122                                            break;
123                                        }
124                                    }
125                                    Err(e) => {
126                                        if source_tx
127                                            .send(Err(FlowError::Source(e.to_string())))
128                                            .is_err()
129                                        {
130                                            break;
131                                        }
132                                    }
133                                }
134                            }
135                        }
136                        Err(e) => {
137                            if source_tx
138                                .send(Err(FlowError::Source(e.to_string())))
139                                .is_err()
140                            {
141                                break;
142                            }
143                        }
144                    }
145                }
146            }
147        });
148
149        // Spawn a task to handle feedback and commit offsets
150        let consumer = Arc::clone(&self.consumer);
151        tokio::spawn(async move {
152            while let Ok(result) = feedback_rx.recv_async().await {
153                if result.is_ok() {
154                    if let Err(e) =
155                        consumer.commit_consumer_state(rdkafka::consumer::CommitMode::Async)
156                    {
157                        tracing::error!("Failed to commit offsets: {}", e);
158                    }
159                }
160            }
161        });
162
163        Box::pin(async move {
164            Ok(SourceOutput {
165                receiver: source_rx,
166                feedback: feedback_tx,
167            })
168        })
169    }
170}