Kafkit Client
A native async Rust client for Apache Kafka 4.0 and newer.
kafkit-client is built on tokio and kafka-protocol. It exposes producer,
consumer, share-group consumer, and admin APIs without wrapping librdkafka or the
Java client.
§Status
This crate targets modern Kafka clusters only:
- Apache Kafka 4.0+.
- KRaft clusters.
- Modern consumer groups using KIP-848.
- Transaction protocol v2.
- Share-group protocol support.
Classic consumer runtime protocols, ZooKeeper-era assumptions, and older broker protocol paths are intentionally out of scope. The admin client can still list, describe, and delete classic groups.
§Install
[dependencies]
kafkit-client = "0.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
§Quick Start
Use KafkaClient when you want a compact, topic-scoped setup for application
code:
use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let orders = KafkaClient::new("localhost:9092").topic("orders");
    let producer = orders.producer().connect().await?;
    producer
        .send_message(
            KafkaMessage::new("created".to_owned())
                .with_key("order-42")
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;
    producer.shutdown().await?;
    Ok(())
}
§Producer
Use KafkaProducer and ProducerConfig directly when you need lower-level
control over batching, compression, idempotence, transactions, or explicit
topic/partition routing:
use kafkit_client::{
    KafkaProducer, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            .with_acks(-1)
            .with_compression(ProducerCompression::Zstd)
            .with_enable_idempotence(true),
    )
    .await?;
    producer
        .send(
            ProduceRecord::new("orders", 0, "created")
                .with_key("order-42")
                .with_timestamp(1_700_000_123_456)
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;
    producer
        .send(ProduceRecord::tombstone("orders", 0).with_key("order-41"))
        .await?;
    producer.flush().await?;
    producer.shutdown().await?;
    Ok(())
}
§Consumer
Consumers use Kafka’s modern group protocol. The ergonomic builder subscribes to
the topic selected by KafkaClient::topic(...):
use kafkit_client::{AutoOffsetReset, KafkaClient};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaClient::new("localhost:9092")
        .topic("orders")
        .consumer("orders-reader")
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_instance_id("orders-reader-1")
        .connect()
        .await?;
    loop {
        let records = consumer.poll().await?;
        for record in records.iter() {
            println!("{}:{}@{}", record.topic, record.partition, record.offset);
        }
        consumer.commit(&records).await?;
    }
}
§Admin
The admin client covers common cluster and topic operations:
use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;
    admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;
    let topics = admin.list_topics().await?;
    let cluster = admin.describe_cluster().await?;
    println!("topics: {}", topics.len());
    println!("cluster id: {:?}", cluster.cluster_id);
    Ok(())
}
§Security
TLS and SASL are configured on the same builder/config types used for plain connections:
use kafkit_client::{KafkaClient, TlsConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaClient::new("kafka.example.com:9093")
        .topic("orders")
        .producer()
        .with_tls(
            TlsConfig::new()
                .with_ca_cert_path("/etc/ssl/certs/kafka-ca.pem")
                .with_server_name("kafka.example.com"),
        )
        .with_sasl_scram_sha_512("alice", "correct-horse-battery-staple")
        .connect()
        .await?;
    producer.send("created".to_owned()).await?;
    producer.shutdown().await?;
    Ok(())
}
Supported security features:
- TLS with system roots, custom CA files, server-name override, and client certs.
- SASL/PLAIN.
- SASL/SCRAM-SHA-256.
- SASL/SCRAM-SHA-512.
§Feature Highlights
- Async producer, consumer, share-group consumer, and admin client.
- Topic-scoped `KafkaClient` facade for concise application setup.
- Lower-level `KafkaProducer`, `KafkaConsumer`, `KafkaShareConsumer`, and `KafkaAdmin` APIs for explicit configuration.
- Record headers, nullable values, tombstones, and explicit record timestamps.
- Configurable compression, linger, batching, retry backoff, request timeout, and delivery timeout.
- Producer `flush()` and explicit `shutdown()` controls.
- Transactional producer support, including `send_offsets_to_transaction`.
- Manual assignment, topic-list subscription, and pattern subscription.
- Seek, position, pause, resume, committed-offset lookup, beginning/end offset lookup, and timestamp offset lookup.
- Topic create/list/describe/delete/config APIs, consumer group description, broker metadata lookup, and cluster description.
- `tracing` instrumentation and `metrics` facade measurements for OpenTelemetry-compatible export.
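As a rough illustration of the seek/pause/resume capabilities listed above, a repositioning helper might look like the following. Every method name here (`pause`, `seek`, `resume`, `TopicPartition::new`) is an assumption based on common Kafka client conventions; only the type names `KafkaConsumer` and `TopicPartition` come from this page, so treat this as a sketch rather than the crate's confirmed API.

```rust
use kafkit_client::{KafkaConsumer, TopicPartition};

// Hypothetical helper: rewind one partition to offset 0 without the
// consumer fetching stale data mid-reposition. Method names are assumed.
async fn rewind_partition(consumer: &KafkaConsumer) -> kafkit_client::Result<()> {
    let tp = TopicPartition::new("orders", 0);
    consumer.pause([tp.clone()]).await?; // stop fetching while repositioning
    consumer.seek(&tp, 0).await?;        // move the fetch position back
    consumer.resume([tp]).await?;        // fetching continues from the new position
    Ok(())
}
```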
§Choosing an API
- Use `KafkaClient` for service code that mostly works with one topic at a time.
- Use `KafkaProducer` with `ProducerConfig` for explicit partitioning, batching, bounded buffering, request sizing, compression, idempotence, or transactions.
- Use `KafkaConsumer` with `ConsumerConfig` for direct control over group, assignment, fetch, offset, and commit behavior.
- Use `KafkaShareConsumer` for Kafka share-group consumption.
- Use `KafkaAdmin` with `AdminConfig` for cluster and topic management.
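For the lower-level consumer path, a sketch of `KafkaConsumer` with `ConsumerConfig` might mirror the `KafkaProducer::connect(ProducerConfig::new(...))` shape shown earlier. The builder method `with_group_id` and the `subscribe` call are assumptions for illustration; only the type names appear on this page, so check the `config` module docs for the real signatures.

```rust
use kafkit_client::{AutoOffsetReset, ConsumerConfig, KafkaConsumer};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    // Hypothetical builder calls: names mirror the ProducerConfig style
    // above but are not confirmed by this page.
    let consumer = KafkaConsumer::connect(
        ConsumerConfig::new("localhost:9092")
            .with_group_id("orders-reader")
            .with_auto_offset_reset(AutoOffsetReset::Earliest),
    )
    .await?;
    consumer.subscribe(["orders"]).await?;
    let records = consumer.poll().await?;
    consumer.commit(&records).await?;
    consumer.shutdown().await?;
    Ok(())
}
```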
§Operational Notes
- Call `shutdown().await` on producers and consumers when your application is stopping so background tasks can drain and close connections cleanly.
- Call `flush().await` on producers when you need all currently buffered records acknowledged before continuing.
- Consumers return batches from `poll().await`; commit the records you have processed, or configure auto-commit if that matches your processing model.
- Transactional producers must set a transactional id. Prefer `begin_transaction()` for the normal lifecycle; it initializes transactions on first use. `init_transactions().await` is available for eager readiness checks.
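The transactional lifecycle described above might be sketched as follows. Of these names, `begin_transaction` and the transactional-id requirement come from this page; `with_transactional_id`, `commit_transaction`, and `abort_transaction` are assumed from common Kafka client conventions and may differ in the actual API.

```rust
use kafkit_client::{KafkaProducer, ProduceRecord, ProducerConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    // Assumed builder name: a transactional id is required, but the exact
    // method is not shown on this page.
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            .with_transactional_id("orders-worker-1"),
    )
    .await?;

    // begin_transaction() initializes transactions on first use.
    producer.begin_transaction().await?;
    let result = producer
        .send(ProduceRecord::new("orders", 0, "created").with_key("order-42"))
        .await;
    match result {
        // commit_transaction / abort_transaction are assumed names.
        Ok(_) => producer.commit_transaction().await?,
        Err(_) => producer.abort_transaction().await?,
    }
    producer.shutdown().await?;
    Ok(())
}
```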
§Performance
kafkit-client is designed as a native async Rust client rather than a wrapper
around librdkafka. In the local release-mode comparison app in this repository,
it currently outperforms rdkafka on both producer and consumer throughput for
the measured workload.
Benchmark setup:
- Command mode: `--release`.
- Machine: Apple M3 Pro, 12 logical CPUs, 36 GiB RAM.
- OS/toolchain: macOS 26.3.1, Rust 1.95.0, Docker 29.4.0.
- Kafka: `apache/kafka:4.2.0` managed by testcontainers.
- Payload: 100,000 messages, 256-byte values.
- Topic: three partitions.
- Compression: none.
- Producer: `linger.ms = 0`, concurrency 256.
- Consumer: topic prefilled with 100,000 records by the comparison app, commits disabled.
| Workload | kafkit-client msg/s | rdkafka msg/s | kafkit speedup |
|---|---|---|---|
| Produce | 122,024.45 | 12,129.49 | 10.06x |
| Consume | 114,726.34 | 31,006.31 | 3.70x |
Producer p50, p95, and p99 latency in that run was also lower:
| Client | p50 us | p95 us | p99 us | max us |
|---|---|---|---|---|
| kafkit-client | 1,038 | 2,709 | 8,495 | 272,895 |
| rdkafka | 18,751 | 37,727 | 55,807 | 87,935 |
Consumer throughput was also higher in the same comparison:
| Client | Elapsed ms | Msg/s | MiB/s |
|---|---|---|---|
| kafkit-client | 871.64 | 114,726.34 | 28.01 |
| rdkafka | 3,225.15 | 31,006.31 | 7.57 |
Consumer latency percentiles are intentionally not highlighted because the benchmark records payload age from prefill time. For consumers, the useful number from this run is fetch/decode throughput with commits disabled.
These are not universal Kafka performance numbers. They are a repeatable local
comparison for this repo’s benchmark shape. Different broker versions, network
latency, compression, partition counts, batch sizes, commit behavior, message
sizes, and durability settings can move the result. In this run, broker env vars
were not set, so the benchmark app started separate Kafka testcontainers for
kafkit-client and rdkafka. That isolates the clients but does not put both
clients on the same broker process. The full report, commands, and specs are in
examples/perf-comparison/PERFORMANCE_SUMMARY.md.
§How It Works
The producer has the same broad shape as Kafka's Java producer: application `send` calls enqueue records, a dedicated sender loop drains ready batches, batches are grouped by leader, and produce requests are dispatched asynchronously. The sender keeps a bounded number of requests in flight per broker connection, defaulting to five via `ProducerConfig::max_in_flight_requests_per_connection`, which matches the Java producer default.
Internally, the producer path is:
- `send(...)` validates and enqueues a `ProduceRecord` into the accumulator.
- The accumulator groups records by topic/partition and seals batches when they reach `batch_size`, expire `linger`, or need to flush.
- Metadata maps each partition to its current leader.
- Ready batches are grouped by leader and encoded into Kafka produce requests.
- Produce requests are sent in the background with bounded in-flight tracking.
- Completions resolve the original application futures, retry retriable errors, or fail records when the delivery timeout is exceeded.
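The batch-sealing decision in the accumulator step above can be sketched as a small predicate. This is illustrative pseudologic under assumed names (`OpenBatch`, `BatchPolicy`); the crate's real internals are not public API.

```rust
/// State the sender inspects for one in-progress partition batch
/// (hypothetical type, for illustration only).
struct OpenBatch {
    bytes: usize, // encoded size of records appended so far
    age_ms: u64,  // time since the first record was appended
}

/// Sealing thresholds corresponding to `batch_size` and `linger`
/// (hypothetical type, for illustration only).
struct BatchPolicy {
    batch_size: usize, // seal when the batch reaches this many bytes
    linger_ms: u64,    // seal when the oldest record has waited this long
}

impl BatchPolicy {
    /// A batch is ready when it is full, its linger has expired,
    /// or the application asked for an explicit flush.
    fn is_ready(&self, batch: &OpenBatch, flush_requested: bool) -> bool {
        batch.bytes >= self.batch_size
            || batch.age_ms >= self.linger_ms
            || flush_requested
    }
}
```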
The consumer runtime follows a similar single-owner async model. It owns the
metadata cache, coordinator connection, leader connections, assignment state,
offset state, heartbeat scheduling, and buffered records. Polling asks the runtime
for records; the runtime fetches from partition leaders, decodes record batches
with kafka-protocol, and preserves partial trailing batches for the next fetch
instead of treating a fetch-boundary partial batch as fatal.
Internally, the consumer path is:
- Subscription or manual assignment updates the runtime-owned assignment state.
- Metadata maps assigned partitions to their current leaders.
- Fetch requests are grouped by leader connection.
- Responses are decoded into `ConsumerRecord` batches in the runtime.
- Any records beyond the current poll demand stay buffered for later polls.
- Offset commits are explicit unless auto-commit is enabled.
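The poll-demand buffering step above can be sketched as a queue that hands out at most what the current poll asked for and keeps the surplus for later polls. `FetchBuffer` is a hypothetical name for illustration; the runtime's actual buffer is not public API.

```rust
use std::collections::VecDeque;

/// Hypothetical record buffer: a broker fetch may return more records
/// than the current poll demanded, and the surplus must survive until
/// the next poll instead of being dropped or re-fetched.
struct FetchBuffer<T> {
    buffered: VecDeque<T>,
}

impl<T> FetchBuffer<T> {
    fn new() -> Self {
        Self { buffered: VecDeque::new() }
    }

    /// Stash everything a broker fetch response decoded into.
    fn on_fetch_response(&mut self, records: Vec<T>) {
        self.buffered.extend(records);
    }

    /// Serve one poll: hand out at most `demand` records, keep the rest.
    fn poll(&mut self, demand: usize) -> Vec<T> {
        let n = demand.min(self.buffered.len());
        self.buffered.drain(..n).collect()
    }
}
```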
§Why It Is Faster In These Benchmarks
The performance advantage in the current comparison comes from a few concrete implementation choices:
- Native Tokio I/O avoids crossing an FFI/native client boundary for every produce or consume operation.
- The client targets Kafka 4.x and modern protocols directly, so it does not carry compatibility branches for older broker eras.
- Producer dispatch is pipelined: the sender no longer waits for one produce response before issuing the next request to a leader.
- Completion handling is decoupled from request dispatch, so the sender can keep batches moving while previous requests are still awaiting broker responses.
- The default benchmark uses zero linger, which avoids intentionally delaying partial batches in a latency-sensitive workload.
- Consumer fetches are runtime-owned and decode directly into Rust records without an FFI/native-client handoff.
- Fetch decoding handles partial batch boundaries without forcing an error/retry path, which matters when a broker response ends at a batch boundary.
- The consumer benchmark disables commits, so the comparison isolates fetch and decode throughput rather than coordinator commit latency.
librdkafka remains a mature and heavily optimized Kafka client. The claim here
is narrower: for this repo’s current release-mode benchmark, with the settings
above, kafkit-client is faster.
§Runnable Examples
The crate includes real programs under examples/. For a local Kafka 4.0+
broker on localhost:9092, run:
cargo run -p kafkit-client --example order_workflow
cargo run -p kafkit-client --example transactional_order_worker
cargo run -p kafkit-client --example customer_profile_projection
cargo run -p kafkit-client --example cluster_operations_report
See examples/README.md for environment variables and Docker Compose notes.
Modules§
- config - Configuration types for producers, consumers, admins, TLS, and SASL.
- error - Error and result types returned by this crate.
- types - Common record, partition, offset, and metadata value types.
Structs§
- AccessControlEntry - Access control entry: principal, host, operation, and permission type.
- AccessControlEntryFilter - Filter form of `AccessControlEntry`.
- AclBinding - Binding between a resource pattern and an access control entry.
- AclBindingFilter - Filter matching ACL bindings.
- AdminConfig - Admin Config.
- AlterConfigOp - One config mutation used with `KafkaAdmin::incremental_alter_configs`.
- BrokerDescription - Broker endpoint metadata returned in a `ClusterDescription`.
- BrokerFeatureLevel - Finalized feature level reported by the cluster.
- BrokerLogDirs - Log directory usage returned by `KafkaAdmin::describe_log_dirs`.
- CancellationToken - A token which can be used to signal a cancellation request to one or more tasks.
- ClusterDescription - Cluster metadata returned by `KafkaAdmin::describe_cluster`.
- CommitOffset - Commit Offset.
- ConfigEntry - One Kafka configuration entry returned by `KafkaAdmin::describe_configs`.
- ConfigResource - Kafka resource identified for config describe or alter operations.
- ConfigResourceConfig - Described configuration for one Kafka resource.
- ConsumerConfig - Consumer Config.
- ConsumerGroupDescription - Detailed group metadata returned by group describe APIs.
- ConsumerGroupListing - Summary returned by `KafkaAdmin::list_groups`.
- ConsumerGroupMemberDescription - Metadata for one member of a described consumer group.
- ConsumerGroupMetadata - Consumer Group Metadata.
- ConsumerRebalanceListener - Synchronous callback invoked when a group consumer assignment changes.
- ConsumerRecord - Consumer Record.
- ConsumerRecords - Consumer Records.
- DeleteAclsResult - ACLs deleted for one delete filter.
- ErrorClassification - Operational classification for a client error.
- FeatureUpdate - Finalized broker feature update.
- KafkaAdmin - Client for Kafka admin operations.
- KafkaClient - A small builder facade around one or more bootstrap servers.
- KafkaConsumer - A Kafka group consumer with explicit polling and commits.
- KafkaMessage - Kafka Message.
- KafkaProducer - A Kafka producer with batching, retries, idempotence, and transactions.
- KafkaShareConsumer - A Kafka share group consumer.
- KafkaTopic - A topic-scoped facade for producers and consumers.
- LogDirDescription - One broker log directory.
- NewPartitions - Partition increase request used with `KafkaAdmin::create_partitions`.
- NewTopic - Topic definition used with `KafkaAdmin::create_topics`.
- OffsetAndTimestamp - Offset And Timestamp.
- ProduceAck - Produce Ack.
- ProduceRecord - Produce Record.
- ProducerConfig - Producer Config.
- RecordHeader - Record Header.
- ReplicaLogDirDescription - Size information for one replica log in a broker log directory.
- ResourcePattern - Represents a pattern that ACLs use to match resources.
- ResourcePatternFilter - Filter form of `ResourcePattern`.
- SaslConfig - Sasl Config.
- ShareAcknowledgementCommit - Share Acknowledgement Commit.
- ShareConsumerOptions - Share Consumer Options.
- ShareRecord - Share Record.
- ShareRecords - Share Records.
- SubscriptionPattern - Subscription Pattern.
- TlsConfig - Tls Config.
- TokioTcpConnector
- TopicDescription - Detailed metadata returned by `KafkaAdmin::describe_topics`.
- TopicListing - Summary returned by `KafkaAdmin::list_topics`.
- TopicPartition - Topic Partition.
- TopicPartitionDescription - Metadata for one partition in a `TopicDescription`.
- TopicPartitionInfo - Topic Partition Info.
- TopicPartitionKey - Topic Partition Key.
- TopicPartitionOffset - Topic Partition Offset.
- TopicPartitionOffsetAndTimestamp - Topic Partition Offset And Timestamp.
- TopicPartitionTimestamp - Topic Partition Timestamp.
Enums§
- AcknowledgeType - Acknowledge Type.
- AclOperation - Operation granted or denied by an ACL.
- AclPermissionType - Whether an ACL grants or denies access.
- AdminError - Errors raised before or during admin operations.
- AlterConfigOpType - Operation type for an incremental config change.
- AutoOffsetReset - Auto Offset Reset.
- BrokerError - Kafka broker error returned in a response payload.
- ConfigResourceType - Kafka resource type used by config APIs.
- ConnectedTcpStream
- ConsumerError - Errors raised by the consumer API.
- ConsumerGroupMetadataError - Errors found while validating consumer group metadata.
- ConsumerRebalanceEvent - Consumer rebalance notification.
- Error - Top-level error returned by the crate.
- FeatureUpgradeType - Type of finalized feature update to perform.
- IsolationLevel - Isolation Level.
- PatternType - Resource pattern type used by ACLs.
- ProducerCompression - Producer Compression.
- ProducerError - Errors raised by the producer API.
- ProducerPartitioner - Producer Partitioner.
- ProtocolError - Kafka protocol-level errors raised before a typed broker error is available.
- ResourceType - Type of Kafka resource an ACL applies to.
- SaslMechanism - Sasl Mechanism.
- SecurityProtocol - Security Protocol.
- ShareAcquireMode - Share Acquire Mode.
- TransactionStateError - Errors raised by the producer transaction state machine.
- ValidationError - Errors raised while validating public API inputs.
Traits§
Type Aliases§
- AcknowledgementCommitCallback - Alias for acknowledgement commit callback.
- ClientConfig - Alias for client config.
- GroupDescription - Backwards-compatible alias for `ConsumerGroupDescription`.
- GroupListing - Backwards-compatible alias for `ConsumerGroupListing`.
- GroupMemberDescription - Backwards-compatible alias for `ConsumerGroupMemberDescription`.
- Result - Result type used by the client APIs.