cortex_sources/kafka/
mod.rs1use crate::types::SourceResult;
4use cortex_ai::{
5 flow::{source::Source, types::SourceOutput},
6 FlowComponent, FlowError, FlowFuture,
7};
8use rdkafka::{
9 consumer::{Consumer, StreamConsumer},
10 ClientConfig, Message,
11};
12use std::marker::PhantomData;
13use std::sync::Arc;
14use std::{error::Error, time::Duration};
15
/// Connection and subscription settings used to build a [`KafkaSource`].
///
/// Every field is handed to the Kafka client as-is by `KafkaSource::new`;
/// no validation happens in this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KafkaConfig {
    /// Broker address list, passed to the client as `bootstrap.servers`.
    pub bootstrap_servers: String,
    /// Name of the single topic the consumer subscribes to.
    pub topic: String,
    /// Consumer group identifier, passed to the client as `group.id`.
    pub group_id: String,
    /// Offset reset policy, passed to the client as `auto.offset.reset`.
    pub auto_offset_reset: String,
    /// Session timeout in milliseconds, passed to the client as
    /// `session.timeout.ms`.
    pub session_timeout_ms: u64,
}
30
31impl Default for KafkaConfig {
32 fn default() -> Self {
33 Self {
34 bootstrap_servers: "localhost:9092".to_string(),
35 topic: "default-topic".to_string(),
36 group_id: "cortex-consumer".to_string(),
37 auto_offset_reset: "earliest".to_string(),
38 session_timeout_ms: 6000,
39 }
40 }
41}
42
43pub struct KafkaSource<T> {
45 consumer: Arc<StreamConsumer>,
46 timeout: Duration,
47 _phantom: PhantomData<T>,
48}
49
50impl<T> KafkaSource<T>
51where
52 T: for<'a> TryFrom<Vec<u8>, Error = Box<dyn Error + Send + Sync>> + Send + Sync + 'static,
53{
54 pub fn new(config: &KafkaConfig) -> SourceResult<Self> {
63 let consumer: StreamConsumer = ClientConfig::new()
64 .set("group.id", &config.group_id)
65 .set("bootstrap.servers", &config.bootstrap_servers)
66 .set("enable.auto.commit", "true")
67 .set("auto.offset.reset", &config.auto_offset_reset)
68 .set("session.timeout.ms", config.session_timeout_ms.to_string())
69 .create()
70 .map_err(|e| FlowError::Source(e.to_string()))?;
71
72 consumer
73 .subscribe(&[&config.topic])
74 .map_err(|e| FlowError::Source(e.to_string()))?;
75
76 Ok(Self {
77 consumer: Arc::new(consumer),
78 timeout: Duration::from_secs(1),
79 _phantom: PhantomData,
80 })
81 }
82
83 #[must_use]
87 pub const fn with_timeout(mut self, timeout: Duration) -> Self {
88 self.timeout = timeout;
89 self
90 }
91}
92
93impl<T> FlowComponent for KafkaSource<T>
94where
95 T: for<'a> TryFrom<Vec<u8>, Error = Box<dyn Error + Send + Sync>> + Send + Sync + 'static,
96{
97 type Input = ();
98 type Output = T;
99 type Error = FlowError;
100}
101
102impl<T> Source for KafkaSource<T>
103where
104 T: for<'a> TryFrom<Vec<u8>, Error = Box<dyn Error + Send + Sync>> + Send + Sync + 'static,
105{
106 fn stream(&self) -> FlowFuture<'_, SourceOutput<Self::Output, Self::Error>, Self::Error> {
107 let (source_tx, source_rx) = flume::unbounded();
108 let (feedback_tx, feedback_rx) = flume::unbounded::<Result<T, FlowError>>();
109
110 let consumer = Arc::clone(&self.consumer);
111
112 tokio::spawn({
114 async move {
115 loop {
116 match consumer.recv().await {
117 Ok(message) => {
118 if let Some(payload) = message.payload() {
119 match T::try_from(payload.to_vec()) {
120 Ok(item) => {
121 if source_tx.send(Ok(item)).is_err() {
122 break;
123 }
124 }
125 Err(e) => {
126 if source_tx
127 .send(Err(FlowError::Source(e.to_string())))
128 .is_err()
129 {
130 break;
131 }
132 }
133 }
134 }
135 }
136 Err(e) => {
137 if source_tx
138 .send(Err(FlowError::Source(e.to_string())))
139 .is_err()
140 {
141 break;
142 }
143 }
144 }
145 }
146 }
147 });
148
149 let consumer = Arc::clone(&self.consumer);
151 tokio::spawn(async move {
152 while let Ok(result) = feedback_rx.recv_async().await {
153 if result.is_ok() {
154 if let Err(e) =
155 consumer.commit_consumer_state(rdkafka::consumer::CommitMode::Async)
156 {
157 tracing::error!("Failed to commit offsets: {}", e);
158 }
159 }
160 }
161 });
162
163 Box::pin(async move {
164 Ok(SourceOutput {
165 receiver: source_rx,
166 feedback: feedback_tx,
167 })
168 })
169 }
170}