1use crate::config::KafkaSourceConfig;
4use crate::context::BookmarkContext;
5use crate::decode;
6use crate::state::{Bookmark, PartitionOffset, state_key};
7use async_trait::async_trait;
8use base64::Engine;
9use faucet_core::{FaucetError, Source, Stream, StreamPage};
10use faucet_common_kafka::OnDecodeError;
11use rdkafka::ClientConfig;
12use rdkafka::Message;
13use rdkafka::config::RDKafkaLogLevel;
14use rdkafka::consumer::{Consumer, StreamConsumer};
15use rdkafka::message::Headers;
16use serde_json::{Map, Value, json};
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22#[cfg(feature = "schema-registry")]
23use faucet_common_kafka::KafkaValueFormat;
24#[cfg(feature = "schema-registry")]
25use faucet_common_kafka::schema_registry::client::SchemaRegistryClient;
26
27pub struct KafkaSource {
28 config: KafkaSourceConfig,
29 consumer: Arc<StreamConsumer<BookmarkContext>>,
30 context: BookmarkContext,
31 state_key_value: String,
32 assigned_floor: std::sync::Mutex<HashMap<(String, i32), i64>>,
37 #[cfg(feature = "schema-registry")]
38 sr_client: Option<SchemaRegistryClient>,
39}
40
41impl KafkaSource {
42 pub async fn new(config: KafkaSourceConfig) -> Result<Self, FaucetError> {
43 config.validate()?;
44
45 let mut client_config = ClientConfig::new();
46 client_config.set("bootstrap.servers", &config.brokers);
47 client_config.set("group.id", &config.group_id);
48 client_config.set("enable.auto.commit", "false");
49 client_config.set("auto.offset.reset", config.auto_offset_reset.as_str());
50 client_config.set(
51 "session.timeout.ms",
52 config.session_timeout.as_millis().to_string(),
53 );
54 client_config.set_log_level(RDKafkaLogLevel::Warning);
55
56 config.auth.apply(&mut client_config)?;
57
58 for (k, v) in &config.extra_client_config {
59 client_config.set(k, v);
60 }
61
62 let context = BookmarkContext::new();
63 let consumer: StreamConsumer<BookmarkContext> = client_config
64 .create_with_context(context.clone())
65 .map_err(|e| FaucetError::Source(format!("kafka consumer init: {e}")))?;
66
67 let topic_refs: Vec<&str> = config.topics.iter().map(String::as_str).collect();
68 consumer
69 .subscribe(&topic_refs)
70 .map_err(|e| FaucetError::Source(format!("kafka subscribe: {e}")))?;
71
72 let state_key_value = state_key(&config.group_id, &config.topics);
73
74 #[cfg(feature = "schema-registry")]
75 let sr_client = build_sr_client(&config.value_format, config.key_format.as_ref())?;
76
77 Ok(Self {
78 config,
79 consumer: Arc::new(consumer),
80 context,
81 state_key_value,
82 assigned_floor: std::sync::Mutex::new(HashMap::new()),
83 #[cfg(feature = "schema-registry")]
84 sr_client,
85 })
86 }
87
88 fn check_callback_error(&self) -> Result<(), FaucetError> {
92 let mut guard =
93 self.context.callback_error.lock().map_err(|e| {
94 FaucetError::State(format!("kafka callback_error mutex poisoned: {e}"))
95 })?;
96 if let Some(e) = guard.take() {
97 return Err(e);
98 }
99 Ok(())
100 }
101
102 async fn resolve_assigned_offsets(&self) -> Vec<PartitionOffset> {
117 let assigned = match self.consumer.assignment() {
118 Ok(tpl) => tpl,
119 Err(e) => {
120 tracing::warn!(error = %e, "kafka source: assignment() failed; bookmark falls back to consumed/carry-forward offsets");
121 return Vec::new();
122 }
123 };
124 let positions: HashMap<(String, i32), rdkafka::Offset> = self
125 .consumer
126 .position()
127 .map(|tpl| {
128 tpl.elements()
129 .iter()
130 .map(|e| ((e.topic().to_string(), e.partition()), e.offset()))
131 .collect()
132 })
133 .unwrap_or_default();
134
135 let mut out: Vec<PartitionOffset> = Vec::new();
136 let mut need_watermark: Vec<(String, i32)> = Vec::new();
137 {
138 let cache = self.assigned_floor.lock().ok();
139 for elem in assigned.elements() {
140 let key = (elem.topic().to_string(), elem.partition());
141 match positions.get(&key) {
142 Some(rdkafka::Offset::Offset(n)) => out.push(PartitionOffset {
144 topic: key.0,
145 partition: key.1,
146 offset: *n,
147 }),
148 _ => match cache.as_ref().and_then(|c| c.get(&key)) {
151 Some(&floor) => out.push(PartitionOffset {
152 topic: key.0,
153 partition: key.1,
154 offset: floor,
155 }),
156 None => need_watermark.push(key),
157 },
158 }
159 }
160 }
161
162 if !need_watermark.is_empty() {
163 let earliest = matches!(
164 self.config.auto_offset_reset,
165 crate::config::OffsetReset::Earliest
166 );
167 let consumer = Arc::clone(&self.consumer);
168 let to_fetch = need_watermark.clone();
169 let resolved = tokio::task::spawn_blocking(move || {
172 to_fetch
173 .into_iter()
174 .filter_map(|(topic, partition)| {
175 consumer
176 .fetch_watermarks(&topic, partition, Duration::from_secs(5))
177 .ok()
178 .map(|(low, high)| {
179 (topic, partition, if earliest { low } else { high })
180 })
181 })
182 .collect::<Vec<_>>()
183 })
184 .await
185 .unwrap_or_default();
186
187 if let Ok(mut cache) = self.assigned_floor.lock() {
188 for (topic, partition, floor) in resolved {
189 cache.insert((topic.clone(), partition), floor);
190 out.push(PartitionOffset {
191 topic,
192 partition,
193 offset: floor,
194 });
195 }
196 }
197 }
198 out
199 }
200
201 fn start_bookmark(&self) -> Option<Bookmark> {
204 self.context
205 .start_offsets
206 .lock()
207 .ok()
208 .and_then(|g| g.clone())
209 }
210
211 async fn build_bookmark(
216 &self,
217 consumed: &HashMap<(String, i32), i64>,
218 ) -> Result<Option<Value>, FaucetError> {
219 let assigned = self.resolve_assigned_offsets().await;
220 let merged = Bookmark::merged(self.start_bookmark().as_ref(), &assigned, consumed);
221 if merged.partition_offsets.is_empty() {
222 Ok(None)
223 } else {
224 Ok(Some(merged.to_value()?))
225 }
226 }
227
228 async fn message_to_value(
229 &self,
230 msg: &rdkafka::message::BorrowedMessage<'_>,
231 ) -> Result<Value, FaucetError> {
232 let value = decode::decode(
233 msg.payload(),
234 &self.config.value_format,
235 #[cfg(feature = "schema-registry")]
236 self.sr_client.as_ref(),
237 )
238 .await?;
239
240 let key = match &self.config.key_format {
241 Some(fmt) => {
242 decode::decode(
243 msg.key(),
244 fmt,
245 #[cfg(feature = "schema-registry")]
246 self.sr_client.as_ref(),
247 )
248 .await?
249 }
250 None => match msg.key() {
251 Some(bytes) => Value::String(
252 std::str::from_utf8(bytes)
253 .map_err(|e| FaucetError::Source(format!("kafka key utf-8: {e}")))?
254 .to_string(),
255 ),
256 None => Value::Null,
257 },
258 };
259
260 let mut headers_obj = Map::new();
261 if let Some(headers) = msg.headers() {
262 for h in headers.iter() {
263 if let Some(value_bytes) = h.value {
264 if let Ok(s) = std::str::from_utf8(value_bytes) {
265 headers_obj.insert(h.key.to_string(), Value::String(s.to_string()));
266 } else {
267 let encoded = base64::engine::general_purpose::STANDARD.encode(value_bytes);
268 headers_obj.insert(h.key.to_string(), Value::String(encoded));
269 }
270 }
271 }
272 }
273
274 Ok(json!({
275 "key": key,
276 "value": value,
277 "topic": msg.topic(),
278 "partition": msg.partition(),
279 "offset": msg.offset(),
280 "timestamp": msg.timestamp().to_millis().unwrap_or(0),
281 "headers": Value::Object(headers_obj),
282 }))
283 }
284}
285
#[cfg(feature = "schema-registry")]
/// Builds a Schema Registry client when either configured format needs one.
///
/// The value format's registry settings take precedence. The key format's settings are
/// used only when the value format does not use the registry. Returns `Ok(None)` when
/// neither format does.
///
/// # Errors
///
/// Propagates the error from `SchemaRegistryClient::new`.
fn build_sr_client(
    value_format: &KafkaValueFormat,
    key_format: Option<&KafkaValueFormat>,
) -> Result<Option<SchemaRegistryClient>, FaucetError> {
    /// Returns the registry settings embedded in a Confluent wire-format variant.
    fn registry_settings(
        format: &KafkaValueFormat,
    ) -> Option<&faucet_common_kafka::SchemaRegistryConfig> {
        match format {
            KafkaValueFormat::ConfluentAvro { schema_registry }
            | KafkaValueFormat::ConfluentProtobuf { schema_registry } => Some(schema_registry),
            KafkaValueFormat::ConfluentJsonSchema {
                schema_registry, ..
            } => Some(schema_registry),
            _ => None,
        }
    }

    let chosen = match registry_settings(value_format) {
        Some(from_value) => Some(from_value),
        None => key_format.and_then(registry_settings),
    };
    match chosen {
        Some(settings) => SchemaRegistryClient::new(settings).map(Some),
        None => Ok(None),
    }
}
304
305#[async_trait]
306impl Source for KafkaSource {
307 async fn fetch_with_context(
308 &self,
309 context: &HashMap<String, Value>,
310 ) -> Result<Vec<Value>, FaucetError> {
311 let (records, _bookmark) = self.fetch_with_context_incremental(context).await?;
312 Ok(records)
313 }
314
315 async fn fetch_with_context_incremental(
316 &self,
317 _context: &HashMap<String, Value>,
318 ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
319 let mut records: Vec<Value> = Vec::new();
320 let mut pending_offsets: HashMap<(String, i32), i64> = HashMap::new();
321 let mut last_message_at = Instant::now();
322 let max_messages = self.config.max_messages.unwrap_or(usize::MAX);
323 let idle_timeout = self.config.idle_timeout;
324
325 loop {
326 self.check_callback_error()?;
330
331 let idle_deadline = idle_timeout.map(|t| last_message_at + t);
332 let poll_budget = match idle_deadline {
333 Some(deadline) => deadline
334 .checked_duration_since(Instant::now())
335 .unwrap_or(Duration::ZERO),
336 None => self.config.poll_timeout,
337 };
338
339 tokio::select! {
340 biased;
341 _ = tokio::signal::ctrl_c() => {
342 tracing::info!("kafka source: ctrl_c received, stopping cleanly");
343 break;
344 }
345 recv = tokio::time::timeout(poll_budget, self.consumer.recv()) => {
346 match recv {
347 Ok(Ok(msg)) => {
348 match self.message_to_value(&msg).await {
349 Ok(record) => {
350 pending_offsets.insert(
351 (msg.topic().to_string(), msg.partition()),
352 msg.offset() + 1,
353 );
354 records.push(record);
355 last_message_at = Instant::now();
356 if records.len() >= max_messages {
357 break;
358 }
359 }
360 Err(e) => match self.config.on_decode_error {
361 OnDecodeError::Skip => {
362 tracing::warn!(error = %e, "kafka source: decode failed, skipping message");
363 }
364 OnDecodeError::Fail => return Err(e),
365 },
366 }
367 }
368 Ok(Err(e)) => {
369 return Err(FaucetError::Source(format!("kafka recv: {e}")));
370 }
371 Err(_timeout) => {
372 if let Some(deadline) = idle_deadline
373 && Instant::now() >= deadline
374 {
375 tracing::debug!("kafka source: idle_timeout reached, stopping");
376 break;
377 }
378 }
379 }
380 }
381 }
382 }
383
384 let bookmark_value = self.build_bookmark(&pending_offsets).await?;
385 Ok((records, bookmark_value))
386 }
387
388 fn stream_pages<'a>(
415 &'a self,
416 _context: &'a HashMap<String, Value>,
417 _batch_size: usize,
418 ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
419 let batch_size = self.config.batch_size;
420 let max_messages = self.config.max_messages.unwrap_or(usize::MAX);
421 let idle_timeout = self.config.idle_timeout;
422 let poll_timeout = self.config.poll_timeout;
423 let on_decode_error = self.config.on_decode_error;
424
425 let page_chunk = if batch_size == 0 {
429 usize::MAX
430 } else {
431 batch_size
432 };
433 let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
434
435 Box::pin(async_stream::try_stream! {
436 let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
437 let mut pending_offsets: HashMap<(String, i32), i64> = HashMap::new();
438 let mut last_message_at = Instant::now();
439 let mut total: usize = 0;
440
441 loop {
442 self.check_callback_error()?;
445
446 let idle_deadline = idle_timeout.map(|t| last_message_at + t);
447 let poll_budget = match idle_deadline {
448 Some(deadline) => deadline
449 .checked_duration_since(Instant::now())
450 .unwrap_or(Duration::ZERO),
451 None => poll_timeout,
452 };
453
454 let mut stop = false;
459 let mut fatal: Option<FaucetError> = None;
460 tokio::select! {
461 biased;
462 _ = tokio::signal::ctrl_c() => {
463 tracing::info!("kafka source: ctrl_c received, stopping cleanly");
464 stop = true;
465 }
466 recv = tokio::time::timeout(poll_budget, self.consumer.recv()) => {
467 match recv {
468 Ok(Ok(msg)) => {
469 match self.message_to_value(&msg).await {
470 Ok(record) => {
471 pending_offsets.insert(
472 (msg.topic().to_string(), msg.partition()),
473 msg.offset() + 1,
474 );
475 buffer.push(record);
476 last_message_at = Instant::now();
477 total += 1;
478 if total >= max_messages {
479 stop = true;
480 }
481 }
482 Err(e) => match on_decode_error {
483 OnDecodeError::Skip => {
484 tracing::warn!(error = %e, "kafka source: decode failed, skipping message");
485 }
486 OnDecodeError::Fail => fatal = Some(e),
487 },
488 }
489 }
490 Ok(Err(e)) => {
491 fatal = Some(FaucetError::Source(format!("kafka recv: {e}")));
492 }
493 Err(_timeout) => {
494 if let Some(deadline) = idle_deadline
495 && Instant::now() >= deadline
496 {
497 tracing::debug!("kafka source: idle_timeout reached, stopping");
498 stop = true;
499 }
500 }
501 }
502 }
503 }
504
505 if let Some(e) = fatal {
506 Err(e)?;
507 }
508
509 if !buffer.is_empty() && buffer.len() >= page_chunk {
516 let page_records = std::mem::replace(
517 &mut buffer,
518 Vec::with_capacity(initial_capacity),
519 );
520 let bookmark = self.build_bookmark(&pending_offsets).await?;
521 yield StreamPage { records: page_records, bookmark };
522 }
523
524 if stop {
525 break;
526 }
527 }
528
529 if !buffer.is_empty() {
533 let bookmark = self.build_bookmark(&pending_offsets).await?;
534 yield StreamPage { records: buffer, bookmark };
535 }
536
537 tracing::info!(
538 messages = total,
539 batch_size,
540 "kafka source: stream complete",
541 );
542 })
543 }
544
545 fn config_schema(&self) -> Value {
546 let schema = schemars::schema_for!(KafkaSourceConfig);
547 serde_json::to_value(&schema).unwrap_or(Value::Null)
548 }
549
550 fn state_key(&self) -> Option<String> {
551 Some(self.state_key_value.clone())
552 }
553
554 async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
555 let parsed = Bookmark::from_value(bookmark)?;
556 {
561 let mut guard = self.context.start_offsets.lock().map_err(|e| {
562 FaucetError::State(format!("kafka start_offsets mutex poisoned: {e}"))
563 })?;
564 *guard = Some(parsed.clone());
565 }
566 let mut guard = self.context.pending_bookmark.lock().map_err(|e| {
567 FaucetError::State(format!("kafka pending_bookmark mutex poisoned: {e}"))
568 })?;
569 *guard = Some(parsed);
570 Ok(())
571 }
572
573 fn connector_name(&self) -> &'static str {
574 "kafka"
575 }
576
577 async fn check(
588 &self,
589 ctx: &faucet_core::check::CheckContext,
590 ) -> Result<faucet_core::check::CheckReport, FaucetError> {
591 use faucet_core::check::{CheckReport, Probe};
592 use rdkafka::util::Timeout;
593
594 let start = std::time::Instant::now();
595 let consumer = Arc::clone(&self.consumer);
596 let rd_timeout = Timeout::After(ctx.timeout);
597
598 let fetch = tokio::task::spawn_blocking(move || {
601 consumer
602 .fetch_metadata(None, rd_timeout)
603 .map(|md| md.brokers().len())
604 .map_err(|e| e.to_string())
605 });
606
607 let probe = match tokio::time::timeout(ctx.timeout, fetch).await {
608 Ok(Ok(Ok(broker_count))) => {
609 tracing::debug!(broker_count, "kafka check: fetched cluster metadata");
610 Probe::pass("metadata", start.elapsed())
611 }
612 Ok(Ok(Err(e))) => Probe::fail_hint(
613 "metadata",
614 start.elapsed(),
615 e,
616 "verify brokers, network reachability, and auth (SASL/TLS) settings",
617 ),
618 Ok(Err(join_err)) => Probe::fail(
619 "metadata",
620 start.elapsed(),
621 format!("metadata fetch task failed: {join_err}"),
622 ),
623 Err(_elapsed) => Probe::fail_hint(
624 "metadata",
625 start.elapsed(),
626 "metadata fetch timed out",
627 "no broker responded within the check timeout",
628 ),
629 };
630 Ok(CheckReport::single(probe))
631 }
632}