kafkit-client 0.1.0

Kafka 4.0+ pure Rust client.

Full-fledged Kafka client in native Rust.

What It Is

  • Async Kafka client built with Rust, Tokio, and kafka-protocol.
  • Application-facing facade for producer, consumer, share-group consumer, and admin APIs.
  • Kafka 4.0+ focused.
  • Designed for direct use in services, tools, tests, and operational workflows.

Features

  • Async producer.
  • Async consumer.
  • Kafka share-group consumer.
  • Admin client.
  • Topic-scoped KafkaClient builder.
  • Record headers.
  • Nullable values and tombstones.
  • Explicit record timestamps.
  • Configurable compression.
  • Flush and shutdown controls.
  • Delivery timeout handling.
  • Manual assignment.
  • Subscribe by topic list.
  • Subscribe by topic pattern.
  • Seek, position, pause, and resume.
  • Beginning, end, committed, and timestamp offset lookup.
  • Transactional producer support.
  • Consumer offsets committed inside a transaction via send_offsets_to_transaction.
  • Transaction protocol v2 support.
  • TLS.
  • SASL/PLAIN.
  • SASL/SCRAM-SHA-256.
  • SASL/SCRAM-SHA-512.
  • Broker metadata lookup.
  • Topic create, list, describe, delete, and config APIs.
  • Consumer group listing and description.
  • Broker and cluster description.
  • Tracing instrumentation.
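The record-level features above (headers, nullable values and tombstones, explicit timestamps) can be pictured with a small standalone model. This is only a sketch: `SketchRecord` and its methods are hypothetical names invented for illustration and are not kafkit-client types. It relies on the general Kafka convention that a keyed record with a null value acts as a tombstone on a compacted topic.

```rust
/// Hypothetical record shape for illustration only; not a kafkit-client type.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SketchRecord {
    key: Option<Vec<u8>>,
    /// `None` models a null value on the wire.
    value: Option<Vec<u8>>,
    headers: Vec<(String, Vec<u8>)>,
    /// `None` means no explicit timestamp was set by the caller.
    timestamp_ms: Option<i64>,
}

impl SketchRecord {
    /// A keyed record with a non-null value.
    fn new(key: &str, value: &str) -> Self {
        Self {
            key: Some(key.as_bytes().to_vec()),
            value: Some(value.as_bytes().to_vec()),
            headers: Vec::new(),
            timestamp_ms: None,
        }
    }

    /// A keyed record with a null value.
    fn tombstone(key: &str) -> Self {
        Self {
            key: Some(key.as_bytes().to_vec()),
            value: None,
            headers: Vec::new(),
            timestamp_ms: None,
        }
    }

    /// Appends one header; headers keep insertion order.
    fn with_header(mut self, name: &str, value: &str) -> Self {
        self.headers.push((name.to_owned(), value.as_bytes().to_vec()));
        self
    }

    /// Sets an explicit timestamp in milliseconds since the Unix epoch.
    fn with_timestamp_ms(mut self, timestamp_ms: i64) -> Self {
        self.timestamp_ms = Some(timestamp_ms);
        self
    }

    /// True when the record has a key and a null value.
    fn is_tombstone(&self) -> bool {
        self.key.is_some() && self.value.is_none()
    }
}
```

Modeling the value as `Option` keeps "empty payload" and "null payload" distinct, which matters because only the null case is treated as a delete marker during compaction.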

Quick Start

use kafkit_client::{KafkaClient, KafkaMessage, RecordHeader};

async fn example() -> kafkit_client::Result<()> {
    // Topic-scoped builder: clients created from it target "orders".
    let orders = KafkaClient::new("localhost:9092").topic("orders");

    let producer = orders.producer().connect().await?;
    // Send one keyed record that carries a single header.
    producer
        .send_message(
            KafkaMessage::new("created".to_owned())
                .with_key("order-42")
                .with_header(RecordHeader::new("trace-id", "abc-123")),
        )
        .await?;
    // Close the producer once it is no longer needed.
    producer.shutdown().await?;
    Ok(())
}

Producer

use kafkit_client::{KafkaProducer, ProduceRecord, ProducerConfig};

async fn example() -> kafkit_client::Result<()> {
    let producer = KafkaProducer::connect(ProducerConfig::new("localhost:9092")).await?;
    // Argument order is assumed to be topic, partition, value.
    producer
        .send(ProduceRecord::new("orders", 0, "created"))
        .await?;
    // Flush pending records, then close the producer.
    producer.flush().await?;
    producer.shutdown().await?;
    Ok(())
}

Consumer

use kafkit_client::{AutoOffsetReset, KafkaClient};

async fn example() -> kafkit_client::Result<()> {
    // Join the "orders-reader" group; start from the earliest offset
    // when the group has no committed position yet.
    let consumer = KafkaClient::new("localhost:9092")
        .topic("orders")
        .consumer("orders-reader")
        .with_auto_offset_reset(AutoOffsetReset::Earliest)
        .connect()
        .await?;

    // Fetch one batch, then commit the offsets of that batch.
    let records = consumer.poll().await?;
    consumer.commit(&records).await?;
    consumer.shutdown().await?;
    Ok(())
}
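For readers new to Kafka offsets, the following standalone sketch shows what committing a polled batch conventionally means: for each topic partition, the committed offset is the highest processed offset plus one, that is, the next offset to read. `ConsumedRecord` and `offsets_to_commit` are hypothetical names used only here; whether `consumer.commit(&records)` computes its offsets exactly this way is an assumption, not something the crate documentation above states.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a polled record; not a kafkit-client type.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ConsumedRecord {
    topic: String,
    partition: i32,
    offset: i64,
}

/// For every (topic, partition) seen in the batch, returns the offset to
/// commit: the highest processed offset plus one.
fn offsets_to_commit(records: &[ConsumedRecord]) -> BTreeMap<(String, i32), i64> {
    let mut next = BTreeMap::new();
    for record in records {
        let candidate = record.offset + 1;
        let entry = next
            .entry((record.topic.clone(), record.partition))
            .or_insert(candidate);
        // Keep the largest value even if the batch is not sorted by offset.
        *entry = (*entry).max(candidate);
    }
    next
}
```

Committing "last offset plus one" means a restarted consumer resumes at the first unprocessed record rather than re-reading the last one it already handled.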

Admin

use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};

async fn example() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;
    // Arguments are assumed to be name, partition count, replication factor.
    admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;
    let topics = admin.list_topics().await?;
    let cluster = admin.describe_cluster().await?;
    // Results are unused in this example.
    let _ = (topics, cluster);
    Ok(())
}

Security

  • TLS with custom CA support.
  • TLS server-name override.
  • TLS client certificates.
  • SASL/PLAIN.
  • SASL/SCRAM-SHA-256.
  • SASL/SCRAM-SHA-512.
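As background on the simplest mechanism in the list, SASL/PLAIN (RFC 4616) sends a single message of the form `[authzid] NUL authcid NUL passwd`. The sketch below builds that message with the standard library only. The function name `sasl_plain_initial_response` is hypothetical and this is not kafkit-client's implementation. Because the password travels unencrypted inside this message, PLAIN is normally paired with TLS.

```rust
/// Builds the SASL/PLAIN initial client response described in RFC 4616:
/// `[authzid] NUL authcid NUL passwd`, encoded as UTF-8.
/// Hypothetical helper for illustration; not part of kafkit-client.
fn sasl_plain_initial_response(
    authzid: Option<&str>,
    username: &str,
    password: &str,
) -> Result<Vec<u8>, &'static str> {
    let authzid = authzid.unwrap_or("");

    // NUL is the field separator, so no field may contain it.
    if [authzid, username, password].iter().any(|f| f.contains('\0')) {
        return Err("SASL/PLAIN fields must not contain NUL");
    }
    // RFC 4616 requires a non-empty authentication identity and password.
    if username.is_empty() || password.is_empty() {
        return Err("username and password must be non-empty");
    }

    let mut message = Vec::with_capacity(authzid.len() + username.len() + password.len() + 2);
    message.extend_from_slice(authzid.as_bytes());
    message.push(0);
    message.extend_from_slice(username.as_bytes());
    message.push(0);
    message.extend_from_slice(password.as_bytes());
    Ok(message)
}
```

With no authorization identity the message starts with a NUL byte, so `sasl_plain_initial_response(None, "alice", "s3cret")` yields the bytes `\0alice\0s3cret`.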

Compatibility

  • Apache Kafka 4.0+.
  • Modern consumer groups.
  • Transaction protocol v2.
  • Brokers older than Kafka 4.0 are intentionally unsupported.

Crate Surface

  • KafkaClient for ergonomic application setup.
  • KafkaProducer for lower-level producer control.
  • KafkaConsumer for lower-level consumer control.
  • KafkaShareConsumer for share-group consumption.
  • KafkaAdmin for administrative operations.
  • ProducerConfig, ConsumerConfig, and AdminConfig for direct configuration.
