use super::*;
// Byte-layout constants used to peek at a raw record batch header before the batch is handed
// to the decoder.

// Width of the leading base-offset field, which is read as a big-endian `i64`.
const RECORD_BATCH_BASE_OFFSET_BYTES: usize = 8;
// Width of the batch-length field, which is read as a big-endian `i32`.
const RECORD_BATCH_LENGTH_BYTES: usize = 4;
// The length field starts right after the base offset.
const RECORD_BATCH_LENGTH_OFFSET: usize = RECORD_BATCH_BASE_OFFSET_BYTES;
// Exclusive end of the length field.
const RECORD_BATCH_LENGTH_END: usize = RECORD_BATCH_LENGTH_OFFSET + RECORD_BATCH_LENGTH_BYTES;
// Start of the last-offset-delta field, which is read as a big-endian `i32`.
// NOTE(review): 23 presumably follows the Kafka v2 record batch header layout — confirm.
const RECORD_BATCH_LAST_OFFSET_DELTA_OFFSET: usize = 23;
// Exclusive end of the four-byte last-offset-delta field.
const RECORD_BATCH_LAST_OFFSET_DELTA_END: usize = RECORD_BATCH_LAST_OFFSET_DELTA_OFFSET + 4;
// Fewer remaining bytes than this are treated as an incomplete trailing fragment and ignored.
// NOTE(review): 17 presumably spans base offset, length, partition leader epoch and the magic
// byte — confirm.
const RECORD_BATCH_HEADER_MIN_BYTES: usize = 17;
// Bytes that are not counted by the length field itself; added to the decoded length to obtain
// the total size of a batch.
const RECORD_BATCH_TOTAL_LENGTH_OVERHEAD: usize =
RECORD_BATCH_BASE_OFFSET_BYTES + RECORD_BATCH_LENGTH_BYTES;
impl ConsumerRuntime {
pub(crate) async fn fetch_from_leader(
&mut self,
leader_id: i32,
partitions: Vec<AssignedPartition>,
) -> AnyResult<()> {
trace!(
leader_id,
partition_count = partitions.len(),
"fetching records from leader"
);
let client_id = self.config.client_id.clone();
let version = {
let connection = self.leader_connection(leader_id).await?;
connection
.version_with_cap::<kafka_protocol::messages::FetchRequest>(FETCH_VERSION_CAP)?
};
let request = build_fetch_request(&partitions, version, &self.config)?;
let requested_offsets = partitions
.iter()
.map(|partition| (partition.key.clone(), partition.fetch_offset))
.collect::<HashMap<_, _>>();
let connection = self.leader_connection(leader_id).await?;
let response: FetchResponse = match connection
.send_request::<kafka_protocol::messages::FetchRequest>(&client_id, version, &request)
.await
{
Ok(response) => response,
Err(error) => {
self.connections.leader_connections.remove(&leader_id);
for partition in &partitions {
self.connections
.metadata
.invalidate_topic(&partition.key.topic);
}
return Err(error);
}
};
if let Some(error) = response.error_code.err() {
if error.is_retriable() {
self.connections.metadata.invalidate_all();
self.connections.leader_connections.clear();
return Err(error.into());
}
return Err(broker_response_error("fetch", None::<String>, error).into());
}
self.process_fetch_response(response, version, &requested_offsets)
.await?;
Ok(())
}
/// Decodes a fetch response, buffers its consumable records and advances fetch positions.
///
/// For every partition in `response` this:
/// - handles the partition error code: `OffsetOutOfRange` triggers an offset reset when the
///   partition is present in `requested_offsets`; a retriable error invalidates the topic's
///   metadata and aborts; any other error aborts,
/// - splits the record bytes into complete batches, leaving a truncated trailing batch for the
///   next fetch,
/// - under `ReadCommitted`, skips transactional batches of producers listed in the partition's
///   aborted transactions until that producer's abort marker is seen,
/// - drops control records and records below the offset that was requested, and
/// - moves the partition's fetch offset past everything that was examined.
///
/// Records are appended to `poll_state.buffered_records` as soon as their partition has been
/// processed. Fetch offsets are advanced while a partition is processed, so holding the records
/// back until the whole response succeeded would lose them whenever a later partition fails.
///
/// `version` is the fetch API version the request was sent with; `requested_offsets` maps each
/// requested partition to the offset that was asked for.
///
/// # Errors
///
/// Returns an error when a partition reports an error other than `OffsetOutOfRange`, when an
/// out-of-range reset fails, when a batch header carries a negative length, or when a batch
/// fails to decode. Records of partitions handled before the failure stay buffered; telemetry
/// and the summary log line are only emitted on success.
pub(crate) async fn process_fetch_response(
    &mut self,
    response: FetchResponse,
    version: i16,
    requested_offsets: &HashMap<TopicPartitionKey, i64>,
) -> AnyResult<()> {
    let mut fetched_records = 0usize;
    let mut fetched_partition_count = 0usize;
    for topic in response.responses {
        // With version 13+ and a non-nil topic id the name is resolved through metadata,
        // falling back to the name carried in the response.
        let topic_name = if version >= 13 && !topic.topic_id.is_nil() {
            self.connections
                .metadata
                .topic_name(&topic.topic_id)
                .cloned()
                .unwrap_or_else(|| topic.topic.0.to_string())
        } else {
            topic.topic.0.to_string()
        };
        for partition in topic.partitions {
            let key = TopicPartitionKey::new(topic_name.clone(), partition.partition_index);
            let requested_offset = requested_offsets.get(&key).copied();
            if let Some(error) = partition.error_code.err() {
                if error == ResponseError::OffsetOutOfRange {
                    if let Some(requested_offset) = requested_offset {
                        self.reset_out_of_range_fetch_offset(key, requested_offset)
                            .await?;
                    }
                    continue;
                }
                if error.is_retriable() {
                    self.connections.metadata.invalidate_topic(&topic_name);
                    return Err(error.into());
                }
                return Err(broker_response_error(
                    "fetch",
                    Some(format!("{}:{}", topic_name, partition.partition_index)),
                    error,
                )
                .into());
            }
            let Some(records) = partition.records else {
                continue;
            };
            if records.is_empty() {
                continue;
            }
            let mut partition_records = Vec::new();
            // Sorted so the cursor below can walk the aborted transactions in offset order.
            let mut aborted_transactions = partition.aborted_transactions.unwrap_or_default();
            aborted_transactions.sort_by_key(|transaction| transaction.first_offset);
            let mut next_aborted_transaction = 0usize;
            let mut aborted_producer_ids = std::collections::HashSet::<i64>::new();
            let mut bytes = records;
            let mut batches = Vec::new();
            // Phase 1: split the raw bytes into fully decoded batches. Nothing is delivered
            // and no offset is touched here, so a decode failure leaves the partition as is.
            while bytes.has_remaining() {
                let remaining = bytes.remaining();
                if remaining < RECORD_BATCH_HEADER_MIN_BYTES {
                    trace!(
                        topic = %topic_name,
                        partition = partition.partition_index,
                        trailing_bytes = remaining,
                        "ignoring incomplete trailing fetch record bytes"
                    );
                    break;
                }
                let chunk = bytes.chunk();
                // Read from the header so the position can advance even when the batch
                // yields no deliverable records.
                let batch_next_offset = record_batch_next_offset(chunk);
                if chunk.len() >= RECORD_BATCH_HEADER_MIN_BYTES {
                    let batch_len = i32::from_be_bytes(
                        chunk[RECORD_BATCH_LENGTH_OFFSET..RECORD_BATCH_LENGTH_END]
                            .try_into()
                            .expect("record batch length slice is exactly four bytes"),
                    );
                    if batch_len < 0 {
                        bail!(
                            "negative fetch record batch length {} for {}:{}",
                            batch_len,
                            topic_name,
                            partition.partition_index
                        );
                    }
                    let batch_total_len =
                        RECORD_BATCH_TOTAL_LENGTH_OVERHEAD + batch_len as usize;
                    if batch_total_len > remaining {
                        trace!(
                            topic = %topic_name,
                            partition = partition.partition_index,
                            trailing_bytes = remaining,
                            batch_total_len,
                            "leaving partial trailing fetch record batch for next fetch"
                        );
                        break;
                    }
                }
                let batch = RecordBatchDecoder::decode(&mut bytes).with_context(|| {
                    format!(
                        "decode fetch records for {}:{} with {} bytes remaining",
                        topic_name, partition.partition_index, remaining
                    )
                })?;
                // Fall back to the decoded records when the header could not be read.
                let batch_next_offset = batch_next_offset.or_else(|| {
                    batch
                        .records
                        .iter()
                        .map(|record| record.offset)
                        .max()
                        .map(|offset| offset + 1)
                });
                batches.push((batch, batch_next_offset));
            }
            // Phase 2: filter and deliver. This loop has no early exit, so every offset
            // advanced here is matched by the buffering step that follows it.
            for (batch, batch_next_offset) in batches {
                let batch_last_offset = batch_next_offset
                    .map(|offset| offset - 1)
                    .or_else(|| batch.records.iter().map(|record| record.offset).max());
                if self.config.isolation_level == crate::config::IsolationLevel::ReadCommitted {
                    if let Some(batch_last_offset) = batch_last_offset {
                        // Activate every aborted transaction that starts at or before the
                        // end of this batch.
                        while next_aborted_transaction < aborted_transactions.len()
                            && aborted_transactions[next_aborted_transaction].first_offset
                                <= batch_last_offset
                        {
                            aborted_producer_ids.insert(
                                *aborted_transactions[next_aborted_transaction].producer_id,
                            );
                            next_aborted_transaction += 1;
                        }
                    }
                    if let Some(producer_id) =
                        batch.records.first().map(|record| record.producer_id)
                    {
                        if batch.records.iter().any(is_abort_marker) {
                            // The abort marker ends the aborted range for this producer.
                            aborted_producer_ids.remove(&producer_id);
                        } else if batch
                            .records
                            .first()
                            .is_some_and(|record| record.transactional)
                            && aborted_producer_ids.contains(&producer_id)
                        {
                            // Aborted data is skipped, but the position still moves past it.
                            if let Some(batch_next_offset) = batch_next_offset {
                                self.advance_fetch_offset(&key, batch_next_offset);
                            }
                            continue;
                        }
                    }
                }
                for record in batch.records {
                    if record.control {
                        continue;
                    }
                    let offset = record.offset;
                    // Whole batches are returned, so a batch may begin before the offset
                    // that was asked for; those records must not be delivered again.
                    if requested_offset.is_some_and(|requested| offset < requested) {
                        continue;
                    }
                    partition_records.push(ConsumerRecord {
                        topic: topic_name.clone(),
                        partition: partition.partition_index,
                        offset,
                        timestamp: record.timestamp,
                        headers: record
                            .headers
                            .into_iter()
                            .map(|(key, value)| RecordHeader {
                                key: key.to_string(),
                                value,
                            })
                            .collect(),
                        key: record.key,
                        value: record.value,
                    });
                    if let Some(assigned) = self.assignment_state.assignment.get_mut(&key) {
                        assigned.fetch_offset = offset + 1;
                    }
                }
                if let Some(batch_next_offset) = batch_next_offset {
                    self.advance_fetch_offset(&key, batch_next_offset);
                }
            }
            if !partition_records.is_empty() {
                fetched_partition_count += 1;
                fetched_records += partition_records.len();
                // Buffer right away: this partition's fetch offset has already moved, so an
                // error on a later partition must not discard these records.
                self.poll_state.buffered_records.extend(partition_records);
            }
            // NOTE(review): the response's leader epoch is copied verbatim, including any
            // "unset" value it may carry — confirm that overwriting a known epoch is intended.
            if let Some(assigned) = self.assignment_state.assignment.get_mut(&key) {
                assigned.leader_epoch = partition.current_leader.leader_epoch;
            }
        }
    }
    telemetry::record_consumer_records_fetched(
        &self.config.client_id,
        &self.config.group_id,
        fetched_records,
        fetched_partition_count,
        self.poll_state.buffered_records.len(),
    );
    debug!(
        fetched_records,
        fetched_partition_count,
        buffered_records = self.poll_state.buffered_records.len(),
        "consumer fetch buffered records"
    );
    Ok(())
}
/// Repositions a partition whose fetch at `requested_offset` was rejected as out of range.
///
/// The reset target is looked up according to `config.auto_offset_reset` (earliest or latest).
/// The response is discarded without any change when the partition is no longer assigned or
/// when its fetch offset differs from `requested_offset`; the latter is checked again after
/// the lookup has completed. On a successful reset both the fetch offset and the delivered
/// offset of the partition are set to the looked-up value.
///
/// # Errors
///
/// Propagates lookup failures and fails when the lookup result has no entry for `key`.
async fn reset_out_of_range_fetch_offset(
    &mut self,
    key: TopicPartitionKey,
    requested_offset: i64,
) -> AnyResult<()> {
    let current = self
        .assignment_state
        .assignment
        .get(&key)
        .map(|assigned| assigned.fetch_offset);
    match current {
        None => {
            debug!(
                topic = %key.topic,
                partition = key.partition,
                requested_offset,
                "discarding out-of-range fetch response for unassigned partition"
            );
            return Ok(());
        }
        Some(current_offset) if current_offset != requested_offset => {
            debug!(
                topic = %key.topic,
                partition = key.partition,
                requested_offset,
                current_offset,
                "discarding stale out-of-range fetch response"
            );
            return Ok(());
        }
        Some(_) => {}
    }

    let reset_timestamp = match self.config.auto_offset_reset {
        AutoOffsetReset::Earliest => LIST_OFFSETS_EARLIEST,
        AutoOffsetReset::Latest => LIST_OFFSETS_LATEST,
    };
    let reset_offset = self
        .lookup_offsets_at_timestamp(std::slice::from_ref(&key), reset_timestamp)
        .await?
        .get(&key)
        .copied()
        .with_context(|| {
            format!(
                "missing reset offset for out-of-range fetch on {}:{}",
                key.topic, key.partition
            )
        })?;

    // The lookup awaited, so re-validate the assignment before touching it.
    let Some(assigned) = self.assignment_state.assignment.get_mut(&key) else {
        return Ok(());
    };
    if assigned.fetch_offset != requested_offset {
        return Ok(());
    }
    assigned.fetch_offset = reset_offset;
    self.poll_state
        .delivered_offsets
        .insert(key.clone(), reset_offset);
    telemetry::record_consumer_offset_reset(
        &self.config.client_id,
        &self.config.group_id,
        &key.topic,
        key.partition,
    );
    debug!(
        topic = %key.topic,
        partition = key.partition,
        requested_offset,
        reset_offset,
        reset_policy = ?self.config.auto_offset_reset,
        "reset out-of-range fetch offset"
    );
    Ok(())
}
fn advance_fetch_offset(&mut self, key: &TopicPartitionKey, next_offset: i64) {
if let Some(assigned) = self.assignment_state.assignment.get_mut(key)
&& assigned.fetch_offset < next_offset
{
assigned.fetch_offset = next_offset;
}
}
}
/// Peeks at a raw record batch header and returns the offset that follows the batch.
///
/// The result is `base offset + last offset delta + 1`, with both fields read as big-endian
/// integers from their fixed positions in `chunk`.
///
/// Returns `None` when `chunk` is too short to contain the last-offset-delta field, or when
/// the sum does not fit into an `i64`. The header bytes come straight off the wire, so a
/// malformed header must not be able to overflow the arithmetic.
fn record_batch_next_offset(chunk: &[u8]) -> Option<i64> {
    let base_offset = chunk.get(..RECORD_BATCH_BASE_OFFSET_BYTES)?;
    let last_offset_delta =
        chunk.get(RECORD_BATCH_LAST_OFFSET_DELTA_OFFSET..RECORD_BATCH_LAST_OFFSET_DELTA_END)?;
    let base_offset = i64::from_be_bytes(base_offset.try_into().ok()?);
    let last_offset_delta = i32::from_be_bytes(last_offset_delta.try_into().ok()?);
    base_offset
        .checked_add(i64::from(last_offset_delta))?
        .checked_add(1)
}
/// Reports whether `record` is a control record that marks an aborted transaction.
///
/// The first four key bytes are read as two big-endian `i16` values: a version followed by a
/// control type. The record counts as an abort marker when the version is non-negative and the
/// control type is `0`. Non-control records, records without a key, and keys shorter than four
/// bytes are never abort markers.
fn is_abort_marker(record: &kafka_protocol::records::Record) -> bool {
    if !record.control {
        return false;
    }
    let Some(&[version_hi, version_lo, type_hi, type_lo]) =
        record.key.as_deref().and_then(|key| key.get(..4))
    else {
        return false;
    };
    let version = i16::from_be_bytes([version_hi, version_lo]);
    let control_type = i16::from_be_bytes([type_hi, type_lo]);
    control_type == 0 && version >= 0
}
#[cfg(test)]
mod tests {
use bytes::{Bytes, BytesMut};
use kafka_protocol::messages::fetch_response::{
AbortedTransaction, FetchableTopicResponse, PartitionData,
};
use kafka_protocol::messages::{ProducerId, TopicName};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::records::{
Compression, NO_PRODUCER_EPOCH, NO_PRODUCER_ID, Record, RecordBatchEncoder,
RecordEncodeOptions, TimestampType,
};
use super::*;
/// A batch that holds only a control record delivers nothing but still moves the position.
#[tokio::test]
async fn fetch_response_advances_position_after_control_batch() {
    const TOPIC: &str = "topic-a";
    const START_OFFSET: i64 = 5;

    let key = TopicPartitionKey::new(TOPIC.to_owned(), 0);
    let mut runtime = runtime_with_assignment(key.clone(), START_OFFSET);
    let requested = HashMap::from([(key.clone(), START_OFFSET)]);
    let response = fetch_response(
        TOPIC,
        encode_records([record_at(START_OFFSET, true, false, NO_PRODUCER_ID)]),
        None,
    );

    runtime
        .process_fetch_response(response, 12, &requested)
        .await
        .unwrap();

    assert!(runtime.poll_state.buffered_records.is_empty());
    let assigned = runtime
        .assignment_state
        .assignment
        .get(&key)
        .expect("partition should remain assigned");
    assert_eq!(assigned.fetch_offset, 6);
}
/// Under read-committed isolation an aborted batch is skipped but the position moves past it.
#[tokio::test]
async fn fetch_response_advances_position_after_aborted_batch() {
    const TOPIC: &str = "topic-a";
    const START_OFFSET: i64 = 7;
    const PRODUCER: i64 = 42;

    let key = TopicPartitionKey::new(TOPIC.to_owned(), 0);
    let mut runtime = runtime_with_assignment(key.clone(), START_OFFSET);
    runtime.config = runtime
        .config
        .clone()
        .with_isolation_level(crate::config::IsolationLevel::ReadCommitted);

    let aborted_transaction = AbortedTransaction::default()
        .with_producer_id(ProducerId(PRODUCER))
        .with_first_offset(START_OFFSET);
    let response = fetch_response(
        TOPIC,
        encode_records([record_at(START_OFFSET, false, true, PRODUCER)]),
        Some(vec![aborted_transaction]),
    );
    let requested = HashMap::from([(key.clone(), START_OFFSET)]);

    runtime
        .process_fetch_response(response, 12, &requested)
        .await
        .unwrap();

    assert!(runtime.poll_state.buffered_records.is_empty());
    let assigned = runtime
        .assignment_state
        .assignment
        .get(&key)
        .expect("partition should remain assigned");
    assert_eq!(assigned.fetch_offset, 8);
}
/// After the abort marker, records of the same producer are delivered again.
#[tokio::test]
async fn read_committed_abort_marker_clears_aborted_producer_state() {
    const TOPIC: &str = "topic-a";
    const PRODUCER: i64 = 42;

    let key = TopicPartitionKey::new(TOPIC.to_owned(), 0);
    let mut runtime = runtime_with_assignment(key.clone(), 0);
    runtime.config = runtime
        .config
        .clone()
        .with_isolation_level(crate::config::IsolationLevel::ReadCommitted);

    // Aborted record, its abort marker, then a record that must be delivered.
    let encoded = encode_records([
        record_at(0, false, true, PRODUCER),
        abort_marker_at(1, PRODUCER),
        record_at(2, false, true, PRODUCER),
    ]);
    let aborted_transaction = AbortedTransaction::default()
        .with_producer_id(ProducerId(PRODUCER))
        .with_first_offset(0);
    let response = fetch_response(TOPIC, encoded, Some(vec![aborted_transaction]));
    let requested = HashMap::from([(key.clone(), 0)]);

    runtime
        .process_fetch_response(response, 12, &requested)
        .await
        .unwrap();

    let delivered = std::mem::take(&mut runtime.poll_state.buffered_records);
    assert_eq!(delivered.len(), 1);
    assert_eq!(delivered[0].offset, 2);
    assert_eq!(delivered[0].value.as_deref(), Some(&b"value"[..]));

    let assigned = runtime
        .assignment_state
        .assignment
        .get(&key)
        .expect("partition should remain assigned");
    assert_eq!(assigned.fetch_offset, 3);
}
/// The next offset is the base offset plus the last offset delta plus one.
#[test]
fn record_batch_next_offset_reads_base_offset_and_delta() {
    let mut header = [0u8; RECORD_BATCH_LAST_OFFSET_DELTA_END];
    let (base_offset_field, rest) = header.split_at_mut(RECORD_BATCH_BASE_OFFSET_BYTES);
    base_offset_field.copy_from_slice(&10_i64.to_be_bytes());
    // `rest` starts at the end of the base offset, so shift the delta position accordingly.
    rest[RECORD_BATCH_LAST_OFFSET_DELTA_OFFSET - RECORD_BATCH_BASE_OFFSET_BYTES..]
        .copy_from_slice(&4_i32.to_be_bytes());

    let next_offset = record_batch_next_offset(&header);

    assert_eq!(next_offset, Some(15));
}
/// Builds a runtime whose only assigned partition is `key`, positioned at `fetch_offset`.
fn runtime_with_assignment(key: TopicPartitionKey, fetch_offset: i64) -> ConsumerRuntime {
    let assigned = AssignedPartition {
        key: key.clone(),
        topic_id: uuid::Uuid::nil(),
        leader_id: 1,
        leader_epoch: 0,
        fetch_offset,
    };
    let config = ConsumerConfig::new("localhost:9092", "group-a");
    let mut runtime = ConsumerRuntime::new(config);
    runtime.assignment_state.assignment.insert(key, assigned);
    runtime
}
/// Builds a fetch response with a single topic whose partition 0 carries `records` and the
/// given aborted transactions.
fn fetch_response(
    topic: &str,
    records: Bytes,
    aborted_transactions: Option<Vec<AbortedTransaction>>,
) -> FetchResponse {
    let partition = PartitionData::default()
        .with_partition_index(0)
        .with_records(Some(records))
        .with_aborted_transactions(aborted_transactions);
    let topic_name = TopicName(StrBytes::from_string(topic.to_owned()));
    let topic_response = FetchableTopicResponse::default()
        .with_topic(topic_name)
        .with_partitions(vec![partition]);
    FetchResponse::default().with_responses(vec![topic_response])
}
/// Encodes `records` with record format version 2 and no compression.
fn encode_records<const N: usize>(records: [Record; N]) -> Bytes {
    let options = RecordEncodeOptions {
        version: 2,
        compression: Compression::None,
    };
    let mut buffer = BytesMut::new();
    RecordBatchEncoder::encode(&mut buffer, records.iter(), &options).unwrap();
    buffer.freeze()
}
/// Builds a record at `offset` with the value `b"value"`, no key and no headers.
///
/// The producer epoch is `NO_PRODUCER_EPOCH` when `producer_id` is `NO_PRODUCER_ID`, and `0`
/// otherwise.
fn record_at(offset: i64, control: bool, transactional: bool, producer_id: i64) -> Record {
    let producer_epoch = match producer_id {
        NO_PRODUCER_ID => NO_PRODUCER_EPOCH,
        _ => 0,
    };
    Record {
        transactional,
        control,
        partition_leader_epoch: 0,
        producer_id,
        producer_epoch,
        timestamp_type: TimestampType::Creation,
        offset,
        sequence: 0,
        timestamp: 0,
        key: None,
        value: Some(Bytes::from_static(b"value")),
        headers: Default::default(),
    }
}
fn abort_marker_at(offset: i64, producer_id: i64) -> Record {
let mut record = record_at(offset, true, true, producer_id);
record.key = Some(Bytes::from_static(&[0, 0, 0, 0]));
record.value = None;
record
}
}