use super::*;
impl ConsumerRuntime {
/// Returns the member id and member epoch to attach to group-scoped requests.
///
/// Yields `(None, -1)` when there is no group subscription or the recorded
/// member epoch is not positive; otherwise yields the stored member id and
/// epoch.
pub(crate) fn current_group_member(&self) -> (Option<StrBytes>, i32) {
    let is_member = self.has_group_subscription() && self.heartbeat_state.member_epoch > 0;
    if !is_member {
        return (None, -1);
    }
    let member_id = StrBytes::from_string(self.heartbeat_state.member_id.clone());
    (Some(member_id), self.heartbeat_state.member_epoch)
}
/// Builds a `ConsumerGroupMetadata` snapshot for the configured group.
///
/// The generation id (taken from the member epoch), the member id and, when
/// configured, the group instance id are only filled in while there is a
/// group subscription and the member epoch is positive. Otherwise the
/// metadata carries just the group id.
pub(crate) fn current_group_metadata(&self) -> ConsumerGroupMetadata {
    let bare = ConsumerGroupMetadata::new(self.config.group_id.clone());
    if !self.has_group_subscription() || self.heartbeat_state.member_epoch <= 0 {
        return bare;
    }
    let with_membership = bare
        .with_generation_id(self.heartbeat_state.member_epoch)
        .with_member_id(self.heartbeat_state.member_id.clone());
    match self.config.instance_id.clone() {
        Some(instance_id) => with_membership.with_group_instance_id(instance_id),
        None => with_membership,
    }
}
/// Queues an automatic offset commit when one is due.
///
/// Nothing is queued if the runtime is shutting down, auto commit is
/// disabled, no offsets have been delivered, the auto commit interval has not
/// yet elapsed since `last_auto_commit`, or an auto commit is already waiting
/// in `pending_commits`. Otherwise every delivered offset is snapshotted into
/// a new `PendingCommit` of kind `CommitKind::Auto` with no reply channel and
/// no cancellation handle.
pub(crate) fn maybe_enqueue_auto_commit(&mut self) {
    if self.lifecycle.shutting_down || !self.config.enable_auto_commit {
        return;
    }
    if self.poll_state.delivered_offsets.is_empty() {
        return;
    }
    let interval_elapsed =
        self.lifecycle.last_auto_commit.elapsed() >= self.config.auto_commit_interval;
    if !interval_elapsed {
        return;
    }
    // At most one auto commit may sit in the queue at a time.
    let auto_commit_queued = self
        .lifecycle
        .pending_commits
        .iter()
        .any(|queued| queued.kind == CommitKind::Auto);
    if auto_commit_queued {
        return;
    }

    let pending = PendingCommit {
        offsets: self
            .poll_state
            .delivered_offsets
            .iter()
            .map(|(partition_key, next_offset)| CommitOffset {
                topic: partition_key.topic.clone(),
                partition: partition_key.partition,
                offset: *next_offset,
            })
            .collect(),
        cancellation: None,
        reply: None,
        kind: CommitKind::Auto,
    };
    self.lifecycle.pending_commits.push_back(pending);
}
/// Applies the pending assignment, if there is one and it can be resolved.
///
/// Does nothing when no assignment is pending or when
/// `can_resolve_assignment` rejects it. After a successful reconciliation the
/// pending slot is cleared and the heartbeat state is flagged as
/// `assignment_dirty`. If reconciliation fails the error is propagated and
/// the pending assignment is left in place.
pub(crate) async fn maybe_reconcile_pending_assignment(&mut self) -> AnyResult<()> {
    let pending = match self.assignment_state.pending_assignment.clone() {
        Some(pending) => pending,
        None => return Ok(()),
    };
    if self.can_resolve_assignment(&pending) {
        self.reconcile_assignment(pending).await?;
        self.assignment_state.pending_assignment = None;
        self.heartbeat_state.assignment_dirty = true;
    }
    Ok(())
}
/// Completes a requested shutdown once nothing is left to wait for.
///
/// Returns `true` while shutdown has not been requested, commits are still
/// pending, or the member epoch is still positive. Once none of those hold,
/// the close reply (if one is registered) is answered with `Ok(())` and
/// `false` is returned.
///
/// NOTE(review): `true` here means "not finished yet"; confirm callers rely
/// on that polarity.
pub(crate) fn finish_shutdown_if_ready(&mut self) -> bool {
    let not_done_yet = !self.lifecycle.shutting_down
        || !self.lifecycle.pending_commits.is_empty()
        || self.heartbeat_state.member_epoch > 0;
    if not_done_yet {
        return true;
    }
    if let Some(close_reply) = self.lifecycle.close_reply.take() {
        // The requester may already be gone; a failed send is ignored.
        let _ = close_reply.send(Ok(()));
    }
    false
}
pub(crate) fn finish_shutdown_with_error(&mut self, message: &str) {
if let Some(reply) = self.lifecycle.close_reply.take() {
let _ = reply.send(Err(Error::Consumer(ConsumerError::Fatal {
message: message.to_owned(),
})));
}
}
pub(crate) fn enter_fatal_state(&mut self, error: Error) {
let message = format!("{error:#}");
if self.lifecycle.fatal_error.is_none() {
warn!(error = %message, "consumer runtime entered fatal state");
}
self.lifecycle.fatal_error = Some(message.clone());
self.notify_rebalance_lost_with(|| {
Self::topic_partitions_from_keys(
self.assignment_state
.assignment
.keys()
.cloned()
.collect::<Vec<_>>(),
)
});
self.assignment_state.assignment.clear();
self.assignment_state.pending_assignment = None;
self.assignment_state.paused_partitions.clear();
self.poll_state.buffered_records.clear();
self.poll_state.delivered_offsets.clear();
self.connections.coordinator_connection = None;
self.heartbeat_state.mark_left();
self.fail_pending_poll(error);
self.fail_pending_commits(&message);
}
/// Reports whether the runtime currently has a reason to talk to the group
/// coordinator: a group subscription, pending commits, or a shutdown in
/// progress while the member epoch is still positive.
pub(crate) fn needs_coordinator(&self) -> bool {
    if self.has_group_subscription() {
        return true;
    }
    if !self.lifecycle.pending_commits.is_empty() {
        return true;
    }
    self.lifecycle.shutting_down && self.heartbeat_state.member_epoch > 0
}
/// Returns `true` when no coordinator retry deadline is set, or when the
/// stored deadline has been reached.
pub(crate) fn coordinator_retry_ready(&self) -> bool {
    match self.connections.coordinator_retry_at {
        Some(deadline) => Instant::now() >= deadline,
        None => true,
    }
}
/// Drops the coordinator connection and schedules the next attempt.
///
/// The retry deadline is set to now plus the configured `retry_backoff`, and
/// the same backoff is applied to the heartbeat state.
pub(crate) fn backoff_coordinator(&mut self) {
    let backoff = self.config.retry_backoff;
    self.connections.coordinator_connection = None;
    self.connections.coordinator_retry_at = Some(Instant::now() + backoff);
    self.heartbeat_state.backoff(backoff);
}
/// Sends one `ConsumerGroupHeartbeat` request to the coordinator and applies
/// the response to the runtime state.
///
/// # Errors
///
/// Fails when the coordinator connection cannot be established or is missing,
/// when version negotiation, request building or the send itself fails, or
/// when the response carries an error code that is not treated as fatal.
/// A retriable error code additionally triggers `backoff_coordinator`.
///
/// An error code classified as fatal by `is_fatal_heartbeat_error` does NOT
/// produce an `Err`: the runtime enters its fatal state and `Ok(())` is
/// returned.
pub(crate) async fn send_heartbeat(&mut self) -> AnyResult<()> {
self.ensure_coordinator_connection().await?;
// `ensure_coordinator_connection` can return `Ok` without a connection (while
// a retry backoff is active), hence the explicit "missing" checks below.
let version = self
.connections
.coordinator_connection
.as_ref()
.context("coordinator connection is missing")?
.version_with_cap::<ConsumerGroupHeartbeatRequest>(HEARTBEAT_VERSION_CAP)?;
let request = self.build_heartbeat_request(version)?;
let client_id = self.config.client_id.clone();
let response: ConsumerGroupHeartbeatResponse = self
.connections
.coordinator_connection
.as_mut()
.context("coordinator connection is missing")?
.send_request::<ConsumerGroupHeartbeatRequest>(&client_id, version, &request)
.await?;
if let Some(error) = response.error_code.err() {
// Retriable: drop the connection, schedule a retry, and surface the error.
if error.is_retriable() {
self.backoff_coordinator();
return Err(error.into());
}
let error_message = response.error_message.as_ref().map(ToString::to_string);
// Fatal: the error is handed to `enter_fatal_state` rather than returned.
if Self::is_fatal_heartbeat_error(error) {
let client_error = self.fatal_heartbeat_error(error, error_message);
self.enter_fatal_state(client_error);
return Ok(());
}
// Non-fatal errors with a dedicated `ConsumerError` variant.
if let Some(mapped) = self.map_heartbeat_error(error, error_message) {
return Err(Error::Consumer(mapped).into());
}
// Everything else is reported as a generic broker response error.
return Err(broker_response_error(
"consumer_group_heartbeat",
Some(self.config.group_id.clone()),
error,
)
.into());
}
// Success path: adopt the member id (when present) and epoch from the response.
if let Some(member_id) = response.member_id {
self.heartbeat_state.member_id = member_id.to_string();
}
self.heartbeat_state.member_epoch = response.member_epoch;
// A negative interval is clamped to zero before conversion to `Duration`.
self.heartbeat_state
.mark_heartbeat_success(Duration::from_millis(
u64::try_from(response.heartbeat_interval_ms.max(0)).unwrap_or(0),
));
trace!(
member_epoch = self.heartbeat_state.member_epoch,
member_id = %self.heartbeat_state.member_id,
assignment = response.assignment.is_some(),
heartbeat_interval_ms = response.heartbeat_interval_ms,
"processed consumer heartbeat response"
);
// A negative epoch in a successful response is handled as loss of membership:
// notify listeners with the current partitions, then drop local group state.
if response.member_epoch < 0 {
self.notify_rebalance_lost_with(|| {
Self::topic_partitions_from_keys(
self.assignment_state
.assignment
.keys()
.cloned()
.collect::<Vec<_>>(),
)
});
self.assignment_state.assignment.clear();
self.assignment_state.pending_assignment = None;
self.poll_state.buffered_records.clear();
self.heartbeat_state.mark_left();
return Ok(());
}
// The new assignment is only stored here; it is applied by a later
// reconciliation step.
if let Some(assignment) = response.assignment {
self.assignment_state.pending_assignment = Some(assignment);
}
Ok(())
}
/// Returns `true` for the heartbeat error codes that put the runtime into its
/// fatal state: `UnsupportedVersion`, `UnreleasedInstanceId` and
/// `FencedInstanceId`.
// Explicit arms keep each fatal code on its own line.
#[allow(clippy::match_like_matches_macro)]
fn is_fatal_heartbeat_error(error: ResponseError) -> bool {
    match error {
        ResponseError::UnsupportedVersion => true,
        ResponseError::UnreleasedInstanceId => true,
        ResponseError::FencedInstanceId => true,
        _ => false,
    }
}
fn fatal_heartbeat_error(&self, error: ResponseError, error_message: Option<String>) -> Error {
if let Some(mapped) = self.map_heartbeat_error(error, error_message) {
return Error::Consumer(mapped);
}
Error::Broker(
BrokerError::response(
"consumer_group_heartbeat",
Some(self.config.group_id.clone()),
error,
)
.fatal(),
)
}
/// Builds the next `ConsumerGroupHeartbeatRequest` for protocol `version`.
///
/// Group id, member id, member epoch and instance id are always set. The
/// remaining fields are only included when the member is joining or when the
/// value differs from what `heartbeat_state.sent_fields` records as last
/// sent; every included field is recorded back into `sent_fields`.
///
/// This method mutates `heartbeat_state` (sent-field bookkeeping and the
/// `assignment_dirty` flag), so building a request is not side-effect free.
///
/// # Errors
///
/// Fails when the configured rebalance timeout cannot be converted by
/// `duration_to_i32_ms`.
pub(crate) fn build_heartbeat_request(
&mut self,
version: i16,
) -> AnyResult<ConsumerGroupHeartbeatRequest> {
let mut request = ConsumerGroupHeartbeatRequest::default()
.with_group_id(StrBytes::from_string(self.config.group_id.clone()).into())
.with_member_id(StrBytes::from_string(
self.heartbeat_state.member_id.clone(),
))
.with_member_epoch(self.heartbeat_state.member_epoch)
.with_instance_id(self.config.instance_id.clone().map(StrBytes::from_string));
let joining = self.heartbeat_state.is_joining();
let rebalance_timeout_ms = duration_to_i32_ms(self.config.rebalance_timeout)?;
// Rebalance timeout: sent when joining or when it changed.
if joining
|| self.heartbeat_state.sent_fields.rebalance_timeout_ms != Some(rebalance_timeout_ms)
{
request = request.with_rebalance_timeout_ms(rebalance_timeout_ms);
self.heartbeat_state.sent_fields.rebalance_timeout_ms = Some(rebalance_timeout_ms);
}
// Subscribed topic names: sent when joining or when the set changed.
let subscribed = self.subscribed_topic_names();
if joining
|| self.heartbeat_state.sent_fields.subscribed_topics.as_ref() != Some(&subscribed)
{
let topics = subscribed
.iter()
.cloned()
.map(StrBytes::from_string)
.map(Into::into)
.collect();
request = request.with_subscribed_topic_names(Some(topics));
self.heartbeat_state.sent_fields.subscribed_topics = Some(subscribed);
}
// Subscription regex: only for version >= 1. Sent when joining with a
// pattern, or whenever the pattern differs from the last one sent (which
// includes clearing a previously sent pattern).
let subscribed_pattern = self.subscription_pattern().map(ToOwned::to_owned);
if version >= 1
&& ((joining && subscribed_pattern.is_some())
|| self.heartbeat_state.sent_fields.subscribed_topic_regex != subscribed_pattern)
{
request = request
.with_subscribed_topic_regex(subscribed_pattern.clone().map(StrBytes::from_string));
self.heartbeat_state.sent_fields.subscribed_topic_regex = subscribed_pattern;
}
// Rack id is only sent while joining; it is not tracked in `sent_fields`.
if joining {
request = request.with_rack_id(self.config.rack_id.clone().map(StrBytes::from_string));
}
// Server-side assignor: sent when joining or when the heartbeat state
// reports that it changed.
if joining
|| self
.heartbeat_state
.server_assignor_changed(&self.config.server_assignor)
{
request = request.with_server_assignor(
self.config
.server_assignor
.clone()
.map(StrBytes::from_string),
);
self.heartbeat_state.sent_fields.server_assignor = self.config.server_assignor.clone();
}
// Current assignment grouped by topic id: sent when joining, when flagged
// dirty, or when it differs from the last snapshot sent.
let assignment_snapshot =
assignment_snapshot_by_topic_id(&self.assignment_state.assignment);
if joining
|| self.heartbeat_state.assignment_dirty
|| self
.heartbeat_state
.sent_fields
.assignment_snapshot
.as_ref()
!= Some(&assignment_snapshot)
{
let topic_partitions = assignment_snapshot
.iter()
.map(|(topic_id, partitions)| {
kafka_protocol::messages::consumer_group_heartbeat_request::TopicPartitions::default()
.with_topic_id(*topic_id)
.with_partitions(partitions.clone())
})
.collect();
request = request.with_topic_partitions(Some(topic_partitions));
self.heartbeat_state.sent_fields.assignment_snapshot = Some(assignment_snapshot);
// The dirty flag is consumed once the assignment has been included.
self.heartbeat_state.assignment_dirty = false;
}
Ok(request)
}
/// Maps a heartbeat error code to its dedicated `ConsumerError`, if one exists.
///
/// The attached message is the broker-provided `error_message` when it is
/// present and non-empty, otherwise the error code's own string form.
/// Returns `None` for every code without a dedicated variant.
fn map_heartbeat_error(
    &self,
    error: kafka_protocol::error::ResponseError,
    error_message: Option<String>,
) -> Option<ConsumerError> {
    let message = match error_message {
        Some(text) if !text.is_empty() => text,
        _ => error.to_string(),
    };
    let mapped = match error {
        kafka_protocol::error::ResponseError::InvalidRegularExpression => {
            ConsumerError::InvalidRegularExpression { message }
        }
        kafka_protocol::error::ResponseError::UnsupportedAssignor => {
            ConsumerError::UnsupportedAssignor {
                assignor: self.config.server_assignor.clone().unwrap_or_default(),
                message,
            }
        }
        kafka_protocol::error::ResponseError::UnreleasedInstanceId => {
            ConsumerError::UnreleasedInstanceId {
                instance_id: self.config.instance_id.clone().unwrap_or_default(),
                message,
            }
        }
        kafka_protocol::error::ResponseError::FencedInstanceId => {
            ConsumerError::FencedInstanceId {
                instance_id: self.config.instance_id.clone().unwrap_or_default(),
                message,
            }
        }
        _ => return None,
    };
    Some(mapped)
}
/// Replaces the current partition assignment with `assignment`.
///
/// Steps, in order:
/// 1. When auto commit is enabled and there are both assigned partitions and
///    delivered offsets, commit the delivered offsets.
/// 2. Resolve the new partition keys and, if a rebalance listener is
///    registered, notify it of the partitions that are being revoked.
/// 3. Refresh subscription metadata and fetch initial offsets for the new keys.
/// 4. Build the new assignment map, keeping the fetch offset of partitions
///    that were already assigned.
/// 5. Drop delivered offsets and paused partitions that are no longer
///    assigned, install the new assignment and clear buffered records.
/// 6. If a rebalance listener is registered, notify it of the newly assigned
///    partitions.
///
/// # Errors
///
/// Fails when the commit, the metadata refresh or the offset fetch fails, or
/// when a topic id or partition of the new assignment is missing from
/// metadata after the refresh. On failure the previous assignment stays
/// installed.
pub(crate) async fn reconcile_assignment(
&mut self,
assignment: HeartbeatAssignment,
) -> AnyResult<()> {
// The before/after sets are only materialised when a listener exists.
let previous_assignment = self.has_rebalance_listener().then(|| {
self.assignment_state
.assignment
.keys()
.cloned()
.collect::<BTreeSet<_>>()
});
let assigned_partition_count = assignment
.topic_partitions
.iter()
.map(|topic_partitions| topic_partitions.partitions.len())
.sum::<usize>();
debug!(
assigned_partition_count,
topic_count = assignment.topic_partitions.len(),
"reconciling consumer assignment"
);
// Commit what has been delivered so far before the assignment changes.
if self.config.enable_auto_commit
&& !self.assignment_state.assignment.is_empty()
&& !self.poll_state.delivered_offsets.is_empty()
{
let offsets = self
.poll_state
.delivered_offsets
.iter()
.map(|(key, offset)| CommitOffset {
topic: key.topic.clone(),
partition: key.partition,
offset: *offset,
})
.collect();
self.commit_offsets_inner(offsets).await?;
}
// NOTE(review): these keys are resolved from metadata BEFORE the refresh
// below. A topic id that is not yet known resolves to the default topic name
// via `unwrap_or_default()`, and such keys feed both the rebalance callbacks
// and the initial offset fetch — confirm this is intended.
let new_keys: Vec<TopicPartitionKey> = assignment
.topic_partitions
.iter()
.flat_map(|topic_partitions| {
let topic_name = self
.connections
.metadata
.topic_name(&topic_partitions.topic_id)
.cloned()
.unwrap_or_default();
topic_partitions
.partitions
.iter()
.copied()
.map(move |partition| TopicPartitionKey::new(topic_name.clone(), partition))
})
.collect();
let new_assignment = previous_assignment
.is_some()
.then(|| new_keys.iter().cloned().collect::<BTreeSet<_>>());
// Revoked = previously assigned but absent from the new assignment.
if let (Some(previous_assignment), Some(new_assignment)) =
(&previous_assignment, &new_assignment)
{
self.notify_rebalance_revoked_with(|| {
Self::topic_partitions_from_keys(
previous_assignment.difference(new_assignment).cloned(),
)
});
}
self.refresh_subscription_metadata().await?;
let mut next_assignment = HashMap::new();
let mut initial_offsets = self.fetch_initial_offsets_with_retry(&new_keys).await?;
for topic_partitions in assignment.topic_partitions {
// After the refresh an unknown topic id is a hard error.
let topic_name = self
.connections
.metadata
.topic_name(&topic_partitions.topic_id)
.cloned()
.with_context(|| {
format!(
"topic id {} was not found in metadata",
topic_partitions.topic_id
)
})?;
for partition in topic_partitions.partitions {
let key = TopicPartitionKey::new(topic_name.clone(), partition);
let partition_metadata = self
.connections
.metadata
.partition(&topic_name, partition)
.with_context(|| {
format!("missing metadata for {}:{}", topic_name, partition)
})?;
// Fetch position precedence: position of an already assigned partition,
// then the fetched initial offset, then 0.
let fetch_offset = self
.assignment_state
.assignment
.get(&key)
.map(|assigned| assigned.fetch_offset)
.or_else(|| initial_offsets.remove(&key))
.unwrap_or(0);
next_assignment.insert(
key.clone(),
AssignedPartition {
key,
topic_id: topic_partitions.topic_id,
leader_id: partition_metadata.leader_id,
leader_epoch: partition_metadata.leader_epoch,
fetch_offset,
},
);
}
}
// Forget per-partition state for partitions that are no longer assigned.
self.poll_state
.delivered_offsets
.retain(|key, _| next_assignment.contains_key(key));
self.assignment_state
.paused_partitions
.retain(|key| next_assignment.contains_key(key));
self.assignment_state.assignment = next_assignment;
self.poll_state.buffered_records.clear();
// Assigned = present in the new assignment but not previously assigned.
if let (Some(previous_assignment), Some(new_assignment)) =
(&previous_assignment, &new_assignment)
{
self.notify_rebalance_assigned_with(|| {
Self::topic_partitions_from_keys(
new_assignment.difference(previous_assignment).cloned(),
)
});
}
debug!(
assignment_size = self.assignment_state.assignment.len(),
paused = self.assignment_state.paused_partitions.len(),
"consumer assignment reconciled"
);
Ok(())
}
/// Leaves the consumer group.
///
/// When the member epoch is not positive, only the revoked notification is
/// issued and the heartbeat state is marked as left; nothing is sent and the
/// local assignment is not cleared.
///
/// Otherwise a heartbeat carrying the leave epoch from `leave_group_epoch` is
/// sent to the coordinator. The outcome of that request is ignored. The
/// revoked notification is then issued, the assignment, pending assignment
/// and buffered records are cleared, and the heartbeat state is marked as
/// left.
///
/// # Errors
///
/// Fails when the coordinator connection cannot be established or is
/// missing, or when version negotiation fails. In those cases the local
/// state is left untouched.
pub(crate) async fn leave_group(&mut self) -> AnyResult<()> {
    if self.heartbeat_state.member_epoch <= 0 {
        self.notify_rebalance_revoked_with(|| {
            let owned_keys: Vec<_> = self.assignment_state.assignment.keys().cloned().collect();
            Self::topic_partitions_from_keys(owned_keys)
        });
        self.heartbeat_state.mark_left();
        return Ok(());
    }

    self.ensure_coordinator_connection().await?;

    let group_id = StrBytes::from_string(self.config.group_id.clone());
    let member_id = StrBytes::from_string(self.heartbeat_state.member_id.clone());
    let instance_id = self.config.instance_id.clone().map(StrBytes::from_string);
    let farewell = ConsumerGroupHeartbeatRequest::default()
        .with_group_id(group_id.into())
        .with_member_id(member_id)
        .with_member_epoch(self.leave_group_epoch())
        .with_instance_id(instance_id);
    let client_id = self.config.client_id.clone();

    let coordinator = self
        .connections
        .coordinator_connection
        .as_mut()
        .context("coordinator connection is missing")?;
    let version =
        coordinator.version_with_cap::<ConsumerGroupHeartbeatRequest>(HEARTBEAT_VERSION_CAP)?;
    // Best effort: local state is torn down whether or not the coordinator
    // acknowledged the leave request.
    let _ = coordinator
        .send_request::<ConsumerGroupHeartbeatRequest>(&client_id, version, &farewell)
        .await;

    self.notify_rebalance_revoked_with(|| {
        let owned_keys: Vec<_> = self.assignment_state.assignment.keys().cloned().collect();
        Self::topic_partitions_from_keys(owned_keys)
    });
    self.assignment_state.assignment.clear();
    self.assignment_state.pending_assignment = None;
    self.poll_state.buffered_records.clear();
    self.heartbeat_state.mark_left();
    Ok(())
}
/// Returns the member epoch to send when leaving the group: `-2` when an
/// instance id is configured, `-1` otherwise.
pub(crate) fn leave_group_epoch(&self) -> i32 {
    match self.config.instance_id {
        Some(_) => -2,
        None => -1,
    }
}
/// Establishes the coordinator connection if it is absent and a retry is due.
///
/// Returns `Ok(())` immediately when a connection already exists, and also
/// when the retry deadline has not been reached. In the latter case there is
/// still no connection afterwards, so callers must handle a missing one.
///
/// Otherwise a bootstrap broker is contacted and asked for the group
/// coordinator; from request version 4 on, the group id is passed through
/// `coordinator_keys` instead of `key`. On success the new connection is
/// stored and the retry deadline is cleared.
///
/// # Errors
///
/// Fails when no bootstrap broker can be reached, when version negotiation
/// or the lookup request fails, when the lookup response cannot be parsed,
/// or when connecting to the coordinator fails. A lookup result that asks
/// for a retry additionally triggers `backoff_coordinator`.
pub(crate) async fn ensure_coordinator_connection(&mut self) -> AnyResult<()> {
    let already_connected = self.connections.coordinator_connection.is_some();
    if already_connected || !self.coordinator_retry_ready() {
        return Ok(());
    }

    let mut bootstrap = connect_to_any_bootstrap(
        &self.config.bootstrap_servers,
        &self.config.client_id,
        self.config.request_timeout,
        self.config.security_protocol,
        &self.config.tls,
        &self.config.sasl,
        &self.config.tcp_connector,
    )
    .await?;
    let version =
        bootstrap.version_with_cap::<FindCoordinatorRequest>(FIND_COORDINATOR_VERSION_CAP)?;

    let group_key = StrBytes::from_string(self.config.group_id.clone());
    let lookup = FindCoordinatorRequest::default().with_key_type(FIND_COORDINATOR_GROUP_KEY_TYPE);
    let request = if version >= 4 {
        lookup.with_coordinator_keys(vec![group_key])
    } else {
        lookup.with_key(group_key)
    };
    let response = bootstrap
        .send_request::<FindCoordinatorRequest>(&self.config.client_id, version, &request)
        .await?;

    let coordinator = match parse_find_coordinator_response(response, version)? {
        CoordinatorLookupResult::Retry(error) => {
            self.backoff_coordinator();
            return Err(error.into());
        }
        CoordinatorLookupResult::Found(coordinator) => coordinator,
    };

    let address = coordinator.address();
    let connection = BrokerConnection::connect_with_transport(
        &address,
        &self.config.client_id,
        self.config.request_timeout,
        self.config.security_protocol,
        &self.config.tls,
        &self.config.sasl,
        &self.config.tcp_connector,
    )
    .await?;
    self.connections.coordinator_connection = Some(connection);
    self.connections.coordinator_retry_at = None;
    debug!(coordinator = %address, "connected to consumer group coordinator");
    Ok(())
}
}