# kafkit-client
A native async Rust client for Apache Kafka 4.0 and newer.

`kafkit-client` is built on `tokio` and `kafka-protocol`. It exposes producer,
consumer, share-group consumer, and admin APIs without wrapping librdkafka or the
Java client.
## Status
This crate targets modern Kafka clusters only:
- Apache Kafka 4.0+.
- KRaft clusters.
- Modern consumer groups using KIP-848.
- Transaction protocol v2.
- Share-group protocol support.

Classic consumer groups, ZooKeeper-era assumptions, and older broker protocol
paths are intentionally out of scope.
## Install
```toml
[dependencies]
kafkit-client = "0.1.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```
## Quick Start
Use `KafkaClient` when you want a compact, topic-scoped setup for application
code:
```rust,no_run
use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let orders = KafkaClient::new("localhost:9092").topic("orders");
    let producer = orders.producer().connect().await?;

    producer
        .send_message(
            KafkaMessage::new("created".to_owned())
                .with_key("order-42")
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer.shutdown().await?;
    Ok(())
}
```
## Producer
Use `KafkaProducer` and `ProducerConfig` directly when you need lower-level
control over batching, compression, idempotence, transactions, or explicit
topic/partition routing:
```rust,no_run
use kafkit_client::{
    KafkaProducer, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            .with_compression(ProducerCompression::Zstd)
            .with_enable_idempotence(true),
    )
    .await?;

    producer
        .send(
            ProduceRecord::new("orders", 0, "created")
                .with_key("order-42")
                .with_timestamp(1_700_000_123_456)
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer
        .send(ProduceRecord::tombstone("orders", 0).with_key("order-41"))
        .await?;

    producer.flush().await?;
    producer.shutdown().await?;
    Ok(())
}
```
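
The `with_timestamp(1_700_000_123_456)` call above passes a literal. Kafka record timestamps are milliseconds since the Unix epoch, so a value like this can be derived from the system clock. The following is a minimal, standard-library-only sketch. The helper name `epoch_millis` is hypothetical and not part of `kafkit-client`, and the `i64` return type is an assumption about what `with_timestamp` accepts.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical helper (not part of kafkit-client): milliseconds since the
/// Unix epoch. Returns `None` if `t` is before the epoch or the value does
/// not fit in an `i64`.
fn epoch_millis(t: SystemTime) -> Option<i64> {
    let since_epoch = t.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}

fn main() {
    // A fixed instant round-trips to the same millisecond value.
    let fixed = UNIX_EPOCH + Duration::from_millis(1_700_000_123_456);
    assert_eq!(epoch_millis(fixed), Some(1_700_000_123_456));

    // The current time, suitable for an explicit record timestamp.
    let now = epoch_millis(SystemTime::now()).expect("system clock is before 1970");
    println!("now = {now} ms since epoch");
}
```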
## Consumer
Consumers use Kafka's modern group protocol. The ergonomic builder subscribes to
the topic selected by `KafkaClient::topic(...)`:
```rust,no_run
use kafkit_client::{AutoOffsetReset, KafkaClient};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaClient::new("localhost:9092")
        .topic("orders")
        .consumer("orders-reader")
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_instance_id("orders-reader-1")
        .connect()
        .await?;

    loop {
        let records = consumer.poll().await?;
        for record in records.iter() {
            println!(
                "{}:{}@{}",
                record.topic,
                record.partition,
                record.offset
            );
        }
        consumer.commit(&records).await?;
    }
}
```
## Admin
The admin client covers common cluster and topic operations:
```rust,no_run
use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;

    let topics = admin.list_topics().await?;
    let cluster = admin.describe_cluster().await?;
    println!("topics: {}", topics.len());
    println!("cluster id: {:?}", cluster.cluster_id);

    Ok(())
}
```
## Security
TLS and SASL are configured on the same builder/config types used for plain
connections:
```rust,no_run
use kafkit_client::{KafkaClient, TlsConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaClient::new("kafka.example.com:9093")
        .topic("orders")
        .producer()
        .with_tls(
            TlsConfig::new()
                .with_ca_cert_path("/etc/ssl/certs/kafka-ca.pem")
                .with_server_name("kafka.example.com"),
        )
        .with_sasl_scram_sha_512("alice", "correct-horse-battery-staple")
        .connect()
        .await?;

    producer.send("created".to_owned()).await?;
    producer.shutdown().await?;
    Ok(())
}
```
Supported security features:
- TLS with system roots, custom CA files, server-name override, and client certs.
- SASL/PLAIN.
- SASL/SCRAM-SHA-256.
- SASL/SCRAM-SHA-512.
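
The example above hardcodes the SASL username and password for brevity. In a deployed service these usually come from the environment or a secret store. The sketch below shows one way to load them with only the standard library. The `SaslCredentials` type, the `credentials_from` function, and the `KAFKA_SASL_USERNAME` / `KAFKA_SASL_PASSWORD` variable names are all hypothetical and not part of `kafkit-client`.

```rust
use std::env;

/// Hypothetical holder for SASL credentials (not a kafkit-client type).
#[derive(Debug, PartialEq)]
struct SaslCredentials {
    username: String,
    password: String,
}

/// Builds credentials from any key -> value lookup and reports the first
/// missing key. Taking the lookup as a parameter keeps the function testable.
fn credentials_from(
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<SaslCredentials, String> {
    let get = |key: &str| lookup(key).ok_or_else(|| format!("missing {key}"));
    Ok(SaslCredentials {
        username: get("KAFKA_SASL_USERNAME")?,
        password: get("KAFKA_SASL_PASSWORD")?,
    })
}

fn main() {
    // In a real service the lookup is the process environment.
    match credentials_from(|key| env::var(key).ok()) {
        Ok(creds) => println!(
            "loaded credentials for {} ({} byte password)",
            creds.username,
            creds.password.len()
        ),
        Err(err) => eprintln!("{err}"),
    }
}
```

The loaded strings could then replace the literals passed to `with_sasl_scram_sha_512`, assuming that method accepts borrowed strings.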
## Feature Highlights
- Async producer, consumer, share-group consumer, and admin client.
- Topic-scoped `KafkaClient` facade for concise application setup.
- Lower-level `KafkaProducer`, `KafkaConsumer`, `KafkaShareConsumer`, and
`KafkaAdmin` APIs for explicit configuration.
- Record headers, nullable values, tombstones, and explicit record timestamps.
- Configurable compression, linger, batching, retry backoff, request timeout,
and delivery timeout.
- Producer `flush()` and explicit `shutdown()` controls.
- Transactional producer support, including `send_offsets_to_transaction`.
- Manual assignment, topic-list subscription, and pattern subscription.
- Seek, position, pause, resume, committed-offset lookup, beginning/end offset
lookup, and timestamp offset lookup.
- Topic create/list/describe/delete/config APIs, consumer group description,
broker metadata lookup, and cluster description.
- `tracing` instrumentation.
## Choosing an API
- Use `KafkaClient` for service code that mostly works with one topic at a time.
- Use `KafkaProducer` with `ProducerConfig` for explicit partitioning, batching,
compression, idempotence, or transactions.
- Use `KafkaConsumer` with `ConsumerConfig` for direct control over group,
assignment, fetch, offset, and commit behavior.
- Use `KafkaShareConsumer` for Kafka share-group consumption.
- Use `KafkaAdmin` with `AdminConfig` for cluster and topic management.
## Operational Notes
- Call `shutdown().await` on producers and consumers when your application is
stopping so background tasks can drain and close connections cleanly.
- Call `flush().await` on producers when you need all currently buffered records
acknowledged before continuing.
- Consumers return batches from `poll().await`; commit the records you have
processed, or configure auto-commit if that matches your processing model.
- Transactional producers must set a transactional id and call
`init_transactions().await` before the first transaction. After that, start each
transaction with `begin_transaction()` and end it with `commit_transaction()` or
`abort_transaction()`.
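
The transactional call order in the last note can be sketched as a commit-on-success, abort-on-error wrapper. To stay dependency-free, this sketch uses a hypothetical `MockTxProducer` that only records which methods were called. It is synchronous and is not the crate's `KafkaProducer`, whose methods are async and may return errors. The `in_transaction` helper is likewise an illustration, not a crate API.

```rust
/// Hypothetical stand-in for a transactional producer; it only records calls.
#[derive(Default)]
struct MockTxProducer {
    calls: Vec<&'static str>,
}

impl MockTxProducer {
    fn init_transactions(&mut self) { self.calls.push("init"); }
    fn begin_transaction(&mut self) { self.calls.push("begin"); }
    fn commit_transaction(&mut self) { self.calls.push("commit"); }
    fn abort_transaction(&mut self) { self.calls.push("abort"); }
}

/// Runs `work` inside one transaction: commit on `Ok`, abort on `Err`.
fn in_transaction<T, E>(
    producer: &mut MockTxProducer,
    work: impl FnOnce(&mut MockTxProducer) -> Result<T, E>,
) -> Result<T, E> {
    producer.begin_transaction();
    match work(producer) {
        Ok(value) => {
            producer.commit_transaction();
            Ok(value)
        }
        Err(err) => {
            producer.abort_transaction();
            Err(err)
        }
    }
}

fn main() {
    let mut producer = MockTxProducer::default();
    producer.init_transactions(); // before the first transaction

    let ok: Result<u32, &str> = in_transaction(&mut producer, |_| Ok(2));
    let failed: Result<u32, &str> = in_transaction(&mut producer, |_| Err("send failed"));

    assert_eq!(ok, Ok(2));
    assert_eq!(failed, Err("send failed"));
    assert_eq!(producer.calls, ["init", "begin", "commit", "begin", "abort"]);
}
```

Funnelling every transaction through one wrapper makes it harder to leave a transaction open on an early return.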
## More Docs
- [Quickstart](docs/quickstart.md)
- [Configuration](docs/configuration.md)
- [Producer](docs/producer.md)
- [Consumer](docs/consumer.md)
- [Groups](docs/groups.md)
- [Transactions](docs/transactions.md)
- [Admin](docs/admin.md)
- [Security](docs/security.md)
- [Troubleshooting](docs/troubleshooting.md)
- [Compatibility](docs/compatibility.md)
- [Limitations](docs/limitations.md)
- [Migration notes](docs/migration.md)