Struct kafka::client::KafkaClient [] [src]

pub struct KafkaClient {
    pub topic_partitions: HashMap<StringVec<i32>>,
    // some fields omitted
}

Client struct.

It keeps track of brokers and topic metadata

Implements methods described by Kafka Protocol (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol)

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();

You will have to load metadata before making any other request.

Fields

topic_partitions: HashMap<StringVec<i32>>

HashMap where topic is the key and list of partitions is the value

Methods

impl KafkaClient
[src]

fn new(hosts: Vec<String>) -> KafkaClient

Create a new instance of KafkaClient

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));

fn load_metadata_all(&mut self) -> Result<()>

Resets and loads metadata for all topics.

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();

fn load_metadata(&mut self, topics: Vec<String>) -> Result<()>

Reloads metadata for a list of supplied topics

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata(vec!("my-topic".to_string()));

returns Result<(), error::Error>

fn reset_metadata(&mut self)

Clears metadata stored in the client. You must load metadata after this call if you want to use the client

fn fetch_offsets(&mut self, topics: Vec<String>, time: i64) -> Result<HashMap<StringVec<PartitionOffset>>>

Fetch offsets for a list of topics.

time - Used to ask for all messages before a certain time (ms). There are two special values. Specify -1 to receive the latest offset (i.e. the offset of the next coming message) and -2 to receive the earliest available offset

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let topics = client.topic_partitions.keys().cloned().collect();
let offsets = client.fetch_offsets(topics, -1);

Returns a hashmap of (topic, PartitionOffset data). PartitionOffset will contain parition and offset info Or Error code as returned by Kafka.

fn fetch_topic_offset(&mut self, topic: String, time: i64) -> Result<HashMap<StringVec<PartitionOffset>>>

Fetch offset for a topic.

time - Used to ask for all messages before a certain time (ms). There are two special values. Specify -1 to receive the latest offset (i.e. the offset of the next coming message) and -2 to receive the earliest available offset

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let offsets = client.fetch_topic_offset("my-topic".to_string(), -1);

Returns a hashmap of (topic, PartitionOffset data). PartitionOffset will contain parition and offset info Or Error code as returned by Kafka.

fn fetch_messages_multi(&mut self, input: Vec<TopicPartitionOffset>) -> Result<Vec<TopicMessage>>

Fetch messages from Kafka (Multiple topic, partition, offset)

It takes a vector of utils:TopicPartitionOffset and returns a vector of utils::TopicMessage or error::Error

You can figure out the appropriate partition and offset using client's client.topic_partitions and client.fetch_topic_offset(topic)

Examples

use kafka::utils;
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let msgs = client.fetch_messages_multi(vec!(utils::TopicPartitionOffset{
                                                topic: "my-topic".to_string(),
                                                partition: 0,
                                                offset: 0
                                                },
                                            utils::TopicPartitionOffset{
                                                topic: "my-topic-2".to_string(),
                                                partition: 0,
                                                offset: 0
                                            }));

fn fetch_messages(&mut self, topic: String, partition: i32, offset: i64) -> Result<Vec<TopicMessage>>

Fetch messages from Kafka (Single topic, partition, offset)

It takes a single topic, parition and offset and return a vector of messages (utils::TopicMessage) or error::Error

You can figure out the appropriate partition and offset using client's client.topic_partitions and client.fetch_topic_offset(topic)

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let msgs = client.fetch_messages("my-topic".to_string(), 0, 0);

fn send_messages(&mut self, required_acks: i16, timeout: i32, input: Vec<ProduceMessage>) -> Result<Vec<TopicPartitionOffsetError>>

Send a message to Kafka

You can figure out the appropriate partition and offset using client's client.topic_partitions and client.fetch_topic_offset(topic)

required_acks - indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server will not send any response (Not Implemented) (this is the only case where the server will not reply to a request). If it is 1, the server will wait the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).

timeout - This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in required_acks

input - A vector of utils::ProduceMessage

Example

