# 🦀 Krafka

[![CI](https://github.com/hupe1980/krafka/actions/workflows/ci.yml/badge.svg)](https://github.com/hupe1980/krafka/actions/workflows/ci.yml)
[![Crates.io](https://img.shields.io/crates/v/krafka.svg)](https://crates.io/crates/krafka)
[![Documentation](https://docs.rs/krafka/badge.svg)](https://docs.rs/krafka)
[![MSRV](https://img.shields.io/badge/MSRV-1.88-blue.svg)](https://github.com/rust-lang/rust/releases/tag/1.88.0)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)

A pure Rust, async-native Apache Kafka client designed for high performance, safety, and ease of use.

## ✨ Features

- 🦀 **Pure Rust**: No librdkafka or C dependencies
- ⚡ **Async-native**: Built on Tokio for true async I/O
- 🔒 **Zero unsafe**: Safe Rust by default
- 🚀 **High performance**: Zero-copy buffers, inline hot paths, efficient batching, concurrent batch flushing
- 📦 **Full protocol support**: Kafka protocol with all compression codecs
- 🔄 **Incremental fetch sessions**: KIP-227 fetch sessions for bandwidth-efficient multi-partition consumers
- 🔐 **TLS/SSL encryption**: Using rustls for secure connections
- 🔑 **SASL authentication**: PLAIN, SCRAM-SHA-256/512, OAUTHBEARER mechanisms
- 💯 **Transactions**: Exactly-once semantics with transactional producer
- ☁️ **Cloud-native**: First-class AWS MSK support including IAM auth
- 🛡️ **Security hardened**: Secret zeroization, constant-time auth (`subtle`), decompression bomb protection, decode loop bounds (`MAX_DECODE_ARRAY_LEN`)
- 🔄 **Built-in retry**: Exponential backoff with metadata refresh on leader changes
- 📊 **Metrics**: Lock-free counters/gauges/latency wired into all hot paths
- 🧪 **Fuzz tested**: cargo-fuzz targets for protocol arrays, record batches, and response decoders
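
The built-in retry listed above relies on exponential backoff. As a rough illustration of that general technique (not Krafka's actual implementation), the following std-only sketch computes a capped, doubling delay. The `backoff_delay` function and its parameters are hypothetical names introduced here.

```rust
use std::time::Duration;

/// Hypothetical helper, not part of Krafka's API: the delay to wait before
/// retry number `attempt` (0-based). The delay doubles from `base` on each
/// attempt and never exceeds `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // 2^attempt, saturating so a large attempt count cannot overflow.
    let factor = 2u32.saturating_pow(attempt);
    // Saturating multiply, then clamp to the configured ceiling.
    base.saturating_mul(factor).min(max)
}
```

With a 100 ms base and a 5 s cap, attempts 0 through 3 wait 100, 200, 400, and 800 ms, and later attempts level off at the cap. Production clients often add random jitter on top, which this sketch omits.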

> **Minimum Broker Version:** Krafka requires **Apache Kafka 3.9+**. Protocol versions older than the Kafka 3.9 baseline have been removed.

## 🚀 Quick Start

Add Krafka to your `Cargo.toml`:

```toml
[dependencies]
krafka = "0.9.0"
tokio = { version = "1", features = ["full"] }

# For AWS MSK IAM authentication with full SDK support:
# krafka = { version = "0.9.0", features = ["aws-msk"] }
```

### Producer

```rust
use krafka::producer::Producer;
use krafka::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let producer = Producer::builder()
        .bootstrap_servers("localhost:9092")
        .client_id("my-producer")
        .build()
        .await?;

    // Send a message
    let metadata = producer
        .send("my-topic", Some(b"key"), b"Hello, Kafka!")
        .await?;
    
    println!(
        "Sent to partition {} at offset {}",
        metadata.partition, metadata.offset
    );

    producer.close().await;
    Ok(())
}
```

### Consumer

```rust
use krafka::consumer::{Consumer, AutoOffsetReset};
use krafka::error::Result;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let consumer = Consumer::builder()
        .bootstrap_servers("localhost:9092")
        .group_id("my-consumer-group")
        .auto_offset_reset(AutoOffsetReset::Earliest)
        .build()
        .await?;

    consumer.subscribe(&["my-topic"]).await?;

    loop {
        let records = consumer.poll(Duration::from_secs(1)).await?;
        for record in records {
            if let Some(ref value) = record.value {
                println!(
                    "Received: topic={}, partition={}, offset={}, value={:?}",
                    record.topic,
                    record.partition,
                    record.offset,
                    String::from_utf8_lossy(value)
                );
            }
        }
    }
}
```

### Admin Client

```rust
use krafka::admin::{AdminClient, NewTopic};
use krafka::error::Result;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let admin = AdminClient::builder()
        .bootstrap_servers("localhost:9092")
        .build()
        .await?;

    // Create a topic
    let topic = NewTopic::new("new-topic", 6, 3)
        .with_config("retention.ms", "604800000");

    admin.create_topics(vec![topic], Duration::from_secs(30), false).await?;

    // List topics
    let topics = admin.list_topics().await?;
    println!("Topics: {:?}", topics);

    Ok(())
}
```

### Transactional Producer

For exactly-once semantics across multiple partitions:

```rust
use krafka::producer::TransactionalProducer;
use krafka::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    let producer = TransactionalProducer::builder()
        .bootstrap_servers("localhost:9092")
        .transactional_id("my-transaction")
        .build()
        .await?;

    // Initialize transactions (once per producer)
    producer.init_transactions().await?;

    // Atomic transaction
    producer.begin_transaction()?;
    producer.send("topic-a", Some(b"key"), b"value1").await?;
    producer.send("topic-b", Some(b"key"), b"value2").await?;
    producer.commit_transaction().await?;

    Ok(())
}
```

### Authentication

Connect to secured Kafka clusters with SASL/PLAIN, SASL/SCRAM, SASL/OAUTHBEARER, or AWS MSK IAM. These options are available on all client types:

```rust
use krafka::producer::Producer;
use krafka::consumer::Consumer;
use krafka::admin::AdminClient;

// Producer with SASL/SCRAM-SHA-256
let producer = Producer::builder()
    .bootstrap_servers("broker:9093")
    .sasl_scram_sha256("username", "password")
    .build()
    .await?;

// Consumer with SASL/PLAIN
let consumer = Consumer::builder()
    .bootstrap_servers("broker:9092")
    .group_id("secure-group")
    .sasl_plain("username", "password")
    .build()
    .await?;

// Producer with SASL/OAUTHBEARER
let producer = Producer::builder()
    .bootstrap_servers("broker:9093")
    .sasl_oauthbearer("your-jwt-token")
    .build()
    .await?;

// Admin with AWS MSK IAM
use krafka::auth::AuthConfig;
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let admin = AdminClient::builder()
    .bootstrap_servers("broker:9094")
    .auth(auth)
    .build()
    .await?;
```

## 📦 Modules

| Module | Description |
|--------|-------------|
| `producer` | High-throughput message production with batching and compression |
| `consumer` | Consumer groups with rebalancing, offset management, and static membership |
| `admin` | Cluster administration (topics, groups, records, configuration, ACLs) |
| `interceptor` | Producer and consumer interceptor hooks for observability |
| `protocol` | Kafka wire protocol implementation |
| `auth` | Authentication (SASL/PLAIN, SASL/SCRAM, SASL/OAUTHBEARER, AWS MSK IAM) |

## ๐Ÿ—œ๏ธ Compression

Krafka supports all Kafka compression codecs, individually feature-gated:

```rust
use krafka::producer::Producer;
use krafka::protocol::Compression;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .compression(Compression::Lz4)  // Fast compression
    .build()
    .await?;
```

| Codec | Cargo Feature | Crate | Characteristics |
|-------|---------------|-------|-----------------|
| `Compression::Gzip` | `gzip` | flate2 | Best ratio, slower |
| `Compression::Snappy` | `snappy` | snap | Good balance |
| `Compression::Lz4` | `lz4` | lz4_flex | Fastest |
| `Compression::Zstd` | `zstd` | zstd | Best modern choice (requires C toolchain) |

The default `compression` feature enables the pure-Rust codecs: gzip, snappy,
and LZ4. Zstd remains available through the explicit `zstd` or
`compression-all` feature because it requires a C toolchain via `zstd-sys`.
To select only what you need:

```toml
# Option 1: enable only the codecs you need
krafka = { version = "0.9.0", default-features = false, features = ["lz4", "snappy"] }

# Option 2: enable all compression codecs, including zstd
# krafka = { version = "0.9.0", features = ["compression-all"] }
```

## ⚡ Performance Tuning

### High Throughput Producer

```rust
use krafka::producer::{Producer, Acks};
use krafka::protocol::Compression;
use std::time::Duration;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::Leader)                   // Wait for the partition leader only
    .compression(Compression::Lz4)
    .batch_size(1_048_576)                // 1 MiB batches
    .linger(Duration::from_millis(10))    // Wait up to 10 ms to fill a batch
    .build()
    .await?;
```
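
To make the trade-off between `batch_size` and `linger` concrete, here is a minimal std-only model of the usual Kafka-style flush rule: a batch is sent once it is full or once it has waited for the linger period. This is a sketch of the general idea, not Krafka's internal logic; `PendingBatch` and `should_flush` are hypothetical names.

```rust
use std::time::Duration;

/// Hypothetical model of one partition's pending batch (not a Krafka type).
struct PendingBatch {
    /// Bytes accumulated so far.
    bytes: usize,
    /// Time since the first record was added.
    age: Duration,
}

/// Returns true when the batch should be sent: it holds data and is either
/// full (`batch_size`) or has waited long enough (`linger`).
fn should_flush(batch: &PendingBatch, batch_size: usize, linger: Duration) -> bool {
    batch.bytes > 0 && (batch.bytes >= batch_size || batch.age >= linger)
}
```

Under this model, a larger `batch_size` only helps if records arrive fast enough to fill it within the linger window. Otherwise the linger timer decides when the batch goes out, so raising it adds latency in exchange for fuller batches.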

### Low Latency Consumer

```rust
use krafka::consumer::Consumer;
use std::time::Duration;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("low-latency")
    .fetch_min_bytes(1)
    .fetch_max_wait(Duration::from_millis(10))
    .build()
    .await?;
```
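
The two fetch settings work together: in Kafka, a broker answers a fetch request as soon as at least `fetch.min.bytes` of data is available, or when `fetch.max.wait.ms` has elapsed, whichever comes first. The std-only sketch below models that rule, on the assumption that `fetch_min_bytes` and `fetch_max_wait` map to those broker-side settings. `fetch_ready` is a hypothetical function, not part of Krafka.

```rust
use std::time::Duration;

/// Hypothetical model of when a broker answers a fetch request: either enough
/// data has accumulated, or the maximum wait has elapsed.
fn fetch_ready(
    available_bytes: usize,
    waited: Duration,
    min_bytes: usize,
    max_wait: Duration,
) -> bool {
    available_bytes >= min_bytes || waited >= max_wait
}
```

With `min_bytes = 1`, any available record is returned immediately, and an idle partition produces an empty response after at most the 10 ms wait, which keeps the poll loop responsive.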

## 📚 Documentation

Full documentation is available at **[hupe1980.github.io/krafka](https://hupe1980.github.io/krafka)**

- [Getting Started](https://hupe1980.github.io/krafka/getting-started)
- [Producer Guide](https://hupe1980.github.io/krafka/producer)
- [Consumer Guide](https://hupe1980.github.io/krafka/consumer)
- [Admin Client](https://hupe1980.github.io/krafka/admin)
- [Configuration Reference](https://hupe1980.github.io/krafka/configuration)
- [Performance Tuning](https://hupe1980.github.io/krafka/performance)
- [Architecture Overview](https://hupe1980.github.io/krafka/architecture)
- [Metrics & Observability](https://hupe1980.github.io/krafka/metrics)
- [Error Handling](https://hupe1980.github.io/krafka/errors)
- [Interceptors](https://hupe1980.github.io/krafka/interceptors)
- [Authentication](https://hupe1980.github.io/krafka/authentication)

## 🎮 Examples

Run the examples with:

```bash
# Producer example
cargo run --example producer

# Consumer example
cargo run --example consumer

# Advanced consumer example (pause/resume, seek, manual commits)
cargo run --example consumer_advanced

# Admin client example
cargo run --example admin

# Transactional producer example
cargo run --example transactional_producer

# Authentication examples (SASL, SCRAM, MSK IAM)
cargo run --example authentication
```

## 📊 Status

Krafka is feature-complete and production-ready.

**Features:**
- ✅ Protocol layer (all message types, compression, ACL messages, transactions, unified versioned encode/decode dispatch, wire-format validation)
- ✅ Network layer (async connections, pooling, TLS/SSL, IPv6 support)
- ✅ Producer (batching with linger timer, partitioning, compression, built-in retry with exponential backoff, metadata refresh on failure, max-in-flight enforcement via semaphore, buffer backpressure via `ProducerConfig::max_block`, interceptor hooks, zero-copy `Bytes` pipeline)
- ✅ Consumer (polling, streaming `recv()` with error propagation, offset management, auto-commit timer, seek, pause/resume, configurable partition assignment strategy, rebalance listeners, cooperative sticky assignor, static group membership (KIP-345), interceptor hooks, log compaction awareness, batched offset resolution, per-partition retry backoff)
- ✅ Admin Client (topic CRUD, partitions, configuration, ACL management, consumer groups, record deletion, leader epoch queries, automatic API version negotiation)
- ✅ Authentication (SASL/PLAIN, SASL/SCRAM-SHA-256/512, SASL/OAUTHBEARER, AWS MSK IAM with SDK support)
- ✅ TLS/SSL encryption (rustls, mTLS support)
- ✅ Transactions (exactly-once semantics with transactional producer, including full PID/epoch/sequence tracking)
- ✅ Metrics (counters, gauges, latency tracking, all wired into producer/consumer hot paths)
- ✅ Tracing (OpenTelemetry-compatible spans with properly declared fields)
- ✅ Security hardening (secret zeroization, constant-time comparison, PBKDF2 validation, decompression limits, decode loop bounds)
- ✅ Fuzz testing (cargo-fuzz targets for protocol decode paths)
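
The "decode loop bounds" item refers to a common defensive pattern: validate a length prefix read from the wire before allocating or looping on it. The std-only sketch below illustrates the pattern on a simplified array of big-endian `i32` values. It is not Krafka's decoder; `MAX_ARRAY_LEN`, `DecodeError`, `read_i32`, and `decode_i32_array` are hypothetical, and the cap value is arbitrary.

```rust
/// Illustrative cap only; the value of Krafka's `MAX_DECODE_ARRAY_LEN` is not
/// stated in this README.
const MAX_ARRAY_LEN: usize = 1_000;

#[derive(Debug, PartialEq)]
enum DecodeError {
    Truncated,
    NegativeLength(i32),
    TooLong(usize),
}

/// Reads one big-endian i32 and advances the slice past it.
fn read_i32(buf: &mut &[u8]) -> Result<i32, DecodeError> {
    let slice: &[u8] = *buf;
    if slice.len() < 4 {
        return Err(DecodeError::Truncated);
    }
    let (head, tail) = slice.split_at(4);
    *buf = tail;
    Ok(i32::from_be_bytes([head[0], head[1], head[2], head[3]]))
}

/// Decodes an i32 element count followed by that many i32 values.
/// Simplification: a negative count is treated as an error here.
fn decode_i32_array(mut buf: &[u8]) -> Result<Vec<i32>, DecodeError> {
    let declared = read_i32(&mut buf)?;
    if declared < 0 {
        return Err(DecodeError::NegativeLength(declared));
    }
    let len = declared as usize;
    // Check the cap before allocating, so a hostile length prefix cannot
    // trigger a huge allocation or a near-endless loop.
    if len > MAX_ARRAY_LEN {
        return Err(DecodeError::TooLong(len));
    }
    let mut out = Vec::with_capacity(len);
    for _ in 0..len {
        out.push(read_i32(&mut buf)?);
    }
    Ok(out)
}
```

The point of the design is that the declared length is untrusted input: a four-byte prefix claiming two billion elements is rejected up front instead of being discovered as truncated after a large allocation.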

## ๐Ÿค Contributing

Contributions are welcome!

## 📄 License

Licensed under the [MIT License](LICENSE).