Skip to main content

Crate kafkit_client

Crate kafkit_client 

Source
Expand description

§kafkit-client

A native async Rust client for Apache Kafka 4.0 and newer.

kafkit-client is built on tokio and kafka-protocol. It exposes producer, consumer, share-group consumer, and admin APIs without wrapping librdkafka or the Java client.

§Status

This crate targets modern Kafka clusters only:

  • Apache Kafka 4.0+.
  • KRaft clusters.
  • Modern consumer groups using KIP-848.
  • Transaction protocol v2.
  • Share-group protocol support.

Classic consumer groups, ZooKeeper-era assumptions, and older broker protocol paths are intentionally out of scope.

§Install

[dependencies]
kafkit-client = "0.1.1"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

§Quick Start

Use KafkaClient when you want a compact, topic-scoped setup for application code:

use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let orders = KafkaClient::new("localhost:9092").topic("orders");

    let producer = orders.producer().connect().await?;
    producer
        .send_message(
            KafkaMessage::new("created".to_owned())
                .with_key("order-42")
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer.shutdown().await?;
    Ok(())
}

§Producer

Use KafkaProducer and ProducerConfig directly when you need lower-level control over batching, compression, idempotence, transactions, or explicit topic/partition routing:

use kafkit_client::{
    KafkaProducer, ProduceRecord, ProducerCompression, ProducerConfig, RecordHeader,
};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(
        ProducerConfig::new("localhost:9092")
            .with_acks(-1)
            .with_compression(ProducerCompression::Zstd)
            .with_enable_idempotence(true),
    )
    .await?;

    producer
        .send(
            ProduceRecord::new("orders", 0, "created")
                .with_key("order-42")
                .with_timestamp(1_700_000_123_456)
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;

    producer
        .send(ProduceRecord::tombstone("orders", 0).with_key("order-41"))
        .await?;

    producer.flush().await?;
    producer.shutdown().await?;
    Ok(())
}

§Consumer

Consumers use Kafka’s modern group protocol. The ergonomic builder subscribes to the topic selected by KafkaClient::topic(...):

use kafkit_client::{AutoOffsetReset, KafkaClient};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let consumer = KafkaClient::new("localhost:9092")
        .topic("orders")
        .consumer("orders-reader")
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .with_instance_id("orders-reader-1")
        .connect()
        .await?;

    loop {
        let records = consumer.poll().await?;

        for record in records.iter() {
            println!(
                "{}:{}@{}",
                record.topic,
                record.partition,
                record.offset
            );
        }

        consumer.commit(&records).await?;
    }
}

§Admin

The admin client covers common cluster and topic operations:

use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;

    let topics = admin.list_topics().await?;
    let cluster = admin.describe_cluster().await?;

    println!("topics: {}", topics.len());
    println!("cluster id: {:?}", cluster.cluster_id);

    Ok(())
}

§Security

TLS and SASL are configured on the same builder/config types used for plain connections:

use kafkit_client::{KafkaClient, TlsConfig};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let producer = KafkaClient::new("kafka.example.com:9093")
        .topic("orders")
        .producer()
        .with_tls(
            TlsConfig::new()
                .with_ca_cert_path("/etc/ssl/certs/kafka-ca.pem")
                .with_server_name("kafka.example.com"),
        )
        .with_sasl_scram_sha_512("alice", "correct-horse-battery-staple")
        .connect()
        .await?;

    producer.send("created".to_owned()).await?;
    producer.shutdown().await?;
    Ok(())
}

Supported security features:

  • TLS with system roots, custom CA files, server-name override, and client certs.
  • SASL/PLAIN.
  • SASL/SCRAM-SHA-256.
  • SASL/SCRAM-SHA-512.

§Feature Highlights

  • Async producer, consumer, share-group consumer, and admin client.
  • Topic-scoped KafkaClient facade for concise application setup.
  • Lower-level KafkaProducer, KafkaConsumer, KafkaShareConsumer, and KafkaAdmin APIs for explicit configuration.
  • Record headers, nullable values, tombstones, and explicit record timestamps.
  • Configurable compression, linger, batching, retry backoff, request timeout, and delivery timeout.
  • Producer flush() and explicit shutdown() controls.
  • Transactional producer support, including send_offsets_to_transaction.
  • Manual assignment, topic-list subscription, and pattern subscription.
  • Seek, position, pause, resume, committed-offset lookup, beginning/end offset lookup, and timestamp offset lookup.
  • Topic create/list/describe/delete/config APIs, consumer group description, broker metadata lookup, and cluster description.
  • tracing instrumentation.

