Crate kafkit_client

Kafkit Client

A native async Rust client for Apache Kafka 4.0 and newer.

kafkit-client is built on tokio and kafka-protocol. It exposes producer, consumer, share-group consumer, and admin APIs without wrapping librdkafka or the Java client.

§Status

This crate targets modern Kafka clusters only:

  • Apache Kafka 4.0+.
  • KRaft clusters.
  • Modern consumer groups using KIP-848.
  • Transaction protocol v2.
  • Share-group protocol support.

Classic consumer runtime protocols, ZooKeeper-era assumptions, and older broker protocol paths are intentionally out of scope. The admin client can still list, describe, and delete classic groups.

§Install

[dependencies]
kafkit-client = "0.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

§Quick Start

Use KafkaClient when you want a compact, topic-scoped setup for application code:

use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let orders = KafkaClient::new("localhost:9092").topic("orders");

    let producer = orders.producer().connect().await?;
    producer
        .send_message(
            KafkaMessage::new("created".to_owned())
                .with_key("order-42")
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer.shutdown().await?;
    Ok(())
}

§Producer

Use KafkaProducer and ProducerConfig directly when you need lower-level control over batching, compression, idempotence, transactions, or explicit topic/partition routing:

use kafkit_client::{
    KafkaProducer, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            .with_acks(-1)
            .with_compression(ProducerCompression::Zstd)
            .with_enable_idempotence(true),
    )
    .await?;

    producer
        .send(
            ProduceRecord::new("orders", 0, "created")
                .with_key("order-42")
                .with_timestamp(1_700_000_123_456)
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer
        .send(ProduceRecord::tombstone("orders", 0).with_key("order-41"))
        .await?;

    producer.flush().await?;
    producer.shutdown().await?;
    Ok(())
}

§Consumer

Consumers use Kafka’s modern group protocol. The ergonomic builder subscribes to the topic selected by KafkaClient::topic(...):

use kafkit_client::{AutoOffsetReset, KafkaClient};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaClient::new("localhost:9092")
        .topic("orders")
        .consumer("orders-reader")
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_instance_id("orders-reader-1")
        .connect()
        .await?;

    loop {
        let records = consumer.poll().await?;

        for record in records.iter() {
            println!(
                "{}:{}@{}",
                record.topic,
                record.partition,
                record.offset
            );
        }

        consumer.commit(&records).await?;
    }
}
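
The crate also lists pattern subscription (SubscriptionPattern) and a lower-level KafkaConsumer driven by ConsumerConfig, but shows no example of them. The sketch below is hypothetical: ConsumerConfig::new is assumed to mirror ProducerConfig::new and AdminConfig::new, and the with_group_id and subscribe_pattern names are guesses, not confirmed API.

use kafkit_client::{ConsumerConfig, KafkaConsumer, SubscriptionPattern};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    // Assumed constructor shape, patterned on ProducerConfig::new and
    // AdminConfig::new; with_group_id is a hypothetical setter name.
    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new("localhost:9092").with_group_id("orders-readers"),
    )
    .await?;

    // Hypothetical method name: subscribe to every topic matching the pattern.
    consumer
        .subscribe_pattern(SubscriptionPattern::new("orders-.*"))
        .await?;

    let records = consumer.poll().await?;
    println!("fetched {} records", records.iter().count());
    Ok(())
}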

§Admin

The admin client covers common cluster and topic operations:

use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;

    let topics = admin.list_topics().await?;
    let cluster = admin.describe_cluster().await?;

    println!("topics: {}", topics.len());
    println!("cluster id: {:?}", cluster.cluster_id);

    Ok(())
}
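
The admin client also documents config APIs: KafkaAdmin::incremental_alter_configs together with AlterConfigOp, AlterConfigOpType, ConfigResource, and ConfigResourceType. A minimal sketch follows; the method name comes from this documentation, but the constructor shapes, enum variants, and argument type are assumptions.

use kafkit_client::{
    AdminConfig, AlterConfigOp, AlterConfigOpType, ConfigResource, ConfigResourceType,
    KafkaAdmin,
};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    // Assumed constructor shapes and variant names for the documented types.
    let resource = ConfigResource::new(ConfigResourceType::Topic, "orders");
    let op = AlterConfigOp::new("retention.ms", Some("604800000"), AlterConfigOpType::Set);

    // incremental_alter_configs is documented; the argument shape is assumed.
    admin.incremental_alter_configs([(resource, vec![op])]).await?;
    Ok(())
}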

§Security

TLS and SASL are configured on the same builder/config types used for plain connections:

use kafkit_client::{KafkaClient, TlsConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaClient::new("kafka.example.com:9093")
        .topic("orders")
        .producer()
        .with_tls(
            TlsConfig::new()
                .with_ca_cert_path("/etc/ssl/certs/kafka-ca.pem")
                .with_server_name("kafka.example.com"),
        )
        .with_sasl_scram_sha_512("alice", "correct-horse-battery-staple")
        .connect()
        .await?;

    producer.send("created".to_owned()).await?;
    producer.shutdown().await?;
    Ok(())
}

Supported security features:

  • TLS with system roots, custom CA files, server-name override, and client certs (see the client-certificate sketch after this list).
  • SASL/PLAIN.
  • SASL/SCRAM-SHA-256.
  • SASL/SCRAM-SHA-512.
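
A minimal mutual-TLS sketch for the client-certificate support listed above, reusing the with_ca_cert_path and with_server_name builders from the previous example. The with_client_cert_path and with_client_key_path names are hypothetical, not taken from the documented API.

use kafkit_client::{KafkaClient, TlsConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaClient::new("kafka.example.com:9093")
        .topic("orders")
        .producer()
        .with_tls(
            TlsConfig::new()
                .with_ca_cert_path("/etc/ssl/certs/kafka-ca.pem")
                .with_server_name("kafka.example.com")
                // Assumed setter names for the client certificate and key:
                .with_client_cert_path("/etc/ssl/certs/kafka-client.pem")
                .with_client_key_path("/etc/ssl/private/kafka-client-key.pem"),
        )
        .connect()
        .await?;

    producer.send("created".to_owned()).await?;
    producer.shutdown().await?;
    Ok(())
}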

§Feature Highlights

  • Async producer, consumer, share-group consumer, and admin client.
  • Topic-scoped KafkaClient facade for concise application setup.
  • Lower-level KafkaProducer, KafkaConsumer, KafkaShareConsumer, and KafkaAdmin APIs for explicit configuration.
  • Record headers, nullable values, tombstones, and explicit record timestamps.
  • Configurable compression, linger, batching, retry backoff, request timeout, and delivery timeout.
  • Producer flush() and explicit shutdown() controls.
  • Transactional producer support, including send_offsets_to_transaction.
  • Manual assignment, topic-list subscription, and pattern subscription.
  • Seek, position, pause, resume, committed-offset lookup, beginning/end offset lookup, and timestamp offset lookup.
  • Topic create/list/describe/delete/config APIs, consumer group description, broker metadata lookup, and cluster description.
  • tracing instrumentation and metrics facade measurements for OpenTelemetry-compatible export.

§Choosing an API

  • Use KafkaClient for service code that mostly works with one topic at a time.
  • Use KafkaProducer with ProducerConfig for explicit partitioning, batching, bounded buffering, request sizing, compression, idempotence, or transactions.
  • Use KafkaConsumer with ConsumerConfig for direct control over group, assignment, fetch, offset, and commit behavior.
  • Use KafkaShareConsumer for Kafka share-group consumption (see the sketch after this list).
  • Use KafkaAdmin with AdminConfig for cluster and topic management.
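
For share-group consumption, the crate documents KafkaShareConsumer, ShareConsumerOptions, ShareRecords, and AcknowledgeType but no example. The sketch below assumes the connect/poll/acknowledge method shapes and the Accept variant name; treat all of them as hypothetical.

use kafkit_client::{AcknowledgeType, KafkaShareConsumer, ShareConsumerOptions};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    // Hypothetical constructor shape for the documented options type.
    let consumer = KafkaShareConsumer::connect(
        ShareConsumerOptions::new("localhost:9092", "orders-share"),
    )
    .await?;

    let records = consumer.poll().await?;
    for record in records.iter() {
        // Share groups acknowledge per record instead of committing offsets;
        // AcknowledgeType::Accept is an assumed variant name.
        consumer.acknowledge(record, AcknowledgeType::Accept).await?;
    }
    Ok(())
}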

§Operational Notes

  • Call shutdown().await on producers and consumers when your application is stopping so background tasks can drain and close connections cleanly.
  • Call flush().await on producers when you need all currently buffered records acknowledged before continuing.
  • Consumers return batches from poll().await; commit the records you have processed, or configure auto-commit if that matches your processing model.
  • Transactional producers must set a transactional id. Prefer begin_transaction() for the normal lifecycle; it initializes transactions on first use. init_transactions().await is available for eager readiness checks. A lifecycle sketch follows.
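
A minimal transactional lifecycle sketch. begin_transaction, init_transactions, and send_offsets_to_transaction are named in this documentation; with_transactional_id and commit_transaction are assumed names for the remaining steps.

use kafkit_client::{KafkaProducer, ProduceRecord, ProducerConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            .with_enable_idempotence(true)
            .with_transactional_id("order-worker-1"), // assumed setter name
    )
    .await?;

    // Documented entry point; initializes transactions on first use.
    producer.begin_transaction().await?;
    producer
        .send(ProduceRecord::new("orders", 0, "created").with_key("order-42"))
        .await?;
    producer.commit_transaction().await?; // assumed name for the commit step

    producer.shutdown().await?;
    Ok(())
}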

§Performance

kafkit-client is designed as a native async Rust client rather than a wrapper around librdkafka. In the local release-mode comparison app in this repository, it currently outperforms rdkafka on both producer and consumer throughput for the measured workload.

Benchmark setup:

  • Build: release mode (--release).
  • Machine: Apple M3 Pro, 12 logical CPUs, 36 GiB RAM.
  • OS/toolchain: macOS 26.3.1, Rust 1.95.0, Docker 29.4.0.
  • Kafka: apache/kafka:4.2.0 managed by testcontainers.
  • Payload: 100,000 messages, 256-byte values.
  • Topic: three partitions.
  • Compression: none.
  • Producer: linger.ms = 0, concurrency 256.
  • Consumer: topic prefilled with 100,000 records by the comparison app, commits disabled.

  Workload   kafkit-client msg/s   rdkafka msg/s   kafkit speedup
  Produce    122,024.45            12,129.49       10.06x
  Consume    114,726.34            31,006.31       3.70x

Producer p50, p95, and p99 latencies in that run were also lower:

  Client          p50 us   p95 us   p99 us   max us
  kafkit-client   1,038    2,709    8,495    272,895
  rdkafka         18,751   37,727   55,807   87,935

Consumer throughput was also higher in the same comparison:

  Client          Elapsed ms   Msg/s        MiB/s
  kafkit-client   871.64       114,726.34   28.01
  rdkafka         3,225.15     31,006.31    7.57

Consumer latency percentiles are intentionally not highlighted because the benchmark records payload age from prefill time. For consumers, the useful number from this run is fetch/decode throughput with commits disabled.

These are not universal Kafka performance numbers. They are a repeatable local comparison for this repo’s benchmark shape. Different broker versions, network latency, compression, partition counts, batch sizes, commit behavior, message sizes, and durability settings can move the result. In this run, broker env vars were not set, so the benchmark app started separate Kafka testcontainers for kafkit-client and rdkafka. That isolates the clients but does not put both clients on the same broker process. The full report, commands, and specs are in examples/perf-comparison/PERFORMANCE_SUMMARY.md.

§How It Works

The producer has the same broad shape as Kafka’s Java producer: application calls enqueue records, a dedicated sender loop drains ready batches, batches are grouped by leader, and produce requests are dispatched asynchronously. The sender keeps a bounded number of requests in flight per broker connection; the bound is set by ProducerConfig::max_in_flight_requests_per_connection and defaults to five, matching the Java producer default.

Internally, the producer path is:

  1. send(...) validates and enqueues a ProduceRecord into the accumulator.
  2. The accumulator groups records by topic/partition and seals batches when they reach batch_size, when the linger window expires, or when a flush is requested.
  3. Metadata maps each partition to its current leader.
  4. Ready batches are grouped by leader and encoded into Kafka produce requests.
  5. Produce requests are sent in the background with bounded in-flight tracking.
  6. Completions resolve the original application futures, retry retriable errors, or fail records when delivery timeout is exceeded.
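
A configuration sketch mapping these stages to builder knobs. with_acks, with_compression, and with_enable_idempotence appear in the producer example above, and max_in_flight_requests_per_connection is named as a ProducerConfig knob; the batch-size, linger, and delivery-timeout setter names are assumptions patterned on the confirmed builders.

use std::time::Duration;

use kafkit_client::{KafkaProducer, ProducerCompression, ProducerConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let config = ProducerConfig::new("localhost:9092")
        .with_acks(-1)                               // step 6 waits on all in-sync replicas
        .with_compression(ProducerCompression::Zstd) // applied when batches are encoded (step 4)
        .with_enable_idempotence(true)
        // Assumed setter names for the documented knobs:
        .with_batch_size(64 * 1024)                      // step 2: seal a batch at 64 KiB
        .with_linger(Duration::from_millis(5))           // step 2: or when linger expires
        .with_delivery_timeout(Duration::from_secs(120)) // step 6: fail records past this
        .with_max_in_flight_requests_per_connection(5);  // step 5: bounded pipelining per broker

    let producer = KafkaProducer::connect(config).await?;
    producer.shutdown().await?;
    Ok(())
}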

The consumer runtime follows a similar single-owner async model. It owns the metadata cache, coordinator connection, leader connections, assignment state, offset state, heartbeat scheduling, and buffered records. Polling asks the runtime for records; the runtime fetches from partition leaders, decodes record batches with kafka-protocol, and preserves partial trailing batches for the next fetch instead of treating a fetch-boundary partial batch as fatal.

Internally, the consumer path is:

  1. Subscription or manual assignment updates the runtime-owned assignment state.
  2. Metadata maps assigned partitions to their current leaders.
  3. Fetch requests are grouped by leader connection.
  4. Responses are decoded into ConsumerRecord batches in the runtime.
  5. Any records beyond the current poll demand stay buffered for later polls.
  6. Offset commits are explicit unless auto-commit is enabled.
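
A sketch of driving this path with the manual assignment and seek operations named in the feature list. The connect, assign, and seek shapes are assumptions; only the TopicPartition type and the poll pattern come from this documentation.

use kafkit_client::{ConsumerConfig, KafkaConsumer, TopicPartition};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaConsumer::connect(ConsumerConfig::new("localhost:9092")).await?;

    // Step 1: manual assignment updates the runtime-owned assignment state.
    let tp = TopicPartition::new("orders", 0); // assumed constructor shape
    consumer.assign([tp.clone()]).await?;

    // Rewind before fetching; steps 2-5 then happen inside poll().
    consumer.seek(&tp, 0).await?;
    let records = consumer.poll().await?;
    println!("replayed {} records", records.iter().count());
    Ok(())
}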

§Why It Is Faster In These Benchmarks

The performance advantage in the current comparison comes from a few concrete implementation choices:

  • Native Tokio I/O avoids crossing an FFI/native client boundary for every produce or consume operation.
  • The client targets Kafka 4.x and modern protocols directly, so it does not carry compatibility branches for older broker eras.
  • Producer dispatch is pipelined: the sender does not wait for one produce response before issuing the next request to a leader.
  • Completion handling is decoupled from request dispatch, so the sender can keep batches moving while previous requests are still awaiting broker responses.
  • The default benchmark uses zero linger, which avoids intentionally delaying partial batches in a latency-sensitive workload.
  • Consumer fetches are runtime-owned and decode directly into Rust records without an FFI/native-client handoff.
  • Fetch decoding handles partial batch boundaries without forcing an error/retry path, which matters when a broker response ends at a batch boundary.
  • The consumer benchmark disables commits, so the comparison isolates fetch and decode throughput rather than coordinator commit latency.

librdkafka remains a mature and heavily optimized Kafka client. The claim here is narrower: for this repo’s current release-mode benchmark, with the settings above, kafkit-client is faster.

§Runnable Examples

The crate includes real programs under examples/. For a local Kafka 4.0+ broker on localhost:9092, run:

cargo run -p kafkit-client --example order_workflow
cargo run -p kafkit-client --example transactional_order_worker
cargo run -p kafkit-client --example customer_profile_projection
cargo run -p kafkit-client --example cluster_operations_report

See examples/README.md for environment variables and Docker Compose notes.

Modules§

config
Configuration types for producers, consumers, admins, TLS, and SASL.
error
Error and result types returned by this crate.
types
Common record, partition, offset, and metadata value types.

Structs§

AccessControlEntry
Access control entry: principal, host, operation, and permission type.
AccessControlEntryFilter
Filter form of AccessControlEntry.
AclBinding
Binding between a resource pattern and an access control entry.
AclBindingFilter
Filter matching ACL bindings.
AdminConfig
Admin Config.
AlterConfigOp
One config mutation used with KafkaAdmin::incremental_alter_configs.
BrokerDescription
Broker endpoint metadata returned in a ClusterDescription.
BrokerFeatureLevel
Finalized feature level reported by the cluster.
BrokerLogDirs
Log directory usage returned by KafkaAdmin::describe_log_dirs.
CancellationToken
A token which can be used to signal a cancellation request to one or more tasks.
ClusterDescription
Cluster metadata returned by KafkaAdmin::describe_cluster.
CommitOffset
Commit Offset.
ConfigEntry
One Kafka configuration entry returned by KafkaAdmin::describe_configs.
ConfigResource
Kafka resource identified for config describe or alter operations.
ConfigResourceConfig
Described configuration for one Kafka resource.
ConsumerConfig
Consumer Config.
ConsumerGroupDescription
Detailed group metadata returned by group describe APIs.
ConsumerGroupListing
Summary returned by KafkaAdmin::list_groups.
ConsumerGroupMemberDescription
Metadata for one member of a described consumer group.
ConsumerGroupMetadata
Consumer Group Metadata.
ConsumerRebalanceListener
Synchronous callback invoked when a group consumer assignment changes.
ConsumerRecord
Consumer Record.
ConsumerRecords
Consumer Records.
DeleteAclsResult
ACLs deleted for one delete filter.
ErrorClassification
Operational classification for a client error.
FeatureUpdate
Finalized broker feature update.
KafkaAdmin
Client for Kafka admin operations.
KafkaClient
A small builder facade around one or more bootstrap servers.
KafkaConsumer
A Kafka group consumer with explicit polling and commits.
KafkaMessage
Kafka Message.
KafkaProducer
A Kafka producer with batching, retries, idempotence, and transactions.
KafkaShareConsumer
A Kafka share group consumer.
KafkaTopic
A topic-scoped facade for producers and consumers.
LogDirDescription
One broker log directory.
NewPartitions
Partition increase request used with KafkaAdmin::create_partitions.
NewTopic
Topic definition used with KafkaAdmin::create_topics.
OffsetAndTimestamp
Offset And Timestamp.
ProduceAck
Produce Ack.
ProduceRecord
Produce Record.
ProducerConfig
Producer Config.
RecordHeader
Record Header.
ReplicaLogDirDescription
Size information for one replica log in a broker log directory.
ResourcePattern
Represents a pattern that ACLs use to match resources.
ResourcePatternFilter
Filter form of ResourcePattern.
SaslConfig
Sasl Config.
ShareAcknowledgementCommit
Share Acknowledgement Commit.
ShareConsumerOptions
Share Consumer Options.
ShareRecord
Share Record.
ShareRecords
Share Records.
SubscriptionPattern
Subscription Pattern.
TlsConfig
Tls Config.
TokioTcpConnector
TopicDescription
Detailed metadata returned by KafkaAdmin::describe_topics.
TopicListing
Summary returned by KafkaAdmin::list_topics.
TopicPartition
Topic Partition.
TopicPartitionDescription
Metadata for one partition in a TopicDescription.
TopicPartitionInfo
Topic Partition Info.
TopicPartitionKey
Topic Partition Key.
TopicPartitionOffset
Topic Partition Offset.
TopicPartitionOffsetAndTimestamp
Topic Partition Offset And Timestamp.
TopicPartitionTimestamp
Topic Partition Timestamp.

Enums§

AcknowledgeType
Acknowledge Type.
AclOperation
Operation granted or denied by an ACL.
AclPermissionType
Whether an ACL grants or denies access.
AdminError
Errors raised before or during admin operations.
AlterConfigOpType
Operation type for an incremental config change.
AutoOffsetReset
Auto Offset Reset.
BrokerError
Kafka broker error returned in a response payload.
ConfigResourceType
Kafka resource type used by config APIs.
ConnectedTcpStream
ConsumerError
Errors raised by the consumer API.
ConsumerGroupMetadataError
Errors found while validating consumer group metadata.
ConsumerRebalanceEvent
Consumer rebalance notification.
Error
Top-level error returned by the crate.
FeatureUpgradeType
Type of finalized feature update to perform.
IsolationLevel
Isolation Level.
PatternType
Resource pattern type used by ACLs.
ProducerCompression
Producer Compression.
ProducerError
Errors raised by the producer API.
ProducerPartitioner
Producer Partitioner.
ProtocolError
Kafka protocol-level errors raised before a typed broker error is available.
ResourceType
Type of Kafka resource an ACL applies to.
SaslMechanism
Sasl Mechanism.
SecurityProtocol
Security Protocol.
ShareAcquireMode
Share Acquire Mode.
TransactionStateError
Errors raised by the producer transaction state machine.
ValidationError
Errors raised while validating public API inputs.

Traits§

TcpConnector

Type Aliases§

AcknowledgementCommitCallback
Alias for acknowledgement commit callback.
ClientConfig
Alias for client config.
GroupDescription
Backwards-compatible alias for ConsumerGroupDescription.
GroupListing
Backwards-compatible alias for ConsumerGroupListing.
GroupMemberDescription
Backwards-compatible alias for ConsumerGroupMemberDescription.
Result
Result type used by the client APIs.