kafkit-client 0.1.9

Kafka 4.0+ pure Rust client.
Documentation
use std::collections::{BTreeMap, HashMap};
use std::time::Instant;

use anyhow::Result as AnyResult;
use kafka_protocol::error::ParseResponseErrorCode;
use kafka_protocol::messages::ShareAcknowledgeRequest;
use kafka_protocol::messages::share_acknowledge_request::{AcknowledgePartition, AcknowledgeTopic};
use kafka_protocol::messages::share_acknowledge_response::ShareAcknowledgeResponse;
use kafka_protocol::protocol::StrBytes;
use tokio::time::sleep;
use uuid::Uuid;

use crate::constants::SHARE_ACKNOWLEDGE_VERSION_CAP;
use crate::telemetry;

use super::acknowledgements::{Acknowledgements, share_acknowledgement_commits};
use super::coordinator::is_transport_error;
use super::fetch::is_share_session_reset_error;
use super::types::{ShareAcknowledgementCommit, TopicIdPartitionKey};
use super::{KafkaShareConsumer, SHARE_SESSION_CLOSE_EPOCH, SHARE_SESSION_OPEN_EPOCH};

impl KafkaShareConsumer {
    pub(super) async fn send_acknowledge_to_leader(
        &mut self,
        leader_id: i32,
        acknowledgements: HashMap<TopicIdPartitionKey, Acknowledgements>,
        close_session: bool,
    ) -> AnyResult<()> {
        let acknowledgement_commits = share_acknowledgement_commits(&acknowledgements, None);
        let acknowledgement_count = acknowledgements
            .values()
            .map(|acks| acks.offsets.len())
            .sum::<usize>();
        let client_id = self.config.client_id.clone();
        let started_at = Instant::now();
        let mut last_error = None;

        for attempt in 0..self.config.max_retries.max(1) {
            let session_epoch = if close_session {
                SHARE_SESSION_CLOSE_EPOCH
            } else {
                *self
                    .share_session_epochs
                    .get(&leader_id)
                    .unwrap_or(&SHARE_SESSION_OPEN_EPOCH)
            };
            let request =
                self.build_share_acknowledge_request(acknowledgements.clone(), session_epoch);
            let result = async {
                let connection = self.leader_connection(leader_id).await?;
                let version = connection
                    .version_with_cap::<ShareAcknowledgeRequest>(SHARE_ACKNOWLEDGE_VERSION_CAP)?;
                connection
                    .send_request::<ShareAcknowledgeRequest>(&client_id, version, &request)
                    .await
            }
            .await
            .and_then(|response: ShareAcknowledgeResponse| {
                self.process_share_acknowledge_response(response)
            });

            match result {
                Ok(()) => {
                    telemetry::record_share_acknowledge_completed(
                        &self.config.client_id,
                        &self.config.group_id,
                        leader_id,
                        acknowledgement_count,
                        close_session,
                        started_at.elapsed(),
                        true,
                    );
                    if close_session {
                        self.share_session_epochs.remove(&leader_id);
                    } else {
                        self.advance_share_session_epoch(leader_id, session_epoch);
                    }
                    self.invoke_acknowledgement_commit_callback(acknowledgement_commits.clone());
                    return Ok(());
                }
                Err(error) => {
                    if is_transport_error(&error) || is_share_session_reset_error(&error) {
                        self.drop_leader_connection(leader_id);
                    }
                    last_error = Some(error);
                    if attempt + 1 < self.config.max_retries.max(1) {
                        sleep(self.config.retry_backoff).await;
                    }
                }
            }
        }

        let error = last_error.expect("share acknowledge attempt count is at least one");
        self.invoke_acknowledgement_commit_callback(share_acknowledgement_commits(
            &acknowledgements,
            Some(error.to_string()),
        ));
        telemetry::record_share_acknowledge_completed(
            &self.config.client_id,
            &self.config.group_id,
            leader_id,
            acknowledgement_count,
            close_session,
            started_at.elapsed(),
            false,
        );
        if !close_session {
            self.restore_acknowledgements(acknowledgements);
        }
        Err(error)
    }

    pub(super) fn advance_share_session_epoch(&mut self, leader_id: i32, previous_epoch: i32) {
        if previous_epoch == SHARE_SESSION_CLOSE_EPOCH {
            self.share_session_epochs.remove(&leader_id);
        } else {
            self.share_session_epochs
                .insert(leader_id, previous_epoch + 1);
        }
    }

    pub(super) fn invoke_acknowledgement_commit_callback(
        &self,
        commits: Vec<ShareAcknowledgementCommit>,
    ) {
        if commits.is_empty() {
            return;
        }
        if let Some(callback) = &self.acknowledgement_commit_callback {
            callback(commits);
        }
    }

    fn build_share_acknowledge_request(
        &self,
        acknowledgements: HashMap<TopicIdPartitionKey, Acknowledgements>,
        session_epoch: i32,
    ) -> ShareAcknowledgeRequest {
        let mut topics = BTreeMap::<Uuid, Vec<AcknowledgePartition>>::new();
        for (key, acks) in acknowledgements {
            topics.entry(key.topic_id).or_default().push(
                AcknowledgePartition::default()
                    .with_partition_index(key.partition)
                    .with_acknowledgement_batches(acks.into_share_acknowledge_batches()),
            );
        }

        ShareAcknowledgeRequest::default()
            .with_group_id(Some(
                StrBytes::from_string(self.config.group_id.clone()).into(),
            ))
            .with_member_id(Some(StrBytes::from_string(self.member_id.clone())))
            .with_share_session_epoch(session_epoch)
            .with_topics(
                topics
                    .into_iter()
                    .map(|(topic_id, partitions)| {
                        AcknowledgeTopic::default()
                            .with_topic_id(topic_id)
                            .with_partitions(partitions)
                    })
                    .collect(),
            )
    }

    fn process_share_acknowledge_response(
        &mut self,
        response: ShareAcknowledgeResponse,
    ) -> AnyResult<()> {
        if let Some(error) = response.error_code.err() {
            return Err(error.into());
        }
        for topic in response.responses {
            for partition in topic.partitions {
                if let Some(error) = partition.error_code.err() {
                    return Err(error.into());
                }
            }
        }
        Ok(())
    }
}