pub struct KafkaHttpClient { /* private fields */ }
A client for interacting with Kafka REST Proxy over HTTP.
This client provides methods for creating consumers, subscribing to topics, polling for records, producing messages, and committing offsets using the Kafka REST API.
§Examples
use kafka_http::KafkaHttpClient;
let mut client = KafkaHttpClient::new("http://localhost:8082");
client.set_timeout_ms(5000);
Implementations§
impl KafkaHttpClient
pub fn set_basic_auth( &mut self, username: &str, password: &str, ) -> Result<(), Error>
Sets HTTP Basic Authentication credentials for the client.
This method configures the client to use HTTP Basic Authentication by encoding the provided username and password and adding them as a default Authorization header to all subsequent requests.
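As an illustration of the encoding step described above, the following is a minimal, standard-library-only sketch of how a Basic Authentication header value is formed according to RFC 7617: the string `username:password` is base64-encoded and prefixed with `Basic `. The helper names `base64_encode` and `basic_auth_header` are hypothetical and are not part of this crate; the crate's internal implementation may differ.

```rust
/// Standard base64 alphabet (RFC 4648), used with `=` padding.
const ALPHABET: &[u8; 64] =
    b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Hypothetical helper: encodes bytes as padded base64.
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group; missing bytes count as 0.
        let b0 = chunk[0] as u32;
        let b1 = *chunk.get(1).unwrap_or(&0) as u32;
        let b2 = *chunk.get(2).unwrap_or(&0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;

        // Emit four 6-bit symbols, replacing unused ones with '='.
        out.push(ALPHABET[((n >> 18) & 63) as usize] as char);
        out.push(ALPHABET[((n >> 12) & 63) as usize] as char);
        if chunk.len() > 1 {
            out.push(ALPHABET[((n >> 6) & 63) as usize] as char);
        } else {
            out.push('=');
        }
        if chunk.len() > 2 {
            out.push(ALPHABET[(n & 63) as usize] as char);
        } else {
            out.push('=');
        }
    }
    out
}

/// Hypothetical helper: builds the value of the `Authorization` header.
fn basic_auth_header(username: &str, password: &str) -> String {
    let credentials = format!("{username}:{password}");
    format!("Basic {}", base64_encode(credentials.as_bytes()))
}
```

Calling `basic_auth_header("my-username", "my-password")` yields the string a client would send as the `Authorization` header on each request.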
§Arguments
username - The username for Basic Authentication
password - The password for Basic Authentication
§Returns
Ok(()) - Authentication was successfully configured
Err(Error) - An error if the client could not be rebuilt with the new headers
§Examples
use kafka_http::KafkaHttpClient;
let mut client = KafkaHttpClient::new("http://localhost:8082");
client.set_basic_auth("my-username", "my-password")?;
// All subsequent requests will include the Basic Auth header
pub fn set_consumer_uri(&mut self, uri: &String)
Sets the consumer URI for this client.
This is typically called internally after creating a consumer, but can be used to manually set a consumer URI if needed.
§Arguments
uri - The consumer instance URI to set
pub fn set_timeout_ms(&mut self, timeout_ms: u64)
pub async fn create_consumer( &mut self, group: &str, params: &CreateConsumerParams, ) -> Result<String, Error>
Creates a new consumer in the specified consumer group.
This method will fail if the consumer already exists. Use try_create_consumer
if you want to handle existing consumers gracefully.
§Arguments
group - The consumer group name
params - Parameters for creating the consumer (name, format, etc.)
§Returns
Ok(String) - The consumer instance URI on success
Err(Error) - An error if the consumer creation fails or the URI is not returned
§Examples
use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
let mut client = KafkaHttpClient::new("http://localhost:8082");
let params = CreateConsumerParams {
name: "consumer-1".to_string(),
format: "json".to_string(),
..Default::default()
};
let consumer_uri = client.create_consumer("my-group", &params).await?;
pub async fn try_create_consumer( &mut self, group: &str, params: &CreateConsumerParams, ) -> Result<Option<String>, Error>
Attempts to create a new consumer, handling the case where the consumer already exists.
If the consumer already exists, this method will reconstruct the consumer URI based on
the group and consumer name, and return it wrapped in Some. This allows reconnecting
to existing consumers.
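To make the reconstruction step concrete, here is a minimal sketch of how a consumer instance URI could be rebuilt from the base URI, group, and consumer name. It assumes the Kafka REST Proxy v2 path layout `/consumers/{group}/instances/{name}`; the function name `consumer_instance_uri` is hypothetical, and the crate's actual logic may differ.

```rust
/// Hypothetical helper: rebuilds a consumer instance URI from its parts,
/// assuming the REST Proxy v2 layout `/consumers/{group}/instances/{name}`.
fn consumer_instance_uri(base_uri: &str, group: &str, name: &str) -> String {
    // Trim trailing slashes so the result never contains a doubled `//`.
    format!(
        "{}/consumers/{}/instances/{}",
        base_uri.trim_end_matches('/'),
        group,
        name
    )
}
```

Under that assumption, the group `my-group` and consumer name `consumer-1` on `http://localhost:8082` map to `http://localhost:8082/consumers/my-group/instances/consumer-1`.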
§Arguments
group - The consumer group name
params - Parameters for creating the consumer (name, format, etc.)
§Returns
Ok(Some(String)) - The consumer instance URI (new or reconstructed)
Ok(None) - Should not occur in the current implementation
Err(Error) - An error if the operation fails
§Examples
use kafka_http::{KafkaHttpClient, types::CreateConsumerParams};
let mut client = KafkaHttpClient::new("http://localhost:8082");
let params = CreateConsumerParams {
name: "consumer-1".to_string(),
format: "json".to_string(),
..Default::default()
};
// This won't fail if consumer already exists
let consumer_uri = client.try_create_consumer("my-group", &params).await?;
pub async fn subscribe( &self, group: &str, params: &SubscribeParams, ) -> Result<(), Error>
Subscribes the consumer to one or more topics.
A consumer must be created before calling this method. The consumer URI must be set (either by creating a consumer or manually setting it).
§Arguments
group - The consumer group name (used for logging)
params - Subscription parameters, including the topics to subscribe to
§Returns
Ok(()) - Subscription was successful
Err(Error) - An error if the consumer URI is not set or the subscription fails
§Examples
use kafka_http::{KafkaHttpClient, types::{CreateConsumerParams, SubscribeParams}};
let mut client = KafkaHttpClient::new("http://localhost:8082");
// ... create consumer first ...
let subscribe_params = SubscribeParams {
topics: vec!["my-topic".to_string()],
..Default::default()
};
client.subscribe("my-group", &subscribe_params).await?;
pub async fn poll(&self) -> Result<Vec<Record>, Error>
Polls for new records from subscribed topics.
This method uses the timeout configured via set_timeout_ms(). A consumer must be
created and subscribed to topics before polling.
§Returns
Ok(Vec<Record>) - A vector of records (may be empty if no records are available)
Err(Error) - An error if the consumer URI is not set or polling fails
§Examples
use kafka_http::KafkaHttpClient;
let mut client = KafkaHttpClient::new("http://localhost:8082");
// ... create consumer and subscribe ...
let records = client.poll().await?;
for record in records {
println!("Received: {:?}", record);
}
pub async fn produce_to_topic( &self, topic: &str, params: &ProduceParams, ) -> Result<(), Error>
Produces records to a Kafka topic.
§Arguments
topic - The name of the topic to produce to
params - Parameters containing the records to produce
§Returns
Ok(()) - Records were successfully produced
Err(Error) - An error if production fails
§Examples
use kafka_http::{KafkaHttpClient, types::ProduceParams};
let client = KafkaHttpClient::new("http://localhost:8082");
let params = ProduceParams {
records: vec![/* records */],
..Default::default()
};
client.produce_to_topic("my-topic", &params).await?;
pub fn set_target_topic(&mut self, topic: &str)
Sets the target topic for subsequent produce operations.
This allows using the produce() method without specifying a topic each time.
The topic will be used for all subsequent calls to produce() until changed.
§Arguments
topic - The name of the topic to use for future produce operations
§Examples
use kafka_http::KafkaHttpClient;
let mut client = KafkaHttpClient::new("http://localhost:8082");
client.set_target_topic("my-topic");
// produce() can now be called without specifying a topic
pub async fn produce(&self, params: &ProduceParams) -> Result<(), Error>
Produces records to the previously set target topic.
This is a convenience method that uses the topic set via set_target_topic().
If no target topic has been set, this method will return an error.
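The behaviour described above (use the stored topic, or fail if none was set) can be sketched with an `Option<String>` field. This is an illustrative stand-in only: the type `TopicHolder`, its `resolve_topic` method, and the `String` error are hypothetical and do not reflect the crate's actual fields or its `Error` type.

```rust
/// Hypothetical stand-in for the part of a client that remembers a target topic.
struct TopicHolder {
    target_topic: Option<String>,
}

impl TopicHolder {
    /// Stores the topic for later produce calls, replacing any previous value.
    fn set_target_topic(&mut self, topic: &str) {
        self.target_topic = Some(topic.to_string());
    }

    /// Returns the stored topic, or an error message if none has been set.
    fn resolve_topic(&self) -> Result<&str, String> {
        self.target_topic
            .as_deref()
            .ok_or_else(|| "target topic is not set".to_string())
    }
}
```

A produce-style method built on this pattern would call `resolve_topic()` first and return early with the error before sending any request.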
§Arguments
params - Parameters containing the records to produce
§Returns
Ok(()) - Records were successfully produced
Err(Error) - An error if the target topic is not set or production fails
§Examples
use kafka_http::{KafkaHttpClient, types::ProduceParams};
let mut client = KafkaHttpClient::new("http://localhost:8082");
client.set_target_topic("my-topic");
let params = ProduceParams {
records: vec![/* records */],
..Default::default()
};
client.produce(&params).await?;
pub async fn commit( &self, params: &PartitionOffsetCommitParams, ) -> Result<(), Error>
Commits offsets for consumed messages.
A consumer must be created before calling this method. The consumer URI must be set.
§Arguments
params - Parameters specifying which partition offsets to commit
§Returns
Ok(()) - Offsets were successfully committed
Err(Error) - An error if the consumer URI is not set or the commit fails
§Examples
use kafka_http::{KafkaHttpClient, types::PartitionOffsetCommitParams};
let mut client = KafkaHttpClient::new("http://localhost:8082");
// ... create consumer, subscribe, and poll ...
let params = PartitionOffsetCommitParams {
partitions: vec![/* partition offsets */],
..Default::default()
};
client.commit(&params).await?;
Trait Implementations§
impl Clone for KafkaHttpClient
fn clone(&self) -> KafkaHttpClient
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.