🦀 Krafka
A pure Rust, async-native Apache Kafka client designed for high performance, safety, and ease of use.
✨ Features
- 🦀 Pure Rust: No librdkafka or C dependencies
- ⚡ Async-native: Built on Tokio for true async I/O
- 🔒 Zero unsafe: Safe Rust by default
- 🚀 High performance: Zero-copy buffers, inline hot paths, efficient batching, concurrent batch flushing
- 📦 Full protocol support: Kafka protocol with all compression codecs
- 🔐 TLS/SSL encryption: Using rustls for secure connections
- 🔑 SASL authentication: PLAIN, SCRAM-SHA-256/512, OAUTHBEARER mechanisms
- 💯 Transactions: Exactly-once semantics with transactional producer
- ☁️ Cloud-native: First-class AWS MSK support including IAM auth
- 🛡️ Security hardened: Secret zeroization, constant-time auth (`subtle`), decompression bomb protection, allocation caps
- 🔄 Built-in retry: Exponential backoff with metadata refresh on leader changes
- 📊 Metrics: Lock-free counters/gauges/latency wired into all hot paths
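The built-in retry listed above follows the standard exponential-backoff pattern: double the delay after each failed attempt, capped at a maximum. As a rough, stdlib-only illustration of that delay schedule (independent of krafka's actual internals, which are not shown here — constants and any jitter are assumptions):

```rust
use std::time::Duration;

/// Delay before retry attempt `attempt` (0-based): a base delay doubled per
/// attempt, capped at `max`. Illustrative constants only; a real policy
/// typically also adds random jitter to avoid thundering herds.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let exp = base.saturating_mul(2u32.saturating_pow(attempt));
    exp.min(max)
}

fn main() {
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(5);
    for attempt in 0..8 {
        // 100ms, 200ms, 400ms, 800ms, 1.6s, 3.2s, then capped at 5s
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt, base, max));
    }
}
```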
🚀 Quick Start
Add Krafka to your Cargo.toml:
```toml
[dependencies]
krafka = "0.2"
tokio = { version = "1", features = ["full"] }

# For AWS MSK IAM authentication with full SDK support:
# krafka = { version = "0.2", features = ["aws-msk"] }
```
Producer
```rust
use krafka::producer::Producer;
use krafka::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Module paths and method names are inferred from this README's builder
    // examples; hosts, topic names, and the `send` signature are illustrative.
    let producer = Producer::builder()
        .bootstrap_servers("localhost:9092")
        .build()
        .await?;

    producer.send("my-topic", "key", "hello from krafka").await?;
    producer.flush().await?;
    Ok(())
}
```
Consumer
```rust
use krafka::consumer::Consumer;
use krafka::Result;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    // Builder and polling calls shown schematically; values are placeholders.
    let consumer = Consumer::builder()
        .bootstrap_servers("localhost:9092")
        .group_id("my-group")
        .build()
        .await?;

    consumer.subscribe(&["my-topic"]).await?;
    loop {
        let records = consumer.poll(Duration::from_millis(100)).await?;
        for record in records {
            // process `record` here
        }
    }
}
```
Admin Client
```rust
use krafka::admin::AdminClient;
use krafka::Result;
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<()> {
    let admin = AdminClient::builder()
        .bootstrap_servers("localhost:9092")
        .build()
        .await?;

    // Schematic call: the exact create-topic API (name, partition/replication
    // parameters, timeout) may differ in krafka.
    admin
        .create_topic("my-topic", /* partitions */ 3, /* replication */ 1, Duration::from_secs(30))
        .await?;
    Ok(())
}
```
Transactional Producer
For exactly-once semantics across multiple partitions:
```rust
use krafka::producer::TransactionalProducer;
use krafka::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Builder and transaction methods shown schematically; IDs and topics
    // are placeholders.
    let producer = TransactionalProducer::builder()
        .bootstrap_servers("localhost:9092")
        .transactional_id("my-txn")
        .build()
        .await?;

    // The canonical begin / send / commit flow for exactly-once writes.
    producer.begin_transaction().await?;
    producer.send("topic-a", "key", "event-1").await?;
    producer.send("topic-b", "key", "event-2").await?;
    producer.commit_transaction().await?;
    Ok(())
}
```
Authentication
Connect to secured Kafka clusters with SASL, SCRAM, OAUTHBEARER, or AWS MSK IAM — available on all client types:
```rust
use krafka::producer::Producer;
use krafka::consumer::Consumer;
use krafka::admin::AdminClient;

// Hosts, credentials, and the region below are placeholders; method
// signatures are inferred from the builder API used throughout this README.

// Producer with SASL/SCRAM-SHA-256
let producer = Producer::builder()
    .bootstrap_servers("broker:9093")
    .sasl_scram_sha256("user", "password")
    .build()
    .await?;

// Consumer with SASL/PLAIN
let consumer = Consumer::builder()
    .bootstrap_servers("broker:9093")
    .group_id("my-group")
    .sasl_plain("user", "password")
    .build()
    .await?;

// Producer with SASL/OAUTHBEARER
let producer = Producer::builder()
    .bootstrap_servers("broker:9093")
    .sasl_oauthbearer(token_provider) // token_provider: your OAuth token source
    .build()
    .await?;

// Admin with AWS MSK IAM
use krafka::auth::AuthConfig;
let auth = AuthConfig::aws_msk_iam("us-east-1");
let admin = AdminClient::builder()
    .bootstrap_servers("msk-broker:9098")
    .auth(auth)
    .build()
    .await?;
```
📦 Modules
| Module | Description |
|---|---|
| `producer` | High-throughput message production with batching and compression |
| `consumer` | Consumer groups with rebalancing, offset management, and static membership |
| `admin` | Cluster administration (topics, groups, records, configuration, ACLs) |
| `interceptor` | Producer and consumer interceptor hooks for observability |
| `protocol` | Kafka wire protocol implementation |
| `auth` | Authentication (SASL/PLAIN, SASL/SCRAM, SASL/OAUTHBEARER, AWS MSK IAM) |
🗜️ Compression
Krafka supports all Kafka compression codecs:
```rust
use krafka::producer::Producer;
use krafka::Compression;

// Host is a placeholder; the codec choice here is illustrative.
let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .compression(Compression::Lz4) // Fast compression
    .build()
    .await?;
```
| Codec | Crate | Characteristics |
|---|---|---|
| `Compression::Gzip` | `flate2` | Best ratio, slower |
| `Compression::Snappy` | `snap` | Good balance |
| `Compression::Lz4` | `lz4_flex` | Fastest |
| `Compression::Zstd` | `zstd` | Best modern choice |
⚡ Performance Tuning
High Throughput Producer
```rust
use krafka::producer::{Acks, Producer};
use krafka::Compression;
use std::time::Duration;

// Values below are illustrative tuning choices; the `Acks` variant name
// is inferred, not confirmed by this README.
let producer = Producer::builder()
    .bootstrap_servers("localhost:9092")
    .acks(Acks::All)
    .compression(Compression::Zstd)
    .batch_size(1024 * 1024)           // 1MB batches
    .linger(Duration::from_millis(10)) // Allow batching
    .build()
    .await?;
```
Low Latency Consumer
```rust
use krafka::consumer::Consumer;
use std::time::Duration;

// Values below are illustrative: don't wait for data to accumulate,
// and return fetches quickly even when little data is available.
let consumer = Consumer::builder()
    .bootstrap_servers("localhost:9092")
    .group_id("low-latency-group")
    .fetch_min_bytes(1)
    .fetch_max_wait(Duration::from_millis(5))
    .build()
    .await?;
```
📚 Documentation
Full documentation is available at hupe1980.github.io/krafka
- Getting Started
- Producer Guide
- Consumer Guide
- Admin Client
- Configuration Reference
- Performance Tuning
- Architecture Overview
- Metrics & Observability
- Error Handling
- Interceptors
- Authentication
🎮 Examples
Examples live in the `examples/` directory; run each with `cargo run --example <name>`:

- Producer example
- Consumer example
- Advanced consumer example (pause/resume, seek, manual commits)
- Admin client example
- Transactional producer example
- Authentication examples (SASL, SCRAM, MSK IAM)
📊 Status
Krafka is feature-complete and production-ready.
Features:
- ✅ Protocol layer (all message types, compression, ACL messages, transactions)
- ✅ Network layer (async connections, pooling, TLS/SSL)
- ✅ Producer (batching with linger timer, partitioning, compression, built-in retry with exponential backoff, metadata refresh on failure, max-in-flight enforcement via semaphore, interceptor hooks)
- ✅ Consumer (polling, streaming `recv()` with error propagation, offset management, auto-commit timer, seek, pause/resume, configurable partition assignment strategy, rebalance listeners, cooperative sticky assignor, static group membership (KIP-345), interceptor hooks, log compaction awareness)
- ✅ Admin Client (topic CRUD, partitions, configuration, ACL management, consumer groups, record deletion, leader epoch queries)
- ✅ Authentication (SASL/PLAIN, SASL/SCRAM-SHA-256/512, SASL/OAUTHBEARER, AWS MSK IAM with SDK support)
- ✅ TLS/SSL encryption (rustls, mTLS support)
- ✅ Transactions (exactly-once semantics with transactional producer — full PID/epoch/sequence tracking)
- ✅ Metrics (counters, gauges, latency tracking — all wired into producer/consumer hot paths)
- ✅ Tracing (OpenTelemetry-compatible spans with properly declared fields)
- ✅ Security hardening (secret zeroization, constant-time comparison, PBKDF2 validation, decompression limits, allocation caps)
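The constant-time comparison noted above is provided by the `subtle` crate; as a minimal stdlib-only sketch of the idea (not a substitute for the audited crate), every byte is always inspected so timing does not leak where the first mismatch occurs:

```rust
/// Compare two byte slices in time dependent only on their length, not
/// their contents. Illustrates what `subtle::ConstantTimeEq` provides.
fn ct_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Accumulate differences with XOR/OR instead of returning early
    // at the first mismatching byte.
    let mut diff: u8 = 0;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

fn main() {
    assert!(ct_eq(b"secret", b"secret"));
    assert!(!ct_eq(b"secret", b"secreX"));
    assert!(!ct_eq(b"secret", b"sec"));
    println!("ok");
}
```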
🤝 Contributing
Contributions are welcome!
📄 License
Licensed under the MIT License.