use kafka::utils;
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let m1 = "a".to_string().into_bytes();
let m2 = "b".to_string().into_bytes();
let req = vec!(utils::ProduceMessage{topic: "my-topic".to_string(), message: m1},
                utils::ProduceMessage{topic: "my-topic-2".to_string(), message: m2});
println!("{:?}", client.send_messages(1, 100, req));

The return value will contain a vector of topic, partition, offset and error if any OR error:Error

fn send_message(&mut self, required_acks: i16, timeout: i32, topic: String, message: Vec<u8>) -> Result<Vec<TopicPartitionOffsetError>>

Send a message to Kafka

You can figure out the appropriate partition and offset using client's client.topic_partitions and client.fetch_topic_offset(topic)

required_acks - indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server will not send any response (Not Implemented) (this is the only case where the server will not reply to a request). If it is 1, the server will wait the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).

timeout - This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in required_acks

message - A single message as a vector of u8s

Example

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let msgs = client.send_message(1, 100, "my-topic".to_string(), "msg".to_string().into_bytes());

The return value will contain topic, partition, offset and error if any OR error:Error

fn commit_offsets(&mut self, group: String, input: Vec<TopicPartitionOffset>) -> Result<()>

Commit offset to topic, partition of a consumer group

It takes a group name and list of utils::TopicPartitionOffset and returns () or error::Error

You can figure out the appropriate partition and offset using client's client.topic_partitions and client.fetch_topic_offset(topic)

Examples

use kafka::utils;
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let resp = client.commit_offsets("my-group".to_string(), vec!(
                utils::TopicPartitionOffset{topic: "my-topic".to_string(), partition: 0, offset: 100},
                utils::TopicPartitionOffset{topic: "my-topic".to_string(), partition: 1, offset: 100}));

fn commit_offset(&mut self, group: String, topic: String, partition: i32, offset: i64) -> Result<()>

Commit offset to topic, partition of a consumer group

It takes a group name, topic, partition and offset and returns () or error::Error

You can figure out the appropriate partition and offset using client's client.topic_partitions and client.fetch_topic_offset(topic)

Examples

use kafka::utils;
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let resp = client.commit_offset("my-group".to_string(), "my-topic".to_string(), 0, 100);

fn fetch_group_topics_offset(&mut self, group: String, input: Vec<TopicPartition>) -> Result<Vec<TopicPartitionOffsetError>>

Fetch offset for vector of topic, partition of a consumer group

It takes a group name and list of utils::TopicPartition and returns utils::TopicPartitionOffsetError or error::Error

You can figure out the appropriate partition using client's client.topic_partitions

Examples

use kafka::utils;
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let resp = client.fetch_group_topics_offset("my-group".to_string(), vec!(
                utils::TopicPartition{topic: "my-topic".to_string(), partition: 0},
                utils::TopicPartition{topic: "my-topic".to_string(), partition: 1}));

fn fetch_group_topic_offset(&mut self, group: String, topic: String) -> Result<Vec<TopicPartitionOffsetError>>

Fetch offset for all partitions of a topic of a consumer group

It takes a group name and a topic and returns utils::TopicPartitionOffsetError or error::Error

You can figure out the appropriate partition using client's client.topic_partitions

Examples

use kafka::utils;
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let resp = client.fetch_group_topic_offset("my-group".to_string(),"my-topic".to_string());

fn fetch_group_offset(&mut self, group: String) -> Result<Vec<TopicPartitionOffsetError>>

Fetch offset for all partitions of all topics of a consumer group

It takes a group name and returns utils::TopicPartitionOffsetError or error::Error

You can figure out the topics using client's client.topic_partitions

Examples

use kafka::utils;
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
let res = client.load_metadata_all();
let resp = client.fetch_group_offset("my-group".to_string());

Trait Implementations

impl Debug for KafkaClient
[src]

fn fmt(&self, __arg_0: &mut Formatter) -> Result

Formats the value using the given formatter.

impl Default for KafkaClient
[src]

fn default() -> KafkaClient

Returns the "default value" for a type. Read more