Struct kafka::client::KafkaClient
[−]
[src]
pub struct KafkaClient { // some fields omitted }
Client struct keeping track of brokers and topic metadata.
Implements methods described by the Kafka Protocol.
You will have to load metadata before making any other request.
Methods
impl KafkaClient
[src]
fn new(hosts: Vec<String>) -> KafkaClient
Creates a new instance of KafkaClient. Before being able to successfully use the new client, you'll have to load metadata.
Examples
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned())); client.load_metadata_all().unwrap();
fn new_secure(hosts: Vec<String>, security: SecurityConfig) -> KafkaClient
Creates a new secure instance of KafkaClient. Before being able to successfully use the new client, you'll have to load metadata.
Examples
extern crate openssl; extern crate kafka; use openssl::ssl::{Ssl, SslContext, SslStream, SslMethod, SSL_VERIFY_PEER}; use openssl::x509::X509FileType; use kafka::client::{KafkaClient, SecurityConfig}; fn main() { let (key, cert) = ("client.key".to_string(), "client.crt".to_string()); // OpenSSL offers a variety of complex configurations. Here is an example: let mut ctx = SslContext::new(SslMethod::Sslv23).unwrap(); ctx.set_cipher_list("DEFAULT").unwrap(); ctx.set_certificate_file(&cert, X509FileType::PEM).unwrap(); ctx.set_private_key_file(&key, X509FileType::PEM).unwrap(); ctx.set_default_verify_paths().unwrap(); ctx.set_verify(SSL_VERIFY_PEER, None); let mut client = KafkaClient::new_secure(vec!("localhost:9092".to_owned()), SecurityConfig::new(ctx)); client.load_metadata_all().unwrap(); }
See also KafkaClient::load_metadatata_all
and
KafkaClient::load_metadata
methods, the creates
openssl
and openssl_verify,
as well as Kafka's documentation.
fn hosts(&self) -> &[String]
Exposes the hosts used for discovery of the target kafka
cluster. This set of hosts corresponds to the values supplied
to KafkaClient::new
.
fn set_compression(&mut self, compression: Compression)
Sets the compression algorithm to use when sending out messages.
Example
use kafka::client::{Compression, KafkaClient}; let mut client = KafkaClient::new(vec!("localhost:9092".to_owned())); client.load_metadata_all().unwrap(); client.set_compression(Compression::NONE);
fn compression(&self) -> Compression
Retrieves the current KafkaClient::set_compression
setting.
fn set_fetch_max_wait_time(&mut self, max_wait_time: i32)
Sets the maximum time in milliseconds to wait for insufficient data to become available when fetching messages.
See also KafkaClient::set_fetch_min_bytes(..)
and
KafkaClient::set_fetch_max_bytes_per_partition(..)
.
fn fetch_max_wait_time(&self) -> i32
Retrieves the current KafkaClient::set_fetch_max_wait_time
setting.
fn set_fetch_min_bytes(&mut self, min_bytes: i32)
Sets the minimum number of bytes of available data to wait for
as long as specified by KafkaClient::set_fetch_max_wait_time
when fetching messages.
By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100ms to try to accumulate 64k of data before responding).
Example
use kafka::client::{KafkaClient, FetchPartition}; let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]); client.load_metadata_all().unwrap(); client.set_fetch_max_wait_time(100); client.set_fetch_min_bytes(64 * 1024); let r = client.fetch_messages(&[FetchPartition::new("my-topic", 0, 0)]);
See also KafkaClient::set_fetch_max_wait_time(..)
and
KafkaClient::set_fetch_max_bytes_per_partition(..)
.
fn fetch_min_bytes(&self) -> i32
Retrieves the current KafkaClient::set_fetch_min_bytes
setting.
fn set_fetch_max_bytes_per_partition(&mut self, max_bytes: i32)
Sets the default maximum number of bytes to obtain from a single kafka partition when fetching messages.
This basically determines the maximum message size this client will be able to fetch. If a topic partition contains a message larger than this specified number of bytes, the server will not deliver it.
Note that this setting is related to a single partition. The overall potential data size in a fetch messages response will thus be determined by the number of partitions in the fetch messages request times this "max bytes per partitions."
This client will use this setting by default for all queried
partitions, however, fetch_messages
does allow you to
override this setting for a particular partition being
queried.
See also KafkaClient::set_fetch_max_wait_time
,
KafkaClient::set_fetch_min_bytes
, and KafkaClient::fetch_messages
.
fn fetch_max_bytes_per_partition(&self) -> i32
Retrieves the current
KafkaClient::set_fetch_max_bytes_per_partition
setting.
fn set_fetch_crc_validation(&mut self, validate_crc: bool)
Specifies whether the to perform CRC validation on fetched messages.
This ensures detection of on-the-wire or on-disk corruption to fetched messages. This check adds some overhead, so it may be disabled in cases seeking extreme performance.
fn fetch_crc_validation(&self) -> bool
Retrieves the current KafkaClient::set_fetch_crc_validation
setting.
fn set_group_offset_storage(&mut self, storage: GroupOffsetStorage)
Specifies the group offset storage to address when fetching or committing group offsets.
In addition to Zookeeper, Kafka 0.8.2 brokers or later offer a more performant (and scalable) way to manage group offset directly by itself. Note that the remote storages are separate and independent on each other. Hence, you typically want consistently hard-code your choice in your program.
Unless you have a 0.8.1 broker or want to participate in a
group which is already based on Zookeeper, you generally want
to choose GroupOffsetStorage::Kafka
here.
See also KafkaClient::fetch_group_offsets
and
KafkaClient::commit_offsets
.
fn group_offset_storage(&self) -> GroupOffsetStorage
Retrieves the current KafkaClient::set_group_offset_storage
settings.
fn set_retry_backoff_time(&mut self, millis: u32)
Specifies the number of milliseconds to wait before retrying a failed, repeatable operation against Kafka. This avoids retrying such operations in a tight loop.
fn retry_backoff_time(&self) -> u32
Retrieves the current KafkaClient::set_retry_backoff_time
setting.
fn set_retry_max_attempts(&mut self, attempts: u32)
Specifies the upper limit of retry attempts for failed, repeatable operations against kafka. This avoids retrying them forever.
fn retry_max_attempts(&self) -> u32
Retrieves the current KafkaClient::set_retry_max_attempts
setting.
fn topics(&self) -> Topics
Provides a view onto the currently loaded metadata of known .
Examples
use kafka::client::KafkaClient; use kafka::client::metadata::Broker; let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]); client.load_metadata_all().unwrap(); for topic in client.topics() { for partition in topic.partitions() { println!("{} #{} => {}", topic.name(), partition.id(), partition.leader() .map(Broker::host) .unwrap_or("no-leader!")); } }
fn load_metadata_all(&mut self) -> Result<()>
Resets and loads metadata for all topics from the underlying brokers.
Examples
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned())); client.load_metadata_all().unwrap(); for topic in client.topics().names() { println!("topic: {}", topic); }
Returns the metadata for all loaded topics underlying this client.
fn load_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()>
Reloads metadata for a list of supplied topics.
Note: if any of the specified topics does not exist yet on the
underlying brokers and these have the configuration for "auto
create topics"
enabled,
the remote kafka instance will create the yet missing topics
on the fly as a result of explicitely loading their metadata.
This is in contrast to other methods of this KafkaClient
which will silently filter out requests to
not-yet-loaded/not-yet-known topics and, thus, not cause
topics to be automatically created.
Examples
let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned())); let _ = client.load_metadata(&["my-topic"]).unwrap();
Returns the metadata for all loaded topics underlying this client (this might be more topics than specified right to this method call.)
fn reset_metadata(&mut self)
Clears metadata stored in the client. You must load metadata after this call if you want to use the client.
fn fetch_offsets<T: AsRef<str>>(&mut self, topics: &[T], offset: FetchOffset) -> Result<HashMap<String, Vec<PartitionOffset>>>
Fetch offsets for a list of topics
Examples
use kafka::client::KafkaClient; let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]); client.load_metadata_all().unwrap(); let topics: Vec<String> = client.topics().names().map(ToOwned::to_owned).collect(); let offsets = client.fetch_offsets(&topics, kafka::client::FetchOffset::Latest).unwrap();
Returns a mapping of topic name to PartitionOffset
s for each
currently available partition of the corresponding topic.
fn fetch_topic_offsets<T: AsRef<str>>(&mut self, topic: T, offset: FetchOffset) -> Result<Vec<PartitionOffset>>
Fetch offset for a single topic.
Examples
use kafka::client::{KafkaClient, FetchOffset}; let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]); client.load_metadata_all().unwrap(); let offsets = client.fetch_topic_offsets("my-topic", FetchOffset::Latest).unwrap();
Returns a vector of the offset data for each available partition.
See also KafkaClient::fetch_offsets
.
fn fetch_messages<'a, I, J>(&mut self, input: I) -> Result<Vec<Response>> where J: AsRef<FetchPartition<'a>>, I: IntoIterator<Item=J>
Fetch messages from Kafka (multiple topic, partitions).
It takes a vector specifying the topic partitions and their
offsets as of which to fetch messages. Additionally, the
default "max fetch size per partition" can be explicitely
overriden if it is "defined" - this is, if max_bytes
is
greater than zero.
The result is exposed in a raw, complicated manner but allows for very efficient consumption possibilities. All of the data available through the returned fetch responses is bound to their lifetime as that data is merely a "view" into parts of the response structs. If you need to keep individual messages for a longer time than the whole fetch responses, you'll need to make a copy of the message data.
This method transparently uncompresses messages (while Kafka might sent them in compressed format.)
This method ensures to skip messages with a lower offset than requested (while Kafka might for efficiency reasons sent messages with a lower offset.)
Note: before using this method consider using
kafka::consumer::Consumer
instead which provides an easier
to use API for the regular use-case of fetching messesage from
Kafka.
Example
This example demonstrates iterating all fetched messages from
two topic partitions. From one partition we allow Kafka to
deliver to us the default number bytes as defined by
KafkaClient::set_fetch_max_bytes_per_partition
, from the
other partition we allow Kafka to deliver up to 1MiB of
messages.
use kafka::client::{KafkaClient, FetchPartition}; let mut client = KafkaClient::new(vec!("localhost:9092".to_owned())); client.load_metadata_all().unwrap(); let reqs = &[FetchPartition::new("my-topic", 0, 0), FetchPartition::new("my-topic-2", 0, 0).with_max_bytes(1024*1024)]; let resps = client.fetch_messages(reqs).unwrap(); for resp in resps { for t in resp.topics() { for p in t.partitions() { match p.data() { &Err(ref e) => { println!("partition error: {}:{}: {}", t.topic(), p.partition(), e) } &Ok(ref data) => { println!("topic: {} / partition: {} / latest available message offset: {}", t.topic(), p.partition(), data.highwatermark_offset()); for msg in data.messages() { println!("topic: {} / partition: {} / message.offset: {} / message.len: {}", t.topic(), p.partition(), msg.offset, msg.value.len()); } } } } } }
See also kafka::consumer
.
See also KafkaClient::set_fetch_max_bytes_per_partition
.
fn fetch_messages_for_partition<'a>(&mut self, req: &FetchPartition<'a>) -> Result<Vec<Response>>
Fetch messages from a single kafka partition.
See KafkaClient::fetch_messages
.
fn produce_messages<'a, 'b, I, J>(&mut self, ack: RequiredAcks, ack_timeout: i32, messages: I) -> Result<Vec<TopicPartitionOffset>> where J: AsRef<ProduceMessage<'a, 'b>>, I: IntoIterator<Item=J>
Send a message to Kafka
required_acks
- indicates how many acknowledgements the
servers should receive before responding to the request
ack_timeout
- provides a maximum time in milliseconds the
server can await the receipt of the number of acknowledgements
in required_acks
input
- the set of ProduceMessage
s to send
Note: Unlike the higher-level Producer
API, this method will
not automatically determine the partition to deliver the
message to. It will strictly try to send the message to the
specified partition.
Note: Trying to send messages to non-existing topics or non-existing partitions will result in an error.
Example
use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks}; let mut client = KafkaClient::new(vec!("localhost:9092".to_owned())); client.load_metadata_all().unwrap(); let req = vec![ProduceMessage::new("my-topic", 0, None, Some("a".as_bytes())), ProduceMessage::new("my-topic-2", 0, None, Some("b".as_bytes()))]; println!("{:?}", client.produce_messages(RequiredAcks::One, 100, req));
The return value will contain a vector of topic, partition, offset and error if any OR error:Error.
fn commit_offsets<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()> where J: AsRef<CommitOffset<'a>>, I: IntoIterator<Item=J>
Commit offset for a topic partitions on behalf of a consumer group.
Examples
use kafka::client::{KafkaClient, CommitOffset}; let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]); client.load_metadata_all().unwrap(); client.commit_offsets("my-group", &[CommitOffset::new("my-topic", 0, 100), CommitOffset::new("my-topic", 1, 99)]) .unwrap();
In this example, we commit the offset 100 for the topic
partition "my-topic:0" and 99 for the topic partition
"my-topic:1". Once successfully committed, these can then be
retrieved using fetch_group_offsets
even from another
process or at much later point in time to resume comusing the
topic partitions as of these offsets.
fn commit_offset(&mut self, group: &str, topic: &str, partition: i32, offset: i64) -> Result<()>
Commit offset of a particular topic partition on behalf of a consumer group.
Examples
use kafka::client::KafkaClient; let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]); client.load_metadata_all().unwrap(); client.commit_offset("my-group", "my-topic", 0, 100).unwrap();
See also KafkaClient::commit_offsets
.
fn fetch_group_offsets<'a, J, I>(&mut self, group: &str, partitions: I) -> Result<Vec<TopicPartitionOffset>> where J: AsRef<FetchGroupOffset<'a>>, I: IntoIterator<Item=J>
Fetch offset for a specified list of topic partitions of a consumer group
Examples
use kafka::client::{KafkaClient, FetchGroupOffset}; let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]); client.load_metadata_all().unwrap(); let offsets = client.fetch_group_offsets("my-group", &[FetchGroupOffset::new("my-topic", 0), FetchGroupOffset::new("my-topic", 1)]) .unwrap();
See also KafkaClient::fetch_group_topic_offsets
.
fn fetch_group_topic_offsets(&mut self, group: &str, topic: &str) -> Result<Vec<TopicPartitionOffset>>
Fetch offset for all partitions of a particular topic of a consumer group
Examples
use kafka::client::KafkaClient; let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]); client.load_metadata_all().unwrap(); let offsets = client.fetch_group_topic_offsets("my-group", "my-topic").unwrap();