kafkit-client 0.1.0

Kafka 4.0+ pure Rust client.
Documentation
use super::*;

impl ConsumerRuntime {
    /// Sends one Fetch request for `partitions` to the broker `leader_id` and buffers the
    /// records found in the response.
    ///
    /// The request version is negotiated per connection and capped at `FETCH_VERSION_CAP`.
    ///
    /// # Errors
    ///
    /// Fails when the leader connection cannot be obtained, the version cannot be negotiated,
    /// the request cannot be built or sent, or the response cannot be processed. A retriable
    /// top-level response error invalidates all cached metadata and clears the leader
    /// connections before it is returned; any other top-level error is reported as
    /// `fetch failed: ...`.
    pub(crate) async fn fetch_from_leader(
        &mut self,
        leader_id: i32,
        partitions: Vec<AssignedPartition>,
    ) -> AnyResult<()> {
        trace!(
            leader_id,
            partition_count = partitions.len(),
            "fetching records from leader"
        );
        let client_id = self.config.client_id.clone();
        // The connection is looked up twice: the first lookup is scoped to the version
        // negotiation only, presumably so its borrow of `self` is released before
        // `self.config` is read to build the request.
        let version = {
            let connection = self.leader_connection(leader_id).await?;
            connection
                .version_with_cap::<kafka_protocol::messages::FetchRequest>(FETCH_VERSION_CAP)?
        };
        let request = build_fetch_request(&partitions, version, &self.config)?;
        let connection = self.leader_connection(leader_id).await?;
        let response: FetchResponse = connection
            .send_request::<kafka_protocol::messages::FetchRequest>(&client_id, version, &request)
            .await?;

        if let Some(error) = response.error_code.err() {
            if error.is_retriable() {
                // Drop every cached view of the cluster so the next attempt starts fresh.
                self.connections.metadata.invalidate_all();
                self.connections.leader_connections.clear();
                return Err(error.into());
            }
            bail!("fetch failed: {error}");
        }

        self.process_fetch_response(response, version)
    }

    /// Decodes a Fetch response, appends the consumable records to the poll buffer and
    /// advances the fetch offset of every tracked partition.
    ///
    /// Records are buffered and offsets are advanced one partition at a time, so an error on
    /// a later partition never discards records whose offsets were already consumed.
    ///
    /// For each partition:
    /// - records below the tracked fetch offset are ignored (brokers return whole batches,
    ///   which may start before the requested offset);
    /// - control records are never delivered;
    /// - under `ReadCommitted`, transactional records that belong to an aborted transaction
    ///   are dropped, and an aborted range ends at the producer's ABORT marker;
    /// - the fetch offset moves past every record examined, including skipped ones.
    ///
    /// # Errors
    ///
    /// Returns the partition error when a partition reports one (a retriable error first
    /// invalidates the topic's cached metadata), or the decode error when a record batch
    /// cannot be decoded. Partitions processed before the failing one stay buffered.
    pub(crate) fn process_fetch_response(
        &mut self,
        response: FetchResponse,
        version: i16,
    ) -> AnyResult<()> {
        /// Reports whether `key` is the key of a transaction ABORT control record.
        ///
        /// A control record key is `version: int16` followed by `type: int16`; type `0`
        /// denotes ABORT.
        fn is_abort_marker(key: Option<&[u8]>) -> bool {
            matches!(key.and_then(|bytes| bytes.get(2..4)), Some([0, 0]))
        }

        let read_committed =
            self.config.isolation_level == crate::config::IsolationLevel::ReadCommitted;

        for topic in response.responses {
            // From v13 on the response may identify topics by id; fall back to the name
            // carried in the response when the id is nil or unknown to the metadata cache.
            let topic_name = if version >= 13 && !topic.topic_id.is_nil() {
                self.connections
                    .metadata
                    .topic_name(&topic.topic_id)
                    .cloned()
                    .unwrap_or_else(|| topic.topic.0.to_string())
            } else {
                topic.topic.0.to_string()
            };

            for partition in topic.partitions {
                let key = TopicPartitionKey::new(topic_name.clone(), partition.partition_index);
                if let Some(error) = partition.error_code.err() {
                    if error.is_retriable() {
                        self.connections.metadata.invalidate_topic(&topic_name);
                        return Err(error.into());
                    }
                    bail!(
                        "fetch failed for {}:{}: {}",
                        topic_name,
                        partition.partition_index,
                        error
                    );
                }

                let Some(records) = partition.records else {
                    continue;
                };
                if records.is_empty() {
                    continue;
                }

                let mut aborted_transactions = partition.aborted_transactions.unwrap_or_default();
                let mut bytes = records;
                let batches = RecordBatchDecoder::decode_all(&mut bytes)?;

                // `None` when the partition is not tracked in the assignment; such records
                // are still buffered, but there is no offset to compare against or advance.
                let mut next_fetch_offset = self
                    .assignment_state
                    .assignment
                    .get(&key)
                    .map(|assigned| assigned.fetch_offset);
                let mut accepted = Vec::new();

                for batch in batches {
                    for record in batch.records {
                        // Whole batches are returned, so leading records may precede the
                        // position that was asked for; delivering them would duplicate data.
                        if matches!(next_fetch_offset, Some(next) if record.offset < next) {
                            continue;
                        }
                        // Move past every record examined, so a response that ends in
                        // control or aborted records cannot pin the fetch position.
                        if let Some(next) = next_fetch_offset.as_mut() {
                            *next = record.offset + 1;
                        }

                        if record.control {
                            if is_abort_marker(record.key.as_deref()) {
                                // The ABORT marker closes the producer's oldest aborted
                                // transaction that has started; records the same producer
                                // writes afterwards belong to a new transaction.
                                let closed = aborted_transactions
                                    .iter()
                                    .enumerate()
                                    .filter(|(_, aborted)| {
                                        record.producer_id == aborted.producer_id
                                            && record.offset >= aborted.first_offset
                                    })
                                    .min_by_key(|(_, aborted)| aborted.first_offset)
                                    .map(|(index, _)| index);
                                if let Some(index) = closed {
                                    aborted_transactions.swap_remove(index);
                                }
                            }
                            continue;
                        }

                        if read_committed
                            && record.transactional
                            && aborted_transactions.iter().any(|aborted| {
                                record.producer_id == aborted.producer_id
                                    && record.offset >= aborted.first_offset
                            })
                        {
                            continue;
                        }

                        accepted.push(ConsumerRecord {
                            topic: topic_name.clone(),
                            partition: partition.partition_index,
                            offset: record.offset,
                            timestamp: record.timestamp,
                            headers: record
                                .headers
                                .into_iter()
                                .map(|(name, value)| RecordHeader {
                                    key: name.to_string(),
                                    value,
                                })
                                .collect(),
                            key: record.key,
                            value: record.value,
                        });
                    }
                }

                if let Some(assigned) = self.assignment_state.assignment.get_mut(&key) {
                    if let Some(next) = next_fetch_offset {
                        assigned.fetch_offset = next;
                    }
                    // NOTE(review): `current_leader` may only be populated by the broker
                    // alongside leader-change errors; confirm that overwriting the tracked
                    // epoch with it on a successful fetch is intended.
                    assigned.leader_epoch = partition.current_leader.leader_epoch;
                }
                // Buffer together with the offset update above so the two cannot diverge
                // when a later partition fails.
                self.poll_state.buffered_records.extend(accepted);
            }
        }

        debug!(
            buffered_records = self.poll_state.buffered_records.len(),
            "buffered fetched consumer records"
        );
        Ok(())
    }
}