Struct KafkaHttpClient 

pub struct KafkaHttpClient { /* private fields */ }

A client for interacting with Kafka REST Proxy over HTTP.

This client provides methods for creating consumers, subscribing to topics, polling for records, producing messages, and committing offsets using the Kafka REST API.

§Examples

use kafka_http::KafkaHttpClient;

let mut client = KafkaHttpClient::new("http://localhost:8082");
client.set_timeout_ms(5000);

Implementations§

impl KafkaHttpClient

pub fn new(base_uri: &str) -> Self

Creates a new KafkaHttpClient instance.

§Arguments
  • base_uri - The base URI of the Kafka REST Proxy (e.g., “http://localhost:8082”)
§Returns

A new KafkaHttpClient with a default timeout of 1000 ms.

§Examples
use kafka_http::KafkaHttpClient;

let client = KafkaHttpClient::new("http://localhost:8082");

pub fn set_basic_auth( &mut self, username: &str, password: &str, ) -> Result<(), Error>

Sets HTTP Basic Authentication credentials for the client.

This method configures the client to use HTTP Basic Authentication by encoding the provided username and password and adding them as a default Authorization header to all subsequent requests.

§Arguments
  • username - The username for Basic Authentication
  • password - The password for Basic Authentication
§Returns
  • Ok(()) - Authentication was successfully configured
  • Err(Error) - An error if the client could not be rebuilt with the new headers
§Examples
use kafka_http::KafkaHttpClient;

let mut client = KafkaHttpClient::new("http://localhost:8082");
client.set_basic_auth("my-username", "my-password")?;
// All subsequent requests will include the Basic Auth header
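
As a rough illustration of what "encoding the provided username and password" means, HTTP Basic Authentication joins the two with a colon, base64-encodes the result, and prefixes it with `Basic `. The sketch below uses only the standard library; `base64_encode` and `basic_auth_header` are hypothetical helpers written for this example and are not part of the kafka_http API, which may build the header differently.

```rust
// Standard base64 alphabet (RFC 4648).
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes bytes as padded base64 using only the standard library.
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into the low 24 bits of a u32.
        let b0 = chunk[0] as u32;
        let b1 = *chunk.get(1).unwrap_or(&0) as u32;
        let b2 = *chunk.get(2).unwrap_or(&0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;

        out.push(ALPHABET[((n >> 18) & 0x3f) as usize] as char);
        out.push(ALPHABET[((n >> 12) & 0x3f) as usize] as char);
        // Missing input bytes are represented by '=' padding.
        if chunk.len() > 1 {
            out.push(ALPHABET[((n >> 6) & 0x3f) as usize] as char);
        } else {
            out.push('=');
        }
        if chunk.len() > 2 {
            out.push(ALPHABET[(n & 0x3f) as usize] as char);
        } else {
            out.push('=');
        }
    }
    out
}

/// Hypothetical helper: builds the value of the `Authorization` header.
fn basic_auth_header(username: &str, password: &str) -> String {
    let credentials = format!("{}:{}", username, password);
    format!("Basic {}", base64_encode(credentials.as_bytes()))
}
```

Because base64 is an encoding and not encryption, credentials sent this way are only protected when the proxy is reached over HTTPS.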

pub fn set_consumer_uri(&mut self, uri: &String)

Sets the consumer URI for this client.

This is typically called internally after creating a consumer, but can be used to manually set a consumer URI if needed.

§Arguments
  • uri - The consumer instance URI to set

pub fn set_timeout_ms(&mut self, timeout_ms: u64)

Sets the timeout duration for polling operations.

§Arguments
  • timeout_ms - The timeout duration in milliseconds
§Examples
use kafka_http::KafkaHttpClient;

let mut client = KafkaHttpClient::new("http://localhost:8082");
client.set_timeout_ms(5000);  // Set 5 second timeout

pub async fn create_consumer( &mut self, group: &str, params: &CreateConsumerParams, ) -> Result<String, Error>

Creates a new consumer in the specified consumer group.

This method will fail if the consumer already exists. Use try_create_consumer if you want to handle existing consumers gracefully.

§Arguments
  • group - The consumer group name
  • params - Parameters for creating the consumer (name, format, etc.)
§Returns
  • Ok(String) - The consumer instance URI on success
  • Err(Error) - An error if consumer creation fails or no URI is returned
§Examples
use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};

let mut client = KafkaHttpClient::new("http://localhost:8082");
let params = CreateConsumerParams {
    name: "consumer-1".to_string(),
    format: "json".to_string(),
    ..Default::default()
};
let consumer_uri = client.create_consumer("my-group", &params).await?;

pub async fn try_create_consumer( &mut self, group: &str, params: &CreateConsumerParams, ) -> Result<Option<String>, Error>

Attempts to create a new consumer, handling the case where the consumer already exists.

If the consumer already exists, this method will reconstruct the consumer URI based on the group and consumer name, and return it wrapped in Some. This allows reconnecting to existing consumers.

§Arguments
  • group - The consumer group name
  • params - Parameters for creating the consumer (name, format, etc.)
§Returns
  • Ok(Some(String)) - The consumer instance URI (new or reconstructed)
  • Ok(None) - Not returned by the current implementation
  • Err(Error) - An error if the operation fails
§Examples
use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};

let mut client = KafkaHttpClient::new("http://localhost:8082");
let params = CreateConsumerParams {
    name: "consumer-1".to_string(),
    format: "json".to_string(),
    ..Default::default()
};
// This won't fail if consumer already exists
let consumer_uri = client.try_create_consumer("my-group", &params).await?;
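
The URI reconstruction described above can be pictured with a small sketch. It assumes the Kafka REST Proxy v2 path layout `/consumers/{group}/instances/{name}`; `consumer_instance_uri` is a hypothetical helper for illustration, and the crate's own reconstruction logic may differ in detail.

```rust
/// Hypothetical helper: rebuilds a consumer instance URI from its parts,
/// assuming the REST Proxy v2 layout `/consumers/{group}/instances/{name}`.
fn consumer_instance_uri(base_uri: &str, group: &str, name: &str) -> String {
    // Trim trailing slashes so the joined path never contains "//".
    let base = base_uri.trim_end_matches('/');
    format!("{}/consumers/{}/instances/{}", base, group, name)
}
```

With the values from the example, this yields `http://localhost:8082/consumers/my-group/instances/consumer-1`, which is the kind of value that could also be passed to set_consumer_uri manually.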

pub async fn subscribe( &self, group: &str, params: &SubscribeParams, ) -> Result<(), Error>

Subscribes the consumer to one or more topics.

A consumer must be created before calling this method. The consumer URI must be set (either by creating a consumer or manually setting it).

§Arguments
  • group - The consumer group name (used for logging)
  • params - Subscription parameters including topics to subscribe to
§Returns
  • Ok(()) - Subscription was successful
  • Err(Error) - An error if consumer URI is not set or subscription fails
§Examples
use kafka_http::{KafkaHttpClient, types::{CreateConsumerParams, SubscribeParams}};

let mut client = KafkaHttpClient::new("http://localhost:8082");
// ... create consumer first ...
let subscribe_params = SubscribeParams {
    topics: vec!["my-topic".to_string()],
    ..Default::default()
};
client.subscribe("my-group", &subscribe_params).await?;
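
For orientation, the REST Proxy v2 API accepts a subscription as a JSON object with a `topics` array. The sketch below renders such a body with the standard library only. `json_escape` and `subscription_body` are hypothetical helpers, and it is an assumption that SubscribeParams serializes to this shape; the crate's actual serialization is not shown on this page.

```rust
/// Escapes a string for use inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            // Control characters must be written as \uXXXX escapes.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Hypothetical helper: renders a subscription request body.
fn subscription_body(topics: &[&str]) -> String {
    let quoted: Vec<String> = topics
        .iter()
        .map(|t| format!("\"{}\"", json_escape(t)))
        .collect();
    format!("{{\"topics\":[{}]}}", quoted.join(","))
}
```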

pub async fn poll(&self) -> Result<Vec<Record>, Error>

Polls for new records from subscribed topics.

This method uses the timeout configured via set_timeout_ms(). A consumer must be created and subscribed to topics before polling.

§Returns
  • Ok(Vec<Record>) - A vector of records (may be empty if no records available)
  • Err(Error) - An error if consumer URI is not set or polling fails
§Examples
use kafka_http::KafkaHttpClient;

let mut client = KafkaHttpClient::new("http://localhost:8082");
// ... create consumer and subscribe ...
let records = client.poll().await?;
for record in records {
    println!("Received: {:?}", record);
}
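
The two preconditions above (a consumer URI must be set, and the configured timeout applies) can be sketched as a small URL builder. This assumes the REST Proxy v2 records endpoint, which takes a `timeout` query parameter in milliseconds; `poll_url` is a hypothetical helper, and whether the crate passes the timeout this way or applies it on the HTTP client is not stated on this page.

```rust
/// Hypothetical helper: builds the records URL for a poll request.
/// Returns an error when no consumer URI has been set yet.
fn poll_url(consumer_uri: Option<&str>, timeout_ms: u64) -> Result<String, String> {
    let uri = consumer_uri.ok_or_else(|| "consumer URI is not set".to_string())?;
    Ok(format!(
        "{}/records?timeout={}",
        uri.trim_end_matches('/'),
        timeout_ms
    ))
}
```

Modelling the unset URI as `None` mirrors the documented behavior that poll() returns an error when no consumer has been created or set.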

pub async fn produce_to_topic( &self, topic: &str, params: &ProduceParams, ) -> Result<(), Error>

Produces records to a Kafka topic.

§Arguments
  • topic - The name of the topic to produce to
  • params - Parameters containing the records to produce
§Returns
  • Ok(()) - Records were successfully produced
  • Err(Error) - An error if production fails
§Examples
use kafka_http::{KafkaHttpClient, types::ProduceParams};

let client = KafkaHttpClient::new("http://localhost:8082");
let params = ProduceParams {
    records: vec![/* records */],
    ..Default::default()
};
client.produce_to_topic("my-topic", &params).await?;

pub fn set_target_topic(&mut self, topic: &str)

Sets the target topic for subsequent produce operations.

This allows using the produce() method without specifying a topic each time. The topic will be used for all subsequent calls to produce() until changed.

§Arguments
  • topic - The name of the topic to use for future produce operations
§Examples
use kafka_http::KafkaHttpClient;

let mut client = KafkaHttpClient::new("http://localhost:8082");
client.set_target_topic("my-topic");
// Now can use produce() without specifying topic

pub async fn produce(&self, params: &ProduceParams) -> Result<(), Error>

Produces records to the previously set target topic.

This is a convenience method that uses the topic set via set_target_topic(). If no target topic has been set, this method will return an error.

§Arguments
  • params - Parameters containing the records to produce
§Returns
  • Ok(()) - Records were successfully produced
  • Err(Error) - An error if target topic is not set or production fails
§Examples
use kafka_http::{KafkaHttpClient, types::ProduceParams};

let mut client = KafkaHttpClient::new("http://localhost:8082");
client.set_target_topic("my-topic");

let params = ProduceParams {
    records: vec![/* records */],
    ..Default::default()
};
client.produce(&params).await?;
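
The relationship between set_target_topic() and produce() can be modelled as an optional field that must be filled before use. The sketch below is a stand-in and not the crate's implementation: `TopicTarget` and `produce_url` are hypothetical names, and the `/topics/{topic}` path is the REST Proxy v2 produce endpoint, assumed here.

```rust
/// Hypothetical stand-in that models only the target-topic state.
#[derive(Default)]
struct TopicTarget {
    target_topic: Option<String>,
}

impl TopicTarget {
    /// Remembers the topic for later produce calls.
    fn set_target_topic(&mut self, topic: &str) {
        self.target_topic = Some(topic.to_string());
    }

    /// Resolves the produce URL, or fails if no topic was set.
    fn produce_url(&self, base_uri: &str) -> Result<String, String> {
        let topic = self
            .target_topic
            .as_deref()
            .ok_or_else(|| "target topic is not set".to_string())?;
        Ok(format!("{}/topics/{}", base_uri.trim_end_matches('/'), topic))
    }
}
```

Calling `produce_url` on a freshly created `TopicTarget` returns an error, which matches the documented behavior of produce() when no target topic has been set.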

pub async fn commit( &self, params: &PartitionOffsetCommitParams, ) -> Result<(), Error>

Commits offsets for consumed messages.

A consumer must be created before calling this method. The consumer URI must be set.

§Arguments
  • params - Parameters specifying which partition offsets to commit
§Returns
  • Ok(()) - Offsets were successfully committed
  • Err(Error) - An error if consumer URI is not set or commit fails
§Examples
use kafka_http::{KafkaHttpClient, types::PartitionOffsetCommitParams};

let mut client = KafkaHttpClient::new("http://localhost:8082");
// ... create consumer, subscribe, and poll ...
let params = PartitionOffsetCommitParams {
    partitions: vec![/* partition offsets */],
    ..Default::default()
};
client.commit(&params).await?;
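
One plausible way to decide which partition offsets to commit after a poll is to track the highest offset seen for each topic and partition. The sketch below shows only that bookkeeping. `PolledRecord` is a hypothetical stand-in for the fields of the crate's Record type, whose actual layout is not shown on this page, and whether the proxy expects the last consumed offset or the next one should be checked against the REST Proxy documentation.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the record fields that matter for committing.
struct PolledRecord {
    topic: String,
    partition: u32,
    offset: u64,
}

/// Returns the highest offset seen for each (topic, partition) pair.
fn highest_offsets(records: &[PolledRecord]) -> BTreeMap<(String, u32), u64> {
    let mut highest = BTreeMap::new();
    for r in records {
        // Insert the first offset seen, then keep the maximum.
        let entry = highest
            .entry((r.topic.clone(), r.partition))
            .or_insert(r.offset);
        if r.offset > *entry {
            *entry = r.offset;
        }
    }
    highest
}
```

Each entry of the resulting map corresponds to one partition offset that could be placed in the commit parameters.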

Trait Implementations§

impl Clone for KafkaHttpClient

fn clone(&self) -> KafkaHttpClient

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for KafkaHttpClient

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬 This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ErasedDestructor for T
where T: 'static,