rustfs-kafka 0.21.0

Rust client for Apache Kafka
Documentation
use kafka_protocol::messages::{ApiKey, BrokerId, ListOffsetsRequest, ListOffsetsResponse, RequestHeader, TopicName};
use kafka_protocol::protocol::StrBytes;

// Re-exports of sub-types from kafka_protocol for convenience
use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
use kafka_protocol::messages::list_offsets_request::ListOffsetsPartition;

use super::API_VERSION_LIST_OFFSETS;
use crate::protocol::OffsetResponse as OurOffsetResponse;

pub fn build_list_offsets_request(
    correlation_id: i32,
    client_id: &str,
    partitions: &[(&str, i32, i64)],
) -> (RequestHeader, ListOffsetsRequest) {
    let header = RequestHeader::default()
        .with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
        .with_request_api_key(ApiKey::ListOffsets as i16)
        .with_request_api_version(API_VERSION_LIST_OFFSETS)
        .with_correlation_id(correlation_id);

    let mut topic_map: std::collections::HashMap<&str, Vec<ListOffsetsPartition>> =
        std::collections::HashMap::new();

    for (topic, partition, timestamp) in partitions {
        topic_map.entry(topic).or_default().push(
            ListOffsetsPartition::default()
                .with_partition_index(*partition)
                .with_timestamp(*timestamp),
        );
    }

    let topics: Vec<ListOffsetsTopic> = topic_map
        .into_iter()
        .map(|(name, partitions)| {
            ListOffsetsTopic::default()
                .with_name(TopicName::from(StrBytes::from_string(name.to_string())))
                .with_partitions(partitions)
        })
        .collect();

    let request = ListOffsetsRequest::default()
        .with_replica_id(BrokerId::from(-1))
        .with_isolation_level(0)
        .with_topics(topics);

    (header, request)
}

pub fn convert_list_offsets_response(
    kp_resp: ListOffsetsResponse,
    correlation_id: i32,
) -> OurOffsetResponse {
    OurOffsetResponse {
        header: crate::protocol::HeaderResponse { correlation: correlation_id },
        topic_partitions: kp_resp
            .topics
            .into_iter()
            .map(|t| crate::protocol::TopicPartitionOffsetResponse {
                topic: t.name.to_string(),
                partitions: t
                    .partitions
                    .into_iter()
                    .map(|p| crate::protocol::PartitionOffsetResponse {
                        partition: p.partition_index,
                        error: p.error_code,
                        offset: vec![p.offset],
                    })
                    .collect(),
            })
            .collect(),
    }
}