Crate krafka

§Krafka

A pure Rust, async-native Apache Kafka client.

Krafka provides high-performance, safe, and idiomatic Rust APIs for producing messages to and consuming messages from Apache Kafka clusters.

§Features

  • Pure Rust: No librdkafka or C bindings
  • Async-native: Built on Tokio for non-blocking I/O
  • High-performance: Zero-copy buffers, minimal allocations
  • Safe: No unsafe code by default
  • Cloud-native: First-class AWS MSK support including IAM auth

§Thread Safety

All main types in Krafka implement Send + Sync:

  • Producer - can be shared across tasks with Arc
  • Consumer - can be shared across tasks with Arc
  • AdminClient - can be shared across tasks with Arc

This allows safe concurrent access from multiple Tokio tasks:

use std::sync::Arc;
use krafka::producer::Producer;

#[tokio::main]
async fn main() -> krafka::Result<()> {
    let producer = Arc::new(Producer::builder()
        .bootstrap_servers("localhost:9092")
        .build()
        .await?);

    // Spawn multiple tasks sharing the producer via Arc.
    for _ in 0..10 {
        let producer = producer.clone();
        tokio::spawn(async move {
            let _ = producer.send("topic", None, b"message").await;
        });
    }
    Ok(())
}

§Quick Start

§Producer

use krafka::producer::Producer;

#[tokio::main]
async fn main() -> krafka::Result<()> {
    let producer = Producer::builder()
        .bootstrap_servers("localhost:9092")
        .build()
        .await?;

    producer.send("my-topic", Some(b"key"), b"value").await?;
    Ok(())
}

§Consumer

use krafka::consumer::Consumer;

#[tokio::main]
async fn main() -> krafka::Result<()> {
    let consumer = Consumer::builder()
        .bootstrap_servers("localhost:9092")
        .group_id("my-group")
        .build()
        .await?;

    consumer.subscribe(&["my-topic"]).await?;

    while let Some(msg) = consumer.recv().await? {
        println!("{:?}", msg);
    }
    Ok(())
}

§Cargo Features

Feature                    Default          Description
compression                yes              Enables all compression codecs (gzip + snappy + lz4 + zstd).
gzip                       via compression  Gzip record batch compression via flate2.
snappy                     via compression  Snappy compression via snap.
lz4                        via compression  LZ4 compression via lz4_flex.
zstd                       via compression  Zstd compression via zstd (requires C toolchain).
aws-msk                    no               AWS MSK IAM authentication with SDK credential chain.
schema-registry            no               Confluent Schema Registry HTTP client.
aws-glue-schema-registry   no               AWS Glue Schema Registry SDK client.
socks5                     no               SOCKS5 proxy support via tokio-socks.
danger-insecure-tls        no               Allow disabling TLS certificate verification (MITM risk!).

To disable the default compression codecs and pick only what you need:

[dependencies]
krafka = { version = "0.4", default-features = false, features = ["lz4"] }
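Non-default features are enabled the same way; for example, keeping the default codecs while turning on AWS MSK IAM auth (feature name as listed in the table above):

[dependencies]
krafka = { version = "0.4", features = ["aws-msk"] }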

Re-exports§

pub use error::KrafkaError;
pub use error::Result;
pub use metadata::MetadataRecoveryStrategy;

Modules§

admin
Admin client for Apache Kafka.
auth
Authentication for Kafka connections.
consumer
Kafka consumer implementation.
error
Error types for Krafka.
interceptor
Interceptor hooks for producers and consumers.
metadata
Cluster metadata management.
metrics
Metrics and observability for Krafka clients.
network
Network layer for Kafka connections.
producer
Kafka producer implementation.
protocol
Kafka protocol implementation.
schema_registry
Schema registry integration for Avro, Protobuf, and JSON Schema workflows.
share_consumer (unstable-protocol)
Share consumer implementation (KIP-932).
telemetry (telemetry)
KIP-714 client telemetry: subscription polling and metric push (feature-gated).
tracing_ext
Tracing extensions for observability.
util
Utility functions for Krafka. This module provides low-level utilities used throughout the crate:
  • Correlation ID generation: Thread-safe ID generation for request/response matching
  • CRC32C: Checksum calculation for Kafka record validation
  • Varint encoding: Variable-length integer encoding for compact protocols
  • SNI hostname extraction: Parse hostnames from address strings for TLS SNI
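Of these utilities, varint encoding is the most visible on the wire: Kafka's compact protocol encodes signed integers as zig-zag base-128 varints (the same scheme Protocol Buffers uses). A minimal standalone sketch of that encoding follows; the function names are illustrative, not krafka's actual `util` API:

```rust
/// Zig-zag map a signed i32 so values of small magnitude
/// (positive or negative) encode to few bytes.
fn zigzag(n: i32) -> u32 {
    ((n << 1) ^ (n >> 31)) as u32
}

/// Append the zig-zag base-128 varint encoding of `n` to `buf`.
/// Each byte carries 7 payload bits; the high bit is a
/// continuation flag.
fn encode_varint(n: i32, buf: &mut Vec<u8>) {
    let mut v = zigzag(n);
    loop {
        let byte = (v & 0x7f) as u8;
        v >>= 7;
        if v == 0 {
            buf.push(byte); // last byte: continuation bit clear
            break;
        }
        buf.push(byte | 0x80); // more bytes follow
    }
}

fn main() {
    let mut buf = Vec::new();
    encode_varint(1, &mut buf);   // zigzag(1) = 2   -> one byte
    encode_varint(-1, &mut buf);  // zigzag(-1) = 1  -> one byte
    encode_varint(300, &mut buf); // zigzag(300) = 600 -> two bytes
    println!("{:02x?}", buf);
}
```

Decoding reverses the two steps: accumulate 7-bit groups until a byte with a clear high bit, then un-zig-zag with `(v >> 1) as i32 ^ -((v & 1) as i32)`.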

Type Aliases§

ApiVersion
Kafka protocol API version.
BrokerId
Kafka broker ID.
CorrelationId
Kafka correlation ID for request/response matching.
Offset
Kafka offset.
PartitionId
Kafka partition ID.
Timestamp
Kafka timestamp (milliseconds since epoch).