// kafkit-client 0.1.9
//
// A pure-Rust client for Kafka 4.0+.
// Documentation
use std::collections::{BTreeMap, HashMap};

use anyhow::{Context, Result};
use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
use kafka_protocol::messages::fetch_request::{FetchPartition, FetchTopic};
use kafka_protocol::messages::find_coordinator_response::FindCoordinatorResponse;
use kafka_protocol::messages::list_offsets_request::{ListOffsetsPartition, ListOffsetsTopic};
use kafka_protocol::messages::offset_commit_request::{
    OffsetCommitRequestPartition, OffsetCommitRequestTopic,
};
use kafka_protocol::messages::{ListOffsetsRequest, OffsetCommitRequest};
use kafka_protocol::protocol::StrBytes;
use uuid::Uuid;

use crate::config::ConsumerConfig;
use crate::metadata::{BrokerAddress, MetadataCache};
use crate::network::duration_to_i32_ms;
use crate::types::{AssignedPartition, CommitOffset, TopicPartitionKey};
use crate::{BrokerError, Error};

/// Non-fatal outcome of interpreting a FindCoordinator response.
///
/// Produced by `parse_find_coordinator_response`; non-retriable broker errors
/// are reported through that function's `Err` path instead of a variant here.
pub enum CoordinatorLookupResult {
    /// The response carried no error code; holds the coordinator's host and port.
    Found(BrokerAddress),
    /// The response carried an error for which `is_retriable()` is true;
    /// holds that error.
    Retry(ResponseError),
}

/// Builds an `OffsetCommitRequest` for `group_id` covering every entry in `offsets`.
///
/// * Offsets are grouped by topic name; topics are emitted in sorted name order
///   and partitions keep their input order within a topic.
/// * Each partition's committed leader epoch is taken from the matching entry in
///   `assignment`, or `-1` when the partition is not present there.
/// * Committed metadata is always sent as `None`.
/// * A missing `member_id` is sent as the empty string; `group_instance_id` is
///   forwarded only when provided.
///
/// The current implementation has no failing path and always returns `Ok`.
pub fn build_offset_commit_request(
    group_id: &str,
    member_id: Option<&str>,
    member_epoch: i32,
    group_instance_id: Option<&str>,
    offsets: &[CommitOffset],
    assignment: &HashMap<TopicPartitionKey, AssignedPartition>,
) -> Result<OffsetCommitRequest> {
    let mut partitions_by_topic: BTreeMap<String, Vec<OffsetCommitRequestPartition>> =
        BTreeMap::new();

    for commit in offsets {
        let lookup_key = TopicPartitionKey::new(commit.topic.clone(), commit.partition);
        // Fall back to -1 when the partition is not part of the current assignment.
        let committed_leader_epoch = match assignment.get(&lookup_key) {
            Some(assigned) => assigned.leader_epoch,
            None => -1,
        };

        let partition = OffsetCommitRequestPartition::default()
            .with_partition_index(commit.partition)
            .with_committed_offset(commit.offset)
            .with_committed_leader_epoch(committed_leader_epoch)
            .with_committed_metadata(None);

        partitions_by_topic
            .entry(commit.topic.clone())
            .or_default()
            .push(partition);
    }

    let request_topics = partitions_by_topic
        .into_iter()
        .map(|(name, topic_partitions)| {
            OffsetCommitRequestTopic::default()
                .with_name(StrBytes::from_string(name).into())
                .with_partitions(topic_partitions)
        })
        .collect();

    let group = StrBytes::from_string(group_id.to_owned());
    let member = StrBytes::from_string(member_id.unwrap_or_default().to_owned());
    let instance = group_instance_id.map(|value| StrBytes::from_string(value.to_owned()));

    let request = OffsetCommitRequest::default()
        .with_group_id(group.into())
        .with_generation_id_or_member_epoch(member_epoch)
        .with_member_id(member)
        .with_group_instance_id(instance)
        .with_topics(request_topics);
    Ok(request)
}

