use super::*;
// Byte layout of the leading fields of a Kafka record batch as it appears in a
// fetch response: base offset (int64), batch length (int32), partition leader
// epoch (int32), magic (int8).

/// Width of the leading base-offset field (a 64-bit integer).
const RECORD_BATCH_BASE_OFFSET_BYTES: usize = std::mem::size_of::<i64>();
/// Width of the batch-length field (a 32-bit integer).
const RECORD_BATCH_LENGTH_BYTES: usize = std::mem::size_of::<i32>();
/// Position of the first byte of the batch-length field; it directly follows the base offset.
const RECORD_BATCH_LENGTH_OFFSET: usize = RECORD_BATCH_BASE_OFFSET_BYTES;
/// Position one past the last byte of the batch-length field.
const RECORD_BATCH_LENGTH_END: usize = RECORD_BATCH_LENGTH_OFFSET + RECORD_BATCH_LENGTH_BYTES;
/// Smallest prefix worth inspecting: everything up to and including the magic
/// byte (8 + 4 + 4 + 1 = 17 bytes).
const RECORD_BATCH_HEADER_MIN_BYTES: usize =
    RECORD_BATCH_LENGTH_END + std::mem::size_of::<i32>() + std::mem::size_of::<u8>();
/// Bytes that precede the region measured by the batch-length field; add this
/// to the encoded length to get the full on-wire size of a batch.
const RECORD_BATCH_TOTAL_LENGTH_OVERHEAD: usize =
    RECORD_BATCH_BASE_OFFSET_BYTES + RECORD_BATCH_LENGTH_BYTES;
impl ConsumerRuntime {
pub(crate) async fn fetch_from_leader(
&mut self,
leader_id: i32,
partitions: Vec<AssignedPartition>,
) -> AnyResult<()> {
trace!(
leader_id,
partition_count = partitions.len(),
"fetching records from leader"
);
let client_id = self.config.client_id.clone();
let version = {
let connection = self.leader_connection(leader_id).await?;
connection
.version_with_cap::<kafka_protocol::messages::FetchRequest>(FETCH_VERSION_CAP)?
};
let request = build_fetch_request(&partitions, version, &self.config)?;
let connection = self.leader_connection(leader_id).await?;
let response: FetchResponse = connection
.send_request::<kafka_protocol::messages::FetchRequest>(&client_id, version, &request)
.await?;
if let Some(error) = response.error_code.err() {
if error.is_retriable() {
self.connections.metadata.invalidate_all();
self.connections.leader_connections.clear();
return Err(error.into());
}
bail!("fetch failed: {error}");
}
self.process_fetch_response(response, version)?;
Ok(())
}
pub(crate) fn process_fetch_response(
&mut self,
response: FetchResponse,
version: i16,
) -> AnyResult<()> {
let mut fetched = Vec::new();
for topic in response.responses {
let topic_name = if version >= 13 && !topic.topic_id.is_nil() {
self.connections
.metadata
.topic_name(&topic.topic_id)
.cloned()
.unwrap_or_else(|| topic.topic.0.to_string())
} else {
topic.topic.0.to_string()
};
for partition in topic.partitions {
let key = TopicPartitionKey::new(topic_name.clone(), partition.partition_index);
if let Some(error) = partition.error_code.err() {
if error.is_retriable() {
self.connections.metadata.invalidate_topic(&topic_name);
return Err(error.into());
}
bail!(
"fetch failed for {}:{}: {}",
topic_name,
partition.partition_index,
error
);
}
let Some(records) = partition.records else {
continue;
};
if records.is_empty() {
continue;
}
let aborted_transactions = partition.aborted_transactions.unwrap_or_default();
let mut bytes = records;
let mut batches = Vec::new();
while bytes.has_remaining() {
let remaining = bytes.remaining();
if remaining < RECORD_BATCH_HEADER_MIN_BYTES {
trace!(
topic = %topic_name,
partition = partition.partition_index,
trailing_bytes = remaining,
"ignoring incomplete trailing fetch record bytes"
);
break;
}
let chunk = bytes.chunk();
if chunk.len() >= RECORD_BATCH_HEADER_MIN_BYTES {
let batch_len = i32::from_be_bytes(
chunk[RECORD_BATCH_LENGTH_OFFSET..RECORD_BATCH_LENGTH_END]
.try_into()
.expect("record batch length slice is exactly four bytes"),
);
if batch_len < 0 {
bail!(
"negative fetch record batch length {} for {}:{}",
batch_len,
topic_name,
partition.partition_index
);
}
let batch_total_len =
RECORD_BATCH_TOTAL_LENGTH_OVERHEAD + batch_len as usize;
if batch_total_len > remaining {
trace!(
topic = %topic_name,
partition = partition.partition_index,
trailing_bytes = remaining,
batch_total_len,
"leaving partial trailing fetch record batch for next fetch"
);
break;
}
}
batches.push(RecordBatchDecoder::decode(&mut bytes).with_context(|| {
format!(
"decode fetch records for {}:{} with {} bytes remaining",
topic_name, partition.partition_index, remaining
)
})?);
}
for batch in batches {
for record in batch.records {
if record.control {
continue;
}
if self.config.isolation_level
== crate::config::IsolationLevel::ReadCommitted
&& aborted_transactions.iter().any(|aborted| {
record.transactional
&& record.producer_id == aborted.producer_id
&& record.offset >= aborted.first_offset
})
{
continue;
}
let offset = record.offset;
fetched.push(ConsumerRecord {
topic: topic_name.clone(),
partition: partition.partition_index,
offset,
timestamp: record.timestamp,
headers: record
.headers
.into_iter()
.map(|(key, value)| RecordHeader {
key: key.to_string(),
value,
})
.collect(),
key: record.key,
value: record.value,
});
if let Some(assigned) = self.assignment_state.assignment.get_mut(&key) {
assigned.fetch_offset = offset + 1;
}
}
}
if let Some(assigned) = self.assignment_state.assignment.get_mut(&key) {
assigned.leader_epoch = partition.current_leader.leader_epoch;
}
}
}
self.poll_state.buffered_records.extend(fetched);
debug!(
buffered_records = self.poll_state.buffered_records.len(),
"buffered fetched consumer records"
);
Ok(())
}
}