kafkit-client 0.1.2

Kafka 4.0+ pure Rust client.
# kafkit-client

A native async Rust client for Apache Kafka 4.0 and newer.

`kafkit-client` is built on `tokio` and `kafka-protocol`. It exposes producer,
consumer, share-group consumer, and admin APIs without wrapping librdkafka or the
Java client.

## Status

This crate targets modern Kafka clusters only:

- Apache Kafka 4.0+.
- KRaft clusters.
- Modern consumer groups using KIP-848.
- Transaction protocol v2.
- Share-group protocol support.

Classic consumer groups, ZooKeeper-era assumptions, and older broker protocol
paths are intentionally out of scope.

## Install

```toml
[dependencies]
kafkit-client = "0.1.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
```

## Quick Start

Use `KafkaClient` when you want a compact, topic-scoped setup for application
code:

```rust,no_run
use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let orders = KafkaClient::new("localhost:9092").topic("orders");

    let producer = orders.producer().connect().await?;
    producer
        .send_message(
            KafkaMessage::new("created".to_owned())
                .with_key("order-42")
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer.shutdown().await?;
    Ok(())
}
```

## Producer

Use `KafkaProducer` and `ProducerConfig` directly when you need lower-level
control over batching, compression, idempotence, transactions, or explicit
topic/partition routing:

```rust,no_run
use kafkit_client::{
    KafkaProducer, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            // acks = -1 waits for all in-sync replicas to acknowledge each write.
            .with_acks(-1)
            .with_compression(ProducerCompression::Zstd)
            .with_enable_idempotence(true),
    )
    .await?;

    producer
        .send(
            ProduceRecord::new("orders", 0, "created")
                .with_key("order-42")
                .with_timestamp(1_700_000_123_456)
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    // A tombstone is a record with a key and a null value.
    producer
        .send(ProduceRecord::tombstone("orders", 0).with_key("order-41"))
        .await?;

    producer.flush().await?;
    producer.shutdown().await?;
    Ok(())
}
```
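
The literal passed to `with_timestamp` above looks like a count of milliseconds since the Unix epoch, which is the unit Kafka uses for record timestamps. The sketch below shows one way to derive such a value with the standard library only. `epoch_millis` is a hypothetical helper, not part of `kafkit-client`, and it assumes the timestamp parameter is an `i64` millisecond value:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical helper: converts a `SystemTime` to milliseconds since the
/// Unix epoch. Returns `None` for times before the epoch or values that do
/// not fit in an `i64`.
fn epoch_millis(time: SystemTime) -> Option<i64> {
    let since_epoch = time.duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}

fn main() {
    // Round-trip the literal used in the producer example.
    let fixed = UNIX_EPOCH + Duration::from_millis(1_700_000_123_456);
    assert_eq!(epoch_millis(fixed), Some(1_700_000_123_456));

    // A value for "now", assuming the system clock is set after 1970.
    let now = epoch_millis(SystemTime::now()).expect("system clock is before 1970");
    println!("now = {now} ms since epoch");
}
```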

## Consumer

Consumers use Kafka's modern group protocol. The ergonomic builder subscribes to
the topic selected by `KafkaClient::topic(...)`:

```rust,no_run
use kafkit_client::{AutoOffsetReset, KafkaClient};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaClient::new("localhost:9092")
        .topic("orders")
        .consumer("orders-reader")
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_instance_id("orders-reader-1")
        .connect()
        .await?;

    loop {
        let records = consumer.poll().await?;

        for record in records.iter() {
            println!(
                "{}:{}@{}",
                record.topic,
                record.partition,
                record.offset
            );
        }

        // Commit only after every record in the batch has been handled.
        consumer.commit(&records).await?;
    }
}
```
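
Committing after processing, as the loop above does, generally gives at-least-once delivery: if the process stops between handling a batch and committing it, the same records are returned again after restart. The toy model below illustrates that behavior in memory. `ToyPartition` and `run` are hypothetical names for this sketch and do not reflect `kafkit-client` internals:

```rust
/// Toy stand-in for one partition read by one consumer group.
struct ToyPartition {
    log: Vec<&'static str>,
    /// Next offset the group resumes from.
    committed: usize,
}

impl ToyPartition {
    /// Returns up to `max` records, with offsets, starting at the committed offset.
    fn poll(&self, max: usize) -> Vec<(usize, &'static str)> {
        self.log
            .iter()
            .enumerate()
            .skip(self.committed)
            .take(max)
            .map(|(offset, value)| (offset, *value))
            .collect()
    }

    /// Commits the offset that follows the last record in the batch.
    fn commit(&mut self, batch: &[(usize, &'static str)]) {
        if let Some((last, _)) = batch.last() {
            self.committed = last + 1;
        }
    }
}

/// Runs two poll cycles and returns every record that was processed.
/// When `crash_before_first_commit` is true, the first commit is skipped
/// to simulate a restart between processing and committing.
fn run(crash_before_first_commit: bool) -> Vec<&'static str> {
    let mut partition = ToyPartition { log: vec!["a", "b", "c"], committed: 0 };
    let mut processed = Vec::new();

    let first = partition.poll(2);
    processed.extend(first.iter().map(|(_, value)| *value));
    if !crash_before_first_commit {
        partition.commit(&first);
    }

    // The next poll resumes from the last committed offset.
    let second = partition.poll(2);
    processed.extend(second.iter().map(|(_, value)| *value));
    partition.commit(&second);
    processed
}

fn main() {
    assert_eq!(run(false), vec!["a", "b", "c"]);
    // Without the first commit, "a" and "b" are delivered twice.
    assert_eq!(run(true), vec!["a", "b", "a", "b"]);
}
```

Handlers that tolerate duplicates, for example by keying writes on a record identifier, fit this model well.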

## Admin

The admin client covers common cluster and topic operations:

```rust,no_run
use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;

    let topics = admin.list_topics().await?;
    let cluster = admin.describe_cluster().await?;

    println!("topics: {}", topics.len());
    println!("cluster id: {:?}", cluster.cluster_id);

    Ok(())
}
```

## Security

TLS and SASL are configured on the same builder/config types used for plain
connections:

```rust,no_run
use kafkit_client::{KafkaClient, TlsConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaClient::new("kafka.example.com:9093")
        .topic("orders")
        .producer()
        .with_tls(
            TlsConfig::new()
                .with_ca_cert_path("/etc/ssl/certs/kafka-ca.pem")
                .with_server_name("kafka.example.com"),
        )
        .with_sasl_scram_sha_512("alice", "correct-horse-battery-staple")
        .connect()
        .await?;

    producer.send("created".to_owned()).await?;
    producer.shutdown().await?;
    Ok(())
}
```

Supported security features:

- TLS with system roots, custom CA files, server-name override, and client certs.
- SASL/PLAIN.
- SASL/SCRAM-SHA-256.
- SASL/SCRAM-SHA-512.

## Feature Highlights

- Async producer, consumer, share-group consumer, and admin client.
- Topic-scoped `KafkaClient` facade for concise application setup.
- Lower-level `KafkaProducer`, `KafkaConsumer`, `KafkaShareConsumer`, and
  `KafkaAdmin` APIs for explicit configuration.
- Record headers, nullable values, tombstones, and explicit record timestamps.
- Configurable compression, linger, batching, retry backoff, request timeout,
  and delivery timeout.
- Producer `flush()` and explicit `shutdown()` controls.
- Transactional producer support, including `send_offsets_to_transaction`.
- Manual assignment, topic-list subscription, and pattern subscription.
- Seek, position, pause, resume, committed-offset lookup, beginning/end offset
  lookup, and timestamp offset lookup.
- Topic create/list/describe/delete/config APIs, consumer group description,
  broker metadata lookup, and cluster description.
- `tracing` instrumentation.
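
For readers new to tombstones: on a log-compacted topic, a record with a key and a null value marks that key for deletion. The sketch below replays a keyed log in memory to show the effect. It is a simplification that assumes compaction is applied immediately, whereas a real broker compacts in the background and retains tombstones for a while. `compact` is a hypothetical function, not a `kafkit-client` API:

```rust
use std::collections::BTreeMap;

/// Replays a keyed log and returns the latest value for each key.
/// A `None` value acts as a tombstone and removes the key.
fn compact(log: &[(&str, Option<&str>)]) -> BTreeMap<String, String> {
    let mut latest = BTreeMap::new();
    for (key, value) in log {
        match value {
            Some(v) => {
                latest.insert(key.to_string(), v.to_string());
            }
            None => {
                latest.remove(*key);
            }
        }
    }
    latest
}

fn main() {
    let log = [
        ("order-41", Some("created")),
        ("order-42", Some("created")),
        ("order-41", None), // tombstone for order-41
    ];

    let state = compact(&log);
    assert!(!state.contains_key("order-41"));
    assert_eq!(state.get("order-42").map(String::as_str), Some("created"));
}
```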

## Choosing an API

- Use `KafkaClient` for service code that mostly works with one topic at a time.
- Use `KafkaProducer` with `ProducerConfig` for explicit partitioning, batching,
  compression, idempotence, or transactions.
- Use `KafkaConsumer` with `ConsumerConfig` for direct control over group,
  assignment, fetch, offset, and commit behavior.
- Use `KafkaShareConsumer` for Kafka share-group consumption.
- Use `KafkaAdmin` with `AdminConfig` for cluster and topic management.

## Operational Notes

- Call `shutdown().await` on producers and consumers when your application is
  stopping so background tasks can drain and close connections cleanly.
- Call `flush().await` on producers when you need all currently buffered records
  acknowledged before continuing.
- Consumers return batches from `poll().await`; commit the records you have
  processed, or configure auto-commit if that matches your processing model.
- Transactional producers must set a transactional id and call
  `init_transactions().await` once, then wrap each unit of work in
  `begin_transaction()` followed by either `commit_transaction()` or
  `abort_transaction()`.

## Runnable Examples

The crate includes real programs under `examples/`. For a local Kafka 4.0+
broker on `localhost:9092`, run:

```sh
cargo run -p kafkit-client --example order_workflow
```

See [examples/README.md](examples/README.md) for environment variables and
Docker Compose notes.

## More Docs

- [Quickstart](docs/quickstart.md)
- [Configuration](docs/configuration.md)
- [Producer](docs/producer.md)
- [Consumer](docs/consumer.md)
- [Groups](docs/groups.md)
- [Transactions](docs/transactions.md)
- [Admin](docs/admin.md)
- [Security](docs/security.md)
- [Troubleshooting](docs/troubleshooting.md)
- [Compatibility](docs/compatibility.md)
- [Limitations](docs/limitations.md)
- [Migration notes](docs/migration.md)