Module kafka::consumer


Kafka Consumer - A higher-level API for consuming Kafka topics.

A consumer for Kafka topics on behalf of a specified group, providing help with offset management. The consumer requires at least one topic for consumption and can consume multiple topics at the same time. Further, clients can restrict the consumer to specific topic partitions, as demonstrated in the following example.

Example

use kafka::consumer::{Consumer, FetchOffset, GroupOffsetStorage};

let mut consumer =
   Consumer::from_hosts(vec!["localhost:9092".to_owned()])
      .with_topic_partitions("my-topic".to_owned(), &[0, 1])
      .with_fallback_offset(FetchOffset::Earliest)
      .with_group("my-group".to_owned())
      .with_offset_storage(Some(GroupOffsetStorage::Kafka))
      .create()
      .unwrap();
loop {
  for ms in consumer.poll().unwrap().iter() {
    for m in ms.messages() {
      println!("{:?}", m);
    }
    consumer.consume_messageset(ms);
  }
  consumer.commit_consumed().unwrap();
}

Please refer to the documentation of the individual "with" methods used to set up the consumer; these contain further information or links to it.

A call to .poll() on the consumer asks for the next available "chunk of data" for the client code to process. The returned data are MessageSets, at most one for each partition of the consumed topics. Individual messages are embedded in the retrieved message sets and can be processed using the messages() iterator. Due to this embedding, an individual message's lifetime is bound to the MessageSet it is part of. Typically, client code accesses the raw data/bytes, parses it into custom data types, and passes those along for further processing within the application. Although inconvenient, this helps reduce the number of allocations in the pipeline processing incoming messages.
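Because a message's bytes are only borrowed for the lifetime of its MessageSet, parsing typically copies the needed data into owned types before the message set is consumed. A minimal sketch of such a parsing step; the SensorReading type and its "id:value" wire format are illustrative assumptions, not part of this crate:

```rust
// Sketch: turn a message's raw bytes into an owned custom type so the
// result can outlive the MessageSet the bytes were borrowed from.
// `SensorReading` and the "id:value" payload layout are assumptions.
#[derive(Debug, PartialEq)]
struct SensorReading {
    id: String,
    value: f64,
}

fn parse_reading(raw: &[u8]) -> Option<SensorReading> {
    let text = std::str::from_utf8(raw).ok()?;
    let (id, value) = text.split_once(':')?;
    Some(SensorReading {
        // to_owned() copies out of the borrowed message bytes.
        id: id.to_owned(),
        value: value.trim().parse().ok()?,
    })
}

fn main() {
    // In real code the bytes would come from m.value inside ms.messages().
    let reading = parse_reading(b"sensor-7:23.5").unwrap();
    println!("{:?}", reading);
    assert!(parse_reading(b"not-a-reading").is_none());
}
```

Inside the poll loop above, the call would be `parse_reading(m.value)` for each message, with malformed payloads skipped or logged.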

If the consumer is configured for a non-empty group, it helps keep track of already consumed messages by maintaining a map of the consumed offsets. Messages are marked as consumed through the consume_message or consume_messageset methods. Once these consumed offsets are committed to Kafka using commit_consumed, the consumer will resume fetching from the committed offsets, even after a restart. Since committing incurs overhead, it is up to the client to decide the frequency of the commits; the consumer will never commit offsets to Kafka automatically.
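A common way to bound that overhead is to commit only after every N consumed message sets rather than after every poll. The decision logic can be sketched independently of the Kafka connection; CommitPolicy below is an illustrative helper, not part of this crate, and in real code the `true` branch would trigger consumer.commit_consumed():

```rust
// Sketch of a commit-frequency policy: commit the consumed offsets
// only after every `commit_every` message sets. Illustrative only.
struct CommitPolicy {
    consumed_since_commit: u32,
    commit_every: u32,
}

impl CommitPolicy {
    fn new(commit_every: u32) -> Self {
        CommitPolicy { consumed_since_commit: 0, commit_every }
    }

    /// Record one consumed message set; returns true when it is time
    /// to commit the consumed offsets to Kafka.
    fn record_consumed(&mut self) -> bool {
        self.consumed_since_commit += 1;
        if self.consumed_since_commit >= self.commit_every {
            self.consumed_since_commit = 0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut policy = CommitPolicy::new(3);
    let commits: Vec<bool> = (0..6).map(|_| policy.record_consumed()).collect();
    // Commits fire on the 3rd and 6th message sets only.
    println!("{:?}", commits);
    assert_eq!(commits, vec![false, false, true, false, false, true]);
}
```

The right interval is a trade-off: larger values reduce commit traffic but widen the window of messages re-delivered after a crash, since redelivery resumes from the last committed offset.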

The configuration of a group is optional. If the consumer has no group configured, it behaves as if it had one, except that committing consumed message offsets becomes a no-op.

Structs

  • Builder: A Kafka consumer builder easing the process of setting up various configuration settings.
  • Consumer: The Kafka consumer.
  • MessageSet: A set of messages successfully retrieved from a specific topic partition.
  • MessageSets: Messages retrieved from Kafka in one fetch request. This is a concatenation of blocks of messages successfully retrieved from the consumed topic partitions. Each such partition is guaranteed to be present at most once in this structure.
  • MessageSetsIter: An iterator over the consumed topic partition message sets.

Constants