mod assignment;
mod commits;
mod fetch;
mod group;
mod handlers;
mod metadata;
mod poll;
use std::collections::{BTreeSet, HashMap};
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::time::Duration;
use anyhow::{Context, Result as AnyResult, anyhow, bail};
use bytes::Buf;
use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
use kafka_protocol::messages::consumer_group_heartbeat_response::Assignment as HeartbeatAssignment;
use kafka_protocol::messages::fetch_response::FetchResponse;
use kafka_protocol::messages::list_offsets_response::ListOffsetsResponse;
use kafka_protocol::messages::offset_commit_response::OffsetCommitResponse;
use kafka_protocol::messages::offset_fetch_response::OffsetFetchResponse;
use kafka_protocol::messages::{
ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse, FindCoordinatorRequest,
ListOffsetsRequest, OffsetCommitRequest, OffsetFetchRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::records::RecordBatchDecoder;
use tokio::sync::mpsc;
use tokio::time::{Instant, sleep};
use tracing::{Instrument, debug, trace, warn};
use super::ConsumerRuntimeEvent;
use super::protocol::{
CoordinatorLookupResult, assignment_snapshot_by_topic_id, build_fetch_request,
build_list_offsets_request, build_list_offsets_request_with_timestamps,
build_offset_commit_request, group_topic_partitions, is_retriable_error,
parse_find_coordinator_response,
};
use super::scheduler::{ConsumerNetworkAction, ConsumerRequestManagers};
use super::state::{
CommitKind, ConsumerAssignmentState, ConsumerConnectionState, ConsumerLifecycleState,
ConsumerPollState, HeartbeatState, PendingCommit, PendingPoll,
};
use crate::config::{AutoOffsetReset, ConsumerConfig};
use crate::constants::{
FETCH_VERSION_CAP, FIND_COORDINATOR_GROUP_KEY_TYPE, FIND_COORDINATOR_VERSION_CAP,
HEARTBEAT_VERSION_CAP, LIST_OFFSETS_EARLIEST, LIST_OFFSETS_LATEST, LIST_OFFSETS_VERSION_CAP,
OFFSET_COMMIT_VERSION_CAP, OFFSET_FETCH_VERSION_CAP,
};
use crate::metadata::{MetadataRefresh, refresh_metadata};
use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
use crate::telemetry;
use crate::types::{
AssignedPartition, CommitOffset, ConsumerGroupMetadata, ConsumerRebalanceEvent, ConsumerRecord,
ConsumerRecords, RecordHeader, TopicPartition, TopicPartitionInfo, TopicPartitionKey,
TopicPartitionOffset, TopicPartitionOffsetAndTimestamp, TopicPartitionTimestamp,
};
use crate::{
BrokerError, CancellationToken, ConsumerError, Error, ProtocolError, Result as ClientResult,
};
#[derive(Debug, Clone, Default)]
pub enum ConsumerSubscription {
#[default]
None,
Topics(BTreeSet<String>),
Regex {
regex: regex::Regex,
},
Pattern(String),
}
pub struct ConsumerRuntime {
pub config: ConsumerConfig,
pub connections: ConsumerConnectionState,
pub assignment_state: ConsumerAssignmentState,
pub heartbeat_state: HeartbeatState,
pub poll_state: ConsumerPollState,
pub lifecycle: ConsumerLifecycleState,
}
impl ConsumerRuntime {
pub fn new(config: ConsumerConfig) -> Self {
Self {
config,
connections: ConsumerConnectionState::default(),
assignment_state: ConsumerAssignmentState::default(),
heartbeat_state: HeartbeatState::new(),
poll_state: ConsumerPollState::default(),
lifecycle: ConsumerLifecycleState::new(),
}
}
pub async fn run(mut self, mut rx: mpsc::Receiver<ConsumerRuntimeEvent>) {
let client_id = self.config.client_id.clone();
let group_id = self.config.group_id.clone();
async move {
let mut running = true;
while running {
let wake = ConsumerRequestManagers::next_wakeup(&self);
tokio::select! {
biased;
maybe_event = rx.recv() => {
match maybe_event {
Some(event) => self.lifecycle.runtime_events.push_back(event),
None => {
debug!("consumer application event channel closed");
self.lifecycle.shutting_down = true;
}
}
}
_ = sleep(wake) => {}
}
match self.run_once(&mut rx).await {
Ok(should_continue) => running = should_continue,
Err(error) => {
if is_retriable_error(&error) {
debug!(error = %error, "consumer network action failed with retriable error");
continue;
}
let client_error = into_client_error(error);
warn!(
error = %client_error,
shutting_down = self.lifecycle.shutting_down,
"consumer runtime encountered non-retriable error"
);
let error_text = format!("{client_error:#}");
self.fail_pending_poll(client_error);
self.fail_pending_commits(&error_text);
if self.lifecycle.shutting_down {
self.finish_shutdown_with_error(&error_text);
running = false;
}
}
}
}
self.fail_pending_commits("consumer runtime stopped");
if self.lifecycle.close_reply.is_some() {
self.finish_shutdown_with_error("consumer runtime stopped");
}
}
.instrument(tracing::debug_span!(
"consumer_runtime",
%client_id,
%group_id
))
.await;
}
async fn run_once(&mut self, rx: &mut mpsc::Receiver<ConsumerRuntimeEvent>) -> AnyResult<bool> {
self.drain_runtime_events(rx);
self.process_runtime_events().await?;
self.maybe_enqueue_auto_commit();
let actions = ConsumerRequestManagers::poll(self);
for action in actions {
self.execute_network_action(action).await?;
}
self.maybe_reconcile_pending_assignment().await?;
self.maybe_complete_poll();
Ok(self.finish_shutdown_if_ready())
}
pub(crate) fn has_group_subscription(&self) -> bool {
!matches!(
self.assignment_state.group_subscription,
ConsumerSubscription::None
)
}
pub(crate) fn is_pattern_subscription(&self) -> bool {
matches!(
self.assignment_state.group_subscription,
ConsumerSubscription::Regex { .. } | ConsumerSubscription::Pattern(_)
)
}
pub(crate) fn subscribed_topic_names(&self) -> BTreeSet<String> {
match &self.assignment_state.group_subscription {
ConsumerSubscription::Topics(topics) => topics.clone(),
ConsumerSubscription::Regex { regex, .. } => self
.connections
.metadata
.topic_names()
.into_iter()
.filter(|topic| regex.is_match(topic))
.collect(),
ConsumerSubscription::Pattern(_) | ConsumerSubscription::None => BTreeSet::new(),
}
}
pub(crate) fn subscription_pattern(&self) -> Option<&str> {
match &self.assignment_state.group_subscription {
ConsumerSubscription::Pattern(pattern) => Some(pattern.as_str()),
ConsumerSubscription::Topics(_)
| ConsumerSubscription::Regex { .. }
| ConsumerSubscription::None => None,
}
}
pub(crate) fn current_assignment(&self) -> BTreeSet<TopicPartition> {
self.assignment_state
.assignment
.keys()
.map(|key| TopicPartition::new(key.topic.clone(), key.partition))
.collect()
}
pub(crate) fn has_rebalance_listener(&self) -> bool {
self.config.rebalance_listener.is_some()
}
pub(crate) fn notify_rebalance(&self, event: ConsumerRebalanceEvent) {
if event.partitions().is_empty() {
return;
}
if let Some(listener) = &self.config.rebalance_listener {
let event_name = match &event {
ConsumerRebalanceEvent::Revoked(_) => "revoked",
ConsumerRebalanceEvent::Assigned(_) => "assigned",
ConsumerRebalanceEvent::Lost(_) => "lost",
};
let partition_count = event.partitions().len();
if catch_unwind(AssertUnwindSafe(|| listener.on_rebalance(event))).is_err() {
warn!(
event = event_name,
partition_count, "consumer rebalance listener panicked"
);
}
}
}
pub(crate) fn notify_rebalance_revoked(&self, partitions: Vec<TopicPartition>) {
self.notify_rebalance(ConsumerRebalanceEvent::Revoked(partitions));
}
pub(crate) fn notify_rebalance_revoked_with(
&self,
partitions: impl FnOnce() -> Vec<TopicPartition>,
) {
if self.has_rebalance_listener() {
self.notify_rebalance_revoked(partitions());
}
}
pub(crate) fn notify_rebalance_assigned(&self, partitions: Vec<TopicPartition>) {
self.notify_rebalance(ConsumerRebalanceEvent::Assigned(partitions));
}
pub(crate) fn notify_rebalance_assigned_with(
&self,
partitions: impl FnOnce() -> Vec<TopicPartition>,
) {
if self.has_rebalance_listener() {
self.notify_rebalance_assigned(partitions());
}
}
pub(crate) fn notify_rebalance_lost(&self, partitions: Vec<TopicPartition>) {
self.notify_rebalance(ConsumerRebalanceEvent::Lost(partitions));
}
pub(crate) fn notify_rebalance_lost_with(
&self,
partitions: impl FnOnce() -> Vec<TopicPartition>,
) {
if self.has_rebalance_listener() {
self.notify_rebalance_lost(partitions());
}
}
}
fn consumer_event_name(event: &ConsumerRuntimeEvent) -> &'static str {
match event {
ConsumerRuntimeEvent::WarmUp { .. } => "warm_up",
ConsumerRuntimeEvent::Subscribe { .. } => "subscribe",
ConsumerRuntimeEvent::SubscribePattern { .. } => "subscribe_pattern",
ConsumerRuntimeEvent::SubscribeRegex { .. } => "subscribe_regex",
ConsumerRuntimeEvent::Unsubscribe { .. } => "unsubscribe",
ConsumerRuntimeEvent::Assign { .. } => "assign",
ConsumerRuntimeEvent::Poll { .. } => "poll",
ConsumerRuntimeEvent::Seek { .. } => "seek",
ConsumerRuntimeEvent::SeekToBeginning { .. } => "seek_to_beginning",
ConsumerRuntimeEvent::SeekToEnd { .. } => "seek_to_end",
ConsumerRuntimeEvent::SeekToTimestamp { .. } => "seek_to_timestamp",
ConsumerRuntimeEvent::Position { .. } => "position",
ConsumerRuntimeEvent::Pause { .. } => "pause",
ConsumerRuntimeEvent::Resume { .. } => "resume",
ConsumerRuntimeEvent::GroupMetadata { .. } => "group_metadata",
ConsumerRuntimeEvent::Assignment { .. } => "assignment",
ConsumerRuntimeEvent::Committed { .. } => "committed",
ConsumerRuntimeEvent::BeginningOffsets { .. } => "beginning_offsets",
ConsumerRuntimeEvent::EndOffsets { .. } => "end_offsets",
ConsumerRuntimeEvent::OffsetsForTimes { .. } => "offsets_for_times",
ConsumerRuntimeEvent::PartitionsFor { .. } => "partitions_for",
ConsumerRuntimeEvent::ListTopics { .. } => "list_topics",
ConsumerRuntimeEvent::Commit { .. } => "commit",
ConsumerRuntimeEvent::Wakeup => "wakeup",
ConsumerRuntimeEvent::Shutdown { .. } => "shutdown",
}
}
fn consumer_network_action_name(action: &ConsumerNetworkAction) -> &'static str {
match action {
ConsumerNetworkAction::RefreshMetadata => "refresh_metadata",
ConsumerNetworkAction::EnsureCoordinator => "ensure_coordinator",
ConsumerNetworkAction::Heartbeat => "heartbeat",
ConsumerNetworkAction::Commit => "commit",
ConsumerNetworkAction::Fetch { .. } => "fetch",
ConsumerNetworkAction::LeaveGroup => "leave_group",
}
}
pub(crate) fn into_client_error(error: anyhow::Error) -> Error {
match error.downcast::<Error>() {
Ok(error) => error,
Err(error) => match error.downcast::<ResponseError>() {
Ok(error) => Error::Broker(BrokerError::response("consumer", None::<String>, error)),
Err(error) => Error::Internal(error),
},
}
}
pub(crate) fn broker_response_error(
operation: &'static str,
resource: impl Into<Option<String>>,
error: ResponseError,
) -> Error {
Error::Broker(BrokerError::response(operation, resource, error))
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::sync::{Arc, Mutex};
use kafka_protocol::error::ResponseError;
use tokio::sync::oneshot;
use super::*;
use crate::consumer::scheduler::{ConsumerNetworkAction, ConsumerRequestManagers};
use crate::types::ConsumerRebalanceListener;
#[test]
fn into_client_error_preserves_broker_response_errors() {
let error = into_client_error(ResponseError::GroupAuthorizationFailed.into());
match error {
Error::Broker(BrokerError::Response {
operation,
name,
retriable,
..
}) => {
assert_eq!(operation, "consumer");
assert_eq!(name, "GroupAuthorizationFailed");
assert!(!retriable);
}
error => panic!("expected broker error, got {error:?}"),
}
}
#[test]
fn into_client_error_preserves_explicit_broker_context() {
let error = into_client_error(
broker_response_error(
"fetch",
Some("topic-a:2".to_owned()),
ResponseError::UnknownTopicOrPartition,
)
.into(),
);
match error {
Error::Broker(BrokerError::Response {
operation,
resource,
name,
retriable,
..
}) => {
assert_eq!(operation, "fetch");
assert_eq!(resource, " for topic-a:2");
assert_eq!(name, "UnknownTopicOrPartition");
assert!(retriable);
}
error => panic!("expected broker error, got {error:?}"),
}
}
#[test]
fn heartbeat_request_sends_full_state_on_join_then_delta() {
let config = ConsumerConfig::new("localhost:9092", "group-a")
.with_server_assignor("uniform")
.with_rack_id("rack-a");
let mut runtime = ConsumerRuntime::new(config);
runtime.assignment_state.group_subscription =
ConsumerSubscription::Topics(BTreeSet::from(["topic-a".to_owned()]));
let first = runtime.build_heartbeat_request(1).unwrap();
assert_eq!(first.member_epoch, 0);
assert!(first.subscribed_topic_names.is_some());
assert!(first.server_assignor.is_some());
assert!(first.rebalance_timeout_ms > 0);
runtime.heartbeat_state.member_epoch = 3;
let second = runtime.build_heartbeat_request(1).unwrap();
assert!(second.subscribed_topic_names.is_none());
assert!(second.server_assignor.is_none());
assert_eq!(second.rebalance_timeout_ms, -1);
}
#[test]
fn heartbeat_request_sends_regex_subscription() {
let mut runtime = ConsumerRuntime::new(ConsumerConfig::new("localhost:9092", "group-a"));
runtime.assignment_state.group_subscription =
ConsumerSubscription::Pattern("topic-.*".to_owned());
let request = runtime.build_heartbeat_request(1).unwrap();
assert_eq!(request.subscribed_topic_names, Some(Vec::new()));
assert_eq!(
request
.subscribed_topic_regex
.as_ref()
.map(ToString::to_string),
Some("topic-.*".to_owned())
);
}
#[test]
fn subscription_changes_keep_live_member_epoch() {
let mut runtime = ConsumerRuntime::new(ConsumerConfig::new("localhost:9092", "group-a"));
runtime.heartbeat_state.member_epoch = 3;
runtime
.subscribe_pattern(crate::types::SubscriptionPattern::new("topic-.*"))
.unwrap();
assert_eq!(runtime.heartbeat_state.member_epoch, 3);
let request = runtime.build_heartbeat_request(1).unwrap();
assert_eq!(request.member_epoch, 3);
assert_eq!(
request
.subscribed_topic_regex
.as_ref()
.map(ToString::to_string),
Some("topic-.*".to_owned())
);
}
#[test]
fn static_members_use_java_leave_epoch() {
let static_runtime = ConsumerRuntime::new(
ConsumerConfig::new("localhost:9092", "group-a").with_instance_id("instance-a"),
);
assert_eq!(static_runtime.leave_group_epoch(), -2);
let dynamic_runtime =
ConsumerRuntime::new(ConsumerConfig::new("localhost:9092", "group-a"));
assert_eq!(dynamic_runtime.leave_group_epoch(), -1);
}
#[test]
fn rebalance_listener_receives_events() {
let events = Arc::new(Mutex::new(Vec::new()));
let captured = Arc::clone(&events);
let runtime = ConsumerRuntime::new(
ConsumerConfig::new("localhost:9092", "group-a").with_rebalance_listener(
ConsumerRebalanceListener::new(move |event| {
captured.lock().expect("events lock").push(event);
}),
),
);
runtime.notify_rebalance_revoked(vec![TopicPartition::new("topic-a", 0)]);
runtime.notify_rebalance_assigned(vec![TopicPartition::new("topic-a", 1)]);
runtime.notify_rebalance_lost(vec![TopicPartition::new("topic-a", 2)]);
let events = events.lock().expect("events lock");
assert_eq!(events.len(), 3);
assert!(
matches!(&events[0], ConsumerRebalanceEvent::Revoked(partitions) if partitions == &vec![TopicPartition::new("topic-a", 0)])
);
assert!(
matches!(&events[1], ConsumerRebalanceEvent::Assigned(partitions) if partitions == &vec![TopicPartition::new("topic-a", 1)])
);
assert!(
matches!(&events[2], ConsumerRebalanceEvent::Lost(partitions) if partitions == &vec![TopicPartition::new("topic-a", 2)])
);
}
#[test]
fn rebalance_listener_panic_does_not_escape_runtime() {
let runtime = ConsumerRuntime::new(
ConsumerConfig::new("localhost:9092", "group-a")
.with_rebalance_callback(|_| panic!("callback failed")),
);
runtime.notify_rebalance_assigned(vec![TopicPartition::new("topic-a", 0)]);
}
#[test]
fn shutdown_drains_commits_before_leave() {
let mut runtime = ConsumerRuntime::new(ConsumerConfig::new("localhost:9092", "group-a"));
runtime.lifecycle.shutting_down = true;
runtime.heartbeat_state.member_epoch = 4;
runtime.lifecycle.pending_commits.push_back(PendingCommit {
offsets: vec![CommitOffset {
topic: "topic-a".to_owned(),
partition: 0,
offset: 12,
}],
cancellation: None,
reply: None,
kind: CommitKind::Manual,
});
let actions = ConsumerRequestManagers::poll(&runtime);
assert_eq!(actions.len(), 1);
assert!(matches!(
actions[0],
ConsumerNetworkAction::EnsureCoordinator
));
assert!(
actions
.iter()
.all(|action| !matches!(action, ConsumerNetworkAction::LeaveGroup))
);
}
#[test]
fn fatal_consumer_state_clears_group_ownership_and_stops_scheduling() {
let mut runtime = ConsumerRuntime::new(
ConsumerConfig::new("localhost:9092", "group-a").with_instance_id("instance-a"),
);
runtime.assignment_state.group_subscription =
ConsumerSubscription::Topics(BTreeSet::from(["topic-a".to_owned()]));
runtime.heartbeat_state.member_epoch = 3;
let key = TopicPartitionKey::new("topic-a".to_owned(), 0);
runtime
.assignment_state
.paused_partitions
.insert(key.clone());
runtime.poll_state.delivered_offsets.insert(key.clone(), 4);
runtime.assignment_state.assignment.insert(
key.clone(),
AssignedPartition {
key,
topic_id: uuid::Uuid::new_v4(),
leader_id: 1,
leader_epoch: 7,
fetch_offset: 4,
},
);
runtime.lifecycle.pending_commits.push_back(PendingCommit {
offsets: vec![CommitOffset {
topic: "topic-a".to_owned(),
partition: 0,
offset: 5,
}],
cancellation: None,
reply: None,
kind: CommitKind::Manual,
});
runtime.enter_fatal_state(Error::Consumer(ConsumerError::FencedInstanceId {
instance_id: "instance-a".to_owned(),
message: "fenced".to_owned(),
}));
assert!(runtime.lifecycle.fatal_error.is_some());
assert!(runtime.assignment_state.assignment.is_empty());
assert!(runtime.assignment_state.pending_assignment.is_none());
assert!(runtime.assignment_state.paused_partitions.is_empty());
assert!(runtime.poll_state.delivered_offsets.is_empty());
assert!(runtime.lifecycle.pending_commits.is_empty());
assert_eq!(runtime.heartbeat_state.member_epoch, 0);
assert!(runtime.heartbeat_state.next_heartbeat.is_none());
assert!(ConsumerRequestManagers::poll(&runtime).is_empty());
}
#[tokio::test]
async fn fatal_consumer_state_rejects_future_events() {
let mut runtime = ConsumerRuntime::new(
ConsumerConfig::new("localhost:9092", "group-a").with_instance_id("instance-a"),
);
runtime.enter_fatal_state(Error::Consumer(ConsumerError::UnreleasedInstanceId {
instance_id: "instance-a".to_owned(),
message: "still owned".to_owned(),
}));
let (reply, result) = oneshot::channel();
runtime
.lifecycle
.runtime_events
.push_back(ConsumerRuntimeEvent::Assignment {
cancellation: None,
reply,
});
runtime.process_runtime_events().await.unwrap();
let error = result.await.unwrap().unwrap_err();
assert!(error.to_string().contains("still owned"));
}
}