oxigdal-kafka 0.1.3

Apache Kafka integration for OxiGDAL - async producer/consumer with schema registry and transactions
Documentation

oxigdal-kafka

Apache Kafka integration for OxiGDAL - High-performance async producer/consumer with schema registry and transactional support.

Features

  • Async Producer (~1,500 LOC)

    • Batch sending with configurable batching strategies
    • Multiple partitioning strategies (hash, round-robin, consistent hash, custom)
    • Compression support (gzip, snappy, lz4, zstd)
    • Idempotent producer for exactly-once delivery
    • Comprehensive metrics and monitoring
  • Async Consumer (~1,500 LOC)

    • Consumer groups with automatic rebalancing
    • Flexible offset management (manual, auto-per-message, auto-per-batch)
    • Exactly-once semantics with read committed isolation
    • Custom rebalance listeners
    • Batch consumption support
  • Schema Registry (~1,000 LOC)

    • Avro schema management and caching
    • Schema evolution with compatibility checking
    • Backward, forward, and full compatibility modes
    • Confluent Schema Registry HTTP client
  • Transactions (~500 LOC)

    • Transactional producer with begin/commit/abort
    • Exactly-once processing guarantees
    • Transaction coordinator with state management
    • Automatic transaction scope with RAII

COOLJAPAN Compliance

  • Pure Rust: Uses rdkafka with pure Rust backend
  • No unwrap(): All error paths handled with Result types
  • Files < 2000 lines: Modular design with focused modules
  • Workspace deps: All dependencies use workspace configuration
  • No warnings: Strict clippy lints enforced

Installation

Add to your Cargo.toml:

[dependencies]
oxigdal-kafka = "0.1"

Feature Flags

  • default: Enables producer and consumer
  • producer: Producer functionality
  • consumer: Consumer functionality
  • schema-registry: Avro schema registry support
  • transactions: Transactional producer/consumer
  • compression-*: Various compression algorithms
  • all: All features enabled

Usage

Producer

use oxigdal_kafka::producer::{ProducerConfig, KafkaProducer};
use oxigdal_kafka::config::{Acks, CompressionType};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create producer configuration
    let config = ProducerConfig::builder()
        .bootstrap_servers("localhost:9092")
        .acks(Acks::All)
        .compression_type(CompressionType::Snappy)
        .enable_idempotence(true)
        .batch_size(16384)
        .linger_ms(10)
        .build()?;

    // Create producer
    let producer = KafkaProducer::new(config).await?;

    // Send a message
    let (partition, offset) = producer
        .send("my-topic", Some(b"key"), b"value")
        .await?;

    println!("Sent to partition {} at offset {}", partition, offset);

    // Send batch
    let messages = vec![
        (Some(b"key1".to_vec()), b"value1".to_vec()),
        (Some(b"key2".to_vec()), b"value2".to_vec()),
    ];
    producer.send_batch("my-topic", messages).await?;

    // Flush pending messages
    producer.flush(std::time::Duration::from_secs(5)).await?;

    Ok(())
}

Consumer

use oxigdal_kafka::consumer::{ConsumerConfig, KafkaConsumer};
use oxigdal_kafka::config::{OffsetReset, IsolationLevel};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create consumer configuration
    let config = ConsumerConfig::builder()
        .bootstrap_servers("localhost:9092")
        .group_id("my-consumer-group")
        .auto_offset_reset(OffsetReset::Earliest)
        .isolation_level(IsolationLevel::ReadCommitted)
        .enable_auto_commit(false)
        .build()?;

    // Create consumer
    let consumer = KafkaConsumer::new(config).await?;

    // Subscribe to topics
    consumer.subscribe(&["my-topic"]).await?;

    // Consume messages
    loop {
        match consumer.receive_with_timeout(Duration::from_secs(10)).await {
            Ok(message) => {
                println!("Received: {:?}", message.payload_str()?);

                // Manual commit
                consumer.commit().await?;
            }
            Err(e) => {
                eprintln!("Error: {}", e);
                break;
            }
        }
    }

    Ok(())
}

Transactions

use oxigdal_kafka::transactions::{TransactionConfig, TransactionalProducer};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = TransactionConfig::builder()
        .bootstrap_servers("localhost:9092")
        .transactional_id("my-transaction-id")
        .transaction_timeout(Duration::from_secs(60))
        .build()?;

    let producer = TransactionalProducer::new(config).await?;

    // Begin transaction
    producer.begin_transaction().await?;

    // Send messages in transaction
    producer.send("topic", Some(b"key1"), b"value1").await?;
    producer.send("topic", Some(b"key2"), b"value2").await?;

    // Commit transaction
    producer.commit_transaction().await?;

    Ok(())
}

Schema Registry

use oxigdal_kafka::schema_registry::{SchemaRegistry, SchemaRegistryConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = SchemaRegistryConfig::new("http://localhost:8081");
    let registry = SchemaRegistry::new(config).await?;

    // Register schema
    let schema = r#"{"type": "string"}"#;
    let schema_id = registry.register_schema("my-subject-value", schema).await?;

    // Serialize with schema
    let value = "test data";
    let encoded = registry.serialize("my-subject-value", &value).await?;

    // Deserialize
    let decoded: String = registry.deserialize(&encoded).await?;

    Ok(())
}

Performance

Benchmarks show:

  • Producer throughput: ~500K msgs/sec (small messages, batch mode)
  • Consumer throughput: ~400K msgs/sec (small messages)
  • Partitioner latency: < 100ns (hash-based)
  • Batch assembly: < 10µs (1000 messages)
  • Schema lookup: < 1µs (cached)

Run benchmarks:

cargo bench --features all

Testing

Run unit tests:

cargo test

Run integration tests (requires Kafka):

# Start Kafka (e.g., with Docker)
docker-compose up -d

# Run tests
cargo test --features all -- --ignored

Architecture

oxigdal-kafka/
├── producer/          # Async producer implementation
│   ├── config.rs      # Producer configuration
│   ├── partitioner.rs # Partitioning strategies
│   ├── batch.rs       # Message batching
│   └── metrics.rs     # Producer metrics
├── consumer/          # Async consumer implementation
│   ├── config.rs      # Consumer configuration
│   ├── offset.rs      # Offset management
│   ├── rebalance.rs   # Rebalance handling
│   └── metrics.rs     # Consumer metrics
├── schema_registry/   # Avro schema management
│   ├── client.rs      # HTTP client
│   ├── schema.rs      # Schema types
│   ├── compatibility.rs # Compatibility checking
│   └── serializer.rs  # Avro serialization
└── transactions/      # Transaction support
    ├── config.rs      # Transaction configuration
    ├── producer.rs    # Transactional producer
    └── coordinator.rs # Transaction coordinator

License

Licensed under Apache-2.0. Copyright (c) 2025 COOLJAPAN OU (Team Kitasan).

Contributing

Contributions are welcome! Please ensure:

  • All tests pass
  • No clippy warnings
  • No unwrap() or panic!() in production code
  • Files stay under 2000 lines
  • Documentation for public APIs