kafkit-client 0.1.7

Kafka 4.0+ pure Rust client.

A native async Rust client for Apache Kafka 4.0 and newer.

kafkit-client is built on tokio and kafka-protocol. It exposes producer, consumer, share-group consumer, and admin APIs without wrapping librdkafka or the Java client.

Status

This crate targets modern Kafka clusters only:

  • Apache Kafka 4.0+.
  • KRaft clusters.
  • Modern consumer groups using KIP-848.
  • Transaction protocol v2.
  • Share-group protocol support.

Classic consumer runtime protocols, ZooKeeper-era assumptions, and older broker protocol paths are intentionally out of scope. The admin client can still list, describe, and delete classic groups.

Install

[dependencies]
kafkit-client = "0.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

Performance

kafkit-client is designed as a native async Rust client rather than a wrapper around librdkafka. In the local release-mode comparison app in this repository, it currently outperforms rdkafka on both producer and consumer throughput for the measured workload.

Benchmark setup:

  • Build mode: --release.
  • Machine: Apple M3 Pro, 12 logical CPUs, 36 GiB RAM.
  • OS/toolchain: macOS 26.3.1, Rust 1.95.0, Docker 29.4.0.
  • Kafka: apache/kafka:4.2.0 managed by testcontainers.
  • Payload: 100,000 messages, 256-byte values.
  • Topic: three partitions.
  • Compression: none.
  • Producer: linger.ms = 0, concurrency 256.
  • Consumer: topic prefilled with 100,000 records by the comparison app, commits disabled.

Workload   kafkit-client msg/s   rdkafka msg/s   kafkit speedup
Produce            122,024.45        12,129.49           10.06x
Consume            114,726.34        31,006.31            3.70x

Producer p50, p95, and p99 latencies in that run were also lower:

Client          p50 us   p95 us   p99 us    max us
kafkit-client    1,038    2,709    8,495   272,895
rdkafka         18,751   37,727   55,807    87,935

Consumer throughput was also higher in the same comparison:

Client          Elapsed ms        Msg/s   MiB/s
kafkit-client       871.64   114,726.34   28.01
rdkafka           3,225.15    31,006.31    7.57
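
The Msg/s and MiB/s columns follow directly from the elapsed time and the workload shape (100,000 messages, 256-byte values). A quick arithmetic check of the kafkit-client row:

```rust
// Recompute the consumer-table throughput from the reported elapsed time.
fn main() {
    let messages = 100_000.0_f64;
    let elapsed_ms = 871.64; // kafkit-client consume elapsed, from the table
    let msg_per_s = messages / (elapsed_ms / 1000.0);
    let mib_per_s = msg_per_s * 256.0 / (1024.0 * 1024.0);
    // Matches the table within rounding: ~114,726 msg/s and ~28.01 MiB/s.
    assert!((msg_per_s - 114_726.34).abs() < 1.0);
    assert!((mib_per_s - 28.01).abs() < 0.01);
    println!("{msg_per_s:.2} msg/s, {mib_per_s:.2} MiB/s");
}
```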

Consumer latency percentiles are intentionally not highlighted because the benchmark records payload age from prefill time. For consumers, the useful number from this run is fetch/decode throughput with commits disabled.

These are not universal Kafka performance numbers. They are a repeatable local comparison for this repo's benchmark shape. Different broker versions, network latency, compression, partition counts, batch sizes, commit behavior, message sizes, and durability settings can move the result. In this run, broker env vars were not set, so the benchmark app started separate Kafka testcontainers for kafkit-client and rdkafka. That isolates the clients but does not put both clients on the same broker process. The full report, commands, and specs are in examples/perf-comparison/PERFORMANCE_SUMMARY.md.

How It Works

The producer has the same broad shape as Kafka's Java producer: application send calls enqueue records, a dedicated sender loop drains ready batches, batches are grouped by leader, and produce requests are dispatched asynchronously. The sender keeps a bounded number of requests in flight per broker connection, configurable via ProducerConfig::max_in_flight_requests_per_connection and defaulting to five, which matches the Java producer default.

Internally, the producer path is:

  1. send(...) validates and enqueues a ProduceRecord into the accumulator.
  2. The accumulator groups records by topic/partition and seals batches when they reach batch_size, their linger time expires, or a flush is requested.
  3. Metadata maps each partition to its current leader.
  4. Ready batches are grouped by leader and encoded into Kafka produce requests.
  5. Produce requests are sent in the background with bounded in-flight tracking.
  6. Completions resolve the original application futures, retry retriable errors, or fail records when delivery timeout is exceeded.
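
The batching and leader-grouping in steps 2-4 can be sketched as a standalone simplification. The types below are illustrative stand-ins, not the crate's internal ones:

```rust
use std::collections::HashMap;

// Simplified accumulator sketch: group records by (topic, partition) and
// seal a batch once it reaches `batch_size` records (linger/flush omitted).
struct Accumulator {
    batch_size: usize,
    open: HashMap<(String, i32), Vec<Vec<u8>>>,
    sealed: Vec<((String, i32), Vec<Vec<u8>>)>,
}

impl Accumulator {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, open: HashMap::new(), sealed: Vec::new() }
    }

    // Steps 1-2: enqueue a record; seal the partition's batch when full.
    fn append(&mut self, topic: &str, partition: i32, value: Vec<u8>) {
        let key = (topic.to_owned(), partition);
        let batch = self.open.entry(key.clone()).or_default();
        batch.push(value);
        if batch.len() >= self.batch_size {
            let full = self.open.remove(&key).unwrap();
            self.sealed.push((key, full));
        }
    }

    // Steps 3-4: drain sealed batches, grouped by each partition's leader.
    fn drain_by_leader(
        &mut self,
        leader_of: impl Fn(&str, i32) -> i32,
    ) -> HashMap<i32, Vec<((String, i32), Vec<Vec<u8>>)>> {
        let mut by_leader: HashMap<i32, Vec<_>> = HashMap::new();
        for (key, batch) in self.sealed.drain(..) {
            let leader = leader_of(&key.0, key.1);
            by_leader.entry(leader).or_default().push((key, batch));
        }
        by_leader
    }
}

fn main() {
    let mut acc = Accumulator::new(2);
    acc.append("orders", 0, b"a".to_vec());
    acc.append("orders", 0, b"b".to_vec()); // seals the (orders, 0) batch
    acc.append("orders", 1, b"c".to_vec()); // still open, stays buffered
    let ready = acc.drain_by_leader(|_, p| p % 2); // toy leader mapping
    assert_eq!(ready[&0].len(), 1);
    println!("ready batches for leader 0: {}", ready[&0].len());
}
```

In the real sender this draining happens continuously, and each leader's batches are encoded into a single produce request (steps 4-5).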

The consumer runtime follows a similar single-owner async model. It owns the metadata cache, coordinator connection, leader connections, assignment state, offset state, heartbeat scheduling, and buffered records. Polling asks the runtime for records; the runtime fetches from partition leaders, decodes record batches with kafka-protocol, and preserves partial trailing batches for the next fetch instead of treating a fetch-boundary partial batch as fatal.

Internally, the consumer path is:

  1. Subscription or manual assignment updates the runtime-owned assignment state.
  2. Metadata maps assigned partitions to their current leaders.
  3. Fetch requests are grouped by leader connection.
  4. Responses are decoded into ConsumerRecord batches in the runtime.
  5. Any records beyond the current poll demand stay buffered for later polls.
  6. Offset commits are explicit unless auto-commit is enabled.
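
Step 5's buffering behavior can be illustrated with a minimal standalone sketch. PollBuffer and its methods are hypothetical stand-ins, not crate APIs:

```rust
use std::collections::VecDeque;

// Sketch of step 5: the runtime buffers any fetched records beyond the
// current poll's demand and serves them on later polls without refetching.
struct PollBuffer {
    buffered: VecDeque<String>, // stand-in for decoded ConsumerRecords
}

impl PollBuffer {
    fn new() -> Self {
        Self { buffered: VecDeque::new() }
    }

    // Decoded fetch responses land in the runtime-owned buffer (step 4).
    fn on_fetch_response(&mut self, records: Vec<String>) {
        self.buffered.extend(records);
    }

    // A poll takes at most `max_poll_records`; the rest stay buffered.
    fn poll(&mut self, max_poll_records: usize) -> Vec<String> {
        let n = max_poll_records.min(self.buffered.len());
        self.buffered.drain(..n).collect()
    }
}

fn main() {
    let mut buf = PollBuffer::new();
    buf.on_fetch_response((0..5).map(|i| format!("rec-{i}")).collect());
    let first = buf.poll(3);
    let second = buf.poll(3); // leftovers served without a new fetch
    assert_eq!(first.len(), 3);
    assert_eq!(second.len(), 2);
    println!("{} then {}", first.len(), second.len());
}
```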

Why It Is Faster In These Benchmarks

The performance advantage in the current comparison comes from a few concrete implementation choices:

  • Native Tokio I/O avoids crossing an FFI/native client boundary for every produce or consume operation.
  • The client targets Kafka 4.x and modern protocols directly, so it does not carry compatibility branches for older broker eras.
  • Producer dispatch is pipelined: the sender no longer waits for one produce response before issuing the next request to a leader.
  • Completion handling is decoupled from request dispatch, so the sender can keep batches moving while previous requests are still awaiting broker responses.
  • The default benchmark uses zero linger, which avoids intentionally delaying partial batches in a latency-sensitive workload.
  • Consumer fetches are runtime-owned and decode directly into Rust records without an FFI/native-client handoff.
  • Fetch decoding handles partial batch boundaries without forcing an error/retry path, which matters when a broker response ends at a batch boundary.
  • The consumer benchmark disables commits, so the comparison isolates fetch and decode throughput rather than coordinator commit latency.

librdkafka remains a mature and heavily optimized Kafka client. The claim here is narrower: for this repo's current release-mode benchmark, with the settings above, kafkit-client is faster.

Quick Start

Use KafkaClient when you want a compact, topic-scoped setup for application code:

use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let orders = KafkaClient::new("localhost:9092").topic("orders");

    let producer = orders.producer().connect().await?;
    producer
        .send_message(
            KafkaMessage::new("created".to_owned())
                .with_key("order-42")
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer.shutdown().await?;
    Ok(())
}

Producer

Use KafkaProducer and ProducerConfig directly when you need lower-level control over batching, compression, idempotence, transactions, or explicit topic/partition routing:

use kafkit_client::{
    KafkaProducer, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            .with_acks(-1)
            .with_compression(ProducerCompression::Zstd)
            .with_enable_idempotence(true),
    )
    .await?;

    producer
        .send(
            ProduceRecord::new("orders", 0, "created")
                .with_key("order-42")
                .with_timestamp(1_700_000_123_456)
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer
        .send(ProduceRecord::tombstone("orders", 0).with_key("order-41"))
        .await?;

    producer.flush().await?;
    producer.shutdown().await?;
    Ok(())
}

Consumer

Consumers use Kafka's modern group protocol. The ergonomic builder subscribes to the topic selected by KafkaClient::topic(...):

use kafkit_client::{AutoOffsetReset, KafkaClient};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaClient::new("localhost:9092")
        .topic("orders")
        .consumer("orders-reader")
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_instance_id("orders-reader-1")
        .connect()
        .await?;

    loop {
        let records = consumer.poll().await?;

        for record in records.iter() {
            println!(
                "{}:{}@{}",
                record.topic,
                record.partition,
                record.offset
            );
        }

        consumer.commit(&records).await?;
    }
}

Admin

The admin client covers common cluster and topic operations:

use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;

    let topics = admin.list_topics().await?;
    let cluster = admin.describe_cluster().await?;

    println!("topics: {}", topics.len());
    println!("cluster id: {:?}", cluster.cluster_id);

    Ok(())
}

Security

TLS and SASL are configured on the same builder/config types used for plain connections:

use kafkit_client::{KafkaClient, TlsConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaClient::new("kafka.example.com:9093")
        .topic("orders")
        .producer()
        .with_tls(
            TlsConfig::new()
                .with_ca_cert_path("/etc/ssl/certs/kafka-ca.pem")
                .with_server_name("kafka.example.com"),
        )
        .with_sasl_scram_sha_512("alice", "correct-horse-battery-staple")
        .connect()
        .await?;

    producer.send("created".to_owned()).await?;
    producer.shutdown().await?;
    Ok(())
}

Supported security features:

  • TLS with system roots, custom CA files, server-name override, and client certs.
  • SASL/PLAIN.
  • SASL/SCRAM-SHA-256.
  • SASL/SCRAM-SHA-512.

Feature Highlights

  • Async producer, consumer, share-group consumer, and admin client.
  • Topic-scoped KafkaClient facade for concise application setup.
  • Lower-level KafkaProducer, KafkaConsumer, KafkaShareConsumer, and KafkaAdmin APIs for explicit configuration.
  • Record headers, nullable values, tombstones, and explicit record timestamps.
  • Configurable compression, linger, batching, retry backoff, request timeout, and delivery timeout.
  • Producer flush() and explicit shutdown() controls.
  • Transactional producer support, including send_offsets_to_transaction.
  • Manual assignment, topic-list subscription, and pattern subscription.
  • Seek, position, pause, resume, committed-offset lookup, beginning/end offset lookup, and timestamp offset lookup.
  • Topic create/list/describe/delete/config APIs, consumer group description, broker metadata lookup, and cluster description.
  • tracing instrumentation.

Choosing an API

  • Use KafkaClient for service code that mostly works with one topic at a time.
  • Use KafkaProducer with ProducerConfig for explicit partitioning, batching, compression, idempotence, or transactions.
  • Use KafkaConsumer with ConsumerConfig for direct control over group, assignment, fetch, offset, and commit behavior.
  • Use KafkaShareConsumer for Kafka share-group consumption.
  • Use KafkaAdmin with AdminConfig for cluster and topic management.

Operational Notes

  • Call shutdown().await on producers and consumers when your application is stopping so background tasks can drain and close connections cleanly.
  • Call flush().await on producers when you need all currently buffered records acknowledged before continuing.
  • Consumers return batches from poll().await; commit the records you have processed, or configure auto-commit if that matches your processing model.
  • Transactional producers must set a transactional id, call init_transactions().await, then use begin_transaction(), commit_transaction(), or abort_transaction().
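
The required ordering of those transactional calls can be sketched as a small state machine. The states and free functions below are illustrative, not crate types:

```rust
// Hedged sketch of the transaction lifecycle ordering: init_transactions
// must run first, begin_transaction opens a transaction, and commit or
// abort closes it. These names mirror the crate's methods; the state
// machine itself is an illustration.
#[derive(Debug, PartialEq, Clone, Copy)]
enum TxnState {
    Uninitialized,
    Ready,
    InTransaction,
}

fn init_transactions(s: TxnState) -> Result<TxnState, &'static str> {
    match s {
        TxnState::Uninitialized => Ok(TxnState::Ready),
        _ => Err("init_transactions may only be called once, first"),
    }
}

fn begin_transaction(s: TxnState) -> Result<TxnState, &'static str> {
    match s {
        TxnState::Ready => Ok(TxnState::InTransaction),
        _ => Err("begin_transaction requires init_transactions first"),
    }
}

fn commit_or_abort(s: TxnState) -> Result<TxnState, &'static str> {
    match s {
        TxnState::InTransaction => Ok(TxnState::Ready),
        _ => Err("no open transaction to commit or abort"),
    }
}

fn main() {
    let s = TxnState::Uninitialized;
    let s = init_transactions(s).unwrap();
    let s = begin_transaction(s).unwrap();
    let s = commit_or_abort(s).unwrap(); // back to Ready for the next txn
    assert_eq!(s, TxnState::Ready);
    assert!(begin_transaction(TxnState::Uninitialized).is_err());
    println!("lifecycle ok");
}
```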

Runnable Examples

The crate includes real programs under examples/. For a local Kafka 4.0+ broker on localhost:9092, run:

cargo run -p kafkit-client --example order_workflow

See examples/README.md for environment variables and Docker Compose notes.

More Docs