use std::collections::{BTreeMap, HashMap};
use std::time::Instant;
use anyhow::Result as AnyResult;
use kafka_protocol::error::ParseResponseErrorCode;
use kafka_protocol::messages::ShareAcknowledgeRequest;
use kafka_protocol::messages::share_acknowledge_request::{AcknowledgePartition, AcknowledgeTopic};
use kafka_protocol::messages::share_acknowledge_response::ShareAcknowledgeResponse;
use kafka_protocol::protocol::StrBytes;
use tokio::time::sleep;
use uuid::Uuid;
use crate::constants::SHARE_ACKNOWLEDGE_VERSION_CAP;
use crate::telemetry;
use super::acknowledgements::{Acknowledgements, share_acknowledgement_commits};
use super::coordinator::is_transport_error;
use super::fetch::is_share_session_reset_error;
use super::types::{ShareAcknowledgementCommit, TopicIdPartitionKey};
use super::{KafkaShareConsumer, SHARE_SESSION_CLOSE_EPOCH, SHARE_SESSION_OPEN_EPOCH};
impl KafkaShareConsumer {
/// Sends `acknowledgements` to the broker `leader_id` in a ShareAcknowledge request,
/// retrying on failure.
///
/// Up to `config.max_retries` attempts are made (always at least one), sleeping for
/// `config.retry_backoff` between attempts. Each attempt uses
/// [`SHARE_SESSION_CLOSE_EPOCH`] when `close_session` is set, otherwise the epoch stored
/// for `leader_id` (falling back to [`SHARE_SESSION_OPEN_EPOCH`] when none is stored).
///
/// On success the stored epoch is removed (`close_session`) or advanced, telemetry is
/// recorded, and the acknowledgement commit callback is invoked with error-free commits.
///
/// # Errors
///
/// Returns the error of the last attempt once every attempt has failed. Before returning,
/// the commit callback is invoked with commits carrying the error text, telemetry is
/// recorded, and, unless `close_session` is set, the acknowledgements are handed back via
/// `restore_acknowledgements`.
pub(super) async fn send_acknowledge_to_leader(
    &mut self,
    leader_id: i32,
    acknowledgements: HashMap<TopicIdPartitionKey, Acknowledgements>,
    close_session: bool,
) -> AnyResult<()> {
    let acknowledgement_commits = share_acknowledgement_commits(&acknowledgements, None);
    let acknowledgement_count = acknowledgements
        .values()
        .map(|acks| acks.offsets.len())
        .sum::<usize>();
    // Cloned up front because the attempt below borrows `self` mutably while the
    // client id is still needed for the request.
    let client_id = self.config.client_id.clone();
    // Loop-invariant: always make at least one attempt, even if `max_retries` is zero.
    let max_attempts = self.config.max_retries.max(1);
    let started_at = Instant::now();
    let mut last_error = None;
    for attempt in 0..max_attempts {
        let session_epoch = if close_session {
            SHARE_SESSION_CLOSE_EPOCH
        } else {
            *self
                .share_session_epochs
                .get(&leader_id)
                .unwrap_or(&SHARE_SESSION_OPEN_EPOCH)
        };
        // The request builder consumes its input, so every attempt gets its own copy;
        // the original map is still needed on the failure path below.
        let request =
            self.build_share_acknowledge_request(acknowledgements.clone(), session_epoch);
        let result = async {
            let connection = self.leader_connection(leader_id).await?;
            let version = connection
                .version_with_cap::<ShareAcknowledgeRequest>(SHARE_ACKNOWLEDGE_VERSION_CAP)?;
            connection
                .send_request::<ShareAcknowledgeRequest>(&client_id, version, &request)
                .await
        }
        .await
        .and_then(|response: ShareAcknowledgeResponse| {
            self.process_share_acknowledge_response(response)
        });
        match result {
            Ok(()) => {
                telemetry::record_share_acknowledge_completed(
                    &self.config.client_id,
                    &self.config.group_id,
                    leader_id,
                    acknowledgement_count,
                    close_session,
                    started_at.elapsed(),
                    true,
                );
                if close_session {
                    self.share_session_epochs.remove(&leader_id);
                } else {
                    self.advance_share_session_epoch(leader_id, session_epoch);
                }
                // We return right away, so the commits can be moved instead of cloned.
                self.invoke_acknowledgement_commit_callback(acknowledgement_commits);
                return Ok(());
            }
            Err(error) => {
                // NOTE(review): a partition-level error in an otherwise accepted response
                // lands here as well, and the retry reuses the same session epoch —
                // confirm that matches the broker's epoch handling.
                if is_transport_error(&error) || is_share_session_reset_error(&error) {
                    self.drop_leader_connection(leader_id);
                }
                last_error = Some(error);
                // No point in backing off after the final attempt.
                if attempt + 1 < max_attempts {
                    sleep(self.config.retry_backoff).await;
                }
            }
        }
    }
    let error = last_error.expect("share acknowledge attempt count is at least one");
    self.invoke_acknowledgement_commit_callback(share_acknowledgement_commits(
        &acknowledgements,
        Some(error.to_string()),
    ));
    telemetry::record_share_acknowledge_completed(
        &self.config.client_id,
        &self.config.group_id,
        leader_id,
        acknowledgement_count,
        close_session,
        started_at.elapsed(),
        false,
    );
    if !close_session {
        self.restore_acknowledgements(acknowledgements);
    }
    Err(error)
}
/// Updates the share session epoch stored for `leader_id` after a request that used
/// `previous_epoch`.
///
/// If `previous_epoch` is [`SHARE_SESSION_CLOSE_EPOCH`] the stored epoch is removed.
/// Otherwise the stored epoch becomes `previous_epoch + 1`, restarting at `1` when the
/// increment would overflow `i32`.
pub(super) fn advance_share_session_epoch(&mut self, leader_id: i32, previous_epoch: i32) {
    if previous_epoch == SHARE_SESSION_CLOSE_EPOCH {
        self.share_session_epochs.remove(&leader_id);
        return;
    }
    // A plain `+ 1` panics in debug builds and wraps to a negative epoch in release
    // builds at `i32::MAX`. Restart at 1 instead, mirroring how the Kafka Java client
    // advances share session epochs.
    let next_epoch = previous_epoch.checked_add(1).unwrap_or(1);
    self.share_session_epochs.insert(leader_id, next_epoch);
}
/// Hands `commits` to the registered acknowledgement commit callback.
///
/// Nothing happens when `commits` is empty or when no callback is registered.
pub(super) fn invoke_acknowledgement_commit_callback(
    &self,
    commits: Vec<ShareAcknowledgementCommit>,
) {
    match &self.acknowledgement_commit_callback {
        Some(on_commit) if !commits.is_empty() => {
            on_commit(commits);
        }
        _ => {}
    }
}
fn build_share_acknowledge_request(
&self,
acknowledgements: HashMap<TopicIdPartitionKey, Acknowledgements>,
session_epoch: i32,
) -> ShareAcknowledgeRequest {
let mut topics = BTreeMap::<Uuid, Vec<AcknowledgePartition>>::new();
for (key, acks) in acknowledgements {
topics.entry(key.topic_id).or_default().push(
AcknowledgePartition::default()
.with_partition_index(key.partition)
.with_acknowledgement_batches(acks.into_share_acknowledge_batches()),
);
}
ShareAcknowledgeRequest::default()
.with_group_id(Some(
StrBytes::from_string(self.config.group_id.clone()).into(),
))
.with_member_id(Some(StrBytes::from_string(self.member_id.clone())))
.with_share_session_epoch(session_epoch)
.with_topics(
topics
.into_iter()
.map(|(topic_id, partitions)| {
AcknowledgeTopic::default()
.with_topic_id(topic_id)
.with_partitions(partitions)
})
.collect(),
)
}
/// Checks a [`ShareAcknowledgeResponse`] for errors.
///
/// # Errors
///
/// Returns the top-level error if the response carries one; otherwise the first
/// partition-level error found while walking topics and partitions in response order.
fn process_share_acknowledge_response(
    &mut self,
    response: ShareAcknowledgeResponse,
) -> AnyResult<()> {
    // The top-level error takes precedence; partitions are scanned only when it is absent.
    let first_error = response.error_code.err().or_else(|| {
        response
            .responses
            .into_iter()
            .flat_map(|topic| topic.partitions)
            .find_map(|partition| partition.error_code.err())
    });

    match first_error {
        Some(error) => Err(error.into()),
        None => Ok(()),
    }
}
}