Skip to main content

faucet_source_kafka/
stream.rs

1//! `KafkaSource` — the Kafka consumer implementation.
2
3use crate::config::KafkaSourceConfig;
4use crate::context::BookmarkContext;
5use crate::decode;
6use crate::state::{Bookmark, PartitionOffset, state_key};
7use async_trait::async_trait;
8use base64::Engine;
9use faucet_core::{FaucetError, Source, Stream, StreamPage};
10use faucet_common_kafka::OnDecodeError;
11use rdkafka::ClientConfig;
12use rdkafka::Message;
13use rdkafka::config::RDKafkaLogLevel;
14use rdkafka::consumer::{Consumer, StreamConsumer};
15use rdkafka::message::Headers;
16use serde_json::{Map, Value, json};
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22#[cfg(feature = "schema-registry")]
23use faucet_common_kafka::KafkaValueFormat;
24#[cfg(feature = "schema-registry")]
25use faucet_common_kafka::schema_registry::client::SchemaRegistryClient;
26
27pub struct KafkaSource {
28    config: KafkaSourceConfig,
29    consumer: Arc<StreamConsumer<BookmarkContext>>,
30    context: BookmarkContext,
31    state_key_value: String,
32    /// Cache of watermark-derived floor offsets for assigned partitions that
33    /// have produced no message (so `position()` reports no concrete offset).
34    /// Resolved once per partition and reused so the streaming bookmark build
35    /// doesn't issue a broker `fetch_watermarks` every page (#146 H9).
36    assigned_floor: std::sync::Mutex<HashMap<(String, i32), i64>>,
37    #[cfg(feature = "schema-registry")]
38    sr_client: Option<SchemaRegistryClient>,
39}
40
41impl KafkaSource {
42    pub async fn new(config: KafkaSourceConfig) -> Result<Self, FaucetError> {
43        config.validate()?;
44
45        let mut client_config = ClientConfig::new();
46        client_config.set("bootstrap.servers", &config.brokers);
47        client_config.set("group.id", &config.group_id);
48        client_config.set("enable.auto.commit", "false");
49        client_config.set("auto.offset.reset", config.auto_offset_reset.as_str());
50        client_config.set(
51            "session.timeout.ms",
52            config.session_timeout.as_millis().to_string(),
53        );
54        client_config.set_log_level(RDKafkaLogLevel::Warning);
55
56        config.auth.apply(&mut client_config)?;
57
58        for (k, v) in &config.extra_client_config {
59            client_config.set(k, v);
60        }
61
62        let context = BookmarkContext::new();
63        let consumer: StreamConsumer<BookmarkContext> = client_config
64            .create_with_context(context.clone())
65            .map_err(|e| FaucetError::Source(format!("kafka consumer init: {e}")))?;
66
67        let topic_refs: Vec<&str> = config.topics.iter().map(String::as_str).collect();
68        consumer
69            .subscribe(&topic_refs)
70            .map_err(|e| FaucetError::Source(format!("kafka subscribe: {e}")))?;
71
72        let state_key_value = state_key(&config.group_id, &config.topics);
73
74        #[cfg(feature = "schema-registry")]
75        let sr_client = build_sr_client(&config.value_format, config.key_format.as_ref())?;
76
77        Ok(Self {
78            config,
79            consumer: Arc::new(consumer),
80            context,
81            state_key_value,
82            assigned_floor: std::sync::Mutex::new(HashMap::new()),
83            #[cfg(feature = "schema-registry")]
84            sr_client,
85        })
86    }
87
88    /// Drain the callback-error slot. Called once per poll iteration so
89    /// rebalance-callback failures surface before any further messages are
90    /// processed (matches the batch-mode invariant).
91    fn check_callback_error(&self) -> Result<(), FaucetError> {
92        let mut guard =
93            self.context.callback_error.lock().map_err(|e| {
94                FaucetError::State(format!("kafka callback_error mutex poisoned: {e}"))
95            })?;
96        if let Some(e) = guard.take() {
97            return Err(e);
98        }
99        Ok(())
100    }
101
102    /// Resolve a starting offset for **every assigned partition**, so the
103    /// persisted bookmark records partitions that produced no message this run
104    /// — otherwise such a partition is absent on resume and resets to
105    /// `auto.offset.reset` (default `latest`), silently skipping any records
106    /// that arrived in the meantime (the H9 fix).
107    ///
108    /// For a partition that delivered messages, `position()` reports a concrete
109    /// next-offset (cheap, local). For one that produced nothing, `position()`
110    /// reports `Offset::Invalid` (librdkafka only tracks *consumed* offsets), so
111    /// we fall back to its watermark via `fetch_watermarks`: the **low**
112    /// watermark under `earliest`, the **high** watermark under `latest` — i.e.
113    /// exactly where the consumer is positioned for that reset policy. Those
114    /// watermark lookups are a broker round-trip, so each empty partition's
115    /// floor is resolved once and cached.
116    async fn resolve_assigned_offsets(&self) -> Vec<PartitionOffset> {
117        let assigned = match self.consumer.assignment() {
118            Ok(tpl) => tpl,
119            Err(e) => {
120                tracing::warn!(error = %e, "kafka source: assignment() failed; bookmark falls back to consumed/carry-forward offsets");
121                return Vec::new();
122            }
123        };
124        let positions: HashMap<(String, i32), rdkafka::Offset> = self
125            .consumer
126            .position()
127            .map(|tpl| {
128                tpl.elements()
129                    .iter()
130                    .map(|e| ((e.topic().to_string(), e.partition()), e.offset()))
131                    .collect()
132            })
133            .unwrap_or_default();
134
135        let mut out: Vec<PartitionOffset> = Vec::new();
136        let mut need_watermark: Vec<(String, i32)> = Vec::new();
137        {
138            let cache = self.assigned_floor.lock().ok();
139            for elem in assigned.elements() {
140                let key = (elem.topic().to_string(), elem.partition());
141                match positions.get(&key) {
142                    // A delivered partition: its concrete next-offset.
143                    Some(rdkafka::Offset::Offset(n)) => out.push(PartitionOffset {
144                        topic: key.0,
145                        partition: key.1,
146                        offset: *n,
147                    }),
148                    // No concrete position → use a cached watermark floor, or
149                    // schedule a lookup for it.
150                    _ => match cache.as_ref().and_then(|c| c.get(&key)) {
151                        Some(&floor) => out.push(PartitionOffset {
152                            topic: key.0,
153                            partition: key.1,
154                            offset: floor,
155                        }),
156                        None => need_watermark.push(key),
157                    },
158                }
159            }
160        }
161
162        if !need_watermark.is_empty() {
163            let earliest = matches!(
164                self.config.auto_offset_reset,
165                crate::config::OffsetReset::Earliest
166            );
167            let consumer = Arc::clone(&self.consumer);
168            let to_fetch = need_watermark.clone();
169            // `fetch_watermarks` is a blocking librdkafka broker call — run it
170            // off the async runtime.
171            let resolved = tokio::task::spawn_blocking(move || {
172                to_fetch
173                    .into_iter()
174                    .filter_map(|(topic, partition)| {
175                        consumer
176                            .fetch_watermarks(&topic, partition, Duration::from_secs(5))
177                            .ok()
178                            .map(|(low, high)| {
179                                (topic, partition, if earliest { low } else { high })
180                            })
181                    })
182                    .collect::<Vec<_>>()
183            })
184            .await
185            .unwrap_or_default();
186
187            if let Ok(mut cache) = self.assigned_floor.lock() {
188                for (topic, partition, floor) in resolved {
189                    cache.insert((topic.clone(), partition), floor);
190                    out.push(PartitionOffset {
191                        topic,
192                        partition,
193                        offset: floor,
194                    });
195                }
196            }
197        }
198        out
199    }
200
201    /// The start bookmark applied via [`apply_start_bookmark`], retained for
202    /// carry-forward (cloned, not consumed). `None` on a fresh run.
203    fn start_bookmark(&self) -> Option<Bookmark> {
204        self.context
205            .start_offsets
206            .lock()
207            .ok()
208            .and_then(|g| g.clone())
209    }
210
211    /// Build the bookmark to persist from the offsets consumed this page/run,
212    /// merging in the assigned-partition offsets and the carry-forward start
213    /// bookmark. Returns `None` only when there is nothing at all to record (no
214    /// partition assigned, nothing consumed, no prior state).
215    async fn build_bookmark(
216        &self,
217        consumed: &HashMap<(String, i32), i64>,
218    ) -> Result<Option<Value>, FaucetError> {
219        let assigned = self.resolve_assigned_offsets().await;
220        let merged = Bookmark::merged(self.start_bookmark().as_ref(), &assigned, consumed);
221        if merged.partition_offsets.is_empty() {
222            Ok(None)
223        } else {
224            Ok(Some(merged.to_value()?))
225        }
226    }
227
228    async fn message_to_value(
229        &self,
230        msg: &rdkafka::message::BorrowedMessage<'_>,
231    ) -> Result<Value, FaucetError> {
232        let value = decode::decode(
233            msg.payload(),
234            &self.config.value_format,
235            #[cfg(feature = "schema-registry")]
236            self.sr_client.as_ref(),
237        )
238        .await?;
239
240        let key = match &self.config.key_format {
241            Some(fmt) => {
242                decode::decode(
243                    msg.key(),
244                    fmt,
245                    #[cfg(feature = "schema-registry")]
246                    self.sr_client.as_ref(),
247                )
248                .await?
249            }
250            None => match msg.key() {
251                Some(bytes) => Value::String(
252                    std::str::from_utf8(bytes)
253                        .map_err(|e| FaucetError::Source(format!("kafka key utf-8: {e}")))?
254                        .to_string(),
255                ),
256                None => Value::Null,
257            },
258        };
259
260        let mut headers_obj = Map::new();
261        if let Some(headers) = msg.headers() {
262            for h in headers.iter() {
263                if let Some(value_bytes) = h.value {
264                    if let Ok(s) = std::str::from_utf8(value_bytes) {
265                        headers_obj.insert(h.key.to_string(), Value::String(s.to_string()));
266                    } else {
267                        let encoded = base64::engine::general_purpose::STANDARD.encode(value_bytes);
268                        headers_obj.insert(h.key.to_string(), Value::String(encoded));
269                    }
270                }
271            }
272        }
273
274        Ok(json!({
275            "key": key,
276            "value": value,
277            "topic": msg.topic(),
278            "partition": msg.partition(),
279            "offset": msg.offset(),
280            "timestamp": msg.timestamp().to_millis().unwrap_or(0),
281            "headers": Value::Object(headers_obj),
282        }))
283    }
284}
285
/// Build the Schema Registry client needed by the configured formats.
///
/// The value format's registry settings take precedence. The key format's
/// are used only when the value format has none. Returns `Ok(None)` when
/// neither format is a Confluent wire format.
#[cfg(feature = "schema-registry")]
fn build_sr_client(
    value_format: &KafkaValueFormat,
    key_format: Option<&KafkaValueFormat>,
) -> Result<Option<SchemaRegistryClient>, FaucetError> {
    /// Registry settings carried by a Confluent-framed format, if any.
    fn registry_settings(
        format: &KafkaValueFormat,
    ) -> Option<&faucet_common_kafka::SchemaRegistryConfig> {
        match format {
            KafkaValueFormat::ConfluentAvro { schema_registry }
            | KafkaValueFormat::ConfluentProtobuf { schema_registry }
            | KafkaValueFormat::ConfluentJsonSchema {
                schema_registry, ..
            } => Some(schema_registry),
            _ => None,
        }
    }

    let selected = match registry_settings(value_format) {
        Some(from_value) => Some(from_value),
        None => key_format.and_then(registry_settings),
    };

    match selected {
        Some(settings) => SchemaRegistryClient::new(settings).map(Some),
        None => Ok(None),
    }
}
304
305#[async_trait]
306impl Source for KafkaSource {
307    async fn fetch_with_context(
308        &self,
309        context: &HashMap<String, Value>,
310    ) -> Result<Vec<Value>, FaucetError> {
311        let (records, _bookmark) = self.fetch_with_context_incremental(context).await?;
312        Ok(records)
313    }
314
315    async fn fetch_with_context_incremental(
316        &self,
317        _context: &HashMap<String, Value>,
318    ) -> Result<(Vec<Value>, Option<Value>), FaucetError> {
319        let mut records: Vec<Value> = Vec::new();
320        let mut pending_offsets: HashMap<(String, i32), i64> = HashMap::new();
321        let mut last_message_at = Instant::now();
322        let max_messages = self.config.max_messages.unwrap_or(usize::MAX);
323        let idle_timeout = self.config.idle_timeout;
324
325        loop {
326            // Surface any error raised by the rebalance callback (e.g. a
327            // failed seek). Done before the next poll so we don't process
328            // additional messages after a bookmark application failure.
329            self.check_callback_error()?;
330
331            let idle_deadline = idle_timeout.map(|t| last_message_at + t);
332            let poll_budget = match idle_deadline {
333                Some(deadline) => deadline
334                    .checked_duration_since(Instant::now())
335                    .unwrap_or(Duration::ZERO),
336                None => self.config.poll_timeout,
337            };
338
339            tokio::select! {
340                biased;
341                _ = tokio::signal::ctrl_c() => {
342                    tracing::info!("kafka source: ctrl_c received, stopping cleanly");
343                    break;
344                }
345                recv = tokio::time::timeout(poll_budget, self.consumer.recv()) => {
346                    match recv {
347                        Ok(Ok(msg)) => {
348                            match self.message_to_value(&msg).await {
349                                Ok(record) => {
350                                    pending_offsets.insert(
351                                        (msg.topic().to_string(), msg.partition()),
352                                        msg.offset() + 1,
353                                    );
354                                    records.push(record);
355                                    last_message_at = Instant::now();
356                                    if records.len() >= max_messages {
357                                        break;
358                                    }
359                                }
360                                Err(e) => match self.config.on_decode_error {
361                                    OnDecodeError::Skip => {
362                                        tracing::warn!(error = %e, "kafka source: decode failed, skipping message");
363                                    }
364                                    OnDecodeError::Fail => return Err(e),
365                                },
366                            }
367                        }
368                        Ok(Err(e)) => {
369                            return Err(FaucetError::Source(format!("kafka recv: {e}")));
370                        }
371                        Err(_timeout) => {
372                            if let Some(deadline) = idle_deadline
373                                && Instant::now() >= deadline
374                            {
375                                tracing::debug!("kafka source: idle_timeout reached, stopping");
376                                break;
377                            }
378                        }
379                    }
380                }
381            }
382        }
383
384        let bookmark_value = self.build_bookmark(&pending_offsets).await?;
385        Ok((records, bookmark_value))
386    }
387
388    /// Stream Kafka messages page-by-page. Mirrors the
389    /// [`fetch_with_context_incremental`](Self::fetch_with_context_incremental)
390    /// poll loop but emits a [`StreamPage`] each time the in-memory buffer
391    /// reaches [`KafkaSourceConfig::batch_size`] (or whenever the idle window
392    /// flushes a partially-filled buffer), and continues polling until the
393    /// configured `max_messages` / `idle_timeout` termination conditions are
394    /// hit.
395    ///
396    /// The trait-level `batch_size` argument is intentionally ignored in
397    /// favour of the config field — the config is the user-facing knob the
398    /// README documents and routing the pipeline-supplied hint through it
399    /// would silently override an explicit config value.
400    ///
401    /// **Per-page bookmark:** each yielded page carries a snapshot of the
402    /// cumulative `(topic, partition) -> next_offset` map seen so far. The
403    /// streaming pipeline persists this via the configured `StateStore`
404    /// *after* the sink confirms the write, giving at-least-once delivery
405    /// with per-page durability — a crash between pages re-reads only the
406    /// uncommitted page on resume (the rebalance callback seeds the assigned
407    /// partitions with the bookmarked offsets before any fetch happens).
408    ///
409    /// **`batch_size = 0`:** drains the entire run window (until
410    /// `max_messages` or `idle_timeout` fires) into a single page. This
411    /// negates the streaming benefit; prefer a finite `batch_size` in
412    /// production so the state store advances with each successful sink
413    /// write.
414    fn stream_pages<'a>(
415        &'a self,
416        _context: &'a HashMap<String, Value>,
417        _batch_size: usize,
418    ) -> Pin<Box<dyn Stream<Item = Result<StreamPage, FaucetError>> + Send + 'a>> {
419        let batch_size = self.config.batch_size;
420        let max_messages = self.config.max_messages.unwrap_or(usize::MAX);
421        let idle_timeout = self.config.idle_timeout;
422        let poll_timeout = self.config.poll_timeout;
423        let on_decode_error = self.config.on_decode_error;
424
425        // batch_size == 0 means "drain entire run window into one page" —
426        // effectively no per-page flush boundary. Otherwise the page flushes
427        // as soon as it accumulates `batch_size` messages.
428        let page_chunk = if batch_size == 0 {
429            usize::MAX
430        } else {
431            batch_size
432        };
433        let initial_capacity = if batch_size == 0 { 1024 } else { batch_size };
434
435        Box::pin(async_stream::try_stream! {
436            let mut buffer: Vec<Value> = Vec::with_capacity(initial_capacity);
437            let mut pending_offsets: HashMap<(String, i32), i64> = HashMap::new();
438            let mut last_message_at = Instant::now();
439            let mut total: usize = 0;
440
441            loop {
442                // Surface any error raised by the rebalance callback (e.g. a
443                // failed seek) before processing the next poll batch.
444                self.check_callback_error()?;
445
446                let idle_deadline = idle_timeout.map(|t| last_message_at + t);
447                let poll_budget = match idle_deadline {
448                    Some(deadline) => deadline
449                        .checked_duration_since(Instant::now())
450                        .unwrap_or(Duration::ZERO),
451                    None => poll_timeout,
452                };
453
454                // Accumulators set by the select arm — `?` cannot cross the
455                // select's match boundary into the outer `try_stream!`, so we
456                // collect errors and termination flags here and act on them
457                // after the select returns.
458                let mut stop = false;
459                let mut fatal: Option<FaucetError> = None;
460                tokio::select! {
461                    biased;
462                    _ = tokio::signal::ctrl_c() => {
463                        tracing::info!("kafka source: ctrl_c received, stopping cleanly");
464                        stop = true;
465                    }
466                    recv = tokio::time::timeout(poll_budget, self.consumer.recv()) => {
467                        match recv {
468                            Ok(Ok(msg)) => {
469                                match self.message_to_value(&msg).await {
470                                    Ok(record) => {
471                                        pending_offsets.insert(
472                                            (msg.topic().to_string(), msg.partition()),
473                                            msg.offset() + 1,
474                                        );
475                                        buffer.push(record);
476                                        last_message_at = Instant::now();
477                                        total += 1;
478                                        if total >= max_messages {
479                                            stop = true;
480                                        }
481                                    }
482                                    Err(e) => match on_decode_error {
483                                        OnDecodeError::Skip => {
484                                            tracing::warn!(error = %e, "kafka source: decode failed, skipping message");
485                                        }
486                                        OnDecodeError::Fail => fatal = Some(e),
487                                    },
488                                }
489                            }
490                            Ok(Err(e)) => {
491                                fatal = Some(FaucetError::Source(format!("kafka recv: {e}")));
492                            }
493                            Err(_timeout) => {
494                                if let Some(deadline) = idle_deadline
495                                    && Instant::now() >= deadline
496                                {
497                                    tracing::debug!("kafka source: idle_timeout reached, stopping");
498                                    stop = true;
499                                }
500                            }
501                        }
502                    }
503                }
504
505                if let Some(e) = fatal {
506                    Err(e)?;
507                }
508
509                // Yield a full page as soon as the buffer hits `page_chunk`.
510                // Bookmark = cumulative snapshot of pending_offsets after this
511                // page's last message. Snapshot before flushing the buffer so
512                // a crash between yield and the next loop iteration re-reads
513                // only the uncommitted messages, not those already in this
514                // page.
515                if !buffer.is_empty() && buffer.len() >= page_chunk {
516                    let page_records = std::mem::replace(
517                        &mut buffer,
518                        Vec::with_capacity(initial_capacity),
519                    );
520                    let bookmark = self.build_bookmark(&pending_offsets).await?;
521                    yield StreamPage { records: page_records, bookmark };
522                }
523
524                if stop {
525                    break;
526                }
527            }
528
529            // Flush the trailing buffer (may be empty if the run terminated
530            // exactly on a page boundary). When non-empty, emit one final
531            // page carrying the cumulative bookmark.
532            if !buffer.is_empty() {
533                let bookmark = self.build_bookmark(&pending_offsets).await?;
534                yield StreamPage { records: buffer, bookmark };
535            }
536
537            tracing::info!(
538                messages = total,
539                batch_size,
540                "kafka source: stream complete",
541            );
542        })
543    }
544
545    fn config_schema(&self) -> Value {
546        let schema = schemars::schema_for!(KafkaSourceConfig);
547        serde_json::to_value(&schema).unwrap_or(Value::Null)
548    }
549
550    fn state_key(&self) -> Option<String> {
551        Some(self.state_key_value.clone())
552    }
553
554    async fn apply_start_bookmark(&self, bookmark: Value) -> Result<(), FaucetError> {
555        let parsed = Bookmark::from_value(bookmark)?;
556        // `pending_bookmark` is consumed by the rebalance callback to seed the
557        // assigned partitions' starting offsets. `start_offsets` keeps a
558        // retained copy so previously-known partitions that are empty this run
559        // carry their offset forward into the next bookmark (the H9 fix).
560        {
561            let mut guard = self.context.start_offsets.lock().map_err(|e| {
562                FaucetError::State(format!("kafka start_offsets mutex poisoned: {e}"))
563            })?;
564            *guard = Some(parsed.clone());
565        }
566        let mut guard = self.context.pending_bookmark.lock().map_err(|e| {
567            FaucetError::State(format!("kafka pending_bookmark mutex poisoned: {e}"))
568        })?;
569        *guard = Some(parsed);
570        Ok(())
571    }
572
573    fn connector_name(&self) -> &'static str {
574        "kafka"
575    }
576
577    /// Preflight probe that does **not** consume any message.
578    ///
579    /// The default `Source::check` would call `stream_pages`, which polls for
580    /// a message and would block on an empty topic until the idle/max-message
581    /// terminator fires. Instead we fetch cluster metadata
582    /// (`fetch_metadata(None, timeout)`), which validates broker connectivity +
583    /// auth without consuming or committing anything.
584    ///
585    /// `fetch_metadata` is a blocking librdkafka call, so it runs on a blocking
586    /// thread; the whole probe is additionally bounded by `ctx.timeout`.
587    async fn check(
588        &self,
589        ctx: &faucet_core::check::CheckContext,
590    ) -> Result<faucet_core::check::CheckReport, FaucetError> {
591        use faucet_core::check::{CheckReport, Probe};
592        use rdkafka::util::Timeout;
593
594        let start = std::time::Instant::now();
595        let consumer = Arc::clone(&self.consumer);
596        let rd_timeout = Timeout::After(ctx.timeout);
597
598        // Run the blocking fetch_metadata off the async runtime, bounded by the
599        // same wall-clock budget so an unreachable broker can't hang the probe.
600        let fetch = tokio::task::spawn_blocking(move || {
601            consumer
602                .fetch_metadata(None, rd_timeout)
603                .map(|md| md.brokers().len())
604                .map_err(|e| e.to_string())
605        });
606
607        let probe = match tokio::time::timeout(ctx.timeout, fetch).await {
608            Ok(Ok(Ok(broker_count))) => {
609                tracing::debug!(broker_count, "kafka check: fetched cluster metadata");
610                Probe::pass("metadata", start.elapsed())
611            }
612            Ok(Ok(Err(e))) => Probe::fail_hint(
613                "metadata",
614                start.elapsed(),
615                e,
616                "verify brokers, network reachability, and auth (SASL/TLS) settings",
617            ),
618            Ok(Err(join_err)) => Probe::fail(
619                "metadata",
620                start.elapsed(),
621                format!("metadata fetch task failed: {join_err}"),
622            ),
623            Err(_elapsed) => Probe::fail_hint(
624                "metadata",
625                start.elapsed(),
626                "metadata fetch timed out",
627                "no broker responded within the check timeout",
628            ),
629        };
630        Ok(CheckReport::single(probe))
631    }
632}