Struct kafka::client::KafkaClient

pub struct KafkaClient { /* fields omitted */ }

Client struct keeping track of brokers and topic metadata.

Implements methods described by the Kafka Protocol.

You will have to load metadata before making any other request.

Methods

impl KafkaClient

Creates a new instance of KafkaClient. Before being able to successfully use the new client, you'll have to load metadata.

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
client.load_metadata_all().unwrap();

Creates a new secure instance of KafkaClient. Before being able to successfully use the new client, you'll have to load metadata.

Examples

extern crate openssl;
extern crate kafka;

use openssl::ssl::{SslConnectorBuilder, SslMethod, SSL_VERIFY_PEER};
use openssl::x509::X509_FILETYPE_PEM;
use kafka::client::{KafkaClient, SecurityConfig};

fn main() {
    let (key, cert) = ("client.key".to_string(), "client.crt".to_string());

    // OpenSSL offers a variety of complex configurations. Here is an example:
    let mut builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap();
    {
        let mut ctx = builder.builder_mut();
        ctx.set_cipher_list("DEFAULT").unwrap();
        ctx.set_certificate_file(&cert, X509_FILETYPE_PEM).unwrap();
        ctx.set_private_key_file(&key, X509_FILETYPE_PEM).unwrap();
        ctx.set_default_verify_paths().unwrap();
        ctx.set_verify(SSL_VERIFY_PEER);
    }
    let connector = builder.build();

    let mut client = KafkaClient::new_secure(vec!("localhost:9092".to_owned()),
                                             SecurityConfig::new(connector));
    client.load_metadata_all().unwrap();
}

See also SecurityConfig#with_hostname_verification to disable hostname verification.
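For example, reusing the connector from the example above (a hedged sketch; the method name is documented above, but the boolean parameter and builder style are assumptions):

// Disable hostname verification, e.g. when brokers are addressed by IP.
let config = SecurityConfig::new(connector).with_hostname_verification(false);
let mut client = KafkaClient::new_secure(vec!["localhost:9092".to_owned()], config);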

See also the KafkaClient::load_metadata_all and KafkaClient::load_metadata methods, the crates openssl and openssl-verify, as well as Kafka's documentation.

Exposes the hosts used for discovery of the target Kafka cluster. This set of hosts corresponds to the values supplied to KafkaClient::new.
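Example

A minimal sketch (assuming hosts returns the bootstrap host strings supplied to KafkaClient::new):

use kafka::client::KafkaClient;

let client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
for host in client.hosts() {
    println!("bootstrap host: {}", host);
}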

Sets the client_id to be sent along every request to the remote Kafka brokers. By default, this value is the empty string.

Kafka brokers write out this client id to their request/response trace log - if configured appropriately.

Retrieves the current KafkaClient::set_client_id setting.
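Example

A minimal sketch of setting and reading back the client id (assuming set_client_id takes an owned String and client_id returns a string slice):

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
// This id will show up in the brokers' request/response trace logs.
client.set_client_id("my-app".to_owned());
assert_eq!("my-app", client.client_id());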

Sets the compression algorithm to use when sending out messages.

Example

use kafka::client::{Compression, KafkaClient};

let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
client.load_metadata_all().unwrap();
client.set_compression(Compression::NONE);

Retrieves the current KafkaClient::set_compression setting.

Sets the maximum time to wait for insufficient data to become available when fetching messages.

See also KafkaClient::set_fetch_min_bytes(..) and KafkaClient::set_fetch_max_bytes_per_partition(..).

Retrieves the current KafkaClient::set_fetch_max_wait_time setting.

Sets the minimum number of bytes of available data to wait for as long as specified by KafkaClient::set_fetch_max_wait_time when fetching messages.

By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100ms to try to accumulate 64k of data before responding).

Example

use std::time::Duration;
use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.set_fetch_max_wait_time(Duration::from_millis(100));
client.set_fetch_min_bytes(64 * 1024);
let r = client.fetch_messages(&[FetchPartition::new("my-topic", 0, 0)]);

See also KafkaClient::set_fetch_max_wait_time(..) and KafkaClient::set_fetch_max_bytes_per_partition(..).

Retrieves the current KafkaClient::set_fetch_min_bytes setting.

Sets the default maximum number of bytes to obtain from a single kafka partition when fetching messages.

This basically determines the maximum message size this client will be able to fetch. If a topic partition contains a message larger than this specified number of bytes, the server will not deliver it.

Note that this setting applies to a single partition. The overall potential data size of a fetch messages response is thus determined by the number of partitions in the fetch messages request times this "max bytes per partition."

This client will use this setting by default for all queried partitions; however, fetch_messages allows you to override it for a particular partition being queried.

See also KafkaClient::set_fetch_max_wait_time, KafkaClient::set_fetch_min_bytes, and KafkaClient::fetch_messages.

Retrieves the current KafkaClient::set_fetch_max_bytes_per_partition setting.
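Example

A minimal sketch (assuming the limit is given in bytes; the per-partition override uses FetchPartition::with_max_bytes as shown in the fetch_messages example below):

use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
// Allow up to 4 MiB per queried partition by default ...
client.set_fetch_max_bytes_per_partition(4 * 1024 * 1024);
// ... while overriding the limit for one particular partition.
let r = client.fetch_messages(&[
    FetchPartition::new("my-topic", 0, 0).with_max_bytes(1024 * 1024)]);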

Specifies whether to perform CRC validation on fetched messages.

This ensures detection of on-the-wire or on-disk corruption to fetched messages. This check adds some overhead, so it may be disabled in cases seeking extreme performance.

Retrieves the current KafkaClient::set_fetch_crc_validation setting.
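Example

A minimal sketch (assuming the setter takes a bool):

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
// Trade corruption detection for a little extra throughput.
client.set_fetch_crc_validation(false);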

Specifies the group offset storage to address when fetching or committing group offsets.

In addition to Zookeeper, Kafka 0.8.2 or later brokers offer a more performant (and scalable) way to manage group offsets directly by themselves. Note that the two remote storages are separate and independent of each other. Hence, you typically want to consistently hard-code your choice in your program.

Unless you have a 0.8.1 broker or want to participate in a group which is already based on Zookeeper, you generally want to choose GroupOffsetStorage::Kafka here.

See also KafkaClient::fetch_group_offsets and KafkaClient::commit_offsets.

Retrieves the current KafkaClient::set_group_offset_storage setting.
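Example

A minimal sketch (GroupOffsetStorage::Kafka as recommended above; the Zookeeper variant is assumed analogous):

use kafka::client::{GroupOffsetStorage, KafkaClient};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
// Manage group offsets directly through the (0.8.2+) brokers.
client.set_group_offset_storage(GroupOffsetStorage::Kafka);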

Specifies the time to wait before retrying a failed, repeatable operation against Kafka. This avoids retrying such operations in a tight loop.

Retrieves the current KafkaClient::set_retry_backoff_time setting.

Specifies the upper limit of retry attempts for failed, repeatable operations against Kafka. This avoids retrying them forever.

Retrieves the current KafkaClient::set_retry_max_attempts setting.
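Example

A minimal sketch covering both retry settings (assuming a Duration for the backoff time and an integer for the attempt limit):

use std::time::Duration;
use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
// Wait 100ms between retries and give up after 3 attempts.
client.set_retry_backoff_time(Duration::from_millis(100));
client.set_retry_max_attempts(3);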

Specifies the timeout after which idle connections will transparently be closed/re-established by KafkaClient.

To be effective this value must be smaller than the remote broker's connections.max.idle.ms setting.

Retrieves the current KafkaClient::set_connection_idle_timeout setting.
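Example

A minimal sketch (assuming a Duration; the chosen value stays below the broker default of 10 minutes for connections.max.idle.ms):

use std::time::Duration;
use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
// Proactively re-establish connections idle for more than 9 minutes.
client.set_connection_idle_timeout(Duration::from_secs(9 * 60));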

Provides a view onto the currently loaded metadata of known topics.

Examples

use kafka::client::KafkaClient;
use kafka::client::metadata::Broker;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
for topic in client.topics() {
  for partition in topic.partitions() {
    println!("{} #{} => {}", topic.name(), partition.id(),
             partition.leader()
                      .map(Broker::host)
                      .unwrap_or("no-leader!"));
  }
}

Resets and loads metadata for all topics from the underlying brokers.

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
client.load_metadata_all().unwrap();
for topic in client.topics().names() {
  println!("topic: {}", topic);
}

Returns the metadata for all loaded topics underlying this client.

Reloads metadata for a list of supplied topics.

Note: if any of the specified topics does not exist yet on the underlying brokers, and these are configured with "auto create topics" enabled, the remote Kafka instance will create the missing topics on the fly as a result of explicitly loading their metadata. This is in contrast to other methods of this KafkaClient, which silently filter out requests to not-yet-loaded/not-yet-known topics and thus do not cause topics to be created automatically.

Examples

let mut client = kafka::client::KafkaClient::new(vec!("localhost:9092".to_owned()));
let _ = client.load_metadata(&["my-topic"]).unwrap();

Returns the metadata for all loaded topics underlying this client (this might be more topics than those specified in this method call).

Clears metadata stored in the client. You must load metadata after this call if you want to use the client.
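Example

A minimal sketch (assuming the method is named reset_metadata):

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.reset_metadata();
// The client must load metadata again before further use.
client.load_metadata_all().unwrap();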

Fetch offsets for a list of topics.

Examples

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let topics: Vec<String> = client.topics().names().map(ToOwned::to_owned).collect();
let offsets = client.fetch_offsets(&topics, kafka::client::FetchOffset::Latest).unwrap();

Returns a mapping of topic name to PartitionOffsets for each currently available partition of the corresponding topic.

Fetch offsets for a single topic.

Examples

use kafka::client::{KafkaClient, FetchOffset};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let offsets = client.fetch_topic_offsets("my-topic", FetchOffset::Latest).unwrap();

Returns a vector of the offset data for each available partition. See also KafkaClient::fetch_offsets.

Fetch messages from Kafka (multiple topics / partitions).

It takes a vector specifying the topic partitions and the offsets as of which to fetch messages. Additionally, the default "max fetch size per partition" can be explicitly overridden if it is "defined", that is, if max_bytes is greater than zero.

The result is exposed in a raw, complicated manner, but allows for very efficient consumption. All of the data available through the returned fetch responses is bound to the lifetime of those responses, as that data is merely a "view" into parts of the response structs. If you need to keep individual messages around longer than the whole fetch responses, you'll need to make a copy of the message data.

  • This method transparently uncompresses messages (while Kafka might send them in compressed format.)

  • This method makes sure to skip messages with a lower offset than requested (while Kafka might, for efficiency reasons, send messages with a lower offset.)

Note: before using this method, consider using kafka::consumer::Consumer instead, which provides an easier-to-use API for the regular use case of fetching messages from Kafka.

Example

This example demonstrates iterating all fetched messages from two topic partitions. From one partition we allow Kafka to deliver the default number of bytes as defined by KafkaClient::set_fetch_max_bytes_per_partition; from the other partition we allow Kafka to deliver up to 1 MiB of messages.

use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
client.load_metadata_all().unwrap();
let reqs = &[FetchPartition::new("my-topic", 0, 0),
             FetchPartition::new("my-topic-2", 0, 0).with_max_bytes(1024*1024)];
let resps = client.fetch_messages(reqs).unwrap();
for resp in resps {
  for t in resp.topics() {
    for p in t.partitions() {
      match p.data() {
        &Err(ref e) => {
          println!("partition error: {}:{}: {}", t.topic(), p.partition(), e)
        }
        &Ok(ref data) => {
          println!("topic: {} / partition: {} / latest available message offset: {}",
                   t.topic(), p.partition(), data.highwatermark_offset());
          for msg in data.messages() {
            println!("topic: {} / partition: {} / message.offset: {} / message.len: {}",
                     t.topic(), p.partition(), msg.offset, msg.value.len());
          }
        }
      }
    }
  }
}

See also kafka::consumer. See also KafkaClient::set_fetch_max_bytes_per_partition.

Fetch messages from a single Kafka partition.

See KafkaClient::fetch_messages.
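Example

A minimal sketch (assuming the method is named fetch_messages_for_partition and accepts a single FetchPartition):

use kafka::client::{KafkaClient, FetchPartition};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
// Fetch from partition 0 of "my-topic" starting at offset 0.
let resps = client.fetch_messages_for_partition(
    &FetchPartition::new("my-topic", 0, 0)).unwrap();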

Send messages to Kafka.

required_acks - indicates how many acknowledgements the servers should receive before responding to the request

ack_timeout - provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in required_acks

input - the set of ProduceMessages to send

Note: Unlike the higher-level Producer API, this method will not automatically determine the partition to deliver the message to. It will strictly try to send the message to the specified partition.

Note: Trying to send messages to non-existing topics or non-existing partitions will result in an error.

Example

use std::time::Duration;
use kafka::client::{KafkaClient, ProduceMessage, RequiredAcks};

let mut client = KafkaClient::new(vec!("localhost:9092".to_owned()));
client.load_metadata_all().unwrap();
let req = vec![ProduceMessage::new("my-topic", 0, None, Some("a".as_bytes())),
               ProduceMessage::new("my-topic-2", 0, None, Some("b".as_bytes()))];
let resp = client.produce_messages(RequiredAcks::One, Duration::from_millis(100), req);
println!("{:?}", resp);

The return value will contain a vector of topic, partition, offset, and error (if any), or an overall error::Error if the request failed as a whole.

Commit offset for a topic partitions on behalf of a consumer group.

Examples

use kafka::client::{KafkaClient, CommitOffset};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.commit_offsets("my-group",
    &[CommitOffset::new("my-topic", 0, 100),
      CommitOffset::new("my-topic", 1, 99)])
   .unwrap();

In this example, we commit offset 100 for the topic partition "my-topic:0" and 99 for the topic partition "my-topic:1". Once successfully committed, these can be retrieved using fetch_group_offsets, even from another process or at a much later point in time, to resume consuming the topic partitions as of these offsets.

Commit offset of a particular topic partition on behalf of a consumer group.

Examples

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
client.commit_offset("my-group", "my-topic", 0, 100).unwrap();

See also KafkaClient::commit_offsets.

Fetch offsets for a specified list of topic partitions of a consumer group.

Examples

use kafka::client::{KafkaClient, FetchGroupOffset};

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();

let offsets =
     client.fetch_group_offsets("my-group",
            &[FetchGroupOffset::new("my-topic", 0),
              FetchGroupOffset::new("my-topic", 1)])
            .unwrap();

See also KafkaClient::fetch_group_topic_offsets.

Fetch offsets for all partitions of a particular topic of a consumer group.

Examples

use kafka::client::KafkaClient;

let mut client = KafkaClient::new(vec!["localhost:9092".to_owned()]);
client.load_metadata_all().unwrap();
let offsets = client.fetch_group_topic_offsets("my-group", "my-topic").unwrap();

Trait Implementations

impl Debug for KafkaClient

Formats the value using the given formatter.