kafka-http

A Rust client library for interacting with Kafka REST Proxy over HTTP.

Motivation

The native Rust Kafka clients are difficult to work with: they tend to be buggy and incomplete, and the popular rdkafka crate pulls in many dependencies and requires C bindings, the C++ standard library, and its build system, which is heavyweight for a simple Kafka client.

Other projects such as kafka-rust and samsa are not very ergonomic and are effectively unmaintained. After working with them, I decided to write my own client against the Kafka HTTP proxy to get a lightweight, simple client that is easy to use without large dependencies or C bindings.

Note that some features, such as the admin APIs, are missing, but feel free to extend or fork the project.

Overview

kafka-http provides a simple and ergonomic interface for working with Apache Kafka through the Kafka REST Proxy. It supports consumer group management, message consumption, message production, and offset management.

Features

  • ✅ Create and manage Kafka consumers
  • ✅ Subscribe to topics
  • ✅ Poll for messages
  • ✅ Produce messages to topics
  • ✅ Commit offsets
  • ✅ Async/await support with Tokio
  • ✅ Configurable timeouts
  • ✅ Comprehensive error handling
  • ✅ Built-in tracing/logging support

Installation

Add this to your Cargo.toml:

[dependencies] 
kafka-http = "0.1" # or the latest version

Quick Start

Consuming Messages

Note that this example uses the Redpanda Kafka REST proxy. If you use the Confluent REST Proxy or another Kafka HTTP proxy, the port is usually 8082; in any case, set the base URI to whatever host and port your proxy exposes.

use std::error;
use kafka_http::{KafkaHttpClient, CreateConsumerParams, SubscribeParams};

#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> {
    // Create a client pointing at the REST proxy (Redpanda in this example)
    let mut client = KafkaHttpClient::new("http://localhost:18082");

    // Set polling timeout (optional, default is 1000ms)
    client.set_timeout_ms(5000);

    // Create a consumer
    let consumer_params = CreateConsumerParams {
        name: "my-consumer".to_string(),
        format: "json".to_string(),
        auto_offset_reset: Some("earliest".to_string()),
        ..Default::default()
    };

    client.create_consumer("my-group", &consumer_params).await?;

    // Subscribe to topics
    let subscribe_params = SubscribeParams {
        topics: vec!["my-topic".to_string()],
        ..Default::default()
    };

    client.subscribe("my-group", &subscribe_params).await?;

    // Poll for messages
    loop {
        let records = client.poll().await?;
        for record in records {
            println!("Received: key={:?}, value={:?}", record.key, record.value);
        }
    }
}

Producing Messages

use std::error;
use kafka_http::{KafkaHttpClient, ProduceParams};

#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> {
    // Create a client
    let client = KafkaHttpClient::new("http://localhost:18082");

    let params = ProduceParams {
        records: vec![
            // Add your records here
        ],
        ..Default::default()
    };

    client.produce("my-topic", &params).await?;

    Ok(())
}

Committing Offsets

use std::error;
use kafka_http::{KafkaHttpClient, PartitionOffsetCommitParams};

#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> {
    // Create a client
    let client = KafkaHttpClient::new("http://localhost:18082");

    let commit_params = PartitionOffsetCommitParams {
        offsets: vec![
            // Add partition offset information here
        ],
        ..Default::default()
    };

    client.commit(&commit_params).await?;

    Ok(())
}

API Documentation

KafkaHttpClient

The main client struct for interacting with Kafka REST Proxy.

Methods

  • new(base_uri: &str) -> Self - Create a new client instance
  • set_timeout_ms(&mut self, timeout_ms: u64) - Set the polling timeout
  • set_consumer_uri(&mut self, uri: &String) - Manually set consumer URI
  • create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<String, Error> - Create a new consumer (fails if one already exists)
  • try_create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<Option<String>, Error> - Create a consumer or reconnect to an existing one (see the sketch after this list)
  • subscribe(&self, group: &str, params: &SubscribeParams) -> Result<(), Error> - Subscribe to topics
  • poll(&self) -> Result<Vec<Record>, Error> - Poll for new records
  • produce(&self, topic: &str, params: &ProduceParams) -> Result<(), Error> - Produce messages
  • commit(&self, params: &PartitionOffsetCommitParams) -> Result<(), Error> - Commit offsets
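
For example, try_create_consumer can be used when a consumer instance may already exist for the group, such as after an application restart. A minimal sketch, assuming the signatures listed above; the group and topic names are placeholders, and the meaning of the returned Option (a consumer URI when one is available) is an assumption:

use std::error;
use kafka_http::{KafkaHttpClient, CreateConsumerParams, SubscribeParams};

#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> {
    let mut client = KafkaHttpClient::new("http://localhost:18082");

    let params = CreateConsumerParams {
        name: "my-consumer".to_string(),
        format: "json".to_string(),
        ..Default::default()
    };

    // Unlike create_consumer, try_create_consumer does not fail when the
    // consumer instance already exists.
    if let Some(uri) = client.try_create_consumer("my-group", &params).await? {
        println!("consumer instance at {uri}");
    }

    // Subscribing works the same as in the Quick Start example.
    let subscribe_params = SubscribeParams {
        topics: vec!["my-topic".to_string()],
        ..Default::default()
    };
    client.subscribe("my-group", &subscribe_params).await?;

    Ok(())
}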

Requirements

  • Rust 1.90.0 or later
  • Kafka REST Proxy running and accessible

Dependencies

  • reqwest - HTTP client
  • serde / serde_json - JSON serialization
  • tracing - Logging and diagnostics

Error Handling

The library uses a custom Error type that provides detailed error information. All async methods return Result<T, Error>.
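
If you want to handle a failure at the call site instead of propagating it with ?, you can match on the Result directly. A minimal sketch, assuming only that the error type implements Display (which the Box<dyn Error> conversions in the examples above already rely on):

use kafka_http::KafkaHttpClient;

#[tokio::main]
async fn main() {
    let client = KafkaHttpClient::new("http://localhost:18082");

    // Handle the Result explicitly instead of propagating the error with `?`.
    match client.poll().await {
        Ok(records) => println!("fetched {} records", records.len()),
        Err(e) => eprintln!("poll failed: {e}"),
    }
}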

Logging

The library uses tracing for logging. To see the log output, install a tracing subscriber (for example, the tracing-subscriber crate) in your application.
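
A minimal sketch, assuming the widely used tracing-subscriber crate has been added to your Cargo.toml (it is not a dependency of this library):

use kafka_http::KafkaHttpClient;

#[tokio::main]
async fn main() {
    // Install a global subscriber so the library's tracing events are emitted
    // to stdout; without a subscriber the events are silently dropped.
    tracing_subscriber::fmt::init();

    // Use the client as in the examples above; its log output is now visible.
    let _client = KafkaHttpClient::new("http://localhost:18082");
}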

License

See the LICENSE file for details.

Contributing

Contributions are welcome! Feel free to open an issue or submit a pull request.

Examples

For more examples, see the tests/ directory.