use kafka_protocol::messages::{ApiKey, BrokerId, ListOffsetsRequest, ListOffsetsResponse, RequestHeader, TopicName};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
use kafka_protocol::messages::list_offsets_request::ListOffsetsPartition;
use super::API_VERSION_LIST_OFFSETS;
use crate::protocol::OffsetResponse as OurOffsetResponse;
pub fn build_list_offsets_request(
correlation_id: i32,
client_id: &str,
partitions: &[(&str, i32, i64)],
) -> (RequestHeader, ListOffsetsRequest) {
let header = RequestHeader::default()
.with_client_id(Some(StrBytes::from_string(client_id.to_owned())))
.with_request_api_key(ApiKey::ListOffsets as i16)
.with_request_api_version(API_VERSION_LIST_OFFSETS)
.with_correlation_id(correlation_id);
let mut topic_map: std::collections::HashMap<&str, Vec<ListOffsetsPartition>> =
std::collections::HashMap::new();
for (topic, partition, timestamp) in partitions {
topic_map.entry(topic).or_default().push(
ListOffsetsPartition::default()
.with_partition_index(*partition)
.with_timestamp(*timestamp),
);
}
let topics: Vec<ListOffsetsTopic> = topic_map
.into_iter()
.map(|(name, partitions)| {
ListOffsetsTopic::default()
.with_name(TopicName::from(StrBytes::from_string(name.to_string())))
.with_partitions(partitions)
})
.collect();
let request = ListOffsetsRequest::default()
.with_replica_id(BrokerId::from(-1))
.with_isolation_level(0)
.with_topics(topics);
(header, request)
}
pub fn convert_list_offsets_response(
kp_resp: ListOffsetsResponse,
correlation_id: i32,
) -> OurOffsetResponse {
OurOffsetResponse {
header: crate::protocol::HeaderResponse { correlation: correlation_id },
topic_partitions: kp_resp
.topics
.into_iter()
.map(|t| crate::protocol::TopicPartitionOffsetResponse {
topic: t.name.to_string(),
partitions: t
.partitions
.into_iter()
.map(|p| crate::protocol::PartitionOffsetResponse {
partition: p.partition_index,
error: p.error_code,
offset: vec![p.offset],
})
.collect(),
})
.collect(),
}
}