kafka_protocol

Module records

source
Expand description

Provides utilities for working with records (Kafka messages).

FetchResponse and associated APIs for interacting with reading and writing contain records in a raw format, allowing the user to implement their own logic for interacting with those values.

§Example

Decoding a set of records from a FetchResponse:

use kafka_protocol::messages::FetchResponse;
use kafka_protocol::protocol::Decodable;
use kafka_protocol::records::RecordBatchDecoder;
use bytes::Bytes;
use kafka_protocol::records::Compression;


let res = FetchResponse::decode(&mut buf, 4).unwrap();

for topic in res.responses {
    for partition in topic.partitions {
         let mut records = partition.records.unwrap();
         let records = RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)).unwrap();
    }
}

fn decompress_record_batch_data(compressed_buffer: &mut bytes::Bytes, compression: Compression) -> anyhow::Result<Bytes> {
        match compression {
            Compression::None => Ok(compressed_buffer.to_vec().into()),
            _ => { panic!("Compression not implemented") }
        }
 }

Structs§

  • A Kafka message containing key, payload value, and all associated metadata.
  • Batch decoder for Kafka records.
  • Batch encoder for Kafka records.
  • Options for encoding and compressing a batch of records. Note, not all compression algorithms are currently implemented by this library.

Enums§

  • The different types of compression supported by Kafka.
  • Indicates the meaning of the timestamp field on a record.

Constants§