kafkit-client 0.1.2

Kafka 4.0+ pure Rust client.
Documentation
use std::collections::{BTreeMap, HashMap};

use anyhow::{Result as AnyResult, bail};
use kafka_protocol::error::ParseResponseErrorCode;
use kafka_protocol::messages::ShareAcknowledgeRequest;
use kafka_protocol::messages::share_acknowledge_request::{AcknowledgePartition, AcknowledgeTopic};
use kafka_protocol::messages::share_acknowledge_response::ShareAcknowledgeResponse;
use kafka_protocol::protocol::StrBytes;
use tokio::time::sleep;
use uuid::Uuid;

use crate::constants::SHARE_ACKNOWLEDGE_VERSION_CAP;

use super::acknowledgements::{Acknowledgements, share_acknowledgement_commits};
use super::types::{ShareAcknowledgementCommit, TopicIdPartitionKey};
use super::{KafkaShareConsumer, SHARE_SESSION_CLOSE_EPOCH, SHARE_SESSION_OPEN_EPOCH};

impl KafkaShareConsumer {
    pub(super) async fn send_acknowledge_to_leader(
        &mut self,
        leader_id: i32,
        acknowledgements: HashMap<TopicIdPartitionKey, Acknowledgements>,
        close_session: bool,
    ) -> AnyResult<()> {
        let version = self
            .leader_connection(leader_id)
            .await?
            .version_with_cap::<ShareAcknowledgeRequest>(SHARE_ACKNOWLEDGE_VERSION_CAP)?;
        let session_epoch = if close_session {
            SHARE_SESSION_CLOSE_EPOCH
        } else {
            *self
                .share_session_epochs
                .get(&leader_id)
                .unwrap_or(&SHARE_SESSION_OPEN_EPOCH)
        };
        let acknowledgement_commits = share_acknowledgement_commits(&acknowledgements, None);
        let request = self.build_share_acknowledge_request(acknowledgements.clone(), session_epoch);
        let client_id = self.config.client_id.clone();
        let mut last_error = None;

        for attempt in 0..self.config.max_retries.max(1) {
            let result = self
                .leader_connection(leader_id)
                .await?
                .send_request::<ShareAcknowledgeRequest>(&client_id, version, &request)
                .await
                .and_then(|response: ShareAcknowledgeResponse| {
                    self.process_share_acknowledge_response(response)
                });

            match result {
                Ok(()) => {
                    if close_session {
                        self.share_session_epochs.remove(&leader_id);
                    } else {
                        self.advance_share_session_epoch(leader_id, session_epoch);
                    }
                    self.invoke_acknowledgement_commit_callback(acknowledgement_commits.clone());
                    return Ok(());
                }
                Err(error) => {
                    last_error = Some(error);
                    if attempt + 1 < self.config.max_retries.max(1) {
                        sleep(self.config.retry_backoff).await;
                    }
                }
            }
        }

        let error = last_error.expect("share acknowledge attempt count is at least one");
        self.invoke_acknowledgement_commit_callback(share_acknowledgement_commits(
            &acknowledgements,
            Some(error.to_string()),
        ));
        if !close_session {
            self.restore_acknowledgements(acknowledgements);
        }
        Err(error)
    }

    pub(super) fn advance_share_session_epoch(&mut self, leader_id: i32, previous_epoch: i32) {
        if previous_epoch == SHARE_SESSION_CLOSE_EPOCH {
            self.share_session_epochs.remove(&leader_id);
        } else {
            self.share_session_epochs
                .insert(leader_id, previous_epoch + 1);
        }
    }

    pub(super) fn invoke_acknowledgement_commit_callback(
        &self,
        commits: Vec<ShareAcknowledgementCommit>,
    ) {
        if commits.is_empty() {
            return;
        }
        if let Some(callback) = &self.acknowledgement_commit_callback {
            callback(commits);
        }
    }

    fn build_share_acknowledge_request(
        &self,
        acknowledgements: HashMap<TopicIdPartitionKey, Acknowledgements>,
        session_epoch: i32,
    ) -> ShareAcknowledgeRequest {
        let mut topics = BTreeMap::<Uuid, Vec<AcknowledgePartition>>::new();
        for (key, acks) in acknowledgements {
            topics.entry(key.topic_id).or_default().push(
                AcknowledgePartition::default()
                    .with_partition_index(key.partition)
                    .with_acknowledgement_batches(acks.into_share_acknowledge_batches()),
            );
        }

        ShareAcknowledgeRequest::default()
            .with_group_id(Some(
                StrBytes::from_string(self.config.group_id.clone()).into(),
            ))
            .with_member_id(Some(StrBytes::from_string(self.member_id.clone())))
            .with_share_session_epoch(session_epoch)
            .with_topics(
                topics
                    .into_iter()
                    .map(|(topic_id, partitions)| {
                        AcknowledgeTopic::default()
                            .with_topic_id(topic_id)
                            .with_partitions(partitions)
                    })
                    .collect(),
            )
    }

    fn process_share_acknowledge_response(
        &mut self,
        response: ShareAcknowledgeResponse,
    ) -> AnyResult<()> {
        if let Some(error) = response.error_code.err() {
            bail!("share acknowledge failed: {error}");
        }
        for topic in response.responses {
            let topic_name = self
                .metadata
                .topic_name(&topic.topic_id)
                .cloned()
                .unwrap_or_else(|| topic.topic_id.to_string());
            for partition in topic.partitions {
                if let Some(error) = partition.error_code.err() {
                    bail!(
                        "share acknowledge failed for {}:{}: {}",
                        topic_name,
                        partition.partition_index,
                        error
                    );
                }
            }
        }
        Ok(())
    }
}