# kafkit-client
Full-fledged Kafka client in native Rust.
## What It Is
- Async Kafka client built with Rust, Tokio, and `kafka-protocol`.
- Application-facing facade for producer, consumer, share-group consumer, and admin APIs.
- Kafka 4.0+ focused.
- Designed for direct use in services, tools, tests, and operational workflows.
## Features
- Async producer.
- Async consumer.
- Kafka share-group consumer.
- Admin client.
- Topic-scoped `KafkaClient` builder.
- Record headers.
- Nullable values and tombstones.
- Explicit record timestamps.
- Configurable compression.
- Flush and shutdown controls.
- Delivery timeout handling.
- Manual assignment.
- Subscribe by topic list.
- Subscribe by topic pattern.
- Seek, position, pause, and resume.
- Beginning, end, committed, and timestamp offset lookup.
- Transactional producer support.
- `send_offsets_to_transaction`.
- Transaction protocol v2 support.
- TLS.
- SASL/PLAIN.
- SASL/SCRAM-SHA-256.
- SASL/SCRAM-SHA-512.
- Broker metadata lookup.
- Topic create, list, describe, delete, and config APIs.
- Consumer group listing and description.
- Broker and cluster description.
- Tracing instrumentation.
## Quick Start
```rust
use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};
# async fn example() -> kafkit_client::Result<()> {
let orders = KafkaClient::new("localhost:9092").topic("orders");
let producer = orders.producer().connect().await?;
producer
.send_message(
KafkaMessage::new("created".to_owned())
.with_key("order-42")
.with_header(RecordHeader::new("trace-id", "abc-123")),
)
.await?;
producer.shutdown().await?;
# Ok(())
# }
```
## Producer
```rust
use kafkit_client::{KafkaProducer, ProduceRecord, ProducerConfig};
# async fn example() -> kafkit_client::Result<()> {
let producer = KafkaProducer::connect(ProducerConfig::new("localhost:9092")).await?;
producer
.send(ProduceRecord::new("orders", 0, "created"))
.await?;
producer.flush().await?;
producer.shutdown().await?;
# Ok(())
# }
```
## Consumer
```rust
use kafkit_client::{AutoOffsetReset, KafkaClient};
# async fn example() -> kafkit_client::Result<()> {
let consumer = KafkaClient::new("localhost:9092")
.topic("orders")
.consumer("orders-reader")
.with_auto_offset_reset(AutoOffsetReset::Earliest)
.connect()
.await?;
let records = consumer.poll().await?;
consumer.commit(&records).await?;
consumer.shutdown().await?;
# Ok(())
# }
```
## Admin
```rust
use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};
# async fn example() -> kafkit_client::Result<()> {
let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;
admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;
let topics = admin.list_topics().await?;
let cluster = admin.describe_cluster().await?;
# let _ = (topics, cluster);
# Ok(())
# }
```
## Security
- TLS with custom CA support.
- TLS server-name override.
- TLS client certificates.
- SASL/PLAIN.
- SASL/SCRAM-SHA-256.
- SASL/SCRAM-SHA-512.
## Compatibility
- Apache Kafka 4.0+.
- Modern consumer groups.
- Transaction protocol v2.
- Older brokers are intentionally unsupported.
## Crate Surface
- `KafkaClient` for ergonomic application setup.
- `KafkaProducer` for lower-level producer control.
- `KafkaConsumer` for lower-level consumer control.
- `KafkaShareConsumer` for share-group consumption.
- `KafkaAdmin` for administrative operations.
- `ProducerConfig`, `ConsumerConfig`, and `AdminConfig` for direct configuration.
## More Docs
- [Quickstart](docs/quickstart.md)
- [Configuration](docs/configuration.md)
- [Producer](docs/producer.md)
- [Consumer](docs/consumer.md)
- [Transactions](docs/transactions.md)
- [Admin](docs/admin.md)
- [Security](docs/security.md)
- [Troubleshooting](docs/troubleshooting.md)
- [Compatibility](docs/compatibility.md)
- [Limitations](docs/limitations.md)
- [Migration notes](docs/migration.md)