kafkit-client 0.1.7

Kafka 4.0+ pure Rust client.
Documentation
use super::*;

// Byte layout of the fixed prefix of a Kafka record batch, used to peek at a
// batch's declared length before handing the bytes to the decoder.

/// Size of the leading `baseOffset` field (an int64).
const RECORD_BATCH_BASE_OFFSET_BYTES: usize = 8;
/// Size of the `batchLength` field (an int32) that follows `baseOffset`.
const RECORD_BATCH_LENGTH_BYTES: usize = 4;
/// Position of `batchLength` inside the batch: immediately after `baseOffset`.
const RECORD_BATCH_LENGTH_OFFSET: usize = RECORD_BATCH_BASE_OFFSET_BYTES;
/// One past the last byte of `batchLength`.
const RECORD_BATCH_LENGTH_END: usize = RECORD_BATCH_LENGTH_OFFSET + RECORD_BATCH_LENGTH_BYTES;
/// Fewest bytes a trailing fragment must have before it is treated as a batch.
/// Per the Kafka batch layout this reaches through the magic byte:
/// baseOffset (8) + batchLength (4) + partitionLeaderEpoch (4) + magic (1).
const RECORD_BATCH_HEADER_MIN_BYTES: usize = 17;
/// Bytes of a batch that its `batchLength` value does not count, i.e. everything
/// up to and including the length field itself.
const RECORD_BATCH_TOTAL_LENGTH_OVERHEAD: usize = RECORD_BATCH_LENGTH_END;

impl ConsumerRuntime {
    pub(crate) async fn fetch_from_leader(
        &mut self,
        leader_id: i32,
        partitions: Vec<AssignedPartition>,
    ) -> AnyResult<()> {
        trace!(
            leader_id,
            partition_count = partitions.len(),
            "fetching records from leader"
        );
        let client_id = self.config.client_id.clone();
        let version = {
            let connection = self.leader_connection(leader_id).await?;
            connection
                .version_with_cap::<kafka_protocol::messages::FetchRequest>(FETCH_VERSION_CAP)?
        };
        let request = build_fetch_request(&partitions, version, &self.config)?;
        let connection = self.leader_connection(leader_id).await?;
        let response: FetchResponse = connection
            .send_request::<kafka_protocol::messages::FetchRequest>(&client_id, version, &request)
            .await?;

        if let Some(error) = response.error_code.err() {
            if error.is_retriable() {
                self.connections.metadata.invalidate_all();
                self.connections.leader_connections.clear();
                return Err(error.into());
            }
            bail!("fetch failed: {error}");
        }

        self.process_fetch_response(response, version)?;
        Ok(())
    }

    pub(crate) fn process_fetch_response(
        &mut self,
        response: FetchResponse,
        version: i16,
    ) -> AnyResult<()> {
        let mut fetched = Vec::new();

        for topic in response.responses {
            let topic_name = if version >= 13 && !topic.topic_id.is_nil() {
                self.connections
                    .metadata
                    .topic_name(&topic.topic_id)
                    .cloned()
                    .unwrap_or_else(|| topic.topic.0.to_string())
            } else {
                topic.topic.0.to_string()
            };

            for partition in topic.partitions {
                let key = TopicPartitionKey::new(topic_name.clone(), partition.partition_index);
                if let Some(error) = partition.error_code.err() {
                    if error.is_retriable() {
                        self.connections.metadata.invalidate_topic(&topic_name);
                        return Err(error.into());
                    }
                    bail!(
                        "fetch failed for {}:{}: {}",
                        topic_name,
                        partition.partition_index,
                        error
                    );
                }

                let Some(records) = partition.records else {
                    continue;
                };
                if records.is_empty() {
                    continue;
                }

                let aborted_transactions = partition.aborted_transactions.unwrap_or_default();
                let mut bytes = records;
                let mut batches = Vec::new();
                while bytes.has_remaining() {
                    let remaining = bytes.remaining();
                    if remaining < RECORD_BATCH_HEADER_MIN_BYTES {
                        trace!(
                            topic = %topic_name,
                            partition = partition.partition_index,
                            trailing_bytes = remaining,
                            "ignoring incomplete trailing fetch record bytes"
                        );
                        break;
                    }
                    let chunk = bytes.chunk();
                    if chunk.len() >= RECORD_BATCH_HEADER_MIN_BYTES {
                        let batch_len = i32::from_be_bytes(
                            chunk[RECORD_BATCH_LENGTH_OFFSET..RECORD_BATCH_LENGTH_END]
                                .try_into()
                                .expect("record batch length slice is exactly four bytes"),
                        );
                        if batch_len < 0 {
                            bail!(
                                "negative fetch record batch length {} for {}:{}",
                                batch_len,
                                topic_name,
                                partition.partition_index
                            );
                        }
                        let batch_total_len =
                            RECORD_BATCH_TOTAL_LENGTH_OVERHEAD + batch_len as usize;
                        if batch_total_len > remaining {
                            trace!(
                                topic = %topic_name,
                                partition = partition.partition_index,
                                trailing_bytes = remaining,
                                batch_total_len,
                                "leaving partial trailing fetch record batch for next fetch"
                            );
                            break;
                        }
                    }
                    batches.push(RecordBatchDecoder::decode(&mut bytes).with_context(|| {
                        format!(
                            "decode fetch records for {}:{} with {} bytes remaining",
                            topic_name, partition.partition_index, remaining
                        )
                    })?);
                }
                for batch in batches {
                    for record in batch.records {
                        if record.control {
                            continue;
                        }
                        if self.config.isolation_level
                            == crate::config::IsolationLevel::ReadCommitted
                            && aborted_transactions.iter().any(|aborted| {
                                record.transactional
                                    && record.producer_id == aborted.producer_id
                                    && record.offset >= aborted.first_offset
                            })
                        {
                            continue;
                        }
                        let offset = record.offset;
                        fetched.push(ConsumerRecord {
                            topic: topic_name.clone(),
                            partition: partition.partition_index,
                            offset,
                            timestamp: record.timestamp,
                            headers: record
                                .headers
                                .into_iter()
                                .map(|(key, value)| RecordHeader {
                                    key: key.to_string(),
                                    value,
                                })
                                .collect(),
                            key: record.key,
                            value: record.value,
                        });
                        if let Some(assigned) = self.assignment_state.assignment.get_mut(&key) {
                            assigned.fetch_offset = offset + 1;
                        }
                    }
                }

                if let Some(assigned) = self.assignment_state.assignment.get_mut(&key) {
                    assigned.leader_epoch = partition.current_leader.leader_epoch;
                }
            }
        }

        self.poll_state.buffered_records.extend(fetched);
        debug!(
            buffered_records = self.poll_state.buffered_records.len(),
            "buffered fetched consumer records"
        );
        Ok(())
    }
}