kafkit-client 0.1.9

Kafka 4.0+ pure Rust client.
<h1 align="center">
  <img src="./kafkit-icon.svg" alt="Kafkit icon" width="64" height="64" align="center">
  Kafkit Client
</h1>

A native async Rust client for Apache Kafka 4.0 and newer.

`kafkit-client` is built on `tokio` and `kafka-protocol`. It exposes producer,
consumer, share-group consumer, and admin APIs without wrapping librdkafka or the
Java client.

## Status

This crate targets modern Kafka clusters only:

- Apache Kafka 4.0+.
- KRaft clusters.
- Modern consumer groups using KIP-848.
- Transaction protocol v2.
- Share-group protocol support.

Classic consumer runtime protocols, ZooKeeper-era assumptions, and older broker
protocol paths are intentionally out of scope. The admin client can still list,
describe, and delete classic groups.

## Install

```toml
[dependencies]
kafkit-client = "0.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```

## Quick Start

Use `KafkaClient` when you want a compact, topic-scoped setup for application
code:

```rust,no_run
use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let orders = KafkaClient::new("localhost:9092").topic("orders");

    let producer = orders.producer().connect().await?;
    producer
        .send_message(
            KafkaMessage::new("created".to_owned())
                .with_key("order-42")
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer.shutdown().await?;
    Ok(())
}
```

## Producer

Use `KafkaProducer` and `ProducerConfig` directly when you need lower-level
control over batching, compression, idempotence, transactions, or explicit
topic/partition routing:

```rust,no_run
use kafkit_client::{
    KafkaProducer, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            .with_acks(-1)
            .with_compression(ProducerCompression::Zstd)
            .with_enable_idempotence(true),
    )
    .await?;

    producer
        .send(
            ProduceRecord::new("orders", 0, "created")
                .with_key("order-42")
                .with_timestamp(1_700_000_123_456)
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer
        .send(ProduceRecord::tombstone("orders", 0).with_key("order-41"))
        .await?;

    producer.flush().await?;
    producer.shutdown().await?;
    Ok(())
}
```

## Consumer

Consumers use Kafka's modern group protocol. The ergonomic builder subscribes to
the topic selected by `KafkaClient::topic(...)`:

```rust,no_run
use kafkit_client::{AutoOffsetReset, KafkaClient};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaClient::new("localhost:9092")
        .topic("orders")
        .consumer("orders-reader")
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_instance_id("orders-reader-1")
        .connect()
        .await?;

    loop {
        let records = consumer.poll().await?;

        for record in records.iter() {
            println!(
                "{}:{}@{}",
                record.topic,
                record.partition,
                record.offset
            );
        }

        consumer.commit(&records).await?;
    }
}
```

## Admin

The admin client covers common cluster and topic operations:

```rust,no_run
use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;

    let topics = admin.list_topics().await?;
    let cluster = admin.describe_cluster().await?;

    println!("topics: {}", topics.len());
    println!("cluster id: {:?}", cluster.cluster_id);

    Ok(())
}
```

## Security

TLS and SASL are configured on the same builder/config types used for plain
connections:

```rust,no_run
use kafkit_client::{KafkaClient, TlsConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaClient::new("kafka.example.com:9093")
        .topic("orders")
        .producer()
        .with_tls(
            TlsConfig::new()
                .with_ca_cert_path("/etc/ssl/certs/kafka-ca.pem")
                .with_server_name("kafka.example.com"),
        )
        .with_sasl_scram_sha_512("alice", "correct-horse-battery-staple")
        .connect()
        .await?;

    producer.send("created".to_owned()).await?;
    producer.shutdown().await?;
    Ok(())
}
```

Supported security features:

- TLS with system roots, custom CA files, server-name override, and client certs.
- SASL/PLAIN.
- SASL/SCRAM-SHA-256.
- SASL/SCRAM-SHA-512.

## Feature Highlights

- Async producer, consumer, share-group consumer, and admin client.
- Topic-scoped `KafkaClient` facade for concise application setup.
- Lower-level `KafkaProducer`, `KafkaConsumer`, `KafkaShareConsumer`, and
  `KafkaAdmin` APIs for explicit configuration.
- Record headers, nullable values, tombstones, and explicit record timestamps.
- Configurable compression, linger, batching, retry backoff, request timeout,
  and delivery timeout.
- Producer `flush()` and explicit `shutdown()` controls.
- Transactional producer support, including `send_offsets_to_transaction`.
- Manual assignment, topic-list subscription, and pattern subscription.
- Seek, position, pause, resume, committed-offset lookup, beginning/end offset
  lookup, and timestamp offset lookup.
- Topic create/list/describe/delete/config APIs, consumer group description,
  broker metadata lookup, and cluster description.
- `tracing` instrumentation and `metrics` facade measurements for
  OpenTelemetry-compatible export.

## Choosing an API

- Use `KafkaClient` for service code that mostly works with one topic at a time.
- Use `KafkaProducer` with `ProducerConfig` for explicit partitioning, batching,
  bounded buffering, request sizing, compression, idempotence, or transactions.
- Use `KafkaConsumer` with `ConsumerConfig` for direct control over group,
  assignment, fetch, offset, and commit behavior.
- Use `KafkaShareConsumer` for Kafka share-group consumption.
- Use `KafkaAdmin` with `AdminConfig` for cluster and topic management.

## Operational Notes

- Call `shutdown().await` on producers and consumers when your application is
  stopping so background tasks can drain and close connections cleanly.
- Call `flush().await` on producers when you need all currently buffered records
  acknowledged before continuing.
- Consumers return batches from `poll().await`; commit the records you have
  processed, or configure auto-commit if that matches your processing model.
- Transactional producers must set a transactional id. Prefer
  `begin_transaction()` for the normal lifecycle; it initializes transactions on
  first use. `init_transactions().await` is available for eager readiness checks.

## Performance

`kafkit-client` is designed as a native async Rust client rather than a wrapper
around `librdkafka`. In the local release-mode comparison app in this repository,
it currently outperforms `rdkafka` on both producer and consumer throughput for
the measured workload.

Benchmark setup:

- Command mode: `--release`.
- Machine: Apple M3 Pro, 12 logical CPUs, 36 GiB RAM.
- OS/toolchain: macOS 26.3.1, Rust 1.95.0, Docker 29.4.0.
- Kafka: `apache/kafka:4.2.0` managed by testcontainers.
- Payload: 100,000 messages, 256-byte values.
- Topic: three partitions.
- Compression: none.
- Producer: `linger.ms = 0`, concurrency 256.
- Consumer: topic prefilled with 100,000 records by the comparison app, commits
  disabled.

| Workload | kafkit-client msg/s | rdkafka msg/s | kafkit speedup |
| --- | ---: | ---: | ---: |
| Produce | 122,024.45 | 12,129.49 | 10.06x |
| Consume | 114,726.34 | 31,006.31 | 3.70x |

Producer p50, p95, and p99 latencies in that run were also lower for
`kafkit-client`, although its maximum latency was higher:

| Client | p50 us | p95 us | p99 us | max us |
| --- | ---: | ---: | ---: | ---: |
| kafkit-client | 1,038 | 2,709 | 8,495 | 272,895 |
| rdkafka | 18,751 | 37,727 | 55,807 | 87,935 |
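
For readers who want to reproduce percentile columns like these from raw samples, here is a minimal sketch using the nearest-rank method. The function name `percentile_us` is hypothetical, and the benchmark app may compute its percentiles differently (for example with a histogram), so treat this as an illustration rather than the repository's method.

```rust
/// Nearest-rank percentile over latency samples in microseconds.
/// `percent` is an integer percentage; values above 100 are clamped to 100.
/// Returns `None` for an empty slice.
/// Hypothetical helper, not part of kafkit-client or the benchmark app.
fn percentile_us(samples: &[u64], percent: u64) -> Option<u64> {
    if samples.is_empty() {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_unstable();
    let n = sorted.len() as u64;
    // Nearest rank: the smallest 1-based rank r with r / n >= percent / 100,
    // computed with an integer ceiling to avoid floating-point rounding.
    let rank = ((percent.min(100) * n + 99) / 100).max(1);
    Some(sorted[(rank - 1) as usize])
}
```

A p99 value ignores the slowest 1% of samples, which is why a client can have a lower p99 and still show a higher maximum, as in the table above.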

Consumer throughput was also higher in the same comparison:

| Client | Elapsed ms | Msg/s | MiB/s |
| --- | ---: | ---: | ---: |
| kafkit-client | 871.64 | 114,726.34 | 28.01 |
| rdkafka | 3,225.15 | 31,006.31 | 7.57 |
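
The throughput columns follow from the message count, the value size, and the elapsed time. The sketch below shows that arithmetic; `throughput` is a hypothetical helper, and it assumes MiB/s counts only the 256-byte values (no keys, headers, or protocol overhead), which is consistent with the figures in the table.

```rust
/// Derive (messages per second, MiB per second) from a benchmark run.
/// Hypothetical helper; assumes only value bytes count toward MiB/s.
fn throughput(messages: u64, value_bytes: u64, elapsed_ms: f64) -> (f64, f64) {
    let seconds = elapsed_ms / 1_000.0;
    let msg_per_s = messages as f64 / seconds;
    let mib_per_s = msg_per_s * value_bytes as f64 / (1024.0 * 1024.0);
    (msg_per_s, mib_per_s)
}
```

With 100,000 messages of 256 bytes, an elapsed time of 871.64 ms gives roughly 114,726 msg/s and 28.01 MiB/s. Small differences in the last digits come from the elapsed time being rounded in the table.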

Consumer latency percentiles are intentionally not highlighted because the
benchmark records payload age from prefill time. For consumers, the useful number
from this run is fetch/decode throughput with commits disabled.

These are not universal Kafka performance numbers. They are a repeatable local
comparison for this repo's benchmark shape. Different broker versions, network
latency, compression, partition counts, batch sizes, commit behavior, message
sizes, and durability settings can move the result. In this run, broker env vars
were not set, so the benchmark app started separate Kafka testcontainers for
`kafkit-client` and `rdkafka`. That isolates the clients but does not put both
clients on the same broker process. The full report, commands, and specs are in
[`examples/perf-comparison/PERFORMANCE_SUMMARY.md`](../../examples/perf-comparison/PERFORMANCE_SUMMARY.md).

## How It Works

The producer has the same broad shape as Kafka's Java producer: application calls
enqueue records, a dedicated sender loop drains ready batches, batches are grouped
by leader, and produce requests are dispatched asynchronously. The sender keeps a
bounded number of requests in flight per broker connection, defaulting to five
with `ProducerConfig::max_in_flight_requests_per_connection`, which matches the
Java producer default.
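
To make the in-flight bound concrete, here is a minimal, std-only sketch of a per-broker limiter. `InFlightLimiter` and its methods are hypothetical names for illustration; the crate's sender may track in-flight requests quite differently.

```rust
use std::collections::HashMap;

/// Hypothetical per-broker in-flight request counter (illustration only).
struct InFlightLimiter {
    max_per_connection: usize,
    in_flight: HashMap<i32, usize>, // broker id -> outstanding requests
}

impl InFlightLimiter {
    fn new(max_per_connection: usize) -> Self {
        InFlightLimiter {
            max_per_connection,
            in_flight: HashMap::new(),
        }
    }

    /// Reserve a slot for `broker`; returns false when that connection is full.
    fn try_acquire(&mut self, broker: i32) -> bool {
        let count = self.in_flight.entry(broker).or_insert(0);
        if *count >= self.max_per_connection {
            return false;
        }
        *count += 1;
        true
    }

    /// Release a slot once a response or failure for `broker` has been handled.
    fn release(&mut self, broker: i32) {
        if let Some(count) = self.in_flight.get_mut(&broker) {
            *count = count.saturating_sub(1);
        }
    }
}
```

With a limit of five, a sixth `try_acquire` for the same broker fails until one `release` arrives, while other brokers are unaffected.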

Internally, the producer path is:

1. `send(...)` validates and enqueues a `ProduceRecord` into the accumulator.
2. The accumulator groups records by topic/partition and seals a batch when it
   reaches `batch_size`, its `linger` expires, or a flush is requested.
3. Metadata maps each partition to its current leader.
4. Ready batches are grouped by leader and encoded into Kafka produce requests.
5. Produce requests are sent in the background with bounded in-flight tracking.
6. Completions resolve the original application futures, retry retriable errors,
   or fail records when delivery timeout is exceeded.
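
The accumulator step above can be sketched with the standard library alone. Everything here (`Accumulator`, `OpenBatch`, `append`, `drain_ready`) is a hypothetical illustration of size-, linger-, and flush-based sealing, not the crate's internal types. Time is passed in as a `Duration` since start so the logic stays deterministic.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical key: (topic, partition).
type TopicPartition = (String, i32);

struct OpenBatch {
    records: Vec<Vec<u8>>,
    bytes: usize,
    opened_at: Duration, // time since start when the first record arrived
}

struct Accumulator {
    batch_size: usize,
    linger: Duration,
    open: HashMap<TopicPartition, OpenBatch>,
}

impl Accumulator {
    fn new(batch_size: usize, linger: Duration) -> Self {
        Accumulator { batch_size, linger, open: HashMap::new() }
    }

    /// Append a record; returns the sealed batch if the size threshold is hit.
    fn append(
        &mut self,
        tp: TopicPartition,
        value: Vec<u8>,
        now: Duration,
    ) -> Option<Vec<Vec<u8>>> {
        let batch = self.open.entry(tp.clone()).or_insert_with(|| OpenBatch {
            records: Vec::new(),
            bytes: 0,
            opened_at: now,
        });
        batch.bytes += value.len();
        batch.records.push(value);
        if batch.bytes >= self.batch_size {
            return self.open.remove(&tp).map(|b| b.records);
        }
        None
    }

    /// Seal every batch whose linger has elapsed, or all batches when flushing.
    fn drain_ready(
        &mut self,
        now: Duration,
        flush: bool,
    ) -> Vec<(TopicPartition, Vec<Vec<u8>>)> {
        let linger = self.linger;
        let ready: Vec<TopicPartition> = self
            .open
            .iter()
            .filter(|(_, batch)| flush || now.saturating_sub(batch.opened_at) >= linger)
            .map(|(tp, _)| tp.clone())
            .collect();
        ready
            .into_iter()
            .filter_map(|tp| self.open.remove(&tp).map(|b| (tp, b.records)))
            .collect()
    }
}
```

With a zero linger, every open batch is ready on the next `drain_ready` call, which is the behavior a latency-sensitive workload wants.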

The consumer runtime follows a similar single-owner async model. It owns the
metadata cache, coordinator connection, leader connections, assignment state,
offset state, heartbeat scheduling, and buffered records. Polling asks the runtime
for records; the runtime fetches from partition leaders, decodes record batches
with `kafka-protocol`, and preserves partial trailing batches for the next fetch
instead of treating a fetch-boundary partial batch as fatal.
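
As an illustration of the partial-batch point, the sketch below splits a fetched byte buffer into complete record batches and a trailing remainder. It relies on the Kafka record batch layout, where each batch starts with an 8-byte base offset followed by a 4-byte big-endian length that counts the bytes after the length field. The function `split_complete_batches` is hypothetical; the crate delegates actual decoding to `kafka-protocol`, and how the runtime uses the remainder is not shown here.

```rust
/// 8-byte base offset + 4-byte batch length precede each record batch body.
const HEADER_LEN: usize = 12;

/// Split `buf` into complete record batches and the trailing partial bytes.
/// Hypothetical helper for illustration; it does not decode the records.
fn split_complete_batches(buf: &[u8]) -> (Vec<&[u8]>, &[u8]) {
    let mut complete = Vec::new();
    let mut pos = 0;
    while buf.len() - pos >= HEADER_LEN {
        let batch_len = i32::from_be_bytes([
            buf[pos + 8],
            buf[pos + 9],
            buf[pos + 10],
            buf[pos + 11],
        ]);
        if batch_len < 0 {
            // A negative length is invalid; stop and leave the rest untouched.
            break;
        }
        let end = pos + HEADER_LEN + batch_len as usize;
        if end > buf.len() {
            // The response ended in the middle of this batch: not an error.
            break;
        }
        complete.push(&buf[pos..end]);
        pos = end;
    }
    (complete, &buf[pos..])
}
```

The caller can decode the complete batches immediately and deal with the remainder on the next fetch, instead of failing the whole response.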

Internally, the consumer path is:

1. Subscription or manual assignment updates the runtime-owned assignment state.
2. Metadata maps assigned partitions to their current leaders.
3. Fetch requests are grouped by leader connection.
4. Responses are decoded into `ConsumerRecord` batches in the runtime.
5. Any records beyond the current poll demand stay buffered for later polls.
6. Offset commits are explicit unless auto-commit is enabled.
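
Steps 2, 3, and 5 above can be sketched with plain collections. `group_by_leader` and `RecordBuffer` are hypothetical names used only for illustration, and broker ids are assumed to be `i32`; the crate's runtime is not required to be structured this way.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical key: (topic, partition).
type TopicPartition = (String, i32);

/// Group assigned partitions by leader broker id, so one fetch request
/// can be built per leader connection.
fn group_by_leader(
    assigned: &[TopicPartition],
    leaders: &BTreeMap<TopicPartition, i32>,
) -> BTreeMap<i32, Vec<TopicPartition>> {
    let mut by_leader: BTreeMap<i32, Vec<TopicPartition>> = BTreeMap::new();
    for tp in assigned {
        // Partitions without a known leader are skipped in this sketch;
        // a real runtime would refresh metadata instead.
        if let Some(leader) = leaders.get(tp) {
            by_leader.entry(*leader).or_default().push(tp.clone());
        }
    }
    by_leader
}

/// Holds decoded records that exceed the current poll demand.
struct RecordBuffer<T> {
    buffered: VecDeque<T>,
}

impl<T> RecordBuffer<T> {
    fn new() -> Self {
        RecordBuffer { buffered: VecDeque::new() }
    }

    /// Add freshly decoded records to the back of the buffer.
    fn extend(&mut self, records: Vec<T>) {
        self.buffered.extend(records);
    }

    /// Return at most `max_records`; the rest stays buffered for later polls.
    fn poll(&mut self, max_records: usize) -> Vec<T> {
        let n = max_records.min(self.buffered.len());
        self.buffered.drain(..n).collect()
    }
}
```

Keeping surplus records in a queue means a later poll can be served from memory without another round trip to the broker.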

## Why It Is Faster In These Benchmarks

The performance advantage in the current comparison comes from a few concrete
implementation choices:

- Native Tokio I/O avoids crossing an FFI/native client boundary for every
  produce or consume operation.
- The client targets Kafka 4.x and modern protocols directly, so it does not carry
  compatibility branches for older broker eras.
- Producer dispatch is pipelined: the sender does not wait for one produce
  response before issuing the next request to a leader.
- Completion handling is decoupled from request dispatch, so the sender can keep
  batches moving while previous requests are still awaiting broker responses.
- The default benchmark uses zero linger, which avoids intentionally delaying
  partial batches in a latency-sensitive workload.
- Consumer fetches are runtime-owned and decode directly into Rust records
  without an FFI/native-client handoff.
- Fetch decoding handles partial batch boundaries without forcing an error/retry
  path, which matters when a broker response ends in the middle of a batch.
- The consumer benchmark disables commits, so the comparison isolates fetch and
  decode throughput rather than coordinator commit latency.

`librdkafka` remains a mature and heavily optimized Kafka client. The claim here
is narrower: for this repo's current release-mode benchmark, with the settings
above, `kafkit-client` is faster.

## Runnable Examples

The crate includes real programs under `examples/`. For a local Kafka 4.0+
broker on `localhost:9092`, run:

```sh
cargo run -p kafkit-client --example order_workflow
cargo run -p kafkit-client --example transactional_order_worker
cargo run -p kafkit-client --example customer_profile_projection
cargo run -p kafkit-client --example cluster_operations_report
```

See [examples/README.md](examples/README.md) for environment variables and
Docker Compose notes.

## More Docs

- [Quickstart](docs/quickstart.md)
- [Configuration](docs/configuration.md)
- [Producer](docs/producer.md)
- [Consumer](docs/consumer.md)
- [Groups](docs/groups.md)
- [Transactions](docs/transactions.md)
- [Admin](docs/admin.md)
- [Security](docs/security.md)
- [Troubleshooting](docs/troubleshooting.md)
- [Compatibility](docs/compatibility.md)
- [Limitations](docs/limitations.md)
- [Migration notes](docs/migration.md)