use super::*;
impl ConsumerRuntime {
pub(crate) async fn drain_commit_request(&mut self) -> AnyResult<()> {
let Some(pending) = self.lifecycle.pending_commits.pop_front() else {
return Ok(());
};
let result = self.commit_offsets_inner(pending.offsets).await;
match result {
Ok(()) => {
if pending.kind == CommitKind::Auto {
self.lifecycle.last_auto_commit = Instant::now();
}
if let Some(reply) = pending.reply {
let _ = reply.send(Ok(()));
}
Ok(())
}
Err(error) => {
let client_error = into_client_error(error);
let error_text = format!("{client_error:#}");
if let Some(reply) = pending.reply {
let _ = reply.send(Err(client_error));
}
Err(anyhow!(error_text))
}
}
}
pub(crate) async fn commit_offsets_inner(
&mut self,
offsets: Vec<CommitOffset>,
) -> AnyResult<()> {
if offsets.is_empty() {
return Ok(());
}
debug!(offset_count = offsets.len(), "committing consumer offsets");
self.ensure_coordinator_connection().await?;
let client_id = self.config.client_id.clone();
let version = self
.connections
.coordinator_connection
.as_ref()
.context("coordinator connection is missing")?
.version_with_cap::<OffsetCommitRequest>(OFFSET_COMMIT_VERSION_CAP)?;
let (member_id, member_epoch) = self.current_group_member();
let commit_member_id = member_id.as_ref().map(ToString::to_string);
let request = build_offset_commit_request(
&self.config.group_id,
commit_member_id.as_deref(),
member_epoch,
if member_epoch > 0 {
self.config.instance_id.as_deref()
} else {
None
},
&offsets,
&self.assignment_state.assignment,
)?;
let response: OffsetCommitResponse = self
.connections
.coordinator_connection
.as_mut()
.context("coordinator connection is missing")?
.send_request::<OffsetCommitRequest>(&client_id, version, &request)
.await?;
for topic in response.topics {
let topic_name = topic.name.0.to_string();
for partition in topic.partitions {
if let Some(error) = partition.error_code.err() {
if error.is_retriable() {
self.backoff_coordinator();
return Err(error.into());
}
bail!(
"offset commit failed for {}:{}: {}",
topic_name,
partition.partition_index,
error
);
}
}
}
for offset in offsets {
self.poll_state.delivered_offsets.insert(
TopicPartitionKey::new(offset.topic, offset.partition),
offset.offset,
);
}
debug!("consumer offset commit completed");
Ok(())
}
pub(crate) fn fail_pending_commits(&mut self, message: &str) {
while let Some(pending) = self.lifecycle.pending_commits.pop_front() {
if let Some(reply) = pending.reply {
let _ = reply.send(Err(Error::Internal(anyhow!(message.to_owned()))));
}
}
}
}