// kafkit-client 0.1.4
//
// Kafka 4.0+ pure Rust client.
use super::*;

impl ConsumerRuntime {
    pub(crate) fn topic_partitions_from_keys<I>(keys: I) -> Vec<TopicPartition>
    where
        I: IntoIterator<Item = TopicPartitionKey>,
    {
        keys.into_iter()
            .map(|key| TopicPartition::new(key.topic, key.partition))
            .collect()
    }

    pub(crate) fn subscribe(&mut self, topics: Vec<String>) -> ClientResult<()> {
        let normalized: BTreeSet<String> = topics
            .into_iter()
            .map(|topic| topic.trim().to_owned())
            .filter(|topic| !topic.is_empty())
            .collect();

        if normalized.is_empty() {
            return Err(ConsumerError::EmptySubscription.into());
        }

        self.notify_rebalance_revoked_with(|| {
            Self::topic_partitions_from_keys(
                self.assignment_state
                    .assignment
                    .keys()
                    .cloned()
                    .collect::<Vec<_>>(),
            )
        });
        self.assignment_state.group_subscription = ConsumerSubscription::Topics(normalized);
        self.assignment_state.manual_assignment.clear();
        self.assignment_state.paused_partitions.clear();
        self.poll_state.buffered_records.clear();
        self.assignment_state.assignment.clear();
        self.assignment_state.pending_assignment = None;
        self.poll_state.delivered_offsets.clear();
        self.heartbeat_state.mark_subscription_changed();
        self.connections.coordinator_retry_at = None;
        self.poll_state.pending_poll = None;
        debug!(
            topics = ?self.subscribed_topic_names(),
            topic_count = self.subscribed_topic_names().len(),
            "updated consumer subscription"
        );
        Ok(())
    }

    pub(crate) fn subscribe_pattern(
        &mut self,
        pattern: crate::types::SubscriptionPattern,
    ) -> ClientResult<()> {
        let pattern = pattern.pattern().trim().to_owned();
        if pattern.is_empty() {
            return Err(ConsumerError::EmptySubscriptionPattern.into());
        }

        self.notify_rebalance_revoked_with(|| {
            Self::topic_partitions_from_keys(
                self.assignment_state
                    .assignment
                    .keys()
                    .cloned()
                    .collect::<Vec<_>>(),
            )
        });
        self.assignment_state.group_subscription = ConsumerSubscription::Pattern(pattern.clone());
        self.assignment_state.manual_assignment.clear();
        self.assignment_state.paused_partitions.clear();
        self.poll_state.buffered_records.clear();
        self.assignment_state.assignment.clear();
        self.assignment_state.pending_assignment = None;
        self.poll_state.delivered_offsets.clear();
        self.heartbeat_state.mark_subscription_changed();
        self.connections.coordinator_retry_at = None;
        self.poll_state.pending_poll = None;
        debug!(%pattern, "updated consumer regex subscription");
        Ok(())
    }

    pub(crate) async fn subscribe_regex(&mut self, pattern: String) -> ClientResult<()> {
        let pattern = pattern.trim().to_owned();
        if pattern.is_empty() {
            return Err(ConsumerError::EmptySubscriptionPattern.into());
        }

        let regex = regex::Regex::new(&pattern).map_err(|error| {
            ConsumerError::InvalidSubscriptionRegex {
                message: error.to_string(),
            }
        })?;

        self.notify_rebalance_revoked_with(|| {
            Self::topic_partitions_from_keys(
                self.assignment_state
                    .assignment
                    .keys()
                    .cloned()
                    .collect::<Vec<_>>(),
            )
        });
        self.assignment_state.group_subscription = ConsumerSubscription::Regex { regex };
        self.assignment_state.manual_assignment.clear();
        self.assignment_state.paused_partitions.clear();
        self.poll_state.buffered_records.clear();
        self.assignment_state.assignment.clear();
        self.assignment_state.pending_assignment = None;
        self.poll_state.delivered_offsets.clear();
        self.heartbeat_state.mark_subscription_changed();
        self.connections.coordinator_retry_at = None;
        self.poll_state.pending_poll = None;
        debug!(%pattern, "updated consumer client-side regex subscription");
        self.refresh_subscription_metadata()
            .await
            .map_err(Error::from)?;
        Ok(())
    }

