use std::collections::{BTreeMap, HashMap};
use anyhow::{Context, Result};
use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
use kafka_protocol::messages::fetch_request::{FetchPartition, FetchTopic};
use kafka_protocol::messages::find_coordinator_response::FindCoordinatorResponse;
use kafka_protocol::messages::list_offsets_request::{ListOffsetsPartition, ListOffsetsTopic};
use kafka_protocol::messages::offset_commit_request::{
OffsetCommitRequestPartition, OffsetCommitRequestTopic,
};
use kafka_protocol::messages::{ListOffsetsRequest, OffsetCommitRequest};
use kafka_protocol::protocol::StrBytes;
use uuid::Uuid;
use crate::config::ConsumerConfig;
use crate::metadata::{BrokerAddress, MetadataCache};
use crate::network::duration_to_i32_ms;
use crate::types::{AssignedPartition, CommitOffset, TopicPartitionKey};
use crate::{BrokerError, Error};
pub enum CoordinatorLookupResult {
Found(BrokerAddress),
Retry(ResponseError),
}
pub fn build_offset_commit_request(
group_id: &str,
member_id: Option<&str>,
member_epoch: i32,
group_instance_id: Option<&str>,
offsets: &[CommitOffset],
assignment: &HashMap<TopicPartitionKey, AssignedPartition>,
) -> Result<OffsetCommitRequest> {
let mut topics = BTreeMap::<String, Vec<OffsetCommitRequestPartition>>::new();
for offset in offsets {
let leader_epoch = assignment
.get(&TopicPartitionKey::new(
offset.topic.clone(),
offset.partition,
))
.map(|assigned| assigned.leader_epoch)
.unwrap_or(-1);
topics.entry(offset.topic.clone()).or_default().push(
OffsetCommitRequestPartition::default()
.with_partition_index(offset.partition)
.with_committed_offset(offset.offset)
.with_committed_leader_epoch(leader_epoch)
.with_committed_metadata(None),
);
}
Ok(OffsetCommitRequest::default()
.with_group_id(StrBytes::from_string(group_id.to_owned()).into())
.with_generation_id_or_member_epoch(member_epoch)
.with_member_id(StrBytes::from_string(
member_id.unwrap_or_default().to_owned(),
))
.with_group_instance_id(
group_instance_id.map(|value| StrBytes::from_string(value.to_owned())),
)
.with_topics(
topics
.into_iter()
.map(|(topic, partitions)| {
OffsetCommitRequestTopic::default()
.with_name(StrBytes::from_string(topic).into())
.with_partitions(partitions)
})
.collect(),
))
}
pub fn build_list_offsets_request(
partitions: Vec<TopicPartitionKey>,
metadata: &MetadataCache,
version: i16,
timestamp: i64,
isolation_level: i8,
) -> Result<ListOffsetsRequest> {
let timestamps = partitions
.into_iter()
.map(|key| (key, timestamp))
.collect::<HashMap<_, _>>();
build_list_offsets_request_with_timestamps(metadata, version, ×tamps, isolation_level)
}
pub fn build_list_offsets_request_with_timestamps(
metadata: &MetadataCache,
version: i16,
timestamps: &HashMap<TopicPartitionKey, i64>,
isolation_level: i8,
) -> Result<ListOffsetsRequest> {
let mut topics = BTreeMap::<String, Vec<ListOffsetsPartition>>::new();
for (key, timestamp) in timestamps {
let leader_epoch = metadata
.partition(&key.topic, key.partition)
.map(|partition| partition.leader_epoch)
.unwrap_or(-1);
topics.entry(key.topic.clone()).or_default().push(
ListOffsetsPartition::default()
.with_partition_index(key.partition)
.with_current_leader_epoch(leader_epoch)
.with_timestamp(*timestamp),
);
}
let mut request = ListOffsetsRequest::default()
.with_replica_id((-1).into())
.with_isolation_level(isolation_level)
.with_topics(
topics
.into_iter()
.map(|(topic, partitions)| {
ListOffsetsTopic::default()
.with_name(StrBytes::from_string(topic).into())
.with_partitions(partitions)
})
.collect(),
);
if version >= 10 {
request = request.with_timeout_ms(0);
}
Ok(request)
}
pub fn build_fetch_request(
partitions: &[AssignedPartition],
version: i16,
config: &ConsumerConfig,
) -> Result<kafka_protocol::messages::FetchRequest> {
let mut topics = BTreeMap::<(String, Uuid), Vec<FetchPartition>>::new();
for assigned in partitions {
topics
.entry((assigned.key.topic.clone(), assigned.topic_id))
.or_default()
.push(
FetchPartition::default()
.with_partition(assigned.key.partition)
.with_current_leader_epoch(assigned.leader_epoch)
.with_fetch_offset(assigned.fetch_offset)
.with_last_fetched_epoch(-1)
.with_log_start_offset(-1)
.with_partition_max_bytes(config.partition_max_bytes),
);
}
let topics = topics
.into_iter()
.map(|((topic, topic_id), partitions)| {
let mut fetch_topic = FetchTopic::default().with_partitions(partitions);
if version >= 13 {
fetch_topic = fetch_topic.with_topic_id(topic_id);
} else {
fetch_topic = fetch_topic.with_topic(StrBytes::from_string(topic).into());
}
fetch_topic
})
.collect();
Ok(kafka_protocol::messages::FetchRequest::default()
.with_replica_id((-1).into())
.with_max_wait_ms(duration_to_i32_ms(config.fetch_max_wait)?)
.with_min_bytes(config.fetch_min_bytes)
.with_max_bytes(config.fetch_max_bytes)
.with_isolation_level(config.isolation_level.as_protocol_value())
.with_session_id(0)
.with_session_epoch(-1)
.with_topics(topics)
.with_forgotten_topics_data(Vec::new())
.with_rack_id(StrBytes::from_string(
config.rack_id.clone().unwrap_or_default(),
)))
}
pub fn assignment_snapshot_by_topic_id(
assignment: &HashMap<TopicPartitionKey, AssignedPartition>,
) -> BTreeMap<Uuid, Vec<i32>> {
let mut grouped = BTreeMap::<Uuid, Vec<i32>>::new();
for assigned in assignment.values() {
grouped
.entry(assigned.topic_id)
.or_default()
.push(assigned.key.partition);
}
for partitions in grouped.values_mut() {
partitions.sort_unstable();
}
grouped
}
pub fn group_topic_partitions(partitions: Vec<(String, i32)>) -> BTreeMap<String, Vec<i32>> {
let mut grouped = BTreeMap::<String, Vec<i32>>::new();
for (topic, partition) in partitions {
grouped.entry(topic).or_default().push(partition);
}
for partitions in grouped.values_mut() {
partitions.sort_unstable();
}
grouped
}
pub fn parse_find_coordinator_response(
response: FindCoordinatorResponse,
version: i16,
) -> Result<CoordinatorLookupResult> {
if version >= 4 {
let coordinator = response
.coordinators
.into_iter()
.next()
.context("find coordinator returned no coordinator entries")?;
if let Some(error) = coordinator.error_code.err() {
if error.is_retriable() {
return Ok(CoordinatorLookupResult::Retry(error));
}
return Err(Error::Broker(BrokerError::response(
"find_coordinator",
Some(coordinator.key.to_string()),
error,
))
.into());
}
return Ok(CoordinatorLookupResult::Found(BrokerAddress::new(
coordinator.host.to_string(),
u16::try_from(coordinator.port)
.with_context(|| format!("invalid coordinator port {}", coordinator.port))?,
)));
}
if let Some(error) = response.error_code.err() {
if error.is_retriable() {
return Ok(CoordinatorLookupResult::Retry(error));
}
return Err(Error::Broker(BrokerError::response(
"find_coordinator",
None::<String>,
error,
))
.into());
}
Ok(CoordinatorLookupResult::Found(BrokerAddress::new(
response.host.to_string(),
u16::try_from(response.port)
.with_context(|| format!("invalid coordinator port {}", response.port))?,
)))
}
pub fn is_retriable_error(error: &anyhow::Error) -> bool {
error
.downcast_ref::<ResponseError>()
.is_some_and(ResponseError::is_retriable)
}