faucet-source-kafka 1.0.1

Apache Kafka consumer source for the faucet-stream ecosystem

faucet-source-kafka

Apache Kafka consumer source for faucet-stream. Subscribes to one or more topics, drains messages until a max_messages count or idle_timeout is reached, and emits each Kafka message as a structured JSON record. Built on rdkafka (librdkafka bindings). Supports durable offset tracking via any faucet-core StateStore so pipelines can resume exactly where they left off.


Quick start

# pipeline.yaml
version: 1

source:
  kind: kafka
  config:
    brokers: "broker1:9092,broker2:9092"
    topics:
      - orders
    group_id: faucet-orders-consumer
    value_format:
      type: json
    auto_offset_reset: earliest
    idle_timeout: 30      # stop after 30 s of no new messages
    max_messages: 10000   # or stop after 10 000 messages, whichever comes first

sink:
  kind: jsonl
  config:
    path: orders.jsonl

Run it:

faucet run pipeline.yaml

To resume from where the last run stopped, add a state store:

state:
  kind: file
  config:
    directory: .faucet-state

Full config reference

All fields are top-level keys under source.config in the pipeline YAML.

Field Type Default Description
brokers string required Comma-separated list of bootstrap broker addresses, e.g. "broker1:9092,broker2:9092".
topics string[] required One or more topic names to subscribe to.
group_id string required Kafka consumer group ID. Used for partition assignment and is part of the state-store key.
auth KafkaAuth {type: none} Authentication mode. See Auth below. Full details in the faucet-common-kafka README.
value_format KafkaValueFormat {type: json} How message value bytes are decoded. See Value formats.
key_format KafkaValueFormat | null null How message key bytes are decoded. When absent or null, key bytes are decoded as UTF-8 (or null if no key was set on the message).
auto_offset_reset "earliest" | "latest" "latest" Where to start consuming a partition that has no bookmarked offset. On a resume, every partition assigned in a prior run carries a bookmarked offset (see Resume and state store), so this only governs first-ever encounters — a fresh run or a newly-added partition.
max_messages integer | null null Stop after this many messages have been consumed. At least one of max_messages and idle_timeout must be set.
idle_timeout integer | null null Stop after this many seconds with no new messages. At least one of max_messages and idle_timeout must be set.
poll_timeout integer 1 Maximum seconds to wait on a single consumer.recv() call before checking termination conditions. Rarely needs tuning.
session_timeout integer 30 Consumer-group session timeout in seconds, passed to librdkafka as session.timeout.ms (converted to milliseconds). Increase for slow brokers or long GC pauses.
on_decode_error "fail" | "skip" "fail" What to do when a single message fails to decode. fail aborts the batch; skip drops the message and logs a warning.
extra_client_config object {} Raw librdkafka client properties passed directly to the consumer. These can override anything set by auth or the typed fields above — use with care.
batch_size integer 1000 Messages per emitted StreamPage when the source is driven through Source::stream_pages (i.e. Pipeline::run / run_stream). See Streaming and batching below.
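The override rule for extra_client_config can be illustrated with a small sketch. KafkaSourceConfig and client_properties are hypothetical names, auth is left out, and which librdkafka properties the crate actually derives from each typed field is an assumption; bootstrap.servers, group.id, auto.offset.reset and session.timeout.ms are standard librdkafka property names. The point is the ordering: typed fields are written first and the raw map last, so a key repeated in extra_client_config replaces the typed value.

```rust
use std::collections::BTreeMap;

/// Hypothetical subset of the typed source config; field names mirror the YAML keys.
struct KafkaSourceConfig {
    brokers: String,
    group_id: String,
    auto_offset_reset: String, // "earliest" or "latest"
    session_timeout: u64,      // seconds, as in the YAML
    extra_client_config: BTreeMap<String, String>,
}

/// Build librdkafka properties: typed fields first, raw overrides applied last.
fn client_properties(cfg: &KafkaSourceConfig) -> BTreeMap<String, String> {
    let mut props = BTreeMap::new();
    props.insert("bootstrap.servers".to_string(), cfg.brokers.clone());
    props.insert("group.id".to_string(), cfg.group_id.clone());
    props.insert("auto.offset.reset".to_string(), cfg.auto_offset_reset.clone());
    // The YAML takes seconds; librdkafka's session.timeout.ms takes milliseconds.
    props.insert(
        "session.timeout.ms".to_string(),
        (cfg.session_timeout * 1000).to_string(),
    );
    // extra_client_config goes last, so any key it repeats replaces the typed value.
    for (key, value) in &cfg.extra_client_config {
        props.insert(key.clone(), value.clone());
    }
    props
}

fn main() {
    let mut extra = BTreeMap::new();
    extra.insert("session.timeout.ms".to_string(), "45000".to_string());
    extra.insert("fetch.max.bytes".to_string(), "52428800".to_string());
    let cfg = KafkaSourceConfig {
        brokers: "broker1:9092,broker2:9092".to_string(),
        group_id: "faucet-orders-consumer".to_string(),
        auto_offset_reset: "earliest".to_string(),
        session_timeout: 30,
        extra_client_config: extra,
    };
    for (key, value) in client_properties(&cfg) {
        println!("{key}={value}");
    }
}
```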

Record shape

Each Kafka message becomes one JSON object in the output stream:

{
  "key": "order-42",
  "value": { "id": 42, "status": "shipped", "amount": 99.95 },
  "topic": "orders",
  "partition": 2,
  "offset": 10483,
  "timestamp": 1747483200000,
  "headers": {
    "content-type": "application/json",
    "trace-id": "abc123"
  }
}

Field notes:

  • key — the message key decoded as UTF-8, or decoded according to key_format if set. null when the Kafka message carried no key.
  • value — the decoded message payload. Shape depends on value_format (see below).
  • topic — the topic name the message was consumed from.
  • partition — the partition number (integer).
  • offset — the offset of this message within its partition.
  • timestamp — milliseconds since the Unix epoch, from the Kafka message timestamp. 0 when no timestamp is present in the message.
  • headers — a flat object of string key → string value. Values that are not valid UTF-8 are base64-encoded. Empty object {} when no headers were set on the message.
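The headers rule can be sketched as follows, assuming the standard RFC 4648 base64 alphabet with padding (the README does not say which variant is used). render_header_value and render_headers are hypothetical helpers, and a small encoder is inlined to keep the example dependency-free; a real implementation would more likely use a base64 crate. How duplicate header keys are resolved is not documented, so letting the later value win is also an assumption.

```rust
use std::collections::BTreeMap;

/// RFC 4648 standard alphabet; assumed, not confirmed, to match the source.
const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Padded base64 encoding of arbitrary bytes.
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group, zero-filling missing bytes.
        let b0 = chunk[0] as u32;
        let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
        let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
        let group = (b0 << 16) | (b1 << 8) | b2;
        out.push(ALPHABET[(group >> 18) as usize & 63] as char);
        out.push(ALPHABET[(group >> 12) as usize & 63] as char);
        // Emit '=' for each byte the final chunk was missing.
        out.push(if chunk.len() > 1 { ALPHABET[(group >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { ALPHABET[group as usize & 63] as char } else { '=' });
    }
    out
}

/// Valid UTF-8 passes through unchanged; anything else is base64-encoded.
fn render_header_value(raw: &[u8]) -> String {
    match std::str::from_utf8(raw) {
        Ok(text) => text.to_string(),
        Err(_) => base64_encode(raw),
    }
}

/// Flatten headers into a string-to-string map.
fn render_headers(headers: &[(&str, &[u8])]) -> BTreeMap<String, String> {
    let mut out = BTreeMap::new();
    for (key, raw) in headers {
        // A later duplicate key overwrites an earlier one (assumed behavior).
        out.insert(key.to_string(), render_header_value(raw));
    }
    out
}

fn main() {
    let headers = [
        ("trace-id", &b"abc123"[..]),
        ("checksum", &[0xffu8, 0xfe][..]), // not valid UTF-8
    ];
    println!("{:?}", render_headers(&headers)); // {"checksum": "//4=", "trace-id": "abc123"}
}
```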

Value formats

Configured via value_format (and optionally key_format). All formats use a type discriminator field.

Format type Description
JSON json Parse value bytes as a JSON document. Default.
Raw string raw_string Treat value bytes as a UTF-8 string. The string becomes the value field.
Bytes bytes Pass bytes through as a base64-encoded string inside value. No parsing is attempted.
Confluent Avro confluent_avro Confluent wire-format Avro: [0x00][schema_id 4B][Avro binary]. Requires schema-registry feature.
Confluent Protobuf confluent_protobuf Confluent wire-format Protobuf. Requires schema-registry feature. v1 returns an error — full descriptor support tracked in issue #44.
Confluent JSON Schema confluent_json_schema Confluent wire-format JSON: [0x00][schema_id 4B][JSON bytes]. Optional schema validation. Requires schema-registry feature.
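For the Avro and JSON Schema formats, the framing shown in the table can be peeled off before any schema lookup. The sketch below, with hypothetical names throughout, splits a payload into its big-endian schema ID and body and applies an on_decode_error-style policy across a batch. It does not model the schema-registry call or the Protobuf framing, and it is not the crate's decoder.

```rust
/// Why a payload could not be split into a Confluent frame.
#[derive(Debug, PartialEq)]
enum DecodeError {
    TooShort(usize),
    BadMagic(u8),
}

/// Hypothetical mirror of the on_decode_error setting.
#[derive(Clone, Copy)]
enum OnDecodeError {
    Fail,
    Skip,
}

/// Split [0x00][schema_id: 4 bytes, big-endian][body] into (schema_id, body).
fn split_confluent_frame(bytes: &[u8]) -> Result<(u32, &[u8]), DecodeError> {
    if bytes.len() < 5 {
        return Err(DecodeError::TooShort(bytes.len()));
    }
    if bytes[0] != 0x00 {
        return Err(DecodeError::BadMagic(bytes[0]));
    }
    let schema_id = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
    Ok((schema_id, &bytes[5..]))
}

/// Apply the decode-error policy across a batch of raw message values.
fn decode_batch<'a>(
    values: &[&'a [u8]],
    policy: OnDecodeError,
) -> Result<Vec<(u32, &'a [u8])>, DecodeError> {
    let mut frames = Vec::new();
    for (i, &value) in values.iter().enumerate() {
        match split_confluent_frame(value) {
            Ok(frame) => frames.push(frame),
            Err(err) => match policy {
                // fail: abort the whole batch on the first bad message.
                OnDecodeError::Fail => return Err(err),
                // skip: drop the message and log a warning.
                OnDecodeError::Skip => eprintln!("warning: skipping message {i}: {err:?}"),
            },
        }
    }
    Ok(frames)
}

fn main() {
    let good: &[u8] = &[0x00, 0x00, 0x00, 0x01, 0x00, b'{', b'}']; // schema ID 256
    let bad: &[u8] = &[b'{', b'}']; // plain JSON, no Confluent framing
    println!("{:?}", decode_batch(&[good, bad], OnDecodeError::Skip));
    println!("{:?}", decode_batch(&[good, bad], OnDecodeError::Fail));
}
```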

The three Confluent formats require building with the schema-registry feature flag:

[dependencies]
faucet-source-kafka = { version = "...", features = ["schema-registry"] }

Each Confluent format takes a schema_registry block — see the faucet-common-kafka README for the full SchemaRegistryConfig options (URL, basic auth, cache capacity, request timeout).


Auth

Authentication is configured via the auth field using KafkaAuth from faucet-common-kafka. The full auth reference — SASL/PLAIN, SASL/SCRAM, SSL client certificates, and SASL+SSL — is in the faucet-common-kafka README.

Quick example for SASL/PLAIN (Confluent Cloud, MSK with SASL):

auth:
  type: sasl_plain
  username: "${env:KAFKA_USERNAME}"
  password: "${env:KAFKA_PASSWORD}"

Use ${env:VAR} interpolation so credentials never land in the YAML file.


Resume and state store

When a StateStore is wired into the pipeline (via state: in the YAML, or Pipeline::with_state_store in Rust), the source participates in durable offset tracking:

  1. Before each run, the pipeline reads the stored bookmark from the StateStore using the source's state key and calls apply_start_bookmark. The bookmark is buffered in memory; no seeking happens yet.

  2. On partition assignment (the consumer's rebalance callback, which fires before any message is fetched), the bookmarked offset for each assigned (topic, partition) is injected into the assignment so the consumer starts there. Setting the offset as part of the assignment — rather than seeking after the first poll — means no message from before the bookmark is ever delivered, so a resume never produces a duplicate.

  3. After the sink confirms the batch, the pipeline persists the new bookmark — a list of {topic, partition, offset} entries recording one past the highest offset written.

    The persisted bookmark records an offset for every assigned partition, not only the partitions that delivered a message in this run. A partition that was assigned but produced nothing is recorded at the consumer's current position (and a partition known from a prior run is carried forward). This is deliberate: if an empty-this-run partition were omitted, the next resume would have no offset for it and would fall back to auto_offset_reset (default latest) — silently skipping any records that arrived in the meantime. Recording its position closes that gap. The bookmark is only written on successful sink completion, so a crashed run never marks data as consumed.

    A partition that has never been assigned in any run (e.g. one added to the topic after the last run) has no recorded offset and so honours auto_offset_reset on first encounter — earliest reads it from the start, latest from the tail.
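Steps 2 and 3, together with the never-assigned case, reduce to two small rules, sketched here with hypothetical types. Bookmark stands in for the persisted {topic, partition, offset} list, StartAt loosely mirrors the Beginning, End and explicit-offset variants of rdkafka's Offset, and the positions map is assumed to contain only partitions whose consumer position is known. This illustrates the documented behavior; it is not the source's implementation.

```rust
use std::collections::BTreeMap;

/// (topic, partition)
type TopicPartition = (String, i32);
/// (topic, partition) -> next offset to read
type Bookmark = BTreeMap<TopicPartition, i64>;

#[derive(Clone, Copy)]
enum AutoOffsetReset {
    Earliest,
    Latest,
}

/// Where an assigned partition starts.
#[derive(Debug, PartialEq)]
enum StartAt {
    Offset(i64),
    Beginning,
    End,
}

/// Step 2: a bookmarked offset wins; auto_offset_reset only covers partitions with no entry.
fn start_position(tp: &TopicPartition, bookmark: &Bookmark, reset: AutoOffsetReset) -> StartAt {
    match bookmark.get(tp) {
        Some(&offset) => StartAt::Offset(offset),
        None => match reset {
            AutoOffsetReset::Earliest => StartAt::Beginning,
            AutoOffsetReset::Latest => StartAt::End,
        },
    }
}

/// Step 3: the bookmark to persist once the sink has confirmed the batch.
fn next_bookmark(prior: &Bookmark, highest_written: &Bookmark, positions: &Bookmark) -> Bookmark {
    // Carry forward every partition known from earlier runs.
    let mut next = prior.clone();
    // Assigned partitions, including ones that produced nothing: current position.
    for (tp, &position) in positions {
        next.insert(tp.clone(), position);
    }
    // Partitions that delivered messages: one past the highest offset written.
    for (tp, &highest) in highest_written {
        next.insert(tp.clone(), highest + 1);
    }
    next
}

fn main() {
    let tp = |p: i32| ("orders".to_string(), p);
    let prior = Bookmark::from([(tp(0), 100), (tp(1), 50)]);
    println!("{:?}", start_position(&tp(0), &prior, AutoOffsetReset::Latest)); // Offset(100)
    println!("{:?}", start_position(&tp(3), &prior, AutoOffsetReset::Earliest)); // Beginning

    let written = Bookmark::from([(tp(0), 149)]);
    let positions = Bookmark::from([(tp(0), 150), (tp(2), 7)]);
    println!("{:?}", next_bookmark(&prior, &written, &positions));
}
```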

State key format:

kafka:{group_id}:{topic1}:{topic2}...

Topics are sorted alphabetically before joining, so the key is stable regardless of the order you list topics in the config. They are joined with : (not .) because a Kafka topic name may legally contain ., which would otherwise let ["a.b", "c"] and ["a", "b.c"] collide on the same group_id. For example, group_id = "my-group" and topics = ["beta", "alpha"] produces:

kafka:my-group:alpha:beta
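The key construction is easy to express directly; state_key is a hypothetical helper that follows the documented rules (sort the topics, then join with :).

```rust
/// Build the state key: topics sorted, then joined with ':'.
fn state_key(group_id: &str, topics: &[&str]) -> String {
    let mut sorted = topics.to_vec();
    sorted.sort_unstable();
    format!("kafka:{}:{}", group_id, sorted.join(":"))
}

fn main() {
    println!("{}", state_key("my-group", &["beta", "alpha"])); // kafka:my-group:alpha:beta
    // With '.' as the separator, both of these would collapse to "...:a.b.c".
    println!("{}", state_key("g", &["a.b", "c"])); // kafka:g:a.b:c
    println!("{}", state_key("g", &["a", "b.c"])); // kafka:g:a:b.c
}
```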

Delivery semantics: Offsets are persisted via faucet-stream's state store only after the sink confirms a batch, and on restart the consumer seeds the partition assignment with the bookmarked offset before any message is fetched. End-to-end this is at-least-once if the sink can fail mid-batch: records written before the failure are not covered by a persisted bookmark, so they are read and written again on the next run. Pair the source with an idempotent sink if duplicates are unacceptable.


Streaming and batching

When the source is driven through Source::stream_pages (which is what Pipeline::run and run_stream use internally), messages are accumulated into a batch_size-sized in-memory buffer and emitted as a StreamPage whenever:

  1. The buffer reaches batch_size — yield a full page, reset the buffer, and keep polling for more messages.
  2. The idle_timeout window flushes a partial buffer — when the idle deadline fires with a non-empty buffer, the buffer is emitted as a trailing page and the loop continues.
  3. max_messages is reached or Ctrl+C is received — emit the final partial page (if any) and exit.
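The three flush rules map onto a small accumulator. Batcher is a hypothetical type, not part of the crate: push returns a full page when rule 1 fires, flush returns the partial buffer for rules 2 and 3, and a batch_size of 0 never triggers an early flush, matching the "no batching" sentinel. Polling, bookmark snapshots and the idle clock are left out.

```rust
/// Accumulates messages into pages of `batch_size`; 0 means "never flush early".
struct Batcher<T> {
    batch_size: usize,
    buf: Vec<T>,
}

impl<T> Batcher<T> {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, buf: Vec::new() }
    }

    /// Rule 1: hand back a full page once the buffer reaches batch_size.
    fn push(&mut self, msg: T) -> Option<Vec<T>> {
        self.buf.push(msg);
        if self.batch_size > 0 && self.buf.len() >= self.batch_size {
            Some(std::mem::take(&mut self.buf))
        } else {
            None
        }
    }

    /// Rules 2 and 3: emit whatever is buffered, if anything.
    fn flush(&mut self) -> Option<Vec<T>> {
        if self.buf.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buf))
        }
    }
}

fn main() {
    let mut batcher = Batcher::new(3);
    for msg in 1..=7 {
        if let Some(page) = batcher.push(msg) {
            println!("full page: {page:?}"); // [1, 2, 3] then [4, 5, 6]
        }
    }
    // Loop exit: emit the trailing partial page.
    if let Some(page) = batcher.flush() {
        println!("final page: {page:?}"); // [7]
    }
}
```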

Each emitted page carries a snapshot of the cumulative (topic, partition) -> next_offset bookmark. The pipeline persists this via the configured StateStore after the sink confirms the write, giving at-least-once delivery with per-page durability — a crash between pages re-reads only the uncommitted page on resume.

Why this matters: the batch-mode fetch_all_incremental waits for the entire run to complete before returning, which means the sink sees nothing until termination and the state store is only updated once. With streaming, the sink writes (and the state store advances) every batch_size messages.

batch_size = 0 — drain the entire run window. Zero is a special "no batching" sentinel: the source accumulates every message consumed during the run (until max_messages or idle_timeout fires) into a single page before yielding. This negates the streaming benefit and is intended only for tests or one-shot drain scenarios. For production pipelines, prefer a non-zero batch_size so the state store advances with each successful sink write.


Termination

The consume loop exits when any of the following is true:

  • max_messages — the configured number of messages has been collected.
  • idle_timeout — no new message has arrived within the configured number of seconds.
  • Ctrl+C (SIGINT) — the source catches the signal and exits the loop cleanly. The bookmark for everything consumed up to that point is returned to the pipeline, which persists it before exiting.

At least one of max_messages and idle_timeout must be set; the config is rejected at construction time otherwise.

Both conditions can be combined — the loop stops as soon as either threshold is hit:

max_messages: 50000
idle_timeout: 60
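The combined check can be sketched with std::time. Termination is a hypothetical type: it rejects a config with neither limit set, mirroring the construction-time validation described above, and stops as soon as either limit is hit. Ctrl+C handling is omitted, and measuring idle time from the last received message (or from the start of the loop before any message arrives) is an assumption.

```rust
use std::time::{Duration, Instant};

/// Hypothetical termination policy built from max_messages and idle_timeout.
struct Termination {
    max_messages: Option<u64>,
    idle_timeout: Option<Duration>,
}

impl Termination {
    /// Reject configs that set neither limit.
    fn new(max_messages: Option<u64>, idle_timeout_secs: Option<u64>) -> Result<Self, String> {
        if max_messages.is_none() && idle_timeout_secs.is_none() {
            return Err("at least one of max_messages and idle_timeout must be set".to_string());
        }
        Ok(Self {
            max_messages,
            idle_timeout: idle_timeout_secs.map(Duration::from_secs),
        })
    }

    /// True as soon as either configured limit has been reached.
    fn should_stop(&self, consumed: u64, last_message_at: Instant, now: Instant) -> bool {
        let hit_max = self.max_messages.map_or(false, |max| consumed >= max);
        let idle = self
            .idle_timeout
            .map_or(false, |limit| now.duration_since(last_message_at) >= limit);
        hit_max || idle
    }
}

fn main() {
    let term = Termination::new(Some(50_000), Some(60)).expect("valid config");
    let started = Instant::now();
    println!("{}", term.should_stop(10, started, started)); // false
    println!("{}", term.should_stop(50_000, started, started)); // true
    println!("{}", term.should_stop(10, started, started + Duration::from_secs(61))); // true
    println!("{:?}", Termination::new(None, None).err());
}
```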

Throughput notes

librdkafka is one of the fastest Kafka client implementations available, benchmarked at millions of messages per second on well-provisioned hardware. For most pipelines the bottleneck is the downstream sink (BigQuery inserts, Postgres writes, S3 uploads), not the consume loop itself.

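The v1 consume loop is single-threaded — one async task polls one StreamConsumer. For higher throughput, give your topic enough partitions and run multiple faucet pipeline instances with the same group_id. Kafka assigns each instance a disjoint set of partitions, so throughput scales roughly with the number of instances up to one instance per partition; instances beyond the partition count receive no partitions and sit idle.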
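To see why adding instances helps only up to the partition count, here is a simplified round-robin spread of partitions over consumer instances. round_robin is hypothetical and is not Kafka's group assignor (librdkafka supports several assignment strategies); it only illustrates that the sets are disjoint and that surplus instances receive nothing.

```rust
/// Spread partitions 0..partitions over `instances` consumers in round-robin order.
fn round_robin(partitions: u32, instances: usize) -> Vec<Vec<u32>> {
    assert!(instances > 0, "need at least one instance");
    let mut assignment = vec![Vec::new(); instances];
    for p in 0..partitions {
        assignment[p as usize % instances].push(p);
    }
    assignment
}

fn main() {
    // Six partitions over three instances: disjoint sets, two partitions each.
    println!("{:?}", round_robin(6, 3)); // [[0, 3], [1, 4], [2, 5]]
    // Two partitions over three instances: the third instance gets nothing.
    println!("{:?}", round_robin(2, 3)); // [[0], [1], []]
}
```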

Tuning tips:

  • Increase max_messages to consume more per run, so fewer pipeline runs are needed per unit of data. Page size within a run is set separately by batch_size.
  • Reduce idle_timeout for latency-sensitive pipelines where you want frequent small batches.
  • Use extra_client_config to pass fetch.max.bytes, max.partition.fetch.bytes, or queued.max.messages.kbytes if you need to push beyond librdkafka's defaults.

CLI integration

Run with the faucet binary:

faucet run pipeline.yaml
faucet validate pipeline.yaml
faucet preview pipeline.yaml --limit 5   # consume 5 messages and print to stdout

Inspect the full JSON Schema for every config field:

faucet schema source kafka

A complete working example is in cli/examples/kafka_to_jsonl.yaml.


See also

  • faucet-sink-kafka — produce records to Kafka topics.
  • faucet-common-kafka — shared auth modes, value formats, schema registry client, and policy enums used by both connectors.

License

Dual-licensed under MIT and Apache-2.0, matching the workspace license field.