/// Builds a `ListOffsetsRequest` that asks for the same `timestamp` on every
/// partition in `partitions`.
///
/// Each partition key is paired with `timestamp` in a map (so duplicate keys
/// collapse into one entry) and the work is delegated to
/// `build_list_offsets_request_with_timestamps`, whose result is returned as is.
pub fn build_list_offsets_request(
    partitions: Vec<TopicPartitionKey>,
    metadata: &MetadataCache,
    version: i16,
    timestamp: i64,
    isolation_level: i8,
) -> Result<ListOffsetsRequest> {
    let per_partition: HashMap<TopicPartitionKey, i64> = partitions
        .into_iter()
        .zip(std::iter::repeat(timestamp))
        .collect();

    build_list_offsets_request_with_timestamps(metadata, version, &per_partition, isolation_level)
}

/// Builds a `ListOffsetsRequest` with an individual timestamp per partition.
///
/// * Partitions are grouped by topic name and topics are emitted in sorted
///   name order. Partition order within a topic follows the iteration order of
///   `timestamps`, which is a `HashMap` and therefore unspecified.
/// * The current leader epoch of each partition is read from `metadata`, or set
///   to `-1` when the cache has no entry for it.
/// * The replica id is always `-1`.
/// * For `version >= 10` the request's `timeout_ms` is explicitly set to `0`.
///
/// The current implementation has no failing path and always returns `Ok`.
pub fn build_list_offsets_request_with_timestamps(
    metadata: &MetadataCache,
    version: i16,
    timestamps: &HashMap<TopicPartitionKey, i64>,
    isolation_level: i8,
) -> Result<ListOffsetsRequest> {
    let mut partitions_by_topic: BTreeMap<String, Vec<ListOffsetsPartition>> = BTreeMap::new();

    for (tp, wanted_timestamp) in timestamps {
        let current_leader_epoch = metadata
            .partition(&tp.topic, tp.partition)
            .map_or(-1, |cached| cached.leader_epoch);

        let entry = ListOffsetsPartition::default()
            .with_partition_index(tp.partition)
            .with_current_leader_epoch(current_leader_epoch)
            .with_timestamp(*wanted_timestamp);

        partitions_by_topic
            .entry(tp.topic.clone())
            .or_default()
            .push(entry);
    }

    let request_topics = partitions_by_topic
        .into_iter()
        .map(|(name, topic_partitions)| {
            ListOffsetsTopic::default()
                .with_name(StrBytes::from_string(name).into())
                .with_partitions(topic_partitions)
        })
        .collect();

    let base = ListOffsetsRequest::default()
        .with_replica_id((-1).into())
        .with_isolation_level(isolation_level)
        .with_topics(request_topics);

    // Only touch timeout_ms from version 10 onwards; older versions keep the default.
    Ok(if version >= 10 {
        base.with_timeout_ms(0)
    } else {
        base
    })
}

/// Builds a `FetchRequest` covering every partition in `partitions`.
///
/// * Partitions are grouped by `(topic name, topic id)`; groups are emitted in
///   sorted key order and partitions keep their input order within a group.
/// * Each partition carries its assigned leader epoch and fetch offset, with
///   `last_fetched_epoch` and `log_start_offset` fixed at `-1` and the
///   per-partition byte limit taken from `config`.
/// * For `version >= 13` a topic is identified by its id only; for older
///   versions by its name only.
/// * The request always uses replica id `-1`, session id `0`, session epoch `-1`
///   and an empty forgotten-topics list.
/// * A missing `config.rack_id` is sent as the empty string.
///
/// # Errors
///
/// Propagates the error from `duration_to_i32_ms` when `config.fetch_max_wait`
/// cannot be converted.
pub fn build_fetch_request(
    partitions: &[AssignedPartition],
    version: i16,
    config: &ConsumerConfig,
) -> Result<kafka_protocol::messages::FetchRequest> {
    let mut partitions_by_topic: BTreeMap<(String, Uuid), Vec<FetchPartition>> = BTreeMap::new();

    for assigned in partitions {
        let fetch_partition = FetchPartition::default()
            .with_partition(assigned.key.partition)
            .with_current_leader_epoch(assigned.leader_epoch)
            .with_fetch_offset(assigned.fetch_offset)
            .with_last_fetched_epoch(-1)
            .with_log_start_offset(-1)
            .with_partition_max_bytes(config.partition_max_bytes);

        let group_key = (assigned.key.topic.clone(), assigned.topic_id);
        partitions_by_topic
            .entry(group_key)
            .or_default()
            .push(fetch_partition);
    }

    let request_topics = partitions_by_topic
        .into_iter()
        .map(|((name, id), topic_partitions)| {
            let base = FetchTopic::default().with_partitions(topic_partitions);
            // Version 13 and above address topics by id; earlier versions by name.
            if version >= 13 {
                base.with_topic_id(id)
            } else {
                base.with_topic(StrBytes::from_string(name).into())
            }
        })
        .collect();

    let max_wait_ms = duration_to_i32_ms(config.fetch_max_wait)?;
    let rack_id = StrBytes::from_string(config.rack_id.clone().unwrap_or_default());
    let isolation_level = config.isolation_level.as_protocol_value();

    let request = kafka_protocol::messages::FetchRequest::default()
        .with_replica_id((-1).into())
        .with_max_wait_ms(max_wait_ms)
        .with_min_bytes(config.fetch_min_bytes)
        .with_max_bytes(config.fetch_max_bytes)
        .with_isolation_level(isolation_level)
        .with_session_id(0)
        .with_session_epoch(-1)
        .with_topics(request_topics)
        .with_forgotten_topics_data(Vec::new())
        .with_rack_id(rack_id);
    Ok(request)
}

