kafkit-client 0.1.2

Pure Rust client for Kafka 4.0+.

A native async Rust client for Apache Kafka 4.0 and newer.

kafkit-client is built on tokio and kafka-protocol. It exposes producer, consumer, share-group consumer, and admin APIs without wrapping librdkafka or the Java client.

Status

This crate targets modern Kafka clusters only:

  • Apache Kafka 4.0+.
  • KRaft clusters.
  • Modern consumer groups using KIP-848.
  • Transaction protocol v2.
  • Share-group protocol support.

Classic consumer groups, ZooKeeper-era assumptions, and older broker protocol paths are intentionally out of scope.

Install

[dependencies]
kafkit-client = "0.1.2"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

Quick Start

Use KafkaClient when you want a compact, topic-scoped setup for application code:

use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let orders = KafkaClient::new("localhost:9092").topic("orders");

    let producer = orders.producer().connect().await?;
    producer
        .send_message(
            KafkaMessage::new("created".to_owned())
                .with_key("order-42")
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer.shutdown().await?;
    Ok(())
}

Producer

Use KafkaProducer and ProducerConfig directly when you need lower-level control over batching, compression, idempotence, transactions, or explicit topic/partition routing:

use kafkit_client::{
    KafkaProducer, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            .with_acks(-1)
            .with_compression(ProducerCompression::Zstd)
            .with_enable_idempotence(true),
    )
    .await?;

    producer
        .send(
            ProduceRecord::new("orders", 0, "created")
                .with_key("order-42")
                .with_timestamp(1_700_000_123_456)
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer
        .send(ProduceRecord::tombstone("orders", 0).with_key("order-41"))
        .await?;

    producer.flush().await?;
    producer.shutdown().await?;
    Ok(())
}

Consumer

Consumers use Kafka's modern (KIP-848) group protocol. The ergonomic builder subscribes to the topic selected by KafkaClient::topic(...):

use kafkit_client::{AutoOffsetReset, KafkaClient};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaClient::new("localhost:9092")
        .topic("orders")
        .consumer("orders-reader")
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_instance_id("orders-reader-1")
        .connect()
        .await?;

    loop {
        let records = consumer.poll().await?;

        for record in records.iter() {
            println!(
                "{}:{}@{}",
                record.topic,
                record.partition,
                record.offset
            );
        }

        consumer.commit(&records).await?;
    }
}

Admin

The admin client covers common cluster and topic operations:

use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;

    let topics = admin.list_topics().await?;
    let cluster = admin.describe_cluster().await?;

    println!("topics: {}", topics.len());
    println!("cluster id: {:?}", cluster.cluster_id);

    Ok(())
}

Security

TLS and SASL are configured on the same builder/config types used for plain connections:

use kafkit_client::{KafkaClient, KafkaMessage, TlsConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaClient::new("kafka.example.com:9093")
        .topic("orders")
        .producer()
        .with_tls(
            TlsConfig::new()
                .with_ca_cert_path("/etc/ssl/certs/kafka-ca.pem")
                .with_server_name("kafka.example.com"),
        )
        .with_sasl_scram_sha_512("alice", "correct-horse-battery-staple")
        .connect()
        .await?;

    producer
        .send_message(KafkaMessage::new("created".to_owned()))
        .await?;
    producer.shutdown().await?;
    Ok(())
}

Supported security features:

  • TLS with system roots, custom CA files, server-name override, and client certs.
  • SASL/PLAIN.
  • SASL/SCRAM-SHA-256.
  • SASL/SCRAM-SHA-512.

Feature Highlights

  • Async producer, consumer, share-group consumer, and admin client.
  • Topic-scoped KafkaClient facade for concise application setup.
  • Lower-level KafkaProducer, KafkaConsumer, KafkaShareConsumer, and KafkaAdmin APIs for explicit configuration.
  • Record headers, nullable values, tombstones, and explicit record timestamps.
  • Configurable compression, linger, batching, retry backoff, request timeout, and delivery timeout.
  • Producer flush() and explicit shutdown() controls.
  • Transactional producer support, including send_offsets_to_transaction.
  • Manual assignment, topic-list subscription, and pattern subscription.
  • Seek, position, pause, resume, committed-offset lookup, beginning/end offset lookup, and timestamp offset lookup.
  • Topic create/list/describe/delete/config APIs, consumer group description, broker metadata lookup, and cluster description.
  • tracing instrumentation.
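
The offset and flow controls listed above can be sketched against the lower-level KafkaConsumer. Note that the exact signatures of ConsumerConfig::new, assign, seek, pause, and resume below are assumptions inferred from the feature names, not the crate's documented API:

```rust
use kafkit_client::{ConsumerConfig, KafkaConsumer};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    // Hypothetical: ConsumerConfig::new(bootstrap, group) mirroring ProducerConfig::new.
    let consumer =
        KafkaConsumer::connect(ConsumerConfig::new("localhost:9092", "orders-reader")).await?;

    // Manual assignment instead of group subscription (signature assumed).
    consumer.assign([("orders", 0)]).await?;

    // Rewind partition 0 to offset 0, then pause and resume fetching.
    consumer.seek("orders", 0, 0).await?;
    consumer.pause([("orders", 0)]).await?;
    consumer.resume([("orders", 0)]).await?;

    let records = consumer.poll().await?;
    println!("fetched {} records", records.len());
    Ok(())
}
```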

Choosing an API

  • Use KafkaClient for service code that mostly works with one topic at a time.
  • Use KafkaProducer with ProducerConfig for explicit partitioning, batching, compression, idempotence, or transactions.
  • Use KafkaConsumer with ConsumerConfig for direct control over group, assignment, fetch, offset, and commit behavior.
  • Use KafkaShareConsumer for Kafka share-group consumption.
  • Use KafkaAdmin with AdminConfig for cluster and topic management.
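
A share-group consumer would plausibly follow the same connect-and-poll shape as the other clients. Only KafkaShareConsumer itself is named above; ShareConsumerConfig, with_topic, and acknowledge in this sketch are assumed names, so treat it as an illustration rather than the crate's API:

```rust
use kafkit_client::KafkaShareConsumer;
use kafkit_client::ShareConsumerConfig; // assumed config type

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaShareConsumer::connect(
        ShareConsumerConfig::new("localhost:9092", "orders-share-group").with_topic("orders"),
    )
    .await?;

    loop {
        let records = consumer.poll().await?;
        for record in records.iter() {
            println!("{}:{}@{}", record.topic, record.partition, record.offset);
        }
        // Share groups acknowledge delivery per record/batch rather than
        // committing offsets the way classic consumer groups do.
        consumer.acknowledge(&records).await?;
    }
}
```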

Operational Notes

  • Call shutdown().await on producers and consumers when your application is stopping so background tasks can drain and close connections cleanly.
  • Call flush().await on producers when you need all currently buffered records acknowledged before continuing.
  • Consumers return batches from poll().await; commit the records you have processed, or configure auto-commit if that matches your processing model.
  • Transactional producers must set a transactional id, call init_transactions().await, then use begin_transaction(), commit_transaction(), or abort_transaction().
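
The transactional flow in the last point can be sketched as follows. The four transaction methods are named above; with_transactional_id is an assumed builder name (the notes only say a transactional id must be set):

```rust
use kafkit_client::{KafkaProducer, ProduceRecord, ProducerConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            .with_enable_idempotence(true)
            .with_transactional_id("orders-writer-1"), // assumed builder method
    )
    .await?;

    // Register the transactional id with the broker before the first transaction.
    producer.init_transactions().await?;
    producer.begin_transaction().await?;

    let result = producer
        .send(ProduceRecord::new("orders", 0, "created").with_key("order-42"))
        .await;

    // Commit on success, abort on failure so no partial writes become visible.
    match result {
        Ok(_) => producer.commit_transaction().await?,
        Err(_) => producer.abort_transaction().await?,
    }

    producer.shutdown().await?;
    Ok(())
}
```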

Runnable Examples

The crate ships runnable programs under examples/. With a local Kafka 4.0+ broker on localhost:9092, run:

cargo run -p kafkit-client --example order_workflow

See examples/README.md for environment variables and Docker Compose notes.

More Docs