Expand description
§kafkit-client
A native async Rust client for Apache Kafka 4.0 and newer.
kafkit-client is built on tokio and kafka-protocol. It exposes producer,
consumer, share-group consumer, and admin APIs without wrapping librdkafka or the
Java client.
§Status
This crate targets modern Kafka clusters only:
- Apache Kafka 4.0+.
- KRaft clusters.
- Modern consumer groups using KIP-848.
- Transaction protocol v2.
- Share-group protocol support.
Classic consumer groups, ZooKeeper-era assumptions, and older broker protocol paths are intentionally out of scope.
§Install
[dependencies]
kafkit-client = "0.1.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }§Quick Start
Use KafkaClient when you want a compact, topic-scoped setup for application
code:
use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let orders = KafkaClient::new("localhost:9092").topic("orders");
let producer = orders.producer().connect().await?;
producer
.send_message(
KafkaMessage::new("created".to_owned())
.with_key("order-42")
.with_header(RecordHeader::new("trace-id", "abc-123")),
)
.await?;
producer.shutdown().await?;
Ok(())
}§Producer
Use KafkaProducer and ProducerConfig directly when you need lower-level
control over batching, compression, idempotence, transactions, or explicit
topic/partition routing:
use kafkit_client::{
KafkaProducer, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
};
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let producer = KafkaProducer::connect(
ProducerConfig::new("localhost:9092")
.with_acks(-1)
.with_compression(ProducerCompression::Zstd)
.with_enable_idempotence(true),
)
.await?;
producer
.send(
ProduceRecord::new("orders", 0, "created")
.with_key("order-42")
.with_timestamp(1_700_000_123_456)
.with_header(RecordHeader::new("trace-id", "abc-123")),
)
.await?;
producer
.send(ProduceRecord::tombstone("orders", 0).with_key("order-41"))
.await?;
producer.flush().await?;
producer.shutdown().await?;
Ok(())
}§Consumer
Consumers use Kafka’s modern group protocol. The ergonomic builder subscribes to
the topic selected by KafkaClient::topic(...):
use kafkit_client::{AutoOffsetReset, KafkaClient};
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let consumer = KafkaClient::new("localhost:9092")
.topic("orders")
.consumer("orders-reader")
.with_auto_offset_reset(AutoOffsetReset::Earliest)
.with_instance_id("orders-reader-1")
.connect()
.await?;
loop {
let records = consumer.poll().await?;
for record in records.iter() {
println!(
"{}:{}@{}",
record.topic,
record.partition,
record.offset
);
}
consumer.commit(&records).await?;
}
}§Admin
The admin client covers common cluster and topic operations:
use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;
admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;
let topics = admin.list_topics().await?;
let cluster = admin.describe_cluster().await?;
println!("topics: {}", topics.len());
println!("cluster id: {:?}", cluster.cluster_id);
Ok(())
}§Security
TLS and SASL are configured on the same builder/config types used for plain connections:
use kafkit_client::{KafkaClient, TlsConfig};
#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
let producer = KafkaClient::new("kafka.example.com:9093")
.topic("orders")
.producer()
.with_tls(
TlsConfig::new()
.with_ca_cert_path("/etc/ssl/certs/kafka-ca.pem")
.with_server_name("kafka.example.com"),
)
.with_sasl_scram_sha_512("alice", "correct-horse-battery-staple")
.connect()
.await?;
producer.send("created".to_owned()).await?;
producer.shutdown().await?;
Ok(())
}Supported security features:
- TLS with system roots, custom CA files, server-name override, and client certs.
- SASL/PLAIN.
- SASL/SCRAM-SHA-256.
- SASL/SCRAM-SHA-512.
§Feature Highlights
- Async producer, consumer, share-group consumer, and admin client.
- Topic-scoped
KafkaClientfacade for concise application setup. - Lower-level
KafkaProducer,KafkaConsumer,KafkaShareConsumer, andKafkaAdminAPIs for explicit configuration. - Record headers, nullable values, tombstones, and explicit record timestamps.
- Configurable compression, linger, batching, retry backoff, request timeout, and delivery timeout.
- Producer
flush()and explicitshutdown()controls. - Transactional producer support, including
send_offsets_to_transaction. - Manual assignment, topic-list subscription, and pattern subscription.
- Seek, position, pause, resume, committed-offset lookup, beginning/end offset lookup, and timestamp offset lookup.
- Topic create/list/describe/delete/config APIs, consumer group description, broker metadata lookup, and cluster description.
tracinginstrumentation.
§Choosing an API
- Use
KafkaClientfor service code that mostly works with one topic at a time. - Use
KafkaProducerwithProducerConfigfor explicit partitioning, batching, compression, idempotence, or transactions. - Use
KafkaConsumerwithConsumerConfigfor direct control over group, assignment, fetch, offset, and commit behavior. - Use
KafkaShareConsumerfor Kafka share-group consumption. - Use
KafkaAdminwithAdminConfigfor cluster and topic management.
§Operational Notes
- Call
shutdown().awaiton producers and consumers when your application is stopping so background tasks can drain and close connections cleanly. - Call
flush().awaiton producers when you need all currently buffered records acknowledged before continuing. - Consumers return batches from
poll().await; commit the records you have processed, or configure auto-commit if that matches your processing model. - Transactional producers must set a transactional id, call
init_transactions().await, then usebegin_transaction(),commit_transaction(), orabort_transaction().
§Runnable Examples
The crate includes real programs under examples/. For a local Kafka 4.0+
broker on localhost:9092, run:
cargo run -p kafkit-client --example order_workflowSee examples/README.md for environment variables and Docker Compose notes.
§More Docs
Modules§
- config
- Configuration types for producers, consumers, admins, TLS, and SASL. Configuration types used to build clients.
- error
- Error and result types returned by this crate. Error types returned by Kafkit.
- types
- Common record, partition, offset, and metadata value types. Common value types shared by producers, consumers, and admin calls.
Structs§
- Admin
Config - Admin Config.
- Alter
Config Op - Alter Config Op.
- Broker
Description - Broker Description.
- Broker
Feature Level - Broker Feature Level.
- Cluster
Description - Cluster Description.
- Commit
Offset - Commit Offset.
- Config
Entry - Config Entry.
- Config
Resource - Config Resource.
- Config
Resource Config - Config Resource Config.
- Consumer
Config - Consumer Config.
- Consumer
Group Description - Consumer Group Description.
- Consumer
Group Listing - Consumer Group Listing.
- Consumer
Group Member Description - Consumer Group Member Description.
- Consumer
Group Metadata - Consumer Group Metadata.
- Consumer
Record - Consumer Record.
- Consumer
Records - Consumer Records.
- Kafka
Admin - Client for Kafka admin operations.
- Kafka
Client - A small builder facade around one or more bootstrap servers.
- Kafka
Consumer - A Kafka group consumer with explicit polling and commits.
- Kafka
Message - Kafka Message.
- Kafka
Producer - A Kafka producer with batching, retries, idempotence, and transactions.
- Kafka
Share Consumer - A Kafka share group consumer.
- Kafka
Topic - A topic-scoped facade for producers and consumers.
- NewPartitions
- New Partitions.
- NewTopic
- New Topic.
- Offset
AndTimestamp - Offset And Timestamp.
- Produce
Ack - Produce Ack.
- Produce
Record - Produce Record.
- Producer
Config - Producer Config.
- Record
Header - Record Header.
- Sasl
Config - Sasl Config.
- Share
Acknowledgement Commit - Share Acknowledgement Commit.
- Share
Consumer Options - Share Consumer Options.
- Share
Record - Share Record.
- Share
Records - Share Records.
- Subscription
Pattern - Subscription Pattern.
- TlsConfig
- Tls Config.
- Topic
Description - Topic Description.
- Topic
Listing - Topic Listing.
- Topic
Partition - Topic Partition.
- Topic
Partition Description - Topic Partition Description.
- Topic
Partition Info - Topic Partition Info.
- Topic
Partition Key - Topic Partition Key.
- Topic
Partition Offset - Topic Partition Offset.
- Topic
Partition Offset AndTimestamp - Topic Partition Offset And Timestamp.
- Topic
Partition Timestamp - Topic Partition Timestamp.
Enums§
- Acknowledge
Type - Acknowledge Type.
- Admin
Error - Errors raised before or during admin operations.
- Alter
Config OpType - Alter Config Op Type.
- Auto
Offset Reset - Auto Offset Reset.
- Config
Resource Type - Config Resource Type.
- Consumer
Error - Errors raised by the consumer API.
- Consumer
Group Metadata Error - Errors found while validating consumer group metadata.
- Error
- Top-level error returned by the crate.
- Isolation
Level - Isolation Level.
- Producer
Compression - Producer Compression.
- Producer
Error - Errors raised by the producer API.
- Producer
Partitioner - Producer Partitioner.
- Sasl
Mechanism - Sasl Mechanism.
- Security
Protocol - Security Protocol.
- Share
Acquire Mode - Share Acquire Mode.
- Transaction
State Error - Errors raised by the producer transaction state machine.
Type Aliases§
- Acknowledgement
Commit Callback - Alias for acknowledgement commit callback.
- Client
Config - Alias for client config.
- Result
- Result type used by the client APIs.