§Krafka
A pure Rust, async-native Apache Kafka client.
Krafka provides high-performance, safe, and idiomatic Rust APIs for producing and consuming messages from Apache Kafka clusters.
§Features
- Pure Rust: No librdkafka or C bindings
- Async-native: Built on Tokio for non-blocking I/O
- High-performance: Zero-copy buffers, minimal allocations
- Safe: No unsafe code by default
- Cloud-native: First-class AWS MSK support including IAM auth
§Thread Safety
All main types in Krafka implement Send + Sync:
- `Producer` - can be shared across tasks with `Arc`
- `Consumer` - can be shared across tasks with `Arc`
- `AdminClient` - can be shared across tasks with `Arc`
This allows safe concurrent access from multiple Tokio tasks:
```rust
use std::sync::Arc;
use krafka::producer::Producer;

let producer = Arc::new(Producer::builder()
    .bootstrap_servers("localhost:9092")
    .build()
    .await?);

// Spawn multiple tasks sharing the producer.
for _ in 0..10 {
    let producer = producer.clone();
    tokio::spawn(async move {
        let _ = producer.send("topic", None, b"message").await;
    });
}
```
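The loop above fires and forgets. If the surrounding code may exit before the sends finish, collect the `JoinHandle`s and await them. A minimal sketch continuing from the example above, using only Tokio's standard task API:

```rust
let mut handles = Vec::with_capacity(10);
for _ in 0..10 {
    let producer = producer.clone();
    handles.push(tokio::spawn(async move {
        producer.send("topic", None, b"message").await
    }));
}
for handle in handles {
    // The outer layer is Tokio's JoinError (panic/cancellation);
    // the inner value is the send result from Krafka.
    handle.await.expect("send task panicked")?;
}
```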
§Quick Start

§Producer
```rust
use krafka::producer::Producer;

let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .build()
    .await?;

producer.send("my-topic", Some(b"key"), b"value").await?;
```
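The second argument is the optional record key. As in Kafka clients generally, a producer typically routes records that share a key to the same partition, while `None` leaves the choice to the configured partitioning strategy; that routing behavior is standard Kafka convention rather than something this page states explicitly:

```rust
// Keyed: records with the same key usually land in the same partition.
producer.send("my-topic", Some(b"user-42"), b"payload").await?;

// Unkeyed: the producer's partitioning strategy picks the partition.
producer.send("my-topic", None, b"payload").await?;
```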
§Consumer

```rust
use krafka::consumer::Consumer;

let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("my-group")
    .build()
    .await?;

consumer.subscribe(&["my-topic"]).await?;

while let Some(msg) = consumer.recv().await? {
    println!("{:?}", msg);
}
```
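For long-running services you usually want the poll loop to stop cleanly on shutdown. A sketch combining the `recv` loop above with Tokio's Ctrl-C handler via `tokio::select!`; this assumes `recv` is cancel-safe, which this page does not state:

```rust
use tokio::signal;

loop {
    tokio::select! {
        res = consumer.recv() => match res? {
            Some(msg) => println!("{:?}", msg),
            None => break, // stream ended
        },
        _ = signal::ctrl_c() => {
            // Stop polling on Ctrl-C and fall through to drop the consumer.
            break;
        }
    }
}
```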
§Cargo Features

| Feature | Default | Description |
|---|---|---|
| `compression` | yes | Enables all compression codecs (gzip + snappy + lz4 + zstd). |
| `gzip` | via `compression` | Gzip record batch compression via `flate2`. |
| `snappy` | via `compression` | Snappy compression via `snap`. |
| `lz4` | via `compression` | LZ4 compression via `lz4_flex`. |
| `zstd` | via `compression` | Zstd compression via `zstd` (requires a C toolchain). |
| `aws-msk` | no | AWS MSK IAM authentication with the SDK credential chain. |
| `schema-registry` | no | Confluent Schema Registry HTTP client. |
| `aws-glue-schema-registry` | no | AWS Glue Schema Registry SDK client. |
| `socks5` | no | SOCKS5 proxy support via `tokio-socks`. |
| `danger-insecure-tls` | no | Allows disabling TLS certificate verification (MITM risk!). |
To disable the default compression codecs and pick only what you need:
```toml
[dependencies]
krafka = { version = "0.4", default-features = false, features = ["lz4"] }
```
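Non-default features are additive, so the optional integrations can be layered on top of the default codecs. For example, to keep the defaults and add AWS MSK IAM auth plus the Schema Registry client:

```toml
[dependencies]
krafka = { version = "0.4", features = ["aws-msk", "schema-registry"] }
```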
Re-exports§

- `pub use error::KrafkaError;`
- `pub use error::Result;`
- `pub use metadata::MetadataRecoveryStrategy;`
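Since `KrafkaError` and `Result` are re-exported at the crate root, application code can return them directly. A minimal sketch, assuming the conventional single-parameter `Result<T>` alias (see the `error` module for the exact definition):

```rust
use krafka::producer::Producer;
use krafka::Result;

// Returning the crate's Result lets `?` propagate any KrafkaError.
async fn produce_one(producer: &Producer) -> Result<()> {
    producer.send("my-topic", None, b"value").await?;
    Ok(())
}
```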
Modules§
- admin
- Admin client for Apache Kafka.
- auth
- Authentication for Kafka connections.
- consumer
- Kafka consumer implementation.
- error
- Error types for Krafka.
- interceptor
- Interceptor hooks for producers and consumers.
- metadata
- Cluster metadata management.
- metrics
- Metrics and observability for Krafka clients.
- network
- Network layer for Kafka connections.
- producer
- Kafka producer implementation.
- protocol
- Kafka protocol implementation.
- schema_registry - Schema registry integration for Avro, Protobuf, and JSON Schema workflows.
- share_consumer (unstable-protocol) - Share consumer implementation (KIP-932).
- telemetry - KIP-714 client telemetry: subscription polling and metric push (feature-gated).
- tracing_ext - Tracing extensions for observability.
- util - Utility functions for Krafka: thread-safe correlation ID generation for request/response matching, CRC32C checksum calculation for record validation, varint encoding for compact protocols, and SNI hostname extraction from address strings for TLS.
Type Aliases§
- ApiVersion
- Kafka protocol API version.
- BrokerId
- Kafka broker ID.
- CorrelationId
- Kafka correlation ID for request/response matching.
- Offset
- Kafka offset.
- PartitionId
- Kafka partition ID.
- Timestamp
- Kafka timestamp (milliseconds since epoch).
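These aliases appear throughout the producer, consumer, and admin APIs. A hypothetical illustration of how they compose; the `RecordPosition` struct below is not part of Krafka's API:

```rust
use krafka::{Offset, PartitionId, Timestamp};

// Hypothetical bookkeeping struct for an application-side offset store.
struct RecordPosition {
    partition: PartitionId,
    offset: Offset,
    timestamp: Timestamp,
}
```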