/// Summarises an assignment as a map from topic id to partition indexes.
///
/// Every assigned partition contributes its index to the list for its topic id;
/// each list is sorted ascending. Keys are ordered by the `BTreeMap`.
pub fn assignment_snapshot_by_topic_id(
    assignment: &HashMap<TopicPartitionKey, AssignedPartition>,
) -> BTreeMap<Uuid, Vec<i32>> {
    let mut snapshot: BTreeMap<Uuid, Vec<i32>> = BTreeMap::new();

    assignment.values().for_each(|entry| {
        snapshot
            .entry(entry.topic_id)
            .or_default()
            .push(entry.key.partition);
    });

    // The source map iterates in unspecified order, so sort for a stable result.
    snapshot
        .values_mut()
        .for_each(|indexes| indexes.sort_unstable());

    snapshot
}

/// Groups `(topic, partition)` pairs by topic name.
///
/// Returns a map ordered by topic name whose values are the partition indexes
/// seen for that topic, sorted ascending. Duplicate pairs are kept, not merged.
pub fn group_topic_partitions(partitions: Vec<(String, i32)>) -> BTreeMap<String, Vec<i32>> {
    let mut by_topic = partitions.into_iter().fold(
        BTreeMap::<String, Vec<i32>>::new(),
        |mut acc, (topic, partition)| {
            acc.entry(topic).or_default().push(partition);
            acc
        },
    );

    by_topic
        .values_mut()
        .for_each(|indexes| indexes.sort_unstable());

    by_topic
}

pub fn parse_find_coordinator_response(
    response: FindCoordinatorResponse,
    version: i16,
) -> Result<CoordinatorLookupResult> {
    if version >= 4 {
        let coordinator = response
            .coordinators
            .into_iter()
            .next()
            .context("find coordinator returned no coordinator entries")?;
        if let Some(error) = coordinator.error_code.err() {
            if error.is_retriable() {
                return Ok(CoordinatorLookupResult::Retry(error));
            }
            return Err(Error::Broker(BrokerError::response(
                "find_coordinator",
                Some(coordinator.key.to_string()),
                error,
            ))
            .into());
        }
        return Ok(CoordinatorLookupResult::Found(BrokerAddress::new(
            coordinator.host.to_string(),
            u16::try_from(coordinator.port)
                .with_context(|| format!("invalid coordinator port {}", coordinator.port))?,
        )));
    }

    if let Some(error) = response.error_code.err() {
        if error.is_retriable() {
            return Ok(CoordinatorLookupResult::Retry(error));
        }
        return Err(Error::Broker(BrokerError::response(
            "find_coordinator",
            None::<String>,
            error,
        ))
        .into());
    }
    Ok(CoordinatorLookupResult::Found(BrokerAddress::new(
        response.host.to_string(),
        u16::try_from(response.port)
            .with_context(|| format!("invalid coordinator port {}", response.port))?,
    )))
}

pub fn is_retriable_error(error: &anyhow::Error) -> bool {
    error
        .downcast_ref::<ResponseError>()
        .is_some_and(ResponseError::is_retriable)
}