Struct kafka::client::KafkaClient

pub struct KafkaClient {
    // some fields omitted
}

Client struct keeping track of brokers and topic metadata.

Implements methods described by the Kafka Protocol.

You will have to load metadata before making any other request.

Methods

impl KafkaClient

fn new(hosts: Vec<String>) -> KafkaClient

Creates a new instance of KafkaClient. Before being able to successfully use the new client, you'll have to load metadata.

Examples

let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();

fn new_secure(hosts: Vec<String>, security: SecurityConfig) -> KafkaClient

Creates a new secure instance of KafkaClient. Before being able to successfully use the new client, you'll have to load metadata.

Examples

extern crate openssl;
extern crate kafka;

use openssl::ssl::{SslContext, SslMethod, SSL_VERIFY_PEER};
use openssl::x509::X509FileType;
use kafka::client::{KafkaClient, SecurityConfig};

fn main() {
    let (key, cert) = ("client.key".to_string(), "client.crt".to_string());

    // OpenSSL offers a variety of complex configurations. Here is an example:
    let mut ctx = SslContext::new(SslMethod::Sslv23).unwrap();
    ctx.set_cipher_list("DEFAULT").unwrap();
    ctx.set_certificate_file(&cert, X509FileType::PEM).unwrap();
    ctx.set_private_key_file(&key, X509FileType::PEM).unwrap();
    ctx.set_default_verify_paths().unwrap();
    ctx.set_verify(SSL_VERIFY_PEER, None);

    let mut client = KafkaClient::new_secure(vec!["localhost:9092".to_owned()],
                                             SecurityConfig::new(ctx));
    client.load_metadata_all().unwrap();
}

See also the KafkaClient::load_metadata_all and KafkaClient::load_metadata methods, the crates openssl and openssl_verify, as well as Kafka's documentation.

fn hosts(&self) -> &[String]

Exposes the hosts used for discovery of the target Kafka cluster. This set of hosts corresponds to the values supplied to KafkaClient::new.
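
Example

A minimal sketch echoing the discovery hosts back:

let client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
// the returned slice corresponds to the values supplied to `new`
println!("bootstrap hosts: {:?}", client.hosts());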

fn set_compression(&mut self, compression: Compression)

Sets the compression algorithm to use when sending out messages.

Example

let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.set_compression(kafka::client::Compression::SNAPPY);

fn compression(&self) -> Compression

Retrieves the current KafkaClient::set_compression setting.

fn set_fetch_max_wait_time(&mut self, max_wait_time: i32)

Sets the maximum time in milliseconds the server is allowed to wait for data to become available when a fetch request cannot immediately be satisfied.
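
Example

A minimal sketch of the setter together with its corresponding getter:

let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
// allow the brokers to wait at most 100 ms for more data to arrive
client.set_fetch_max_wait_time(100);
assert_eq!(100, client.fetch_max_wait_time());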

See also KafkaClient::set_fetch_min_bytes(..) and KafkaClient::set_fetch_max_bytes_per_partition(..).

fn fetch_max_wait_time(&self) -> i32

Retrieves the current KafkaClient::set_fetch_max_wait_time setting.

fn set_fetch_min_bytes(&mut self, min_bytes: i32)

Sets the minimum number of bytes of available data to wait for as long as specified by KafkaClient::set_fetch_max_wait_time when fetching messages.

By setting higher values in combination with the timeout, the consumer can tune for throughput, trading a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and MinBytes to 64k would allow the server to wait up to 100 ms trying to accumulate 64k of data before responding).

Example

use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.set_fetch_max_wait_time(100);
client.set_fetch_min_bytes(64 * 1024);
let r = client.fetch_messages(&[FetchPartition::new("my-topic", 0, 0)]);

See also KafkaClient::set_fetch_max_wait_time(..) and KafkaClient::set_fetch_max_bytes_per_partition(..).

fn fetch_min_bytes(&self) -> i32

Retrieves the current KafkaClient::set_fetch_min_bytes setting.

fn set_fetch_max_bytes_per_partition(&mut self, max_bytes: i32)

Sets the default maximum number of bytes to obtain from a single Kafka partition when fetching messages.

This basically determines the maximum message size this client will be able to fetch. If a topic partition contains a message larger than this specified number of bytes, the server will not deliver it.

Note that this setting applies to a single partition. The overall potential data size of a fetch messages response is thus determined by the number of partitions in the fetch messages request times this "max bytes per partition."

This client will use this setting by default for all queried partitions; however, fetch_messages allows you to override it for each individual partition being queried.
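
Example

A minimal sketch raising the per-partition default before a fetch:

use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
// allow up to 1 MiB of message data from every queried partition by default
client.set_fetch_max_bytes_per_partition(1024 * 1024);
let r = client.fetch_messages(&[FetchPartition::new("my-topic", 0, 0)]);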

See also KafkaClient::set_fetch_max_wait_time, KafkaClient::set_fetch_min_bytes, and KafkaClient::fetch_messages.

fn fetch_max_bytes_per_partition(&self) -> i32

Retrieves the current KafkaClient::set_fetch_max_bytes_per_partition setting.

fn set_fetch_crc_validation(&mut self, validate_crc: bool)

Specifies whether to perform CRC validation on fetched messages.

This ensures detection of on-the-wire or on-disk corruption of fetched messages. The check adds some overhead, so it may be disabled where maximum performance is required.
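
Example

A minimal sketch disabling the check:

let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
// skip CRC checks on fetched messages; detection of corruption is lost
client.set_fetch_crc_validation(false);
assert!(!client.fetch_crc_validation());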

fn fetch_crc_validation(&self) -> bool

Retrieves the current KafkaClient::set_fetch_crc_validation setting.

fn topics(&self) -> Topics

Provides a view onto the currently loaded metadata of known topics.

Examples

use kafka::client::KafkaClient;
use kafka::client::metadata::Broker;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
for topic in client.topics() {
  for partition in topic.partitions() {
    println!("{} #{} => {}", topic.name(), partition.id(),
             partition.leader()
                      .map(Broker::host)
                      .unwrap_or("no-leader!"));
  }
}

fn load_metadata_all(&mut self) -> Result<()>

Resets and loads metadata for all topics from the underlying brokers.

Examples

let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
for topic in client.topics().names() {
  println!("topic: {}", topic);
}

Returns the metadata for all loaded topics underlying this client.

fn load_metadata<T: AsRef<str>>(&mut self, topics: &[T]) -> Result<()>

Reloads metadata for a list of supplied topics.

Note: if any of the specified topics does not yet exist on the underlying brokers, and those have the "auto create topics" configuration enabled, the remote Kafka instance will create the missing topics on the fly as a result of explicitly loading their metadata. This is in contrast to other methods of this KafkaClient, which silently filter out requests to not-yet-loaded/not-yet-known topics and thus do not cause topics to be created automatically.

Examples

let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
let _ = client.load_metadata(&["my-topic"]).unwrap();

Returns the metadata for all loaded topics underlying this client (this may include more topics than the ones specified in this particular method call).

fn reset_metadata(&mut self)

Clears metadata stored in the client. You must load metadata after this call if you want to use the client.
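
Example

A minimal sketch; note that metadata has to be re-loaded before the client is usable again:

let mut client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
// drop everything the client has learned about the cluster ...
client.reset_metadata();
// ... and re-load it before issuing further requests
client.load_metadata_all().unwrap();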

fn fetch_offsets<T: AsRef<str>>(&mut self, topics: &[T], offset: FetchOffset) -> Result<HashMap<String, Vec<PartitionOffset>>>

Fetch offsets for a list of topics.

Examples

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let topics: Vec<String> = client.topics().names().map(ToOwned::to_owned).collect();
let offsets = client.fetch_offsets(&topics, kafka::client::FetchOffset::Latest).unwrap();

Returns a mapping of topic name to PartitionOffsets for each currently available partition of the corresponding topic.

fn fetch_topic_offsets<T: AsRef<str>>(&mut self, topic: T, offset: FetchOffset) -> Result<Vec<PartitionOffset>>

Fetch offset for a single topic.

Examples

use kafka::client::{KafkaClient, FetchOffset};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let offsets = client.fetch_topic_offsets("my-topic", FetchOffset::Latest).unwrap();

Returns a vector of the offset data for each available partition. See also KafkaClient::fetch_offsets.

fn fetch_messages<'a, I, J>(&mut self, input: I) -> Result<Vec<Response>> where J: AsRef<FetchPartition<'a>>, I: IntoIterator<Item=J>

Fetch messages from Kafka (multiple topics/partitions).

It takes a vector specifying the topic partitions and the offsets as of which to fetch messages. Additionally, the default "max fetch size per partition" can be explicitly overridden if it is "defined", that is, if max_bytes is greater than zero.

The result is exposed in a raw, complicated manner, but allows for very efficient consumption. All of the data available through the returned fetch responses is bound to the responses' lifetime, as that data is merely a "view" into parts of the response structs. If you need to keep individual messages longer than the fetch responses as a whole, you'll need to make a copy of the message data (a sketch of this follows the example below).

  • This method transparently uncompresses messages (while Kafka might send them in a compressed format).

  • This method skips messages with a lower offset than requested (while Kafka might, for efficiency reasons, send messages with a lower offset).

Note: before using this method, consider using kafka::consumer::Consumer instead, which provides an easier-to-use API for the regular use case of fetching messages from Kafka.

Example

This example demonstrates iterating over all fetched messages from two topic partitions. From one partition we allow Kafka to deliver the default number of bytes as defined by KafkaClient::set_fetch_max_bytes_per_partition; from the other partition we allow Kafka to deliver up to 1 MiB of messages.

use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let reqs = &[FetchPartition::new("my-topic", 0, 0),
             FetchPartition::new("my-topic-2", 0, 0).with_max_bytes(1024*1024)];
let resps = client.fetch_messages(reqs).unwrap();
for resp in resps {
  for t in resp.topics() {
    for p in t.partitions() {
      match p.data() {
        &Err(ref e) => {
          println!("partition error: {}:{}: {}", t.topic(), p.partition(), e)
        }
        &Ok(ref data) => {
          println!("topic: {} / partition: {} / latest available message offset: {}",
                   t.topic(), p.partition(), data.highwatermark_offset());
          for msg in data.messages() {
            println!("topic: {} / partition: {} / message.offset: {} / message.len: {}",
                     t.topic(), p.partition(), msg.offset, msg.value.len());
          }
        }
      }
    }
  }
}
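
If messages must outlive the fetch responses, the payloads can be copied into owned buffers first; a minimal sketch (assuming, as in the example above, that msg.value is a byte slice):

use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let resps = client.fetch_messages(&[FetchPartition::new("my-topic", 0, 0)]).unwrap();
let mut payloads: Vec<Vec<u8>> = Vec::new();
for resp in &resps {
  for t in resp.topics() {
    for p in t.partitions() {
      if let &Ok(ref data) = p.data() {
        for msg in data.messages() {
          // `to_vec` copies the borrowed bytes into an owned buffer
          payloads.push(msg.value.to_vec());
        }
      }
    }
  }
}
// `payloads` remains valid even after the responses are dropped
drop(resps);
println!("kept {} messages", payloads.len());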

See also kafka::consumer. See also KafkaClient::set_fetch_max_bytes_per_partition.

fn fetch_messages_for_partition<'a>(&mut self, req: &FetchPartition<'a>) -> Result<Vec<Response>>

Fetch messages from a single Kafka partition.

See KafkaClient::fetch_messages.
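
Example

A minimal sketch mirroring the fetch_messages example above, but for a single partition:

use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let resps = client
    .fetch_messages_for_partition(&FetchPartition::new("my-topic", 0, 0))
    .unwrap();
for resp in resps {
  for t in resp.topics() {
    for p in t.partitions() {
      if let &Ok(ref data) = p.data() {
        for msg in data.messages() {
          println!("{}:{} offset {}", t.topic(), p.partition(), msg.offset);
        }
      }
    }
  }
}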

fn produce_messages<'a, 'b, I, J>(&mut self, required_acks: i16, ack_timeout: i32, messages: I) -> Result<Vec<TopicPartitionOffset>> where J: AsRef<ProduceMessage<'a, 'b>>, I: IntoIterator<Item=J>

Send messages to Kafka.

required_acks - indicates how many acknowledgements the servers should receive before responding to the request. If it is 0, the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1, the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).

ack_timeout - provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements specified in required_acks.

messages - a set of ProduceMessages.

Note: Unlike the higher-level Producer API, this method will not automatically determine the partition to deliver the message to. It will strictly try to send the message to the specified partition.

Note: Trying to send messages to non-existing topics or non-existing partitions will result in an error.

Example

use kafka::client::{KafkaClient, ProduceMessage};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let req = vec![ProduceMessage::new("my-topic", 0, None, Some("a".as_bytes())),
               ProduceMessage::new("my-topic-2", 0, None, Some("b".as_bytes()))];
println!("{:?}", client.produce_messages(1, 100, req));

The return value contains, for each produced message, its topic, partition, offset, and error if any; alternatively, the call as a whole may fail with an Error.

fn commit_offsets<'a, J, I>(&mut self, group: &str, offsets: I) -> Result<()> where J: AsRef<CommitOffset<'a>>, I: IntoIterator<Item=J>

Commit offsets for topic partitions on behalf of a consumer group.

Examples

use kafka::client::{KafkaClient, CommitOffset};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.commit_offsets("my-group",
    &[CommitOffset::new("my-topic", 0, 100),
      CommitOffset::new("my-topic", 1, 99)])
   .unwrap();

In this example, we commit the offset 100 for the topic partition "my-topic:0" and 99 for the topic partition "my-topic:1". Once successfully committed, these offsets can be retrieved using fetch_group_offsets, even from another process or at a much later point in time, to resume consuming the topic partitions as of these offsets.
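
A minimal sketch of that round trip, committing an offset and reading it back via fetch_group_offsets (described below):

use kafka::client::{KafkaClient, CommitOffset, FetchGroupOffset};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.commit_offsets("my-group", &[CommitOffset::new("my-topic", 0, 100)]).unwrap();
// later -- possibly from a different process -- resume from the stored offset
let offsets = client
    .fetch_group_offsets("my-group", &[FetchGroupOffset::new("my-topic", 0)])
    .unwrap();
println!("fetched {} offsets", offsets.len());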

fn commit_offset(&mut self, group: &str, topic: &str, partition: i32, offset: i64) -> Result<()>

Commit offset of a particular topic partition on behalf of a consumer group.

Examples

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.commit_offset("my-group", "my-topic", 0, 100).unwrap();

See also KafkaClient::commit_offsets.

fn fetch_group_offsets<'a, J, I>(&mut self, group: &str, partitions: I) -> Result<Vec<TopicPartitionOffset>> where J: AsRef<FetchGroupOffset<'a>>, I: IntoIterator<Item=J>

Fetch offsets for a specified list of topic partitions of a consumer group.

Examples

use kafka::client::{KafkaClient, FetchGroupOffset};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();

let offsets =
     client.fetch_group_offsets("my-group",
            &[FetchGroupOffset::new("my-topic", 0),
              FetchGroupOffset::new("my-topic", 1)])
            .unwrap();

See also KafkaClient::fetch_group_topic_offsets.

fn fetch_group_topic_offsets(&mut self, group: &str, topic: &str) -> Result<Vec<TopicPartitionOffset>>

Fetch offsets for all partitions of a particular topic of a consumer group.

Examples

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let offsets = client.fetch_group_topic_offsets("my-group", "my-topic").unwrap();

Trait Implementations

impl Debug for KafkaClient

fn fmt(&self, f: &mut Formatter) -> Result

Formats the value using the given formatter.
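
This allows a client to be rendered with the standard {:?} formatter, for example:

let client = kafka::client::KafkaClient::new(vec!["localhost:9092".to_owned()]);
println!("{:?}", client);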