    pub(crate) async fn assign(
        &mut self,
        partitions: Vec<crate::types::TopicPartition>,
    ) -> ClientResult<()> {
        let normalized: BTreeSet<TopicPartitionKey> = partitions
            .into_iter()
            .map(TopicPartitionKey::from)
            .map(|key| TopicPartitionKey::new(key.topic.trim().to_owned(), key.partition))
            .filter(|key| !key.topic.is_empty())
            .collect();

        if normalized.is_empty() {
            if self.heartbeat_state.member_epoch > 0 {
                self.leave_group().await?;
            } else {
                self.notify_rebalance_revoked_with(|| {
                    Self::topic_partitions_from_keys(
                        self.assignment_state
                            .assignment
                            .keys()
                            .cloned()
                            .collect::<Vec<_>>(),
                    )
                });
                self.heartbeat_state.mark_left();
            }
            self.assignment_state.group_subscription = ConsumerSubscription::None;
            self.assignment_state.manual_assignment.clear();
            self.assignment_state.paused_partitions.clear();
            self.poll_state.buffered_records.clear();
            self.assignment_state.assignment.clear();
            self.assignment_state.pending_assignment = None;
            self.poll_state.delivered_offsets.clear();
            debug!("cleared manual consumer assignment");
            return Ok(());
        }

        if !self.has_group_subscription() && self.has_rebalance_listener() {
            let previous_assignment = self
                .assignment_state
                .assignment
                .keys()
                .cloned()
                .collect::<BTreeSet<_>>();
            self.notify_rebalance_revoked(Self::topic_partitions_from_keys(
                previous_assignment.difference(&normalized).cloned(),
            ));
        }

        if self.has_group_subscription() && self.heartbeat_state.member_epoch > 0 {
            self.leave_group().await?;
        } else if self.has_group_subscription() {
            self.heartbeat_state.mark_left();
        }

        self.assignment_state.group_subscription = ConsumerSubscription::None;
        self.assignment_state.pending_assignment = None;
        self.assignment_state.manual_assignment = normalized;
        self.assignment_state
            .paused_partitions
            .retain(|key| self.assignment_state.manual_assignment.contains(key));
        self.poll_state.buffered_records.clear();
        self.poll_state
            .delivered_offsets
            .retain(|key, _| self.assignment_state.manual_assignment.contains(key));
        debug!(
            partitions = self.assignment_state.manual_assignment.len(),
            "updated manual consumer assignment"
        );
        self.resolve_manual_assignment().await
    }

    pub(crate) fn normalize_assigned_partitions(
        &self,
        partitions: Vec<crate::types::TopicPartition>,
    ) -> ClientResult<Vec<TopicPartitionKey>> {
        let mut keys = Vec::new();
        let mut seen = BTreeSet::new();

        for partition in partitions {
            let key =
                TopicPartitionKey::new(partition.topic.trim().to_owned(), partition.partition);
            if key.topic.is_empty() {
                return Err(ConsumerError::EmptyTopicPartition.into());
            }
            if !self.assignment_state.assignment.contains_key(&key) {
                return Err(ConsumerError::PartitionNotAssigned {
                    operation: "operate on",
                    topic: key.topic.clone(),
                    partition: key.partition,
                }
                .into());
            }
            if seen.insert(key.clone()) {
                keys.push(key);
            }
        }

        Ok(keys)
    }

    pub(crate) fn normalize_timestamp_queries(
        &self,
        partitions: Vec<TopicPartitionTimestamp>,
    ) -> ClientResult<Vec<TopicPartitionTimestamp>> {
        let mut normalized = Vec::new();
        let mut seen = BTreeSet::new();

        for partition in partitions {
            let topic = partition.topic.trim().to_owned();
            if topic.is_empty() {
                return Err(ConsumerError::EmptyTopicPartition.into());
            }

            let key = TopicPartitionKey::new(topic.clone(), partition.partition);
            if !self.assignment_state.assignment.contains_key(&key) {
                return Err(ConsumerError::PartitionNotAssigned {
                    operation: "seek_to_timestamp",
                    topic,
                    partition: partition.partition,
                }
                .into());
            }

            if seen.insert(key) {
                normalized.push(TopicPartitionTimestamp::new(
                    partition.topic.trim().to_owned(),
                    partition.partition,
                    partition.timestamp,
                ));
            }
        }

        Ok(normalized)
    }

