use super::*;
impl ConsumerRuntime {
pub(crate) fn topic_partitions_from_keys<I>(keys: I) -> Vec<TopicPartition>
where
I: IntoIterator<Item = TopicPartitionKey>,
{
keys.into_iter()
.map(|key| TopicPartition::new(key.topic, key.partition))
.collect()
}
pub(crate) fn subscribe(&mut self, topics: Vec<String>) -> ClientResult<()> {
let normalized: BTreeSet<String> = topics
.into_iter()
.map(|topic| topic.trim().to_owned())
.filter(|topic| !topic.is_empty())
.collect();
if normalized.is_empty() {
return Err(ConsumerError::EmptySubscription.into());
}
self.notify_rebalance_revoked_with(|| {
Self::topic_partitions_from_keys(
self.assignment_state
.assignment
.keys()
.cloned()
.collect::<Vec<_>>(),
)
});
self.assignment_state.group_subscription = ConsumerSubscription::Topics(normalized);
self.assignment_state.manual_assignment.clear();
self.assignment_state.paused_partitions.clear();
self.poll_state.buffered_records.clear();
self.assignment_state.assignment.clear();
self.assignment_state.pending_assignment = None;
self.poll_state.delivered_offsets.clear();
self.heartbeat_state.mark_subscription_changed();
self.connections.coordinator_retry_at = None;
self.poll_state.pending_poll = None;
debug!(
topics = ?self.subscribed_topic_names(),
topic_count = self.subscribed_topic_names().len(),
"updated consumer subscription"
);
Ok(())
}
pub(crate) fn subscribe_pattern(
&mut self,
pattern: crate::types::SubscriptionPattern,
) -> ClientResult<()> {
let pattern = pattern.pattern().trim().to_owned();
if pattern.is_empty() {
return Err(ConsumerError::EmptySubscriptionPattern.into());
}
self.notify_rebalance_revoked_with(|| {
Self::topic_partitions_from_keys(
self.assignment_state
.assignment
.keys()
.cloned()
.collect::<Vec<_>>(),
)
});
self.assignment_state.group_subscription = ConsumerSubscription::Pattern(pattern.clone());
self.assignment_state.manual_assignment.clear();
self.assignment_state.paused_partitions.clear();
self.poll_state.buffered_records.clear();
self.assignment_state.assignment.clear();
self.assignment_state.pending_assignment = None;
self.poll_state.delivered_offsets.clear();
self.heartbeat_state.mark_subscription_changed();
self.connections.coordinator_retry_at = None;
self.poll_state.pending_poll = None;
debug!(%pattern, "updated consumer regex subscription");
Ok(())
}
pub(crate) async fn subscribe_regex(&mut self, pattern: String) -> ClientResult<()> {
let pattern = pattern.trim().to_owned();
if pattern.is_empty() {
return Err(ConsumerError::EmptySubscriptionPattern.into());
}
let regex = regex::Regex::new(&pattern).map_err(|error| {
ConsumerError::InvalidSubscriptionRegex {
message: error.to_string(),
}
})?;
self.notify_rebalance_revoked_with(|| {
Self::topic_partitions_from_keys(
self.assignment_state
.assignment
.keys()
.cloned()
.collect::<Vec<_>>(),
)
});
self.assignment_state.group_subscription = ConsumerSubscription::Regex { regex };
self.assignment_state.manual_assignment.clear();
self.assignment_state.paused_partitions.clear();
self.poll_state.buffered_records.clear();
self.assignment_state.assignment.clear();
self.assignment_state.pending_assignment = None;
self.poll_state.delivered_offsets.clear();
self.heartbeat_state.mark_subscription_changed();
self.connections.coordinator_retry_at = None;
self.poll_state.pending_poll = None;
debug!(%pattern, "updated consumer client-side regex subscription");
self.refresh_subscription_metadata()
.await
.map_err(Error::from)?;
Ok(())
}
pub(crate) async fn assign(
&mut self,
partitions: Vec<crate::types::TopicPartition>,
) -> ClientResult<()> {
let normalized: BTreeSet<TopicPartitionKey> = partitions
.into_iter()
.map(TopicPartitionKey::from)
.map(|key| TopicPartitionKey::new(key.topic.trim().to_owned(), key.partition))
.filter(|key| !key.topic.is_empty())
.collect();
if normalized.is_empty() {
if self.heartbeat_state.member_epoch > 0 {
self.leave_group().await?;
} else {
self.notify_rebalance_revoked_with(|| {
Self::topic_partitions_from_keys(
self.assignment_state
.assignment
.keys()
.cloned()
.collect::<Vec<_>>(),
)
});
self.heartbeat_state.mark_left();
}
self.assignment_state.group_subscription = ConsumerSubscription::None;
self.assignment_state.manual_assignment.clear();
self.assignment_state.paused_partitions.clear();
self.poll_state.buffered_records.clear();
self.assignment_state.assignment.clear();
self.assignment_state.pending_assignment = None;
self.poll_state.delivered_offsets.clear();
debug!("cleared manual consumer assignment");
return Ok(());
}
if !self.has_group_subscription() && self.has_rebalance_listener() {
let previous_assignment = self
.assignment_state
.assignment
.keys()
.cloned()
.collect::<BTreeSet<_>>();
self.notify_rebalance_revoked(Self::topic_partitions_from_keys(
previous_assignment.difference(&normalized).cloned(),
));
}
if self.has_group_subscription() && self.heartbeat_state.member_epoch > 0 {
self.leave_group().await?;
} else if self.has_group_subscription() {
self.heartbeat_state.mark_left();
}
self.assignment_state.group_subscription = ConsumerSubscription::None;
self.assignment_state.pending_assignment = None;
self.assignment_state.manual_assignment = normalized;
self.assignment_state
.paused_partitions
.retain(|key| self.assignment_state.manual_assignment.contains(key));
self.poll_state.buffered_records.clear();
self.poll_state
.delivered_offsets
.retain(|key, _| self.assignment_state.manual_assignment.contains(key));
debug!(
partitions = self.assignment_state.manual_assignment.len(),
"updated manual consumer assignment"
);
self.resolve_manual_assignment().await
}
pub(crate) fn normalize_assigned_partitions(
&self,
partitions: Vec<crate::types::TopicPartition>,
) -> ClientResult<Vec<TopicPartitionKey>> {
let mut keys = Vec::new();
let mut seen = BTreeSet::new();
for partition in partitions {
let key =
TopicPartitionKey::new(partition.topic.trim().to_owned(), partition.partition);
if key.topic.is_empty() {
return Err(ConsumerError::EmptyTopicPartition.into());
}
if !self.assignment_state.assignment.contains_key(&key) {
return Err(ConsumerError::PartitionNotAssigned {
operation: "operate on",
topic: key.topic.clone(),
partition: key.partition,
}
.into());
}
if seen.insert(key.clone()) {
keys.push(key);
}
}
Ok(keys)
}
pub(crate) fn normalize_timestamp_queries(
&self,
partitions: Vec<TopicPartitionTimestamp>,
) -> ClientResult<Vec<TopicPartitionTimestamp>> {
let mut normalized = Vec::new();
let mut seen = BTreeSet::new();
for partition in partitions {
let topic = partition.topic.trim().to_owned();
if topic.is_empty() {
return Err(ConsumerError::EmptyTopicPartition.into());
}
let key = TopicPartitionKey::new(topic.clone(), partition.partition);
if !self.assignment_state.assignment.contains_key(&key) {
return Err(ConsumerError::PartitionNotAssigned {
operation: "seek_to_timestamp",
topic,
partition: partition.partition,
}
.into());
}
if seen.insert(key) {
normalized.push(TopicPartitionTimestamp::new(
partition.topic.trim().to_owned(),
partition.partition,
partition.timestamp,
));
}
}
Ok(normalized)
}
pub(crate) fn discard_buffered_partition(&mut self, key: &TopicPartitionKey) {
self.poll_state
.buffered_records
.retain(|record| !(record.topic == key.topic && record.partition == key.partition));
}
pub(crate) fn desired_topics(&self) -> Vec<String> {
let mut topics = self
.subscribed_topic_names()
.into_iter()
.collect::<Vec<_>>();
topics.extend(
self.assignment_state
.manual_assignment
.iter()
.map(|key| key.topic.clone())
.collect::<BTreeSet<_>>(),
);
topics.extend(
self.assignment_state
.assignment
.keys()
.map(|key| key.topic.clone())
.collect::<BTreeSet<_>>(),
);
topics.sort_unstable();
topics.dedup();
topics
}
pub(crate) async fn resolve_manual_assignment(&mut self) -> ClientResult<()> {
if self.assignment_state.manual_assignment.is_empty() {
self.notify_rebalance_revoked_with(|| {
Self::topic_partitions_from_keys(
self.assignment_state
.assignment
.keys()
.cloned()
.collect::<Vec<_>>(),
)
});
self.assignment_state.assignment.clear();
self.poll_state.buffered_records.clear();
return Ok(());
}
let previous_assignment = self.has_rebalance_listener().then(|| {
self.assignment_state
.assignment
.keys()
.cloned()
.collect::<BTreeSet<_>>()
});
self.refresh_subscription_metadata()
.await
.map_err(Error::from)?;
let keys = self
.assignment_state
.manual_assignment
.iter()
.cloned()
.collect::<Vec<_>>();
let mut initial_offsets = self.fetch_initial_offsets_with_retry(&keys).await?;
let mut next_assignment = HashMap::new();
for key in keys {
let topic_id = self
.connections
.metadata
.topic_id(&key.topic)
.with_context(|| format!("missing topic id for {}", key.topic))
.map_err(Error::from)?;
let partition_metadata = self
.connections
.metadata
.partition(&key.topic, key.partition)
.with_context(|| format!("missing metadata for {}:{}", key.topic, key.partition))
.map_err(Error::from)?;
let fetch_offset = self
.assignment_state
.assignment
.get(&key)
.map(|assigned| assigned.fetch_offset)
.or_else(|| initial_offsets.remove(&key))
.unwrap_or(0);
next_assignment.insert(
key.clone(),
AssignedPartition {
key: key.clone(),
topic_id,
leader_id: partition_metadata.leader_id,
leader_epoch: partition_metadata.leader_epoch,
fetch_offset,
},
);
self.poll_state
.delivered_offsets
.entry(key)
.or_insert(fetch_offset);
}
self.poll_state
.delivered_offsets
.retain(|key, _| next_assignment.contains_key(key));
self.assignment_state
.paused_partitions
.retain(|key| next_assignment.contains_key(key));
self.assignment_state.assignment = next_assignment;
self.poll_state.buffered_records.clear();
if let Some(previous_assignment) = previous_assignment {
self.notify_rebalance_assigned_with(|| {
let next_assignment = self
.assignment_state
.assignment
.keys()
.cloned()
.collect::<BTreeSet<_>>();
Self::topic_partitions_from_keys(
next_assignment.difference(&previous_assignment).cloned(),
)
});
}
Ok(())
}
pub(crate) fn can_resolve_assignment(&self, assignment: &HeartbeatAssignment) -> bool {
assignment.topic_partitions.iter().all(|topic_partitions| {
self.connections
.metadata
.topic_name(&topic_partitions.topic_id)
.is_some()
})
}
}