Struct kafka::client::KafkaClient

pub struct KafkaClient {
    pub topic_partitions: HashMap<String, Vec<i32>>,
    // some fields omitted
}

Client struct. It keeps track of brokers and topic metadata.

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();

You will have to load metadata before making any other request.

Fields

topic_partitions: HashMap<String, Vec<i32>>

Methods

impl KafkaClient

fn new(hosts: Vec<String>) -> KafkaClient

Create a new instance of KafkaClient

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));

fn load_metadata_all(&mut self) -> Result<()>

Resets and loads metadata for all topics.

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();

fn load_metadata(&mut self, topics: Vec<String>) -> Result<()>

Reloads metadata for the supplied list of topics.

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata(vec!("my-topic".to_string()));

Returns Result<(), error::Error>.

fn reset_metadata(&mut self)

Clears metadata stored in the client. You must load metadata again after this call before using the client.

fn fetch_offsets(&mut self, topics: Vec<String>) -> Result<HashMap<String, Vec<PartitionOffset>>>

Fetch offsets for a list of topics. Only the latest offset is returned; support for fetching the earliest offset will be added soon.

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let topics = client.topic_partitions.keys().cloned().collect();
let offsets = client.fetch_offsets(topics);

Returns a hashmap of (topic, PartitionOffset data). Each PartitionOffset contains the partition and offset info, or the error code returned by Kafka.

fn fetch_topic_offset(&mut self, topic: String) -> Result<HashMap<String, Vec<PartitionOffset>>>

Fetch offsets for a single topic. Only the latest offset is returned; support for fetching the earliest offset will be added soon.

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let offsets = client.fetch_topic_offset("my-topic".to_string());

Returns a hashmap of (topic, PartitionOffset data). Each PartitionOffset contains the partition and offset info, or the error code returned by Kafka.

fn fetch_messages_multi(&mut self, input: Vec<TopicPartitionOffset>) -> Result<Vec<TopicMessage>>

Fetch messages from Kafka (Multiple topic, partition, offset)

It takes a vector of utils::TopicPartitionOffset and returns a vector of utils::TopicMessage or an error::Error. You can determine the appropriate partition and offset using client.topic_partitions and client.fetch_topic_offset(topic).

Examples

use kafka::utils;
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let msgs = client.fetch_messages_multi(vec!(utils::TopicPartitionOffset{
                                                topic: "my-topic".to_string(),
                                                partition: 0,
                                                offset: 0
                                                },
                                            utils::TopicPartitionOffset{
                                                topic: "my-topic-2".to_string(),
                                                partition: 0,
                                                offset: 0
                                            }));
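The description above suggests deriving the request from client.topic_partitions. A minimal sketch of that step, assuming one entry per known partition and a common starting offset: the struct below is a local stand-in that only mirrors the three fields used in the example (it is not the crate's utils::TopicPartitionOffset), and build_fetch_input is a hypothetical helper, not part of the kafka crate.

```rust
use std::collections::HashMap;

/// Local stand-in mirroring the fields of utils::TopicPartitionOffset
/// as used in the example above (assumption: defined here only so the
/// sketch compiles without the kafka crate).
#[derive(Debug, Clone, PartialEq)]
struct TopicPartitionOffset {
    topic: String,
    partition: i32,
    offset: i64,
}

/// Hypothetical helper: builds one fetch entry per known partition,
/// all starting at `start_offset`.
fn build_fetch_input(
    topic_partitions: &HashMap<String, Vec<i32>>,
    start_offset: i64,
) -> Vec<TopicPartitionOffset> {
    let mut input = Vec::new();
    for (topic, partitions) in topic_partitions {
        for &partition in partitions {
            input.push(TopicPartitionOffset {
                topic: topic.clone(),
                partition,
                offset: start_offset,
            });
        }
    }
    // HashMap iteration order is unspecified, so sort by (topic, partition)
    // to make the request order deterministic.
    input.sort_by(|a, b| (&a.topic, a.partition).cmp(&(&b.topic, b.partition)));
    input
}
```

With the real crate, the map would presumably be client.topic_partitions after load_metadata_all(), and the entries would be built with utils::TopicPartitionOffset instead of the stand-in.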

fn fetch_messages(&mut self, topic: String, partition: i32, offset: i64) -> Result<Vec<TopicMessage>>

Fetch messages from Kafka (Single topic, partition, offset)

It takes a single topic, partition and offset and returns a vector of messages or an error::Error. You can determine the appropriate partition and offset using client.topic_partitions and client.fetch_topic_offset(topic).

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let msgs = client.fetch_messages("my-topic".to_string(), 0, 0);

fn send_messages(&mut self, required_acks: i16, timeout: i32, input: Vec<ProduceMessage>) -> Result<Vec<TopicPartitionOffsetError>>

Send multiple messages to Kafka

You can figure out the appropriate partition and offset using client's client.topic_partitions and client.fetch_topic_offset(topic)

required_acks - indicates how many acknowledgements the servers should receive before responding to the request. If it is 0, the server will not send any response (Not Implemented); this is the only case where the server will not reply to a request. If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1, the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1, the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).

timeout - This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in required_acks.

input - A vector of utils::ProduceMessage
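To make the required_acks cases easier to scan, here is a small sketch that classifies a value according to the description above. The AckMode enum and classify_acks function are hypothetical names introduced for illustration; they are not part of the kafka crate, which takes the raw i16.

```rust
/// Hypothetical classification of the `required_acks` argument,
/// following the parameter description above (not a crate type).
#[derive(Debug, PartialEq)]
enum AckMode {
    /// 0: the server sends no response (marked "Not Implemented" above).
    NoResponse,
    /// 1: respond once the data is written to the local log.
    LocalLog,
    /// -1: respond once all in-sync replicas have committed the message.
    AllInSync,
    /// n > 1: wait for n acknowledgements, capped by the in-sync replica count.
    AtLeast(i16),
}

/// Maps a raw `required_acks` value to its documented meaning.
/// Values below -1 are not described in the text, so they yield `None`.
fn classify_acks(required_acks: i16) -> Option<AckMode> {
    match required_acks {
        0 => Some(AckMode::NoResponse),
        1 => Some(AckMode::LocalLog),
        -1 => Some(AckMode::AllInSync),
        n if n > 1 => Some(AckMode::AtLeast(n)),
        _ => None,
    }
}
```

For instance, classify_acks(1) corresponds to the value passed in the example below, where the server replies after the write to its local log.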

Example

use kafka::utils;
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let m1 = "a".to_string().into_bytes();
let m2 = "b".to_string().into_bytes();
let req = vec!(utils::ProduceMessage{topic: "my-topic".to_string(), message: m1},
                utils::ProduceMessage{topic: "my-topic-2".to_string(), message: m2});
println!("{:?}", client.send_messages(1, 100, req));

The return value will contain a vector of topic, partition, offset and error (if any), or an error::Error.

fn send_message(&mut self, required_acks: i16, timeout: i32, topic: String, message: Vec<u8>) -> Result<Vec<TopicPartitionOffsetError>>

Send a message to Kafka

You can figure out the appropriate partition and offset using client's client.topic_partitions and client.fetch_topic_offset(topic)

required_acks - indicates how many acknowledgements the servers should receive before responding to the request. If it is 0, the server will not send any response (Not Implemented); this is the only case where the server will not reply to a request. If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1, the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1, the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).

timeout - This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in required_acks.

message - A single message as a vector of u8s

Example

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let msgs = client.send_message(1, 100, "my-topic".to_string(), "msg".to_string().into_bytes());

The return value will contain topic, partition, offset and error (if any), or an error::Error.

fn commit_offsets(&mut self, group: String, input: Vec<(String, i32, i64)>) -> Result<()>

fn commit_offset(&mut self, group: String, topic: String, partition: i32, offset: i64) -> Result<()>
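These two methods are undocumented here, so the following is only a sketch of how the input for commit_offsets might be assembled. It assumes the tuple fields are (topic, partition, offset), inferred from the parameter order of commit_offset; the helper name to_commit_input and the positions map are hypothetical and not part of the kafka crate.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: converts tracked consumer positions into the
/// Vec<(String, i32, i64)> shape taken by commit_offsets.
/// Assumption: tuple order is (topic, partition, offset).
fn to_commit_input(positions: &BTreeMap<(String, i32), i64>) -> Vec<(String, i32, i64)> {
    positions
        .iter()
        // BTreeMap iterates in key order, so the output is sorted by
        // (topic, partition).
        .map(|((topic, partition), offset)| (topic.clone(), *partition, *offset))
        .collect()
}
```

The resulting vector could then be passed as the input argument, together with a group name, once metadata has been loaded.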

fn fetch_group_topic_offsets(&mut self, group: String, input: Vec<TopicPartition>) -> Result<Vec<TopicPartitionOffsetError>>

fn fetch_group_topic_offset(&mut self, group: String, topic: String, partition: i32) -> Result<Vec<TopicPartitionOffsetError>>

Trait Implementations

impl Debug for KafkaClient

fn fmt(&self, __arg_0: &mut Formatter) -> Result

Formats the value using the given formatter.

impl Default for KafkaClient

fn default() -> KafkaClient

Returns the "default value" for a type.