kafka-http 0.1.2

Simple Rust Kafka HTTP client to allow a rust native way to interact with the restful kafka HTTP proxy
Documentation
# kafka-http

A Rust client library for interacting with Kafka REST Proxy over HTTP.

#### Motivation
The Rust kafka binary clients are pretty bad to work with, buggy, incomplete, and the popular [rdkafka](https://crates.io/crates/rdkafka) crate requires a lot of dependencies and forces the use of C bindings and the C++ standard library, build system, etc. that is very heavyweight for a simple Kafka client.

The other projects such as [kafka-rust](https://crates.io/crates/kafka-rust) and [samsa](https://github.com/CallistoLabsNYC/samsa) are also not very ergonomic along with being effectively unmaintained. After working with them, I decided to write my own client via the http client proxy to have a lightweight and simple client that is easy to use without any large dependencies or C bindings.

Note that some features are missing, such as admin APIs, etc. but feel free to extend, fork, etc.

## Overview

`kafka-http` provides a simple and ergonomic interface for working with Apache Kafka through the Kafka REST Proxy. It supports consumer group management, message consumption, message production, and offset management.

## Features

- ✅ Create and manage Kafka consumers
- ✅ Subscribe to topics
- ✅ Poll for messages
- ✅ Produce messages to topics
- ✅ Commit offsets
- ✅ Async/await support with Tokio
- ✅ Configurable timeouts
- ✅ Comprehensive error handling
- ✅ Built-in tracing/logging support

## Installation

Add this to your `Cargo.toml`:
```toml
[dependencies] 
kafka-http = "0.1" # or the latest version
```
## Quick Start

### Consuming Messages

Note that this example uses the redpanda kafka rest proxy. If you want to use Kafka, confluence HTTP Proxy, etc. the port
usually is 8082 but should be set to whatever your port is.

```rust
use std::error;
use std::error::Error;
use kafka_http::{KafkaHttpClient, CreateConsumerParams, SubscribeParams};

#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> { // Create a client let mut client = KafkaHttpClient::new(" http://localhost:8082");
    // Create a client 
    let mut client = KafkaHttpClient::new(" http://localhost:18082");

    // Set polling timeout (optional, default is 1000ms)
    client.set_timeout_ms(5000);

    // Create a consumer
    let consumer_params = CreateConsumerParams {
        name: "my-consumer".to_string(),
        format: "json".to_string(),
        auto_offset_reset: Some("earliest".to_string()),
        ..Default::default()
    };

    client.create_consumer("my-group", &consumer_params).await?;

    // Subscribe to topics
    let subscribe_params = SubscribeParams {
        topics: vec!["my-topic".to_string()],
        ..Default::default()
    };

    client.subscribe("my-group", &subscribe_params).await?;

    // Poll for messages
    loop {
        let records = client.poll().await?;
        for record in records {
            println!("Received: key={:?}, value={:?}", record.key, record.value);
        }
    }

    Ok(())
}
```

### Producing Messages

```rust
use std::error;
use std::error::Error;
use kafka_http::{KafkaHttpClient, CreateConsumerParams, SubscribeParams};

#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> { // Create a client let mut client = KafkaHttpClient::new(" http://localhost:8082");
    // Create a client 
    let mut client = KafkaHttpClient::new(" http://localhost:18082");

    let params = ProduceParams {
        records: vec![
            // Add your records here
        ],
        ..Default::default()
    };

    client.produce("my-topic", &params).await?;

    Ok(())
}
```


### Committing Offsets

```rust
use std::error;
use std::error::Error;
use kafka_http::{KafkaHttpClient, CreateConsumerParams, SubscribeParams};

#[tokio::main]
async fn main() -> Result<(), Box<dyn error::Error>> { // Create a client let mut client = KafkaHttpClient::new(" http://localhost:8082");
    // Create a client 
    let mut client = KafkaHttpClient::new(" http://localhost:18082");

    let commit_params = PartitionOffsetCommitParams {
        offsets: vec![
            // Add partition offset information here
        ],
        ..Default::default()
    };

    client.commit(&commit_params).await?;

    Ok(())
}
```

## API Documentation

### `KafkaHttpClient`

The main client struct for interacting with Kafka REST Proxy.

#### Methods

- `new(base_uri: &str) -> Self` - Create a new client instance
- `set_timeout_ms(&mut self, timeout_ms: u64)` - Set the polling timeout
- `set_consumer_uri(&mut self, uri: &String)` - Manually set consumer URI
- `create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<String, Error>` - Create a new consumer (fails if exists)
- `try_create_consumer(&mut self, group: &str, params: &CreateConsumerParams) -> Result<Option<String>, Error>` - Create or reconnect to existing consumer
- `subscribe(&self, group: &str, params: &SubscribeParams) -> Result<(), Error>` - Subscribe to topics
- `poll(&self) -> Result<Vec<Record>, Error>` - Poll for new records
- `produce(&self, topic: &str, params: &ProduceParams) -> Result<(), Error>` - Produce messages
- `commit(&self, params: &PartitionOffsetCommitParams) -> Result<(), Error>` - Commit offsets

## Requirements

- Rust 1.90.0 or later
- Kafka REST Proxy running and accessible

## Dependencies

- `reqwest` - HTTP client
- `serde` / `serde_json` - JSON serialization
- `tracing` - Logging and diagnostics

## Error Handling

The library uses a custom `Error` type that provides detailed error information. All async methods return `Result<T, Error>`.

## Logging
The library uses `tracing` for logging. To enable logs in your application: `tracing`

## License
See the [LICENSE](LICENSE) file for details.

## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.

## Related Projects
- [Apache Kafka]https://kafka.apache.org/

## Examples
For more examples, see the `tests/` directory.

## Contributing

Feel free to open an issue or submit a pull request.