§Choosing an API

  • Use KafkaClient for service code that mostly works with one topic at a time.
  • Use KafkaProducer with ProducerConfig for explicit partitioning, batching, compression, idempotence, or transactions.
  • Use KafkaConsumer with ConsumerConfig for direct control over group, assignment, fetch, offset, and commit behavior.
  • Use KafkaShareConsumer for Kafka share-group consumption.
  • Use KafkaAdmin with AdminConfig for cluster and topic management.

§Operational Notes

  • Call shutdown().await on producers and consumers when your application is stopping so background tasks can drain and close connections cleanly.
  • Call flush().await on producers when you need all currently buffered records acknowledged before continuing.
  • Consumers return batches from poll().await; commit the records you have processed, or configure auto-commit if that matches your processing model.
  • Transactional producers must set a transactional id, call init_transactions().await, then use begin_transaction(), commit_transaction(), or abort_transaction().

§Runnable Examples

The crate includes real programs under examples/. For a local Kafka 4.0+ broker on localhost:9092, run:

cargo run -p kafkit-client --example order_workflow

See examples/README.md for environment variables and Docker Compose notes.

§More Docs

Modules§

config
Configuration types for producers, consumers, admins, TLS, and SASL. Configuration types used to build clients.
error
Error and result types returned by this crate. Error types returned by Kafkit.
types
Common record, partition, offset, and metadata value types. Common value types shared by producers, consumers, and admin calls.

Structs§

AdminConfig
Admin Config.
AlterConfigOp
Alter Config Op.
BrokerDescription
Broker Description.
BrokerFeatureLevel
Broker Feature Level.
ClusterDescription
Cluster Description.
CommitOffset
Commit Offset.
ConfigEntry
Config Entry.
ConfigResource
Config Resource.
ConfigResourceConfig
Config Resource Config.
ConsumerConfig
Consumer Config.
ConsumerGroupDescription
Consumer Group Description.
ConsumerGroupListing
Consumer Group Listing.
ConsumerGroupMemberDescription
Consumer Group Member Description.
ConsumerGroupMetadata
Consumer Group Metadata.
ConsumerRecord
Consumer Record.
ConsumerRecords
Consumer Records.
KafkaAdmin
Client for Kafka admin operations.
KafkaClient
A small builder facade around one or more bootstrap servers.
KafkaConsumer
A Kafka group consumer with explicit polling and commits.
KafkaMessage
Kafka Message.
KafkaProducer
A Kafka producer with batching, retries, idempotence, and transactions.
KafkaShareConsumer
A Kafka share group consumer.
KafkaTopic
A topic-scoped facade for producers and consumers.
NewPartitions
New Partitions.
NewTopic
New Topic.
OffsetAndTimestamp
Offset And Timestamp.
ProduceAck
Produce Ack.
ProduceRecord
Produce Record.
ProducerConfig
Producer Config.
RecordHeader
Record Header.
SaslConfig
Sasl Config.
ShareAcknowledgementCommit
Share Acknowledgement Commit.
ShareConsumerOptions
Share Consumer Options.
ShareRecord
Share Record.
ShareRecords
Share Records.
SubscriptionPattern
Subscription Pattern.
TlsConfig
Tls Config.
TopicDescription
Topic Description.
TopicListing
Topic Listing.
TopicPartition
Topic Partition.
TopicPartitionDescription
Topic Partition Description.
TopicPartitionInfo
Topic Partition Info.
TopicPartitionKey
Topic Partition Key.
TopicPartitionOffset
Topic Partition Offset.
TopicPartitionOffsetAndTimestamp
Topic Partition Offset And Timestamp.
TopicPartitionTimestamp
Topic Partition Timestamp.

Enums§

AcknowledgeType
Acknowledge Type.
AdminError
Errors raised before or during admin operations.
AlterConfigOpType
Alter Config Op Type.
AutoOffsetReset
Auto Offset Reset.
ConfigResourceType
Config Resource Type.
ConsumerError
Errors raised by the consumer API.
ConsumerGroupMetadataError
Errors found while validating consumer group metadata.
Error
Top-level error returned by the crate.
IsolationLevel
Isolation Level.
ProducerCompression
Producer Compression.
ProducerError
Errors raised by the producer API.
ProducerPartitioner
Producer Partitioner.
SaslMechanism
Sasl Mechanism.
SecurityProtocol
Security Protocol.
ShareAcquireMode
Share Acquire Mode.
TransactionStateError
Errors raised by the producer transaction state machine.

Type Aliases§

AcknowledgementCommitCallback
Alias for acknowledgement commit callback.
ClientConfig
Alias for client config.
Result
Result type used by the client APIs.