use super::*;
impl ConsumerRuntime {
/// Returns the offsets committed by this consumer's group for the given partitions.
///
/// Topic names are trimmed, and partitions whose topic is blank after trimming are
/// dropped. Partitions for which no committed offset is reported are omitted from the
/// result. The returned list is sorted by topic, then by partition.
///
/// When nothing is left to query after normalization, an empty list is returned without
/// contacting the group coordinator.
///
/// # Errors
///
/// Propagates any failure from `fetch_committed_offsets`, converted with
/// `into_client_error`.
pub(crate) async fn committed_offsets(
    &mut self,
    partitions: Vec<crate::types::TopicPartition>,
) -> ClientResult<Vec<TopicPartitionOffset>> {
    let keys = partitions
        .into_iter()
        .map(TopicPartitionKey::from)
        .map(|key| TopicPartitionKey::new(key.topic.trim().to_owned(), key.partition))
        .filter(|key| !key.topic.is_empty())
        .collect::<Vec<_>>();
    // An empty query can only yield an empty answer, so skip the coordinator round trip
    // (the timestamp-based lookups short-circuit on empty input as well).
    if keys.is_empty() {
        return Ok(Vec::new());
    }
    let committed = self
        .fetch_committed_offsets(&keys)
        .await
        .map_err(into_client_error)?;
    let mut offsets = committed
        .into_iter()
        .filter_map(|(key, offset)| {
            // `None` means no offset has been committed for this partition.
            offset.map(|offset| TopicPartitionOffset::new(key.topic, key.partition, offset))
        })
        .collect::<Vec<_>>();
    offsets.sort_by(|a, b| {
        a.topic
            .cmp(&b.topic)
            .then_with(|| a.partition.cmp(&b.partition))
    });
    Ok(offsets)
}
/// Looks up offsets for `partitions`, passing `LIST_OFFSETS_EARLIEST` as the timestamp.
///
/// Thin wrapper around `offsets_at_timestamp`; normalization, sorting and error
/// handling are exactly those of that method.
pub(crate) async fn beginning_offsets(
    &mut self,
    partitions: Vec<crate::types::TopicPartition>,
) -> ClientResult<Vec<TopicPartitionOffset>> {
    let earliest = self.offsets_at_timestamp(partitions, LIST_OFFSETS_EARLIEST);
    earliest.await
}
/// Looks up offsets for `partitions`, passing `LIST_OFFSETS_LATEST` as the timestamp.
///
/// Thin wrapper around `offsets_at_timestamp`; normalization, sorting and error
/// handling are exactly those of that method.
pub(crate) async fn end_offsets(
    &mut self,
    partitions: Vec<crate::types::TopicPartition>,
) -> ClientResult<Vec<TopicPartitionOffset>> {
    let latest = self.offsets_at_timestamp(partitions, LIST_OFFSETS_LATEST);
    latest.await
}
/// Resolves one offset per partition, using the same `timestamp` for every partition.
///
/// Topic names are trimmed and partitions whose topic ends up empty are skipped before
/// the lookup. The result is ordered by topic, then by partition.
///
/// # Errors
///
/// Any failure from `lookup_offsets_at_timestamp` is converted with
/// `into_client_error` and returned.
pub(crate) async fn offsets_at_timestamp(
    &mut self,
    partitions: Vec<crate::types::TopicPartition>,
    timestamp: i64,
) -> ClientResult<Vec<TopicPartitionOffset>> {
    // Normalize the requested partitions: trim the topic, drop blank ones.
    let mut keys = Vec::with_capacity(partitions.len());
    for partition in partitions {
        let raw = TopicPartitionKey::from(partition);
        let key = TopicPartitionKey::new(raw.topic.trim().to_owned(), raw.partition);
        if !key.topic.is_empty() {
            keys.push(key);
        }
    }

    let resolved = self
        .lookup_offsets_at_timestamp(&keys, timestamp)
        .await
        .map_err(into_client_error)?;

    let mut results = Vec::with_capacity(resolved.len());
    for (key, offset) in resolved {
        results.push(TopicPartitionOffset::new(key.topic, key.partition, offset));
    }
    results.sort_by(|left, right| {
        left.topic
            .cmp(&right.topic)
            .then_with(|| left.partition.cmp(&right.partition))
    });
    Ok(results)
}
/// Resolves an offset and timestamp for each `(topic, partition, timestamp)` query.
///
/// Queries are forwarded unchanged to `lookup_offsets_for_timestamps`; every entry it
/// returns becomes one `TopicPartitionOffsetAndTimestamp`. The result is ordered by
/// topic, then by partition.
///
/// # Errors
///
/// Any lookup failure is converted with `into_client_error` and returned.
pub(crate) async fn offsets_for_times(
    &mut self,
    partitions: Vec<TopicPartitionTimestamp>,
) -> ClientResult<Vec<TopicPartitionOffsetAndTimestamp>> {
    let resolved = self
        .lookup_offsets_for_timestamps(&partitions)
        .await
        .map_err(into_client_error)?;

    let mut results = Vec::with_capacity(resolved.len());
    for (key, found) in resolved {
        results.push(TopicPartitionOffsetAndTimestamp::new(
            key.topic,
            key.partition,
            found.offset,
            found.timestamp,
        ));
    }
    results.sort_by(|left, right| {
        left.topic
            .cmp(&right.topic)
            .then_with(|| left.partition.cmp(&right.partition))
    });
    Ok(results)
}
/// Returns partition details for `topic` after refreshing that topic's metadata.
///
/// The topic name is trimmed first. If the metadata cache has no entry for the topic
/// after the refresh, an empty list is returned. Entries are ordered by partition.
///
/// # Errors
///
/// Returns `ConsumerError::EmptySubscription` when the trimmed topic name is empty,
/// and propagates any metadata refresh failure.
pub(crate) async fn partitions_for(
    &mut self,
    topic: String,
) -> ClientResult<Vec<TopicPartitionInfo>> {
    let topic = topic.trim();
    if topic.is_empty() {
        return Err(ConsumerError::EmptySubscription.into());
    }
    let topic = topic.to_owned();

    self.refresh_metadata_for_topics(vec![topic.clone()])
        .await?;

    let known = self
        .connections
        .metadata
        .partitions_for(&topic)
        .unwrap_or_default();
    let mut partitions: Vec<_> = known
        .into_iter()
        .map(|(partition, details)| TopicPartitionInfo {
            partition,
            leader_id: details.leader_id,
            leader_epoch: details.leader_epoch,
            replica_nodes: details.replica_nodes,
            isr_nodes: details.isr_nodes,
            offline_replicas: details.offline_replicas,
        })
        .collect();
    partitions.sort_by_key(|info| info.partition);
    Ok(partitions)
}
pub(crate) async fn list_topics(&mut self) -> ClientResult<Vec<String>> {
refresh_metadata(MetadataRefresh {
bootstrap_servers: &self.config.bootstrap_servers,
client_id: &self.config.client_id,
request_timeout: self.config.request_timeout,
security_protocol: self.config.security_protocol,
tls: &self.config.tls,
sasl: &self.config.sasl,
tcp_connector: &self.config.tcp_connector,
metadata: &mut self.connections.metadata,
topics: &[],
})
.await?;
Ok(self.connections.metadata.topic_names())
}
/// Refreshes the metadata cache for `topics`, then drops leader connections whose
/// broker is no longer present in the refreshed metadata.
///
/// # Errors
///
/// Propagates any failure from `refresh_metadata`; in that case the leader connections
/// are left untouched.
pub(crate) async fn refresh_metadata_for_topics(
    &mut self,
    topics: Vec<String>,
) -> ClientResult<()> {
    let config = &self.config;
    let refresh = MetadataRefresh {
        bootstrap_servers: &config.bootstrap_servers,
        client_id: &config.client_id,
        request_timeout: config.request_timeout,
        security_protocol: config.security_protocol,
        tls: &config.tls,
        sasl: &config.sasl,
        tcp_connector: &config.tcp_connector,
        metadata: &mut self.connections.metadata,
        topics: &topics,
    };
    refresh_metadata(refresh).await?;
    self.retain_valid_leader_connections();
    Ok(())
}
/// Refreshes metadata for the topics this consumer currently wants.
///
/// A full refresh (empty topic list) is requested when the subscription is pattern
/// based, or when a pending assignment exists that `can_resolve_assignment` rejects.
/// Otherwise only the desired topics are refreshed; if there are none, nothing is done.
/// After a successful refresh, leader connections to brokers missing from the metadata
/// are dropped.
///
/// # Errors
///
/// Propagates any failure from `refresh_metadata`.
pub(crate) async fn refresh_subscription_metadata(&mut self) -> AnyResult<()> {
    let topics = self.desired_topics();
    // Keep the short-circuit: the pending assignment is only inspected for
    // non-pattern subscriptions.
    let needs_full_refresh = if self.is_pattern_subscription() {
        true
    } else {
        match self.assignment_state.pending_assignment.as_ref() {
            Some(assignment) => !self.can_resolve_assignment(assignment),
            None => false,
        }
    };
    if !needs_full_refresh && topics.is_empty() {
        return Ok(());
    }

    debug!(
        topics = ?topics,
        topic_count = topics.len(),
        full_refresh = needs_full_refresh,
        "refreshing consumer metadata"
    );
    let config = &self.config;
    let refresh = MetadataRefresh {
        bootstrap_servers: &config.bootstrap_servers,
        client_id: &config.client_id,
        request_timeout: config.request_timeout,
        security_protocol: config.security_protocol,
        tls: &config.tls,
        sasl: &config.sasl,
        tcp_connector: &config.tcp_connector,
        metadata: &mut self.connections.metadata,
        topics: if needs_full_refresh { &[] } else { &topics },
    };
    refresh_metadata(refresh).await?;
    self.retain_valid_leader_connections();
    debug!("consumer metadata refresh completed");
    Ok(())
}
/// Drops every cached leader connection whose broker id is not present in the current
/// metadata, as reported by `contains_broker`.
pub(crate) fn retain_valid_leader_connections(&mut self) {
    let metadata = &self.connections.metadata;
    self.connections
        .leader_connections
        .retain(|broker_id, _connection| metadata.contains_broker(*broker_id));
}
/// Determines the starting offset for each assigned partition.
///
/// Partitions with a non-negative committed offset start there. All others are resolved
/// through a timestamp lookup chosen by `auto_offset_reset` (`LIST_OFFSETS_EARLIEST` or
/// `LIST_OFFSETS_LATEST`). An empty `assignments` slice yields an empty map without any
/// request being made.
///
/// # Errors
///
/// Propagates failures from the committed-offset fetch and from the reset lookup.
pub(crate) async fn fetch_initial_offsets(
    &mut self,
    assignments: &[TopicPartitionKey],
) -> AnyResult<HashMap<TopicPartitionKey, i64>> {
    if assignments.is_empty() {
        return Ok(HashMap::new());
    }

    let committed = self.fetch_committed_offsets(assignments).await?;
    let mut resolved = HashMap::new();
    let mut need_reset = Vec::new();
    for key in assignments {
        match committed.get(key) {
            Some(Some(offset)) if *offset >= 0 => {
                resolved.insert(key.clone(), *offset);
            }
            // Missing from the response, no committed offset, or a negative one.
            _ => need_reset.push(key.clone()),
        }
    }
    if need_reset.is_empty() {
        return Ok(resolved);
    }

    let reset_timestamp = match self.config.auto_offset_reset {
        AutoOffsetReset::Earliest => LIST_OFFSETS_EARLIEST,
        AutoOffsetReset::Latest => LIST_OFFSETS_LATEST,
    };
    let reset_offsets = self
        .lookup_offsets_at_timestamp(&need_reset, reset_timestamp)
        .await?;
    resolved.extend(reset_offsets);
    Ok(resolved)
}
pub(crate) async fn fetch_initial_offsets_with_retry(
&mut self,
assignments: &[TopicPartitionKey],
) -> AnyResult<HashMap<TopicPartitionKey, i64>> {
let deadline = Instant::now() + self.config.request_timeout;
loop {
match self.fetch_initial_offsets(assignments).await {
Ok(offsets) => return Ok(offsets),
Err(error) if is_retriable_error(&error) && Instant::now() < deadline => {
sleep(self.config.retry_backoff).await;
}
Err(error) => return Err(error),
}
}
}
/// Sends an `OffsetFetchRequest` for `assignments` to the group coordinator and returns
/// the committed offset reported for each partition in the response.
///
/// A negative committed offset in the response is mapped to `None`. Partitions that the
/// response does not mention are absent from the returned map.
///
/// # Errors
///
/// Fails when the coordinator connection cannot be established or is missing, when the
/// request cannot be sent, or when the response carries a group-level or
/// partition-level error. For retriable broker errors `backoff_coordinator` is invoked
/// and the raw error is returned; other broker errors are wrapped with
/// `broker_response_error`.
pub(crate) async fn fetch_committed_offsets(
    &mut self,
    assignments: &[TopicPartitionKey],
) -> AnyResult<HashMap<TopicPartitionKey, Option<i64>>> {
    self.ensure_coordinator_connection().await?;
    let version = self
        .connections
        .coordinator_connection
        .as_ref()
        .context("coordinator connection is missing")?
        .version_with_cap::<OffsetFetchRequest>(OFFSET_FETCH_VERSION_CAP)?;

    // Group the requested partitions per topic, as the request format expects.
    let request_topics: Vec<_> = group_topic_partitions(
        assignments
            .iter()
            .map(|key| (key.topic.clone(), key.partition))
            .collect(),
    )
    .into_iter()
    .map(|(topic, partition_indexes)| {
        kafka_protocol::messages::offset_fetch_request::OffsetFetchRequestTopics::default()
            .with_name(StrBytes::from_string(topic).into())
            .with_partition_indexes(partition_indexes)
    })
    .collect();
    let (member_id, member_epoch) = self.current_group_member();
    let request_group =
        kafka_protocol::messages::offset_fetch_request::OffsetFetchRequestGroup::default()
            .with_group_id(StrBytes::from_string(self.config.group_id.clone()).into())
            .with_member_id(member_id)
            .with_member_epoch(member_epoch)
            .with_topics(Some(request_topics));
    let request = OffsetFetchRequest::default().with_groups(vec![request_group]);

    let client_id = self.config.client_id.clone();
    let response: OffsetFetchResponse = self
        .connections
        .coordinator_connection
        .as_mut()
        .context("coordinator connection is missing")?
        .send_request::<OffsetFetchRequest>(&client_id, version, &request)
        .await?;

    let mut offsets = HashMap::new();
    for group in response.groups {
        match group.error_code.err() {
            Some(error) if error.is_retriable() => {
                self.backoff_coordinator();
                return Err(error.into());
            }
            Some(error) => {
                return Err(broker_response_error(
                    "offset_fetch",
                    Some(self.config.group_id.clone()),
                    error,
                )
                .into());
            }
            None => {}
        }
        for topic in group.topics {
            // From version 10 on, a non-nil topic id is resolved through the metadata
            // cache first; the name carried in the response is the fallback.
            let cached_name = if version >= 10 && !topic.topic_id.is_nil() {
                self.connections
                    .metadata
                    .topic_name(&topic.topic_id)
                    .cloned()
            } else {
                None
            };
            let topic_name = cached_name.unwrap_or_else(|| topic.name.0.to_string());
            for partition in topic.partitions {
                match partition.error_code.err() {
                    Some(error) if error.is_retriable() => {
                        self.backoff_coordinator();
                        return Err(error.into());
                    }
                    Some(error) => {
                        return Err(broker_response_error(
                            "offset_fetch",
                            Some(format!(
                                "{}:{} in group {}",
                                topic_name, partition.partition_index, self.config.group_id
                            )),
                            error,
                        )
                        .into());
                    }
                    None => {}
                }
                let committed =
                    (partition.committed_offset >= 0).then_some(partition.committed_offset);
                offsets.insert(
                    TopicPartitionKey::new(topic_name.clone(), partition.partition_index),
                    committed,
                );
            }
        }
    }
    Ok(offsets)
}
/// Looks up offsets for `partitions` using one shared `timestamp`.
///
/// Builds one `TopicPartitionTimestamp` query per partition, delegates to
/// `lookup_offsets_for_timestamps`, and keeps only the offset of each result.
///
/// # Errors
///
/// Propagates any failure from `lookup_offsets_for_timestamps`.
pub(crate) async fn lookup_offsets_at_timestamp(
    &mut self,
    partitions: &[TopicPartitionKey],
    timestamp: i64,
) -> AnyResult<HashMap<TopicPartitionKey, i64>> {
    let mut queries = Vec::with_capacity(partitions.len());
    for key in partitions {
        queries.push(TopicPartitionTimestamp::new(
            key.topic.clone(),
            key.partition,
            timestamp,
        ));
    }

    let resolved = self.lookup_offsets_for_timestamps(&queries).await?;
    let mut offsets = HashMap::with_capacity(resolved.len());
    for (key, found) in resolved {
        offsets.insert(key, found.offset);
    }
    Ok(offsets)
}
/// Resolves an offset (and its timestamp) for every `(topic, partition, timestamp)`
/// query by sending one `ListOffsetsRequest` to each partition leader.
///
/// Steps:
/// 1. Refresh metadata for all queried topics.
/// 2. Group the partitions by their current leader. A partition listed more than once
///    is queried only once, using the timestamp of its last occurrence.
/// 3. Per leader, build a single request: the shared-timestamp form when all of that
///    leader's partitions use the same timestamp, the per-partition form otherwise.
/// 4. Collect the offsets from the responses.
///
/// Partitions are left out of the result (without an error) when the query timestamp is
/// non-negative and the broker either answers `UnsupportedForMessageFormat` or reports
/// a negative offset. An empty `partitions` slice returns an empty map without any
/// request.
///
/// # Errors
///
/// Fails when the metadata refresh fails, a partition has no known leader, a leader
/// connection cannot be opened or used, the response mentions a partition that was not
/// queried, or the broker reports an error. Retriable broker errors invalidate the
/// topic's metadata and are returned raw; other broker errors are wrapped with
/// `broker_response_error`. A negative offset for a negative query timestamp is
/// reported as an error as well.
pub(crate) async fn lookup_offsets_for_timestamps(
    &mut self,
    partitions: &[TopicPartitionTimestamp],
) -> AnyResult<HashMap<TopicPartitionKey, crate::types::OffsetAndTimestamp>> {
    if partitions.is_empty() {
        return Ok(HashMap::new());
    }
    // Deduplicated, ordered topic list for the metadata refresh.
    let topics = partitions
        .iter()
        .map(|partition| partition.topic.clone())
        .collect::<std::collections::BTreeSet<_>>()
        .into_iter()
        .collect::<Vec<_>>();
    self.refresh_metadata_for_topics(topics).await?;

    let mut by_leader = HashMap::<i32, Vec<TopicPartitionKey>>::new();
    let mut timestamps = HashMap::<TopicPartitionKey, i64>::new();
    for partition in partitions {
        let key = TopicPartitionKey::new(partition.topic.clone(), partition.partition);
        let leader_id = self
            .connections
            .metadata
            .leader_for(&key.topic, key.partition)
            .with_context(|| format!("missing leader for {}:{}", key.topic, key.partition))?;
        // Queue each partition only once so the request never carries duplicate
        // partitions; a repeated partition just overrides the stored timestamp.
        if timestamps.insert(key.clone(), partition.timestamp).is_none() {
            by_leader.entry(leader_id).or_default().push(key);
        }
    }

    // Cloned once: a leader connection borrows `self` mutably while it is in use.
    let client_id = self.config.client_id.clone();
    let mut offsets = HashMap::new();
    for (leader_id, keys) in by_leader {
        let version = {
            let connection = self.leader_connection(leader_id).await?;
            connection.version_with_cap::<ListOffsetsRequest>(LIST_OFFSETS_VERSION_CAP)?
        };
        let first_timestamp = *keys
            .first()
            .and_then(|key| timestamps.get(key))
            .context("list offsets request had no timestamp")?;
        let shares_timestamp = keys
            .iter()
            .all(|key| timestamps.get(key) == Some(&first_timestamp));
        // Build exactly one request, in whichever form fits this leader's partitions.
        let request = if shares_timestamp {
            build_list_offsets_request(
                keys,
                &self.connections.metadata,
                version,
                first_timestamp,
                self.config.isolation_level.as_protocol_value(),
            )?
        } else {
            let leader_timestamps = keys
                .iter()
                .map(|key| {
                    Ok((
                        key.clone(),
                        *timestamps
                            .get(key)
                            .context("list offsets request had no timestamp")?,
                    ))
                })
                .collect::<AnyResult<HashMap<_, _>>>()?;
            build_list_offsets_request_with_timestamps(
                &self.connections.metadata,
                version,
                &leader_timestamps,
                self.config.isolation_level.as_protocol_value(),
            )?
        };
        let connection = self.leader_connection(leader_id).await?;
        let response: ListOffsetsResponse = connection
            .send_request::<ListOffsetsRequest>(&client_id, version, &request)
            .await?;
        for topic in response.topics {
            let topic_name = topic.name.0.to_string();
            for partition in topic.partitions {
                let key = TopicPartitionKey::new(topic_name.clone(), partition.partition_index);
                let requested_timestamp = *timestamps
                    .get(&key)
                    .with_context(|| format!("missing list offset timestamp for {key:?}"))?;
                if let Some(error) = partition.error_code.err() {
                    // For a real (non-negative) timestamp query this error just means
                    // "no result" for the partition, so it is skipped.
                    if error == ResponseError::UnsupportedForMessageFormat
                        && requested_timestamp >= 0
                    {
                        continue;
                    }
                    if error.is_retriable() {
                        self.connections.metadata.invalidate_topic(&topic_name);
                        return Err(error.into());
                    }
                    return Err(broker_response_error(
                        "list_offsets",
                        Some(format!("{}:{}", topic_name, partition.partition_index)),
                        error,
                    )
                    .into());
                }
                if partition.offset < 0 {
                    // No offset found: tolerated for real timestamps, an error for the
                    // negative (special) timestamps.
                    if requested_timestamp >= 0 {
                        continue;
                    }
                    bail!(
                        "list_offsets returned unknown offset for {}:{}",
                        topic_name,
                        partition.partition_index
                    );
                }
                offsets.insert(
                    key,
                    crate::types::OffsetAndTimestamp {
                        offset: partition.offset,
                        timestamp: partition.timestamp,
                    },
                );
            }
        }
    }
    Ok(offsets)
}
/// Returns the cached connection to broker `broker_id`, opening and caching one first
/// if none exists yet.
///
/// A new connection is opened to the address the metadata cache holds for that broker,
/// using the consumer's client id, request timeout and transport settings.
///
/// # Errors
///
/// Fails when the broker is unknown to the metadata cache or when connecting fails.
pub(crate) async fn leader_connection(
    &mut self,
    broker_id: i32,
) -> AnyResult<&mut BrokerConnection> {
    let already_open = self.connections.leader_connections.contains_key(&broker_id);
    if !already_open {
        let broker = self
            .connections
            .metadata
            .broker(broker_id)
            .cloned()
            .with_context(|| format!("broker {broker_id} is missing from metadata"))?;
        let config = &self.config;
        let connection = BrokerConnection::connect_with_transport(
            &broker.address(),
            &config.client_id,
            config.request_timeout,
            config.security_protocol,
            &config.tls,
            &config.sasl,
            &config.tcp_connector,
        )
        .await?;
        self.connections
            .leader_connections
            .insert(broker_id, connection);
        trace!(broker_id, "opened consumer leader connection");
    }

    let cached = self.connections.leader_connections.get_mut(&broker_id);
    cached.with_context(|| format!("leader connection {broker_id} was not created"))
}
}