Struct kafka::client::KafkaClient
[−]
[src]
pub struct KafkaClient { pub topic_partitions: HashMap<String, Vec<i32>>, // some fields omitted }
Client struct.
It keeps track of brokers and topic metadata
Implements methods described by Kafka Protocol (https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol)
Examples
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all();
You will have to load metadata before making any other request.
Fields
topic_partitions: HashMap<String, Vec<i32>>
HashMap where topic
is the key and list of partitions
is the value
Methods
impl KafkaClient
[src]
fn new(hosts: Vec<String>) -> KafkaClient
Create a new instance of KafkaClient
Examples
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string()));
fn load_metadata_all(&mut self) -> Result<()>
Resets and loads metadata for all topics.
Examples
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all();
fn load_metadata(&mut self, topics: Vec<String>) -> Result<()>
Reloads metadata for a list of supplied topics
Examples
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata(vec!("my-topic".to_string()));
returns Result<(), error::Error>
fn reset_metadata(&mut self)
Clears metadata stored in the client. You must load metadata after this call if you want to use the client
fn fetch_offsets(&mut self, topics: Vec<String>, time: i64) -> Result<HashMap<String, Vec<PartitionOffset>>>
Fetch offsets for a list of topics.
time
- Used to ask for all messages before a certain time (ms). There are two special values.
Specify -1 to receive the latest offset (i.e. the offset of the next coming message)
and -2 to receive the earliest available offset
Examples
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let topics = client.topic_partitions.keys().cloned().collect(); let offsets = client.fetch_offsets(topics, -1);
Returns a hashmap of (topic, PartitionOffset data). PartitionOffset will contain parition and offset info Or Error code as returned by Kafka.
fn fetch_topic_offset(&mut self, topic: String, time: i64) -> Result<HashMap<String, Vec<PartitionOffset>>>
Fetch offset for a topic.
time
- Used to ask for all messages before a certain time (ms). There are two special values.
Specify -1 to receive the latest offset (i.e. the offset of the next coming message)
and -2 to receive the earliest available offset
Examples
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let offsets = client.fetch_topic_offset("my-topic".to_string(), -1);
Returns a hashmap of (topic, PartitionOffset data). PartitionOffset will contain parition and offset info Or Error code as returned by Kafka.
fn fetch_messages_multi(&mut self, input: Vec<TopicPartitionOffset>) -> Result<Vec<TopicMessage>>
Fetch messages from Kafka (Multiple topic, partition, offset)
It takes a vector of utils:TopicPartitionOffset
and returns a vector of utils::TopicMessage
or error::Error
You can figure out the appropriate partition and offset using client's
client.topic_partitions
and client.fetch_topic_offset(topic)
Examples
use kafka::utils; let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let msgs = client.fetch_messages_multi(vec!(utils::TopicPartitionOffset{ topic: "my-topic".to_string(), partition: 0, offset: 0 }, utils::TopicPartitionOffset{ topic: "my-topic-2".to_string(), partition: 0, offset: 0 }));
fn fetch_messages(&mut self, topic: String, partition: i32, offset: i64) -> Result<Vec<TopicMessage>>
Fetch messages from Kafka (Single topic, partition, offset)
It takes a single topic, parition and offset and return a vector of messages (utils::TopicMessage
)
or error::Error
You can figure out the appropriate partition and offset using client's client.topic_partitions and client.fetch_topic_offset(topic)
Examples
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let msgs = client.fetch_messages("my-topic".to_string(), 0, 0);
fn send_messages(&mut self, required_acks: i16, timeout: i32, input: Vec<ProduceMessage>) -> Result<Vec<TopicPartitionOffsetError>>
Send a message to Kafka
You can figure out the appropriate partition and offset using client's
client.topic_partitions
and client.fetch_topic_offset(topic)
required_acks
- indicates how many acknowledgements the servers should receive before
responding to the request. If it is 0 the server will not send any response (Not Implemented)
(this is the only case where the server will not reply to a request).
If it is 1, the server will wait the data is written to the local log before sending
a response. If it is -1 the server will block until the message is committed by all
in sync replicas before sending a response. For any number > 1 the server will block
waiting for this number of acknowledgements to occur (but the server will never wait
for more acknowledgements than there are in-sync replicas).
timeout
- This provides a maximum time in milliseconds the server can await the
receipt of the number of acknowledgements in required_acks
input
- A vector of utils::ProduceMessage
Example
use kafka::utils; let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let m1 = "a".to_string().into_bytes(); let m2 = "b".to_string().into_bytes(); let req = vec!(utils::ProduceMessage{topic: "my-topic".to_string(), message: m1}, utils::ProduceMessage{topic: "my-topic-2".to_string(), message: m2}); println!("{:?}", client.send_messages(1, 100, req));
The return value will contain a vector of topic, partition, offset and error if any OR error:Error
fn send_message(&mut self, required_acks: i16, timeout: i32, topic: String, message: Vec<u8>) -> Result<Vec<TopicPartitionOffsetError>>
Send a message to Kafka
You can figure out the appropriate partition and offset using client's
client.topic_partitions
and client.fetch_topic_offset(topic)
required_acks
- indicates how many acknowledgements the servers should receive before
responding to the request. If it is 0 the server will not send any response (Not Implemented)
(this is the only case where the server will not reply to a request).
If it is 1, the server will wait the data is written to the local log before sending
a response. If it is -1 the server will block until the message is committed by all
in sync replicas before sending a response. For any number > 1 the server will block
waiting for this number of acknowledgements to occur (but the server will never wait
for more acknowledgements than there are in-sync replicas).
timeout
- This provides a maximum time in milliseconds the server can await the
receipt of the number of acknowledgements in required_acks
message
- A single message as a vector of u8s
Example
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let msgs = client.send_message(1, 100, "my-topic".to_string(), "msg".to_string().into_bytes());
The return value will contain topic, partition, offset and error if any OR error:Error
fn commit_offsets(&mut self, group: String, input: Vec<TopicPartitionOffset>) -> Result<()>
Commit offset to topic, partition of a consumer group
It takes a group name and list of utils::TopicPartitionOffset
and returns ()
or error::Error
You can figure out the appropriate partition and offset using client's
client.topic_partitions
and client.fetch_topic_offset(topic)
Examples
use kafka::utils; let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let resp = client.commit_offsets("my-group".to_string(), vec!( utils::TopicPartitionOffset{topic: "my-topic".to_string(), partition: 0, offset: 100}, utils::TopicPartitionOffset{topic: "my-topic".to_string(), partition: 1, offset: 100}));
fn commit_offset(&mut self, group: String, topic: String, partition: i32, offset: i64) -> Result<()>
Commit offset to topic, partition of a consumer group
It takes a group name, topic, partition and offset and returns ()
or error::Error
You can figure out the appropriate partition and offset using client's
client.topic_partitions
and client.fetch_topic_offset(topic)
Examples
use kafka::utils; let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let resp = client.commit_offset("my-group".to_string(), "my-topic".to_string(), 0, 100);
fn fetch_group_topics_offset(&mut self, group: String, input: Vec<TopicPartition>) -> Result<Vec<TopicPartitionOffsetError>>
Fetch offset for vector of topic, partition of a consumer group
It takes a group name and list of utils::TopicPartition
and returns utils::TopicPartitionOffsetError
or error::Error
You can figure out the appropriate partition using client's
client.topic_partitions
Examples
use kafka::utils; let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let resp = client.fetch_group_topics_offset("my-group".to_string(), vec!( utils::TopicPartition{topic: "my-topic".to_string(), partition: 0}, utils::TopicPartition{topic: "my-topic".to_string(), partition: 1}));
fn fetch_group_topic_offset(&mut self, group: String, topic: String) -> Result<Vec<TopicPartitionOffsetError>>
Fetch offset for all partitions of a topic of a consumer group
It takes a group name and a topic and returns utils::TopicPartitionOffsetError
or error::Error
You can figure out the appropriate partition using client's
client.topic_partitions
Examples
use kafka::utils; let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let resp = client.fetch_group_topic_offset("my-group".to_string(),"my-topic".to_string());
fn fetch_group_offset(&mut self, group: String) -> Result<Vec<TopicPartitionOffsetError>>
Fetch offset for all partitions of all topics of a consumer group
It takes a group name and returns utils::TopicPartitionOffsetError
or error::Error
You can figure out the topics using client's
client.topic_partitions
Examples
use kafka::utils; let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_string())); let res = client.load_metadata_all(); let resp = client.fetch_group_offset("my-group".to_string());
Trait Implementations
impl Debug for KafkaClient
[src]
impl Default for KafkaClient
[src]
fn default() -> KafkaClient
Returns the "default value" for a type. Read more