    pub(crate) fn discard_buffered_partition(&mut self, key: &TopicPartitionKey) {
        self.poll_state
            .buffered_records
            .retain(|record| !(record.topic == key.topic && record.partition == key.partition));
    }

    pub(crate) fn desired_topics(&self) -> Vec<String> {
        let mut topics = self
            .subscribed_topic_names()
            .into_iter()
            .collect::<Vec<_>>();
        topics.extend(
            self.assignment_state
                .manual_assignment
                .iter()
                .map(|key| key.topic.clone())
                .collect::<BTreeSet<_>>(),
        );
        topics.extend(
            self.assignment_state
                .assignment
                .keys()
                .map(|key| key.topic.clone())
                .collect::<BTreeSet<_>>(),
        );
        topics.sort_unstable();
        topics.dedup();
        topics
    }

    pub(crate) async fn resolve_manual_assignment(&mut self) -> ClientResult<()> {
        if self.assignment_state.manual_assignment.is_empty() {
            self.notify_rebalance_revoked_with(|| {
                Self::topic_partitions_from_keys(
                    self.assignment_state
                        .assignment
                        .keys()
                        .cloned()
                        .collect::<Vec<_>>(),
                )
            });
            self.assignment_state.assignment.clear();
            self.poll_state.buffered_records.clear();
            return Ok(());
        }

        let previous_assignment = self.has_rebalance_listener().then(|| {
            self.assignment_state
                .assignment
                .keys()
                .cloned()
                .collect::<BTreeSet<_>>()
        });
        self.refresh_subscription_metadata()
            .await
            .map_err(Error::from)?;
        let keys = self
            .assignment_state
            .manual_assignment
            .iter()
            .cloned()
            .collect::<Vec<_>>();
        let mut initial_offsets = self.fetch_initial_offsets_with_retry(&keys).await?;
        let mut next_assignment = HashMap::new();

        for key in keys {
            let topic_id = self
                .connections
                .metadata
                .topic_id(&key.topic)
                .with_context(|| format!("missing topic id for {}", key.topic))
                .map_err(Error::from)?;
            let partition_metadata = self
                .connections
                .metadata
                .partition(&key.topic, key.partition)
                .with_context(|| format!("missing metadata for {}:{}", key.topic, key.partition))
                .map_err(Error::from)?;
            let fetch_offset = self
                .assignment_state
                .assignment
                .get(&key)
                .map(|assigned| assigned.fetch_offset)
                .or_else(|| initial_offsets.remove(&key))
                .unwrap_or(0);

            next_assignment.insert(
                key.clone(),
                AssignedPartition {
                    key: key.clone(),
                    topic_id,
                    leader_id: partition_metadata.leader_id,
                    leader_epoch: partition_metadata.leader_epoch,
                    fetch_offset,
                },
            );
            self.poll_state
                .delivered_offsets
                .entry(key)
                .or_insert(fetch_offset);
        }

        self.poll_state
            .delivered_offsets
            .retain(|key, _| next_assignment.contains_key(key));
        self.assignment_state
            .paused_partitions
            .retain(|key| next_assignment.contains_key(key));
        self.assignment_state.assignment = next_assignment;
        self.poll_state.buffered_records.clear();
        if let Some(previous_assignment) = previous_assignment {
            self.notify_rebalance_assigned_with(|| {
                let next_assignment = self
                    .assignment_state
                    .assignment
                    .keys()
                    .cloned()
                    .collect::<BTreeSet<_>>();
                Self::topic_partitions_from_keys(
                    next_assignment.difference(&previous_assignment).cloned(),
                )
            });
        }
        Ok(())
    }

    pub(crate) fn can_resolve_assignment(&self, assignment: &HeartbeatAssignment) -> bool {
        assignment.topic_partitions.iter().all(|topic_partitions| {
            self.connections
                .metadata
                .topic_name(&topic_partitions.topic_id)
                .is_some()
        })
    }
}